Compare commits

...

22 Commits

Author SHA1 Message Date
f48f118443 feat: action plan code added 2024-12-05 15:28:01 +01:00
a143100b39 fix: simulation models fixed for hourly consumption calculation, optimization process improved 2024-11-25 21:12:42 +01:00
09d1d08ed7 fix: small changes in NSGA-II 2024-11-13 14:15:22 +01:00
7944a36dbf fix: DHW system optimization tested and confirmed 2024-11-12 09:32:09 +01:00
2243e22866 fix: NSGA-II is now working properly for heating systems. 2024-11-08 13:08:23 +01:00
c47071bc2d fix: nsga-II is complete the only remaining step is TOPSIS decision-making 2024-10-31 09:50:12 +00:00
57c23f290f fix: multi objective optimization work continued 2024-10-25 11:55:12 +02:00
227b70b451 fix: rethinking of multi objective optimization started 2024-10-17 17:08:44 +02:00
368ab757a6 fix: single objective optimization made more accurate 2024-10-09 11:26:15 +02:00
fee8120be5 fix: energy system archetypes modified. Now we have 4 separate systems for PV, heating, cooling, and dhw 2024-10-08 10:37:52 +02:00
d8ad28e709 feat: multi-objective optimization is doen 2024-10-04 13:16:17 +02:00
a694a8f47c fix: optimal sizing for single objective is completed 2024-10-02 17:49:17 +02:00
7ff94a56c1 fix: starting to test the workflow for finding the system sizing period 2024-09-30 17:27:41 +02:00
f7815dc4c0 feat: single objective optimization finished 2024-09-27 16:59:09 -04:00
07cdfc4bf7 feat: single objective optimization started 2024-09-26 17:51:01 -04:00
4c3b28a088 feat: individual class finished 2024-09-25 23:24:32 -04:00
26c744ded0 fix: the class Individual as a part of modifying the GA is modified 2024-09-24 13:10:38 -04:00
2ed9555fc8 fix: single objective optimization algorithm is completed. The cooling system needs to be separated in the archetype 2024-09-16 18:51:53 -04:00
59815f4bbf feat: an issue in calculation of cooling cop is detected 2024-09-14 21:33:23 -04:00
7bd6b81a55 feat: cooling and dhw system added to the optimization 2024-09-13 08:47:46 -04:00
eee55fc453 feat: cost optimization of heating system is completed 2024-09-11 18:13:39 -04:00
21b4fc5202 feat: single objective optimization started 2024-09-11 09:36:30 -04:00
203 changed files with 37444 additions and 330 deletions

1
.gitignore vendored
View File

@ -12,3 +12,4 @@
cerc_hub.egg-info
/out_files
/input_files/output_buildings.geojson
*.pyc

View File

