From 2243e22866831775059e43602e424fb4e6f55a9e Mon Sep 17 00:00:00 2001 From: s_ranjbar Date: Fri, 8 Nov 2024 13:08:23 +0100 Subject: [PATCH] fix: NSGA-II is now working properly for heating systems. --- .../heat_pump_boiler_tes_heating.py | 7 +- .../genetic_algorithm/individual.py | 10 +- .../multi_objective_genetic_algorithm.py | 1003 ++++++++++++----- ..._objective_genetic_algorithm_rethinking.py | 336 ------ test.py | 4 +- 5 files changed, 713 insertions(+), 647 deletions(-) delete mode 100644 energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py diff --git a/energy_system_modelling_package/energy_system_modelling_factories/hvac_dhw_systems_simulation_models/heat_pump_boiler_tes_heating.py b/energy_system_modelling_package/energy_system_modelling_factories/hvac_dhw_systems_simulation_models/heat_pump_boiler_tes_heating.py index affaa871..71de5e5e 100644 --- a/energy_system_modelling_package/energy_system_modelling_factories/hvac_dhw_systems_simulation_models/heat_pump_boiler_tes_heating.py +++ b/energy_system_modelling_package/energy_system_modelling_factories/hvac_dhw_systems_simulation_models/heat_pump_boiler_tes_heating.py @@ -7,8 +7,8 @@ from energy_system_modelling_package.energy_system_modelling_factories.hvac_dhw_ class HeatPumpBoilerTesHeating: - def __init__(self, hp, boiler, tes, hourly_heating_demand_joules, heating_peak_load_watts, upper_limit_tes, - outdoor_temperature, dt=None): + def __init__(self, hp=None, boiler=None, tes=None, hourly_heating_demand_joules=None, heating_peak_load_watts=None, + upper_limit_tes=None, outdoor_temperature=None, dt=None): self.hp = hp self.boiler = boiler self.tes = tes @@ -25,6 +25,9 @@ class HeatPumpBoilerTesHeating: def simulation(self): hp, boiler, tes = self.hp, self.boiler, self.tes + if hp.nominal_heat_output < 0 or boiler.nominal_heat_output < 0: + raise ValueError("Heat output values must be non-negative. Check the nominal_heat_output for hp and boiler.") + heating_coil_nominal_output = 0 if tes.heating_coil_capacity is not None: heating_coil_nominal_output = float(tes.heating_coil_capacity) diff --git a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py index 8232bac8..8d54a823 100644 --- a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py +++ b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py @@ -195,7 +195,7 @@ class Individual: operation_cost_year_0=operation_cost_year_0, maintenance_cost_year_0=maintenance_cost_year_0) self.individual['lcc'] = lcc - if 'energy_consumption' in self.optimization_scenario: + if 'energy-consumption' in self.optimization_scenario: total_energy_consumption = 0 for generation_system in self.individual['Generation Components']: total_energy_consumption += generation_system['total_energy_consumption(kWh)'] @@ -207,18 +207,20 @@ class Individual: if self.optimization_scenario == 'cost': self.fitness_score = lcc self.individual['fitness_score'] = lcc - elif self.optimization_scenario == 'energy_consumption': + elif self.optimization_scenario == 'energy-consumption': self.fitness_score = total_energy_consumption self.individual['fitness_score'] = total_energy_consumption - elif self.optimization_scenario == 'cost_energy_consumption' or self.optimization_scenario == 'energy_consumption_cost': + elif self.optimization_scenario == 'cost_energy-consumption': self.fitness_score = (lcc, total_energy_consumption) + elif self.optimization_scenario == 'energy-consumption_cost': + self.fitness_score = (total_energy_consumption, lcc) self.individual['fitness_score'] = (lcc, total_energy_consumption) else: lcc = float('inf') total_energy_consumption = float('inf') self.individual['lcc'] = lcc self.individual['total_energy_consumption'] = total_energy_consumption - if self.optimization_scenario == 'cost_energy_consumption' or self.optimization_scenario == 'energy_consumption_cost': + if self.optimization_scenario == 'cost_energy-consumption' or self.optimization_scenario == 'energy-consumption_cost': self.individual['fitness_score'] = (float('inf'), float('inf')) self.fitness_score = (float('inf'), float('inf')) else: diff --git a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm.py b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm.py index a063cb48..e532cce4 100644 --- a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm.py +++ b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm.py @@ -8,67 +8,416 @@ import matplotlib.pyplot as plt class MultiObjectiveGeneticAlgorithm: - def __init__(self, population_size=100, generations=50, crossover_rate=0.8, mutation_rate=0.1, - optimization_scenario=None, output_path=None): + """ + A class to perform NSGA-II optimization. This class is specifically designed to optimize + multiple conflicting objectives, facilitating evolutionary-based search to determine the optimal sizes of all + components in an energy system. + + Attributes: + ---------- + population_size : int + The total number of individuals in the population. + generations : int + The number of generations to evolve the population. + crossover_rate : float + Probability of crossover between pairs of individuals. + mutation_rate : float + Probability of mutation in individual genes. + number_of_selected_solutions : int, optional + Number of optimal solutions to select from the final population. + optimization_scenario : str + The specific optimization scenario defining the objectives to be optimized. + pareto_front : list, optional + Stores the Pareto front solutions after optimization, representing non-dominated solutions. + + Methods: + -------- + __init__(self, population_size=40, generations=50, crossover_rate=0.9, mutation_rate=0.33, + number_of_selected_solutions=None, optimization_scenario=None) + Initializes the MultiObjectiveGeneticAlgorithm with population size, generation count, and evolutionary + operators such as crossover and mutation rates. + """ + + def __init__(self, population_size=50, generations=50, crossover_rate=0.9, mutation_rate=0.33, + number_of_selected_solutions=None, optimization_scenario=None): self.population_size = population_size self.population = [] self.generations = generations self.crossover_rate = crossover_rate self.mutation_rate = mutation_rate self.optimization_scenario = optimization_scenario - self.list_of_solutions = [] - self.best_solution = None - self.best_solution_generation = None - self.output_path = output_path + self.number_of_selected_solutions = number_of_selected_solutions + self.selected_solutions = None + self.pareto_front = None -# Initialize Population + # Initialize Population def initialize_population(self, building, energy_system): + """ + Initializes the population for the genetic algorithm with feasible individuals based on building constraints + and energy system characteristics. + + Parameters: + ---------- + building : Building + The building object containing data on building demands and properties. + energy_system : EnergySystem + The energy system object containing demand types and components to be optimized. + + Procedure: + ---------- + 1. Identifies a design period based on building energy demands. + 2. Attempts to create and initialize individuals for the population until the specified population size + is reached or the maximum number of attempts is exceeded. + 3. Each individual is evaluated for feasibility based on defined constraints. + 4. Only feasible individuals are added to the population. + + Raises: + ------- + RuntimeError + If a feasible population of the specified size cannot be generated within the set maximum attempts. + + Notes: + ------ + - This method stops when we achieve feasible population size or raises an error if unsuccessful. + - The feasibility of each individual is checked to ensure that it satisfies the constraints of the + building and energy system demands. + - The `design_period_energy_demands` is used to evaluate individual feasibility based on specific demand + periods within the building’s operation. + """ design_period_energy_demands = self.design_period_identification(building) attempts = 0 - max_attempts = self.population_size * 5 + max_attempts = self.population_size * 20 while len(self.population) < self.population_size and attempts < max_attempts: individual = Individual(building=building, energy_system=energy_system, design_period_energy_demands=design_period_energy_demands, optimization_scenario=self.optimization_scenario) individual.initialization() + individual.score_evaluation() attempts += 1 - if self.initial_population_feasibility_check(individual, energy_system.demand_types, - design_period_energy_demands): + if individual.feasibility: self.population.append(individual) if len(self.population) < self.population_size: raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. " f"Only {len(self.population)} feasible individuals were generated.") - @staticmethod - def initial_population_feasibility_check(individual, demand_types, design_period_demands): - total_heating_capacity = sum( - component['heating_capacity'] for component in individual.individual['Generation Components'] - if component['heating_capacity'] is not None - ) - total_cooling_capacity = sum( - component['cooling_capacity'] for component in individual.individual['Generation Components'] - if component['cooling_capacity'] is not None - ) - max_heating_demand = max(design_period_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES - max_cooling_demand = max(design_period_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES - max_dhw_demand = max(design_period_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES - if cte.HEATING in demand_types and total_heating_capacity < 0.5 * max_heating_demand: - return False - if cte.DOMESTIC_HOT_WATER in demand_types and total_heating_capacity < 0.5 * max_dhw_demand: - return False - if cte.COOLING in demand_types and total_cooling_capacity < 0.5 * max_cooling_demand: - return False - total_volume = sum( - component['volume'] for component in individual.individual['Energy Storage Components'] - if component['volume'] is not None - ) - max_storage_volume = individual.available_space * 0.1 - if total_volume > max_storage_volume: - return False - return True + def sbx_crossover(self, parent1, parent2): + """ + Simulated Binary Crossover (SBX) operator to produce offspring from two parent individuals by recombining their + attributes. This method is applied to both generation and storage components, using a distribution index + to control the diversity of the generated offspring. - def nsga2_selection(self, population, fronts, crowding_distances): + Parameters: + ---------- + parent1 : Individual + The first parent individual, containing attributes for generation and storage components. + parent2 : Individual + The second parent individual with a similar structure to parent1. + + Returns: + ------- + tuple + Two new offspring individuals (child1, child2) generated via the SBX crossover process. If crossover is + not performed (based on crossover probability), it returns deep copies of the parent individuals. + + Process: + ------- + 1. Determines whether crossover should occur based on a randomly generated probability and the crossover rate. + 2. For each component in the generation and storage attributes of both parents: + - Calculates crossover coefficients for each attribute based on a randomly generated value `u`. + - Uses a distribution index (`eta_c`) to compute the value of `beta`, which controls the spread of offspring. + - Generates two offspring values (`alpha1`, `alpha2`) based on beta and the parent values (x1 and x2). + - Ensures that all offspring values are positive; if `alpha1` or `alpha2` is non-positive, assigns the + average of `x1` and `x2` instead. + 3. Returns the two newly created offspring if crossover is successful; otherwise, returns copies of the parents. + + Details: + -------- + - `eta_c` (8): Distribution index influencing the SBX process; higher values result in offspring closer + to the parents, while lower values create more diversity. + - Each attribute in `Generation Components` (e.g., `heating_capacity`, `cooling_capacity`) and + `Energy Storage Components` (e.g., `capacity`, `volume`, `heating_coil_capacity`) has crossover applied. + - Ensures robustness by preventing the assignment of non-positive values to offspring attributes. + """ + eta_c = 8 # Distribution index for SBX + if random.random() < self.crossover_rate: + child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2) + # Crossover for Generation Components + for i in range(len(parent1.individual['Generation Components'])): + for cap_type in ['heating_capacity', 'cooling_capacity']: + if parent1.individual['Generation Components'][i][cap_type] is not None: + x1 = parent1.individual['Generation Components'][i][cap_type] + x2 = parent2.individual['Generation Components'][i][cap_type] + u = random.random() + if u <= 0.5: + beta = (2 * u) ** (1 / (eta_c + 1)) + else: + beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1)) + alpha1 = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2) + alpha2 = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2) + if alpha1 > 0: + child1.individual['Generation Components'][i][cap_type] = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2) + else: + child1.individual['Generation Components'][i][cap_type] = 0.5 * (x1 + x2) + if alpha2 > 0: + child2.individual['Generation Components'][i][cap_type] = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2) + else: + child2.individual['Generation Components'][i][cap_type] = 0.5 * (x1 + x2) + # Crossover for Energy Storage Components + for i in range(len(parent1.individual['Energy Storage Components'])): + for item in ['capacity', 'volume', 'heating_coil_capacity']: + if parent1.individual['Energy Storage Components'][i][item] is not None: + x1 = parent1.individual['Energy Storage Components'][i][item] + x2 = parent2.individual['Energy Storage Components'][i][item] + u = random.random() + if u <= 0.5: + beta = (2 * u) ** (1 / (eta_c + 1)) + else: + beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1)) + alpha1 = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2) + alpha2 = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2) + if alpha1 > 0: + child1.individual['Energy Storage Components'][i][item] = 0.5 * ((1 + beta) * x1 + (1 - beta) * x2) + else: + child1.individual['Energy Storage Components'][i][item] = 0.5 * (x1 + x2) + if alpha2 > 0: + child2.individual['Energy Storage Components'][i][item] = 0.5 * ((1 - beta) * x1 + (1 + beta) * x2) + else: + child2.individual['Energy Storage Components'][i][item] = 0.5 * (x1 + x2) + return child1, child2 + else: + return copy.deepcopy(parent1), copy.deepcopy(parent2) + + def polynomial_mutation(self, individual, building, energy_system): + """ + Applies a Polynomial Mutation operator to modify the attributes of an individual's generation and storage components. + This operator introduces controlled variation in the population, guided by the mutation distribution index, to enhance + exploration within the search space. + + Parameters: + ---------- + individual : Individual + The individual to mutate, containing attributes for generation and storage components. + building : Building + The building data, used to determine constraints such as peak heating load and available space. + energy_system : EnergySystem + The energy system configuration with specified demand types (e.g., heating, cooling, domestic hot water). + + Returns: + ------- + Individual + The mutated individual, with potentially adjusted values for generation and storage components. + + Process: + ------- + 1. Defines the `polynomial_mutation_operator`, which applies the mutation based on the distribution index `eta_m` + and ensures the mutated value remains within defined bounds. + 2. For each generation component of the individual: + - If the mutation rate threshold is met, performs mutation on `heating_capacity` and `cooling_capacity`, + bounded by the design period demands for the specified energy types. + 3. For each energy storage component of the individual: + - If the mutation rate threshold is met, performs mutation on `volume` and `heating_coil_capacity` based on + available storage space and design period demands. + 4. Returns the mutated individual. + + Details: + -------- + - `eta_m` (20): Mutation distribution index that controls the spread of the mutated values; higher values concentrate + mutated values near the original, while lower values increase variability. + - Uses design period demands to set mutation bounds, ensuring capacities stay within feasible ranges for heating, + cooling, and hot water demands. + - Each mutation is bounded to avoid negative or unreasonably large values. + """ + eta_m = 20 # Mutation distribution index + design_period_energy_demands = self.design_period_identification(building) + + def polynomial_mutation_operator(value, lower_bound, upper_bound): + u = random.random() + if u < 0.5: + delta_q = (2 * u) ** (1 / (eta_m + 1)) - 1 + else: + delta_q = 1 - (2 * (1 - u)) ** (1 / (eta_m + 1)) + mutated_value = value + delta_q * (upper_bound - lower_bound) + return mutated_value + + # Mutate Generation Components + for generation_component in individual['Generation Components']: + if random.random() < self.mutation_rate: + if (generation_component['nominal_heating_efficiency'] is not None and + (cte.HEATING or cte.DOMESTIC_HOT_WATER in energy_system.demand_types)): + if cte.HEATING in energy_system.demand_types: + max_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES + else: + max_demand = max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES + generation_component['heating_capacity'] = abs(polynomial_mutation_operator( + generation_component['heating_capacity'], 0, max_demand)) + if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types: + max_cooling_demand = max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES + generation_component['cooling_capacity'] = polynomial_mutation_operator( + generation_component['cooling_capacity'], 0, max_cooling_demand) + # Mutate Storage Components + for storage_component in individual['Energy Storage Components']: + if random.random() < self.mutation_rate: + if storage_component['type'] == f'{cte.THERMAL}_storage': + max_available_space = 0.01 * building.volume / building.storeys_above_ground + storage_component['volume'] = abs(polynomial_mutation_operator(storage_component['volume'], 0, + max_available_space)) + if storage_component['heating_coil_capacity'] is not None: + if cte.HEATING in energy_system.demand_types: + max_heating_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES + else: + max_heating_demand = max( + design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES + + storage_component['heating_coil_capacity'] = abs(polynomial_mutation_operator( + storage_component['heating_coil_capacity'], 0, max_heating_demand)) + return individual + + def fast_non_dominated_sorting(self): + """ + Performs fast non-dominated sorting on the current population to categorize individuals + into different Pareto fronts based on dominance. This method is part of the NSGA-II algorithm + and is used to identify non-dominated solutions at each front level. + + Returns: + fronts (list of lists): A list of Pareto fronts, where each front is a list of indices + corresponding to individuals in the population. The first front + (fronts[0]) contains non-dominated individuals, the second front + contains solutions dominated by the first front, and so on. + + Workflow: + 1. Initializes an empty list `s` for each individual, representing individuals it dominates. + Also initializes `n`, a count of how many individuals dominate each individual, and + `rank` to store the Pareto front rank of each individual. + + 2. For each individual `p`, compares it with every other individual `q` to determine + dominance. If `p` dominates `q`, `q` is added to `s[p]`. If `q` dominates `p`, + `n[p]` is incremented. + + 3. Individuals with `n[p] == 0` are non-dominated and belong to the first front, so they + are assigned a rank of 0 and added to `fronts[0]`. + + 4. Iteratively builds additional fronts: + - For each front, checks each individual in the front, reducing the `n` count of + individuals it dominates (`s[p]`). + - If `n[q]` becomes zero, `q` is non-dominated relative to the current front and is + added to the next front. + - This continues until no more individuals can be added to new fronts. + + 5. After identifying all fronts, assigns a rank to each individual in each front. + """ + population = self.population + s = [[] for _ in range(len(population))] + n = [0] * len(population) + rank = [0] * len(population) + fronts = [[]] + for p in range(len(population)): + for q in range(len(population)): + if self.domination_status(population[p], population[q]): + s[p].append(q) + elif self.domination_status(population[q], population[p]): + n[p] += 1 + if n[p] == 0: + rank[p] = 0 + fronts[0].append(p) + i = 0 + while fronts[i]: + next_front = set() + for p in fronts[i]: + for q in s[p]: + n[q] -= 1 + if n[q] == 0: + rank[q] = i + 1 + next_front.add(q) + i += 1 + fronts.append(list(next_front)) + del fronts[-1] + for (j, front) in enumerate(fronts): + for i in front: + self.population[i].rank = j + 1 + return fronts + + def calculate_crowding_distance(self, front, crowding_distance): + """ + Calculates the crowding distance for individuals within a given front in the population. + The crowding distance is a measure used in multi-objective optimization to maintain diversity + among non-dominated solutions by evaluating the density of solutions surrounding each individual. + + Parameters: + front (list): A list of indices representing the individuals in the current front. + crowding_distance (list): A list of crowding distances for each individual in the population, updated in place. + + Returns: + list: The updated crowding distances for each individual in the population. + + Methodology: + 1. For each objective (indexed by `j`), sorts the individuals in `front` based on their objective values. + 2. Assigns a very large crowding distance (1e12) to boundary individuals in each objective (first and last individuals after sorting). + 3. Retrieves the minimum and maximum values for the current objective within the sorted front. + 4. For non-boundary individuals, calculates the crowding distance by: + - Finding the difference in objective values between the neighboring individuals in the sorted list. + - Normalizing this difference by dividing by the range of the objective values (objective_max - objective_min). + - Adding this value to the individual's crowding distance. + 5. Updates each individual in `front` with their calculated crowding distance for later use in selection. + + Example: + crowding_distances = [0] * len(self.population) + self.calculate_crowding_distance(front, crowding_distances) + + Note: + The crowding distance helps prioritize individuals that are more isolated within the objective space, + thus promoting diversity within the Pareto front. + """ + for j in range(len(self.population[0].fitness_score)): + sorted_front = sorted(front, key=lambda x: self.population[x].individual['fitness_score'][j]) + crowding_distance[sorted_front[0]] = float(1e12) + crowding_distance[sorted_front[-1]] = float(1e12) + objective_min = self.population[sorted_front[0]].individual['fitness_score'][j] + objective_max = self.population[sorted_front[-1]].individual['fitness_score'][j] + if objective_max != objective_min: + for i in range(1, len(sorted_front) - 1): + crowding_distance[sorted_front[i]] += ( + (self.population[sorted_front[i + 1]].individual['fitness_score'][j] - + self.population[sorted_front[i - 1]].individual['fitness_score'][j]) / + (objective_max - objective_min)) + for i in front: + self.population[i].crowding_distance = crowding_distance[i] + return crowding_distance + + def nsga2_selection(self, fronts): + """ + Selects the next generation population using the NSGA-II selection strategy. + This method filters the individuals in `fronts` based on non-dominated sorting and crowding distance, + ensuring a diverse and high-quality set of solutions for the next generation. + + Parameters: + fronts (list of lists): A list of fronts, where each front is a list of indices + representing non-dominated solutions in the population. + + Returns: + list: The selected population of individuals for the next generation, limited to `self.population_size`. + + Methodology: + 1. Iterates through each front in `fronts` (sorted by rank) to add individuals to the new population. + 2. For each front: + - Filters individuals with finite fitness scores to prevent propagation of invalid solutions. + - Adds these individuals to the new population until the population size limit is met. + 3. If the new population size limit is not yet reached: + - Sorts the current front based on crowding distance in descending order. + - Adds individuals based on crowding distance until reaching the population limit. + 4. Returns the selected population, ensuring that only the top solutions (by rank and crowding distance) are kept. + + Notes: + - Non-dominated sorting prioritizes individuals in lower-ranked fronts. + - Crowding distance promotes diversity within the population by favoring isolated solutions. + + Example: + selected_population = self.nsga2_selection(fronts) + + Important: + This method assumes that each individual's fitness score contains at least two objectives, + and any infinite fitness values are excluded from selection. + """ new_population = [] i = 0 while len(new_population) + len(fronts[i]) <= self.population_size: @@ -76,236 +425,180 @@ class MultiObjectiveGeneticAlgorithm: # Skip individuals with infinite fitness values to avoid propagation if not math.isinf(self.population[index].individual['fitness_score'][0]) and \ not math.isinf(self.population[index].individual['fitness_score'][1]): - new_population.append(population[index]) + new_population.append(self.population[index]) i += 1 if i >= len(fronts): break if len(new_population) < self.population_size and i < len(fronts): - sorted_front = sorted(fronts[i], key=lambda x: crowding_distances[i][x], reverse=True) + sorted_front = sorted(fronts[i], key=lambda x: self.population[x].crowding_distance, reverse=True) for index in sorted_front: if len(new_population) < self.population_size: if not math.isinf(self.population[index].individual['fitness_score'][0]) and \ not math.isinf(self.population[index].individual['fitness_score'][1]): - new_population.append(population[index]) + new_population.append(self.population[index]) else: break return new_population - def fast_non_dominated_sort(self): - population_size = self.population_size - s = [[] for _ in range(population_size)] - front = [[]] - n = [0] * population_size - rank = [0] * population_size - for p in range(population_size): - s[p] = [] - n[p] = 0 - for q in range(population_size): - if self.dominates(self.population[p], self.population[q]): - s[p].append(q) - elif self.dominates(self.population[q], self.population[p]): - n[p] += 1 - if n[p] == 0: - rank[p] = 0 - front[0].append(p) - i = 0 - while front[i]: - next_front = set() - for p in front[i]: - for q in s[p]: - n[q] -= 1 - if n[q] == 0: - rank[q] = i + 1 - next_front.add(q) - i += 1 - front.append(list(next_front)) - del front[-1] - return front - - @staticmethod - def dominates(individual1, individual2): - lcc1, lce1 = individual1.individual['fitness_score'] - lcc2, lce2 = individual2.individual['fitness_score'] - return (lcc1 <= lcc2 and lce1 <= lce2) and (lcc1 < lcc2 or lce1 < lce2) - - def calculate_crowding_distance(self, front): - crowding_distance = [0] * len(self.population) - for objective in ['lcc', 'total_energy_consumption']: - sorted_front = sorted(front, key=lambda x: self.population[x].individual[objective]) - # Set distances to finite large numbers rather than `inf` - crowding_distance[sorted_front[0]] = float(1e9) - crowding_distance[sorted_front[-1]] = float(1e9) - objective_min = self.population[sorted_front[0]].individual[objective] - objective_max = self.population[sorted_front[-1]].individual[objective] - if objective_max != objective_min: - for i in range(1, len(sorted_front) - 1): - crowding_distance[sorted_front[i]] += ( - (self.population[sorted_front[i + 1]].individual[objective] - - self.population[sorted_front[i - 1]].individual[objective]) / - (objective_max - objective_min)) - return crowding_distance - - def crossover(self, parent1, parent2): - """ - Crossover between two parents to produce two children. - swaps generation components and storage components between the two parents with a 50% chance. - :param parent1: First parent individual. - :param parent2: second parent individual. - :return: Two child individuals (child1 and child2). - """ - if random.random() < self.crossover_rate: - # Deep copy of the parents to create children - child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2) - # Crossover for Generation Components - for i in range(len(parent1.individual['Generation Components'])): - if random.random() < 0.5: - # swap the entire generation component - child1.individual['Generation Components'][i], child2.individual['Generation Components'][i] = ( - child2.individual['Generation Components'][i], - child1.individual['Generation Components'][i] - ) - - # Crossover for Energy storage Components - for i in range(len(parent1.individual['Energy Storage Components'])): - if random.random() < 0.5: - # swap the entire storage component - child1.individual['Energy Storage Components'][i], child2.individual['Energy Storage Components'][i] = ( - child2.individual['Energy Storage Components'][i], - child1.individual['Energy Storage Components'][i] - ) - - return child1, child2 - else: - # If crossover does not happen, return copies of the original parents - return copy.deepcopy(parent1), copy.deepcopy(parent2) - - def mutate(self, individual, building, energy_system): - """ - Mutates the individual's generation and storage components. - - - `individual`: The individual to mutate (contains generation and storage components). - - `building`: Building data that contains constraints such as peak heating load and available space. - - Returns the mutated individual. - """ - design_period_energy_demands = self.design_period_identification(building) - # Mutate Generation Components - for generation_component in individual['Generation Components']: - if random.random() < self.mutation_rate: - if (generation_component['nominal_heating_efficiency'] is not None and cte.HEATING or cte.DOMESTIC_HOT_WATER in - energy_system.demand_types): - # Mutate heating capacity - if cte.HEATING in energy_system.demand_types: - generation_component['heating_capacity'] = random.uniform( - 0, max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES) - else: - generation_component['heating_capacity'] = random.uniform( - 0, max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES) - if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types: - # Mutate cooling capacity - generation_component['cooling_capacity'] = random.uniform( - 0, max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES) - # Mutate storage Components - for storage_component in individual['Energy Storage Components']: - if random.random() < self.mutation_rate: - if storage_component['type'] == f'{cte.THERMAL}_storage': - # Mutate the volume of thermal storage - max_available_space = 0.01 * building.volume / building.storeys_above_ground - storage_component['volume'] = random.uniform(0, max_available_space) - if storage_component['heating_coil_capacity'] is not None: - if cte.HEATING in energy_system.demand_types: - storage_component['heating_coil_capacity'] = random.uniform(0, max( - design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES) - else: - storage_component['heating_coil_capacity'] = random.uniform(0, max( - design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES) - return individual - def solve_ga(self, building, energy_system): + # df = pd.DataFrame() self.initialize_population(building, energy_system) - for individual in self.population: - individual.score_evaluation() - pareto_population = [] + for n in range(self.generations + 1): + print(n) + progeny_population = [] + while len(progeny_population) < self.population_size: - for generation in range(1, self.generations + 1): - print(f"Generation {generation}") - fronts = self.fast_non_dominated_sort() - - # Ensure the front calculation is valid - if not fronts or not fronts[0]: - print("Warning: No valid non-dominated front found.") - continue - - # Calculate crowding distances and select individuals - crowding_distances = [self.calculate_crowding_distance(front) for front in fronts] - self.population = self.nsga2_selection(self.population, fronts, crowding_distances) - - # Add only valid indices to pareto_population - pareto_population.extend([self.population[i] for i in fronts[0] if i < len(self.population)]) - - # Check population bounds - if len(pareto_population) > self.population_size: - pareto_population = pareto_population[:self.population_size] - - # Generate the next population through crossover and mutation - next_population = [] - while len(next_population) < self.population_size: parent1, parent2 = random.choice(self.population), random.choice(self.population) - child1, child2 = self.crossover(parent1, parent2) - self.mutate(child1.individual, building, energy_system) - self.mutate(child2.individual, building, energy_system) + child1, child2 = self.sbx_crossover(parent1, parent2) + self.polynomial_mutation(child1.individual, building, energy_system) + self.polynomial_mutation(child2.individual, building, energy_system) child1.score_evaluation() child2.score_evaluation() - next_population.extend([child1, child2]) + progeny_population.extend([child1, child2]) + self.population.extend(progeny_population) + fronts = self.fast_non_dominated_sorting() + crowding_distances = [0] * len(self.population) + for front in fronts: + self.calculate_crowding_distance(front=front, crowding_distance=crowding_distances) + new_population = self.nsga2_selection(fronts=fronts) + self.population = new_population + if n == self.generations: + fronts = self.fast_non_dominated_sorting() + self.pareto_front = [self.population[i] for i in fronts[0]] + pareto_fitness_scores = [individual.fitness_score for individual in self.pareto_front] + normalized_objectives = self.euclidean_normalization(pareto_fitness_scores) + performance_scores, selected_solution_indexes = self.topsis_decision_making(normalized_objectives) + self.selected_solutions = self.postprocess(self.pareto_front, selected_solution_indexes) + print(self.selected_solutions) + self.plot_pareto_front(self.pareto_front) - self.population = next_population[:self.population_size] + def plot_pareto_front(self, pareto_front): + """ + Plots the Pareto front for a given set of individuals in the population, using normalized fitness scores. + This method supports any number of objective functions, with dynamically labeled axes. - # Ensure pareto_population contains the correct non-dominated individuals before TOPSIS - if not pareto_population: - print("No Pareto solutions found during optimization.") - return None + Parameters: + pareto_front (list): List of individuals that are part of the Pareto front. - # Recalculate pareto front indices based on updated pareto_population - fronts = self.fast_non_dominated_sort() - pareto_front_indices = fronts[0] if fronts else [] - pareto_front_indices = [i for i in pareto_front_indices if i < len(pareto_population)] - print(f"Final pareto_front_indices: {pareto_front_indices}, pareto_population size: {len(pareto_population)}") + Returns: + None: Displays a scatter plot with lines connecting the Pareto-optimal points. - if not pareto_front_indices: - print("No valid solution found after TOPSIS due to empty pareto front indices.") - return None + Process: + - Normalizes the fitness scores using Euclidean normalization. + - Generates scatter plots for each pair of objectives. + - Connects the sorted Pareto-optimal points with a smooth, thick red line. - global_pareto_front = [pareto_population[i] for i in pareto_front_indices] + Attributes: + - Objectives on each axis are dynamically labeled as "Objective Function _", where the name is + derived from the optimization scenario. + - Adjusted marker and line thicknesses ensure enhanced visibility of the plot. - # Get the top N solutions with TOPSIS - top_n = 3 # Adjust this value based on how many top solutions you want to explore - self.best_solution = self.topsis_decision_making(global_pareto_front, top_n=top_n) + Steps: + 1. Normalize the fitness scores of the Pareto front. + 2. Create scatter plots for each pair of objectives. + 3. Connect the sorted points for each objective pair with a red line. + 4. Label each axis based on objective function index and scenario name. - # Print the top N solutions - if self.best_solution: - print("Top solutions after TOPSIS:") - for idx, solution in enumerate(self.best_solution, 1): - print(f"Solution {idx}: LCC = {solution.individual['lcc']}, " - f"LCE = {solution.individual['total_energy_consumption']}") - else: - print("No valid solutions found after TOPSIS.") + Example: + self.plot_pareto_front(pareto_front) - if pareto_population: - self.plot_pareto_front(pareto_population) + Note: + - This method assumes that each individual in the Pareto front has a fitness score with at least two objectives. + """ + # Normalize Pareto scores + pareto_scores = [individual.fitness_score for individual in pareto_front] + normalized_pareto_scores = self.euclidean_normalization(pareto_scores) + normalized_pareto_scores = list(zip(*normalized_pareto_scores)) # Transpose for easy access by objective index - return self.best_solution + # Extract the number of objectives and the objective names + num_objectives = len(normalized_pareto_scores) + objectives = self.optimization_scenario.split('_') + + # Set up subplots for each pair of objectives + fig, axs = plt.subplots(num_objectives - 1, num_objectives - 1, figsize=(15, 10)) + for i in range(num_objectives - 1): + for j in range(i + 1, num_objectives): + # Sort points for smoother line connections + sorted_pairs = sorted(zip(normalized_pareto_scores[i], normalized_pareto_scores[j])) + sorted_x, sorted_y = zip(*sorted_pairs) + + # Plot each objective pair + ax = axs[i, j - 1] if num_objectives > 2 else axs + ax.scatter( + normalized_pareto_scores[i], + normalized_pareto_scores[j], + color='blue', + alpha=0.7, + edgecolors='black', + s=100 # Larger point size for visibility + ) + # Plot smoother, thicker line connecting Pareto points + ax.plot( + sorted_x, + sorted_y, + color='red', + linewidth=3 # Thicker line for better visibility + ) + + # Dynamic axis labels based on objective function names + ax.set_xlabel(f"Objective Function {i + 1}_{objectives[i]}") + ax.set_ylabel(f"Objective Function {j + 1}_{objectives[j]}") + ax.grid(True) + + # Set title and layout adjustments + fig.suptitle("Pareto Front (Normalized Fitness Scores)") + plt.tight_layout() + plt.show() @staticmethod def design_period_identification(building): + """ + Identifies the design period for heating, cooling, and domestic hot water (DHW) demands based on the building's + daily energy requirements. This method focuses on selecting time windows with the highest daily demands for + accurate energy system sizing. + + Parameters: + ---------- + building : Building + The building object containing hourly energy demand data for heating, cooling, and domestic hot water. + + Returns: + ------- + dict + A dictionary with keys for each demand type ('heating', 'cooling', 'domestic_hot_water'), where each entry + contains: + - 'demands': A list of hourly demand values within the identified design period. + - 'start_index': The start hour index for the identified period. + - 'end_index': The end hour index for the identified period. + + Procedure: + ---------- + 1. Calculates the total demand for each day for heating, cooling, and DHW. + 2. Identifies the day with the highest demand for each type. + 3. Selects a time window around each peak-demand day as the design period: + - For middle days, it selects the preceding and following days. + - For first and last days, it adjusts the window to ensure it stays within the range of available data. + + Notes: + ------ + - This design period serves as a critical input for system sizing, ensuring the energy system meets the + maximum demand requirements efficiently. + - Each demand type has a separate design period tailored to its specific daily peak requirements. + {'heating': {'demands': [...], 'start_index': ..., 'end_index': ...}, + 'cooling': {'demands': [...], 'start_index': ..., 'end_index': ...}, + 'domestic_hot_water': {'demands': [...], 'start_index': ..., 'end_index': ...} + + """ + def get_start_end_indices(max_day_index, total_days): - if max_day_index > 0 and max_day_index < total_days - 1: + if 0 < max_day_index < total_days - 1: start_index = (max_day_index - 1) * 24 end_index = (max_day_index + 2) * 24 elif max_day_index == 0: start_index = 0 end_index = (max_day_index + 2) * 24 else: - start_index = (max_day_index - 1) * 24 + start_index = (max_day_index - 2) * 24 end_index = total_days * 24 return start_index, end_index @@ -334,85 +627,189 @@ class MultiObjectiveGeneticAlgorithm: 'start_index': dhw_start, 'end_index': dhw_end} } - def topsis_decision_making(self, pareto_front, top_n=5): + @staticmethod + def domination_status(individual1, individual2): """ - Perform TOPSIS decision-making to choose the best solutions from the Pareto front. + Determines if `individual1` dominates `individual2` based on their fitness scores (objective values). + Domination in this context means that `individual1` is no worse than `individual2` in all objectives + and strictly better in at least one. - :param pareto_front: List of individuals in the Pareto front - :param top_n: Number of top solutions to select based on TOPSIS ranking - :return: List of top N individuals based on TOPSIS ranking + Parameters: + individual1: The first individual (object) being compared. + individual2: The second individual (object) being compared. + + Returns: + bool: True if `individual1` dominates `individual2`, False otherwise. + + Methodology: + 1. Retrieves the list of objective scores (fitness scores) for both individuals. + 2. Ensures both individuals have the same number of objectives; otherwise, raises an AssertionError. + 3. Initializes two flags: + - `better_in_all`: Checks if `individual1` is equal to or better than `individual2` in all objectives. + - `strictly_better_in_at_least_one`: Checks if `individual1` is strictly better than `individual2` in at + least one objective. + 4. Iterates over each objective of both individuals: + - If `individual1` has a worse value in any objective, sets `better_in_all` to False. + - If `individual1` has a better value in any objective, sets `strictly_better_in_at_least_one` to True. + 5. Concludes that `individual1` dominates `individual2` if `better_in_all` is True and + `strictly_better_in_at_least_one` is True. """ - # Filter out infinite values from the pareto front before processing - pareto_front = [ind for ind in pareto_front if all(math.isfinite(v) for v in ind.individual['fitness_score'])] - if not pareto_front: - return None - - # Step 1: Normalize the objective functions (cost and energy consumption) - min_lcc = min([ind.individual['lcc'] for ind in pareto_front]) - max_lcc = max([ind.individual['lcc'] for ind in pareto_front]) - min_lce = min([ind.individual['total_energy_consumption'] for ind in pareto_front]) - max_lce = max([ind.individual['total_energy_consumption'] for ind in pareto_front]) - - normalized_pareto_front = [] - for ind in pareto_front: - normalized_lcc = (ind.individual['lcc'] - min_lcc) / (max_lcc - min_lcc) if max_lcc > min_lcc else 0 - normalized_lce = (ind.individual['total_energy_consumption'] - min_lce) / ( - max_lce - min_lce) if max_lce > min_lce else 0 - normalized_pareto_front.append((ind, normalized_lcc, normalized_lce)) - - # Step 2: Calculate the ideal and worst solutions - ideal_solution = (0, 0) # Ideal is minimum LCC and minimum LCE (0, 0 after normalization) - worst_solution = (1, 1) # Worst is maximum LCC and maximum LCE (1, 1 after normalization) - - # Step 3: Calculate the distance to the ideal and worst solutions - best_distances = [] - worst_distances = [] - for ind, normalized_lcc, normalized_lce in normalized_pareto_front: - distance_to_ideal = math.sqrt( - (normalized_lcc - ideal_solution[0]) ** 2 + (normalized_lce - ideal_solution[1]) ** 2) - distance_to_worst = math.sqrt( - (normalized_lcc - worst_solution[0]) ** 2 + (normalized_lce - worst_solution[1]) ** 2) - best_distances.append(distance_to_ideal) - worst_distances.append(distance_to_worst) - - # Step 4: Calculate relative closeness to the ideal solution - similarity = [worst / (best + worst) for best, worst in zip(best_distances, worst_distances)] - - # Step 5: Select the top N individuals with the highest similarity scores - top_indices = sorted(range(len(similarity)), key=lambda i: similarity[i], reverse=True)[:top_n] - top_solutions = [pareto_front[i] for i in top_indices] - - # Plot the similarity scores - self.plot_topsis_similarity(similarity) - - return top_solutions + # Extract the list of objectives for both individuals + objectives1 = individual1.individual['fitness_score'] + objectives2 = individual2.individual['fitness_score'] + # Ensure both individuals have the same number of objectives + assert len(objectives1) == len(objectives2), "Both individuals must have the same number of objectives" + # Flags to check if one dominates the other + better_in_all = True # Whether individual1 is better in all objectives + strictly_better_in_at_least_one = False # Whether individual1 is strictly better in at least one objective + for obj1, obj2 in zip(objectives1, objectives2): + if obj1 > obj2: + better_in_all = False # individual1 is worse in at least one objective + if obj1 < obj2: + strictly_better_in_at_least_one = True # individual1 is better in at least one objective + result = True if better_in_all and strictly_better_in_at_least_one else False + return result @staticmethod - def plot_topsis_similarity(similarity): + def euclidean_normalization(fitness_scores): """ - Plot the TOPSIS similarity scores for visualization. + Normalizes the fitness scores for a set of individuals using Euclidean normalization. - :param similarity: List of similarity scores for each individual in the Pareto front + Parameters: + fitness_scores (list of lists/tuples): A list where each element represents an individual's fitness scores + across multiple objectives, e.g., [(obj1, obj2, ...), ...]. + + Returns: + list of tuples: A list of tuples representing the normalized fitness scores for each individual. Each tuple + contains the normalized values for each objective, scaled between 0 and 1 relative to the + Euclidean length of each objective across the population. + + Process: + - For each objective, calculates the sum of squares across all individuals. + - Normalizes each individual's objective score by dividing it by the square root of the sum of squares + for that objective. + - If the sum of squares for any objective is zero (to avoid division by zero), assigns a normalized score + of zero for that objective. + + Example: + Given a fitness score input of [[3, 4], [1, 2]], this method will normalize each objective and return: + [(0.9487, 0.8944), (0.3162, 0.4472)]. + + Notes: + - This normalization is helpful for multi-objective optimization, as it scales each objective score + independently, ensuring comparability even with different objective ranges. + + Steps: + 1. Calculate the sum of squares for each objective across all individuals. + 2. Divide each objective value by the square root of the corresponding sum of squares. + 3. Return the normalized values as a list of tuples, preserving the individual-objective structure. """ - plt.figure(figsize=(10, 6)) - plt.plot(range(len(similarity)), similarity, 'bo-', label='TOPSIS Similarity Scores') - plt.xlabel('Pareto Front Solution Index') - plt.ylabel('Similarity Score') - plt.title('TOPSIS Similarity Scores for Pareto Front Solutions') - plt.legend() - plt.grid(True) - plt.show() + num_objectives = len(fitness_scores[0]) + # Calculate sum of squares for each objective + sum_squared_objectives = [ + sum(fitness_score[i] ** 2 for fitness_score in fitness_scores) + for i in range(num_objectives) + ] + # Normalize the objective values for each individual + normalized_objective_functions = [ + tuple( + fitness_score[j] / math.sqrt(sum_squared_objectives[j]) if sum_squared_objectives[j] > 0 else 0 + for j in range(num_objectives) + ) + for fitness_score in fitness_scores + ] + return normalized_objective_functions @staticmethod - def plot_pareto_front(pareto_population): - # Extract LCC and LCE for plotting - lcc_values = [individual.individual['lcc'] for individual in pareto_population] - lce_values = [individual.individual['total_energy_consumption'] for individual in pareto_population] - plt.figure(figsize=(10, 6)) - plt.scatter(lcc_values, lce_values, color='blue', label='Pareto Front', alpha=0.6, edgecolors='w', s=80) - plt.title('Pareto Front for Life Cycle Cost vs Life Cycle Energy') - plt.xlabel('Life Cycle Cost (LCC)') - plt.ylabel('Life Cycle Energy (LCE)') - plt.grid(True) - plt.legend() - plt.show() + def topsis_decision_making(normalized_objective_functions): + """ + Applies the Technique for Order of Preference by Similarity to Ideal Solution (TOPSIS) method for decision-making + in multi-objective optimization. This method ranks solutions based on their proximity to the ideal best and + worst solutions. + + Parameters: + normalized_objective_functions (list of tuples): A list where each element represents a set of normalized + objective values for an individual, e.g., + [(obj1, obj2, ...), ...]. + + Returns: + list of float: A list of performance scores for each individual, where higher scores indicate closer + proximity to the ideal best solution. + + Process: + - Identifies the ideal best and ideal worst values for each objective based on normalized objective scores. + - Computes the Euclidean distance of each individual from both the ideal best and ideal worst. + - Calculates a performance score for each individual by dividing the distance to the ideal worst by the + sum of the distances to the ideal best and worst. + + Example: + Given normalized objectives [[0.8, 0.6], [0.3, 0.4]], the method will return performance scores + based on their proximity to ideal solutions. + + Notes: + - TOPSIS helps to balance trade-offs in multi-objective optimization by quantifying how close each solution + is to the ideal, enabling effective ranking of solutions. + - This method is commonly used when both minimization and maximization objectives are present. + + Steps: + 1. Determine the ideal best (minimum) and ideal worst (maximum) for each objective. + 2. Compute Euclidean distances from each solution to the ideal best and ideal worst. + 3. Calculate the performance score for each solution based on these distances. + """ + # Find ideal best and ideal worst + selected_solutions = {} + ideal_best = [] + ideal_worst = [] + num_objectives = len(normalized_objective_functions[0]) + for j in range(num_objectives): + objective_values = [ind[j] for ind in normalized_objective_functions] + ideal_best.append(min(objective_values)) + ideal_worst.append(max(objective_values)) + selected_solutions[f'best_objective_{j + 1}_index'] = objective_values.index(min(objective_values)) + # Calculate Euclidean distances to ideal best and ideal worst + distances_to_best = [] + distances_to_worst = [] + for normalized_values in normalized_objective_functions: + # Distance to ideal best + distance_best = math.sqrt( + sum((normalized_values[j] - ideal_best[j]) ** 2 for j in range(num_objectives)) + ) + distances_to_best.append(distance_best) + # Distance to ideal worst + distance_worst = math.sqrt( + sum((normalized_values[j] - ideal_worst[j]) ** 2 for j in range(num_objectives)) + ) + distances_to_worst.append(distance_worst) + performance_scores = [distances_to_worst[i] / (distances_to_best[i] + distances_to_worst[i]) + for i in range(len(normalized_objective_functions))] + selected_solutions['best_solution_index'] = performance_scores.index(max(performance_scores)) + return performance_scores, selected_solutions + + def postprocess(self, pareto_front, selected_solution_indexes): + selected_solutions = {} + for solution_type in selected_solution_indexes: + selected_solutions[solution_type] = {} + index = selected_solution_indexes[solution_type] + individual = pareto_front[index].individual + selected_solutions[solution_type]['Generation Components'] = [] + selected_solutions[solution_type]['Storage Components'] = [] + for generation_component in individual['Generation Components']: + generation_type = generation_component['type'] + heating_capacity = generation_component['heating_capacity'] + cooling_capacity = generation_component['cooling_capacity'] + selected_solutions[solution_type]['Generation Components'].append({'component type': generation_type, + 'heating capacity (W)': heating_capacity, + 'cooling capacity (W)': cooling_capacity}) + for storage_component in individual['Energy Storage Components']: + storage_type = storage_component['type'] + capacity = storage_component['capacity'] + volume = storage_component['volume'] + selected_solutions[solution_type]['Storage Components'].append({'storage type': storage_type, + 'capacity (W)': capacity, + 'volume (m3)': volume}) + if 'energy-consumption' in self.optimization_scenario: + selected_solutions[solution_type]['total energy consumption kWh'] = individual['total_energy_consumption'] + if 'cost' in self.optimization_scenario: + selected_solutions[solution_type]['life cycle cost'] = individual['lcc'] + + return selected_solutions diff --git a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py deleted file mode 100644 index 35ff872d..00000000 --- a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py +++ /dev/null @@ -1,336 +0,0 @@ -import copy -import math -import random -import hub.helpers.constants as cte -from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.individual import \ - Individual -import matplotlib.pyplot as plt -import time - - -class MultiObjectiveGeneticAlgorithm: - def __init__(self, population_size=50, generations=50, crossover_rate=0.9, mutation_rate=0.33, - optimization_scenario=None, output_path=None): - self.population_size = population_size - self.population = [] - self.generations = generations - self.crossover_rate = crossover_rate - self.mutation_rate = mutation_rate - self.optimization_scenario = optimization_scenario - self.list_of_solutions = [] - self.best_solution = None - self.best_solution_generation = None - self.crowding_distances = None - self.output_path = output_path - - # Initialize Population - def initialize_population(self, building, energy_system): - design_period_energy_demands = self.design_period_identification(building) - attempts = 0 - max_attempts = self.population_size * 20 - while len(self.population) < self.population_size and attempts < max_attempts: - individual = Individual(building=building, - energy_system=energy_system, - design_period_energy_demands=design_period_energy_demands, - optimization_scenario=self.optimization_scenario) - individual.initialization() - individual.score_evaluation() - attempts += 1 - if individual.feasibility: - self.population.append(individual) - if len(self.population) < self.population_size: - raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. " - f"Only {len(self.population)} feasible individuals were generated.") - - - @staticmethod - def design_period_identification(building): - def get_start_end_indices(max_day_index, total_days): - if max_day_index > 0 and max_day_index < total_days - 1: - start_index = (max_day_index - 1) * 24 - end_index = (max_day_index + 2) * 24 - elif max_day_index == 0: - start_index = 0 - end_index = (max_day_index + 2) * 24 - else: - start_index = (max_day_index - 1) * 24 - end_index = total_days * 24 - return start_index, end_index - - # Calculate daily demands - heating_daily_demands = [sum(building.heating_demand[cte.HOUR][i:i + 24]) for i in - range(0, len(building.heating_demand[cte.HOUR]), 24)] - cooling_daily_demands = [sum(building.cooling_demand[cte.HOUR][i:i + 24]) for i in - range(0, len(building.cooling_demand[cte.HOUR]), 24)] - dhw_daily_demands = [sum(building.domestic_hot_water_heat_demand[cte.HOUR][i:i + 24]) for i in - range(0, len(building.domestic_hot_water_heat_demand[cte.HOUR]), 24)] - # Get the day with maximum demand for each type - heating_max_day = heating_daily_demands.index(max(heating_daily_demands)) - cooling_max_day = cooling_daily_demands.index(max(cooling_daily_demands)) - dhw_max_day = dhw_daily_demands.index(max(dhw_daily_demands)) - # Get the start and end indices for each demand type - heating_start, heating_end = get_start_end_indices(heating_max_day, len(heating_daily_demands)) - cooling_start, cooling_end = get_start_end_indices(cooling_max_day, len(cooling_daily_demands)) - dhw_start, dhw_end = get_start_end_indices(dhw_max_day, len(dhw_daily_demands)) - # Return the design period energy demands - return { - f'{cte.HEATING}': {'demands': building.heating_demand[cte.HOUR][heating_start:heating_end], - 'start_index': heating_start, 'end_index': heating_end}, - f'{cte.COOLING}': {'demands': building.cooling_demand[cte.HOUR][cooling_start:cooling_end], - 'start_index': cooling_start, 'end_index': cooling_end}, - f'{cte.DOMESTIC_HOT_WATER}': {'demands': building.domestic_hot_water_heat_demand[cte.HOUR][dhw_start:dhw_end], - 'start_index': dhw_start, 'end_index': dhw_end} - } - - def sbx_crossover(self, parent1, parent2): - eta_c = 2 # Distribution index for SBX - if random.random() < self.crossover_rate: - child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2) - # Crossover for Generation Components - for i in range(len(parent1.individual['Generation Components'])): - for cap_type in ['heating_capacity', 'cooling_capacity']: - if parent1.individual['Generation Components'][i][cap_type] is not None: - x1 = parent1.individual['Generation Components'][i][cap_type] - x2 = parent2.individual['Generation Components'][i][cap_type] - if abs(x1 - x2) > 1e-14: # Avoid division by zero - u = random.random() - if u <= 0.5: - beta = (2 * u) ** (1 / (eta_c + 1)) - else: - beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1)) - - child1.individual['Generation Components'][i][cap_type] = max(0.1, - 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)) - child2.individual['Generation Components'][i][cap_type] = max(0.1, - 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)) - # Crossover for Energy Storage Components - for i in range(len(parent1.individual['Energy Storage Components'])): - for item in ['capacity', 'volume', 'heating_coil_capacity']: - if parent1.individual['Energy Storage Components'][i][item] is not None: - x1 = parent1.individual['Energy Storage Components'][i][item] - x2 = parent2.individual['Energy Storage Components'][i][item] - if abs(x1 - x2) > 1e-14: # Avoid division by zero - u = random.random() - if u <= 0.5: - beta = (2 * u) ** (1 / (eta_c + 1)) - else: - beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1)) - threshold1 = random.uniform(0, 0.01) * child1.available_space - child1.individual['Energy Storage Components'][i][item] = max(threshold1, - 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)) - threshold2 = random.uniform(0, 0.01) * child1.available_space - child2.individual['Energy Storage Components'][i][item] = max(threshold2, - 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)) - return child1, child2 - else: - return copy.deepcopy(parent1), copy.deepcopy(parent2) - - def polynomial_mutation(self, individual, building, energy_system): - """ - Mutates the individual's generation and storage components using the Polynomial Mutation Operator. - - `individual`: The individual to mutate (contains generation and storage components). - - `building`: Building data that contains constraints such as peak heating load and available space. - Returns the mutated individual. - """ - eta_m = 20 # Mutation distribution index (can be adjusted) - design_period_energy_demands = self.design_period_identification(building) - def polynomial_mutation_operator(value, lower_bound, upper_bound): - delta = (value - lower_bound) / (upper_bound - lower_bound) - u = random.random() - if u < 0.5: - delta_q = (2 * u) ** (1 / (eta_m + 1)) - 1 - else: - delta_q = 1 - (2 * (1 - u)) ** (1 / (eta_m + 1)) - mutated_value = value + delta_q * (upper_bound - lower_bound) - return max(lower_bound, min(mutated_value, upper_bound)) # Ensure it's within bounds - - # Mutate Generation Components - for generation_component in individual['Generation Components']: - if random.random() < self.mutation_rate: - if (generation_component['nominal_heating_efficiency'] is not None and - (cte.HEATING or cte.DOMESTIC_HOT_WATER in energy_system.demand_types)): - # Mutate heating capacity - if cte.HEATING in energy_system.demand_types: - max_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES - else: - max_demand = max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES - generation_component['heating_capacity'] = polynomial_mutation_operator( - generation_component['heating_capacity'], 0.1, max_demand) - if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types: - # Mutate cooling capacity - max_cooling_demand = max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES - generation_component['cooling_capacity'] = polynomial_mutation_operator( - generation_component['cooling_capacity'], 0.1, max_cooling_demand) - - # Mutate Storage Components - for storage_component in individual['Energy Storage Components']: - if random.random() < self.mutation_rate: - if storage_component['type'] == f'{cte.THERMAL}_storage': - # Mutate the volume of thermal storage - max_available_space = 0.01 * building.volume / building.storeys_above_ground - lower_bound = random.uniform(0, 0.001) * max_available_space - storage_component['volume'] = polynomial_mutation_operator( - storage_component['volume'], 0, max_available_space) - - if storage_component['heating_coil_capacity'] is not None: - if cte.HEATING in energy_system.demand_types: - max_heating_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES - else: - max_heating_demand = max( - design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES - - storage_component['heating_coil_capacity'] = polynomial_mutation_operator( - storage_component['heating_coil_capacity'], 0, max_heating_demand) - - return individual - - def fast_non_dominated_sorting(self): - population = self.population - s = [[] for _ in range(len(population))] - n = [0] * len(population) - rank = [0] * len(population) - fronts = [[]] - for p in range(len(population)): - for q in range(len(population)): - if self.domination_status(population[p], population[q]): - s[p].append(q) - elif self.domination_status(population[q], population[p]): - n[p] += 1 - if n[p] == 0: - rank[p] = 0 - fronts[0].append(p) - i = 0 - while fronts[i]: - next_front = set() - for p in fronts[i]: - for q in s[p]: - n[q] -= 1 - if n[q] == 0: - rank[q] = i + 1 - next_front.add(q) - i += 1 - fronts.append(list(next_front)) - del fronts[-1] - for (j, front) in enumerate(fronts): - for i in front: - self.population[i].rank = j + 1 - return fronts - - def calculate_crowding_distance(self, front, crowding_distance): - for j in range(len(self.population[0].fitness_score)): - - sorted_front = sorted(front, key=lambda x: self.population[x].individual['fitness_score'][j]) - # Set distances to finite large numbers rather than `inf` - crowding_distance[sorted_front[0]] = float(1e12) - crowding_distance[sorted_front[-1]] = float(1e12) - objective_min = self.population[sorted_front[0]].individual['fitness_score'][j] - objective_max = self.population[sorted_front[-1]].individual['fitness_score'][j] - if objective_max != objective_min: - for i in range(1, len(sorted_front) - 1): - crowding_distance[sorted_front[i]] += ( - (self.population[sorted_front[i + 1]].individual['fitness_score'][j] - - self.population[sorted_front[i - 1]].individual['fitness_score'][j]) / - (objective_max - objective_min)) - for i in front: - self.population[i].crowding_distance = crowding_distance[i] - return crowding_distance - - @staticmethod - def domination_status(individual1, individual2): - # Extract the list of objectives for both individuals - objectives1 = individual1.individual['fitness_score'] - objectives2 = individual2.individual['fitness_score'] - # Ensure both individuals have the same number of objectives - assert len(objectives1) == len(objectives2), "Both individuals must have the same number of objectives" - # Flags to check if one dominates the other - better_in_all = True # Whether individual1 is better in all objectives - strictly_better_in_at_least_one = False # Whether individual1 is strictly better in at least one objective - for obj1, obj2 in zip(objectives1, objectives2): - if obj1 > obj2: - better_in_all = False # individual1 is worse in at least one objective - if obj1 < obj2: - strictly_better_in_at_least_one = True # individual1 is better in at least one objective - result = True if better_in_all and strictly_better_in_at_least_one else False - return result - - def nsga2_selection(self, fronts): - new_population = [] - i = 0 - while len(new_population) + len(fronts[i]) <= self.population_size: - for index in fronts[i]: - # Skip individuals with infinite fitness values to avoid propagation - if not math.isinf(self.population[index].individual['fitness_score'][0]) and \ - not math.isinf(self.population[index].individual['fitness_score'][1]): - new_population.append(self.population[index]) - i += 1 - if i >= len(fronts): - break - if len(new_population) < self.population_size and i < len(fronts): - sorted_front = sorted(fronts[i], key=lambda x: self.population[x].crowding_distance, reverse=True) - for index in sorted_front: - if len(new_population) < self.population_size: - if not math.isinf(self.population[index].individual['fitness_score'][0]) and \ - not math.isinf(self.population[index].individual['fitness_score'][1]): - new_population.append(self.population[index]) - else: - break - return new_population - - def solve_ga(self, building, energy_system): - optimization_start = time.time() - self.initialize_population(building, energy_system) - for n in range(self.generations + 1): - generation_n_start_time = time.time() - print(f"Generation {n}") - progeny_population = [] - while len(progeny_population) < self.population_size: - parent1, parent2 = random.choice(self.population), random.choice(self.population) - child1, child2 = self.sbx_crossover(parent1, parent2) - self.polynomial_mutation(child1.individual, building, energy_system) - self.polynomial_mutation(child2.individual, building, energy_system) - child1.score_evaluation() - child2.score_evaluation() - progeny_population.extend([child1, child2]) - self.population.extend(progeny_population) - fronts = self.fast_non_dominated_sorting() - print([self.population[ind].fitness_score for ind in fronts[0]]) - print(fronts) - crowding_distances = [0] * len(self.population) - for front in fronts: - self.calculate_crowding_distance(front=front, crowding_distance=crowding_distances) - new_population = self.nsga2_selection(fronts=fronts) - self.population = new_population - generation_n_calc_duration = time.time() - generation_n_start_time - print(f"Generation {n:.2f} took {generation_n_calc_duration:.2f} seconds") - - if n == self.generations: - fronts = self.fast_non_dominated_sorting() - print(fronts) - self.plot_pareto_front(fronts) - optimization_duration = time.time() - optimization_start - print(f"NSGA-II took {optimization_duration:.2f} seconds") - - def plot_pareto_front(self, fronts): - # Extract LCC and LCE for plotting all individuals - lcc_values = [individual.individual['lcc'] for individual in self.population] - lce_values = [individual.individual['total_energy_consumption'] for individual in self.population] - # Plot all individuals as blue points - plt.figure(figsize=(10, 6)) - plt.scatter(lcc_values, lce_values, color='blue', label='Population', alpha=0.6, edgecolors='w', s=80) - # Extract and sort LCC and LCE for individuals in the first front (front_0) to ensure a connected line - front_0 = [self.population[i] for i in fronts[0]] - front_0_lcc = [individual.individual['lcc'] for individual in front_0] - front_0_lce = [individual.individual['total_energy_consumption'] for individual in front_0] - # Sort front_0 individuals by LCC or LCE to connect them in order (optional step for smooth line) - sorted_front_0 = sorted(zip(front_0_lcc, front_0_lce), key=lambda x: x[0]) - sorted_lcc, sorted_lce = zip(*sorted_front_0) - # Plot the Pareto front (front_0) as a red line - plt.plot(sorted_lcc, sorted_lce, color='red', label='Pareto Front (Front 0)', linewidth=2) - # Configure plot details - plt.title('Pareto Front for Life Cycle Cost vs Life Cycle Energy') - plt.xlabel('Life Cycle Cost (LCC)') - plt.ylabel('Life Cycle Energy (LCE)') - plt.grid(True) - plt.legend() - plt.show() - diff --git a/test.py b/test.py index 310936c8..644d3548 100644 --- a/test.py +++ b/test.py @@ -3,7 +3,7 @@ import subprocess from building_modelling.ep_run_enrich import energy_plus_workflow from energy_system_modelling_package.energy_system_modelling_factories.montreal_energy_system_archetype_modelling_factory import \ MontrealEnergySystemArchetypesSimulationFactory -from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.multi_objective_genetic_algorithm_rethinking import MultiObjectiveGeneticAlgorithm +from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.multi_objective_genetic_algorithm import MultiObjectiveGeneticAlgorithm from hub.imports.geometry_factory import GeometryFactory from hub.helpers.dictionaries import Dictionaries from hub.imports.construction_factory import ConstructionFactory @@ -55,4 +55,4 @@ random_assignation.call_random(city.buildings, random_assignation.residential_ne EnergySystemsFactory('montreal_future', city).enrich() for building in city.buildings: energy_system = building.energy_systems[1] - MultiObjectiveGeneticAlgorithm(optimization_scenario='cost_energy_consumption').solve_ga(building, energy_system) + MultiObjectiveGeneticAlgorithm(optimization_scenario='energy-consumption_cost').solve_ga(building, energy_system)