matsim-proto/matsim_visualizer.py

114 lines
4.1 KiB
Python
Raw Normal View History

import gzip
import xmltodict
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from collections import defaultdict
import matplotlib.cm as cm
import matplotlib.colors as colors
class MatSimVisualizer():
def __init__(self, network_file_path, events_file_path):
self.network_file_path = network_file_path
self.events_file_path = events_file_path
self.G = nx.Graph()
self.pos = None
self.traffic_per_tick = defaultdict(lambda: defaultdict(int))
self.cumulative_traffic = defaultdict(lambda: defaultdict(int))
self.cmap = cm.viridis
self.norm = None
def load_data(self):
# Load network data
with gzip.open(self.network_file_path, 'rb') as file:
network_doc = xmltodict.parse(file.read().decode('utf-8'))
# Parse nodes
self.nodes = {node['@id']: (float(node['@x']), float(node['@y'])) for node in
network_doc['network']['nodes']['node']}
# Parse links
self.links = [{
'id': link['@id'],
'from': link['@from'],
'to': link['@to']
} for link in network_doc['network']['links']['link']]
link_state = defaultdict(list)
# Load and parse the events file
with gzip.open(self.events_file_path, 'rb') as file:
events_doc = xmltodict.parse(file.read().decode('utf-8'))
for event in events_doc['events']['event']:
link_id = event.get('@link')
event_type = event.get('@type')
tick = float(event.get('@time'))
vehicle_id = event.get('@vehicle')
if link_id is not None and event_type is not None and tick is not None:
if event_type == 'entered link' or event_type == 'vehicle enters traffic':
self.traffic_per_tick[tick][link_id] += 1
link_state[link_id].append(vehicle_id)
elif event_type == 'left link' or event_type == 'vehicle leaves traffic':
self.traffic_per_tick[tick][link_id] -= 1
link_state[link_id].remove(vehicle_id)
for link in self.links:
self.cumulative_traffic[0][link['id']] = 0
# Accumulate the counts to get the total number of vehicles on each link up to each tick
actual_tick = 0
sorted_ticks = sorted(self.traffic_per_tick.keys())
for tick in sorted_ticks:
if actual_tick not in self.cumulative_traffic:
# Start with the vehicle counts of the previous tick
self.cumulative_traffic[actual_tick] = defaultdict(int, self.cumulative_traffic.get(actual_tick - 1, {}))
# Apply the changes recorded for the current tick
for link_id, change in self.traffic_per_tick[tick].items():
self.cumulative_traffic[actual_tick][link_id] += change
actual_tick += 1 # Move to the next tick
def create_graph(self):
for node_id, coords in self.nodes.items():
self.G.add_node(node_id, pos=coords)
for link in self.links:
self.G.add_edge(link['from'], link['to'])
self.pos = nx.get_node_attributes(self.G, 'pos')
def setup_color_mapping(self):
# Find max traffic to setup the normalization instance
max_traffic = max(max(self.cumulative_traffic[tick].values()) for tick in self.cumulative_traffic)
self.norm = colors.Normalize(vmin=0, vmax=max_traffic)
def update(self, frame_number):
tick = sorted(self.cumulative_traffic.keys())[frame_number]
traffic_data = self.cumulative_traffic[tick]
edge_colors = [self.cmap(self.norm(traffic_data.get(link['id'], 0))) for link in self.links]
edge_widths = [2 + self.norm(traffic_data.get(link['id'], 0)) * 3 for link in self.links]
plt.cla()
nx.draw(self.G, self.pos, node_size=10, node_color='blue', width=edge_widths, edge_color=edge_colors, with_labels=False,
edge_cmap=self.cmap)
plt.title(f"Time: {tick}")
def visualize(self):
self.load_data()
self.create_graph()
self.setup_color_mapping()
fig, ax = plt.subplots()
sm = plt.cm.ScalarMappable(cmap=self.cmap, norm=self.norm)
sm.set_array([])
plt.colorbar(sm, ax=ax, label='Traffic Density')
ani = FuncAnimation(fig, self.update, frames=len(self.cumulative_traffic), repeat=False)
ani.save('traffic_animation.gif', writer='ffmpeg', fps=5)
plt.show()