@ -28,12 +28,12 @@ class ArchetypeCluster1:
def heating_system_simulation(self):
building_heating_hourly_consumption = []
boiler = self.building.energy_systems[1].generation_systems[0]
hp = self.building.energy_systems[1].generation_systems[1]
tes = self.building.energy_systems[1].generation_systems[0].energy_storage_systems[0]
boiler = self.building.energy_systems[0].generation_systems[0]
hp = self.building.energy_systems[0].generation_systems[1]
tes = self.building.energy_systems[0].generation_systems[0].energy_storage_systems[0]
heating_demand_joules = self.building.heating_demand[cte.HOUR]
heating_peak_load_watts = self.building.heating_peak_load[cte.YEAR][0]
upper_limit_tes_heating = 55
upper_limit_tes_heating = 45
outdoor_temperature = self.building.external_temperature[cte.HOUR]
results = HeatPumpBoilerTesHeating(hp=hp,
boiler=boiler,
@ -55,10 +55,10 @@ class ArchetypeCluster1:
return results, building_heating_hourly_consumption
def cooling_system_simulation(self):
hp = self.building.energy_systems[1].generation_systems[1]
hp = self.building.energy_systems[1].generation_systems[0]
cooling_demand_joules = self.building.cooling_demand[cte.HOUR]
cooling_peak_load = self.building.cooling_peak_load[cte.YEAR][0]
cutoff_temperature = 13
cutoff_temperature = 11
outdoor_temperature = self.building.external_temperature[cte.HOUR]
results = HeatPumpCooling(hp=hp,
hourly_cooling_demand_joules=cooling_demand_joules,
@ -71,8 +71,8 @@ class ArchetypeCluster1:
def dhw_system_simulation(self):
building_dhw_hourly_consumption = []
hp = self.building.energy_systems[2].generation_systems[0]
tes = self.building.energy_systems[2].generation_systems[0].energy_storage_systems[0]
hp = self.building.energy_systems[-1].generation_systems[0]
tes = self.building.energy_systems[-1].generation_systems[0].energy_storage_systems[0]
dhw_demand_joules = self.building.domestic_hot_water_heat_demand[cte.HOUR]
upper_limit_tes = 65
outdoor_temperature = self.building.external_temperature[cte.HOUR]

View File

@ -4,11 +4,10 @@ SPDX - License - Identifier: LGPL - 3.0 - or -later
Copyright © 2024 Concordia CERC group
Project Coder Saeed Ranjbar saeed.ranjbar@mail.concordia.ca
"""
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.optimal_sizing import \
OptimalSizing
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.peak_load_sizing import \
PeakLoadSizing
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.heuristic_sizing import \
HeuristicSizing
from energy_system_modelling_package.energy_system_modelling_factories.pv_assessment.pv_sizing import PVSizing
@ -30,11 +29,11 @@ class EnergySystemsSizingFactory:
for building in self._city.buildings:
building.level_of_detail.energy_systems = 1
def _heurisitc_sizing(self):
def _optimal_sizing(self):
"""
Size Energy Systems using a Single or Multi Objective GA
"""
HeuristicSizing(self._city).enrich_buildings()
OptimalSizing(self._city, optimization_scenario='cost_energy_consumption').enrich_buildings()
self._city.level_of_detail.energy_systems = 1
for building in self._city.buildings:
building.level_of_detail.energy_systems = 1

View File

@ -6,7 +6,7 @@ from hub.helpers.monthly_values import MonthlyValues
class DomesticHotWaterHeatPumpTes:
def __init__(self, hp, tes, hourly_dhw_demand_joules, upper_limit_tes,
outdoor_temperature, dt=900):
outdoor_temperature, dt=None):
self.hp = hp
self.tes = tes
self.dhw_demand = [demand / cte.WATTS_HOUR_TO_JULES for demand in hourly_dhw_demand_joules]
@ -30,8 +30,8 @@ class DomesticHotWaterHeatPumpTes:
hp_delta_t = 8
number_of_ts = int(cte.HOUR_TO_SECONDS / self.dt)
source_temperature_hourly = self.hp_characteristics.hp_source_temperature()
source_temperature = [x for x in source_temperature_hourly for _ in range(number_of_ts)]
demand = [x for x in self.dhw_demand for _ in range(number_of_ts)]
source_temperature = [0] + [x for x in source_temperature_hourly for _ in range(number_of_ts)]
demand = [0] + [x for x in self.dhw_demand for _ in range(number_of_ts)]
variable_names = ["t_sup_hp", "t_tank", "m_ch", "m_dis", "q_hp", "q_coil", "hp_cop",
"hp_electricity", "available hot water (m3)", "refill flow rate (kg/s)", "total_consumption"]
num_hours = len(demand)
@ -39,7 +39,7 @@ class DomesticHotWaterHeatPumpTes:
(t_sup_hp, t_tank, m_ch, m_dis, m_refill, q_hp, q_coil, hp_cop, hp_electricity, v_dhw, total_consumption) = \
[variables[name] for name in variable_names]
freshwater_temperature = 18
t_tank[0] = 70
t_tank[0] = 65
for i in range(len(demand) - 1):
delta_t_demand = demand[i] * (self.dt / (cte.WATER_DENSITY * cte.WATER_HEAT_CAPACITY *
storage_tank.volume))
@ -73,16 +73,16 @@ class DomesticHotWaterHeatPumpTes:
t_tank[i + 1] = t_tank[i] + (delta_t_hp - delta_t_freshwater - delta_t_demand + delta_t_coil)
total_consumption[i] = hp_electricity[i] + q_coil[i]
tes.temperature = []
hp_electricity_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in hp_electricity]
heating_coil_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in q_coil]
hp_electricity_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in hp_electricity[1:]]
heating_coil_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in q_coil[1:]]
hp_hourly = []
coil_hourly = []
coil_sum = 0
hp_sum = 0
for i in range(1, len(demand)):
for i in range(len(demand) - 1):
hp_sum += hp_electricity_j[i]
coil_sum += heating_coil_j[i]
if (i - 1) % number_of_ts == 0:
if i % number_of_ts == 0 or i == len(demand) - 1:
tes.temperature.append(t_tank[i])
hp_hourly.append(hp_sum)
coil_hourly.append(coil_sum)
@ -90,29 +90,31 @@ class DomesticHotWaterHeatPumpTes:
coil_sum = 0
hp.energy_consumption[cte.DOMESTIC_HOT_WATER] = {}
hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR] = hp_hourly
hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH] = MonthlyValues.get_total_month(
hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR])
hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.YEAR] = [
sum(hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH])]
if len(self.dhw_demand) == 8760:
hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH] = MonthlyValues.get_total_month(
hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR])
hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.YEAR] = [
sum(hp.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH])]
if self.tes.heating_coil_capacity is not None:
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER] = {}
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR] = coil_hourly
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH] = MonthlyValues.get_total_month(
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR])
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.YEAR] = [
sum(tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH])]
if len(self.dhw_demand) == 8760:
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH] = MonthlyValues.get_total_month(
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR])
tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.YEAR] = [
sum(tes.heating_coil_energy_consumption[cte.DOMESTIC_HOT_WATER][cte.MONTH])]
self.results['DHW Demand (W)'] = demand
self.results['DHW HP Heat Output (W)'] = q_hp
self.results['DHW HP Electricity Consumption (W)'] = hp_electricity
self.results['DHW HP Source Temperature'] = source_temperature
self.results['DHW HP Supply Temperature'] = t_sup_hp
self.results['DHW HP COP'] = hp_cop
self.results['DHW TES Heating Coil Heat Output (W)'] = q_coil
self.results['DHW TES Temperature'] = t_tank
self.results['DHW TES Charging Flow Rate (kg/s)'] = m_ch
self.results['DHW Flow Rate (kg/s)'] = m_dis
self.results['DHW TES Refill Flow Rate (kg/s)'] = m_refill
self.results['Available Water in Tank (m3)'] = v_dhw
self.results['Total DHW Power Consumption (W)'] = total_consumption
self.results['DHW Demand (W)'] = demand[1:]
self.results['DHW HP Heat Output (W)'] = q_hp[1:]
self.results['DHW HP Electricity Consumption (W)'] = hp_electricity[1:]
self.results['DHW HP Source Temperature'] = source_temperature[1:]
self.results['DHW HP Supply Temperature'] = t_sup_hp[1:]
self.results['DHW HP COP'] = hp_cop[1:]
self.results['DHW TES Heating Coil Heat Output (W)'] = q_coil[1:]
self.results['DHW TES Temperature'] = t_tank[1:]
self.results['DHW TES Charging Flow Rate (kg/s)'] = m_ch[1:]
self.results['DHW Flow Rate (kg/s)'] = m_dis[1:]
self.results['DHW TES Refill Flow Rate (kg/s)'] = m_refill[1:]
self.results['Available Water in Tank (m3)'] = v_dhw[1:]
self.results['Total DHW Power Consumption (W)'] = total_consumption[1:]
return self.results

View File

@ -7,13 +7,16 @@ from energy_system_modelling_package.energy_system_modelling_factories.hvac_dhw_
class HeatPumpBoilerTesHeating:
def __init__(self, hp, boiler, tes, hourly_heating_demand_joules, heating_peak_load_watts, upper_limit_tes,
outdoor_temperature, dt=900):
def __init__(self, hp=None, boiler=None, tes=None, hourly_heating_demand_joules=None, heating_peak_load_watts=None,
upper_limit_tes=None, outdoor_temperature=None, dt=None):
self.hp = hp
self.boiler = boiler
self.tes = tes
self.heating_demand = [demand / cte.WATTS_HOUR_TO_JULES for demand in hourly_heating_demand_joules]
self.heating_peak_load = heating_peak_load_watts
if heating_peak_load_watts is not None:
self.heating_peak_load = heating_peak_load_watts
else:
self.heating_peak_load = max(hourly_heating_demand_joules) / cte.HOUR_TO_SECONDS
self.upper_limit_tes = upper_limit_tes
self.hp_characteristics = HeatPump(self.hp, outdoor_temperature)
self.t_out = outdoor_temperature
@ -22,6 +25,10 @@ class HeatPumpBoilerTesHeating:
def simulation(self):
hp, boiler, tes = self.hp, self.boiler, self.tes
if boiler is not None:
if hp.nominal_heat_output < 0 or boiler.nominal_heat_output < 0:
raise ValueError("Heat output values must be non-negative. Check the nominal_heat_output for hp and boiler.")
heating_coil_nominal_output = 0
if tes.heating_coil_capacity is not None:
heating_coil_nominal_output = float(tes.heating_coil_capacity)
@ -55,41 +62,54 @@ class HeatPumpBoilerTesHeating:
ambient_temperature=t_out[i],
dt=self.dt)
# hp operation
if t_tank[i + 1] < 40:
q_hp[i + 1] = hp.nominal_heat_output
m_ch[i + 1] = q_hp[i + 1] / (cte.WATER_HEAT_CAPACITY * hp_delta_t)
t_sup_hp[i + 1] = (q_hp[i + 1] / (m_ch[i + 1] * cte.WATER_HEAT_CAPACITY)) + t_tank[i + 1]
elif 40 <= t_tank[i + 1] < self.upper_limit_tes and q_hp[i] == 0:
q_hp[i + 1] = 0
m_ch[i + 1] = 0
t_sup_hp[i + 1] = t_tank[i + 1]
elif 40 <= t_tank[i + 1] < self.upper_limit_tes and q_hp[i] > 0:
q_hp[i + 1] = hp.nominal_heat_output
m_ch[i + 1] = q_hp[i + 1] / (cte.WATER_HEAT_CAPACITY * hp_delta_t)
t_sup_hp[i + 1] = (q_hp[i + 1] / (m_ch[i + 1] * cte.WATER_HEAT_CAPACITY)) + t_tank[i + 1]
else:
q_hp[i + 1], m_ch[i + 1], t_sup_hp[i + 1] = 0, 0, t_tank[i + 1]
if q_hp[i + 1] > 0:
if hp.source_medium == cte.AIR and self.hp.supply_medium == cte.WATER:
hp_cop[i + 1] = self.hp_characteristics.air_to_water_cop(source_temperature[i + 1], t_tank[i + 1], mode=cte.HEATING)
hp_electricity[i + 1] = q_hp[i + 1] / hp_cop[i + 1]
else:
hp_cop[i + 1] = 0
hp_electricity[i + 1] = 0
# boiler operation
if q_hp[i + 1] > 0:
if t_sup_hp[i + 1] < 45:
q_boiler[i + 1] = boiler.nominal_heat_output
elif demand[i + 1] > 0.5 * self.heating_peak_load / self.dt:
q_boiler[i + 1] = 0.5 * boiler.nominal_heat_output
boiler_energy_consumption[i + 1] = q_boiler[i + 1] / float(boiler.heat_efficiency)
if boiler.fuel_type == cte.ELECTRICITY:
boiler_fuel_consumption[i + 1] = boiler_energy_consumption[i + 1]
if t_out[i + 1] > -20:
if t_tank[i + 1] < 40:
q_hp[i + 1] = hp.nominal_heat_output
m_ch[i + 1] = q_hp[i + 1] / (cte.WATER_HEAT_CAPACITY * hp_delta_t)
t_sup_hp[i + 1] = (q_hp[i + 1] / (m_ch[i + 1] * cte.WATER_HEAT_CAPACITY)) + t_tank[i + 1]
elif 40 <= t_tank[i + 1] < self.upper_limit_tes and q_hp[i] == 0:
q_hp[i + 1] = 0
m_ch[i + 1] = 0
t_sup_hp[i + 1] = t_tank[i + 1]
elif 40 <= t_tank[i + 1] < self.upper_limit_tes and q_hp[i] > 0:
q_hp[i + 1] = hp.nominal_heat_output
m_ch[i + 1] = q_hp[i + 1] / (cte.WATER_HEAT_CAPACITY * hp_delta_t)
t_sup_hp[i + 1] = (q_hp[i + 1] / (m_ch[i + 1] * cte.WATER_HEAT_CAPACITY)) + t_tank[i + 1]
else:
# TODO: Other fuels should be considered
boiler_fuel_consumption[i + 1] = (q_boiler[i + 1] * self.dt) / (
float(boiler.heat_efficiency) * cte.NATURAL_GAS_LHV)
q_hp[i + 1], m_ch[i + 1], t_sup_hp[i + 1] = 0, 0, t_tank[i + 1]
if q_hp[i + 1] > 0:
if hp.source_medium == cte.AIR and self.hp.supply_medium == cte.WATER:
hp_cop[i + 1] = self.hp_characteristics.air_to_water_cop(source_temperature[i + 1], t_tank[i + 1], mode=cte.HEATING)
hp_electricity[i + 1] = q_hp[i + 1] / hp_cop[i + 1]
else:
hp_cop[i + 1] = 0
hp_electricity[i + 1] = 0
else:
q_hp[i + 1] = 0
t_sup_hp[i + 1] = t_tank[i + 1]
if t_tank[i + 1] < 40:
q_boiler[i + 1] = boiler.nominal_heat_output
m_ch[i + 1] = q_boiler[i + 1] / (cte.WATER_HEAT_CAPACITY * hp_delta_t)
elif 40 <= t_tank[i + 1] < self.upper_limit_tes and q_boiler[i] == 0:
q_boiler[i + 1] = 0
m_ch[i + 1] = 0
elif 40 <= t_tank[i + 1] < self.upper_limit_tes and q_boiler[i] > 0:
q_boiler[i + 1] = boiler.nominal_heat_output
m_ch[i + 1] = q_boiler[i + 1] / (cte.WATER_HEAT_CAPACITY * hp_delta_t)
else:
q_boiler[i + 1], m_ch[i + 1] = 0, 0
boiler_energy_consumption[i + 1] = q_boiler[i + 1] / float(boiler.heat_efficiency)
if boiler.fuel_type == cte.ELECTRICITY:
boiler_fuel_consumption[i + 1] = boiler_energy_consumption[i + 1]
else:
# TODO: Other fuels should be considered
boiler_fuel_consumption[i + 1] = (q_boiler[i + 1] * self.dt) / (
float(boiler.heat_efficiency) * cte.NATURAL_GAS_LHV)
if m_ch[i + 1] > 0:
t_sup_boiler[i + 1] = t_sup_hp[i + 1] + (q_boiler[i + 1] / (m_ch[i + 1] * cte.WATER_HEAT_CAPACITY))
else:
t_sup_boiler[i + 1] = t_sup_hp[i + 1]
# heating coil operation
if t_tank[i + 1] < 35:
q_coil[i + 1] = heating_coil_nominal_output
@ -107,20 +127,20 @@ class HeatPumpBoilerTesHeating:
# total consumption
total_consumption[i + 1] = hp_electricity[i + 1] + boiler_energy_consumption[i + 1] + q_coil[i + 1]
tes.temperature = []
hp_electricity_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in hp_electricity]
boiler_consumption_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in boiler_energy_consumption]
heating_coil_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in q_coil]
hp_electricity_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in hp_electricity[1:]]
boiler_consumption_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in boiler_energy_consumption[1:]]
heating_coil_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in q_coil[1:]]
hp_hourly = []
boiler_hourly = []
coil_hourly = []
boiler_sum = 0
hp_sum = 0
coil_sum = 0
for i in range(1, len(demand)):
for i in range(len(demand) - 1):
hp_sum += hp_electricity_j[i]
boiler_sum += boiler_consumption_j[i]
coil_sum += heating_coil_j[i]
if (i - 1) % number_of_ts == 0:
if i % number_of_ts == 0 or i == len(demand) - 1:
tes.temperature.append(t_tank[i])
hp_hourly.append(hp_sum)
boiler_hourly.append(boiler_sum)
@ -130,37 +150,41 @@ class HeatPumpBoilerTesHeating:
coil_sum = 0
hp.energy_consumption[cte.HEATING] = {}
hp.energy_consumption[cte.HEATING][cte.HOUR] = hp_hourly
hp.energy_consumption[cte.HEATING][cte.MONTH] = MonthlyValues.get_total_month(
hp.energy_consumption[cte.HEATING][cte.HOUR])
hp.energy_consumption[cte.HEATING][cte.YEAR] = [
sum(hp.energy_consumption[cte.HEATING][cte.MONTH])]
boiler.energy_consumption[cte.HEATING] = {}
boiler.energy_consumption[cte.HEATING][cte.HOUR] = boiler_hourly
boiler.energy_consumption[cte.HEATING][cte.MONTH] = MonthlyValues.get_total_month(
boiler.energy_consumption[cte.HEATING][cte.HOUR])
boiler.energy_consumption[cte.HEATING][cte.YEAR] = [
sum(boiler.energy_consumption[cte.HEATING][cte.MONTH])]
if boiler is not None:
boiler.energy_consumption[cte.HEATING] = {}
boiler.energy_consumption[cte.HEATING][cte.HOUR] = boiler_hourly
if len(self.heating_demand) == 8760:
hp.energy_consumption[cte.HEATING][cte.MONTH] = MonthlyValues.get_total_month(
hp.energy_consumption[cte.HEATING][cte.HOUR])
hp.energy_consumption[cte.HEATING][cte.YEAR] = [
sum(hp.energy_consumption[cte.HEATING][cte.MONTH])]
if boiler is not None:
boiler.energy_consumption[cte.HEATING][cte.MONTH] = MonthlyValues.get_total_month(
boiler.energy_consumption[cte.HEATING][cte.HOUR])
boiler.energy_consumption[cte.HEATING][cte.YEAR] = [
sum(boiler.energy_consumption[cte.HEATING][cte.MONTH])]
if tes.heating_coil_capacity is not None:
tes.heating_coil_energy_consumption[cte.HEATING] = {}
tes.heating_coil_energy_consumption[cte.HEATING][cte.HOUR] = coil_hourly
tes.heating_coil_energy_consumption[cte.HEATING][cte.MONTH] = MonthlyValues.get_total_month(
tes.heating_coil_energy_consumption[cte.HEATING][cte.HOUR])
tes.heating_coil_energy_consumption[cte.HEATING][cte.YEAR] = [
sum(tes.heating_coil_energy_consumption[cte.HEATING][cte.MONTH])]
self.results['Heating Demand (W)'] = demand
self.results['HP Heat Output (W)'] = q_hp
self.results['HP Source Temperature'] = source_temperature
self.results['HP Supply Temperature'] = t_sup_hp
self.results['HP COP'] = hp_cop
self.results['HP Electricity Consumption (W)'] = hp_electricity
self.results['Boiler Heat Output (W)'] = q_boiler
self.results['Boiler Power Consumption (W)'] = boiler_energy_consumption
self.results['Boiler Supply Temperature'] = t_sup_boiler
self.results['Boiler Fuel Consumption'] = boiler_fuel_consumption
self.results['TES Temperature'] = t_tank
self.results['Heating Coil heat input'] = q_coil
self.results['TES Charging Flow Rate (kg/s)'] = m_ch
self.results['TES Discharge Flow Rate (kg/s)'] = m_dis
self.results['Heating Loop Return Temperature'] = t_ret
self.results['Total Heating Power Consumption (W)'] = total_consumption
if len(self.heating_demand) == 8760:
tes.heating_coil_energy_consumption[cte.HEATING][cte.HOUR] = coil_hourly
tes.heating_coil_energy_consumption[cte.HEATING][cte.MONTH] = MonthlyValues.get_total_month(
tes.heating_coil_energy_consumption[cte.HEATING][cte.HOUR])
tes.heating_coil_energy_consumption[cte.HEATING][cte.YEAR] = [
sum(tes.heating_coil_energy_consumption[cte.HEATING][cte.MONTH])]
self.results['Heating Demand (W)'] = demand[1:]
self.results['HP Heat Output (W)'] = q_hp[1:]
self.results['HP Source Temperature'] = source_temperature[1:]
self.results['HP Supply Temperature'] = t_sup_hp[1:]
self.results['HP COP'] = hp_cop[1:]
self.results['HP Electricity Consumption (W)'] = hp_electricity[1:]
self.results['Boiler Heat Output (W)'] = q_boiler[1:]
self.results['Boiler Power Consumption (W)'] = boiler_energy_consumption[1:]
self.results['Boiler Supply Temperature'] = t_sup_boiler[1:]
self.results['Boiler Fuel Consumption'] = boiler_fuel_consumption[1:]
self.results['TES Temperature'] = t_tank[1:]
self.results['Heating Coil heat input'] = q_coil[1:]
self.results['TES Charging Flow Rate (kg/s)'] = m_ch[1:]
self.results['TES Discharge Flow Rate (kg/s)'] = m_dis[1:]
self.results['Heating Loop Return Temperature'] = t_ret[1:]
self.results['Total Heating Power Consumption (W)'] = total_consumption[1:]
return self.results

View File

@ -38,7 +38,7 @@ class HeatPump:
cop_curve_coefficients[2] * t_inlet_water_fahrenheit ** 2 +
cop_curve_coefficients[3] * t_source_fahrenheit +
cop_curve_coefficients[4] * t_source_fahrenheit ** 2 +
cop_curve_coefficients[5] * t_inlet_water_fahrenheit * t_source_fahrenheit))
cop_curve_coefficients[5] * t_inlet_water_fahrenheit * t_source_fahrenheit)) / 3.41214
hp_efficiency = float(self.hp.cooling_efficiency)
else:
if self.hp.heat_efficiency_curve is not None:

View File

@ -53,32 +53,35 @@ class HeatPumpCooling:
if self.hp.source_medium == cte.AIR and self.hp.supply_medium == cte.WATER:
hp_cop[i] = self.heat_pump_characteristics.air_to_water_cop(source_temperature[i], t_ret[i],
mode=cte.COOLING)
hp_electricity[i] = q_hp[i] / cooling_efficiency
hp_electricity[i] = q_hp[i] / hp_cop[i]
else:
hp_cop[i] = 0
hp_electricity[i] = 0
hp_electricity_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in hp_electricity]
hp_electricity_j = [(x * cte.WATTS_HOUR_TO_JULES) / number_of_ts for x in hp_electricity[1:]]
hp_hourly = []
hp_supply_temperature_hourly = []
hp_sum = 0
for i in range(1, len(demand)):
for i in range(len(demand) - 1):
hp_sum += hp_electricity_j[i]
if (i - 1) % number_of_ts == 0:
hp_hourly.append(hp_sum)
hp_supply_temperature_hourly.append(t_sup_hp[i])
hp_sum = 0
self.hp.cooling_supply_temperature = hp_supply_temperature_hourly
self.hp.energy_consumption[cte.COOLING] = {}
self.hp.energy_consumption[cte.COOLING][cte.HOUR] = hp_hourly
self.hp.energy_consumption[cte.COOLING][cte.MONTH] = MonthlyValues.get_total_month(
self.hp.energy_consumption[cte.COOLING][cte.HOUR])
self.hp.energy_consumption[cte.COOLING][cte.YEAR] = [
sum(self.hp.energy_consumption[cte.COOLING][cte.MONTH])]
self.results['Cooling Demand (W)'] = demand
self.results['HP Cooling Output (W)'] = q_hp
self.results['HP Source Temperature'] = source_temperature
self.results['HP Cooling Supply Temperature'] = t_sup_hp
self.results['HP Cooling COP'] = hp_cop
self.results['HP Electricity Consumption'] = hp_electricity
self.results['Cooling Loop Flow Rate (kg/s)'] = m
self.results['Cooling Loop Return Temperature'] = t_ret
self.results['Cooling Demand (W)'] = demand[1:]
self.results['HP Cooling Output (W)'] = q_hp[1:]
self.results['HP Cooling Source Temperature'] = source_temperature[1:]
self.results['HP Cooling Supply Temperature'] = t_sup_hp[1:]
self.results['HP Cooling COP'] = hp_cop[1:]
self.results['HP Electricity Consumption'] = hp_electricity[1:]
self.results['Cooling Loop Flow Rate (kg/s)'] = m[1:]
self.results['Cooling Loop Return Temperature'] = t_ret[1:]
return self.results

View File

@ -13,17 +13,18 @@ class MontrealEnergySystemArchetypesSimulationFactory:
EnergySystemsFactory class
"""
def __init__(self, handler, building, output_path):
def __init__(self, handler, building, output_path, csv_output=True):
self._output_path = output_path
self._handler = '_' + handler.lower()
self._building = building
self._csv_output = csv_output
def _archetype_cluster_1(self):
"""
Enrich the city by using the sizing and simulation model developed for archetype13 of montreal_future_systems
"""
dt = 900
ArchetypeCluster1(self._building, dt, self._output_path, csv_output=True).enrich_building()
ArchetypeCluster1(self._building, dt, self._output_path, self._csv_output).enrich_building()
self._building.level_of_detail.energy_systems = 2
self._building.level_of_detail.energy_systems = 2

View File

@ -0,0 +1,506 @@
import math
import random
from hub.helpers.dictionaries import Dictionaries
from hub.catalog_factories.costs_catalog_factory import CostsCatalogFactory
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.hvac_dhw_systems_simulation_models.domestic_hot_water_heat_pump_with_tes import \
DomesticHotWaterHeatPumpTes
from energy_system_modelling_package.energy_system_modelling_factories.hvac_dhw_systems_simulation_models.heat_pump_boiler_tes_heating import \
HeatPumpBoilerTesHeating
import numpy_financial as npf
class Individual:
def __init__(self, building, energy_system, design_period_energy_demands, optimization_scenario,
heating_design_load=None, cooling_design_load=None, dt=900, fuel_price_index=0.05,
electricity_tariff_type='fixed', consumer_price_index=0.04, interest_rate=0.04,
discount_rate=0.03, percentage_credit=0, credit_years=15):
"""
:param building: building object
:param energy_system: energy system to be optimized
:param design_period_energy_demands: A dictionary of design period heating, cooling and dhw demands. Design period
is the day with the highest total demand and the two days before and after it
:param optimization_scenario: a string indicating the objective function from minimization of cost,
energy consumption, and both together
:param heating_design_load: heating design load in W
:param cooling_design_load: cooling design load in W
:param dt the time step size used for simulations
:param fuel_price_index the price increase index of all fuels. A single value is used for all fuels.
:param electricity_tariff_type the electricity tariff type between 'fixed' and 'variable' for economic optimization
:param consumer_price_index
"""
self.building = building
self.energy_system = energy_system
self.design_period_energy_demands = design_period_energy_demands
self.demand_types = energy_system.demand_types
self.optimization_scenario = optimization_scenario
self.heating_design_load = heating_design_load
self.cooling_design_load = cooling_design_load
self.available_space = building.volume / building.storeys_above_ground
self.dt = dt
self.fuel_price_index = fuel_price_index
self.electricity_tariff_type = electricity_tariff_type
self.consumer_price_index = consumer_price_index
self.interest_rate = interest_rate
self.discount_rate = discount_rate
self.credit_years = credit_years
self.percentage_credit = percentage_credit
self.costs = self.costs_archetype()
self.feasibility = True
self.fitness_score = 0
self.rank = 0
self.crowding_distance = 0
self.individual = {}
def system_components(self):
"""
Extracts system components (generation and storage) for a given energy system.
:return: Dictionary of system components.
"""
self.individual['Generation Components'] = []
self.individual['Energy Storage Components'] = []
self.individual['End of Life Cost'] = self.costs.end_of_life_cost
for generation_component in self.energy_system.generation_systems:
investment_cost, reposition_cost, lifetime = self.unit_investment_cost('Generation',
generation_component.system_type)
maintenance_cost = self.unit_maintenance_cost(generation_component)
if generation_component.system_type == cte.PHOTOVOLTAIC:
heating_capacity = None
cooling_capacity = None
heat_efficiency = None
cooling_efficiency = None
unit_fuel_cost = 0
else:
heating_capacity = 0
cooling_capacity = 0
heat_efficiency = generation_component.heat_efficiency
cooling_efficiency = generation_component.cooling_efficiency
unit_fuel_cost = self.fuel_cost_per_kwh(generation_component.fuel_type, 'fixed')
self.individual['Generation Components'].append({
'type': generation_component.system_type,
'heating_capacity': heating_capacity,
'cooling_capacity': cooling_capacity,
'electricity_capacity': 0,
'nominal_heating_efficiency': heat_efficiency,
'nominal_cooling_efficiency': cooling_efficiency,
'nominal_electricity_efficiency': generation_component.electricity_efficiency,
'fuel_type': generation_component.fuel_type,
'unit_investment_cost': investment_cost,
'unit_reposition_cost': reposition_cost,
'unit_fuel_cost(CAD/kWh)': unit_fuel_cost,
'unit_maintenance_cost': maintenance_cost,
'lifetime': lifetime
})
if generation_component.energy_storage_systems is not None:
for energy_storage_system in generation_component.energy_storage_systems:
investment_cost, reposition_cost, lifetime = (
self.unit_investment_cost('Storage',
f'{energy_storage_system.type_energy_stored}_storage'))
if energy_storage_system.type_energy_stored == cte.THERMAL:
heating_coil_capacity = energy_storage_system.heating_coil_capacity
heating_coil_fuel_cost = self.fuel_cost_per_kwh(f'{cte.ELECTRICITY}', 'fixed')
volume = 0
capacity = None
else:
heating_coil_capacity = None
heating_coil_fuel_cost = None
volume = None
capacity = 0
self.individual['Energy Storage Components'].append(
{'type': f'{energy_storage_system.type_energy_stored}_storage',
'capacity': capacity,
'volume': volume,
'heating_coil_capacity': heating_coil_capacity,
'unit_investment_cost': investment_cost,
'unit_reposition_cost': reposition_cost,
'heating_coil_fuel_cost': heating_coil_fuel_cost,
'unit_maintenance_cost': 0,
'lifetime': lifetime
})
def initialization(self):
"""
Assigns initial sizes to generation and storage components based on heating and cooling design loads and
available space in the building.
:return:
"""
self.system_components()
generation_components = self.individual['Generation Components']
storage_components = self.individual['Energy Storage Components']
for generation_component in generation_components:
if generation_component[
'nominal_heating_efficiency'] is not None and cte.HEATING or cte.DOMESTIC_HOT_WATER in self.demand_types:
if self.heating_design_load is not None:
generation_component['heating_capacity'] = random.uniform(0, self.heating_design_load)
else:
if cte.HEATING in self.demand_types:
generation_component['heating_capacity'] = random.uniform(0,
self.building.heating_peak_load[cte.YEAR][0])
else:
generation_component['heating_capacity'] = random.uniform(0,
self.building.domestic_hot_water_peak_load[cte.YEAR][0])
else:
generation_component['heating_capacity'] = None
if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in self.demand_types:
if self.cooling_design_load is not None:
generation_component['cooling_capacity'] = random.uniform(0, self.cooling_design_load)
else:
generation_component['cooling_capacity'] = random.uniform(0,
self.building.cooling_peak_load[cte.YEAR][0])
else:
generation_component['cooling_capacity'] = None
if generation_component['nominal_electricity_efficiency'] is None:
generation_component['electricity_capacity'] = None
for storage_component in storage_components:
if storage_component['type'] == f'{cte.THERMAL}_storage':
storage_operation_range = 10 if cte.DOMESTIC_HOT_WATER in self.demand_types else 20
storage_energy_capacity = random.uniform(0, 6)
volume = (storage_energy_capacity * self.building.heating_peak_load[cte.YEAR][0] * 3600) / (cte.WATER_HEAT_CAPACITY * 1000 * storage_operation_range)
max_available_space = min(0.01 * self.available_space, volume)
storage_component['volume'] = random.uniform(0, max_available_space)
if storage_component['heating_coil_capacity'] is not None:
if self.heating_design_load is not None:
storage_component['heating_coil_capacity'] = random.uniform(0, self.heating_design_load)
else:
if cte.HEATING in self.demand_types:
storage_component['heating_coil_capacity'] = random.uniform(0,
self.building.heating_peak_load[cte.YEAR][0])
else:
storage_component['heating_coil_capacity'] = random.uniform(0,
self.building.domestic_hot_water_peak_load[cte.YEAR][0])
def score_evaluation(self):
self.system_simulation()
self.individual['feasible'] = self.feasibility
lcc = 0
total_energy_consumption = 0
if self.feasibility:
if 'cost' in self.optimization_scenario:
investment_cost = 0
operation_cost_year_0 = 0
maintenance_cost_year_0 = 0
for generation_system in self.individual['Generation Components']:
heating_capacity = 0 if generation_system['heating_capacity'] is None else generation_system[
'heating_capacity']
cooling_capacity = 0 if generation_system['cooling_capacity'] is None else generation_system[
'cooling_capacity']
capacity = max(heating_capacity, cooling_capacity)
investment_cost += capacity * generation_system['unit_investment_cost'] / 1000
maintenance_cost_year_0 += capacity * generation_system['unit_maintenance_cost'] / 1000
operation_cost_year_0 += (generation_system['total_energy_consumption(kWh)'] *
generation_system['unit_fuel_cost(CAD/kWh)'])
for storage_system in self.individual['Energy Storage Components']:
if cte.THERMAL in storage_system['type']:
investment_cost += storage_system['volume'] * storage_system['unit_investment_cost']
if storage_system['heating_coil_capacity'] is not None:
operation_cost_year_0 += (storage_system['total_energy_consumption(kWh)'] *
storage_system['heating_coil_fuel_cost'])
lcc = self.life_cycle_cost_calculation(investment_cost=investment_cost,
operation_cost_year_0=operation_cost_year_0,
maintenance_cost_year_0=maintenance_cost_year_0)
self.individual['lcc'] = lcc
if 'energy-consumption' in self.optimization_scenario:
total_energy_consumption = 0
for generation_system in self.individual['Generation Components']:
total_energy_consumption += generation_system['total_energy_consumption(kWh)']
for storage_system in self.individual['Energy Storage Components']:
if cte.THERMAL in storage_system['type'] and storage_system['heating_coil_capacity'] is not None:
total_energy_consumption += storage_system['total_energy_consumption(kWh)']
self.individual['total_energy_consumption'] = total_energy_consumption
# Fitness score based on the optimization scenario
if self.optimization_scenario == 'cost':
self.fitness_score = lcc
self.individual['fitness_score'] = lcc
elif self.optimization_scenario == 'energy-consumption':
self.fitness_score = total_energy_consumption
self.individual['fitness_score'] = total_energy_consumption
elif self.optimization_scenario == 'cost_energy-consumption':
self.fitness_score = (lcc, total_energy_consumption)
elif self.optimization_scenario == 'energy-consumption_cost':
self.fitness_score = (total_energy_consumption, lcc)
self.individual['fitness_score'] = (lcc, total_energy_consumption)
else:
lcc = float('inf')
total_energy_consumption = float('inf')
self.individual['lcc'] = lcc
self.individual['total_energy_consumption'] = total_energy_consumption
if self.optimization_scenario == 'cost_energy-consumption' or self.optimization_scenario == 'energy-consumption_cost':
self.individual['fitness_score'] = (float('inf'), float('inf'))
self.fitness_score = (float('inf'), float('inf'))
else:
self.individual['fitness_score'] = float('inf')
self.fitness_score = float('inf')
def system_simulation(self):
"""
The method to run the energy system model using the existing models in the energy_system_modelling_package.
Based on cluster id and demands, model is imported and run.
:return: dictionary of results
"""
if self.building.energy_systems_archetype_cluster_id == '1':
if cte.HEATING in self.demand_types:
boiler = self.energy_system.generation_systems[0]
boiler.nominal_heat_output = self.individual['Generation Components'][0]['heating_capacity']
hp = self.energy_system.generation_systems[1]
hp.nominal_heat_output = self.individual['Generation Components'][1]['heating_capacity']
tes = self.energy_system.generation_systems[0].energy_storage_systems[0]
tes.volume = self.individual['Energy Storage Components'][0]['volume']
tes.height = self.building.average_storey_height - 1
tes.heating_coil_capacity = self.individual['Energy Storage Components'][0]['heating_coil_capacity'] \
if self.individual['Energy Storage Components'][0]['heating_coil_capacity'] is not None else None
heating_demand_joules = self.design_period_energy_demands[cte.HEATING]['demands']
heating_peak_load_watts = max(self.design_period_energy_demands[cte.HEATING]) if \
(self.heating_design_load is not None) else self.building.heating_peak_load[cte.YEAR][0]
upper_limit_tes_heating = 55
design_period_start_index = self.design_period_energy_demands[cte.HEATING]['start_index']
design_period_end_index = self.design_period_energy_demands[cte.HEATING]['end_index']
outdoor_temperature = self.building.external_temperature[cte.HOUR][
design_period_start_index:design_period_end_index]
results = HeatPumpBoilerTesHeating(hp=hp,
boiler=boiler,
tes=tes,
hourly_heating_demand_joules=heating_demand_joules,
heating_peak_load_watts=heating_peak_load_watts,
upper_limit_tes=upper_limit_tes_heating,
outdoor_temperature=outdoor_temperature,
dt=self.dt).simulation()
if min(results['TES Temperature']) < 35:
self.feasibility = False
elif cte.DOMESTIC_HOT_WATER in self.demand_types:
hp = self.energy_system.generation_systems[0]
hp.nominal_heat_output = self.individual['Generation Components'][0]['heating_capacity']
tes = self.energy_system.generation_systems[0].energy_storage_systems[0]
tes.volume = self.individual['Energy Storage Components'][0]['volume']
tes.height = self.building.average_storey_height - 1
tes.heating_coil_capacity = self.individual['Energy Storage Components'][0]['heating_coil_capacity'] \
if self.individual['Energy Storage Components'][0]['heating_coil_capacity'] is not None else None
dhw_demand_joules = self.design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']
upper_limit_tes = 65
design_period_start_index = self.design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['start_index']
design_period_end_index = self.design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['end_index']
outdoor_temperature = self.building.external_temperature[cte.HOUR][
design_period_start_index:design_period_end_index]
results = DomesticHotWaterHeatPumpTes(hp=hp,
tes=tes,
hourly_dhw_demand_joules=dhw_demand_joules,
upper_limit_tes=upper_limit_tes,
outdoor_temperature=outdoor_temperature,
dt=self.dt).simulation()
if min(results['DHW TES Temperature']) < 55:
self.feasibility = False
elif self.building.energy_systems_archetype_cluster_id == '3':
if cte.HEATING in self.demand_types:
hp = self.energy_system.generation_systems[0]
hp.nominal_heat_output = self.individual['Generation Components'][0]['heating_capacity']
tes = self.energy_system.generation_systems[0].energy_storage_systems[0]
tes.volume = self.individual['Energy Storage Components'][0]['volume']
tes.height = self.building.average_storey_height - 1
tes.heating_coil_capacity = self.individual['Energy Storage Components'][0]['heating_coil_capacity'] \
if self.individual['Energy Storage Components'][0]['heating_coil_capacity'] is not None else None
heating_demand_joules = self.design_period_energy_demands[cte.HEATING]['demands']
heating_peak_load_watts = max(self.design_period_energy_demands[cte.HEATING]) if \
(self.heating_design_load is not None) else self.building.heating_peak_load[cte.YEAR][0]
upper_limit_tes_heating = 55
design_period_start_index = self.design_period_energy_demands[cte.HEATING]['start_index']
design_period_end_index = self.design_period_energy_demands[cte.HEATING]['end_index']
outdoor_temperature = self.building.external_temperature[cte.HOUR][
design_period_start_index:design_period_end_index]
results = HeatPumpBoilerTesHeating(hp=hp,
boiler=None,
tes=tes,
hourly_heating_demand_joules=heating_demand_joules,
heating_peak_load_watts=heating_peak_load_watts,
upper_limit_tes=upper_limit_tes_heating,
outdoor_temperature=outdoor_temperature,
dt=self.dt).simulation()
if min(results['TES Temperature']) < 35:
self.feasibility = False
if self.feasibility:
generation_system_types = [generation_system.system_type for generation_system in
self.energy_system.generation_systems]
for generation_component in self.individual['Generation Components']:
if generation_component['type'] in generation_system_types:
index = generation_system_types.index(generation_component['type'])
for demand_type in self.demand_types:
if demand_type in self.energy_system.generation_systems[index].energy_consumption:
generation_component['total_energy_consumption(kWh)'] = (sum(
self.energy_system.generation_systems[index].energy_consumption[demand_type][cte.HOUR]) / 3.6e6)
for storage_component in self.individual['Energy Storage Components']:
if storage_component['type'] == f'{cte.THERMAL}_storage' and storage_component[
'heating_coil_capacity'] is not None:
for generation_system in self.energy_system.generation_systems:
if generation_system.energy_storage_systems is not None:
for storage_system in generation_system.energy_storage_systems:
if storage_system.type_energy_stored == cte.THERMAL:
for demand_type in self.demand_types:
if demand_type in storage_system.heating_coil_energy_consumption:
storage_component['total_energy_consumption(kWh)'] = (sum(
storage_system.heating_coil_energy_consumption[demand_type][cte.HOUR]) / 3.6e6)
def life_cycle_cost_calculation(self, investment_cost, operation_cost_year_0, maintenance_cost_year_0,
life_cycle_duration=41):
"""
Calculating the life cycle cost of the energy system. The original cost workflow in hub is not used to reduce
computation time.Here are the steps:
1- Costs catalog and different components are imported.
2- Capital costs (investment and reposition) are calculated and appended to a list to have the capital cash
flow.
3-
:param maintenance_cost_year_0:
:param operation_cost_year_0:
:param investment_cost:
:param life_cycle_duration:
:return:
"""
capital_costs_cash_flow = [investment_cost]
operational_costs_cash_flow = [0]
maintenance_costs_cash_flow = [0]
end_of_life_costs = [0] * (life_cycle_duration + 1)
for i in range(1, life_cycle_duration + 1):
yearly_operational_cost = math.pow(1 + self.fuel_price_index, i) * operation_cost_year_0
yearly_maintenance_cost = math.pow(1 + self.consumer_price_index, i) * maintenance_cost_year_0
yearly_capital_cost = 0
for generation_system in self.individual['Generation Components']:
if (i % generation_system['lifetime']) == 0 and i != (life_cycle_duration - 1):
cost_increase = math.pow(1 + self.consumer_price_index, i)
heating_capacity = 0 if generation_system['heating_capacity'] is None else generation_system[
'heating_capacity']
cooling_capacity = 0 if generation_system['cooling_capacity'] is None else generation_system[
'cooling_capacity']
capacity = max(heating_capacity, cooling_capacity)
yearly_capital_cost += generation_system['unit_reposition_cost'] * capacity * cost_increase / 1000
yearly_capital_cost += -npf.pmt(self.interest_rate, self.credit_years,
investment_cost * self.percentage_credit)
for storage_system in self.individual['Energy Storage Components']:
if (i % storage_system['lifetime']) == 0 and i != (life_cycle_duration - 1):
cost_increase = math.pow(1 + self.consumer_price_index, i)
capacity = storage_system['volume'] if cte.THERMAL in storage_system['type'] else storage_system['capacity']
yearly_capital_cost += storage_system['unit_reposition_cost'] * capacity * cost_increase
yearly_capital_cost += -npf.pmt(self.interest_rate, self.credit_years,
investment_cost * self.percentage_credit)
capital_costs_cash_flow.append(yearly_capital_cost)
operational_costs_cash_flow.append(yearly_operational_cost)
maintenance_costs_cash_flow.append(yearly_maintenance_cost)
for year in range(1, life_cycle_duration + 1):
price_increase = math.pow(1 + self.consumer_price_index, year)
if year == life_cycle_duration:
end_of_life_costs[year] = (
self.building.thermal_zones_from_internal_zones[0].total_floor_area *
self.individual['End of Life Cost'] * price_increase
)
life_cycle_capital_cost = npf.npv(self.discount_rate, capital_costs_cash_flow)
life_cycle_operational_cost = npf.npv(self.discount_rate, operational_costs_cash_flow)
life_cycle_maintenance_cost = npf.npv(self.discount_rate, maintenance_costs_cash_flow)
life_cycle_end_of_life_cost = npf.npv(self.discount_rate, end_of_life_costs)
total_life_cycle_cost = life_cycle_capital_cost + life_cycle_operational_cost + life_cycle_maintenance_cost + life_cycle_end_of_life_cost
return total_life_cycle_cost
def costs_archetype(self):
costs_catalogue = CostsCatalogFactory('montreal_new').catalog.entries().archetypes
dictionary = Dictionaries().hub_function_to_montreal_custom_costs_function
costs_archetype = None
for archetype in costs_catalogue:
if dictionary[str(self.building.function)] == str(archetype.function):
costs_archetype = archetype
return costs_archetype
def unit_investment_cost(self, component_category, component_type):
"""
Reading the investment and reposition costs of any component from costs catalogue
:param component_category: Due to the categorizations in the cost catalogue, we need this parameter to realize if
the component is a generation or storage component
:param component_type: Type of the component
:return:
"""
investment_cost = 0
reposition_cost = 0
lifetime = 0
name = ''
capital_costs_chapter = self.costs.capital_cost.chapter('D_services')
if component_category == 'Generation':
generation_systems = self.energy_system.generation_systems
for generation_system in generation_systems:
if component_type == generation_system.system_type:
if generation_system.system_type == cte.HEAT_PUMP:
name += (
generation_system.source_medium.lower() + '_to_' + generation_system.supply_medium.lower() + '_' +
generation_system.system_type.lower().replace(' ', '_'))
elif generation_system.system_type == cte.BOILER:
if generation_system.fuel_type == cte.ELECTRICITY:
name += cte.ELECTRICAL.lower() + f'_{generation_system.system_type}'.lower()
else:
name += generation_system.fuel_type.lower() + f'_{generation_system.system_type}'.lower()
elif generation_system.system_type == cte.PHOTOVOLTAIC:
name += 'D2010_photovoltaic_system'
else:
if cte.HEATING or cte.DOMESTIC_HOT_WATER in self.demand_types:
name += 'template_heat'
else:
name += 'template_cooling'
for item in capital_costs_chapter.items:
if name in item.type:
investment_cost += float(capital_costs_chapter.item(item.type).initial_investment[0])
reposition_cost += float(capital_costs_chapter.item(item.type).reposition[0])
lifetime += float(capital_costs_chapter.item(item.type).lifetime)
elif component_category == 'Storage':
for generation_system in self.energy_system.generation_systems:
if generation_system.energy_storage_systems is not None:
for energy_storage_system in generation_system.energy_storage_systems:
if energy_storage_system.type_energy_stored == cte.THERMAL:
if energy_storage_system.heating_coil_capacity is not None:
investment_cost += float(capital_costs_chapter.item('D306010_storage_tank').initial_investment[0])
reposition_cost += float(capital_costs_chapter.item('D306010_storage_tank').reposition[0])
lifetime += float(capital_costs_chapter.item('D306010_storage_tank').lifetime)
else:
investment_cost += float(
capital_costs_chapter.item('D306020_storage_tank_with_coil').initial_investment[0])
reposition_cost += float(capital_costs_chapter.item('D306020_storage_tank_with_coil').reposition[0])
lifetime += float(capital_costs_chapter.item('D306020_storage_tank_with_coil').lifetime)
return investment_cost, reposition_cost, lifetime
def unit_maintenance_cost(self, generation_system):
hvac_maintenance = self.costs.operational_cost.maintenance_hvac
pv_maintenance = self.costs.operational_cost.maintenance_pv
maintenance_cost = 0
component = None
if generation_system.system_type == cte.HEAT_PUMP and generation_system.source_medium == cte.AIR:
component = 'air_source_heat_pump'
elif generation_system.system_type == cte.HEAT_PUMP and generation_system.source_medium == cte.GROUND:
component = 'ground_source_heat_pump'
elif generation_system.system_type == cte.HEAT_PUMP and generation_system.source_medium == cte.WATER:
component = 'water_source_heat_pump'
elif generation_system.system_type == cte.BOILER and generation_system.fuel_type == cte.GAS:
component = 'gas_boiler'
elif generation_system.system_type == cte.BOILER and generation_system.fuel_type == cte.ELECTRICITY:
component = 'electric_boiler'
elif generation_system.system_type == cte.PHOTOVOLTAIC:
maintenance_cost += pv_maintenance
else:
if cte.HEATING or cte.DOMESTIC_HOT_WATER in self.demand_types:
component = 'general_heating_equipment'
else:
component = 'general_cooling_equipment'
for item in hvac_maintenance:
if item.type == component:
maintenance_cost += item.maintenance[0]
return maintenance_cost
def fuel_cost_per_kwh(self, fuel_type, fuel_cost_tariff_type):
fuel_cost = 0
catalog_fuels = self.costs.operational_cost.fuels
for fuel in catalog_fuels:
if fuel_type == fuel.type and fuel_cost_tariff_type == fuel.variable.rate_type:
if fuel.type == cte.ELECTRICITY and fuel_cost_tariff_type == 'fixed':
fuel_cost = fuel.variable.values[0]
elif fuel.type == cte.ELECTRICITY and fuel_cost_tariff_type == 'variable':
fuel_cost = fuel.variable.values[0]
else:
if fuel.type == cte.BIOMASS:
conversion_factor = 1
else:
conversion_factor = fuel.density[0]
fuel_cost = fuel.variable.values[0] / (conversion_factor * fuel.lower_heating_value[0] * 0.277)
return fuel_cost

View File

@ -0,0 +1,827 @@
import copy
import math
import random
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.individual import \
Individual
import matplotlib.pyplot as plt
class MultiObjectiveGeneticAlgorithm:
"""
A class to perform NSGA-II optimization. This class is specifically designed to optimize
multiple conflicting objectives, facilitating evolutionary-based search to determine the optimal sizes of all
components in an energy system.
Attributes:
----------
population_size : int
The total number of individuals in the population.
generations : int
The number of generations to evolve the population.
crossover_rate : float
Probability of crossover between pairs of individuals.
mutation_rate : float
Probability of mutation in individual genes.
number_of_selected_solutions : int, optional
Number of optimal solutions to select from the final population.
optimization_scenario : str
The specific optimization scenario defining the objectives to be optimized.
pareto_front : list, optional
Stores the Pareto front solutions after optimization, representing non-dominated solutions.
Methods:
--------
__init__(self, population_size=40, generations=50, crossover_rate=0.9, mutation_rate=0.33,
number_of_selected_solutions=None, optimization_scenario=None)
Initializes the MultiObjectiveGeneticAlgorithm with population size, generation count, and evolutionary
operators such as crossover and mutation rates.
"""
def __init__(self, population_size=100, generations=50, initial_crossover_rate=0.9, final_crossover_rate=0.5,
mutation_rate=0.1, number_of_selected_solutions=None, optimization_scenario=None):
self.population_size = population_size
self.population = []
self.generations = generations
self.initial_crossover_rate = initial_crossover_rate
self.final_crossover_rate = final_crossover_rate
self.crossover_rate = initial_crossover_rate # Initial value
self.mutation_rate = mutation_rate
self.optimization_scenario = optimization_scenario
self.number_of_selected_solutions = number_of_selected_solutions
self.selected_solutions = None
self.pareto_front = None
# Initialize Population
def initialize_population(self, building, energy_system):
"""
Initializes the population for the genetic algorithm with feasible individuals based on building constraints
and energy system characteristics.
Parameters:
----------
building : Building
The building object containing data on building demands and properties.
energy_system : EnergySystem
The energy system object containing demand types and components to be optimized.
Procedure:
----------
1. Identifies a design period based on building energy demands.
2. Attempts to create and initialize individuals for the population until the specified population size
is reached or the maximum number of attempts is exceeded.
3. Each individual is evaluated for feasibility based on defined constraints.
4. Only feasible individuals are added to the population.
Raises:
-------
RuntimeError
If a feasible population of the specified size cannot be generated within the set maximum attempts.
Notes:
------
- This method stops when we achieve feasible population size or raises an error if unsuccessful.
- The feasibility of each individual is checked to ensure that it satisfies the constraints of the
building and energy system demands.
- The `design_period_energy_demands` is used to evaluate individual feasibility based on specific demand
periods within the buildings operation.
"""
design_period_energy_demands = self.design_period_identification(building)
attempts = 0
max_attempts = self.population_size * 20
while len(self.population) < self.population_size and attempts < max_attempts:
individual = Individual(building=building,
energy_system=energy_system,
design_period_energy_demands=design_period_energy_demands,
optimization_scenario=self.optimization_scenario)
individual.initialization()
individual.score_evaluation()
attempts += 1
self.population.append(individual)
# if individual.feasibility:
# self.population.append(individual)
if len(self.population) < self.population_size:
raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. "
f"Only {len(self.population)} feasible individuals were generated.")
def sbx_crossover(self, parent1, parent2):
"""
Simulated Binary Crossover (SBX) operator to produce offspring from two parent individuals by recombining their
attributes. This method is applied to both generation and storage components, using a distribution index
to control the diversity of the generated offspring.
Parameters:
----------
parent1 : Individual
The first parent individual, containing attributes for generation and storage components.
parent2 : Individual
The second parent individual with a similar structure to parent1.
Returns:
-------
tuple
Two new offspring individuals (child1, child2) generated via the SBX crossover process. If crossover is
not performed (based on crossover probability), it returns deep copies of the parent individuals.
Process:
-------
1. Determines whether crossover should occur based on a randomly generated probability and the crossover rate.
2. For each component in the generation and storage attributes of both parents:
- Calculates crossover coefficients for each attribute based on a randomly generated value `u`.
- Uses a distribution index (`eta_c`) to compute the value of `beta`, which controls the spread of offspring.
- Generates two offspring values (`alpha1`, `alpha2`) based on beta and the parent values (x1 and x2).
- Ensures that all offspring values are positive; if `alpha1` or `alpha2` is non-positive, assigns the
average of `x1` and `x2` instead.
3. Returns the two newly created offspring if crossover is successful; otherwise, returns copies of the parents.
Details:
--------
- `eta_c` (8): Distribution index influencing the SBX process; higher values result in offspring closer
to the parents, while lower values create more diversity.
- Each attribute in `Generation Components` (e.g., `heating_capacity`, `cooling_capacity`) and
`Energy Storage Components` (e.g., `capacity`, `volume`, `heating_coil_capacity`) has crossover applied.
- Ensures robustness by preventing the assignment of non-positive values to offspring attributes.
"""
eta_c = 8 # Distribution index for SBX
if random.random() < self.crossover_rate:
child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
# Crossover for Generation Components
for i in range(len(parent1.individual['Generation Components'])):
for cap_type in ['heating_capacity', 'cooling_capacity']:
if parent1.individual['Generation Components'][i][cap_type] is not None:
x1 = parent1.individual['Generation Components'][i][cap_type]
x2 = parent2.individual['Generation Components'][i][cap_type]
u = random.random()
if u <= 0.5:
beta = (2 * u) ** (1 / (eta_c + 1))
else:
beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1))
alpha1 = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)
alpha2 = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)
if alpha1 > 0:
child1.individual['Generation Components'][i][cap_type] = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)
else:
child1.individual['Generation Components'][i][cap_type] = 0.5 * (x1 + x2)
if alpha2 > 0:
child2.individual['Generation Components'][i][cap_type] = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)
else:
child2.individual['Generation Components'][i][cap_type] = 0.5 * (x1 + x2)
# Crossover for Energy Storage Components
for i in range(len(parent1.individual['Energy Storage Components'])):
for item in ['capacity', 'volume', 'heating_coil_capacity']:
if parent1.individual['Energy Storage Components'][i][item] is not None:
x1 = parent1.individual['Energy Storage Components'][i][item]
x2 = parent2.individual['Energy Storage Components'][i][item]
u = random.random()
if u <= 0.5:
beta = (2 * u) ** (1 / (eta_c + 1))
else:
beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1))
alpha1 = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)
alpha2 = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)
if alpha1 > 0:
child1.individual['Energy Storage Components'][i][item] = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)
else:
child1.individual['Energy Storage Components'][i][item] = 0.5 * (x1 + x2)
if alpha2 > 0:
child2.individual['Energy Storage Components'][i][item] = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)
else:
child2.individual['Energy Storage Components'][i][item] = 0.5 * (x1 + x2)
return child1, child2
else:
return copy.deepcopy(parent1), copy.deepcopy(parent2)
def polynomial_mutation(self, individual, building, energy_system):
"""
Applies a Polynomial Mutation operator to modify the attributes of an individual's generation and storage components.
This operator introduces controlled variation in the population, guided by the mutation distribution index, to enhance
exploration within the search space.
Parameters:
----------
individual : Individual
The individual to mutate, containing attributes for generation and storage components.
building : Building
The building data, used to determine constraints such as peak heating load and available space.
energy_system : EnergySystem
The energy system configuration with specified demand types (e.g., heating, cooling, domestic hot water).
Returns:
-------
Individual
The mutated individual, with potentially adjusted values for generation and storage components.
Process:
-------
1. Defines the `polynomial_mutation_operator`, which applies the mutation based on the distribution index `eta_m`
and ensures the mutated value remains within defined bounds.
2. For each generation component of the individual:
- If the mutation rate threshold is met, performs mutation on `heating_capacity` and `cooling_capacity`,
bounded by the design period demands for the specified energy types.
3. For each energy storage component of the individual:
- If the mutation rate threshold is met, performs mutation on `volume` and `heating_coil_capacity` based on
available storage space and design period demands.
4. Returns the mutated individual.
Details:
--------
- `eta_m` (20): Mutation distribution index that controls the spread of the mutated values; higher values concentrate
mutated values near the original, while lower values increase variability.
- Uses design period demands to set mutation bounds, ensuring capacities stay within feasible ranges for heating,
cooling, and hot water demands.
- Each mutation is bounded to avoid negative or unreasonably large values.
"""
eta_m = 20 # Mutation distribution index
design_period_energy_demands = self.design_period_identification(building)
def polynomial_mutation_operator(value, lower_bound, upper_bound):
u = random.random()
if u < 0.5:
delta_q = (2 * u) ** (1 / (eta_m + 1)) - 1
else:
delta_q = 1 - (2 * (1 - u)) ** (1 / (eta_m + 1))
mutated_value = value + delta_q * (upper_bound - lower_bound)
return mutated_value
# Mutate Generation Components
for generation_component in individual['Generation Components']:
if random.random() < self.mutation_rate:
if (generation_component['nominal_heating_efficiency'] is not None and
(cte.HEATING or cte.DOMESTIC_HOT_WATER in energy_system.demand_types)):
if cte.HEATING in energy_system.demand_types:
max_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
else:
max_demand = max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
generation_component['heating_capacity'] = abs(polynomial_mutation_operator(
generation_component['heating_capacity'], 0, max_demand))
if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types:
max_cooling_demand = max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES
generation_component['cooling_capacity'] = polynomial_mutation_operator(
generation_component['cooling_capacity'], 0, max_cooling_demand)
# Mutate Storage Components
for storage_component in individual['Energy Storage Components']:
if random.random() < self.mutation_rate:
if storage_component['type'] == f'{cte.THERMAL}_storage':
storage_operation_range = 10 if cte.DOMESTIC_HOT_WATER in energy_system.demand_types else 20
storage_energy_capacity = random.uniform(0, 6)
volume = (storage_energy_capacity * max(design_period_energy_demands[cte.HEATING]['demands'])) / (
cte.WATER_HEAT_CAPACITY * 1000 * storage_operation_range)
max_available_space = min(0.01 * building.volume / building.storeys_above_ground, volume)
storage_component['volume'] = abs(polynomial_mutation_operator(storage_component['volume'], 0,
max_available_space))
if storage_component['heating_coil_capacity'] is not None:
if cte.HEATING in energy_system.demand_types:
max_heating_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
else:
max_heating_demand = max(
design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
storage_component['heating_coil_capacity'] = abs(polynomial_mutation_operator(
storage_component['heating_coil_capacity'], 0, max_heating_demand))
return individual
def fast_non_dominated_sorting(self):
"""
Performs fast non-dominated sorting on the current population to categorize individuals
into different Pareto fronts based on dominance. This method is part of the NSGA-II algorithm
and is used to identify non-dominated solutions at each front level.
Returns:
fronts (list of lists): A list of Pareto fronts, where each front is a list of indices
corresponding to individuals in the population. The first front
(fronts[0]) contains non-dominated individuals, the second front
contains solutions dominated by the first front, and so on.
Workflow:
1. Initializes an empty list `s` for each individual, representing individuals it dominates.
Also initializes `n`, a count of how many individuals dominate each individual, and
`rank` to store the Pareto front rank of each individual.
2. For each individual `p`, compares it with every other individual `q` to determine
dominance. If `p` dominates `q`, `q` is added to `s[p]`. If `q` dominates `p`,
`n[p]` is incremented.
3. Individuals with `n[p] == 0` are non-dominated and belong to the first front, so they
are assigned a rank of 0 and added to `fronts[0]`.
4. Iteratively builds additional fronts:
- For each front, checks each individual in the front, reducing the `n` count of
individuals it dominates (`s[p]`).
- If `n[q]` becomes zero, `q` is non-dominated relative to the current front and is
added to the next front.
- This continues until no more individuals can be added to new fronts.
5. After identifying all fronts, assigns a rank to each individual in each front.
"""
population = self.population
s = [[] for _ in range(len(population))]
n = [0] * len(population)
rank = [0] * len(population)
fronts = [[]]
for p in range(len(population)):
for q in range(len(population)):
if self.domination_status(population[p], population[q]):
s[p].append(q)
elif self.domination_status(population[q], population[p]):
n[p] += 1
if n[p] == 0:
rank[p] = 0
fronts[0].append(p)
i = 0
while fronts[i]:
next_front = set()
for p in fronts[i]:
for q in s[p]:
n[q] -= 1
if n[q] == 0:
rank[q] = i + 1
next_front.add(q)
i += 1
fronts.append(list(next_front))
del fronts[-1]
for (j, front) in enumerate(fronts):
for i in front:
self.population[i].rank = j + 1
return fronts
def calculate_crowding_distance(self, front, crowding_distance):
"""
Calculates the crowding distance for individuals within a given front in the population.
The crowding distance is a measure used in multi-objective optimization to maintain diversity
among non-dominated solutions by evaluating the density of solutions surrounding each individual.
Parameters:
front (list): A list of indices representing the individuals in the current front.
crowding_distance (list): A list of crowding distances for each individual in the population, updated in place.
Returns:
list: The updated crowding distances for each individual in the population.
Methodology:
1. For each objective (indexed by `j`), sorts the individuals in `front` based on their objective values.
2. Assigns a very large crowding distance (1e12) to boundary individuals in each objective (first and last individuals after sorting).
3. Retrieves the minimum and maximum values for the current objective within the sorted front.
4. For non-boundary individuals, calculates the crowding distance by:
- Finding the difference in objective values between the neighboring individuals in the sorted list.
- Normalizing this difference by dividing by the range of the objective values (objective_max - objective_min).
- Adding this value to the individual's crowding distance.
5. Updates each individual in `front` with their calculated crowding distance for later use in selection.
Example:
crowding_distances = [0] * len(self.population)
self.calculate_crowding_distance(front, crowding_distances)
Note:
The crowding distance helps prioritize individuals that are more isolated within the objective space,
thus promoting diversity within the Pareto front.
"""
for j in range(len(self.population[0].fitness_score)):
sorted_front = sorted(front, key=lambda x: self.population[x].individual['fitness_score'][j])
crowding_distance[sorted_front[0]] = float(1e12)
crowding_distance[sorted_front[-1]] = float(1e12)
objective_min = self.population[sorted_front[0]].individual['fitness_score'][j]
objective_max = self.population[sorted_front[-1]].individual['fitness_score'][j]
if objective_max != objective_min:
for i in range(1, len(sorted_front) - 1):
crowding_distance[sorted_front[i]] += (
(self.population[sorted_front[i + 1]].individual['fitness_score'][j] -
self.population[sorted_front[i - 1]].individual['fitness_score'][j]) /
(objective_max - objective_min))
for i in front:
self.population[i].crowding_distance = crowding_distance[i]
return crowding_distance
def nsga2_selection(self, fronts):
"""
Selects the next generation population using the NSGA-II selection strategy.
This method filters the individuals in `fronts` based on non-dominated sorting and crowding distance,
ensuring a diverse and high-quality set of solutions for the next generation.
Parameters:
fronts (list of lists): A list of fronts, where each front is a list of indices
representing non-dominated solutions in the population.
Returns:
list: The selected population of individuals for the next generation, limited to `self.population_size`.
Methodology:
1. Iterates through each front in `fronts` (sorted by rank) to add individuals to the new population.
2. For each front:
- Filters individuals with finite fitness scores to prevent propagation of invalid solutions.
- Adds these individuals to the new population until the population size limit is met.
3. If the new population size limit is not yet reached:
- Sorts the current front based on crowding distance in descending order.
- Adds individuals based on crowding distance until reaching the population limit.
4. Returns the selected population, ensuring that only the top solutions (by rank and crowding distance) are kept.
Notes:
- Non-dominated sorting prioritizes individuals in lower-ranked fronts.
- Crowding distance promotes diversity within the population by favoring isolated solutions.
Example:
selected_population = self.nsga2_selection(fronts)
Important:
This method assumes that each individual's fitness score contains at least two objectives,
and any infinite fitness values are excluded from selection.
"""
new_population = []
i = 0
while len(new_population) + len(fronts[i]) <= self.population_size:
for index in fronts[i]:
# Skip individuals with infinite fitness values to avoid propagation
if not math.isinf(self.population[index].individual['fitness_score'][0]) and \
not math.isinf(self.population[index].individual['fitness_score'][1]):
new_population.append(self.population[index])
i += 1
if i >= len(fronts):
break
if len(new_population) < self.population_size and i < len(fronts):
sorted_front = sorted(fronts[i], key=lambda x: self.population[x].crowding_distance, reverse=True)
for index in sorted_front:
if len(new_population) < self.population_size:
if not math.isinf(self.population[index].individual['fitness_score'][0]) and \
not math.isinf(self.population[index].individual['fitness_score'][1]):
new_population.append(self.population[index])
else:
break
return new_population
def solve_ga(self, building, energy_system):
self.initialize_population(building, energy_system)
solutions = {}
for n in range(self.generations + 1):
# self.crossover_rate = self.initial_crossover_rate + \
# (self.final_crossover_rate - self.initial_crossover_rate) * (n / self.generations)
solutions[f'generation_{n}'] = [individual.fitness_score for individual in self.population]
print(n)
progeny_population = []
while len(progeny_population) < self.population_size:
parent1, parent2 = random.choice(self.population), random.choice(self.population)
child1, child2 = self.sbx_crossover(parent1, parent2)
self.polynomial_mutation(child1.individual, building, energy_system)
self.polynomial_mutation(child2.individual, building, energy_system)
child1.score_evaluation()
child2.score_evaluation()
progeny_population.extend([child1, child2])
self.population.extend(progeny_population)
fronts = self.fast_non_dominated_sorting()
print(fronts)
crowding_distances = [0] * len(self.population)
for front in fronts:
self.calculate_crowding_distance(front=front, crowding_distance=crowding_distances)
new_population = self.nsga2_selection(fronts=fronts)
self.population = new_population
if n == self.generations:
fronts = self.fast_non_dominated_sorting()
self.pareto_front = [self.population[i] for i in fronts[0]]
pareto_fitness_scores = [individual.fitness_score for individual in self.pareto_front]
normalized_objectives = self.euclidean_normalization(pareto_fitness_scores)
performance_scores, selected_solution_indexes = self.topsis_decision_making(normalized_objectives)
self.selected_solutions = self.postprocess(self.pareto_front, selected_solution_indexes)
print(self.selected_solutions)
self.plot_pareto_front(self.pareto_front)
def plot_pareto_front(self, pareto_front):
"""
Plots the Pareto front for a given set of individuals in the population, using normalized fitness scores.
This method supports any number of objective functions, with dynamically labeled axes.
Parameters:
pareto_front (list): List of individuals that are part of the Pareto front.
Returns:
None: Displays a scatter plot with lines connecting the Pareto-optimal points.
Process:
- Normalizes the fitness scores using Euclidean normalization.
- Generates scatter plots for each pair of objectives.
- Connects the sorted Pareto-optimal points with a smooth, thick red line.
Attributes:
- Objectives on each axis are dynamically labeled as "Objective Function <index>_<name>", where the name is
derived from the optimization scenario.
- Adjusted marker and line thicknesses ensure enhanced visibility of the plot.
Steps:
1. Normalize the fitness scores of the Pareto front.
2. Create scatter plots for each pair of objectives.
3. Connect the sorted points for each objective pair with a red line.
4. Label each axis based on objective function index and scenario name.
Example:
self.plot_pareto_front(pareto_front)
Note:
- This method assumes that each individual in the Pareto front has a fitness score with at least two objectives.
"""
# Normalize Pareto scores
pareto_scores = [individual.fitness_score for individual in pareto_front]
normalized_pareto_scores = self.euclidean_normalization(pareto_scores)
normalized_pareto_scores = list(zip(*normalized_pareto_scores)) # Transpose for easy access by objective index
# Extract the number of objectives and the objective names
num_objectives = len(normalized_pareto_scores)
objectives = self.optimization_scenario.split('_')
# Set up subplots for each pair of objectives
fig, axs = plt.subplots(num_objectives - 1, num_objectives - 1, figsize=(15, 10))
for i in range(num_objectives - 1):
for j in range(i + 1, num_objectives):
# Sort points for smoother line connections
sorted_pairs = sorted(zip(normalized_pareto_scores[i], normalized_pareto_scores[j]))
sorted_x, sorted_y = zip(*sorted_pairs)
# Plot each objective pair
ax = axs[i, j - 1] if num_objectives > 2 else axs
ax.scatter(
normalized_pareto_scores[i],
normalized_pareto_scores[j],
color='blue',
alpha=0.7,
edgecolors='black',
s=100 # Larger point size for visibility
)
# Plot smoother, thicker line connecting Pareto points
ax.plot(
sorted_x,
sorted_y,
color='red',
linewidth=3 # Thicker line for better visibility
)
# Dynamic axis labels based on objective function names
ax.set_xlabel(f"Objective Function {i + 1}_{objectives[i]}")
ax.set_ylabel(f"Objective Function {j + 1}_{objectives[j]}")
ax.grid(True)
# Set title and layout adjustments
fig.suptitle("Pareto Front (Normalized Fitness Scores)")
plt.tight_layout()
plt.show()
@staticmethod
def design_period_identification(building):
"""
Identifies the design period for heating, cooling, and domestic hot water (DHW) demands based on the building's
daily energy requirements. This method focuses on selecting time windows with the highest daily demands for
accurate energy system sizing.
Parameters:
----------
building : Building
The building object containing hourly energy demand data for heating, cooling, and domestic hot water.
Returns:
-------
dict
A dictionary with keys for each demand type ('heating', 'cooling', 'domestic_hot_water'), where each entry
contains:
- 'demands': A list of hourly demand values within the identified design period.
- 'start_index': The start hour index for the identified period.
- 'end_index': The end hour index for the identified period.
Procedure:
----------
1. Calculates the total demand for each day for heating, cooling, and DHW.
2. Identifies the day with the highest demand for each type.
3. Selects a time window around each peak-demand day as the design period:
- For middle days, it selects the preceding and following days.
- For first and last days, it adjusts the window to ensure it stays within the range of available data.
Notes:
------
- This design period serves as a critical input for system sizing, ensuring the energy system meets the
maximum demand requirements efficiently.
- Each demand type has a separate design period tailored to its specific daily peak requirements.
{'heating': {'demands': [...], 'start_index': ..., 'end_index': ...},
'cooling': {'demands': [...], 'start_index': ..., 'end_index': ...},
'domestic_hot_water': {'demands': [...], 'start_index': ..., 'end_index': ...}
"""
def get_start_end_indices(max_day_index, total_days):
if 0 < max_day_index < total_days - 1:
start_index = (max_day_index - 5) * 24
end_index = (max_day_index + 5) * 24
elif max_day_index == 0:
start_index = 0
end_index = (max_day_index + 10) * 24
else:
start_index = (max_day_index - 10) * 24
end_index = total_days * 24
return start_index, end_index
# Calculate daily demands
heating_daily_demands = [sum(building.heating_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.heating_demand[cte.HOUR]), 24)]
cooling_daily_demands = [sum(building.cooling_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.cooling_demand[cte.HOUR]), 24)]
dhw_daily_demands = [sum(building.domestic_hot_water_heat_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.domestic_hot_water_heat_demand[cte.HOUR]), 24)]
# Get the day with maximum demand for each type
heating_max_day = heating_daily_demands.index(max(heating_daily_demands))
cooling_max_day = cooling_daily_demands.index(max(cooling_daily_demands))
dhw_max_day = dhw_daily_demands.index(max(dhw_daily_demands))
# Get the start and end indices for each demand type
heating_start, heating_end = get_start_end_indices(heating_max_day, len(heating_daily_demands))
cooling_start, cooling_end = get_start_end_indices(cooling_max_day, len(cooling_daily_demands))
dhw_start, dhw_end = get_start_end_indices(dhw_max_day, len(dhw_daily_demands))
# Return the design period energy demands
return {
f'{cte.HEATING}': {'demands': building.heating_demand[cte.HOUR][heating_start:heating_end],
'start_index': heating_start, 'end_index': heating_end},
f'{cte.COOLING}': {'demands': building.cooling_demand[cte.HOUR][cooling_start:cooling_end],
'start_index': cooling_start, 'end_index': cooling_end},
f'{cte.DOMESTIC_HOT_WATER}': {'demands': building.domestic_hot_water_heat_demand[cte.HOUR][dhw_start:dhw_end],
'start_index': dhw_start, 'end_index': dhw_end}
}
@staticmethod
def domination_status(individual1, individual2):
"""
Determines if `individual1` dominates `individual2` based on their fitness scores (objective values).
Domination in this context means that `individual1` is no worse than `individual2` in all objectives
and strictly better in at least one.
Parameters:
individual1: The first individual (object) being compared.
individual2: The second individual (object) being compared.
Returns:
bool: True if `individual1` dominates `individual2`, False otherwise.
Methodology:
1. Retrieves the list of objective scores (fitness scores) for both individuals.
2. Ensures both individuals have the same number of objectives; otherwise, raises an AssertionError.
3. Initializes two flags:
- `better_in_all`: Checks if `individual1` is equal to or better than `individual2` in all objectives.
- `strictly_better_in_at_least_one`: Checks if `individual1` is strictly better than `individual2` in at
least one objective.
4. Iterates over each objective of both individuals:
- If `individual1` has a worse value in any objective, sets `better_in_all` to False.
- If `individual1` has a better value in any objective, sets `strictly_better_in_at_least_one` to True.
5. Concludes that `individual1` dominates `individual2` if `better_in_all` is True and
`strictly_better_in_at_least_one` is True.
"""
# Extract the list of objectives for both individuals
objectives1 = individual1.individual['fitness_score']
objectives2 = individual2.individual['fitness_score']
# Ensure both individuals have the same number of objectives
assert len(objectives1) == len(objectives2), "Both individuals must have the same number of objectives"
# Flags to check if one dominates the other
better_in_all = True # Whether individual1 is better in all objectives
strictly_better_in_at_least_one = False # Whether individual1 is strictly better in at least one objective
for obj1, obj2 in zip(objectives1, objectives2):
if obj1 > obj2:
better_in_all = False # individual1 is worse in at least one objective
if obj1 < obj2:
strictly_better_in_at_least_one = True # individual1 is better in at least one objective
result = True if better_in_all and strictly_better_in_at_least_one else False
return result
@staticmethod
def euclidean_normalization(fitness_scores):
"""
Normalizes the fitness scores for a set of individuals using Euclidean normalization.
Parameters:
fitness_scores (list of lists/tuples): A list where each element represents an individual's fitness scores
across multiple objectives, e.g., [(obj1, obj2, ...), ...].
Returns:
list of tuples: A list of tuples representing the normalized fitness scores for each individual. Each tuple
contains the normalized values for each objective, scaled between 0 and 1 relative to the
Euclidean length of each objective across the population.
Process:
- For each objective, calculates the sum of squares across all individuals.
- Normalizes each individual's objective score by dividing it by the square root of the sum of squares
for that objective.
- If the sum of squares for any objective is zero (to avoid division by zero), assigns a normalized score
of zero for that objective.
Example:
Given a fitness score input of [[3, 4], [1, 2]], this method will normalize each objective and return:
[(0.9487, 0.8944), (0.3162, 0.4472)].
Notes:
- This normalization is helpful for multi-objective optimization, as it scales each objective score
independently, ensuring comparability even with different objective ranges.
Steps:
1. Calculate the sum of squares for each objective across all individuals.
2. Divide each objective value by the square root of the corresponding sum of squares.
3. Return the normalized values as a list of tuples, preserving the individual-objective structure.
"""
num_objectives = len(fitness_scores[0])
# Calculate sum of squares for each objective
sum_squared_objectives = [
sum(fitness_score[i] ** 2 for fitness_score in fitness_scores)
for i in range(num_objectives)
]
# Normalize the objective values for each individual
normalized_objective_functions = [
tuple(
fitness_score[j] / math.sqrt(sum_squared_objectives[j]) if sum_squared_objectives[j] > 0 else 0
for j in range(num_objectives)
)
for fitness_score in fitness_scores
]
return normalized_objective_functions
@staticmethod
def topsis_decision_making(normalized_objective_functions):
"""
Applies the Technique for Order of Preference by Similarity to Ideal Solution (TOPSIS) method for decision-making
in multi-objective optimization. This method ranks solutions based on their proximity to the ideal best and
worst solutions.
Parameters:
normalized_objective_functions (list of tuples): A list where each element represents a set of normalized
objective values for an individual, e.g.,
[(obj1, obj2, ...), ...].
Returns:
list of float: A list of performance scores for each individual, where higher scores indicate closer
proximity to the ideal best solution.
Process:
- Identifies the ideal best and ideal worst values for each objective based on normalized objective scores.
- Computes the Euclidean distance of each individual from both the ideal best and ideal worst.
- Calculates a performance score for each individual by dividing the distance to the ideal worst by the
sum of the distances to the ideal best and worst.
Example:
Given normalized objectives [[0.8, 0.6], [0.3, 0.4]], the method will return performance scores
based on their proximity to ideal solutions.
Notes:
- TOPSIS helps to balance trade-offs in multi-objective optimization by quantifying how close each solution
is to the ideal, enabling effective ranking of solutions.
- This method is commonly used when both minimization and maximization objectives are present.
Steps:
1. Determine the ideal best (minimum) and ideal worst (maximum) for each objective.
2. Compute Euclidean distances from each solution to the ideal best and ideal worst.
3. Calculate the performance score for each solution based on these distances.
"""
# Find ideal best and ideal worst
selected_solutions = {}
ideal_best = []
ideal_worst = []
num_objectives = len(normalized_objective_functions[0])
for j in range(num_objectives):
objective_values = [ind[j] for ind in normalized_objective_functions]
ideal_best.append(min(objective_values))
ideal_worst.append(max(objective_values))
selected_solutions[f'best_objective_{j + 1}_index'] = objective_values.index(min(objective_values))
# Calculate Euclidean distances to ideal best and ideal worst
distances_to_best = []
distances_to_worst = []
for normalized_values in normalized_objective_functions:
# Distance to ideal best
distance_best = math.sqrt(
sum((normalized_values[j] - ideal_best[j]) ** 2 for j in range(num_objectives))
)
distances_to_best.append(distance_best)
# Distance to ideal worst
distance_worst = math.sqrt(
sum((normalized_values[j] - ideal_worst[j]) ** 2 for j in range(num_objectives))
)
distances_to_worst.append(distance_worst)
performance_scores = [distances_to_worst[i] / (distances_to_best[i] + distances_to_worst[i])
for i in range(len(normalized_objective_functions))]
selected_solutions['best_solution_index'] = performance_scores.index(max(performance_scores))
return performance_scores, selected_solutions
def postprocess(self, pareto_front, selected_solution_indexes):
selected_solutions = {}
for solution_type in selected_solution_indexes:
selected_solutions[solution_type] = {}
index = selected_solution_indexes[solution_type]
individual = pareto_front[index].individual
selected_solutions[solution_type]['Generation Components'] = []
selected_solutions[solution_type]['Storage Components'] = []
for generation_component in individual['Generation Components']:
generation_type = generation_component['type']
heating_capacity = generation_component['heating_capacity']
cooling_capacity = generation_component['cooling_capacity']
selected_solutions[solution_type]['Generation Components'].append({'component type': generation_type,
'heating capacity (W)': heating_capacity,
'cooling capacity (W)': cooling_capacity})
for storage_component in individual['Energy Storage Components']:
storage_type = storage_component['type']
capacity = storage_component['capacity']
volume = storage_component['volume']
heating_coil = storage_component['heating_coil_capacity']
selected_solutions[solution_type]['Storage Components'].append({'storage type': storage_type,
'capacity (W)': capacity,
'volume (m3)': volume,
'heating coil capacity (W)': heating_coil})
if 'energy-consumption' in self.optimization_scenario:
selected_solutions[solution_type]['total energy consumption kWh'] = individual['total_energy_consumption']
if 'cost' in self.optimization_scenario:
selected_solutions[solution_type]['life cycle cost'] = individual['lcc']
return selected_solutions

