import json import matplotlib.pyplot as plt from shapely.geometry import Polygon, Point, LineString import networkx as nx from typing import List, Tuple from rtree import index import math def haversine(lon1, lat1, lon2, lat2): """ Calculate the great-circle distance between two points on the Earth specified by their longitude and latitude. """ R = 6371000 # Radius of the Earth in meters phi1 = math.radians(lat1) phi2 = math.radians(lat2) delta_phi = math.radians(lat2 - lat1) delta_lambda = math.radians(lon2 - lon1) a = math.sin(delta_phi / 2.0) ** 2 + \ math.cos(phi1) * math.cos(phi2) * \ math.sin(delta_lambda / 2.0) ** 2 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return R * c # Output distance in meters class DistrictHeatingNetworkCreator: def __init__(self, buildings_file: str, roads_file: str): """ Initialize the class with paths to the buildings and roads data files. :param buildings_file: Path to the GeoJSON file containing building data. :param roads_file: Path to the GeoJSON file containing roads data. """ self.buildings_file = buildings_file self.roads_file = roads_file def run(self) -> nx.Graph: """ Main method to execute the district heating network creation process. :return: NetworkX graph with nodes and edges representing the network. """ self._load_and_process_data() self._find_nearest_roads() self._find_nearest_points() self._break_down_roads() self._create_graph() self._create_mst() self._iteratively_remove_edges() self._add_centroids_to_mst() self._convert_edge_weights_to_meters() return self.final_mst def _load_and_process_data(self): """ Load and process the building and road data. """ # Load building data with open(self.buildings_file, 'r') as file: city = json.load(file) self.centroids = [] self.building_ids = [] buildings = city['features'] for building in buildings: coordinates = building['geometry']['coordinates'][0] building_polygon = Polygon(coordinates) centroid = building_polygon.centroid self.centroids.append(centroid) self.building_ids.append(building['id']) # Load road data with open(self.roads_file, 'r') as file: roads = json.load(file) line_features = [feature for feature in roads['features'] if feature['geometry']['type'] == 'LineString'] self.lines = [LineString(feature['geometry']['coordinates']) for feature in line_features] self.cleaned_lines = [LineString([line.coords[0], line.coords[-1]]) for line in self.lines] def _find_nearest_roads(self): """ Find the nearest road for each building centroid. """ self.closest_roads = [] unique_roads_set = set() # Create spatial index for roads idx = index.Index() for pos, line in enumerate(self.cleaned_lines): idx.insert(pos, line.bounds) for centroid in self.centroids: min_distance = float('inf') closest_road = None for pos in idx.nearest(centroid.bounds, 10): road = self.cleaned_lines[pos] distance = road.distance(centroid) if distance < min_distance: min_distance = distance closest_road = road if closest_road and closest_road.wkt not in unique_roads_set: unique_roads_set.add(closest_road.wkt) self.closest_roads.append(closest_road) def _find_nearest_points(self): """ Find the nearest point on each closest road for each centroid. """ def find_nearest_point_on_line(point: Point, line: LineString) -> Point: return line.interpolate(line.project(point)) self.nearest_points = [] for centroid in self.centroids: min_distance = float('inf') closest_road = None for road in self.closest_roads: distance = centroid.distance(road) if distance < min_distance: min_distance = distance closest_road = road if closest_road: nearest_point = find_nearest_point_on_line(centroid, closest_road) self.nearest_points.append(nearest_point) def _break_down_roads(self): """ Break down roads into segments connecting nearest points. """ def break_down_roads(closest_roads: List[LineString], nearest_points_list: List[Point]) -> List[LineString]: new_segments = [] for road in closest_roads: coords = list(road.coords) points_on_road = [point for point in nearest_points_list if road.distance(point) < 0.000000001] sorted_points = sorted(points_on_road, key=lambda point: road.project(point)) sorted_points.insert(0, Point(coords[0])) sorted_points.append(Point(coords[-1])) for i in range(len(sorted_points) - 1): segment = LineString([sorted_points[i], sorted_points[i + 1]]) new_segments.append(segment) return new_segments self.new_segments = break_down_roads(self.closest_roads, self.nearest_points) self.cleaned_lines = [line for line in self.cleaned_lines if line not in self.closest_roads] self.cleaned_lines.extend(self.new_segments) def _create_graph(self): """ Create a NetworkX graph from the cleaned lines. """ self.G = nx.Graph() for line in self.cleaned_lines: coords = list(line.coords) for i in range(len(coords) - 1): self.G.add_edge(coords[i], coords[i + 1], weight=Point(coords[i]).distance(Point(coords[i + 1]))) def _create_mst(self): """ Create a Minimum Spanning Tree (MST) from the graph. """ def find_paths_between_nearest_points(g: nx.Graph, nearest_points: List[Point]) -> List[Tuple]: edges = [] for i, start_point in enumerate(nearest_points): start = (start_point.x, start_point.y) for end_point in nearest_points[i + 1:]: end = (end_point.x, end_point.y) if nx.has_path(g, start, end): path = nx.shortest_path(g, source=start, target=end, weight='weight') path_edges = list(zip(path[:-1], path[1:])) edges.extend((u, v, g[u][v]['weight']) for u, v in path_edges) return edges edges = find_paths_between_nearest_points(self.G, self.nearest_points) h = nx.Graph() h.add_weighted_edges_from(edges) mst = nx.minimum_spanning_tree(h, weight='weight') final_edges = [] for u, v in mst.edges(): if nx.has_path(self.G, u, v): path = nx.shortest_path(self.G, source=u, target=v, weight='weight') path_edges = list(zip(path[:-1], path[1:])) final_edges.extend((x, y, self.G[x][y]['weight']) for x, y in path_edges) self.final_mst = nx.Graph() self.final_mst.add_weighted_edges_from(final_edges) def _iteratively_remove_edges(self): """ Iteratively remove edges that do not have any nearest points and have one end with only one connection. Also remove nodes that don't have any connections and street nodes with only one connection. """ nearest_points_tuples = [(point.x, point.y) for point in self.nearest_points] def find_edges_to_remove(graph: nx.Graph) -> List[Tuple]: edges_to_remove = [] for u, v, d in graph.edges(data=True): if u not in nearest_points_tuples and v not in nearest_points_tuples: if graph.degree(u) == 1 or graph.degree(v) == 1: edges_to_remove.append((u, v, d)) return edges_to_remove def find_nodes_to_remove(graph: nx.Graph) -> List[Tuple]: nodes_to_remove = [] for node in graph.nodes(): if graph.degree(node) == 0: nodes_to_remove.append(node) return nodes_to_remove edges_to_remove = find_edges_to_remove(self.final_mst) self.final_mst_steps = [list(self.final_mst.edges(data=True))] while edges_to_remove or find_nodes_to_remove(self.final_mst): self.final_mst.remove_edges_from(edges_to_remove) nodes_to_remove = find_nodes_to_remove(self.final_mst) self.final_mst.remove_nodes_from(nodes_to_remove) edges_to_remove = find_edges_to_remove(self.final_mst) self.final_mst_steps.append(list(self.final_mst.edges(data=True))) def find_single_connection_street_nodes(graph: nx.Graph) -> List[Tuple]: single_connection_street_nodes = [] for node in graph.nodes(): if node not in nearest_points_tuples and graph.degree(node) == 1: single_connection_street_nodes.append(node) return single_connection_street_nodes single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst) while single_connection_street_nodes: for node in single_connection_street_nodes: neighbors = list(self.final_mst.neighbors(node)) self.final_mst.remove_node(node) for neighbor in neighbors: if self.final_mst.degree(neighbor) == 0: self.final_mst.remove_node(neighbor) single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst) self.final_mst_steps.append(list(self.final_mst.edges(data=True))) def _add_centroids_to_mst(self): """ Add centroids to the final MST graph and connect them to their associated node on the graph. """ for i, centroid in enumerate(self.centroids): centroid_tuple = (centroid.x, centroid.y) building_id = self.building_ids[i] self.final_mst.add_node(centroid_tuple, type='building', id=building_id) nearest_point = None min_distance = float('inf') for node in self.final_mst.nodes(): if self.final_mst.nodes[node].get('type') != 'building': node_point = Point(node) distance = centroid.distance(node_point) if distance < min_distance: min_distance = distance nearest_point = node if nearest_point: self.final_mst.add_edge(centroid_tuple, nearest_point, weight=min_distance) def _convert_edge_weights_to_meters(self): """ Convert all edge weights in the final MST graph to meters using the Haversine formula. """ for u, v, data in self.final_mst.edges(data=True): lon1, lat1 = u lon2, lat2 = v distance = haversine(lon1, lat1, lon2, lat2) self.final_mst[u][v]['weight'] = distance def plot_network_graph(self): """ Plot the network graph using matplotlib and networkx. """ plt.figure(figsize=(15, 10)) pos = {node: (node[0], node[1]) for node in self.final_mst.nodes()} nx.draw_networkx_nodes(self.final_mst, pos, node_color='blue', node_size=50) nx.draw_networkx_edges(self.final_mst, pos, edge_color='gray') plt.title('District Heating Network Graph') plt.axis('off') plt.show()