import json
from collections import Counter

import geopandas as gpd
import matplotlib.pyplot as plt
import networkx as nx
from shapely.geometry import LineString, MultiPoint, Point, Polygon


class DistrictHeatingNetworkCreator:
    """
    Build a district heating network graph from building footprints and roads.

    Each building is reduced to the centroid of its outer ring, attached to the
    closest point on the nearest road, and the roads are split at those
    connection points. The result is a NetworkX graph whose nodes are
    ``(x, y)`` tuples expressed in the CRS of the road layer.
    """

    # Maximum distance, in units of the road layer's CRS, at which a connection
    # point is considered to lie on a road when the roads are split.
    # NOTE(review): 1.0 is only meaningful for a projected (metric) CRS; with a
    # geographic CRS (degrees) virtually every point would match every road.
    SNAP_TOLERANCE = 1.0

    def __init__(self, buildings_file, roads_file):
        """
        Initialize the class with paths to the buildings and roads data files.

        :param buildings_file: Path to the GeoJSON file containing building data.
        :param roads_file: Path to the Shapefile containing road data.
        """
        self.buildings_file = buildings_file
        self.roads_file = roads_file

    def run(self):
        """
        Main method to execute the district heating network creation process.

        Runs the four processing stages in order; each stage stores its results
        on ``self`` for the next one.

        :return: NetworkX graph with nodes and edges representing the network.
        """
        self._load_and_process_data()
        self._find_nearest_roads()
        self._process_intersections()
        return self._create_network_graph()

    def _load_and_process_data(self):
        """
        Load and process the building and road data.

        Sets ``self.centroids_gdf`` (one point per building plus a
        ``building_id`` column, reprojected to the road CRS) and
        ``self.gdf_road`` (the road layer as read from disk).

        :raises KeyError: If a feature has no ``id``, ``geometry`` or
            ``coordinates`` member.
        """
        # GeoJSON is UTF-8 by specification; do not rely on the platform default.
        with open(self.buildings_file, 'r', encoding='utf-8') as file:
            city = json.load(file)

        centroids = []
        building_ids = []
        for building in city['features']:
            # Only the first ring of the coordinate array is used, i.e. the
            # exterior ring of a Polygon geometry; holes are ignored.
            # NOTE(review): a MultiPolygon feature would not have this layout.
            exterior_ring = building['geometry']['coordinates'][0]
            centroids.append(Polygon(exterior_ring).centroid)
            building_ids.append(building['id'])

        # The building coordinates are declared as EPSG:4326 here.
        self.centroids_gdf = gpd.GeoDataFrame({
            'geometry': [Point(centroid.x, centroid.y) for centroid in centroids],
            'building_id': building_ids,
        }, crs='EPSG:4326')

        self.gdf_road = gpd.read_file(self.roads_file)

        # All later distance computations happen in the road layer's CRS.
        self.centroids_gdf = self.centroids_gdf.to_crs(self.gdf_road.crs)

    def _find_nearest_roads(self):
        """
        Find the nearest road for each building centroid.

        Sets ``self.gdf_clean`` (road geometries rebuilt as plain LineStrings),
        ``self.closest_linestrings`` (nearest road per centroid) and
        ``self.nearest_points`` (closest point on that road per centroid). Both
        lists are aligned with the rows of ``self.centroids_gdf``.
        """
        # Rebuild every road from its coordinate sequence only.
        # NOTE(review): this requires each road geometry to expose ``coords``
        # (i.e. single-part lines).
        self.gdf_clean = gpd.GeoDataFrame(
            {'geometry': [LineString(list(line.coords)) for line in self.gdf_road.geometry]})

        roads = list(self.gdf_clean.geometry)
        self.closest_linestrings = []
        self.nearest_points = []
        for centroid in self.centroids_gdf.geometry:
            closest_road = min(roads, key=lambda road: road.distance(centroid))
            self.closest_linestrings.append(closest_road)
            # project() gives the distance along the road of the point closest
            # to the centroid; interpolate() turns it back into a point.
            self.nearest_points.append(
                closest_road.interpolate(closest_road.project(centroid)))

    def _process_intersections(self):
        """
        Process intersections and create final geometries.

        Every road that has at least one connection point within
        ``SNAP_TOLERANCE`` is replaced by straight segments
        ``start -> p1 -> ... -> pn -> end``, where the connection points are
        ordered by their position along the road. Segments touching a road
        vertex that is used by at most one segment end (a dead end) are then
        removed. The result is stored in ``self.gdf_cleanest``.
        """
        tolerance = self.SNAP_TOLERANCE
        connection_points = list(self.nearest_points)

        # Road vertices, connection points, and both combined (connection
        # points first, so their positions match indices into nearest_points).
        self.gdf_pts = gpd.GeoDataFrame(
            {'geometry': [Point(coord) for line in self.gdf_clean.geometry for coord in line.coords]})
        self.gdf_pts2 = gpd.GeoDataFrame({'geometry': connection_points})
        self.gdf_pts3 = gpd.GeoDataFrame(
            {'geometry': connection_points + list(self.gdf_pts.geometry)})

        # For each road, the indices of the connection points lying on it.
        intersect_column = []
        for road in self.gdf_clean.geometry:
            nearby = [index for index, point in enumerate(connection_points)
                      if point.distance(road) <= tolerance]
            # Order along the road so consecutive segments never jump backwards;
            # the sort is stable, so coincident points keep their index order.
            nearby.sort(key=lambda index: road.project(connection_points[index]))
            intersect_column.append(nearby)
        self.gdf_clean["intersect"] = intersect_column

        has_connections = self.gdf_clean["intersect"].apply(len).gt(0)
        self.gdf_cleaner = self.gdf_clean[has_connections].reset_index(drop=True)

        # Split each connected road into len(intersect) + 1 straight segments.
        self.test_list = []
        for _, row in self.gdf_cleaner.iterrows():
            road = row['geometry']
            chain = [road.coords[0]]
            chain.extend(connection_points[index].coords[0] for index in row['intersect'])
            chain.append(road.coords[-1])
            self.test_list.extend(
                LineString([start, end]) for start, end in zip(chain, chain[1:]))
        self.gdf_cleanest = gpd.GeoDataFrame({'geometry': self.test_list})

        # Count how often each coordinate occurs as a segment end.
        endpoint_usage = Counter(
            coord for geom in self.gdf_cleanest.geometry for coord in geom.coords)
        # Road vertices used at most once are dead ends (or unused vertices).
        dangling = {
            vertex.coords[0] for vertex in self.gdf_pts.geometry
            if endpoint_usage[vertex.coords[0]] <= 1
        }

        # Remove every segment that touches a dangling road vertex.
        stub_labels = [
            label
            for label, geom in zip(self.gdf_cleanest.index, self.gdf_cleanest.geometry)
            if any(coord in dangling for coord in geom.coords)
        ]
        self.gdf_cleanest = self.gdf_cleanest.drop(index=stub_labels)
        self.gdf_cleanest.reset_index(drop=True, inplace=True)

    def _create_network_graph(self):
        """
        Create a NetworkX graph from the processed geospatial data.

        Nodes are ``(x, y)`` tuples in the CRS of the road layer (no
        reprojection happens here). Centroid nodes carry ``type='centroid'``,
        ``name`` and ``building_id``; connection points carry
        ``type='nearest_point'``. Every edge has a ``weight`` equal to its
        length in CRS units.

        :return: A NetworkX graph representing the district heating network.
        """
        graph = nx.Graph()

        for idx, row in self.centroids_gdf.iterrows():
            graph.add_node(
                (row.geometry.x, row.geometry.y),
                type='centroid',
                name=f"Building_{idx + 1}",
                building_id=row['building_id'])

        for point in self.nearest_points:
            graph.add_node((point.x, point.y), type='nearest_point')

        # Road segments: one edge between the two ends of each line.
        for line in self.gdf_cleanest.geometry:
            boundary = line.boundary
            # Multi-part geometries must be read through ``.geoms``; they cannot
            # be unpacked directly in Shapely 2. A closed or zero-length line
            # has an empty boundary and therefore contributes no edge.
            endpoints = list(boundary.geoms) if isinstance(boundary, MultiPoint) else []
            for start_point, end_point in zip(endpoints, endpoints[1:]):
                graph.add_edge(
                    (start_point.x, start_point.y),
                    (end_point.x, end_point.y),
                    weight=line.length)

        # Service connections: each building centroid to its connection point.
        for point, centroid in zip(self.nearest_points, self.centroids_gdf.geometry):
            graph.add_edge(
                (point.x, point.y),
                (centroid.x, centroid.y),
                weight=point.distance(centroid))

        return graph

    def plot_network_graph(self, network_graph):
        """
        Plot the network graph using matplotlib and networkx.

        Nodes are drawn at their own coordinates; only centroid nodes are
        labelled, using their ``name`` attribute.

        :param network_graph: The NetworkX graph to be plotted.
        """
        plt.figure(figsize=(12, 12))

        # Node keys are (x, y) tuples, so they double as plot positions.
        pos = {node: (node[0], node[1]) for node in network_graph.nodes()}

        nx.draw_networkx_nodes(network_graph, pos, node_color='blue', node_size=50)
        nx.draw_networkx_edges(network_graph, pos, edge_color='gray')

        node_labels = {
            node: data['name']
            for node, data in network_graph.nodes(data=True)
            if data.get('type') == 'centroid'
        }

        # Shift labels up to reduce overlap with the node markers.
        # NOTE(review): the 0.03 offset is in coordinate units, not points.
        label_pos = {node: (coords[0], coords[1] + 0.03) for node, coords in pos.items()}

        nx.draw_networkx_labels(
            network_graph, label_pos, labels=node_labels,
            font_size=8, verticalalignment='bottom')

        plt.title('District Heating Network Graph')
        plt.axis('off')
        plt.show()