fix: NSGA-II is now working properly for heating systems.

This commit is contained in:
Saeed Ranjbar 2024-11-08 13:08:23 +01:00
parent c47071bc2d
commit 2243e22866
5 changed files with 713 additions and 647 deletions

View File

@ -7,8 +7,8 @@ from energy_system_modelling_package.energy_system_modelling_factories.hvac_dhw_
class HeatPumpBoilerTesHeating:
def __init__(self, hp, boiler, tes, hourly_heating_demand_joules, heating_peak_load_watts, upper_limit_tes,
outdoor_temperature, dt=None):
def __init__(self, hp=None, boiler=None, tes=None, hourly_heating_demand_joules=None, heating_peak_load_watts=None,
upper_limit_tes=None, outdoor_temperature=None, dt=None):
self.hp = hp
self.boiler = boiler
self.tes = tes
@ -25,6 +25,9 @@ class HeatPumpBoilerTesHeating:
def simulation(self):
hp, boiler, tes = self.hp, self.boiler, self.tes
if hp.nominal_heat_output < 0 or boiler.nominal_heat_output < 0:
raise ValueError("Heat output values must be non-negative. Check the nominal_heat_output for hp and boiler.")
heating_coil_nominal_output = 0
if tes.heating_coil_capacity is not None:
heating_coil_nominal_output = float(tes.heating_coil_capacity)

View File

@ -195,7 +195,7 @@ class Individual:
operation_cost_year_0=operation_cost_year_0,
maintenance_cost_year_0=maintenance_cost_year_0)
self.individual['lcc'] = lcc
if 'energy_consumption' in self.optimization_scenario:
if 'energy-consumption' in self.optimization_scenario:
total_energy_consumption = 0
for generation_system in self.individual['Generation Components']:
total_energy_consumption += generation_system['total_energy_consumption(kWh)']
@ -207,18 +207,20 @@ class Individual:
if self.optimization_scenario == 'cost':
self.fitness_score = lcc
self.individual['fitness_score'] = lcc
elif self.optimization_scenario == 'energy_consumption':
elif self.optimization_scenario == 'energy-consumption':
self.fitness_score = total_energy_consumption
self.individual['fitness_score'] = total_energy_consumption
elif self.optimization_scenario == 'cost_energy_consumption' or self.optimization_scenario == 'energy_consumption_cost':
elif self.optimization_scenario == 'cost_energy-consumption':
self.fitness_score = (lcc, total_energy_consumption)
elif self.optimization_scenario == 'energy-consumption_cost':
self.fitness_score = (total_energy_consumption, lcc)
self.individual['fitness_score'] = (lcc, total_energy_consumption)
else:
lcc = float('inf')
total_energy_consumption = float('inf')
self.individual['lcc'] = lcc
self.individual['total_energy_consumption'] = total_energy_consumption
if self.optimization_scenario == 'cost_energy_consumption' or self.optimization_scenario == 'energy_consumption_cost':
if self.optimization_scenario == 'cost_energy-consumption' or self.optimization_scenario == 'energy-consumption_cost':
self.individual['fitness_score'] = (float('inf'), float('inf'))
self.fitness_score = (float('inf'), float('inf'))
else:

View File

