2024-01-24 17:39:05 -05:00
|
|
|
import json
|
|
|
|
import matplotlib.pyplot as plt
|
2024-06-10 22:50:27 -04:00
|
|
|
from shapely.geometry import Polygon, Point, LineString
|
2024-01-24 17:39:05 -05:00
|
|
|
import networkx as nx
|
2024-06-10 22:50:27 -04:00
|
|
|
from typing import List, Tuple
|
|
|
|
from rtree import index
|
2024-01-24 17:39:05 -05:00
|
|
|
|
|
|
|
|
|
|
|
class DistrictHeatingNetworkCreator:
|
2024-06-10 22:50:27 -04:00
|
|
|
def __init__(self, buildings_file: str, roads_file: str):
|
2024-01-24 17:39:05 -05:00
|
|
|
"""
|
|
|
|
Initialize the class with paths to the buildings and roads data files.
|
|
|
|
|
|
|
|
:param buildings_file: Path to the GeoJSON file containing building data.
|
2024-06-10 22:50:27 -04:00
|
|
|
:param roads_file: Path to the GeoJSON file containing roads data.
|
2024-01-24 17:39:05 -05:00
|
|
|
"""
|
|
|
|
self.buildings_file = buildings_file
|
|
|
|
self.roads_file = roads_file
|
|
|
|
|
2024-06-10 22:50:27 -04:00
|
|
|
def run(self) -> nx.Graph:
|
2024-01-24 17:39:05 -05:00
|
|
|
"""
|
|
|
|
Main method to execute the district heating network creation process.
|
|
|
|
:return: NetworkX graph with nodes and edges representing the network.
|
|
|
|
"""
|
|
|
|
self._load_and_process_data()
|
|
|
|
self._find_nearest_roads()
|
2024-06-10 22:50:27 -04:00
|
|
|
self._find_nearest_points()
|
|
|
|
self._break_down_roads()
|
|
|
|
self._create_graph()
|
|
|
|
self._create_mst()
|
|
|
|
self._iteratively_remove_edges()
|
|
|
|
self.add_centroids_to_mst() # Add centroids to the MST
|
|
|
|
return self.final_mst
|
2024-01-24 17:39:05 -05:00
|
|
|
|
|
|
|
def _load_and_process_data(self):
|
|
|
|
"""
|
|
|
|
Load and process the building and road data.
|
|
|
|
"""
|
|
|
|
# Load building data
|
|
|
|
with open(self.buildings_file, 'r') as file:
|
|
|
|
city = json.load(file)
|
|
|
|
|
2024-06-10 22:50:27 -04:00
|
|
|
self.centroids = []
|
|
|
|
self.building_ids = []
|
2024-01-24 17:39:05 -05:00
|
|
|
buildings = city['features']
|
|
|
|
for building in buildings:
|
|
|
|
coordinates = building['geometry']['coordinates'][0]
|
|
|
|
building_polygon = Polygon(coordinates)
|
|
|
|
centroid = building_polygon.centroid
|
2024-06-10 22:50:27 -04:00
|
|
|
self.centroids.append(centroid)
|
|
|
|
self.building_ids.append(building['id'])
|
2024-01-24 17:39:05 -05:00
|
|
|
|
|
|
|
# Load road data
|
2024-06-10 22:50:27 -04:00
|
|
|
with open(self.roads_file, 'r') as file:
|
|
|
|
roads = json.load(file)
|
|
|
|
|
|
|
|
line_features = [feature for feature in roads['features'] if feature['geometry']['type'] == 'LineString']
|
2024-01-24 17:39:05 -05:00
|
|
|
|
2024-06-10 22:50:27 -04:00
|
|
|
self.lines = [LineString(feature['geometry']['coordinates']) for feature in line_features]
|
|
|
|
self.cleaned_lines = [LineString([line.coords[0], line.coords[-1]]) for line in self.lines]
|
2024-01-24 17:39:05 -05:00
|
|
|
|
|
|
|
def _find_nearest_roads(self):
|
|
|
|
"""
|
|
|
|
Find the nearest road for each building centroid.
|
|
|
|
"""
|
2024-06-10 22:50:27 -04:00
|
|
|
self.closest_roads = []
|
|
|
|
unique_roads_set = set()
|
|
|
|
|
|
|
|
# Create spatial index for roads
|
|
|
|
idx = index.Index()
|
|
|
|
for pos, line in enumerate(self.cleaned_lines):
|
|
|
|
idx.insert(pos, line.bounds)
|
|
|
|
|
|
|
|
for centroid in self.centroids:
|
|
|
|
min_distance = float('inf')
|
|
|
|
closest_road = None
|
|
|
|
for pos in idx.nearest(centroid.bounds, 10):
|
|
|
|
road = self.cleaned_lines[pos]
|
|
|
|
distance = road.distance(centroid)
|
|
|
|
if distance < min_distance:
|
|
|
|
min_distance = distance
|
|
|
|
closest_road = road
|
|
|
|
|
|
|
|
if closest_road and closest_road.wkt not in unique_roads_set:
|
|
|
|
unique_roads_set.add(closest_road.wkt)
|
|
|
|
self.closest_roads.append(closest_road)
|
|
|
|
|
|
|
|
def _find_nearest_points(self):
|
|
|
|
"""
|
|
|
|
Find the nearest point on each closest road for each centroid.
|
|
|
|
"""
|
|
|
|
def find_nearest_point_on_line(point: Point, line: LineString) -> Point:
|
|
|
|
return line.interpolate(line.project(point))
|
2024-01-24 17:39:05 -05:00
|
|
|
|
|
|
|
self.nearest_points = []
|
2024-06-10 22:50:27 -04:00
|
|
|
for centroid in self.centroids:
|
|
|
|
min_distance = float('inf')
|
|
|
|
closest_road = None
|
|
|
|
for road in self.closest_roads:
|
|
|
|
distance = centroid.distance(road)
|
|
|
|
if distance < min_distance:
|
|
|
|
min_distance = distance
|
|
|
|
closest_road = road
|
|
|
|
|
|
|
|
if closest_road:
|
|
|
|
nearest_point = find_nearest_point_on_line(centroid, closest_road)
|
|
|
|
self.nearest_points.append(nearest_point)
|
|
|
|
|
|
|
|
def _break_down_roads(self):
|
2024-01-24 17:39:05 -05:00
|
|
|
"""
|
2024-06-10 22:50:27 -04:00
|
|
|
Break down roads into segments connecting nearest points.
|
2024-01-24 17:39:05 -05:00
|
|
|
"""
|
2024-06-10 22:50:27 -04:00
|
|
|
def break_down_roads(closest_roads: List[LineString], nearest_points_list: List[Point]) -> List[LineString]:
|
|
|
|
new_segments = []
|
|
|
|
for road in closest_roads:
|
|
|
|
coords = list(road.coords)
|
|
|
|
points_on_road = [point for point in nearest_points_list if road.distance(point) < 0.000000001]
|
|
|
|
sorted_points = sorted(points_on_road, key=lambda point: road.project(point))
|
|
|
|
sorted_points.insert(0, Point(coords[0]))
|
|
|
|
sorted_points.append(Point(coords[-1]))
|
|
|
|
for i in range(len(sorted_points) - 1):
|
|
|
|
segment = LineString([sorted_points[i], sorted_points[i + 1]])
|
|
|
|
new_segments.append(segment)
|
|
|
|
return new_segments
|
|
|
|
|
|
|
|
self.new_segments = break_down_roads(self.closest_roads, self.nearest_points)
|
|
|
|
self.cleaned_lines = [line for line in self.cleaned_lines if line not in self.closest_roads]
|
|
|
|
self.cleaned_lines.extend(self.new_segments)
|
|
|
|
|
|
|
|
def _create_graph(self):
|
|
|
|
"""
|
|
|
|
Create a NetworkX graph from the cleaned lines.
|
|
|
|
"""
|
|
|
|
self.G = nx.Graph()
|
|
|
|
for line in self.cleaned_lines:
|
|
|
|
coords = list(line.coords)
|
|
|
|
for i in range(len(coords) - 1):
|
|
|
|
self.G.add_edge(coords[i], coords[i + 1], weight=Point(coords[i]).distance(Point(coords[i + 1])))
|
2024-01-24 17:39:05 -05:00
|
|
|
|
2024-06-10 22:50:27 -04:00
|
|
|
def _create_mst(self):
|
|
|
|
"""
|
|
|
|
Create a Minimum Spanning Tree (MST) from the graph.
|
|
|
|
"""
|
|
|
|
def find_paths_between_nearest_points(g: nx.Graph, nearest_points: List[Point]) -> List[Tuple]:
|
|
|
|
edges = []
|
|
|
|
for i, start_point in enumerate(nearest_points):
|
|
|
|
start = (start_point.x, start_point.y)
|
|
|
|
for end_point in nearest_points[i + 1:]:
|
|
|
|
end = (end_point.x, end_point.y)
|
|
|
|
if nx.has_path(g, start, end):
|
|
|
|
path = nx.shortest_path(g, source=start, target=end, weight='weight')
|
|
|
|
path_edges = list(zip(path[:-1], path[1:]))
|
|
|
|
edges.extend((u, v, g[u][v]['weight']) for u, v in path_edges)
|
|
|
|
return edges
|
|
|
|
|
|
|
|
edges = find_paths_between_nearest_points(self.G, self.nearest_points)
|
|
|
|
h = nx.Graph()
|
|
|
|
h.add_weighted_edges_from(edges)
|
|
|
|
mst = nx.minimum_spanning_tree(h, weight='weight')
|
|
|
|
final_edges = []
|
|
|
|
for u, v in mst.edges():
|
|
|
|
if nx.has_path(self.G, u, v):
|
|
|
|
path = nx.shortest_path(self.G, source=u, target=v, weight='weight')
|
|
|
|
path_edges = list(zip(path[:-1], path[1:]))
|
|
|
|
final_edges.extend((x, y, self.G[x][y]['weight']) for x, y in path_edges)
|
|
|
|
self.final_mst = nx.Graph()
|
|
|
|
self.final_mst.add_weighted_edges_from(final_edges)
|
|
|
|
|
|
|
|
def _iteratively_remove_edges(self):
|
|
|
|
"""
|
|
|
|
Iteratively remove edges that do not have any nearest points and have one end with only one connection.
|
|
|
|
Also remove nodes that don't have any connections and street nodes with only one connection.
|
|
|
|
"""
|
|
|
|
nearest_points_tuples = [(point.x, point.y) for point in self.nearest_points]
|
|
|
|
|
|
|
|
def find_edges_to_remove(graph: nx.Graph) -> List[Tuple]:
|
|
|
|
edges_to_remove = []
|
|
|
|
for u, v, d in graph.edges(data=True):
|
|
|
|
if u not in nearest_points_tuples and v not in nearest_points_tuples:
|
|
|
|
if graph.degree(u) == 1 or graph.degree(v) == 1:
|
|
|
|
edges_to_remove.append((u, v, d))
|
|
|
|
return edges_to_remove
|
|
|
|
|
|
|
|
def find_nodes_to_remove(graph: nx.Graph) -> List[Tuple]:
|
|
|
|
nodes_to_remove = []
|
|
|
|
for node in graph.nodes():
|
|
|
|
if graph.degree(node) == 0:
|
|
|
|
nodes_to_remove.append(node)
|
|
|
|
return nodes_to_remove
|
|
|
|
|
|
|
|
edges_to_remove = find_edges_to_remove(self.final_mst)
|
|
|
|
self.final_mst_steps = [list(self.final_mst.edges(data=True))]
|
|
|
|
|
|
|
|
while edges_to_remove or find_nodes_to_remove(self.final_mst):
|
|
|
|
self.final_mst.remove_edges_from(edges_to_remove)
|
|
|
|
nodes_to_remove = find_nodes_to_remove(self.final_mst)
|
|
|
|
self.final_mst.remove_nodes_from(nodes_to_remove)
|
|
|
|
edges_to_remove = find_edges_to_remove(self.final_mst)
|
|
|
|
self.final_mst_steps.append(list(self.final_mst.edges(data=True)))
|
|
|
|
|
|
|
|
def find_single_connection_street_nodes(graph: nx.Graph) -> List[Tuple]:
|
|
|
|
single_connection_street_nodes = []
|
|
|
|
for node in graph.nodes():
|
|
|
|
if node not in nearest_points_tuples and graph.degree(node) == 1:
|
|
|
|
single_connection_street_nodes.append(node)
|
|
|
|
return single_connection_street_nodes
|
|
|
|
|
|
|
|
single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst)
|
|
|
|
|
|
|
|
while single_connection_street_nodes:
|
|
|
|
for node in single_connection_street_nodes:
|
|
|
|
neighbors = list(self.final_mst.neighbors(node))
|
|
|
|
self.final_mst.remove_node(node)
|
|
|
|
for neighbor in neighbors:
|
|
|
|
if self.final_mst.degree(neighbor) == 0:
|
|
|
|
self.final_mst.remove_node(neighbor)
|
|
|
|
single_connection_street_nodes = find_single_connection_street_nodes(self.final_mst)
|
|
|
|
self.final_mst_steps.append(list(self.final_mst.edges(data=True)))
|
|
|
|
|
|
|
|
def add_centroids_to_mst(self):
|
|
|
|
"""
|
|
|
|
Add centroids to the final MST graph and connect them to their associated node on the graph.
|
|
|
|
"""
|
|
|
|
for centroid in self.centroids:
|
|
|
|
centroid_tuple = (centroid.x, centroid.y)
|
|
|
|
self.final_mst.add_node(centroid_tuple, type='centroid')
|
|
|
|
nearest_point = None
|
|
|
|
min_distance = float('inf')
|
|
|
|
for node in self.final_mst.nodes():
|
|
|
|
if self.final_mst.nodes[node].get('type') != 'centroid':
|
|
|
|
node_point = Point(node)
|
|
|
|
distance = centroid.distance(node_point)
|
|
|
|
if distance < min_distance:
|
|
|
|
min_distance = distance
|
|
|
|
nearest_point = node
|
|
|
|
|
|
|
|
if nearest_point:
|
|
|
|
self.final_mst.add_edge(centroid_tuple, nearest_point, weight=min_distance)
|
|
|
|
|
|
|
|
def plot_network_graph(self):
|
|
|
|
"""
|
|
|
|
Plot the network graph using matplotlib and networkx.
|
|
|
|
"""
|
|
|
|
plt.figure(figsize=(15, 10))
|
|
|
|
pos = {node: (node[0], node[1]) for node in self.final_mst.nodes()}
|
|
|
|
nx.draw_networkx_nodes(self.final_mst, pos, node_color='blue', node_size=50)
|
|
|
|
nx.draw_networkx_edges(self.final_mst, pos, edge_color='gray')
|
2024-01-24 17:39:05 -05:00
|
|
|
plt.title('District Heating Network Graph')
|
|
|
|
plt.axis('off')
|
|
|
|
plt.show()
|