From c47071bc2d3443d6ce52d1eac0bc77765ff9a0d3 Mon Sep 17 00:00:00 2001 From: s_ranjbar Date: Thu, 31 Oct 2024 09:50:12 +0000 Subject: [PATCH] fix: nsga-II is complete the only remaining step is TOPSIS decision-making --- ..._objective_genetic_algorithm_rethinking.py | 150 ++++++++++-------- 1 file changed, 86 insertions(+), 64 deletions(-) diff --git a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py index 16284146..35ff872d 100644 --- a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py +++ b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py @@ -9,7 +9,7 @@ import time class MultiObjectiveGeneticAlgorithm: - def __init__(self, population_size=100, generations=50, crossover_rate=0.9, mutation_rate=0.1, + def __init__(self, population_size=50, generations=50, crossover_rate=0.9, mutation_rate=0.33, optimization_scenario=None, output_path=None): self.population_size = population_size self.population = [] @@ -23,13 +23,9 @@ class MultiObjectiveGeneticAlgorithm: self.crowding_distances = None self.output_path = output_path -# Initialize Population + # Initialize Population def initialize_population(self, building, energy_system): - design_period_start_time = time.time() design_period_energy_demands = self.design_period_identification(building) - design_period_time = time.time() - design_period_start_time - print(f"design period identification took {design_period_time:.2f} seconds") - initializing_time_start = time.time() attempts = 0 max_attempts = self.population_size * 20 while len(self.population) < self.population_size and attempts < max_attempts: @@ -45,8 +41,7 @@ class MultiObjectiveGeneticAlgorithm: if len(self.population) < self.population_size: raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. " f"Only {len(self.population)} feasible individuals were generated.") - initializing_time = time.time() - initializing_time_start - print(f"initializing took {initializing_time:.2f} seconds") + @staticmethod def design_period_identification(building): @@ -91,7 +86,7 @@ class MultiObjectiveGeneticAlgorithm: eta_c = 2 # Distribution index for SBX if random.random() < self.crossover_rate: child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2) - # Crossover for Generation Components + # Crossover for Generation Components for i in range(len(parent1.individual['Generation Components'])): for cap_type in ['heating_capacity', 'cooling_capacity']: if parent1.individual['Generation Components'][i][cap_type] is not None: @@ -108,7 +103,7 @@ class MultiObjectiveGeneticAlgorithm: 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)) child2.individual['Generation Components'][i][cap_type] = max(0.1, 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)) - # Crossover for Energy Storage Components + # Crossover for Energy Storage Components for i in range(len(parent1.individual['Energy Storage Components'])): for item in ['capacity', 'volume', 'heating_coil_capacity']: if parent1.individual['Energy Storage Components'][i][item] is not None: @@ -133,24 +128,19 @@ class MultiObjectiveGeneticAlgorithm: def polynomial_mutation(self, individual, building, energy_system): """ Mutates the individual's generation and storage components using the Polynomial Mutation Operator. - - `individual`: The individual to mutate (contains generation and storage components). - `building`: Building data that contains constraints such as peak heating load and available space. - Returns the mutated individual. """ eta_m = 20 # Mutation distribution index (can be adjusted) design_period_energy_demands = self.design_period_identification(building) - def polynomial_mutation_operator(value, lower_bound, upper_bound): delta = (value - lower_bound) / (upper_bound - lower_bound) u = random.random() - if u < 0.5: delta_q = (2 * u) ** (1 / (eta_m + 1)) - 1 else: delta_q = 1 - (2 * (1 - u)) ** (1 / (eta_m + 1)) - mutated_value = value + delta_q * (upper_bound - lower_bound) return max(lower_bound, min(mutated_value, upper_bound)) # Ensure it's within bounds @@ -159,27 +149,26 @@ class MultiObjectiveGeneticAlgorithm: if random.random() < self.mutation_rate: if (generation_component['nominal_heating_efficiency'] is not None and (cte.HEATING or cte.DOMESTIC_HOT_WATER in energy_system.demand_types)): - # Mutate heating capacity + # Mutate heating capacity if cte.HEATING in energy_system.demand_types: max_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES else: max_demand = max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES - generation_component['heating_capacity'] = polynomial_mutation_operator( - generation_component['heating_capacity'], 0, max_demand) - + generation_component['heating_capacity'], 0.1, max_demand) if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types: - # Mutate cooling capacity + # Mutate cooling capacity max_cooling_demand = max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES generation_component['cooling_capacity'] = polynomial_mutation_operator( - generation_component['cooling_capacity'], 0, max_cooling_demand) + generation_component['cooling_capacity'], 0.1, max_cooling_demand) # Mutate Storage Components for storage_component in individual['Energy Storage Components']: if random.random() < self.mutation_rate: if storage_component['type'] == f'{cte.THERMAL}_storage': - # Mutate the volume of thermal storage + # Mutate the volume of thermal storage max_available_space = 0.01 * building.volume / building.storeys_above_ground + lower_bound = random.uniform(0, 0.001) * max_available_space storage_component['volume'] = polynomial_mutation_operator( storage_component['volume'], 0, max_available_space) @@ -229,8 +218,8 @@ class MultiObjectiveGeneticAlgorithm: def calculate_crowding_distance(self, front, crowding_distance): for j in range(len(self.population[0].fitness_score)): + sorted_front = sorted(front, key=lambda x: self.population[x].individual['fitness_score'][j]) - print(sorted_front) # Set distances to finite large numbers rather than `inf` crowding_distance[sorted_front[0]] = float(1e12) crowding_distance[sorted_front[-1]] = float(1e12) @@ -242,6 +231,8 @@ class MultiObjectiveGeneticAlgorithm: (self.population[sorted_front[i + 1]].individual['fitness_score'][j] - self.population[sorted_front[i - 1]].individual['fitness_score'][j]) / (objective_max - objective_min)) + for i in front: + self.population[i].crowding_distance = crowding_distance[i] return crowding_distance @staticmethod @@ -249,14 +240,11 @@ class MultiObjectiveGeneticAlgorithm: # Extract the list of objectives for both individuals objectives1 = individual1.individual['fitness_score'] objectives2 = individual2.individual['fitness_score'] - # Ensure both individuals have the same number of objectives assert len(objectives1) == len(objectives2), "Both individuals must have the same number of objectives" - # Flags to check if one dominates the other better_in_all = True # Whether individual1 is better in all objectives strictly_better_in_at_least_one = False # Whether individual1 is strictly better in at least one objective - for obj1, obj2 in zip(objectives1, objectives2): if obj1 > obj2: better_in_all = False # individual1 is worse in at least one objective @@ -265,43 +253,80 @@ class MultiObjectiveGeneticAlgorithm: result = True if better_in_all and strictly_better_in_at_least_one else False return result - def solve_ga(self, building, energy_system): - self.initialize_population(building, energy_system) - progeny_population = [] - progeny_creation_time_start = time.time() - while len(progeny_population) < self.population_size: - parent1, parent2 = random.choice(self.population), random.choice(self.population) - child1, child2 = self.sbx_crossover(parent1, parent2) - self.polynomial_mutation(child1.individual, building, energy_system) - self.polynomial_mutation(child2.individual, building, energy_system) - child1.score_evaluation() - child2.score_evaluation() - progeny_population.extend([child1, child2]) - progeny_creation_duration = time.time() - progeny_creation_time_start - print(f"progeny population creation took {progeny_creation_duration:.2f} seconds") - self.population.extend(progeny_population) - print([ind.fitness_score for ind in self.population]) - ns_start_time = time.time() - fronts = self.fast_non_dominated_sorting() - ns_duration = time.time() - ns_start_time - print(f"non-dominated sorting took {ns_duration:.2f} seconds") - self.crowding_distances = [0] * len(self.population) - crowding_d_start_time = time.time() - for front in fronts: - print(front) - self.crowding_distance = self.calculate_crowding_distance(front=front, crowding_distance=self.crowding_distances) - print(self.crowding_distance) - crowding_duration = time.time() - crowding_d_start_time - print(f"crowding d calculation took {crowding_duration:.2f} seconds") - self.plot_pareto_front(self.population) + def nsga2_selection(self, fronts): + new_population = [] + i = 0 + while len(new_population) + len(fronts[i]) <= self.population_size: + for index in fronts[i]: + # Skip individuals with infinite fitness values to avoid propagation + if not math.isinf(self.population[index].individual['fitness_score'][0]) and \ + not math.isinf(self.population[index].individual['fitness_score'][1]): + new_population.append(self.population[index]) + i += 1 + if i >= len(fronts): + break + if len(new_population) < self.population_size and i < len(fronts): + sorted_front = sorted(fronts[i], key=lambda x: self.population[x].crowding_distance, reverse=True) + for index in sorted_front: + if len(new_population) < self.population_size: + if not math.isinf(self.population[index].individual['fitness_score'][0]) and \ + not math.isinf(self.population[index].individual['fitness_score'][1]): + new_population.append(self.population[index]) + else: + break + return new_population - @staticmethod - def plot_pareto_front(pareto_population): - # Extract LCC and LCE for plotting - lcc_values = [individual.individual['lcc'] for individual in pareto_population] - lce_values = [individual.individual['total_energy_consumption'] for individual in pareto_population] + def solve_ga(self, building, energy_system): + optimization_start = time.time() + self.initialize_population(building, energy_system) + for n in range(self.generations + 1): + generation_n_start_time = time.time() + print(f"Generation {n}") + progeny_population = [] + while len(progeny_population) < self.population_size: + parent1, parent2 = random.choice(self.population), random.choice(self.population) + child1, child2 = self.sbx_crossover(parent1, parent2) + self.polynomial_mutation(child1.individual, building, energy_system) + self.polynomial_mutation(child2.individual, building, energy_system) + child1.score_evaluation() + child2.score_evaluation() + progeny_population.extend([child1, child2]) + self.population.extend(progeny_population) + fronts = self.fast_non_dominated_sorting() + print([self.population[ind].fitness_score for ind in fronts[0]]) + print(fronts) + crowding_distances = [0] * len(self.population) + for front in fronts: + self.calculate_crowding_distance(front=front, crowding_distance=crowding_distances) + new_population = self.nsga2_selection(fronts=fronts) + self.population = new_population + generation_n_calc_duration = time.time() - generation_n_start_time + print(f"Generation {n:.2f} took {generation_n_calc_duration:.2f} seconds") + + if n == self.generations: + fronts = self.fast_non_dominated_sorting() + print(fronts) + self.plot_pareto_front(fronts) + optimization_duration = time.time() - optimization_start + print(f"NSGA-II took {optimization_duration:.2f} seconds") + + def plot_pareto_front(self, fronts): + # Extract LCC and LCE for plotting all individuals + lcc_values = [individual.individual['lcc'] for individual in self.population] + lce_values = [individual.individual['total_energy_consumption'] for individual in self.population] + # Plot all individuals as blue points plt.figure(figsize=(10, 6)) - plt.scatter(lcc_values, lce_values, color='blue', label='Pareto Front', alpha=0.6, edgecolors='w', s=80) + plt.scatter(lcc_values, lce_values, color='blue', label='Population', alpha=0.6, edgecolors='w', s=80) + # Extract and sort LCC and LCE for individuals in the first front (front_0) to ensure a connected line + front_0 = [self.population[i] for i in fronts[0]] + front_0_lcc = [individual.individual['lcc'] for individual in front_0] + front_0_lce = [individual.individual['total_energy_consumption'] for individual in front_0] + # Sort front_0 individuals by LCC or LCE to connect them in order (optional step for smooth line) + sorted_front_0 = sorted(zip(front_0_lcc, front_0_lce), key=lambda x: x[0]) + sorted_lcc, sorted_lce = zip(*sorted_front_0) + # Plot the Pareto front (front_0) as a red line + plt.plot(sorted_lcc, sorted_lce, color='red', label='Pareto Front (Front 0)', linewidth=2) + # Configure plot details plt.title('Pareto Front for Life Cycle Cost vs Life Cycle Energy') plt.xlabel('Life Cycle Cost (LCC)') plt.ylabel('Life Cycle Energy (LCE)') @@ -309,6 +334,3 @@ class MultiObjectiveGeneticAlgorithm: plt.legend() plt.show() - - -