188 lines
8.4 KiB
Python
188 lines
8.4 KiB
Python
import json
|
|
import geopandas as gpd
|
|
import matplotlib.pyplot as plt
|
|
from shapely.geometry import Polygon, Point, LineString, MultiPoint
|
|
import networkx as nx
|
|
from scipy.spatial import cKDTree
|
|
import hub.helpers.constants as cte
|
|
|
|
|
|
class DistrictHeatingNetworkCreator:
|
|
def __init__(self, buildings_file, roads_file, central_plant_longitude, central_plant_latitude):
|
|
self.buildings_file = buildings_file
|
|
self.roads_file = roads_file
|
|
self.central_plant_longitude = central_plant_longitude
|
|
self.central_plant_latitude = central_plant_latitude
|
|
|
|
def run(self):
|
|
self._load_and_process_data()
|
|
self._find_nearest_roads()
|
|
self._process_intersections()
|
|
network_graph = self._create_network_graph()
|
|
return network_graph
|
|
|
|
def _load_and_process_data(self):
|
|
self.gdf_road = gpd.read_file(self.roads_file)
|
|
with open(self.buildings_file, 'r') as file:
|
|
city = json.load(file)
|
|
|
|
centroids = []
|
|
building_ids = []
|
|
for building in city['features']:
|
|
coordinates = building['geometry']['coordinates'][0]
|
|
building_polygon = Polygon(coordinates)
|
|
centroid = building_polygon.centroid
|
|
centroids.append(centroid)
|
|
building_ids.append(building['id'])
|
|
|
|
centroids.append(Point(self.central_plant_longitude, self.central_plant_latitude))
|
|
building_ids.append(1)
|
|
|
|
self.centroids_gdf = gpd.GeoDataFrame({
|
|
'geometry': [Point(centroid.x, centroid.y) for centroid in centroids],
|
|
'building_id': building_ids,
|
|
'type': ['centroid' for _ in centroids]
|
|
}, crs='EPSG:4326')
|
|
|
|
def _find_nearest_roads(self):
|
|
self.centroids_gdf = self.centroids_gdf.to_crs(self.gdf_road.crs)
|
|
self.gdf_clean = gpd.GeoDataFrame(
|
|
{'geometry': [LineString([coord for coord in line.coords]) for line in self.gdf_road.geometry]})
|
|
|
|
self.closest_linestrings = []
|
|
self.nearest_points = []
|
|
for centroid in self.centroids_gdf.geometry:
|
|
closest_road = min(self.gdf_clean.geometry, key=lambda x: x.distance(centroid))
|
|
self.closest_linestrings.append(closest_road)
|
|
nearest_point = closest_road.interpolate(closest_road.project(centroid))
|
|
self.nearest_points.append(nearest_point)
|
|
|
|
def _process_intersections(self):
|
|
self.gdf_pts = gpd.GeoDataFrame(
|
|
{'geometry': [Point(coord) for line in self.gdf_clean.geometry for coord in line.coords]})
|
|
self.gdf_pts2 = gpd.GeoDataFrame({'geometry': self.nearest_points})
|
|
|
|
self.gdf_pts3 = gpd.GeoDataFrame({'geometry': self.nearest_points + list(self.gdf_pts.geometry)})
|
|
|
|
intersects = []
|
|
for geom in self.gdf_clean.geometry:
|
|
intersecting_points = []
|
|
if geom is not None:
|
|
for y in range(len(self.gdf_pts2)):
|
|
point_geom = self.gdf_pts2.geometry[y]
|
|
if point_geom is not None and point_geom.distance(geom) <= 1.0:
|
|
intersecting_points.append(y)
|
|
intersects.append(intersecting_points)
|
|
|
|
self.gdf_clean["intersect"] = intersects
|
|
self.gdf_cleaner = self.gdf_clean[self.gdf_clean["intersect"].apply(len).gt(0)].reset_index(drop=True)
|
|
|
|
self.test_list = []
|
|
for idx, row in self.gdf_cleaner.iterrows():
|
|
for i in range(len(row["intersect"]) + 1):
|
|
if i == 0:
|
|
self.test_list.append(
|
|
LineString([row['geometry'].coords[0], self.gdf_pts3.geometry[row['intersect'][i]]]))
|
|
elif i < len(row['intersect']):
|
|
self.test_list.append(LineString(
|
|
[self.gdf_pts3.geometry[row['intersect'][i - 1]], self.gdf_pts3.geometry[row['intersect'][i]]]))
|
|
else:
|
|
self.test_list.append(
|
|
LineString([self.gdf_pts3.geometry[row['intersect'][i - 1]], row['geometry'].coords[-1]]))
|
|
|
|
self.gdf_cleanest = gpd.GeoDataFrame({'geometry': self.test_list})
|
|
|
|
points = [coord for geom in self.gdf_cleanest.geometry for coord in geom.coords]
|
|
gdf_pts_cnt = self.gdf_pts.copy()
|
|
gdf_pts_cnt["count"] = gdf_pts_cnt.geometry.apply(lambda x: points.count(x.coords[0]))
|
|
gdf_pts_reset = gdf_pts_cnt[gdf_pts_cnt["count"] > 1].reset_index(drop=True)
|
|
gdf_pts_drop = gdf_pts_cnt[gdf_pts_cnt["count"] <= 1].reset_index(drop=True)
|
|
|
|
for idx, geom in self.gdf_cleanest.iterrows():
|
|
for coord in geom.geometry.coords:
|
|
if coord in [pt.coords[0] for pt in gdf_pts_drop.geometry]:
|
|
self.gdf_cleanest = self.gdf_cleanest.drop(idx)
|
|
|
|
self.gdf_cleanest.reset_index(drop=True, inplace=True)
|
|
|
|
def _create_network_graph(self):
|
|
g = nx.Graph()
|
|
|
|
# Add nodes with geometry attribute
|
|
for idx, row in self.centroids_gdf.iterrows():
|
|
building_name = f"Building_{idx}"
|
|
g.add_node((row.geometry.x, row.geometry.y),
|
|
geometry=row.geometry, # Include geometry attribute
|
|
type='centroid',
|
|
name=building_name,
|
|
building_id=str(row.get('building_id')))
|
|
|
|
for point in self.nearest_points:
|
|
g.add_node((point.x, point.y),
|
|
geometry=point, # Include geometry attribute
|
|
type='nearest_point')
|
|
|
|
for line in self.gdf_cleanest.geometry:
|
|
length = line.length
|
|
if isinstance(line.boundary, MultiPoint):
|
|
points = list(line.boundary.geoms)
|
|
for i in range(len(points) - 1):
|
|
start_point = points[i]
|
|
end_point = points[i + 1]
|
|
g.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
|
|
else:
|
|
start_point, end_point = line.boundary
|
|
g.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length)
|
|
|
|
for point, centroid in zip(self.nearest_points, self.centroids_gdf.geometry):
|
|
distance = point.distance(centroid)
|
|
g.add_edge((point.x, point.y), (centroid.x, centroid.y), weight=distance)
|
|
|
|
# Check and connect isolated components
|
|
components = list(nx.connected_components(g))
|
|
if len(components) > 1:
|
|
components.sort(key=len)
|
|
main_component = components[-1]
|
|
for comp in components[:-1]:
|
|
self._connect_component_to_main(g, comp, main_component)
|
|
|
|
return g
|
|
|
|
def _connect_component_to_main(self, graph, component, main_component):
|
|
main_component_nodes = [graph.nodes[node] for node in main_component if 'geometry' in graph.nodes[node]]
|
|
|
|
# Create cKDTree for efficient nearest neighbor search
|
|
tree = cKDTree([(node['geometry'].x, node['geometry'].y) for node in main_component_nodes])
|
|
|
|
# For each node in the component, find the closest street node in the main component
|
|
for node in component:
|
|
if 'geometry' in graph.nodes[node]: # Check for geometry attribute
|
|
node_geometry = graph.nodes[node]['geometry']
|
|
distance, idx = tree.query((node_geometry.x, node_geometry.y))
|
|
closest_node_geometry = main_component_nodes[idx]['geometry']
|
|
|
|
# Add edge to the graph
|
|
graph.add_edge((node_geometry.x, node_geometry.y),
|
|
(closest_node_geometry.x, closest_node_geometry.y), weight=distance)
|
|
|
|
|
|
def plot_network_graph(network_graph, central_plant_id=1):
|
|
plt.figure(figsize=(12, 12))
|
|
pos = {node: (node[0], node[1]) for node in network_graph.nodes()}
|
|
|
|
# Node colors based on type
|
|
node_colors = ['red' if data.get('building_id') == str(central_plant_id) else 'green' if data.get(
|
|
'type') == 'centroid' else 'blue' for node, data in network_graph.nodes(data=True)]
|
|
|
|
# Node sizes, larger for central plant
|
|
node_sizes = [100 if data.get('building_id') == str(central_plant_id) else 50 for node, data in
|
|
network_graph.nodes(data=True)]
|
|
|
|
nx.draw_networkx_nodes(network_graph, pos, node_color=node_colors, node_size=node_sizes)
|
|
nx.draw_networkx_edges(network_graph, pos, edge_color='gray', width=1)
|
|
|
|
plt.title('District Heating Network Graph')
|
|
plt.axis('off')
|
|
plt.savefig('network_graph_visualization.png', format='png', dpi=300) # Save as PNG with high dpi for clarity
|
|
plt.show()
|