@ -1,336 +0,0 @@
import copy
import math
import random
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.individual import \
Individual
import matplotlib.pyplot as plt
import time
class MultiObjectiveGeneticAlgorithm:
def __init__(self, population_size=50, generations=50, crossover_rate=0.9, mutation_rate=0.33,
optimization_scenario=None, output_path=None):
self.population_size = population_size
self.population = []
self.generations = generations
self.crossover_rate = crossover_rate
self.mutation_rate = mutation_rate
self.optimization_scenario = optimization_scenario
self.list_of_solutions = []
self.best_solution = None
self.best_solution_generation = None
self.crowding_distances = None
self.output_path = output_path
# Initialize Population
def initialize_population(self, building, energy_system):
design_period_energy_demands = self.design_period_identification(building)
attempts = 0
max_attempts = self.population_size * 20
while len(self.population) < self.population_size and attempts < max_attempts:
individual = Individual(building=building,
energy_system=energy_system,
design_period_energy_demands=design_period_energy_demands,
optimization_scenario=self.optimization_scenario)
individual.initialization()
individual.score_evaluation()
attempts += 1
if individual.feasibility:
self.population.append(individual)
if len(self.population) < self.population_size:
raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. "
f"Only {len(self.population)} feasible individuals were generated.")
@staticmethod
def design_period_identification(building):
def get_start_end_indices(max_day_index, total_days):
if max_day_index > 0 and max_day_index < total_days - 1:
start_index = (max_day_index - 1) * 24
end_index = (max_day_index + 2) * 24
elif max_day_index == 0:
start_index = 0
end_index = (max_day_index + 2) * 24
else:
start_index = (max_day_index - 1) * 24
end_index = total_days * 24
return start_index, end_index
# Calculate daily demands
heating_daily_demands = [sum(building.heating_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.heating_demand[cte.HOUR]), 24)]
cooling_daily_demands = [sum(building.cooling_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.cooling_demand[cte.HOUR]), 24)]
dhw_daily_demands = [sum(building.domestic_hot_water_heat_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.domestic_hot_water_heat_demand[cte.HOUR]), 24)]
# Get the day with maximum demand for each type
heating_max_day = heating_daily_demands.index(max(heating_daily_demands))
cooling_max_day = cooling_daily_demands.index(max(cooling_daily_demands))
dhw_max_day = dhw_daily_demands.index(max(dhw_daily_demands))
# Get the start and end indices for each demand type
heating_start, heating_end = get_start_end_indices(heating_max_day, len(heating_daily_demands))
cooling_start, cooling_end = get_start_end_indices(cooling_max_day, len(cooling_daily_demands))
dhw_start, dhw_end = get_start_end_indices(dhw_max_day, len(dhw_daily_demands))
# Return the design period energy demands
return {
f'{cte.HEATING}': {'demands': building.heating_demand[cte.HOUR][heating_start:heating_end],
'start_index': heating_start, 'end_index': heating_end},
f'{cte.COOLING}': {'demands': building.cooling_demand[cte.HOUR][cooling_start:cooling_end],
'start_index': cooling_start, 'end_index': cooling_end},
f'{cte.DOMESTIC_HOT_WATER}': {'demands': building.domestic_hot_water_heat_demand[cte.HOUR][dhw_start:dhw_end],
'start_index': dhw_start, 'end_index': dhw_end}
}
def sbx_crossover(self, parent1, parent2):
eta_c = 2 # Distribution index for SBX
if random.random() < self.crossover_rate:
child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
# Crossover for Generation Components
for i in range(len(parent1.individual['Generation Components'])):
for cap_type in ['heating_capacity', 'cooling_capacity']:
if parent1.individual['Generation Components'][i][cap_type] is not None:
x1 = parent1.individual['Generation Components'][i][cap_type]
x2 = parent2.individual['Generation Components'][i][cap_type]
if abs(x1 - x2) > 1e-14: # Avoid division by zero
u = random.random()
if u <= 0.5:
beta = (2 * u) ** (1 / (eta_c + 1))
else:
beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1))
child1.individual['Generation Components'][i][cap_type] = max(0.1,
0.5 * ((1 + beta) * x1 + (1 - beta) * x2))
child2.individual['Generation Components'][i][cap_type] = max(0.1,
0.5 * ((1 - beta) * x1 + (1 + beta) * x2))
# Crossover for Energy Storage Components
for i in range(len(parent1.individual['Energy Storage Components'])):
for item in ['capacity', 'volume', 'heating_coil_capacity']:
if parent1.individual['Energy Storage Components'][i][item] is not None:
x1 = parent1.individual['Energy Storage Components'][i][item]
x2 = parent2.individual['Energy Storage Components'][i][item]
if abs(x1 - x2) > 1e-14: # Avoid division by zero
u = random.random()
if u <= 0.5:
beta = (2 * u) ** (1 / (eta_c + 1))
else:
beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1))
threshold1 = random.uniform(0, 0.01) * child1.available_space
child1.individual['Energy Storage Components'][i][item] = max(threshold1,
0.5 * ((1 + beta) * x1 + (1 - beta) * x2))
threshold2 = random.uniform(0, 0.01) * child1.available_space
child2.individual['Energy Storage Components'][i][item] = max(threshold2,
0.5 * ((1 - beta) * x1 + (1 + beta) * x2))
return child1, child2
else:
return copy.deepcopy(parent1), copy.deepcopy(parent2)
def polynomial_mutation(self, individual, building, energy_system):
"""
Mutates the individual's generation and storage components using the Polynomial Mutation Operator.
- `individual`: The individual to mutate (contains generation and storage components).
- `building`: Building data that contains constraints such as peak heating load and available space.
Returns the mutated individual.
"""
eta_m = 20 # Mutation distribution index (can be adjusted)
design_period_energy_demands = self.design_period_identification(building)
def polynomial_mutation_operator(value, lower_bound, upper_bound):
delta = (value - lower_bound) / (upper_bound - lower_bound)
u = random.random()
if u < 0.5:
delta_q = (2 * u) ** (1 / (eta_m + 1)) - 1
else:
delta_q = 1 - (2 * (1 - u)) ** (1 / (eta_m + 1))
mutated_value = value + delta_q * (upper_bound - lower_bound)
return max(lower_bound, min(mutated_value, upper_bound)) # Ensure it's within bounds
# Mutate Generation Components
for generation_component in individual['Generation Components']:
if random.random() < self.mutation_rate:
if (generation_component['nominal_heating_efficiency'] is not None and
(cte.HEATING or cte.DOMESTIC_HOT_WATER in energy_system.demand_types)):
# Mutate heating capacity
if cte.HEATING in energy_system.demand_types:
max_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
else:
max_demand = max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
generation_component['heating_capacity'] = polynomial_mutation_operator(
generation_component['heating_capacity'], 0.1, max_demand)
if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types:
# Mutate cooling capacity
max_cooling_demand = max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES
generation_component['cooling_capacity'] = polynomial_mutation_operator(
generation_component['cooling_capacity'], 0.1, max_cooling_demand)
# Mutate Storage Components
for storage_component in individual['Energy Storage Components']:
if random.random() < self.mutation_rate:
if storage_component['type'] == f'{cte.THERMAL}_storage':
# Mutate the volume of thermal storage
max_available_space = 0.01 * building.volume / building.storeys_above_ground
lower_bound = random.uniform(0, 0.001) * max_available_space
storage_component['volume'] = polynomial_mutation_operator(
storage_component['volume'], 0, max_available_space)
if storage_component['heating_coil_capacity'] is not None:
if cte.HEATING in energy_system.demand_types:
max_heating_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
else:
max_heating_demand = max(
design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
storage_component['heating_coil_capacity'] = polynomial_mutation_operator(
storage_component['heating_coil_capacity'], 0, max_heating_demand)
return individual
def fast_non_dominated_sorting(self):
population = self.population
s = [[] for _ in range(len(population))]
n = [0] * len(population)
rank = [0] * len(population)
fronts = [[]]
for p in range(len(population)):
for q in range(len(population)):
if self.domination_status(population[p], population[q]):
s[p].append(q)
elif self.domination_status(population[q], population[p]):
n[p] += 1
if n[p] == 0:
rank[p] = 0
fronts[0].append(p)
i = 0
while fronts[i]:
next_front = set()
for p in fronts[i]:
for q in s[p]:
n[q] -= 1
if n[q] == 0:
rank[q] = i + 1
next_front.add(q)
i += 1
fronts.append(list(next_front))
del fronts[-1]
for (j, front) in enumerate(fronts):
for i in front:
self.population[i].rank = j + 1
return fronts
def calculate_crowding_distance(self, front, crowding_distance):
for j in range(len(self.population[0].fitness_score)):
sorted_front = sorted(front, key=lambda x: self.population[x].individual['fitness_score'][j])
# Set distances to finite large numbers rather than `inf`
crowding_distance[sorted_front[0]] = float(1e12)
crowding_distance[sorted_front[-1]] = float(1e12)
objective_min = self.population[sorted_front[0]].individual['fitness_score'][j]
objective_max = self.population[sorted_front[-1]].individual['fitness_score'][j]
if objective_max != objective_min:
for i in range(1, len(sorted_front) - 1):
crowding_distance[sorted_front[i]] += (
(self.population[sorted_front[i + 1]].individual['fitness_score'][j] -
self.population[sorted_front[i - 1]].individual['fitness_score'][j]) /
(objective_max - objective_min))
for i in front:
self.population[i].crowding_distance = crowding_distance[i]
return crowding_distance
@staticmethod
def domination_status(individual1, individual2):
# Extract the list of objectives for both individuals
objectives1 = individual1.individual['fitness_score']
objectives2 = individual2.individual['fitness_score']
# Ensure both individuals have the same number of objectives
assert len(objectives1) == len(objectives2), "Both individuals must have the same number of objectives"
# Flags to check if one dominates the other
better_in_all = True # Whether individual1 is better in all objectives
strictly_better_in_at_least_one = False # Whether individual1 is strictly better in at least one objective
for obj1, obj2 in zip(objectives1, objectives2):
if obj1 > obj2:
better_in_all = False # individual1 is worse in at least one objective
if obj1 < obj2:
strictly_better_in_at_least_one = True # individual1 is better in at least one objective
result = True if better_in_all and strictly_better_in_at_least_one else False
return result
def nsga2_selection(self, fronts):
new_population = []
i = 0
while len(new_population) + len(fronts[i]) <= self.population_size:
for index in fronts[i]:
# Skip individuals with infinite fitness values to avoid propagation
if not math.isinf(self.population[index].individual['fitness_score'][0]) and \
not math.isinf(self.population[index].individual['fitness_score'][1]):
new_population.append(self.population[index])
i += 1
if i >= len(fronts):
break
if len(new_population) < self.population_size and i < len(fronts):
sorted_front = sorted(fronts[i], key=lambda x: self.population[x].crowding_distance, reverse=True)
for index in sorted_front:
if len(new_population) < self.population_size:
if not math.isinf(self.population[index].individual['fitness_score'][0]) and \
not math.isinf(self.population[index].individual['fitness_score'][1]):
new_population.append(self.population[index])
else:
break
return new_population
def solve_ga(self, building, energy_system):
optimization_start = time.time()
self.initialize_population(building, energy_system)
for n in range(self.generations + 1):
generation_n_start_time = time.time()
print(f"Generation {n}")
progeny_population = []
while len(progeny_population) < self.population_size:
parent1, parent2 = random.choice(self.population), random.choice(self.population)
child1, child2 = self.sbx_crossover(parent1, parent2)
self.polynomial_mutation(child1.individual, building, energy_system)
self.polynomial_mutation(child2.individual, building, energy_system)
child1.score_evaluation()
child2.score_evaluation()
progeny_population.extend([child1, child2])
self.population.extend(progeny_population)
fronts = self.fast_non_dominated_sorting()
print([self.population[ind].fitness_score for ind in fronts[0]])
print(fronts)
crowding_distances = [0] * len(self.population)
for front in fronts:
self.calculate_crowding_distance(front=front, crowding_distance=crowding_distances)
new_population = self.nsga2_selection(fronts=fronts)
self.population = new_population
generation_n_calc_duration = time.time() - generation_n_start_time
print(f"Generation {n:.2f} took {generation_n_calc_duration:.2f} seconds")
if n == self.generations:
fronts = self.fast_non_dominated_sorting()
print(fronts)
self.plot_pareto_front(fronts)
optimization_duration = time.time() - optimization_start
print(f"NSGA-II took {optimization_duration:.2f} seconds")
def plot_pareto_front(self, fronts):
# Extract LCC and LCE for plotting all individuals
lcc_values = [individual.individual['lcc'] for individual in self.population]
lce_values = [individual.individual['total_energy_consumption'] for individual in self.population]
# Plot all individuals as blue points
plt.figure(figsize=(10, 6))
plt.scatter(lcc_values, lce_values, color='blue', label='Population', alpha=0.6, edgecolors='w', s=80)
# Extract and sort LCC and LCE for individuals in the first front (front_0) to ensure a connected line
front_0 = [self.population[i] for i in fronts[0]]
front_0_lcc = [individual.individual['lcc'] for individual in front_0]
front_0_lce = [individual.individual['total_energy_consumption'] for individual in front_0]
# Sort front_0 individuals by LCC or LCE to connect them in order (optional step for smooth line)
sorted_front_0 = sorted(zip(front_0_lcc, front_0_lce), key=lambda x: x[0])
sorted_lcc, sorted_lce = zip(*sorted_front_0)
# Plot the Pareto front (front_0) as a red line
plt.plot(sorted_lcc, sorted_lce, color='red', label='Pareto Front (Front 0)', linewidth=2)
# Configure plot details
plt.title('Pareto Front for Life Cycle Cost vs Life Cycle Energy')
plt.xlabel('Life Cycle Cost (LCC)')
plt.ylabel('Life Cycle Energy (LCE)')
plt.grid(True)
plt.legend()
plt.show()

View File

@ -3,7 +3,7 @@ import subprocess
from building_modelling.ep_run_enrich import energy_plus_workflow
from energy_system_modelling_package.energy_system_modelling_factories.montreal_energy_system_archetype_modelling_factory import \
MontrealEnergySystemArchetypesSimulationFactory
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.multi_objective_genetic_algorithm_rethinking import MultiObjectiveGeneticAlgorithm
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.multi_objective_genetic_algorithm import MultiObjectiveGeneticAlgorithm
from hub.imports.geometry_factory import GeometryFactory
from hub.helpers.dictionaries import Dictionaries
from hub.imports.construction_factory import ConstructionFactory
@ -55,4 +55,4 @@ random_assignation.call_random(city.buildings, random_assignation.residential_ne
EnergySystemsFactory('montreal_future', city).enrich()
for building in city.buildings:
energy_system = building.energy_systems[1]
MultiObjectiveGeneticAlgorithm(optimization_scenario='cost_energy_consumption').solve_ga(building, energy_system)
MultiObjectiveGeneticAlgorithm(optimization_scenario='energy-consumption_cost').solve_ga(building, energy_system)