feature: update dhn generator with the parent repo

This commit is contained in:
Majid Rezaei 2024-07-30 17:11:58 -04:00
parent 904fe91e5a
commit 4e75024817
2 changed files with 166 additions and 122 deletions

View File

@ -86,4 +86,4 @@ UsageFactory('nrcan', city).enrich()
# # Save the plot
# plt.savefig('plot_nrcan.png', bbox_inches='tight')
# plt.close()
print('test')
print('test')

View File

@ -5,7 +5,11 @@ import networkx as nx
from typing import List, Tuple
from rtree import index
import math
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('numexpr').setLevel(logging.ERROR)
def haversine(lon1, lat1, lon2, lat2):
"""
@ -42,69 +46,81 @@ class DistrictHeatingNetworkCreator:
Main method to execute the district heating network creation process.
:return: NetworkX graph with nodes and edges representing the network.
"""
self._load_and_process_data()
self._find_nearest_roads()
self._find_nearest_points()
self._break_down_roads()
self._create_graph()
self._create_mst()
self._iteratively_remove_edges()
self._add_centroids_to_mst()
self._convert_edge_weights_to_meters()
return self.final_mst
try:
self._load_and_process_data()
self._find_nearest_roads()
self._find_nearest_points()
self._break_down_roads()
self._create_graph()
self._create_mst()
self._iteratively_remove_edges()
self._add_centroids_to_mst()
self._convert_edge_weights_to_meters()
return self.final_mst
except Exception as e:
logging.error(f"Error during network creation: {e}")
raise
def _load_and_process_data(self):
"""
Load and process the building and road data.
"""
# Load building data
with open(self.buildings_file, 'r') as file:
city = json.load(file)
try:
# Load building data
with open(self.buildings_file, 'r') as file:
city = json.load(file)
self.centroids = []
self.building_ids = []
buildings = city['features']
for building in buildings:
coordinates = building['geometry']['coordinates'][0]
building_polygon = Polygon(coordinates)
centroid = building_polygon.centroid
self.centroids.append(centroid)
self.building_ids.append(building['id'])
self.centroids = []
self.building_ids = []
buildings = city['features']
for building in buildings:
coordinates = building['geometry']['coordinates'][0]
building_polygon = Polygon(coordinates)
centroid = building_polygon.centroid
self.centroids.append(centroid)
self.building_ids.append(building['id'])
# Load road data
with open(self.roads_file, 'r') as file:
roads = json.load(file)
# Load road data
with open(self.roads_file, 'r') as file:
roads = json.load(file)
line_features = [feature for feature in roads['features'] if feature['geometry']['type'] == 'LineString']
line_features = [feature for feature in roads['features'] if feature['geometry']['type'] == 'LineString']
self.lines = [LineString(feature['geometry']['coordinates']) for feature in line_features]
self.cleaned_lines = [LineString([line.coords[0], line.coords[-1]]) for line in self.lines]
self.lines = [LineString(feature['geometry']['coordinates']) for feature in line_features]
self.cleaned_lines = [LineString([line.coords[0], line.coords[-1]]) for line in self.lines]
except Exception as e:
logging.error(f"Error loading and processing data: {e}")
raise
def _find_nearest_roads(self):
"""
Find the nearest road for each building centroid.
"""
self.closest_roads = []
unique_roads_set = set()
try:
self.closest_roads = []
unique_roads_set = set()
# Create spatial index for roads
idx = index.Index()
for pos, line in enumerate(self.cleaned_lines):
idx.insert(pos, line.bounds)
# Create spatial index for roads
idx = index.Index()
for pos, line in enumerate(self.cleaned_lines):
idx.insert(pos, line.bounds)
for centroid in self.centroids:
min_distance = float('inf')
closest_road = None
for pos in idx.nearest(centroid.bounds, 10):
road = self.cleaned_lines[pos]
distance = road.distance(centroid)
if distance < min_distance:
min_distance = distance
closest_road = road
for centroid in self.centroids:
min_distance = float('inf')
closest_road = None
for pos in idx.nearest(centroid.bounds, 10):
road = self.cleaned_lines[pos]
distance = road.distance(centroid)
if distance < min_distance:
min_distance = distance
closest_road = road
if closest_road and closest_road.wkt not in unique_roads_set:
unique_roads_set.add(closest_road.wkt)
self.closest_roads.append(closest_road)
if closest_road and closest_road.wkt not in unique_roads_set:
unique_roads_set.add(closest_road.wkt)
self.closest_roads.append(closest_road)
except Exception as e:
logging.error(f"Error finding nearest roads: {e}")
raise
def _find_nearest_points(self):
"""
@ -113,19 +129,23 @@ class DistrictHeatingNetworkCreator:
def find_nearest_point_on_line(point: Point, line: LineString) -> Point:
return line.interpolate(line.project(point))
self.nearest_points = []
for centroid in self.centroids:
min_distance = float('inf')
closest_road = None
for road in self.closest_roads:
distance = centroid.distance(road)
if distance < min_distance:
min_distance = distance
closest_road = road
try:
self.nearest_points = []
for centroid in self.centroids:
min_distance = float('inf')
closest_road = None
for road in self.closest_roads:
distance = centroid.distance(road)
if distance < min_distance:
min_distance = distance
closest_road = road
if closest_road:
nearest_point = find_nearest_point_on_line(centroid, closest_road)
self.nearest_points.append(nearest_point)
if closest_road:
nearest_point = find_nearest_point_on_line(centroid, closest_road)
self.nearest_points.append(nearest_point)
except Exception as e:
logging.error(f"Error finding nearest points: {e}")
raise
def _break_down_roads(self):
"""
@ -144,19 +164,27 @@ class DistrictHeatingNetworkCreator:
new_segments.append(segment)
return new_segments
self.new_segments = break_down_roads(self.closest_roads, self.nearest_points)
self.cleaned_lines = [line for line in self.cleaned_lines if line not in self.closest_roads]
self.cleaned_lines.extend(self.new_segments)
try:
self.new_segments = break_down_roads(self.closest_roads, self.nearest_points)
self.cleaned_lines = [line for line in self.cleaned_lines if line not in self.closest_roads]
self.cleaned_lines.extend(self.new_segments)
except Exception as e:
logging.error(f"Error breaking down roads: {e}")
raise
def _create_graph(self):
"""
Create a NetworkX graph from the cleaned lines.
"""
self.G = nx.Graph()
for line in self.cleaned_lines:
coords = list(line.coords)
for i in range(len(coords) - 1):
self.G.add_edge(coords[i], coords[i + 1], weight=Point(coords[i]).distance(Point(coords[i + 1])))
try:
self.G = nx.Graph()
for line in self.cleaned_lines:
coords = list(line.coords)
for i in range(len(coords) - 1):
self.G.add_edge(coords[i], coords[i + 1], weight=Point(coords[i]).distance(Point(coords[i + 1])))
except Exception as e:
logging.error(f"Error creating graph: {e}")
raise
def _create_mst(self):
"""
@ -174,18 +202,22 @@ class DistrictHeatingNetworkCreator:
edges.extend((u, v, g[u][v]['weight']) for u, v in path_edges)
return edges
edges = find_paths_between_nearest_points(self.G, self.nearest_points)
h = nx.Graph()
h.add_weighted_edges_from(edges)
mst = nx.minimum_spanning_tree(h, weight='weight')
final_edges = []
for u, v in mst.edges():
if nx.has_path(self.G, u, v):
path = nx.shortest_path(self.G, source=u, target=v, weight='weight')
path_edges = list(zip(path[:-1], path[1:]))
final_edges.extend((x, y, self.G[x][y]['weight']) for x, y in path_edges)
self.final_mst = nx.Graph()
self.final_mst.add_weighted_edges_from(final_edges)
try:
edges = find_paths_between_nearest_points(self.G, self.nearest_points)
h = nx.Graph()
h.add_weighted_edges_from(edges)
mst = nx.minimum_spanning_tree(h, weight='weight')
final_edges = []
for u, v in mst.edges():
if nx.has_path(self.G, u, v):
path = nx.shortest_path(self.G, source=u, target=v, weight='weight')
path_edges = list(zip(path[:-1], path[1:]))
final_edges.extend((x, y, self.G[x][y]['weight']) for x, y in path_edges)
self.final_mst = nx.Graph()
self.final_mst.add_weighted_edges_from(final_edges)
except Exception as e:
logging.error(f"Error creating MST: {e}")
raise
def _iteratively_remove_edges(self):
"""
@ -209,65 +241,77 @@ class DistrictHeatingNetworkCreator:
nodes_to_remove.append(node)
return nodes_to_remove
edges_to_remove = find_edges_to_remove(self.final_mst)
self.final_mst_steps = [list(self.final_mst.edges(data=True))]
while edges_to_remove or find_nodes_to_remove(self.final_mst):
self.final_mst.remove_edges_from(edges_to_remove)
nodes_to_remove = find_nodes_to_remove(self.final_mst)
self.final_mst.remove_nodes_from(nodes_to_remove)
try:
edges_to_remove = find_edges_to_remove(self.final_mst)
self.final_mst_steps.append(list(self.final_mst.edges(data=True)))
self.final_mst_steps = [list(self.final_mst.edges(data=True))]
def find_single_connection_street_nodes(graph: nx.Graph) -> List[Tuple]:
single_connection_street_nodes = []
for node in graph.nodes():
if node not in nearest_points_tuples and graph.degree(node) == 1:
single_connection_street_nodes.append(node)
return single_connection_street_nodes
while edges_to_remove or find_nodes_to_remove(self.final_mst):
self.final_mst.remove_edges_from(edges_to_remove)
nodes_to_remove = find_nodes_to_remove(self.final_mst)
self.final_mst.remove_nodes_from(nodes_to_remove)
edges_to_remove = find_edges_to_remove(self.final_mst)
self.final_mst_steps.append(list(self.final_mst.edges(data=True)))
single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst)
def find_single_connection_street_nodes(graph: nx.Graph) -> List[Tuple]:
single_connection_street_nodes = []
for node in graph.nodes():
if node not in nearest_points_tuples and graph.degree(node) == 1:
single_connection_street_nodes.append(node)
return single_connection_street_nodes
while single_connection_street_nodes:
for node in single_connection_street_nodes:
neighbors = list(self.final_mst.neighbors(node))
self.final_mst.remove_node(node)
for neighbor in neighbors:
if self.final_mst.degree(neighbor) == 0:
self.final_mst.remove_node(neighbor)
single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst)
self.final_mst_steps.append(list(self.final_mst.edges(data=True)))
while single_connection_street_nodes:
for node in single_connection_street_nodes:
neighbors = list(self.final_mst.neighbors(node))
self.final_mst.remove_node(node)
for neighbor in neighbors:
if self.final_mst.degree(neighbor) == 0:
self.final_mst.remove_node(neighbor)
single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst)
self.final_mst_steps.append(list(self.final_mst.edges(data=True)))
except Exception as e:
logging.error(f"Error iteratively removing edges: {e}")
raise
def _add_centroids_to_mst(self):
"""
Add centroids to the final MST graph and connect them to their associated node on the graph.
"""
for i, centroid in enumerate(self.centroids):
centroid_tuple = (centroid.x, centroid.y)
building_id = self.building_ids[i]
self.final_mst.add_node(centroid_tuple, type='building', id=building_id)
nearest_point = None
min_distance = float('inf')
for node in self.final_mst.nodes():
if self.final_mst.nodes[node].get('type') != 'building':
node_point = Point(node)
distance = centroid.distance(node_point)
if distance < min_distance:
min_distance = distance
nearest_point = node
try:
for i, centroid in enumerate(self.centroids):
centroid_tuple = (centroid.x, centroid.y)
building_id = self.building_ids[i]
self.final_mst.add_node(centroid_tuple, type='building', id=building_id)
nearest_point = None
min_distance = float('inf')
for node in self.final_mst.nodes():
if self.final_mst.nodes[node].get('type') != 'building':
node_point = Point(node)
distance = centroid.distance(node_point)
if distance < min_distance:
min_distance = distance
nearest_point = node
if nearest_point:
self.final_mst.add_edge(centroid_tuple, nearest_point, weight=min_distance)
if nearest_point:
self.final_mst.add_edge(centroid_tuple, nearest_point, weight=min_distance)
except Exception as e:
logging.error(f"Error adding centroids to MST: {e}")
raise
def _convert_edge_weights_to_meters(self):
"""
Convert all edge weights in the final MST graph to meters using the Haversine formula.
"""
for u, v, data in self.final_mst.edges(data=True):
lon1, lat1 = u
lon2, lat2 = v
distance = haversine(lon1, lat1, lon2, lat2)
self.final_mst[u][v]['weight'] = distance
try:
for u, v, data in self.final_mst.edges(data=True):
lon1, lat1 = u
lon2, lat2 = v
distance = haversine(lon1, lat1, lon2, lat2)
self.final_mst[u][v]['weight'] = distance
except Exception as e:
logging.error(f"Error converting edge weights to meters: {e}")
raise
def plot_network_graph(self):
"""