import json import matplotlib.pyplot as plt from shapely.geometry import Polygon, Point, LineString import networkx as nx class DistrictHeatingNetworkCreator: def __init__(self, buildings_file, roads_file): """ Initialize the class with paths to the buildings and roads data files. :param buildings_file: Path to the GeoJSON file containing building data. :param roads_file: Path to the GeoJSON file containing roads data. """ self.buildings_file = buildings_file self.roads_file = roads_file def run(self): """ Main method to execute the district heating network creation process. :return: NetworkX graph with nodes and edges representing the network. """ self._load_and_process_data() self._find_nearest_roads() self._find_nearest_points() self._break_down_roads() self._create_graph() self._create_mst() self._iteratively_remove_edges() return self.final_mst def _load_and_process_data(self): """ Load and process the building and road data. """ # Load building data with open(self.buildings_file, 'r') as file: city = json.load(file) # Extract centroids and building IDs from building data self.centroids = [] self.building_ids = [] # List to store building IDs buildings = city['features'] for building in buildings: coordinates = building['geometry']['coordinates'][0] building_polygon = Polygon(coordinates) centroid = building_polygon.centroid self.centroids.append(centroid) self.building_ids.append(building['id']) # Extract building ID # Load road data with open(self.roads_file, 'r') as file: roads = json.load(file) line_features = [feature for feature in roads['features'] if feature['geometry']['type'] == 'LineString'] # Create a list of LineString objects and their properties self.lines = [] for feature in line_features: # Create a LineString from coordinates linestring = LineString(feature['geometry']['coordinates']) self.lines.append(linestring) self.cleaned_lines = [] for line in self.lines: coords = list(line.coords) cleaned_line = LineString([coords[0], coords[-1]]) self.cleaned_lines.append(cleaned_line) def _find_nearest_roads(self): """ Find the nearest road for each building centroid. """ self.closest_roads = [] unique_roads_set = set() # Loop through each centroid for centroid in self.centroids: min_distance = float('inf') # Start with a large number to ensure any real distance is smaller closest_road = None # Loop through each road and calculate the distance to the current centroid for line in self.cleaned_lines: distance = line.distance(centroid) # Check if the current road is closer than the ones previously checked if distance < min_distance: min_distance = distance closest_road = line # Add the closest road to the list if it's not already added if closest_road and closest_road.wkt not in unique_roads_set: unique_roads_set.add(closest_road.wkt) self.closest_roads.append(closest_road) def _find_nearest_points(self): """ Find the nearest point on each closest road for each centroid. """ def find_nearest_point_on_line(point, line): return line.interpolate(line.project(point)) self.nearest_points = [] # Find the nearest point on each closest road for each centroid for centroid in self.centroids: # Find the closest road for this centroid min_distance = float('inf') closest_road = None for road in self.closest_roads: distance = centroid.distance(road) if distance < min_distance: min_distance = distance closest_road = road # Find the nearest point on the closest road if closest_road: nearest_point = find_nearest_point_on_line(centroid, closest_road) self.nearest_points.append(nearest_point) def _break_down_roads(self): """ Break down roads into segments connecting nearest points. """ def break_down_roads(closest_roads, nearest_points_list): new_segments = [] for road in closest_roads: # Get coordinates of the road coords = list(road.coords) # Find all nearest points for this road points_on_road = [point for point in nearest_points_list if road.distance(point) < 0.000000001] # Sort nearest points along the road sorted_points = sorted(points_on_road, key=lambda point: road.project(point)) # Add the start node to the sorted points sorted_points.insert(0, Point(coords[0])) # Add the end node to the sorted points sorted_points.append(Point(coords[-1])) # Create new segments for i in range(len(sorted_points) - 1): segment = LineString([sorted_points[i], sorted_points[i + 1]]) new_segments.append(segment) return new_segments # Create new segments self.new_segments = break_down_roads(self.closest_roads, self.nearest_points) self.cleaned_lines = [line for line in self.cleaned_lines if line not in self.closest_roads] self.cleaned_lines.extend(self.new_segments) def _create_graph(self): """ Create a NetworkX graph from the cleaned lines. """ self.G = nx.Graph() # Add edges to the graph from the cleaned lines for line in self.cleaned_lines: coords = list(line.coords) for i in range(len(coords) - 1): self.G.add_edge(coords[i], coords[i + 1], weight=Point(coords[i]).distance(Point(coords[i + 1]))) def _create_mst(self): """ Create a Minimum Spanning Tree (MST) from the graph. """ def find_paths_between_nearest_points(g, nearest_points): edges = [] for i, start_point in enumerate(nearest_points): start = (start_point.x, start_point.y) for end_point in nearest_points[i + 1:]: end = (end_point.x, end_point.y) if nx.has_path(g, start, end): path = nx.shortest_path(g, source=start, target=end, weight='weight') path_edges = list(zip(path[:-1], path[1:])) edges.extend((u, v, g[u][v]['weight']) for u, v in path_edges) return edges # Find the edges used to connect the nearest points edges = find_paths_between_nearest_points(self.G, self.nearest_points) # Create a graph from these edges h = nx.Graph() h.add_weighted_edges_from(edges) # Compute the Minimum Spanning Tree (MST) using Kruskal's algorithm mst = nx.minimum_spanning_tree(h, weight='weight') # Perform pathfinding again on the MST to ensure shortest paths within the MST final_edges = [] for u, v in mst.edges(): if nx.has_path(self.G, u, v): path = nx.shortest_path(self.G, source=u, target=v, weight='weight') path_edges = list(zip(path[:-1], path[1:])) final_edges.extend((x, y, self.G[x][y]['weight']) for x, y in path_edges) # Create the final MST graph with these edges self.final_mst = nx.Graph() self.final_mst.add_weighted_edges_from(final_edges) def _iteratively_remove_edges(self): """ Iteratively remove edges that do not have any nearest points and have one end with only one connection. Also remove nodes that don't have any connections. """ nearest_points_tuples = [(point.x, point.y) for point in self.nearest_points] def find_edges_to_remove(graph): edges_to_remove = [] for u, v in graph.edges(): if u not in nearest_points_tuples and v not in nearest_points_tuples: if graph.degree(u) == 1 or graph.degree(v) == 1: edges_to_remove.append((u, v)) return edges_to_remove edges_to_remove = find_edges_to_remove(self.final_mst) while edges_to_remove: self.final_mst.remove_edges_from(edges_to_remove) # Find and remove nodes with no connections nodes_to_remove = [node for node in self.final_mst.nodes() if self.final_mst.degree(node) == 0] self.final_mst.remove_nodes_from(nodes_to_remove) edges_to_remove = find_edges_to_remove(self.final_mst) def plot_network_graph(self): """ Plot the network graph using matplotlib and networkx. """ plt.figure(figsize=(15, 10)) pos = {node: (node[0], node[1]) for node in self.final_mst.nodes()} # Draw nodes and edges nx.draw_networkx_nodes(self.final_mst, pos, node_color='blue', node_size=50) nx.draw_networkx_edges(self.final_mst, pos, edge_color='gray') plt.title('District Heating Network Graph') plt.axis('off') plt.show()