import gzip
from collections import defaultdict

from lxml import etree
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import matplotlib.cm as cm
import matplotlib.colors as colors


class MatsimVisualizer():
    """Animate per-link vehicle counts from a MATSim network/events file pair.

    Typical use is ``MatsimVisualizer(...).visualize()``, which loads both
    gzip-compressed XML files, builds a networkx graph of the links whose end
    nodes lie inside the viewport, and saves an animation in which every link
    is coloured and sized by the number of vehicles currently counted on it.
    """

    # Event types that add a vehicle to / remove a vehicle from a link.
    _ENTER_EVENTS = ('entered link', 'vehicle enters traffic')
    _LEAVE_EVENTS = ('left link', 'vehicle leaves traffic')

    def __init__(self, network_file_path: str, events_file_path: str,
                 output_file_path: str, viewport) -> None:
        """Store the configuration; no file is read until ``load_data``.

        Args:
            network_file_path: Path to the gzip-compressed network XML.
            events_file_path: Path to the gzip-compressed events XML.
            output_file_path: Directory that receives
                ``traffic_animation.gif``.
            viewport: Indexable of four numbers. A node is kept only when
                ``viewport[0] < x < viewport[2]`` and
                ``viewport[1] > y > viewport[3]``.
        """
        self._nodes = {}       # node id -> (x, y), viewport nodes only
        self._links = []       # dicts with 'id', 'from', 'to', 'vehicles'
        self._edgelist = []    # (from, to) per link, same order as _links
        self._pos = None
        self.norm = None
        # Bounding box over ALL nodes of the network (not just the viewport).
        self._max_x = None
        self._min_x = None
        self._max_y = None
        self._min_y = None
        self._viewport = viewport
        self._output_file_path = output_file_path
        self._network_file_path = network_file_path
        self._events_file_path = events_file_path
        self._G = nx.Graph()
        # tick -> link id -> change in vehicle count during that tick
        self._traffic_per_tick = defaultdict(lambda: defaultdict(int))
        self._cmap = cm.viridis
        self._tick = 0
        self._max_traffic = 0
        self._max_traffic_link = None
        self._max_traffic_tick = None

    def load_data(self) -> None:
        """Read the network file and then the events file.

        Previously loaded nodes, links and traffic statistics are discarded
        first, so calling this method more than once does not duplicate
        links or accumulate traffic deltas.
        """
        self._load_network()
        self._load_events()

    def _load_network(self) -> None:
        """Parse nodes and links, keeping only those inside the viewport."""
        self._nodes = {}
        self._links = []
        self._max_x = self._min_x = self._max_y = self._min_y = None

        with gzip.open(self._network_file_path, 'rb') as file:
            network_doc = etree.parse(file)

        for node in network_doc.xpath('/network/nodes/node'):
            x = float(node.get('x'))
            y = float(node.get('y'))

            # The bounding box is tracked for every node; it is used later
            # for the aspect ratio of the plot.
            if self._max_x is None or x > self._max_x:
                self._max_x = x
            if self._min_x is None or x < self._min_x:
                self._min_x = x
            if self._max_y is None or y > self._max_y:
                self._max_y = y
            if self._min_y is None or y < self._min_y:
                self._min_y = y

            if (self._viewport[0] < x < self._viewport[2]
                    and self._viewport[1] > y > self._viewport[3]):
                self._nodes[node.get('id')] = (x, y)

        for link in network_doc.xpath('/network/links/link'):
            start_node = link.get('from')
            end_node = link.get('to')
            # A link is drawable only when both of its end nodes were kept.
            if start_node in self._nodes and end_node in self._nodes:
                self._links.append({
                    'id': link.get('id'),
                    'from': start_node,
                    'to': end_node,
                    'vehicles': 0,
                })

    def _load_events(self) -> None:
        """Convert enter/leave events into per-tick traffic deltas.

        Every counted event occupies its own tick: the tick counter advances
        by one after each enter or leave event, independent of the event's
        ``time`` attribute. While reading, the highest running vehicle count
        seen on any single link is recorded together with its link and tick.
        """
        self._traffic_per_tick = defaultdict(lambda: defaultdict(int))
        self._max_traffic = 0
        self._max_traffic_link = None
        self._max_traffic_tick = None
        self._tick = 0
        cumulative_traffic = defaultdict(int)

        with gzip.open(self._events_file_path, 'rb') as file:
            events_doc = etree.parse(file)

        for event in events_doc.xpath('/events/event'):
            link_id = event.get('link')
            event_type = event.get('type')
            # Events lacking any of the three attributes are skipped.
            if (link_id is None or event_type is None
                    or event.get('time') is None):
                continue

            if event_type in self._ENTER_EVENTS:
                delta = 1
            elif event_type in self._LEAVE_EVENTS:
                delta = -1
            else:
                continue

            self._traffic_per_tick[self._tick][link_id] += delta
            cumulative_traffic[link_id] += delta

            # We need the maximum traffic at any given point for the heatmap;
            # only an increase can produce a new maximum.
            if delta > 0 and self._max_traffic < cumulative_traffic[link_id]:
                self._max_traffic = cumulative_traffic[link_id]
                self._max_traffic_link = link_id
                self._max_traffic_tick = self._tick

            self._tick += 1

    def create_graph(self) -> None:
        """Build the graph, node positions and edge list from loaded data."""
        for node_id, coords in self._nodes.items():
            self._G.add_node(node_id, pos=coords)
        for link in self._links:
            self._G.add_edge(link['from'], link['to'])
        self._pos = nx.get_node_attributes(self._G, 'pos')
        # The undirected graph merges opposite/parallel links into one edge,
        # so drawing uses this explicit list to keep one line per link.
        self._edgelist = [(link['from'], link['to']) for link in self._links]

    def setup_color_mapping(self) -> None:
        """Create the normaliser mapping 0..max traffic onto the colormap."""
        self.norm = colors.Normalize(vmin=0, vmax=self._max_traffic)

    def update(self, frame: int) -> None:
        """Apply the traffic deltas of tick ``frame`` and redraw the network.

        Frames are expected in increasing order starting at 0. Frame 0 resets
        every link's vehicle count, so drawing the first frame more than once
        or replaying the animation starts from a clean state.

        Args:
            frame: Index of the tick to apply and draw.
        """
        if frame == 0:
            for link in self._links:
                link['vehicles'] = 0
        self._tick = frame

        # .get avoids inserting empty entries into the defaultdict.
        traffic_change = self._traffic_per_tick.get(frame, {})
        edge_colors = []
        edge_widths = []
        for link in self._links:
            link['vehicles'] += traffic_change.get(link['id'], 0)
            level = self.norm(link['vehicles'])
            # Colormaps expect normalised floats; a raw int would be taken
            # as a direct lookup-table index.
            edge_colors.append(self._cmap(level))
            edge_widths.append(1 + level * 10)

        # Fall back to an edge list derived from the links when
        # create_graph() has not populated the cached one.
        edgelist = self._edgelist or [
            (link['from'], link['to']) for link in self._links
        ]

        plt.cla()
        nx.draw(self._G, self._pos, node_size=0, node_color='blue',
                edgelist=edgelist, width=edge_widths,
                edge_color=edge_colors, with_labels=False,
                edge_cmap=self._cmap)
        plt.title(f"Time: {self._tick}")
        self._tick = frame + 1

    def visualize(self) -> None:
        """Load the data, render the animation, save it and show it.

        The animation is written to ``<output_file_path>/traffic_animation.gif``
        with the ``ffmpeg`` writer at 5 frames per second; afterwards the
        figure is shown and a summary of the busiest link is printed.
        """
        self.load_data()
        self.create_graph()
        self.setup_color_mapping()

        fig, ax = plt.subplots()
        # set_aspect needs a finite positive number; skip it for an empty
        # network or one whose nodes share a single x or y coordinate.
        if self._max_x is not None and self._max_y is not None:
            x_extent = self._max_x - self._min_x
            y_extent = self._max_y - self._min_y
            if x_extent > 0 and y_extent > 0:
                ax.set_aspect(x_extent / y_extent)

        sm = plt.cm.ScalarMappable(cmap=self._cmap, norm=self.norm)
        sm.set_array([])
        plt.colorbar(sm, ax=ax, label='Traffic Density')

        self._tick = 0
        ani = FuncAnimation(fig, self.update,
                            frames=len(self._traffic_per_tick),
                            repeat=False)
        ani.save(f"{self._output_file_path}/traffic_animation.gif",
                 writer='ffmpeg', fps=5)
        plt.show()
        print(f"Largest amount of vehicles ({self._max_traffic}) seen at {self._max_traffic_tick} on link {self._max_traffic_link}")