View File

@ -0,0 +1,342 @@
import copy
import math
import random
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.individual import \
Individual
class SingleObjectiveGeneticAlgorithm:
def __init__(self, population_size=100, generations=20, crossover_rate=0.8, mutation_rate=0.1,
optimization_scenario=None, output_path=None):
self.population_size = population_size
self.population = []
self.generations = generations
self.crossover_rate = crossover_rate
self.mutation_rate = mutation_rate
self.optimization_scenario = optimization_scenario
self.list_of_solutions = []
self.best_solution = None
self.best_solution_generation = None
self.output_path = output_path
# Initialize Population
def initialize_population(self, building, energy_system):
"""
Initialize a population of individuals with feasible configurations for optimizing the sizes of
generation and storage components of an energy system.
:param building: Building object with associated data
:param energy_system: Energy system to optimize
"""
design_period_energy_demands = self.design_period_identification(building)
attempts = 0 # Track attempts to avoid an infinite loop in rare cases
max_attempts = self.population_size * 5
while len(self.population) < self.population_size and attempts < max_attempts:
individual = Individual(building=building,
energy_system=energy_system,
design_period_energy_demands=design_period_energy_demands,
optimization_scenario=self.optimization_scenario)
individual.initialization()
attempts += 1
# Enhanced feasibility check
if self.initial_population_feasibility_check(individual, energy_system.demand_types, design_period_energy_demands):
self.population.append(individual)
# Raise an error or print a warning if the population size goal is not met after max_attempts
if len(self.population) < self.population_size:
raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. "
f"Only {len(self.population)} feasible individuals were generated.")
@staticmethod
def initial_population_feasibility_check(individual, demand_types, design_period_demands):
"""
Check if the individual meets basic feasibility requirements for heating, cooling, and DHW capacities
and storage volume.
:param individual: Individual to check
:param demand_types: List of demand types (e.g., heating, cooling, DHW)
:param design_period_demands: Design period demand values for heating, cooling, and DHW
:return: True if feasible, False otherwise
"""
# Calculate total heating and cooling capacities
total_heating_capacity = sum(
component['heating_capacity'] for component in individual.individual['Generation Components']
if component['heating_capacity'] is not None
)
total_cooling_capacity = sum(
component['cooling_capacity'] for component in individual.individual['Generation Components']
if component['cooling_capacity'] is not None
)
# Maximum demands for each demand type (converted to kW)
max_heating_demand = max(design_period_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
max_cooling_demand = max(design_period_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES
max_dhw_demand = max(design_period_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
# Check heating capacity feasibility
if cte.HEATING in demand_types and total_heating_capacity < 0.5 * max_heating_demand:
return False
# Check DHW capacity feasibility
if cte.DOMESTIC_HOT_WATER in demand_types and total_heating_capacity < 0.5 * max_dhw_demand:
return False
# Check cooling capacity feasibility
if cte.COOLING in demand_types and total_cooling_capacity < 0.5 * max_cooling_demand:
return False
# Check storage volume feasibility
total_volume = sum(
component['volume'] for component in individual.individual['Energy Storage Components']
if component['volume'] is not None
)
# Limit storage to 10% of building's available space
max_storage_volume = individual.available_space * 0.1
if total_volume > max_storage_volume:
return False
return True # Feasible if all checks are passed
def order_population(self):
"""
ordering the population based on the fitness score in ascending order
:return:
"""
self.population = sorted(self.population, key=lambda x: x.fitness_score)
def tournament_selection(self):
selected = []
for _ in range(len(self.population)):
i, j = random.sample(range(self.population_size), 2)
if self.population[i].individual['fitness_score'] < self.population[j].individual['fitness_score']:
selected.append(copy.deepcopy(self.population[i]))
else:
selected.append(copy.deepcopy(self.population[j]))
return selected
def crossover(self, parent1, parent2):
"""
Crossover between two parents to produce two children.
swaps generation components and storage components between the two parents with a 50% chance.
:param parent1: First parent individual.
:param parent2: second parent individual.
:return: Two child individuals (child1 and child2).
"""
if random.random() < self.crossover_rate:
# Deep copy of the parents to create children
child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
# Crossover for Generation Components
for i in range(len(parent1.individual['Generation Components'])):
if random.random() < 0.5:
# swap the entire generation component
child1.individual['Generation Components'][i], child2.individual['Generation Components'][i] = (
child2.individual['Generation Components'][i],
child1.individual['Generation Components'][i]
)
# Crossover for Energy storage Components
for i in range(len(parent1.individual['Energy Storage Components'])):
if random.random() < 0.5:
# swap the entire storage component
child1.individual['Energy Storage Components'][i], child2.individual['Energy Storage Components'][i] = (
child2.individual['Energy Storage Components'][i],
child1.individual['Energy Storage Components'][i]
)
return child1, child2
else:
# If crossover does not happen, return copies of the original parents
return copy.deepcopy(parent1), copy.deepcopy(parent2)
def mutate(self, individual, building, energy_system):
"""
Mutates the individual's generation and storage components.
- `individual`: The individual to mutate (contains generation and storage components).
- `building`: Building data that contains constraints such as peak heating load and available space.
Returns the mutated individual.
"""
design_period_energy_demands = self.design_period_identification(building)
# Mutate Generation Components
for generation_component in individual['Generation Components']:
if random.random() < self.mutation_rate:
if (generation_component['nominal_heating_efficiency'] is not None and cte.HEATING or cte.DOMESTIC_HOT_WATER in
energy_system.demand_types):
# Mutate heating capacity
if cte.HEATING in energy_system.demand_types:
generation_component['heating_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES)
else:
generation_component['heating_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES)
if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types:
# Mutate cooling capacity
generation_component['cooling_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES)
# Mutate storage Components
for storage_component in individual['Energy Storage Components']:
if random.random() < self.mutation_rate:
if storage_component['type'] == f'{cte.THERMAL}_storage':
# Mutate the volume of thermal storage
max_available_space = 0.01 * building.volume / building.storeys_above_ground
storage_component['volume'] = random.uniform(0, max_available_space)
if storage_component['heating_coil_capacity'] is not None:
if cte.HEATING in energy_system.demand_types:
storage_component['heating_coil_capacity'] = random.uniform(0, max(
design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES)
else:
storage_component['heating_coil_capacity'] = random.uniform(0, max(
design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES)
return individual
def solve_ga(self, building, energy_system):
"""
solving GA for a single energy system. Here are the steps:
1. Initialize population using the "initialize_population" method in this class.
2. Evaluate the initial population using the "score_evaluation" method in the Individual class.
3. sort population based on fitness score.
4. Repeat selection, crossover, and mutation for a fixed number of generations.
5. Track the best solution found during the optimization process.
:param building: Building object for the energy system.
:param energy_system: Energy system to optimize.
:return: Best solution after running the GA.
"""
# step 1: Initialize the population
self.initialize_population(building, energy_system)
# step 2: Evaluate the initial population
for individual in self.population:
individual.score_evaluation()
# step 3: Order population based on fitness scores
self.order_population()
print([individual.fitness_score for individual in self.population])
# Track the best solution
self.best_solution = self.population[0]
self.best_solution_generation = 0
self.list_of_solutions.append(copy.deepcopy(self.best_solution.individual))
# step 4: Run GA for a fixed number of generations
for generation in range(1, self.generations):
print(f"Generation {generation}")
# selection (using tournament selection)
selected_population = self.tournament_selection()
# Create the next generation through crossover and mutation
next_population = []
for i in range(0, self.population_size, 2):
parent1 = selected_population[i]
parent2 = selected_population[i + 1] if (i + 1) < len(selected_population) else selected_population[0]
# step 5: Apply crossover
child1, child2 = self.crossover(parent1, parent2)
# step 6: Apply mutation
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
# step 7: Evaluate the children
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
# Replace old population with the new one
self.population = next_population
# step 8: sort the new population based on fitness
self.order_population()
print([individual.fitness_score for individual in self.population])
# Track the best solution found in this generation
if self.population[0].individual['fitness_score'] < self.best_solution.individual['fitness_score']:
self.best_solution = self.population[0]
self.best_solution_generation = generation
# store the best solution in the list of solutions
self.list_of_solutions.append(copy.deepcopy(self.population[0].individual))
print(f"Best solution found in generation {self.best_solution_generation}")
print(f"Best solution: {self.best_solution.individual}")
return self.best_solution
@staticmethod
def topsis_decision_making(pareto_front):
"""
Perform TOPSIS decision-making to choose the best solution from the Pareto front.
:param pareto_front: List of individuals in the Pareto front
:return: The best individual based on TOPSIS ranking
"""
# Step 1: Normalize the objective functions (cost and energy consumption)
min_lcc = min([ind.individual['lcc'] for ind in pareto_front])
max_lcc = max([ind.individual['lcc'] for ind in pareto_front])
min_lce = min([ind.individual['total_energy_consumption'] for ind in pareto_front])
max_lce = max([ind.individual['total_energy_consumption'] for ind in pareto_front])
normalized_pareto_front = []
for ind in pareto_front:
normalized_lcc = (ind.individual['lcc'] - min_lcc) / (max_lcc - min_lcc) if max_lcc > min_lcc else 0
normalized_lce = (ind.individual['total_energy_consumption'] - min_lce) / (
max_lce - min_lce) if max_lce > min_lce else 0
normalized_pareto_front.append((ind, normalized_lcc, normalized_lce))
# Step 2: Calculate the ideal and worst solutions
ideal_solution = (0, 0) # Ideal is minimum LCC and minimum LCE (0, 0 after normalization)
worst_solution = (1, 1) # Worst is maximum LCC and maximum LCE (1, 1 after normalization)
# Step 3: Calculate the distance to the ideal and worst solutions
best_distances = []
worst_distances = []
for ind, normalized_lcc, normalized_lce in normalized_pareto_front:
distance_to_ideal = math.sqrt(
(normalized_lcc - ideal_solution[0]) ** 2 + (normalized_lce - ideal_solution[1]) ** 2)
distance_to_worst = math.sqrt(
(normalized_lcc - worst_solution[0]) ** 2 + (normalized_lce - worst_solution[1]) ** 2)
best_distances.append(distance_to_ideal)
worst_distances.append(distance_to_worst)
# Step 4: Calculate relative closeness to the ideal solution
similarity = [worst / (best + worst) for best, worst in zip(best_distances, worst_distances)]
# Step 5: Select the individual with the highest similarity score
best_index = similarity.index(max(similarity))
best_solution = pareto_front[best_index]
return best_solution
@staticmethod
def design_period_identification(building):
def get_start_end_indices(max_day_index, total_days):
if max_day_index > 0 and max_day_index < total_days - 1:
start_index = (max_day_index - 1) * 24
end_index = (max_day_index + 2) * 24
elif max_day_index == 0:
start_index = 0
end_index = (max_day_index + 2) * 24
else:
start_index = (max_day_index - 1) * 24
end_index = total_days * 24
return start_index, end_index
# Calculate daily demands
heating_daily_demands = [sum(building.heating_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.heating_demand[cte.HOUR]), 24)]
cooling_daily_demands = [sum(building.cooling_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.cooling_demand[cte.HOUR]), 24)]
dhw_daily_demands = [sum(building.domestic_hot_water_heat_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.domestic_hot_water_heat_demand[cte.HOUR]), 24)]
# Get the day with maximum demand for each type
heating_max_day = heating_daily_demands.index(max(heating_daily_demands))
cooling_max_day = cooling_daily_demands.index(max(cooling_daily_demands))
dhw_max_day = dhw_daily_demands.index(max(dhw_daily_demands))
# Get the start and end indices for each demand type
heating_start, heating_end = get_start_end_indices(heating_max_day, len(heating_daily_demands))
cooling_start, cooling_end = get_start_end_indices(cooling_max_day, len(cooling_daily_demands))
dhw_start, dhw_end = get_start_end_indices(dhw_max_day, len(dhw_daily_demands))
# Return the design period energy demands
return {
f'{cte.HEATING}': {'demands': building.heating_demand[cte.HOUR][heating_start:heating_end],
'start_index': heating_start, 'end_index': heating_end},
f'{cte.COOLING}': {'demands': building.cooling_demand[cte.HOUR][cooling_start:cooling_end],
'start_index': cooling_start, 'end_index': cooling_end},
f'{cte.DOMESTIC_HOT_WATER}': {'demands': building.domestic_hot_water_heat_demand[cte.HOUR][dhw_start:dhw_end],
'start_index': dhw_start, 'end_index': dhw_end}
}

View File

@ -1,6 +0,0 @@
class HeuristicSizing:
def __init__(self, city):
pass
def enrich_buildings(self):
pass

View File

@ -0,0 +1,42 @@
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.single_objective_genetic_algorithm import \
SingleObjectiveGeneticAlgorithm
class OptimalSizing:
def __init__(self, city, optimization_scenario):
self.city = city
self.optimization_scenario = optimization_scenario
def enrich_buildings(self):
for building in self.city.buildings:
for energy_system in building.energy_systems:
if len(energy_system.generation_systems) == 1 and energy_system.generation_systems[0].energy_storage_systems is None:
if energy_system.generation_systems[0].system_type == cte.PHOTOVOLTAIC:
pass
else:
if cte.HEATING in energy_system.demand_types:
if cte.DOMESTIC_HOT_WATER in energy_system.demand_types:
design_load = max([building.heating_demand[cte.HOUR][i] +
building.domestic_hot_water_heat_demand[cte.HOUR][i] for i in
range(len(building.heating_demand))]) / cte.WATTS_HOUR_TO_JULES
else:
design_load = building.heating_peak_load[cte.YEAR][0]
energy_system.generation_systems[0].nominal_heat_output = design_load
elif cte.COOLING in energy_system.demand_types:
energy_system.generation_systems[0].nominal_cooling_output = building.cooling_peak_load[cte.YEAR][0]
else:
optimized_system = SingleObjectiveGeneticAlgorithm(optimization_scenario=self.optimization_scenario).solve_ga(building, energy_system)
for generation_system in energy_system.generation_systems:
system_type = generation_system.system_type
for generation_component in optimized_system.individual['Generation Components']:
if generation_component['type'] == system_type:
generation_system.nominal_heat_output = generation_component['heating_capacity']
generation_system.nominal_cooling_output = generation_component['cooling_capacity']
if generation_system.energy_storage_systems is not None:
for storage_system in generation_system.energy_storage_systems:
storage_type = f'{storage_system.type_energy_stored}_storage'
for storage_component in optimized_system.individual['Energy Storage Components']:
if storage_component['type'] == storage_type:
storage_system.nominal_capacity = storage_component['capacity']
storage_system.volume = storage_component['volume']

View File

@ -49,7 +49,7 @@ class PeakLoadSizing:
if len(energy_system.generation_systems) == 1:
# If there's only one generation system, it gets the full design load.
if demand_type == cte.HEATING or demand_type == cte.DOMESTIC_HOT_WATER:
energy_system.generation_systems[0].nominal_heat_output = design_load
energy_system.generation_systems[0].nominal_heat_output = 0.7 * design_load
elif demand_type == cte.COOLING:
energy_system.generation_systems[0].nominal_cooling_output = design_load
else:
@ -78,7 +78,7 @@ class PeakLoadSizing:
generation_system.nominal_cooling_output = design_load
else:
if demand_type == cte.HEATING or demand_type == cte.DOMESTIC_HOT_WATER:
generation_system.nominal_heat_output = round(((1 - default_primary_unit_percentage) * design_load /
generation_system.nominal_heat_output = round((default_primary_unit_percentage * design_load /
(len(energy_system.generation_systems) - 1)))
elif demand_type == cte.COOLING and cooling_equipments_number > 1:
generation_system.nominal_cooling_output = round(((1 - default_primary_unit_percentage) * design_load /

View File

@ -22,7 +22,7 @@ class EnergySystemRetrofitReport:
self.retrofit_scenario = retrofit_scenario
self.report = LatexReport('energy_system_retrofit_report',
'Energy System Retrofit Report', self.retrofit_scenario, output_path)
self.system_schemas_path = (Path(__file__).parent.parent / 'hub' / 'data' / 'energy_systems' / 'schemas')
self.system_schemas_path = (Path(__file__).parent.parent.parent / 'hub' / 'data' / 'energy_systems' / 'schemas')
self.charts_path = Path(output_path) / 'charts'
self.charts_path.mkdir(parents=True, exist_ok=True)
files = glob.glob(f'{self.charts_path}/*')
@ -233,7 +233,7 @@ class EnergySystemRetrofitReport:
for archetype, buildings in energy_system_archetypes.items():
buildings_str = ", ".join(buildings)
text += f"Figure 4 shows the schematic of the proposed energy system for buildings {buildings_str}.\n"
if archetype in ['PV+4Pipe+DHW', 'PV+ASHP+GasBoiler+TES']:
if archetype in ['PV+4Pipe+DHW', 'PV+ASHP+GasBoiler+TES', 'Central 4 Pipes Air to Water Heat Pump and Gas Boiler with Independent Water Heating and PV']:
text += "This energy system archetype is formed of the following systems: \par"
items = ['Rooftop Photovoltaic System: The rooftop PV system is tied to the grid and in case there is surplus '
'energy, it is sold to Hydro-Quebec through their Net-Meterin program.',
@ -257,8 +257,7 @@ class EnergySystemRetrofitReport:
for i in range(len(months)):
tilted_radiation = 0
for building in self.city.buildings:
tilted_radiation += (building.roofs[0].global_irradiance_tilted[cte.MONTH][i] /
(cte.WATTS_HOUR_TO_JULES * 1000))
tilted_radiation += (building.roofs[0].global_irradiance_tilted[cte.MONTH][i] / 1000)
monthly_roof_radiation.append(tilted_radiation)
def plot_bar_chart(ax, months, values, color, ylabel, title):
@ -565,7 +564,7 @@ class EnergySystemRetrofitReport:
placement='H')
self.report.add_section(f'{self.retrofit_scenario}')
self.report.add_subsection('Proposed Systems')
# self.energy_system_archetype_schematic()
self.energy_system_archetype_schematic()
if 'PV' in self.retrofit_scenario:
self.report.add_subsection('Rooftop Photovoltaic System Implementation')
self.pv_system()

View File

@ -29,18 +29,20 @@ residential_systems_percentage = {'system 1 gas': 15,
'system 8 electricity': 35}
residential_new_systems_percentage = {
'Central 4 Pipes Air to Water Heat Pump and Gas Boiler with Independent Water Heating and PV': 100,
'Central 4 Pipes Air to Water Heat Pump and electrical Boiler with Independent Water Heating and PV': 0,
'Central 4 Pipes Ground to Water Heat Pump and Gas Boiler with Independent Water Heating and PV': 0,
'Central 4 Pipes Ground to Water Heat Pump and electrical Boiler with Independent Water Heating and PV': 0,
'Central 4 Pipes Water to Water Heat Pump and Gas Boiler with Independent Water Heating and PV': 0,
'Central 4 Pipes Water to Water Heat Pump and electrical Boiler with Independent Water Heating and PV': 0,
'Central 4 Pipes Air to Water Heat Pump and Gas Boiler with Independent Water Heating': 0,
'Central 4 Pipes Air to Water Heat Pump and electrical Boiler with Independent Water Heating': 0,
'Central 4 Pipes Ground to Water Heat Pump and Gas Boiler with Independent Water Heating': 0,
'Central 4 Pipes Ground to Water Heat Pump and electrical Boiler with Independent Water Heating': 0,
'Central 4 Pipes Water to Water Heat Pump and Gas Boiler with Independent Water Heating': 0,
'Central 4 Pipes Water to Water Heat Pump and electrical Boiler with Independent Water Heating': 0,
'Central Hydronic Air and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and PV': 100,
'Central Hydronic Air and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW and PV': 0,
'Central Hydronic Ground and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and PV': 0,
'Central Hydronic Ground and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW '
'and PV': 0,
'Central Hydronic Water and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and PV': 0,
'Central Hydronic Water and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW '
'and PV': 0,
'Central Hydronic Air and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Air and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Ground and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Ground and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Water and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Water and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Rooftop PV System': 0
}

View File

@ -49,7 +49,7 @@ ExportsFactory('sra', city, sra_output_path).export()
sra_path = (sra_output_path / f'{city.name}_sra.xml').resolve()
subprocess.run(['sra', str(sra_path)])
ResultFactory('sra', city, sra_output_path).enrich()
pv_feasibility(-73.5681295982132, 45.49218262677643, 0.0001, selected_buildings=city.buildings)
# pv_feasibility(-73.5681295982132, 45.49218262677643, 0.0001, selected_buildings=city.buildings)
energy_plus_workflow(city, energy_plus_output_path)
random_assignation.call_random(city.buildings, random_assignation.residential_systems_percentage)
EnergySystemsFactory('montreal_custom', city).enrich()

View File

@ -14,9 +14,10 @@ class EnergyStorageSystem(ABC):
Energy Storage System Abstract Class
"""
def __init__(self, storage_id, model_name=None, manufacturer=None,
def __init__(self, storage_id=None, name=None, model_name=None, manufacturer=None,
nominal_capacity=None, losses_ratio=None):
self._storage_id = storage_id
self._name = name
self._model_name = model_name
self._manufacturer = manufacturer
self._nominal_capacity = nominal_capacity
@ -30,6 +31,14 @@ class EnergyStorageSystem(ABC):
"""
return self._storage_id
@property
def name(self):
"""
Get storage name
:return: string
"""
return self._name
@property
def type_energy_stored(self):
"""

View File

@ -15,11 +15,11 @@ class ThermalStorageSystem(EnergyStorageSystem):
Energy Storage System Class
"""
def __init__(self, storage_id, type_energy_stored=None, model_name=None, manufacturer=None, storage_type=None,
def __init__(self, storage_id=None, name=None, type_energy_stored=None, model_name=None, manufacturer=None, storage_type=None,
nominal_capacity=None, losses_ratio=None, volume=None, height=None, layers=None,
maximum_operating_temperature=None, storage_medium=None, heating_coil_capacity=None):
super().__init__(storage_id, model_name, manufacturer, nominal_capacity, losses_ratio)
super().__init__(storage_id, name, model_name, manufacturer, nominal_capacity, losses_ratio)
self._type_energy_stored = type_energy_stored
self._storage_type = storage_type
self._volume = volume
@ -109,6 +109,7 @@ class ThermalStorageSystem(EnergyStorageSystem):
'Storage component':
{
'storage id': self.id,
'name': self.name,
'type of energy stored': self.type_energy_stored,
'model name': self.model_name,
'manufacturer': self.manufacturer,

View File

@ -278,6 +278,7 @@ class MontrealFutureSystemCatalogue(Catalog):
template_storages = self._archetypes['EnergySystemCatalog']['energy_storage_components']['templateStorages']
for tes in thermal_storages:
storage_id = tes['storage_id']
storage_name = tes['name']
type_energy_stored = tes['type_energy_stored']
model_name = tes['model_name']
manufacturer = tes['manufacturer']
@ -302,6 +303,7 @@ class MontrealFutureSystemCatalogue(Catalog):
losses_ratio = tes['losses_ratio']
heating_coil_capacity = tes['heating_coil_capacity']
storage_component = ThermalStorageSystem(storage_id=storage_id,
name=storage_name,
model_name=model_name,
type_energy_stored=type_energy_stored,
manufacturer=manufacturer,
@ -318,6 +320,7 @@ class MontrealFutureSystemCatalogue(Catalog):
for template in template_storages:
storage_id = template['storage_id']
storage_name = template['name']
storage_type = template['storage_type']
type_energy_stored = template['type_energy_stored']
maximum_operating_temperature = template['maximum_operating_temperature']
@ -342,6 +345,7 @@ class MontrealFutureSystemCatalogue(Catalog):
volume = template['physical_characteristics']['volume']
heating_coil_capacity = template['heating_coil_capacity']
storage_component = ThermalStorageSystem(storage_id=storage_id,
name=storage_name,
model_name=model_name,
type_energy_stored=type_energy_stored,
manufacturer=manufacturer,

Some files were not shown because too many files have changed in this diff Show More