From 6deffa9323287f83eefd5a90e200c82f82938809 Mon Sep 17 00:00:00 2001 From: Majid Rezaei Date: Wed, 31 Jul 2024 15:49:17 -0400 Subject: [PATCH] chore: fix indentation and change building id to name --- .../district_heating_network_creator.py | 552 +++++++++--------- 1 file changed, 276 insertions(+), 276 deletions(-) diff --git a/scripts/district_heating_network/district_heating_network_creator.py b/scripts/district_heating_network/district_heating_network_creator.py index e66e3e2c..4ccbb7f8 100644 --- a/scripts/district_heating_network/district_heating_network_creator.py +++ b/scripts/district_heating_network/district_heating_network_creator.py @@ -12,315 +12,315 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %( logging.getLogger('numexpr').setLevel(logging.ERROR) def haversine(lon1, lat1, lon2, lat2): - """ - Calculate the great-circle distance between two points - on the Earth specified by their longitude and latitude. - """ - R = 6371000 # Radius of the Earth in meters - phi1 = math.radians(lat1) - phi2 = math.radians(lat2) - delta_phi = math.radians(lat2 - lat1) - delta_lambda = math.radians(lon2 - lon1) + """ + Calculate the great-circle distance between two points + on the Earth specified by their longitude and latitude. + """ + R = 6371000 # Radius of the Earth in meters + phi1 = math.radians(lat1) + phi2 = math.radians(lat2) + delta_phi = math.radians(lat2 - lat1) + delta_lambda = math.radians(lon2 - lon1) - a = math.sin(delta_phi / 2.0) ** 2 + \ - math.cos(phi1) * math.cos(phi2) * \ - math.sin(delta_lambda / 2.0) ** 2 + a = math.sin(delta_phi / 2.0) ** 2 + \ + math.cos(phi1) * math.cos(phi2) * \ + math.sin(delta_lambda / 2.0) ** 2 - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) - return R * c # Output distance in meters + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + return R * c # Output distance in meters class DistrictHeatingNetworkCreator: - def __init__(self, buildings_file: str, roads_file: str): - """ - Initialize the class with paths to the buildings and roads data files. + def __init__(self, buildings_file: str, roads_file: str): + """ + Initialize the class with paths to the buildings and roads data files. - :param buildings_file: Path to the GeoJSON file containing building data. - :param roads_file: Path to the GeoJSON file containing roads data. - """ - self.buildings_file = buildings_file - self.roads_file = roads_file + :param buildings_file: Path to the GeoJSON file containing building data. + :param roads_file: Path to the GeoJSON file containing roads data. + """ + self.buildings_file = buildings_file + self.roads_file = roads_file - def run(self) -> nx.Graph: - """ - Main method to execute the district heating network creation process. - :return: NetworkX graph with nodes and edges representing the network. - """ - try: - self._load_and_process_data() - self._find_nearest_roads() - self._find_nearest_points() - self._break_down_roads() - self._create_graph() - self._create_mst() - self._iteratively_remove_edges() - self._add_centroids_to_mst() - self._convert_edge_weights_to_meters() - return self.final_mst - except Exception as e: - logging.error(f"Error during network creation: {e}") - raise + def run(self) -> nx.Graph: + """ + Main method to execute the district heating network creation process. + :return: NetworkX graph with nodes and edges representing the network. + """ + try: + self._load_and_process_data() + self._find_nearest_roads() + self._find_nearest_points() + self._break_down_roads() + self._create_graph() + self._create_mst() + self._iteratively_remove_edges() + self._add_centroids_to_mst() + self._convert_edge_weights_to_meters() + return self.final_mst + except Exception as e: + logging.error(f"Error during network creation: {e}") + raise - def _load_and_process_data(self): - """ - Load and process the building and road data. - """ - try: - # Load building data - with open(self.buildings_file, 'r') as file: - city = json.load(file) + def _load_and_process_data(self): + """ + Load and process the building and road data. + """ + try: + # Load building data + with open(self.buildings_file, 'r') as file: + city = json.load(file) - self.centroids = [] - self.building_names = [] - buildings = city['features'] - for building in buildings: - coordinates = building['geometry']['coordinates'][0] - building_polygon = Polygon(coordinates) - centroid = building_polygon.centroid - self.centroids.append(centroid) - self.building_names.append(building['id']) + self.centroids = [] + self.building_names = [] + buildings = city['features'] + for building in buildings: + coordinates = building['geometry']['coordinates'][0] + building_polygon = Polygon(coordinates) + centroid = building_polygon.centroid + self.centroids.append(centroid) + self.building_names.append(str(building['id'])) - # Load road data - with open(self.roads_file, 'r') as file: - roads = json.load(file) + # Load road data + with open(self.roads_file, 'r') as file: + roads = json.load(file) - line_features = [feature for feature in roads['features'] if feature['geometry']['type'] == 'LineString'] + line_features = [feature for feature in roads['features'] if feature['geometry']['type'] == 'LineString'] - self.lines = [LineString(feature['geometry']['coordinates']) for feature in line_features] - self.cleaned_lines = [LineString([line.coords[0], line.coords[-1]]) for line in self.lines] - except Exception as e: - logging.error(f"Error loading and processing data: {e}") - raise + self.lines = [LineString(feature['geometry']['coordinates']) for feature in line_features] + self.cleaned_lines = [LineString([line.coords[0], line.coords[-1]]) for line in self.lines] + except Exception as e: + logging.error(f"Error loading and processing data: {e}") + raise - def _find_nearest_roads(self): - """ - Find the nearest road for each building centroid. - """ - try: - self.closest_roads = [] - unique_roads_set = set() + def _find_nearest_roads(self): + """ + Find the nearest road for each building centroid. + """ + try: + self.closest_roads = [] + unique_roads_set = set() - # Create spatial index for roads - idx = index.Index() - for pos, line in enumerate(self.cleaned_lines): - idx.insert(pos, line.bounds) + # Create spatial index for roads + idx = index.Index() + for pos, line in enumerate(self.cleaned_lines): + idx.insert(pos, line.bounds) - for centroid in self.centroids: - min_distance = float('inf') - closest_road = None - for pos in idx.nearest(centroid.bounds, 10): - road = self.cleaned_lines[pos] - distance = road.distance(centroid) - if distance < min_distance: - min_distance = distance - closest_road = road + for centroid in self.centroids: + min_distance = float('inf') + closest_road = None + for pos in idx.nearest(centroid.bounds, 10): + road = self.cleaned_lines[pos] + distance = road.distance(centroid) + if distance < min_distance: + min_distance = distance + closest_road = road - if closest_road and closest_road.wkt not in unique_roads_set: - unique_roads_set.add(closest_road.wkt) - self.closest_roads.append(closest_road) - except Exception as e: - logging.error(f"Error finding nearest roads: {e}") - raise + if closest_road and closest_road.wkt not in unique_roads_set: + unique_roads_set.add(closest_road.wkt) + self.closest_roads.append(closest_road) + except Exception as e: + logging.error(f"Error finding nearest roads: {e}") + raise - def _find_nearest_points(self): - """ - Find the nearest point on each closest road for each centroid. - """ - def find_nearest_point_on_line(point: Point, line: LineString) -> Point: - return line.interpolate(line.project(point)) + def _find_nearest_points(self): + """ + Find the nearest point on each closest road for each centroid. + """ + def find_nearest_point_on_line(point: Point, line: LineString) -> Point: + return line.interpolate(line.project(point)) - try: - self.nearest_points = [] - for centroid in self.centroids: - min_distance = float('inf') - closest_road = None - for road in self.closest_roads: - distance = centroid.distance(road) - if distance < min_distance: - min_distance = distance - closest_road = road + try: + self.nearest_points = [] + for centroid in self.centroids: + min_distance = float('inf') + closest_road = None + for road in self.closest_roads: + distance = centroid.distance(road) + if distance < min_distance: + min_distance = distance + closest_road = road - if closest_road: - nearest_point = find_nearest_point_on_line(centroid, closest_road) - self.nearest_points.append(nearest_point) - except Exception as e: - logging.error(f"Error finding nearest points: {e}") - raise + if closest_road: + nearest_point = find_nearest_point_on_line(centroid, closest_road) + self.nearest_points.append(nearest_point) + except Exception as e: + logging.error(f"Error finding nearest points: {e}") + raise - def _break_down_roads(self): - """ - Break down roads into segments connecting nearest points. - """ - def break_down_roads(closest_roads: List[LineString], nearest_points_list: List[Point]) -> List[LineString]: - new_segments = [] - for road in closest_roads: - coords = list(road.coords) - points_on_road = [point for point in nearest_points_list if road.distance(point) < 0.000000001] - sorted_points = sorted(points_on_road, key=lambda point: road.project(point)) - sorted_points.insert(0, Point(coords[0])) - sorted_points.append(Point(coords[-1])) - for i in range(len(sorted_points) - 1): - segment = LineString([sorted_points[i], sorted_points[i + 1]]) - new_segments.append(segment) - return new_segments + def _break_down_roads(self): + """ + Break down roads into segments connecting nearest points. + """ + def break_down_roads(closest_roads: List[LineString], nearest_points_list: List[Point]) -> List[LineString]: + new_segments = [] + for road in closest_roads: + coords = list(road.coords) + points_on_road = [point for point in nearest_points_list if road.distance(point) < 0.000000001] + sorted_points = sorted(points_on_road, key=lambda point: road.project(point)) + sorted_points.insert(0, Point(coords[0])) + sorted_points.append(Point(coords[-1])) + for i in range(len(sorted_points) - 1): + segment = LineString([sorted_points[i], sorted_points[i + 1]]) + new_segments.append(segment) + return new_segments - try: - self.new_segments = break_down_roads(self.closest_roads, self.nearest_points) - self.cleaned_lines = [line for line in self.cleaned_lines if line not in self.closest_roads] - self.cleaned_lines.extend(self.new_segments) - except Exception as e: - logging.error(f"Error breaking down roads: {e}") - raise + try: + self.new_segments = break_down_roads(self.closest_roads, self.nearest_points) + self.cleaned_lines = [line for line in self.cleaned_lines if line not in self.closest_roads] + self.cleaned_lines.extend(self.new_segments) + except Exception as e: + logging.error(f"Error breaking down roads: {e}") + raise - def _create_graph(self): - """ - Create a NetworkX graph from the cleaned lines. - """ - try: - self.G = nx.Graph() - for line in self.cleaned_lines: - coords = list(line.coords) - for i in range(len(coords) - 1): - self.G.add_edge(coords[i], coords[i + 1], weight=Point(coords[i]).distance(Point(coords[i + 1]))) - except Exception as e: - logging.error(f"Error creating graph: {e}") - raise + def _create_graph(self): + """ + Create a NetworkX graph from the cleaned lines. + """ + try: + self.G = nx.Graph() + for line in self.cleaned_lines: + coords = list(line.coords) + for i in range(len(coords) - 1): + self.G.add_edge(coords[i], coords[i + 1], weight=Point(coords[i]).distance(Point(coords[i + 1]))) + except Exception as e: + logging.error(f"Error creating graph: {e}") + raise - def _create_mst(self): - """ - Create a Minimum Spanning Tree (MST) from the graph. - """ - def find_paths_between_nearest_points(g: nx.Graph, nearest_points: List[Point]) -> List[Tuple]: - edges = [] - for i, start_point in enumerate(nearest_points): - start = (start_point.x, start_point.y) - for end_point in nearest_points[i + 1:]: - end = (end_point.x, end_point.y) - if nx.has_path(g, start, end): - path = nx.shortest_path(g, source=start, target=end, weight='weight') - path_edges = list(zip(path[:-1], path[1:])) - edges.extend((u, v, g[u][v]['weight']) for u, v in path_edges) - return edges + def _create_mst(self): + """ + Create a Minimum Spanning Tree (MST) from the graph. + """ + def find_paths_between_nearest_points(g: nx.Graph, nearest_points: List[Point]) -> List[Tuple]: + edges = [] + for i, start_point in enumerate(nearest_points): + start = (start_point.x, start_point.y) + for end_point in nearest_points[i + 1:]: + end = (end_point.x, end_point.y) + if nx.has_path(g, start, end): + path = nx.shortest_path(g, source=start, target=end, weight='weight') + path_edges = list(zip(path[:-1], path[1:])) + edges.extend((u, v, g[u][v]['weight']) for u, v in path_edges) + return edges - try: - edges = find_paths_between_nearest_points(self.G, self.nearest_points) - h = nx.Graph() - h.add_weighted_edges_from(edges) - mst = nx.minimum_spanning_tree(h, weight='weight') - final_edges = [] - for u, v in mst.edges(): - if nx.has_path(self.G, u, v): - path = nx.shortest_path(self.G, source=u, target=v, weight='weight') - path_edges = list(zip(path[:-1], path[1:])) - final_edges.extend((x, y, self.G[x][y]['weight']) for x, y in path_edges) - self.final_mst = nx.Graph() - self.final_mst.add_weighted_edges_from(final_edges) - except Exception as e: - logging.error(f"Error creating MST: {e}") - raise + try: + edges = find_paths_between_nearest_points(self.G, self.nearest_points) + h = nx.Graph() + h.add_weighted_edges_from(edges) + mst = nx.minimum_spanning_tree(h, weight='weight') + final_edges = [] + for u, v in mst.edges(): + if nx.has_path(self.G, u, v): + path = nx.shortest_path(self.G, source=u, target=v, weight='weight') + path_edges = list(zip(path[:-1], path[1:])) + final_edges.extend((x, y, self.G[x][y]['weight']) for x, y in path_edges) + self.final_mst = nx.Graph() + self.final_mst.add_weighted_edges_from(final_edges) + except Exception as e: + logging.error(f"Error creating MST: {e}") + raise - def _iteratively_remove_edges(self): - """ - Iteratively remove edges that do not have any nearest points and have one end with only one connection. - Also remove nodes that don't have any connections and street nodes with only one connection. - """ - nearest_points_tuples = [(point.x, point.y) for point in self.nearest_points] + def _iteratively_remove_edges(self): + """ + Iteratively remove edges that do not have any nearest points and have one end with only one connection. + Also remove nodes that don't have any connections and street nodes with only one connection. + """ + nearest_points_tuples = [(point.x, point.y) for point in self.nearest_points] - def find_edges_to_remove(graph: nx.Graph) -> List[Tuple]: - edges_to_remove = [] - for u, v, d in graph.edges(data=True): - if u not in nearest_points_tuples and v not in nearest_points_tuples: - if graph.degree(u) == 1 or graph.degree(v) == 1: - edges_to_remove.append((u, v, d)) - return edges_to_remove + def find_edges_to_remove(graph: nx.Graph) -> List[Tuple]: + edges_to_remove = [] + for u, v, d in graph.edges(data=True): + if u not in nearest_points_tuples and v not in nearest_points_tuples: + if graph.degree(u) == 1 or graph.degree(v) == 1: + edges_to_remove.append((u, v, d)) + return edges_to_remove - def find_nodes_to_remove(graph: nx.Graph) -> List[Tuple]: - nodes_to_remove = [] - for node in graph.nodes(): - if graph.degree(node) == 0: - nodes_to_remove.append(node) - return nodes_to_remove + def find_nodes_to_remove(graph: nx.Graph) -> List[Tuple]: + nodes_to_remove = [] + for node in graph.nodes(): + if graph.degree(node) == 0: + nodes_to_remove.append(node) + return nodes_to_remove - try: - edges_to_remove = find_edges_to_remove(self.final_mst) - self.final_mst_steps = [list(self.final_mst.edges(data=True))] + try: + edges_to_remove = find_edges_to_remove(self.final_mst) + self.final_mst_steps = [list(self.final_mst.edges(data=True))] - while edges_to_remove or find_nodes_to_remove(self.final_mst): - self.final_mst.remove_edges_from(edges_to_remove) - nodes_to_remove = find_nodes_to_remove(self.final_mst) - self.final_mst.remove_nodes_from(nodes_to_remove) - edges_to_remove = find_edges_to_remove(self.final_mst) - self.final_mst_steps.append(list(self.final_mst.edges(data=True))) + while edges_to_remove or find_nodes_to_remove(self.final_mst): + self.final_mst.remove_edges_from(edges_to_remove) + nodes_to_remove = find_nodes_to_remove(self.final_mst) + self.final_mst.remove_nodes_from(nodes_to_remove) + edges_to_remove = find_edges_to_remove(self.final_mst) + self.final_mst_steps.append(list(self.final_mst.edges(data=True))) - def find_single_connection_street_nodes(graph: nx.Graph) -> List[Tuple]: - single_connection_street_nodes = [] - for node in graph.nodes(): - if node not in nearest_points_tuples and graph.degree(node) == 1: - single_connection_street_nodes.append(node) - return single_connection_street_nodes + def find_single_connection_street_nodes(graph: nx.Graph) -> List[Tuple]: + single_connection_street_nodes = [] + for node in graph.nodes(): + if node not in nearest_points_tuples and graph.degree(node) == 1: + single_connection_street_nodes.append(node) + return single_connection_street_nodes - single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst) + single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst) - while single_connection_street_nodes: - for node in single_connection_street_nodes: - neighbors = list(self.final_mst.neighbors(node)) - self.final_mst.remove_node(node) - for neighbor in neighbors: - if self.final_mst.degree(neighbor) == 0: - self.final_mst.remove_node(neighbor) - single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst) - self.final_mst_steps.append(list(self.final_mst.edges(data=True))) - except Exception as e: - logging.error(f"Error iteratively removing edges: {e}") - raise + while single_connection_street_nodes: + for node in single_connection_street_nodes: + neighbors = list(self.final_mst.neighbors(node)) + self.final_mst.remove_node(node) + for neighbor in neighbors: + if self.final_mst.degree(neighbor) == 0: + self.final_mst.remove_node(neighbor) + single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst) + self.final_mst_steps.append(list(self.final_mst.edges(data=True))) + except Exception as e: + logging.error(f"Error iteratively removing edges: {e}") + raise - def _add_centroids_to_mst(self): - """ - Add centroids to the final MST graph and connect them to their associated node on the graph. - """ - try: - for i, centroid in enumerate(self.centroids): - centroid_tuple = (centroid.x, centroid.y) - building_name = self.building_names[i] - self.final_mst.add_node(centroid_tuple, type='building', id=building_name) - nearest_point = None - min_distance = float('inf') - for node in self.final_mst.nodes(): - if self.final_mst.nodes[node].get('type') != 'building': - node_point = Point(node) - distance = centroid.distance(node_point) - if distance < min_distance: - min_distance = distance - nearest_point = node + def _add_centroids_to_mst(self): + """ + Add centroids to the final MST graph and connect them to their associated node on the graph. + """ + try: + for i, centroid in enumerate(self.centroids): + centroid_tuple = (centroid.x, centroid.y) + building_name = self.building_names[i] + self.final_mst.add_node(centroid_tuple, type='building', id=building_name) + nearest_point = None + min_distance = float('inf') + for node in self.final_mst.nodes(): + if self.final_mst.nodes[node].get('type') != 'building': + node_point = Point(node) + distance = centroid.distance(node_point) + if distance < min_distance: + min_distance = distance + nearest_point = node - if nearest_point: - self.final_mst.add_edge(centroid_tuple, nearest_point, weight=min_distance) - except Exception as e: - logging.error(f"Error adding centroids to MST: {e}") - raise + if nearest_point: + self.final_mst.add_edge(centroid_tuple, nearest_point, weight=min_distance) + except Exception as e: + logging.error(f"Error adding centroids to MST: {e}") + raise - def _convert_edge_weights_to_meters(self): - """ - Convert all edge weights in the final MST graph to meters using the Haversine formula. - """ - try: - for u, v, data in self.final_mst.edges(data=True): - lon1, lat1 = u - lon2, lat2 = v - distance = haversine(lon1, lat1, lon2, lat2) - self.final_mst[u][v]['weight'] = distance - except Exception as e: - logging.error(f"Error converting edge weights to meters: {e}") - raise + def _convert_edge_weights_to_meters(self): + """ + Convert all edge weights in the final MST graph to meters using the Haversine formula. + """ + try: + for u, v, data in self.final_mst.edges(data=True): + lon1, lat1 = u + lon2, lat2 = v + distance = haversine(lon1, lat1, lon2, lat2) + self.final_mst[u][v]['weight'] = distance + except Exception as e: + logging.error(f"Error converting edge weights to meters: {e}") + raise - def plot_network_graph(self): - """ - Plot the network graph using matplotlib and networkx. - """ - plt.figure(figsize=(15, 10)) - pos = {node: (node[0], node[1]) for node in self.final_mst.nodes()} - nx.draw_networkx_nodes(self.final_mst, pos, node_color='blue', node_size=50) - nx.draw_networkx_edges(self.final_mst, pos, edge_color='gray') - plt.title('District Heating Network Graph') - plt.axis('off') - plt.show() + def plot_network_graph(self): + """ + Plot the network graph using matplotlib and networkx. + """ + plt.figure(figsize=(15, 10)) + pos = {node: (node[0], node[1]) for node in self.final_mst.nodes()} + nx.draw_networkx_nodes(self.final_mst, pos, node_color='blue', node_size=50) + nx.draw_networkx_edges(self.final_mst, pos, edge_color='gray') + plt.title('District Heating Network Graph') + plt.axis('off') + plt.show()