matsim-proto/matsim_visualizer.py
2024-02-15 14:32:49 -05:00

142 lines
4.3 KiB
Python

import gzip
from lxml import etree
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from collections import defaultdict
import matplotlib.cm as cm
import matplotlib.colors as colors
class MatsimVisualizer():
    """Animate per-link vehicle counts from a gzipped network and events XML.

    The network file is read via the XPaths ``/network/nodes/node`` and
    ``/network/links/link``; the events file via ``/events/event``.  Link
    enter/leave events are grouped into "ticks" (one tick per distinct
    timestamp among those events) and each tick becomes one animation frame.

    Typical use is ``MatsimVisualizer(net, events, out_dir).visualize()``.
    """

    # Event types that put a vehicle onto / take a vehicle off a link.
    _ENTER_EVENTS = frozenset({'entered link', 'vehicle enters traffic'})
    _LEAVE_EVENTS = frozenset({'left link', 'vehicle leaves traffic'})

    def __init__(self, network_file_path, events_file_path, output_file_path):
        """Store the file locations and create empty state.

        Args:
            network_file_path: Path to the gzipped network XML.
            events_file_path: Path to the gzipped events XML.
            output_file_path: Directory the animation file is written into.
        """
        self._nodes = None
        self._links = None
        self._pos = None
        self.norm = None
        self._ax = None
        self._output_file_path = output_file_path
        self._network_file_path = network_file_path
        self._events_file_path = events_file_path
        self._G = nx.Graph()
        # tick index -> {link id -> net change in vehicle count during that tick}
        self._traffic_per_tick = defaultdict(lambda: defaultdict(int))
        self._cmap = cm.viridis
        # While loading: index of the tick being filled.
        # While animating: index of the next tick not yet applied to the links.
        self._tick = 0
        self._max_traffic = 0

    def load_data(self):
        """Parse both input files and keep only links (and their nodes) that saw traffic."""
        # ====== LOAD NETWORK ====== #
        with gzip.open(self._network_file_path, 'rb') as file:
            network_doc = etree.parse(file)
        self._nodes = {
            node.get('id'): (float(node.get('x')), float(node.get('y')))
            for node in network_doc.xpath('/network/nodes/node')
        }
        self._links = [
            {
                'id': link.get('id'),
                'from': link.get('from'),
                'to': link.get('to'),
                'vehicles': 0,
            }
            for link in network_doc.xpath('/network/links/link')
        ]

        # ====== LOAD EVENTS ====== #
        with gzip.open(self._events_file_path, 'rb') as file:
            events_doc = etree.parse(file)
        seen_links = self._process_events(events_doc.xpath('/events/event'))

        # ====== FILTER LINKS ====== #
        self._links = [link for link in self._links if link['id'] in seen_links]

        # ====== FILTER NODES ====== #
        seen_nodes = set()
        for link in self._links:
            seen_nodes.add(link['from'])
            seen_nodes.add(link['to'])
        self._nodes = {
            node_id: self._nodes[node_id]
            for node_id in seen_nodes
            if node_id in self._nodes
        }

    def _process_events(self, events):
        """Fold link enter/leave events into per-tick deltas.

        Resets ``_traffic_per_tick``, ``_max_traffic`` and ``_tick`` and then
        fills them from *events*.

        Args:
            events: Iterable of objects exposing ``.get(name)`` for the
                attributes ``'link'``, ``'type'`` and ``'time'`` (lxml
                elements, or plain dicts).

        Returns:
            The set of link ids that received at least one enter event.
        """
        self._traffic_per_tick = defaultdict(lambda: defaultdict(int))
        self._max_traffic = 0
        self._tick = 0

        seen_links = set()
        occupancy = defaultdict(int)  # running vehicle count per link
        last_time = None
        for event in events:
            link_id = event.get('link')
            raw_time = event.get('time')
            if link_id is None or raw_time is None:
                continue
            event_type = event.get('type')
            if event_type in self._ENTER_EVENTS:
                delta = 1
            elif event_type in self._LEAVE_EVENTS:
                delta = -1
            else:
                continue

            # Open a new tick *before* recording, and only compare against the
            # previous enter/leave event, so each tick holds exactly the
            # changes that share one timestamp.
            time = float(raw_time)
            if last_time is not None and time != last_time:
                self._tick += 1
            last_time = time

            self._traffic_per_tick[self._tick][link_id] += delta
            occupancy[link_id] += delta
            if delta > 0:
                seen_links.add(link_id)
                if occupancy[link_id] > self._max_traffic:
                    self._max_traffic = occupancy[link_id]
        return seen_links

    def create_graph(self):
        """Populate the graph with the loaded nodes/links and cache node positions."""
        for node_id, coords in self._nodes.items():
            self._G.add_node(node_id, pos=coords)
        for link in self._links:
            self._G.add_edge(link['from'], link['to'])
        self._pos = nx.get_node_attributes(self._G, 'pos')

    def setup_color_mapping(self):
        """Create the normalizer mapping vehicle counts onto the colormap range."""
        self.norm = colors.Normalize(vmin=0, vmax=self._max_traffic)

    def _advance_to(self, frame_number):
        """Bring every link's ``'vehicles'`` count to its value after tick *frame_number*.

        Ticks are applied incrementally.  If *frame_number* lies before the
        ticks already applied (e.g. the animation draws frame 0 twice), the
        counts are reset to zero and replayed from the start, so the result
        depends only on *frame_number*.
        """
        if frame_number < self._tick:
            for link in self._links:
                link['vehicles'] = 0
            self._tick = 0
        while self._tick <= frame_number:
            # .get() avoids creating empty entries in the defaultdict.
            changes = self._traffic_per_tick.get(self._tick)
            if changes:
                for link in self._links:
                    delta = changes.get(link['id'])
                    if delta:
                        link['vehicles'] += delta
            self._tick += 1

    def update(self, frame_number):
        """Draw animation frame *frame_number* (callback for ``FuncAnimation``)."""
        self._advance_to(frame_number)

        # Normalize first: calling a colormap with a raw int is interpreted as
        # a lookup-table index, not as a data value.
        levels = [self.norm(link['vehicles']) for link in self._links]
        edge_colors = [self._cmap(level) for level in levels]
        edge_widths = [1 + level * 10 for level in levels]
        # Explicit edge list keeps colors/widths aligned with self._links even
        # when the undirected graph merged opposite-direction links.
        edgelist = [(link['from'], link['to']) for link in self._links]

        ax = self._ax if self._ax is not None else plt.gca()
        ax.clear()
        nx.draw(self._G, self._pos, ax=ax, edgelist=edgelist, node_size=0,
                node_color='blue', width=edge_widths, edge_color=edge_colors,
                with_labels=False)
        ax.set_title(f"Time: {frame_number}")

    def visualize(self):
        """Load the data, render the animation to ``traffic_animation.gif`` and show it."""
        self.load_data()
        self.create_graph()
        self.setup_color_mapping()
        fig, ax = plt.subplots()
        self._ax = ax
        sm = plt.cm.ScalarMappable(cmap=self._cmap, norm=self.norm)
        sm.set_array([])
        plt.colorbar(sm, ax=ax, label='Traffic Density')
        frame_count = len(self._traffic_per_tick)
        self._tick = 0
        ani = FuncAnimation(fig, self.update, frames=frame_count, repeat=False)
        ani.save(f"{self._output_file_path}/traffic_animation.gif", writer='ffmpeg', fps=5)
        plt.show()