feat: multi-objective optimization is doen

This commit is contained in:
Saeed Ranjbar 2024-10-04 13:16:17 +02:00
parent a694a8f47c
commit d8ad28e709
4 changed files with 372 additions and 68 deletions

View File

@ -33,7 +33,7 @@ class EnergySystemsSizingFactory:
"""
Size Energy Systems using a Single or Multi Objective GA
"""
OptimalSizing(self._city, optimization_scenario='cost').enrich_buildings()
OptimalSizing(self._city, optimization_scenario='cost_energy_consumption').enrich_buildings()
self._city.level_of_detail.energy_systems = 1
for building in self._city.buildings:
building.level_of_detail.energy_systems = 1

View File

@ -1,4 +1,5 @@
import copy
import math
import random
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.individual import \
@ -6,7 +7,7 @@ from energy_system_modelling_package.energy_system_modelling_factories.system_si
class GeneticAlgorithm:
def __init__(self, population_size=10, generations=10, crossover_rate=0.8, mutation_rate=0.1,
def __init__(self, population_size=50, generations=10, crossover_rate=0.8, mutation_rate=0.1,
optimization_scenario=None, output_path=None):
self.population_size = population_size
self.population = []
@ -53,14 +54,150 @@ class GeneticAlgorithm:
selected.append(copy.deepcopy(self.population[j]))
return selected
def nsga2_selection(self, population, fronts, crowding_distance):
"""
The selection used in multi-objective optimization based on non-dominated sorting and crowding distance
:return:
"""
new_population = []
i = 0
# Go through each front until we have enough individuals for the new population
while len(new_population) + len(fronts[i]) <= self.population_size:
for index in fronts[i]:
new_population.append(population[index])
i += 1
# Ensure we do not go out of bounds
if i >= len(fronts):
break
# If we still need more individuals to fill the population
if len(new_population) < self.population_size and i < len(fronts):
# Sort the individuals in the current front by crowding distance in descending order
fronts[i].sort(key=lambda x: crowding_distance[x], reverse=True)
for index in fronts[i]:
if len(new_population) < self.population_size:
new_population.append(population[index])
else:
break
return new_population
def fast_non_dominated_sort(self):
"""
Perform non-dominated sorting on the population based on multiple objective fitness scores
(e.g., life cycle cost (LCC) and energy consumption (LCE)).
This method assigns each individual in the population to different fronts based on dominance.
Returns:
A list of fronts where each front is a list of individual indices.
Front 0 is the best (non-dominated) front, followed by worse dominated fronts.
"""
# Number of individuals in the population
population_size = self.population_size
# s[p] contains individuals dominated by individual p
s = [[] for _ in range(population_size)]
# front[i] contains individuals in the i-th front (i = 0 is the best front)
front = [[]]
# n[p] is the count of individuals that dominate individual p
n = [0] * population_size
# rank[p] stores the front rank of individual p
rank = [0] * population_size
# Loop through each individual in the population (indexed by p)
for p in range(population_size):
s[p] = [] # Initialize the list of individuals p dominates
n[p] = 0 # Initialize the count of individuals dominating p
for q in range(population_size):
if self.dominates(self.population[p], self.population[q]):
# If individual p dominates individual q, add q to s[p]
s[p].append(q)
elif self.dominates(self.population[q], self.population[p]):
# If individual q dominates individual p, increment n[p]
n[p] += 1
# If no one dominates p, assign p to the first front (rank 0)
if n[p] == 0:
rank[p] = 0
front[0].append(p)
# Begin sorting into subsequent fronts
i = 0
while front[i]:
q = [] # Next front to populate
for p in front[i]: # For each individual p in the current front
for t in s[p]: # For each individual q dominated by p
# Ensure that `t` is a valid index (integer)
if isinstance(t, int) and 0 <= t < population_size:
n[t] -= 1 # Decrement n[q] since p is processed
if n[t] == 0: # If q is not dominated by any other individuals
rank[t] = i + 1 # Assign q to the next front
if t not in q: # Add q to the next front if not already added
q.append(t)
else:
# Debugging log to help identify the issue
print(f"Invalid value for t: {t}, expected an integer.")
i += 1 # Move to the next front
front.append(q) # Add the new front to the list of fronts
# Remove the last empty front (no individuals in it)
del front[-1]
# Return the list of fronts, each front containing indices of non-dominated individuals
return front
@staticmethod
def dominates(individual1, individual2):
"""
Checks if individual1 dominates individual2.
A solution dominates another if it is no worse in all objectives (LCC and LCE)
and better in at least one.
Args:
individual1: First individual (to check if it dominates individual2).
individual2: second individual (to check if it's dominated by individual1).
Returns:
True if individual1 dominates individual2, False otherwise.
"""
# Extract life cycle cost (LCC) and life cycle energy (LCE) from both individuals
lcc1, lce1 = individual1.individual['fitness_score']
lcc2, lce2 = individual2.individual['fitness_score']
# Check if individual1 dominates individual2
return (lcc1 <= lcc2 and lce1 <= lce2) and (lcc1 < lcc2 or lce1 < lce2)
def calculate_crowding_distance(self, front):
"""
Calculate the crowding distance for individuals in a given front based on objective values
(life cycle cost and energy consumption).
:param front: A list of indices of individuals in the current front.
:return crowding_distance: A list of crowding distances for individuals in the front.
"""
# Initialize distance list for the entire population
crowding_distance = [0] * len(self.population)
# There are two objectives: LCC (self.population[i].individual['lcc']) and
# LCE (self.population[i].individual['total_energy_consumption'])
num_objectives = 2
# For each objective (LCC and LCE)
for m, objective in enumerate(['lcc', 'total_energy_consumption']):
# Sort the front based on the objective 'm'
front = sorted(front, key=lambda x: self.population[x].individual[objective])
# Assign infinite distance to boundary individuals
crowding_distance[front[0]] = float('inf')
crowding_distance[front[-1]] = float('inf')
# Calculate crowding distance for individuals in between
for i in range(1, len(front) - 1):
objective_min = self.population[front[0]].individual[objective]
objective_max = self.population[front[-1]].individual[objective]
if objective_max != objective_min: # Avoid division by zero
crowding_distance[front[i]] += (self.population[front[i + 1]].individual[objective] -
self.population[front[i - 1]].individual[objective]) / (
objective_max - objective_min)
return crowding_distance
def crossover(self, parent1, parent2):
"""
Crossover between two parents to produce two children.
Swaps generation components and storage components between the two parents with a 50% chance.
swaps generation components and storage components between the two parents with a 50% chance.
:param parent1: First parent individual.
:param parent2: Second parent individual.
:param parent2: second parent individual.
:return: Two child individuals (child1 and child2).
"""
if random.random() < self.crossover_rate:
@ -69,16 +206,16 @@ class GeneticAlgorithm:
# Crossover for Generation Components
for i in range(len(parent1.individual['Generation Components'])):
if random.random() < 0.5:
# Swap the entire generation component
# swap the entire generation component
child1.individual['Generation Components'][i], child2.individual['Generation Components'][i] = (
child2.individual['Generation Components'][i],
child1.individual['Generation Components'][i]
)
# Crossover for Energy Storage Components
# Crossover for Energy storage Components
for i in range(len(parent1.individual['Energy Storage Components'])):
if random.random() < 0.5:
# Swap the entire storage component
# swap the entire storage component
child1.individual['Energy Storage Components'][i], child2.individual['Energy Storage Components'][i] = (
child2.individual['Energy Storage Components'][i],
child1.individual['Energy Storage Components'][i]
@ -103,8 +240,8 @@ class GeneticAlgorithm:
for generation_component in individual['Generation Components']:
if random.random() < self.mutation_rate:
if (generation_component['nominal_heating_efficiency'] is not None and cte.HEATING or cte.DOMESTIC_HOT_WATER in
energy_system.demand_types):
# Mutate heating capacity
energy_system.demand_types):
# Mutate heating capacity
if cte.HEATING in energy_system.demand_types:
generation_component['heating_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES)
@ -112,14 +249,14 @@ class GeneticAlgorithm:
generation_component['heating_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES)
if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types:
# Mutate cooling capacity
# Mutate cooling capacity
generation_component['cooling_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES)
# Mutate Storage Components
# Mutate storage Components
for storage_component in individual['Energy Storage Components']:
if random.random() < self.mutation_rate:
if storage_component['type'] == f'{cte.THERMAL}_storage':
# Mutate the volume of thermal storage
# Mutate the volume of thermal storage
max_available_space = 0.01 * building.volume / building.storeys_above_ground
storage_component['volume'] = random.uniform(0, max_available_space)
if storage_component['heating_coil_capacity'] is not None:
@ -129,15 +266,14 @@ class GeneticAlgorithm:
else:
storage_component['heating_coil_capacity'] = random.uniform(0, max(
design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES)
return individual
def solve_ga(self, building, energy_system):
"""
Solving GA for a single energy system. Here are the steps:
solving GA for a single energy system. Here are the steps:
1. Initialize population using the "initialize_population" method in this class.
2. Evaluate the initial population using the "score_evaluation" method in the Individual class.
3. Sort population based on fitness score.
3. sort population based on fitness score.
4. Repeat selection, crossover, and mutation for a fixed number of generations.
5. Track the best solution found during the optimization process.
@ -145,49 +281,143 @@ class GeneticAlgorithm:
:param energy_system: Energy system to optimize.
:return: Best solution after running the GA.
"""
# Step 1: Initialize the population
# step 1: Initialize the population
self.initialize_population(building, energy_system)
# Step 2: Evaluate the initial population
# step 2: Evaluate the initial population
for individual in self.population:
individual.score_evaluation()
# Step 3: Order population based on fitness scores
self.order_population()
# Track the best solution
self.best_solution = self.population[0]
self.best_solution_generation = 0
self.list_of_solutions.append(copy.deepcopy(self.best_solution.individual))
# Step 4: Run GA for a fixed number of generations
for generation in range(1, self.generations):
print(f"Generation {generation}")
# Selection (using tournament selection)
selected_population = self.tournament_selection()
# Create the next generation through crossover and mutation
next_population = []
for i in range(0, self.population_size, 2):
parent1 = selected_population[i]
parent2 = selected_population[i + 1] if (i + 1) < len(selected_population) else selected_population[0]
# Step 5: Apply crossover
child1, child2 = self.crossover(parent1, parent2)
# Step 6: Apply mutation
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
# Step 7: Evaluate the children
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
# Replace old population with the new one
self.population = next_population
# Step 8: Sort the new population based on fitness
# Create a list to store all non-dominated solutions from each generation
pareto_population = []
if self.optimization_scenario == 'cost_energy_consumption':
# NSGA-II optimization loop
for generation in range(1, self.generations + 1):
print(f"Generation {generation}")
# Step 3: Perform non-dominated sorting
fronts = self.fast_non_dominated_sort()
# Step 4: Calculate crowding distances for all fronts
crowding_distances = []
for front in fronts:
front = sorted(front, key=lambda x: self.population[x].individual['lcc'])
crowding_distances.extend(self.calculate_crowding_distance(front))
# Step 5: Selection using NSGA-II
self.population = self.nsga2_selection(self.population, fronts, crowding_distances)
# Step 6: Add non-dominated solutions (front[0]) to the Pareto population
pareto_population.extend([self.population[i] for i in fronts[0]])
# Step 7: Apply crossover and mutation to create offspring
next_population = []
while len(next_population) < self.population_size:
parent1 = random.choice(self.population)
parent2 = random.choice(self.population)
child1, child2 = self.crossover(parent1, parent2)
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
self.population = next_population[:self.population_size]
# Step 8: Perform non-dominated sorting on pareto_population to get the global Pareto front
self.population = pareto_population
pareto_front_indices = self.fast_non_dominated_sort()[0]
global_pareto_front = [pareto_population[i] for i in pareto_front_indices]
# Step 9: Use TOPSIS to find the best solution from the global Pareto front
self.best_solution = self.topsis_decision_making(global_pareto_front)
print(f"Best solution found using TOPSIS from global Pareto front: {self.best_solution.individual}")
return self.best_solution
else:
# step 3: Order population based on fitness scores
self.order_population()
# Track the best solution found in this generation
if self.population[0].individual['fitness_score'] < self.best_solution.individual['fitness_score']:
self.best_solution = self.population[0]
self.best_solution_generation = generation
# Store the best solution in the list of solutions
self.list_of_solutions.append(copy.deepcopy(self.population[0].individual))
print(f"Best solution found in generation {self.best_solution_generation}")
print(f"Best solution: {self.best_solution.individual}")
return self.best_solution
# Track the best solution
self.best_solution = self.population[0]
self.best_solution_generation = 0
self.list_of_solutions.append(copy.deepcopy(self.best_solution.individual))
# step 4: Run GA for a fixed number of generations
for generation in range(1, self.generations):
print(f"Generation {generation}")
# selection (using tournament selection)
selected_population = self.tournament_selection()
# Create the next generation through crossover and mutation
next_population = []
for i in range(0, self.population_size, 2):
parent1 = selected_population[i]
parent2 = selected_population[i + 1] if (i + 1) < len(selected_population) else selected_population[0]
# step 5: Apply crossover
child1, child2 = self.crossover(parent1, parent2)
# step 6: Apply mutation
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
# step 7: Evaluate the children
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
# Replace old population with the new one
self.population = next_population
# step 8: sort the new population based on fitness
self.order_population()
# Track the best solution found in this generation
if self.population[0].individual['fitness_score'] < self.best_solution.individual['fitness_score']:
self.best_solution = self.population[0]
self.best_solution_generation = generation
# store the best solution in the list of solutions
self.list_of_solutions.append(copy.deepcopy(self.population[0].individual))
print(f"Best solution found in generation {self.best_solution_generation}")
print(f"Best solution: {self.best_solution.individual}")
return self.best_solution
@staticmethod
def topsis_decision_making(pareto_front):
"""
Perform TOPSIS decision-making to choose the best solution from the Pareto front.
:param pareto_front: List of individuals in the Pareto front
:return: The best individual based on TOPSIS ranking
"""
# Step 1: Normalize the objective functions (cost and energy consumption)
min_lcc = min([ind.individual['lcc'] for ind in pareto_front])
max_lcc = max([ind.individual['lcc'] for ind in pareto_front])
min_lce = min([ind.individual['total_energy_consumption'] for ind in pareto_front])
max_lce = max([ind.individual['total_energy_consumption'] for ind in pareto_front])
normalized_pareto_front = []
for ind in pareto_front:
normalized_lcc = (ind.individual['lcc'] - min_lcc) / (max_lcc - min_lcc) if max_lcc > min_lcc else 0
normalized_lce = (ind.individual['total_energy_consumption'] - min_lce) / (
max_lce - min_lce) if max_lce > min_lce else 0
normalized_pareto_front.append((ind, normalized_lcc, normalized_lce))
# Step 2: Calculate the ideal and worst solutions
ideal_solution = (0, 0) # Ideal is minimum LCC and minimum LCE (0, 0 after normalization)
worst_solution = (1, 1) # Worst is maximum LCC and maximum LCE (1, 1 after normalization)
# Step 3: Calculate the distance to the ideal and worst solutions
best_distances = []
worst_distances = []
for ind, normalized_lcc, normalized_lce in normalized_pareto_front:
distance_to_ideal = math.sqrt(
(normalized_lcc - ideal_solution[0]) ** 2 + (normalized_lce - ideal_solution[1]) ** 2)
distance_to_worst = math.sqrt(
(normalized_lcc - worst_solution[0]) ** 2 + (normalized_lce - worst_solution[1]) ** 2)
best_distances.append(distance_to_ideal)
worst_distances.append(distance_to_worst)
# Step 4: Calculate relative closeness to the ideal solution
similarity = [worst / (best + worst) for best, worst in zip(best_distances, worst_distances)]
# Step 5: Select the individual with the highest similarity score
best_index = similarity.index(max(similarity))
best_solution = pareto_front[best_index]
return best_solution
@staticmethod
def design_period_identification(building):

View File

@ -126,7 +126,8 @@ class Individual:
generation_components = self.individual['Generation Components']
storage_components = self.individual['Energy Storage Components']
for generation_component in generation_components:
if generation_component['nominal_heating_efficiency'] is not None and cte.HEATING or cte.DOMESTIC_HOT_WATER in self.demand_types:
if generation_component[
'nominal_heating_efficiency'] is not None and cte.HEATING or cte.DOMESTIC_HOT_WATER in self.demand_types:
if self.heating_design_load is not None:
generation_component['heating_capacity'] = random.uniform(0, self.heating_design_load)
else:
@ -166,6 +167,8 @@ class Individual:
self.initialization()
self.system_simulation()
self.individual['feasible'] = self.feasibility
lcc = 0
total_energy_consumption = 0
if self.feasibility:
if 'cost' in self.optimization_scenario:
investment_cost = 0
@ -191,9 +194,7 @@ class Individual:
operation_cost_year_0=operation_cost_year_0,
maintenance_cost_year_0=maintenance_cost_year_0)
self.individual['lcc'] = lcc
self.individual['fitness_score'] = lcc
self.fitness_score = lcc
elif 'energy_consumption' in self.optimization_scenario:
if 'energy_consumption' in self.optimization_scenario:
total_energy_consumption = 0
for generation_system in self.individual['Generation Components']:
total_energy_consumption += generation_system['total_energy_consumption(kWh)']
@ -201,13 +202,27 @@ class Individual:
if cte.THERMAL in storage_system['type'] and storage_system['heating_coil_capacity'] is not None:
total_energy_consumption += storage_system['total_energy_consumption(kWh)']
self.individual['total_energy_consumption'] = total_energy_consumption
self.individual['fitness_score'] = total_energy_consumption
# Fitness score based on the optimization scenario
if self.optimization_scenario == 'cost':
self.fitness_score = lcc
self.individual['fitness_score'] = lcc
elif self.optimization_scenario == 'energy_consumption':
self.fitness_score = total_energy_consumption
self.individual['fitness_score'] = total_energy_consumption
elif self.optimization_scenario == 'cost_energy_consumption':
pass
self.fitness_score = (lcc, total_energy_consumption)
self.individual['fitness_score'] = (lcc, total_energy_consumption)
else:
self.individual['fitness_score'] = float('inf')
self.fitness_score = float('inf')
lcc = float('inf')
total_energy_consumption = float('inf')
self.individual['lcc'] = lcc
self.individual['total_energy_consumption'] = total_energy_consumption
if self.optimization_scenario == 'cost_energy_consumption':
self.individual['fitness_score'] = (float('inf'), float('inf'))
self.fitness_score = (float('inf'), float('inf'))
else:
self.individual['fitness_score'] = float('inf')
self.fitness_score = float('inf')
def system_simulation(self):
"""
@ -225,14 +240,15 @@ class Individual:
tes.volume = self.individual['Energy Storage Components'][0]['volume']
tes.height = self.building.average_storey_height - 1
tes.heating_coil_capacity = self.individual['Energy Storage Components'][0]['heating_coil_capacity'] \
if self.individual['Energy Storage Components'][0]['heating_coil_capacity'] is not None else None
if self.individual['Energy Storage Components'][0]['heating_coil_capacity'] is not None else None
heating_demand_joules = self.design_period_energy_demands[cte.HEATING]['demands']
heating_peak_load_watts = max(self.design_period_energy_demands[cte.HEATING]) if \
(self.heating_design_load is not None) else self.building.heating_peak_load[cte.YEAR][0]
(self.heating_design_load is not None) else self.building.heating_peak_load[cte.YEAR][0]
upper_limit_tes_heating = 55
design_period_start_index = self.design_period_energy_demands[cte.HEATING]['start_index']
design_period_end_index = self.design_period_energy_demands[cte.HEATING]['end_index']
outdoor_temperature = self.building.external_temperature[cte.HOUR][design_period_start_index:design_period_end_index]
outdoor_temperature = self.building.external_temperature[cte.HOUR][
design_period_start_index:design_period_end_index]
results = HeatPumpBoilerTesHeating(hp=hp,
boiler=boiler,
tes=tes,
@ -250,7 +266,7 @@ class Individual:
tes.volume = self.individual['Energy Storage Components'][0]['volume']
tes.height = self.building.average_storey_height - 1
tes.heating_coil_capacity = self.individual['Energy Storage Components'][0]['heating_coil_capacity'] \
if self.individual['Energy Storage Components'][0]['heating_coil_capacity'] is not None else None
if self.individual['Energy Storage Components'][0]['heating_coil_capacity'] is not None else None
dhw_demand_joules = self.design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']
upper_limit_tes = 65
design_period_start_index = self.design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['start_index']
@ -274,7 +290,7 @@ class Individual:
for demand_type in self.demand_types:
if demand_type in self.energy_system.generation_systems[index].energy_consumption:
generation_component['total_energy_consumption(kWh)'] = (sum(
self.energy_system.generation_systems[index].energy_consumption[demand_type][cte.HOUR]) / 3.6e6)
self.energy_system.generation_systems[index].energy_consumption[demand_type][cte.HOUR]) / 3.6e6)
for storage_component in self.individual['Energy Storage Components']:
if storage_component['type'] == f'{cte.THERMAL}_storage' and storage_component[
'heating_coil_capacity'] is not None:
@ -285,7 +301,7 @@ class Individual:
for demand_type in self.demand_types:
if demand_type in storage_system.heating_coil_energy_consumption:
storage_component['total_energy_consumption(kWh)'] = (sum(
storage_system.heating_coil_energy_consumption[demand_type][cte.HOUR]) / 3.6e6)
storage_system.heating_coil_energy_consumption[demand_type][cte.HOUR]) / 3.6e6)
def life_cycle_cost_calculation(self, investment_cost, operation_cost_year_0, maintenance_cost_year_0,
life_cycle_duration=41):

58
test.py Normal file
View File

@ -0,0 +1,58 @@
from pathlib import Path
import subprocess
from building_modelling.ep_run_enrich import energy_plus_workflow
from energy_system_modelling_package.energy_system_modelling_factories.montreal_energy_system_archetype_modelling_factory import \
MontrealEnergySystemArchetypesSimulationFactory
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.genetic_algorithm import GeneticAlgorithm
from hub.imports.geometry_factory import GeometryFactory
from hub.helpers.dictionaries import Dictionaries
from hub.imports.construction_factory import ConstructionFactory
from hub.imports.usage_factory import UsageFactory
from hub.imports.weather_factory import WeatherFactory
from hub.imports.results_factory import ResultFactory
import hub.helpers.constants as cte
from building_modelling.geojson_creator import process_geojson
from energy_system_modelling_package import random_assignation
from hub.imports.energy_systems_factory import EnergySystemsFactory
from energy_system_modelling_package.energy_system_modelling_factories.energy_system_sizing_factory import \
EnergySystemsSizingFactory
from energy_system_modelling_package.energy_system_retrofit.energy_system_retrofit_results import consumption_data, \
cost_data
from costing_package.cost import Cost
from costing_package.constants import *
from hub.exports.exports_factory import ExportsFactory
# Specify the GeoJSON file path
input_files_path = (Path(__file__).parent / 'input_files')
input_files_path.mkdir(parents=True, exist_ok=True)
geojson_file = process_geojson(x=-73.5681295982132, y=45.49218262677643, diff=0.00006)
geojson_file_path = input_files_path / 'output_buildings.geojson'
output_path = (Path(__file__).parent / 'out_files').resolve()
output_path.mkdir(parents=True, exist_ok=True)
energy_plus_output_path = output_path / 'energy_plus_outputs'
energy_plus_output_path.mkdir(parents=True, exist_ok=True)
simulation_results_path = (Path(__file__).parent / 'out_files' / 'simulation_results').resolve()
simulation_results_path.mkdir(parents=True, exist_ok=True)
sra_output_path = output_path / 'sra_outputs'
sra_output_path.mkdir(parents=True, exist_ok=True)
cost_analysis_output_path = output_path / 'cost_analysis'
cost_analysis_output_path.mkdir(parents=True, exist_ok=True)
city = GeometryFactory(file_type='geojson',
path=geojson_file_path,
height_field='height',
year_of_construction_field='year_of_construction',
function_field='function',
function_to_hub=Dictionaries().montreal_function_to_hub_function).city
ConstructionFactory('nrcan', city).enrich()
UsageFactory('nrcan', city).enrich()
WeatherFactory('epw', city).enrich()
ExportsFactory('sra', city, sra_output_path).export()
sra_path = (sra_output_path / f'{city.name}_sra.xml').resolve()
subprocess.run(['sra', str(sra_path)])
ResultFactory('sra', city, sra_output_path).enrich()
energy_plus_workflow(city, energy_plus_output_path)
random_assignation.call_random(city.buildings, random_assignation.residential_new_systems_percentage)
EnergySystemsFactory('montreal_future', city).enrich()
for building in city.buildings:
energy_system = building.energy_systems[1]
GeneticAlgorithm(optimization_scenario='cost_energy_consumption').solve_ga(building, energy_system)