matsim-proto/matsim_visualizer.py

121 lines
4.3 KiB
Python
Raw Normal View History

import gzip
import xmltodict
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from collections import defaultdict
import matplotlib.cm as cm
import matplotlib.colors as colors
2024-02-14 16:54:41 -05:00
class MatsimVisualizer():
2024-02-14 16:54:41 -05:00
def __init__(self, network_file_path, events_file_path, output_file_path):
self._nodes = None
self._links = None
self._pos = None
self.norm = None
2024-02-14 16:54:41 -05:00
self._output_file_path = output_file_path
self._network_file_path = network_file_path
self._events_file_path = events_file_path
self._G = nx.Graph()
self._traffic_per_tick = defaultdict(lambda: defaultdict(int))
self._cumulative_traffic = defaultdict(lambda: defaultdict(int))
self._cmap = cm.viridis
def load_data(self):
# Load network data
2024-02-14 16:54:41 -05:00
with gzip.open(self._network_file_path, 'rb') as file:
network_doc = xmltodict.parse(file.read().decode('utf-8'))
# Parse nodes
2024-02-14 16:54:41 -05:00
self._nodes = {node['@id']: (float(node['@x']), float(node['@y'])) for node in
network_doc['network']['nodes']['node']}
# Parse links
2024-02-14 16:54:41 -05:00
self._links = [{
'id': link['@id'],
'from': link['@from'],
'to': link['@to']
} for link in network_doc['network']['links']['link']]
link_state = defaultdict(list)
# Load and parse the events file
2024-02-14 16:54:41 -05:00
with gzip.open(self._events_file_path, 'rb') as file:
events_doc = xmltodict.parse(file.read().decode('utf-8'))
for event in events_doc['events']['event']:
link_id = event.get('@link')
event_type = event.get('@type')
tick = float(event.get('@time'))
vehicle_id = event.get('@vehicle')
if link_id is not None and event_type is not None and tick is not None:
if event_type == 'entered link' or event_type == 'vehicle enters traffic':
2024-02-14 16:54:41 -05:00
self._traffic_per_tick[tick][link_id] += 1
link_state[link_id].append(vehicle_id)
elif event_type == 'left link' or event_type == 'vehicle leaves traffic':
2024-02-14 16:54:41 -05:00
self._traffic_per_tick[tick][link_id] -= 1
link_state[link_id].remove(vehicle_id)
2024-02-14 16:54:41 -05:00
for link in self._links:
self._cumulative_traffic[0][link['id']] = 0
# Accumulate the counts to get the total number of vehicles on each link up to each tick
actual_tick = 0
2024-02-14 16:54:41 -05:00
sorted_ticks = sorted(self._traffic_per_tick.keys())
for tick in sorted_ticks:
2024-02-14 16:54:41 -05:00
if actual_tick not in self._cumulative_traffic:
# Start with the vehicle counts of the previous tick
2024-02-14 16:54:41 -05:00
self._cumulative_traffic[actual_tick] = defaultdict(int, self._cumulative_traffic.get(actual_tick - 1, {}))
# Apply the changes recorded for the current tick
2024-02-14 16:54:41 -05:00
for link_id, change in self._traffic_per_tick[tick].items():
self._cumulative_traffic[actual_tick][link_id] += change
actual_tick += 1 # Move to the next tick
def create_graph(self):
2024-02-14 16:54:41 -05:00
for node_id, coords in self._nodes.items():
self._G.add_node(node_id, pos=coords)
for link in self._links:
self._G.add_edge(link['from'], link['to'])
self._pos = nx.get_node_attributes(self._G, 'pos')
def setup_color_mapping(self):
# Find max traffic to setup the normalization instance
2024-02-14 16:54:41 -05:00
max_traffic = max(max(self._cumulative_traffic[tick].values()) for tick in self._cumulative_traffic)
self.norm = colors.Normalize(vmin=0, vmax=max_traffic)
def update(self, frame_number):
2024-02-14 16:54:41 -05:00
tick = sorted(self._cumulative_traffic.keys())[frame_number]
traffic_data = self._cumulative_traffic[tick]
2024-02-14 16:54:41 -05:00
edge_colors = [self._cmap(self.norm(traffic_data.get(link['id'], 0))) for link in self._links]
2024-02-14 17:26:20 -05:00
edge_widths = [1 + self.norm(traffic_data.get(link['id'], 0)) * 10 for link in self._links]
plt.cla()
2024-02-14 16:54:41 -05:00
nx.draw(self._G, self._pos, node_size=0, node_color='blue', width=edge_widths, edge_color=edge_colors,
with_labels=False,
edge_cmap=self._cmap)
plt.title(f"Time: {tick}")
def visualize(self):
self.load_data()
self.create_graph()
self.setup_color_mapping()
fig, ax = plt.subplots()
2024-02-14 16:54:41 -05:00
sm = plt.cm.ScalarMappable(cmap=self._cmap, norm=self.norm)
sm.set_array([])
plt.colorbar(sm, ax=ax, label='Traffic Density')
2024-02-14 16:54:41 -05:00
ani = FuncAnimation(fig, self.update, frames=len(self._cumulative_traffic), repeat=False)
ani.save(f"{self._output_file_path}/traffic_animation.gif", writer='ffmpeg', fps=5)
plt.show()