From 57c23f290f3b2a3d88cd1dc894dc2531b02179b6 Mon Sep 17 00:00:00 2001 From: s_ranjbar Date: Fri, 25 Oct 2024 11:55:12 +0200 Subject: [PATCH] fix: multi objective optimization work continued --- .../genetic_algorithm/individual.py | 2 + ..._objective_genetic_algorithm_rethinking.py | 228 +++++++++++++++++- 2 files changed, 226 insertions(+), 4 deletions(-) diff --git a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py index 5a25e72e..8232bac8 100644 --- a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py +++ b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/individual.py @@ -48,6 +48,8 @@ class Individual: self.costs = self.costs_archetype() self.feasibility = True self.fitness_score = 0 + self.rank = 0 + self.crowding_distance = 0 self.individual = {} def system_components(self): diff --git a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py index 23ef7ff2..16284146 100644 --- a/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py +++ b/energy_system_modelling_package/energy_system_modelling_factories/system_sizing_methods/genetic_algorithm/multi_objective_genetic_algorithm_rethinking.py @@ -9,7 +9,7 @@ import time class MultiObjectiveGeneticAlgorithm: - def __init__(self, population_size=100, generations=50, crossover_rate=0.8, mutation_rate=0.1, + def __init__(self, population_size=100, generations=50, crossover_rate=0.9, mutation_rate=0.1, optimization_scenario=None, output_path=None): self.population_size = population_size self.population = [] @@ -20,6 +20,7 @@ class MultiObjectiveGeneticAlgorithm: self.list_of_solutions = [] self.best_solution = None self.best_solution_generation = None + self.crowding_distances = None self.output_path = output_path # Initialize Population @@ -47,7 +48,6 @@ class MultiObjectiveGeneticAlgorithm: initializing_time = time.time() - initializing_time_start print(f"initializing took {initializing_time:.2f} seconds") - @staticmethod def design_period_identification(building): def get_start_end_indices(max_day_index, total_days): @@ -87,8 +87,228 @@ class MultiObjectiveGeneticAlgorithm: 'start_index': dhw_start, 'end_index': dhw_end} } + def sbx_crossover(self, parent1, parent2): + eta_c = 2 # Distribution index for SBX + if random.random() < self.crossover_rate: + child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2) + # Crossover for Generation Components + for i in range(len(parent1.individual['Generation Components'])): + for cap_type in ['heating_capacity', 'cooling_capacity']: + if parent1.individual['Generation Components'][i][cap_type] is not None: + x1 = parent1.individual['Generation Components'][i][cap_type] + x2 = parent2.individual['Generation Components'][i][cap_type] + if abs(x1 - x2) > 1e-14: # Avoid division by zero + u = random.random() + if u <= 0.5: + beta = (2 * u) ** (1 / (eta_c + 1)) + else: + beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1)) + + child1.individual['Generation Components'][i][cap_type] = max(0.1, + 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)) + child2.individual['Generation Components'][i][cap_type] = max(0.1, + 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)) + # Crossover for Energy Storage Components + for i in range(len(parent1.individual['Energy Storage Components'])): + for item in ['capacity', 'volume', 'heating_coil_capacity']: + if parent1.individual['Energy Storage Components'][i][item] is not None: + x1 = parent1.individual['Energy Storage Components'][i][item] + x2 = parent2.individual['Energy Storage Components'][i][item] + if abs(x1 - x2) > 1e-14: # Avoid division by zero + u = random.random() + if u <= 0.5: + beta = (2 * u) ** (1 / (eta_c + 1)) + else: + beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1)) + threshold1 = random.uniform(0, 0.01) * child1.available_space + child1.individual['Energy Storage Components'][i][item] = max(threshold1, + 0.5 * ((1 + beta) * x1 + (1 - beta) * x2)) + threshold2 = random.uniform(0, 0.01) * child1.available_space + child2.individual['Energy Storage Components'][i][item] = max(threshold2, + 0.5 * ((1 - beta) * x1 + (1 + beta) * x2)) + return child1, child2 + else: + return copy.deepcopy(parent1), copy.deepcopy(parent2) + + def polynomial_mutation(self, individual, building, energy_system): + """ + Mutates the individual's generation and storage components using the Polynomial Mutation Operator. + + - `individual`: The individual to mutate (contains generation and storage components). + - `building`: Building data that contains constraints such as peak heating load and available space. + + Returns the mutated individual. + """ + eta_m = 20 # Mutation distribution index (can be adjusted) + design_period_energy_demands = self.design_period_identification(building) + + def polynomial_mutation_operator(value, lower_bound, upper_bound): + delta = (value - lower_bound) / (upper_bound - lower_bound) + u = random.random() + + if u < 0.5: + delta_q = (2 * u) ** (1 / (eta_m + 1)) - 1 + else: + delta_q = 1 - (2 * (1 - u)) ** (1 / (eta_m + 1)) + + mutated_value = value + delta_q * (upper_bound - lower_bound) + return max(lower_bound, min(mutated_value, upper_bound)) # Ensure it's within bounds + + # Mutate Generation Components + for generation_component in individual['Generation Components']: + if random.random() < self.mutation_rate: + if (generation_component['nominal_heating_efficiency'] is not None and + (cte.HEATING or cte.DOMESTIC_HOT_WATER in energy_system.demand_types)): + # Mutate heating capacity + if cte.HEATING in energy_system.demand_types: + max_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES + else: + max_demand = max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES + + generation_component['heating_capacity'] = polynomial_mutation_operator( + generation_component['heating_capacity'], 0, max_demand) + + if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types: + # Mutate cooling capacity + max_cooling_demand = max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES + generation_component['cooling_capacity'] = polynomial_mutation_operator( + generation_component['cooling_capacity'], 0, max_cooling_demand) + + # Mutate Storage Components + for storage_component in individual['Energy Storage Components']: + if random.random() < self.mutation_rate: + if storage_component['type'] == f'{cte.THERMAL}_storage': + # Mutate the volume of thermal storage + max_available_space = 0.01 * building.volume / building.storeys_above_ground + storage_component['volume'] = polynomial_mutation_operator( + storage_component['volume'], 0, max_available_space) + + if storage_component['heating_coil_capacity'] is not None: + if cte.HEATING in energy_system.demand_types: + max_heating_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES + else: + max_heating_demand = max( + design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES + + storage_component['heating_coil_capacity'] = polynomial_mutation_operator( + storage_component['heating_coil_capacity'], 0, max_heating_demand) + + return individual + + def fast_non_dominated_sorting(self): + population = self.population + s = [[] for _ in range(len(population))] + n = [0] * len(population) + rank = [0] * len(population) + fronts = [[]] + for p in range(len(population)): + for q in range(len(population)): + if self.domination_status(population[p], population[q]): + s[p].append(q) + elif self.domination_status(population[q], population[p]): + n[p] += 1 + if n[p] == 0: + rank[p] = 0 + fronts[0].append(p) + i = 0 + while fronts[i]: + next_front = set() + for p in fronts[i]: + for q in s[p]: + n[q] -= 1 + if n[q] == 0: + rank[q] = i + 1 + next_front.add(q) + i += 1 + fronts.append(list(next_front)) + del fronts[-1] + for (j, front) in enumerate(fronts): + for i in front: + self.population[i].rank = j + 1 + return fronts + + def calculate_crowding_distance(self, front, crowding_distance): + for j in range(len(self.population[0].fitness_score)): + sorted_front = sorted(front, key=lambda x: self.population[x].individual['fitness_score'][j]) + print(sorted_front) + # Set distances to finite large numbers rather than `inf` + crowding_distance[sorted_front[0]] = float(1e12) + crowding_distance[sorted_front[-1]] = float(1e12) + objective_min = self.population[sorted_front[0]].individual['fitness_score'][j] + objective_max = self.population[sorted_front[-1]].individual['fitness_score'][j] + if objective_max != objective_min: + for i in range(1, len(sorted_front) - 1): + crowding_distance[sorted_front[i]] += ( + (self.population[sorted_front[i + 1]].individual['fitness_score'][j] - + self.population[sorted_front[i - 1]].individual['fitness_score'][j]) / + (objective_max - objective_min)) + return crowding_distance + + @staticmethod + def domination_status(individual1, individual2): + # Extract the list of objectives for both individuals + objectives1 = individual1.individual['fitness_score'] + objectives2 = individual2.individual['fitness_score'] + + # Ensure both individuals have the same number of objectives + assert len(objectives1) == len(objectives2), "Both individuals must have the same number of objectives" + + # Flags to check if one dominates the other + better_in_all = True # Whether individual1 is better in all objectives + strictly_better_in_at_least_one = False # Whether individual1 is strictly better in at least one objective + + for obj1, obj2 in zip(objectives1, objectives2): + if obj1 > obj2: + better_in_all = False # individual1 is worse in at least one objective + if obj1 < obj2: + strictly_better_in_at_least_one = True # individual1 is better in at least one objective + result = True if better_in_all and strictly_better_in_at_least_one else False + return result + def solve_ga(self, building, energy_system): self.initialize_population(building, energy_system) - for individual in self.population: - print(individual.individual) + progeny_population = [] + progeny_creation_time_start = time.time() + while len(progeny_population) < self.population_size: + parent1, parent2 = random.choice(self.population), random.choice(self.population) + child1, child2 = self.sbx_crossover(parent1, parent2) + self.polynomial_mutation(child1.individual, building, energy_system) + self.polynomial_mutation(child2.individual, building, energy_system) + child1.score_evaluation() + child2.score_evaluation() + progeny_population.extend([child1, child2]) + progeny_creation_duration = time.time() - progeny_creation_time_start + print(f"progeny population creation took {progeny_creation_duration:.2f} seconds") + self.population.extend(progeny_population) print([ind.fitness_score for ind in self.population]) + ns_start_time = time.time() + fronts = self.fast_non_dominated_sorting() + ns_duration = time.time() - ns_start_time + print(f"non-dominated sorting took {ns_duration:.2f} seconds") + self.crowding_distances = [0] * len(self.population) + crowding_d_start_time = time.time() + for front in fronts: + print(front) + self.crowding_distance = self.calculate_crowding_distance(front=front, crowding_distance=self.crowding_distances) + print(self.crowding_distance) + crowding_duration = time.time() - crowding_d_start_time + print(f"crowding d calculation took {crowding_duration:.2f} seconds") + self.plot_pareto_front(self.population) + + @staticmethod + def plot_pareto_front(pareto_population): + # Extract LCC and LCE for plotting + lcc_values = [individual.individual['lcc'] for individual in pareto_population] + lce_values = [individual.individual['total_energy_consumption'] for individual in pareto_population] + plt.figure(figsize=(10, 6)) + plt.scatter(lcc_values, lce_values, color='blue', label='Pareto Front', alpha=0.6, edgecolors='w', s=80) + plt.title('Pareto Front for Life Cycle Cost vs Life Cycle Energy') + plt.xlabel('Life Cycle Cost (LCC)') + plt.ylabel('Life Cycle Energy (LCE)') + plt.grid(True) + plt.legend() + plt.show() + + + +