matsim-proto/matsim.py

292 lines
9.9 KiB
Python
Raw Normal View History

import math
import subprocess
import gzip
import shutil
import geopandas as gpd
from shapely.geometry import Point
import hub.helpers.constants as cte
from lxml import etree
CONFIG_DTD = "http://www.matsim.org/files/dtd/config_v2.dtd"
FACILITIES_DTD = "http://www.matsim.org/files/dtd/facilities_v1.dtd"
POPULATION_DTD = "http://www.matsim.org/files/dtd/population_v5.dtd"
# TODO: remove xmltodict completely and replace with lxml as it doesnt allow for repeated mixed ordered tags
class Matsim:
def __init__(self, city, output_file_path):
self._city = city
self._output_file_path = output_file_path
self._facilities = {
'name': self._city.name + ' Facilities',
'facility': []
}
def _export(self):
self._export_facilities()
self._export_network()
self._export_population()
self._export_config()
def _export_facilities(self):
buildings_shape_data = {
'id': [],
'geometry': []
}
facilities_xml = etree.Element('facilities', name=self._facilities['name'])
for building in self._city.buildings:
for surface in building.grounds:
for coord in surface.solid_polygon.coordinates:
buildings_shape_data['id'].append(f"{building.name}")
buildings_shape_data['geometry'].append(Point(coord[0], coord[1]))
facility = {
'id': building.name,
'x': str(building.centroid[0]),
'y': str(building.centroid[1]),
'activity': []
}
if len(building.thermal_zones_from_internal_zones) > 1:
raise NotImplementedError("multi-zone buildings aren't yet supported")
building_schedules = []
capacity = 0
for thermal_zone in building.thermal_zones_from_internal_zones:
capacity = thermal_zone.occupancy.occupancy_density * building.floor_area * building.storeys_above_ground
for schedule in thermal_zone.occupancy.occupancy_schedules:
building_schedules.append(schedule)
activity_info = {
'type': building.function,
'capacity': math.ceil(capacity),
'opentime': _convert_schedules(building_schedules)
}
facility_xml = etree.SubElement(facilities_xml, 'facility', {
'id': facility['id'],
'x': facility['x'],
'y': facility['y'],
})
activity_xml = etree.SubElement(facility_xml, 'activity', {
'type': activity_info['type']
})
etree.SubElement(activity_xml, 'capacity', {
'value': activity_info['capacity']
})
etree.SubElement(activity_xml, 'opentime', {
'day': activity_info['opentime'][0]['day'],
'start_time': activity_info['opentime'][0]['start_time'],
'end_time': activity_info['opentime'][0]['end_time']
})
facility['activity'].append(activity_info)
self._facilities['facility'].append(facility)
gdf = gpd.GeoDataFrame(
buildings_shape_data,
crs=self._city.srs_name
)
gdf.to_file("input_files/buildings_shapefile.shp")
# Convert the Python dictionary to an XML string
xml_content = etree.tostring(facilities_xml, pretty_print=True, encoding='UTF-8', xml_declaration=True).decode('utf-8')
# Write the XML to the file
output_file = f"{self._output_file_path}/{self._city.name}_facilities.xml"
with open(output_file, 'w') as file:
file.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>")
file.write(f"<!DOCTYPE facilities SYSTEM \"{FACILITIES_DTD}\">")
file.write(xml_content)
with open(output_file, 'rb') as f_in:
with gzip.open(output_file + '.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
def _export_network(self):
java_path = "java"
jar_path = "matsim-network-from-osm.jar"
command = [
java_path,
"-jar", jar_path,
"input_files/merged-network.osm.pbf",
"input_files/buildings_shapefile.shp",
f"{self._output_file_path}/{self._city.name}_network.xml.gz"
]
subprocess.run(command)
def _export_population(self):
population = etree.Element("population")
id = 0
# Generate work facilities
work = []
for facility in self._facilities['facility']:
if facility['activity'][0]['type'] != cte.RESIDENTIAL:
work.append({
'type': facility['activity'][0]['type'],
'capacity': int(facility['activity'][0]['capacity']),
'facility': facility['id'],
'x': facility['x'],
'y': facility['y'],
'start_time': '08:00:00',
'end_time': '18:00:00'
})
# Generate the population from residential places first
current_work = 0
for facility in self._facilities['facility']:
if facility['activity'][0]['type'] == cte.RESIDENTIAL:
max_capacity = int(facility['activity'][0]['capacity']['value'])
for _ in range(max_capacity):
person = etree.SubElement(population, 'person', {
'id': str(id),
'sex': 'm',
'age': '32',
'car_avail': 'always',
'employed': 'yes',
})
plan = etree.SubElement(person, 'plan', {'selected': 'yes'})
# Residential activity
etree.SubElement(plan, 'act', {
'type': facility['activity'][0]['type'],
'facility': facility['id'],
'x': facility['x'],
'y': facility['y'],
'end_time': '7:30:00'
})
# Leg to work
etree.SubElement(plan, 'leg', {'mode': 'car'})
# Work activity
etree.SubElement(plan, 'act', {
'type': work[current_work]['type'],
'facility': work[current_work]['facility'],
'x': work[current_work]['x'],
'y': work[current_work]['y'],
'start_time': work[current_work]['start_time'],
'end_time': work[current_work]['end_time'],
})
# Leg to home
etree.SubElement(plan, 'leg', {'mode': 'car'})
# Residential activity (return)
etree.SubElement(plan, 'act', {
'type': facility['activity'][0]['type'],
'facility': facility['id'],
'x': facility['x'],
'y': facility['y'],
})
work[current_work]['capacity'] -= 1
if work[current_work]['capacity'] == 0:
current_work += 1
id += 1
# Convert the Python dictionary to an XML string
xml_content = etree.tostring(population, pretty_print=True, encoding='UTF-8', xml_declaration=True).decode('utf-8')
# Write the XML to the file
output_file = f"{self._output_file_path}/{self._city.name}_population.xml"
with open(output_file, 'w') as file:
file.write(xml_content)
with open(output_file, 'rb') as f_in:
with gzip.open(output_file + '.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
def _export_config(self):
parameterset = []
for facility in self._facilities['facility']:
if facility['activity'][0]['@type'] == cte.RESIDENTIAL:
parameterset.append({'@type':'activityParams', 'param': [
{'@name':'activityType','@value':facility['activity'][0]['@type']},
{'@name':'typicalDuration', '@value' : '12:00:00'},
{'@name':'priority','@value': '1'}
]})
else:
parameterset.append({'@type':'activityParams', 'param': [
{'@name': 'activityType', '@value': facility['activity'][0]['@type']},
{'@name':'openingTime','@value': '08:00:00'},
{'@name':'closingTime','@value': '18:00:00'},
{'@name':'typicalDuration','@value': '08:00:00'},
{'@name':'priority','@value': '1'}
]
})
config = {
'module': [
{'@name': 'network', 'param': {'@name':'inputNetworkFile', '@value': f"{self._city.name}_network.xml.gz"}},
{'@name': 'plans', 'param': {'@name':'inputPlansFile', '@value': f"{self._city.name}_population.xml.gz"}},
{'@name': 'facilities', 'param': {'@name':'inputFacilitiesFile', '@value': f"{self._city.name}_facilities.xml.gz"}},
{'@name': 'controler', 'param': [
{'@name': 'outputDirectory', '@value': '/output'},
{'@name': 'firstIteration', '@value': '0'},
{'@name': 'lastIteration', '@value': '10'},
]},
{'@name': 'qsim', 'param': [
{'@name': 'startTime', '@value': '00:00:00'},
{'@name': 'endTime', '@value': '00:00:00'},
{'@name': 'snapshotperiod', '@value': '00:00:00'},
]},
{'@name':'planCalcStore','param':[
{'@name':'learningRate', '@value':'1.0'},
{'@name':'BrainExpBeta', '@value':'2.0'},
{'@name':'lateArrival', '@value':'-18'},
{'@name':'earlyDeparture', '@value':'-0'},
{'@name':'performing', '@value':'+6'},
{'@name':'waiting', '@value':'-0'},
],'parameterset': parameterset},
{'@name':'strategy','param':[
{'@name': 'maxAgentPlanMemorySize', '@value': '5'},
{'@name': 'ModuleProbability_1', '@value': '0.9'},
{'@name': 'Module_1', '@value': 'BestScore'},
{'@name': 'ModuleProbability_2', '@value': '0.1'},
{'@name': 'Module_2', '@value': 'ReRoute'},
]}
]
}
xml_content = xmltodict.unparse({'config': config}, pretty=True, short_empty_elements=True)
with open(f"{self._output_file_path}/{self._city.name}_config.xml", 'w') as file:
file.write(xml_content)
def _convert_schedules(building_schedules):
converted_schedules = []
for schedule in building_schedules:
opening_hour = 0
closing_hour = 0
for i, value in enumerate(schedule.values):
if value > 0:
opening_hour = i
break
for i, value in reversed(list(enumerate(schedule.values))):
if value > 0:
closing_hour = i
break
for day in schedule.day_types:
if day[0:3] != 'hol':
converted_schedules.append({
'day': day[0:3],
'start_time': opening_hour,
'end_time': closing_hour
})
return converted_schedules