system_assignation/DistrictHeatingNetworkCreator.py

210 lines
9.4 KiB
Python
Raw Normal View History

import json
import geopandas as gpd
import matplotlib.pyplot as plt
from shapely.geometry import Polygon, Point, LineString, MultiPoint
import networkx as nx
class DistrictHeatingNetworkCreator:
def __init__(self, buildings_file, roads_file):
"""
Initialize the class with paths to the buildings and roads data files.
:param buildings_file: Path to the GeoJSON file containing building data.
:param roads_file: Path to the Shapefile containing road data.
"""
self.buildings_file = buildings_file
self.roads_file = roads_file
def run(self):
"""
Main method to execute the district heating network creation process.
:return: NetworkX graph with nodes and edges representing the network.
"""
self._load_and_process_data()
self._find_nearest_roads()
self._process_intersections()
network_graph = self._create_network_graph()
return network_graph
def _load_and_process_data(self):
"""
Load and process the building and road data.
"""
# Load building data
with open(self.buildings_file, 'r') as file:
city = json.load(file)
# Extract centroids and building IDs from building data
centroids = []
building_ids = [] # List to store building IDs
buildings = city['features']
for building in buildings:
coordinates = building['geometry']['coordinates'][0]
building_polygon = Polygon(coordinates)
centroid = building_polygon.centroid
centroids.append(centroid)
building_ids.append(building['id']) # Extract building ID
# Convert centroids to a GeoDataFrame and include building IDs
self.centroids_gdf = gpd.GeoDataFrame({
'geometry': [Point(centroid.x, centroid.y) for centroid in centroids],
'building_id': building_ids # Add building IDs as a column
}, crs='EPSG:4326')
# Load road data
self.gdf_road = gpd.read_file(self.roads_file)
# Ensure centroids are in the same CRS as roads
self.centroids_gdf = self.centroids_gdf.to_crs(self.gdf_road.crs)
def _find_nearest_roads(self):
"""
Find the nearest road for each building centroid.
"""
# Process road geometries
self.gdf_clean = gpd.GeoDataFrame(
{'geometry': [LineString([coord for coord in line.coords]) for line in self.gdf_road.geometry]})
# Find the nearest road line and point for each centroid
self.closest_linestrings = []
self.nearest_points = []
for centroid in self.centroids_gdf.geometry:
closest_road = min(self.gdf_clean.geometry, key=lambda x: x.distance(centroid))
self.closest_linestrings.append(closest_road)
nearest_point = closest_road.interpolate(closest_road.project(centroid))
self.nearest_points.append(nearest_point)
def _process_intersections(self):
"""
Process intersections and create final geometries.
"""
# Create additional GeoDataFrames for points and nearest points
self.gdf_pts = gpd.GeoDataFrame(
{'geometry': [Point(coord) for line in self.gdf_clean.geometry for coord in line.coords]})
self.gdf_pts2 = gpd.GeoDataFrame({'geometry': self.nearest_points})
# Combine nearest points and road points into one GeoDataFrame
self.gdf_pts3 = gpd.GeoDataFrame({'geometry': self.nearest_points + list(self.gdf_pts.geometry)})
# Identify intersections and create LineStrings based on intersections
self.gdf_clean["intersect"] = [
[y for y in range(len(self.gdf_pts2)) if self.gdf_pts2.geometry[y].distance(geom) <= 1.0] for geom in
self.gdf_clean.geometry]
self.gdf_cleaner = self.gdf_clean[self.gdf_clean["intersect"].apply(len).gt(0)].reset_index(drop=True)
self.test_list = []
for idx, row in self.gdf_cleaner.iterrows():
for i in range(len(row["intersect"]) + 1):
if i == 0:
self.test_list.append(
LineString([row['geometry'].coords[0], self.gdf_pts3.geometry[row['intersect'][i]]]))
elif i < len(row['intersect']):
self.test_list.append(LineString(
[self.gdf_pts3.geometry[row['intersect'][i - 1]], self.gdf_pts3.geometry[row['intersect'][i]]]))
else:
self.test_list.append(
LineString([self.gdf_pts3.geometry[row['intersect'][i - 1]], row['geometry'].coords[-1]]))
self.gdf_cleanest = gpd.GeoDataFrame({'geometry': self.test_list})
points = [coord for geom in self.gdf_cleanest.geometry for coord in geom.coords]
gdf_pts_cnt = self.gdf_pts.copy()
gdf_pts_cnt["count"] = gdf_pts_cnt.geometry.apply(lambda x: points.count(x.coords[0]))
gdf_pts_reset = gdf_pts_cnt[gdf_pts_cnt["count"] > 1].reset_index(drop=True)
gdf_pts_drop = gdf_pts_cnt[gdf_pts_cnt["count"] <= 1].reset_index(drop=True)
# Remove unnecessary geometries from gdf_cleanest
for idx, geom in self.gdf_cleanest.iterrows():
for coord in geom.geometry.coords:
if coord in [pt.coords[0] for pt in gdf_pts_drop.geometry]:
self.gdf_cleanest = self.gdf_cleanest.drop(idx)
self.gdf_cleanest.reset_index(drop=True, inplace=True)
def _create_network_graph(self):
"""
Create a NetworkX graph from the processed geospatial data.
:return: A NetworkX graph representing the district heating network.
"""
G = nx.Graph()
# Convert centroids to EPSG:4326 for Google Maps compatibility
for idx, row in self.centroids_gdf.iterrows():
building_name = f"Building_{idx + 1}"
G.add_node((row.geometry.x, row.geometry.y),
type='centroid',
name=building_name,
building_id=row['building_id']) # Add building ID as an attribute
for point in self.nearest_points:
G.add_node((point.x, point.y), type='nearest_point')
# Add edges with lengths as weights for the road network
for line in self.gdf_cleanest.geometry:
length = line.length
if isinstance(line.boundary, MultiPoint):
# Handle MultiPoint boundary
points = list(line.boundary.geoms)
for i in range(len(points) - 1):
start_point = points[i]
end_point = points[i + 1]
G.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
else:
# Handle typical case with two endpoints
start_point, end_point = line.boundary
G.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
# Add edges connecting nearest points to their centroids
for point, centroid in zip(self.nearest_points, self.centroids_gdf.geometry):
distance = point.distance(centroid)
G.add_edge((point.x, point.y), (centroid.x, centroid.y), weight=distance)
return G
def plot_network_graph(self, network_graph):
"""
Plot the network graph using matplotlib and networkx.
:param network_graph: The NetworkX graph to be plotted.
"""
plt.figure(figsize=(12, 12))
pos = {node: (node[0], node[1]) for node in network_graph.nodes()}
# Draw nodes and edges
nx.draw_networkx_nodes(network_graph, pos, node_color='blue', node_size=50)
nx.draw_networkx_edges(network_graph, pos, edge_color='gray')
# Create a dictionary for node labels for centroids only
node_labels = {node: data['name'] for node, data in network_graph.nodes(data=True) if
data.get('type') == 'centroid'}
# Adjust node label positions to reduce overlap
label_pos = {node: (coords[0], coords[1] + 0.03) for node, coords in pos.items()} # Shift labels up
# Draw node labels for centroids
nx.draw_networkx_labels(network_graph, label_pos, labels=node_labels, font_size=8, verticalalignment='bottom')
plt.title('District Heating Network Graph')
plt.axis('off')
plt.show()
def graph_enrich(self, network_graph, city):
"""
Enrich the nodes in the network graph with attributes from corresponding buildings in the city.
:param network_graph: NetworkX graph object representing the network.
:param city: Hub's City object.
"""
for building in city.buildings:
for node, attrs in network_graph.nodes(data=True):
if attrs.get('type') == 'centroid' and attrs.get('building_id') == building.name:
network_graph.nodes[node]['building_cooling_peak_load'] = building.cooling_peak_load
network_graph.nodes[node]['building_cooling_demand'] = building.cooling_demand
network_graph.nodes[node]['building_domestic_hot_water_demand'] = (
building.domestic_hot_water_heat_demand)
network_graph.nodes[node]['building_heating_demand'] = building.heating_demand
network_graph.nodes[node]['building_heating_peak_load'] = building.heating_peak_load
network_graph.nodes[node]['building_external_temperature'] = building.external_temperature