import json import geopandas as gpd import matplotlib.pyplot as plt from shapely.geometry import Polygon, Point, LineString, MultiPoint import networkx as nx from scipy.spatial import cKDTree import hub.helpers.constants as cte class DistrictHeatingNetworkCreator: def __init__(self, buildings_file, roads_file, central_plant_longitude, central_plant_latitude): self.buildings_file = buildings_file self.roads_file = roads_file self.central_plant_longitude = central_plant_longitude self.central_plant_latitude = central_plant_latitude def run(self): self._load_and_process_data() self._find_nearest_roads() self._process_intersections() network_graph = self._create_network_graph() return network_graph def _load_and_process_data(self): self.gdf_road = gpd.read_file(self.roads_file) with open(self.buildings_file, 'r') as file: city = json.load(file) centroids = [] building_ids = [] for building in city['features']: coordinates = building['geometry']['coordinates'][0] building_polygon = Polygon(coordinates) centroid = building_polygon.centroid centroids.append(centroid) building_ids.append(building['id']) centroids.append(Point(self.central_plant_longitude, self.central_plant_latitude)) building_ids.append(1) self.centroids_gdf = gpd.GeoDataFrame({ 'geometry': [Point(centroid.x, centroid.y) for centroid in centroids], 'building_id': building_ids, 'type': ['centroid' for _ in centroids] }, crs='EPSG:4326') def _find_nearest_roads(self): self.centroids_gdf = self.centroids_gdf.to_crs(self.gdf_road.crs) self.gdf_clean = gpd.GeoDataFrame( {'geometry': [LineString([coord for coord in line.coords]) for line in self.gdf_road.geometry]}) self.closest_linestrings = [] self.nearest_points = [] for centroid in self.centroids_gdf.geometry: closest_road = min(self.gdf_clean.geometry, key=lambda x: x.distance(centroid)) self.closest_linestrings.append(closest_road) nearest_point = closest_road.interpolate(closest_road.project(centroid)) self.nearest_points.append(nearest_point) def _process_intersections(self): self.gdf_pts = gpd.GeoDataFrame( {'geometry': [Point(coord) for line in self.gdf_clean.geometry for coord in line.coords]}) self.gdf_pts2 = gpd.GeoDataFrame({'geometry': self.nearest_points}) self.gdf_pts3 = gpd.GeoDataFrame({'geometry': self.nearest_points + list(self.gdf_pts.geometry)}) intersects = [] for geom in self.gdf_clean.geometry: intersecting_points = [] if geom is not None: for y in range(len(self.gdf_pts2)): point_geom = self.gdf_pts2.geometry[y] if point_geom is not None and point_geom.distance(geom) <= 1.0: intersecting_points.append(y) intersects.append(intersecting_points) self.gdf_clean["intersect"] = intersects self.gdf_cleaner = self.gdf_clean[self.gdf_clean["intersect"].apply(len).gt(0)].reset_index(drop=True) self.test_list = [] for idx, row in self.gdf_cleaner.iterrows(): for i in range(len(row["intersect"]) + 1): if i == 0: self.test_list.append( LineString([row['geometry'].coords[0], self.gdf_pts3.geometry[row['intersect'][i]]])) elif i < len(row['intersect']): self.test_list.append(LineString( [self.gdf_pts3.geometry[row['intersect'][i - 1]], self.gdf_pts3.geometry[row['intersect'][i]]])) else: self.test_list.append( LineString([self.gdf_pts3.geometry[row['intersect'][i - 1]], row['geometry'].coords[-1]])) self.gdf_cleanest = gpd.GeoDataFrame({'geometry': self.test_list}) points = [coord for geom in self.gdf_cleanest.geometry for coord in geom.coords] gdf_pts_cnt = self.gdf_pts.copy() gdf_pts_cnt["count"] = gdf_pts_cnt.geometry.apply(lambda x: points.count(x.coords[0])) gdf_pts_reset = gdf_pts_cnt[gdf_pts_cnt["count"] > 1].reset_index(drop=True) gdf_pts_drop = gdf_pts_cnt[gdf_pts_cnt["count"] <= 1].reset_index(drop=True) for idx, geom in self.gdf_cleanest.iterrows(): for coord in geom.geometry.coords: if coord in [pt.coords[0] for pt in gdf_pts_drop.geometry]: self.gdf_cleanest = self.gdf_cleanest.drop(idx) self.gdf_cleanest.reset_index(drop=True, inplace=True) def _create_network_graph(self): g = nx.Graph() # Add nodes with geometry attribute for idx, row in self.centroids_gdf.iterrows(): building_name = f"Building_{idx}" g.add_node((row.geometry.x, row.geometry.y), geometry=row.geometry, # Include geometry attribute type='centroid', name=building_name, building_id=str(row.get('building_id'))) for point in self.nearest_points: g.add_node((point.x, point.y), geometry=point, # Include geometry attribute type='nearest_point') for line in self.gdf_cleanest.geometry: length = line.length if isinstance(line.boundary, MultiPoint): points = list(line.boundary.geoms) for i in range(len(points) - 1): start_point = points[i] end_point = points[i + 1] g.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length) else: start_point, end_point = line.boundary g.add_edge((start_point.x, start_point.y), (end_point.x, end_point.y), weight=length) for point, centroid in zip(self.nearest_points, self.centroids_gdf.geometry): distance = point.distance(centroid) g.add_edge((point.x, point.y), (centroid.x, centroid.y), weight=distance) # Check and connect isolated components components = list(nx.connected_components(g)) if len(components) > 1: components.sort(key=len) main_component = components[-1] for comp in components[:-1]: self._connect_component_to_main(g, comp, main_component) return g def _connect_component_to_main(self, graph, component, main_component): main_component_nodes = [graph.nodes[node] for node in main_component if 'geometry' in graph.nodes[node]] # Create cKDTree for efficient nearest neighbor search tree = cKDTree([(node['geometry'].x, node['geometry'].y) for node in main_component_nodes]) # For each node in the component, find the closest street node in the main component for node in component: if 'geometry' in graph.nodes[node]: # Check for geometry attribute node_geometry = graph.nodes[node]['geometry'] distance, idx = tree.query((node_geometry.x, node_geometry.y)) closest_node_geometry = main_component_nodes[idx]['geometry'] # Add edge to the graph graph.add_edge((node_geometry.x, node_geometry.y), (closest_node_geometry.x, closest_node_geometry.y), weight=distance) def plot_network_graph(network_graph, central_plant_id=1): plt.figure(figsize=(12, 12)) pos = {node: (node[0], node[1]) for node in network_graph.nodes()} # Node colors based on type node_colors = ['red' if data.get('building_id') == str(central_plant_id) else 'green' if data.get( 'type') == 'centroid' else 'blue' for node, data in network_graph.nodes(data=True)] # Node sizes, larger for central plant node_sizes = [100 if data.get('building_id') == str(central_plant_id) else 50 for node, data in network_graph.nodes(data=True)] nx.draw_networkx_nodes(network_graph, pos, node_color=node_colors, node_size=node_sizes) nx.draw_networkx_edges(network_graph, pos, edge_color='gray', width=1) plt.title('District Heating Network Graph') plt.axis('off') plt.savefig('network_graph_visualization.png', format='png', dpi=300) # Save as PNG with high dpi for clarity plt.show()