district_heating_network_an.../DistrictHeatingNetworkCreator.py

187 lines
8.3 KiB
Python

import json
import geopandas as gpd
import matplotlib.pyplot as plt
from shapely.geometry import Polygon, Point, LineString, MultiPoint
import networkx as nx
class DistrictHeatingNetworkCreator:
def __init__(self, buildings_file, roads_file):
"""
Initialize the class with paths to the buildings and roads data files.
:param buildings_file: Path to the GeoJSON file containing building data.
:param roads_file: Path to the Shapefile containing road data.
"""
self.buildings_file = buildings_file
self.roads_file = roads_file
def run(self):
"""
Main method to execute the district heating network creation process.
:return: NetworkX graph with nodes and edges representing the network.
"""
self._load_and_process_data()
self._find_nearest_roads()
self._process_intersections()
network_graph = self._create_network_graph()
return network_graph
def _load_and_process_data(self):
"""
Load and process the building and road data.
"""
# Load building data
with open(self.buildings_file, 'r') as file:
city = json.load(file)
# Extract centroids from building data
centroids = []
buildings = city['features']
for building in buildings:
coordinates = building['geometry']['coordinates'][0]
building_polygon = Polygon(coordinates)
centroid = building_polygon.centroid
centroids.append(centroid)
# Convert centroids to a GeoDataFrame
self.centroids_gdf = gpd.GeoDataFrame(geometry=[Point(centroid.x, centroid.y) for centroid in centroids],
crs='EPSG:4326')
# Load road data
self.gdf_road = gpd.read_file(self.roads_file)
# Ensure centroids are in the same CRS as roads
self.centroids_gdf = self.centroids_gdf.to_crs(self.gdf_road.crs)
def _find_nearest_roads(self):
"""
Find the nearest road for each building centroid.
"""
# Process road geometries
self.gdf_clean = gpd.GeoDataFrame(
{'geometry': [LineString([coord for coord in line.coords]) for line in self.gdf_road.geometry]})
# Find the nearest road line and point for each centroid
self.closest_linestrings = []
self.nearest_points = []
for centroid in self.centroids_gdf.geometry:
closest_road = min(self.gdf_clean.geometry, key=lambda x: x.distance(centroid))
self.closest_linestrings.append(closest_road)
nearest_point = closest_road.interpolate(closest_road.project(centroid))
self.nearest_points.append(nearest_point)
def _process_intersections(self):
"""
Process intersections and create final geometries.
"""
# Create additional GeoDataFrames for points and nearest points
self.gdf_pts = gpd.GeoDataFrame(
{'geometry': [Point(coord) for line in self.gdf_clean.geometry for coord in line.coords]})
self.gdf_pts2 = gpd.GeoDataFrame({'geometry': self.nearest_points})
# Combine nearest points and road points into one GeoDataFrame
self.gdf_pts3 = gpd.GeoDataFrame({'geometry': self.nearest_points + list(self.gdf_pts.geometry)})
# Identify intersections and create LineStrings based on intersections
self.gdf_clean["intersect"] = [
[y for y in range(len(self.gdf_pts2)) if self.gdf_pts2.geometry[y].distance(geom) <= 1.0] for geom in
self.gdf_clean.geometry]
self.gdf_cleaner = self.gdf_clean[self.gdf_clean["intersect"].apply(len).gt(0)].reset_index(drop=True)
self.test_list = []
for idx, row in self.gdf_cleaner.iterrows():
for i in range(len(row["intersect"]) + 1):
if i == 0:
self.test_list.append(
LineString([row['geometry'].coords[0], self.gdf_pts3.geometry[row['intersect'][i]]]))
elif i < len(row['intersect']):
self.test_list.append(LineString(
[self.gdf_pts3.geometry[row['intersect'][i - 1]], self.gdf_pts3.geometry[row['intersect'][i]]]))
else:
self.test_list.append(
LineString([self.gdf_pts3.geometry[row['intersect'][i - 1]], row['geometry'].coords[-1]]))
self.gdf_cleanest = gpd.GeoDataFrame({'geometry': self.test_list})
points = [coord for geom in self.gdf_cleanest.geometry for coord in geom.coords]
gdf_pts_cnt = self.gdf_pts.copy()
gdf_pts_cnt["count"] = gdf_pts_cnt.geometry.apply(lambda x: points.count(x.coords[0]))
gdf_pts_reset = gdf_pts_cnt[gdf_pts_cnt["count"] > 1].reset_index(drop=True)
gdf_pts_drop = gdf_pts_cnt[gdf_pts_cnt["count"] <= 1].reset_index(drop=True)
# Remove unnecessary geometries from gdf_cleanest
for idx, geom in self.gdf_cleanest.iterrows():
for coord in geom.geometry.coords:
if coord in [pt.coords[0] for pt in gdf_pts_drop.geometry]:
self.gdf_cleanest = self.gdf_cleanest.drop(idx)
self.gdf_cleanest.reset_index(drop=True, inplace=True)
def _create_network_graph(self):
"""
Create a NetworkX graph from the processed geospatial data.
:return: A NetworkX graph representing the district heating network.
"""
G = nx.Graph()
# Add centroids and nearest points as nodes
for centroid in self.centroids_gdf.geometry:
G.add_node((centroid.x, centroid.y), type='centroid')
for point in self.nearest_points:
G.add_node((point.x, point.y), type='nearest_point')
# Add edges with lengths as weights for the road network
for line in self.gdf_cleanest.geometry:
length = line.length
if isinstance(line.boundary, MultiPoint):
# Handle MultiPoint boundary
points = list(line.boundary.geoms)
for i in range(len(points) - 1):
start_point = points[i]
end_point = points[i + 1]
G.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
else:
# Handle typical case with two endpoints
start_point, end_point = line.boundary
G.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
# Add edges connecting nearest points to their centroids
for point, centroid in zip(self.nearest_points, self.centroids_gdf.geometry):
distance = point.distance(centroid)
G.add_edge((point.x, point.y), (centroid.x, centroid.y), weight=distance)
return G
def plot_network_graph(self, network_graph):
"""
Plot the network graph using matplotlib and networkx.
:param network_graph: The NetworkX graph to be plotted.
"""
plt.figure(figsize=(12, 12))
pos = {node: (node[0], node[1]) for node in network_graph.nodes()}
# Draw nodes with type 'centroid'
centroids = [node for node, attr in network_graph.nodes(data=True) if attr.get('type') == 'centroid']
nx.draw_networkx_nodes(network_graph, pos, nodelist=centroids, node_color='blue', node_size=50,
label='Centroids')
# Draw nodes with type 'nearest_point'
nearest_points = [node for node, attr in network_graph.nodes(data=True) if attr.get('type') == 'nearest_point']
nx.draw_networkx_nodes(network_graph, pos, nodelist=nearest_points, node_color='green', node_size=30,
label='Nearest Points')
# Draw other nodes
other_nodes = [node for node, attr in network_graph.nodes(data=True) if 'type' not in attr]
nx.draw_networkx_nodes(network_graph, pos, nodelist=other_nodes, node_color='red', node_size=20,
label='Other Nodes')
# Draw edges
nx.draw_networkx_edges(network_graph, pos, edge_color='gray')
plt.legend()
plt.title('District Heating Network Graph')
plt.axis('off')
plt.show()