191 lines
8.5 KiB
Python
191 lines
8.5 KiB
Python
import json
|
|
import geopandas as gpd
|
|
import matplotlib.pyplot as plt
|
|
from shapely.geometry import Polygon, Point, LineString, MultiPoint
|
|
import networkx as nx
|
|
|
|
|
|
class DistrictHeatingNetworkCreator:
|
|
def __init__(self, buildings_file, roads_file):
|
|
"""
|
|
Initialize the class with paths to the buildings and roads data files.
|
|
|
|
:param buildings_file: Path to the GeoJSON file containing building data.
|
|
:param roads_file: Path to the Shapefile containing road data.
|
|
"""
|
|
self.buildings_file = buildings_file
|
|
self.roads_file = roads_file
|
|
|
|
def run(self):
|
|
"""
|
|
Main method to execute the district heating network creation process.
|
|
:return: NetworkX graph with nodes and edges representing the network.
|
|
"""
|
|
self._load_and_process_data()
|
|
self._find_nearest_roads()
|
|
self._process_intersections()
|
|
network_graph = self._create_network_graph()
|
|
return network_graph
|
|
|
|
def _load_and_process_data(self):
|
|
"""
|
|
Load and process the building and road data.
|
|
"""
|
|
# Load building data
|
|
with open(self.buildings_file, 'r') as file:
|
|
city = json.load(file)
|
|
|
|
# Extract centroids and building IDs from building data
|
|
centroids = []
|
|
building_ids = [] # List to store building IDs
|
|
buildings = city['features']
|
|
for building in buildings:
|
|
coordinates = building['geometry']['coordinates'][0]
|
|
building_polygon = Polygon(coordinates)
|
|
centroid = building_polygon.centroid
|
|
centroids.append(centroid)
|
|
building_ids.append(building['id']) # Extract building ID
|
|
|
|
# Convert centroids to a GeoDataFrame and include building IDs
|
|
self.centroids_gdf = gpd.GeoDataFrame({
|
|
'geometry': [Point(centroid.x, centroid.y) for centroid in centroids],
|
|
'building_id': building_ids # Add building IDs as a column
|
|
}, crs='EPSG:4326')
|
|
|
|
# Load road data
|
|
self.gdf_road = gpd.read_file(self.roads_file)
|
|
|
|
# Ensure centroids are in the same CRS as roads
|
|
self.centroids_gdf = self.centroids_gdf.to_crs(self.gdf_road.crs)
|
|
|
|
def _find_nearest_roads(self):
|
|
"""
|
|
Find the nearest road for each building centroid.
|
|
"""
|
|
# Process road geometries
|
|
self.gdf_clean = gpd.GeoDataFrame(
|
|
{'geometry': [LineString([coord for coord in line.coords]) for line in self.gdf_road.geometry]})
|
|
|
|
# Find the nearest road line and point for each centroid
|
|
self.closest_linestrings = []
|
|
self.nearest_points = []
|
|
for centroid in self.centroids_gdf.geometry:
|
|
closest_road = min(self.gdf_clean.geometry, key=lambda x: x.distance(centroid))
|
|
self.closest_linestrings.append(closest_road)
|
|
nearest_point = closest_road.interpolate(closest_road.project(centroid))
|
|
self.nearest_points.append(nearest_point)
|
|
|
|
def _process_intersections(self):
|
|
"""
|
|
Process intersections and create final geometries.
|
|
"""
|
|
# Create additional GeoDataFrames for points and nearest points
|
|
self.gdf_pts = gpd.GeoDataFrame(
|
|
{'geometry': [Point(coord) for line in self.gdf_clean.geometry for coord in line.coords]})
|
|
self.gdf_pts2 = gpd.GeoDataFrame({'geometry': self.nearest_points})
|
|
|
|
# Combine nearest points and road points into one GeoDataFrame
|
|
self.gdf_pts3 = gpd.GeoDataFrame({'geometry': self.nearest_points + list(self.gdf_pts.geometry)})
|
|
|
|
# Identify intersections and create LineStrings based on intersections
|
|
self.gdf_clean["intersect"] = [
|
|
[y for y in range(len(self.gdf_pts2)) if self.gdf_pts2.geometry[y].distance(geom) <= 1.0] for geom in
|
|
self.gdf_clean.geometry]
|
|
self.gdf_cleaner = self.gdf_clean[self.gdf_clean["intersect"].apply(len).gt(0)].reset_index(drop=True)
|
|
|
|
self.test_list = []
|
|
for idx, row in self.gdf_cleaner.iterrows():
|
|
for i in range(len(row["intersect"]) + 1):
|
|
if i == 0:
|
|
self.test_list.append(
|
|
LineString([row['geometry'].coords[0], self.gdf_pts3.geometry[row['intersect'][i]]]))
|
|
elif i < len(row['intersect']):
|
|
self.test_list.append(LineString(
|
|
[self.gdf_pts3.geometry[row['intersect'][i - 1]], self.gdf_pts3.geometry[row['intersect'][i]]]))
|
|
else:
|
|
self.test_list.append(
|
|
LineString([self.gdf_pts3.geometry[row['intersect'][i - 1]], row['geometry'].coords[-1]]))
|
|
|
|
self.gdf_cleanest = gpd.GeoDataFrame({'geometry': self.test_list})
|
|
|
|
points = [coord for geom in self.gdf_cleanest.geometry for coord in geom.coords]
|
|
gdf_pts_cnt = self.gdf_pts.copy()
|
|
gdf_pts_cnt["count"] = gdf_pts_cnt.geometry.apply(lambda x: points.count(x.coords[0]))
|
|
gdf_pts_reset = gdf_pts_cnt[gdf_pts_cnt["count"] > 1].reset_index(drop=True)
|
|
gdf_pts_drop = gdf_pts_cnt[gdf_pts_cnt["count"] <= 1].reset_index(drop=True)
|
|
|
|
# Remove unnecessary geometries from gdf_cleanest
|
|
for idx, geom in self.gdf_cleanest.iterrows():
|
|
for coord in geom.geometry.coords:
|
|
if coord in [pt.coords[0] for pt in gdf_pts_drop.geometry]:
|
|
self.gdf_cleanest = self.gdf_cleanest.drop(idx)
|
|
|
|
self.gdf_cleanest.reset_index(drop=True, inplace=True)
|
|
|
|
def _create_network_graph(self):
|
|
"""
|
|
Create a NetworkX graph from the processed geospatial data.
|
|
:return: A NetworkX graph representing the district heating network.
|
|
"""
|
|
G = nx.Graph()
|
|
|
|
# Convert centroids to EPSG:4326 for Google Maps compatibility
|
|
for idx, row in self.centroids_gdf.iterrows():
|
|
building_name = f"Building_{idx + 1}"
|
|
G.add_node((row.geometry.x, row.geometry.y),
|
|
type='centroid',
|
|
name=building_name,
|
|
building_id=row['building_id']) # Add building ID as an attribute
|
|
|
|
for point in self.nearest_points:
|
|
G.add_node((point.x, point.y), type='nearest_point')
|
|
|
|
# Add edges with lengths as weights for the road network
|
|
for line in self.gdf_cleanest.geometry:
|
|
length = line.length
|
|
if isinstance(line.boundary, MultiPoint):
|
|
# Handle MultiPoint boundary
|
|
points = list(line.boundary.geoms)
|
|
for i in range(len(points) - 1):
|
|
start_point = points[i]
|
|
end_point = points[i + 1]
|
|
G.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
|
|
else:
|
|
# Handle typical case with two endpoints
|
|
start_point, end_point = line.boundary
|
|
G.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
|
|
|
|
# Add edges connecting nearest points to their centroids
|
|
for point, centroid in zip(self.nearest_points, self.centroids_gdf.geometry):
|
|
distance = point.distance(centroid)
|
|
G.add_edge((point.x, point.y), (centroid.x, centroid.y), weight=distance)
|
|
|
|
return G
|
|
|
|
def plot_network_graph(self, network_graph):
|
|
"""
|
|
Plot the network graph using matplotlib and networkx.
|
|
|
|
:param network_graph: The NetworkX graph to be plotted.
|
|
"""
|
|
plt.figure(figsize=(12, 12))
|
|
pos = {node: (node[0], node[1]) for node in network_graph.nodes()}
|
|
|
|
# Draw nodes and edges
|
|
nx.draw_networkx_nodes(network_graph, pos, node_color='blue', node_size=50)
|
|
nx.draw_networkx_edges(network_graph, pos, edge_color='gray')
|
|
|
|
# Create a dictionary for node labels for centroids only
|
|
node_labels = {node: data['name'] for node, data in network_graph.nodes(data=True) if
|
|
data.get('type') == 'centroid'}
|
|
|
|
# Adjust node label positions to reduce overlap
|
|
label_pos = {node: (coords[0], coords[1] + 0.03) for node, coords in pos.items()} # Shift labels up
|
|
|
|
# Draw node labels for centroids
|
|
nx.draw_networkx_labels(network_graph, label_pos, labels=node_labels, font_size=8, verticalalignment='bottom')
|
|
|
|
plt.title('District Heating Network Graph')
|
|
plt.axis('off')
|
|
plt.show()
|