fix: rethinking of multi objective optimization started

This commit is contained in:
Saeed Ranjbar 2024-10-17 17:08:44 +02:00
parent 368ab757a6
commit 227b70b451
7 changed files with 584 additions and 232 deletions

View File

@ -0,0 +1,418 @@
import copy
import math
import random
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.individual import \
Individual
import matplotlib.pyplot as plt
class MultiObjectiveGeneticAlgorithm:
def __init__(self, population_size=100, generations=50, crossover_rate=0.8, mutation_rate=0.1,
optimization_scenario=None, output_path=None):
self.population_size = population_size
self.population = []
self.generations = generations
self.crossover_rate = crossover_rate
self.mutation_rate = mutation_rate
self.optimization_scenario = optimization_scenario
self.list_of_solutions = []
self.best_solution = None
self.best_solution_generation = None
self.output_path = output_path
# Initialize Population
def initialize_population(self, building, energy_system):
design_period_energy_demands = self.design_period_identification(building)
attempts = 0
max_attempts = self.population_size * 5
while len(self.population) < self.population_size and attempts < max_attempts:
individual = Individual(building=building,
energy_system=energy_system,
design_period_energy_demands=design_period_energy_demands,
optimization_scenario=self.optimization_scenario)
individual.initialization()
attempts += 1
if self.initial_population_feasibility_check(individual, energy_system.demand_types,
design_period_energy_demands):
self.population.append(individual)
if len(self.population) < self.population_size:
raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. "
f"Only {len(self.population)} feasible individuals were generated.")
@staticmethod
def initial_population_feasibility_check(individual, demand_types, design_period_demands):
total_heating_capacity = sum(
component['heating_capacity'] for component in individual.individual['Generation Components']
if component['heating_capacity'] is not None
)
total_cooling_capacity = sum(
component['cooling_capacity'] for component in individual.individual['Generation Components']
if component['cooling_capacity'] is not None
)
max_heating_demand = max(design_period_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
max_cooling_demand = max(design_period_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES
max_dhw_demand = max(design_period_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
if cte.HEATING in demand_types and total_heating_capacity < 0.5 * max_heating_demand:
return False
if cte.DOMESTIC_HOT_WATER in demand_types and total_heating_capacity < 0.5 * max_dhw_demand:
return False
if cte.COOLING in demand_types and total_cooling_capacity < 0.5 * max_cooling_demand:
return False
total_volume = sum(
component['volume'] for component in individual.individual['Energy Storage Components']
if component['volume'] is not None
)
max_storage_volume = individual.available_space * 0.1
if total_volume > max_storage_volume:
return False
return True
def nsga2_selection(self, population, fronts, crowding_distances):
new_population = []
i = 0
while len(new_population) + len(fronts[i]) <= self.population_size:
for index in fronts[i]:
# Skip individuals with infinite fitness values to avoid propagation
if not math.isinf(self.population[index].individual['fitness_score'][0]) and \
not math.isinf(self.population[index].individual['fitness_score'][1]):
new_population.append(population[index])
i += 1
if i >= len(fronts):
break
if len(new_population) < self.population_size and i < len(fronts):
sorted_front = sorted(fronts[i], key=lambda x: crowding_distances[i][x], reverse=True)
for index in sorted_front:
if len(new_population) < self.population_size:
if not math.isinf(self.population[index].individual['fitness_score'][0]) and \
not math.isinf(self.population[index].individual['fitness_score'][1]):
new_population.append(population[index])
else:
break
return new_population
def fast_non_dominated_sort(self):
population_size = self.population_size
s = [[] for _ in range(population_size)]
front = [[]]
n = [0] * population_size
rank = [0] * population_size
for p in range(population_size):
s[p] = []
n[p] = 0
for q in range(population_size):
if self.dominates(self.population[p], self.population[q]):
s[p].append(q)
elif self.dominates(self.population[q], self.population[p]):
n[p] += 1
if n[p] == 0:
rank[p] = 0
front[0].append(p)
i = 0
while front[i]:
next_front = set()
for p in front[i]:
for q in s[p]:
n[q] -= 1
if n[q] == 0:
rank[q] = i + 1
next_front.add(q)
i += 1
front.append(list(next_front))
del front[-1]
return front
@staticmethod
def dominates(individual1, individual2):
lcc1, lce1 = individual1.individual['fitness_score']
lcc2, lce2 = individual2.individual['fitness_score']
return (lcc1 <= lcc2 and lce1 <= lce2) and (lcc1 < lcc2 or lce1 < lce2)
def calculate_crowding_distance(self, front):
crowding_distance = [0] * len(self.population)
for objective in ['lcc', 'total_energy_consumption']:
sorted_front = sorted(front, key=lambda x: self.population[x].individual[objective])
# Set distances to finite large numbers rather than `inf`
crowding_distance[sorted_front[0]] = float(1e9)
crowding_distance[sorted_front[-1]] = float(1e9)
objective_min = self.population[sorted_front[0]].individual[objective]
objective_max = self.population[sorted_front[-1]].individual[objective]
if objective_max != objective_min:
for i in range(1, len(sorted_front) - 1):
crowding_distance[sorted_front[i]] += (
(self.population[sorted_front[i + 1]].individual[objective] -
self.population[sorted_front[i - 1]].individual[objective]) /
(objective_max - objective_min))
return crowding_distance
def crossover(self, parent1, parent2):
"""
Crossover between two parents to produce two children.
swaps generation components and storage components between the two parents with a 50% chance.
:param parent1: First parent individual.
:param parent2: second parent individual.
:return: Two child individuals (child1 and child2).
"""
if random.random() < self.crossover_rate:
# Deep copy of the parents to create children
child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
# Crossover for Generation Components
for i in range(len(parent1.individual['Generation Components'])):
if random.random() < 0.5:
# swap the entire generation component
child1.individual['Generation Components'][i], child2.individual['Generation Components'][i] = (
child2.individual['Generation Components'][i],
child1.individual['Generation Components'][i]
)
# Crossover for Energy storage Components
for i in range(len(parent1.individual['Energy Storage Components'])):
if random.random() < 0.5:
# swap the entire storage component
child1.individual['Energy Storage Components'][i], child2.individual['Energy Storage Components'][i] = (
child2.individual['Energy Storage Components'][i],
child1.individual['Energy Storage Components'][i]
)
return child1, child2
else:
# If crossover does not happen, return copies of the original parents
return copy.deepcopy(parent1), copy.deepcopy(parent2)
def mutate(self, individual, building, energy_system):
"""
Mutates the individual's generation and storage components.
- `individual`: The individual to mutate (contains generation and storage components).
- `building`: Building data that contains constraints such as peak heating load and available space.
Returns the mutated individual.
"""
design_period_energy_demands = self.design_period_identification(building)
# Mutate Generation Components
for generation_component in individual['Generation Components']:
if random.random() < self.mutation_rate:
if (generation_component['nominal_heating_efficiency'] is not None and cte.HEATING or cte.DOMESTIC_HOT_WATER in
energy_system.demand_types):
# Mutate heating capacity
if cte.HEATING in energy_system.demand_types:
generation_component['heating_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES)
else:
generation_component['heating_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES)
if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types:
# Mutate cooling capacity
generation_component['cooling_capacity'] = random.uniform(
0, max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES)
# Mutate storage Components
for storage_component in individual['Energy Storage Components']:
if random.random() < self.mutation_rate:
if storage_component['type'] == f'{cte.THERMAL}_storage':
# Mutate the volume of thermal storage
max_available_space = 0.01 * building.volume / building.storeys_above_ground
storage_component['volume'] = random.uniform(0, max_available_space)
if storage_component['heating_coil_capacity'] is not None:
if cte.HEATING in energy_system.demand_types:
storage_component['heating_coil_capacity'] = random.uniform(0, max(
design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES)
else:
storage_component['heating_coil_capacity'] = random.uniform(0, max(
design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES)
return individual
def solve_ga(self, building, energy_system):
self.initialize_population(building, energy_system)
for individual in self.population:
individual.score_evaluation()
pareto_population = []
for generation in range(1, self.generations + 1):
print(f"Generation {generation}")
fronts = self.fast_non_dominated_sort()
# Ensure the front calculation is valid
if not fronts or not fronts[0]:
print("Warning: No valid non-dominated front found.")
continue
# Calculate crowding distances and select individuals
crowding_distances = [self.calculate_crowding_distance(front) for front in fronts]
self.population = self.nsga2_selection(self.population, fronts, crowding_distances)
# Add only valid indices to pareto_population
pareto_population.extend([self.population[i] for i in fronts[0] if i < len(self.population)])
# Check population bounds
if len(pareto_population) > self.population_size:
pareto_population = pareto_population[:self.population_size]
# Generate the next population through crossover and mutation
next_population = []
while len(next_population) < self.population_size:
parent1, parent2 = random.choice(self.population), random.choice(self.population)
child1, child2 = self.crossover(parent1, parent2)
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
self.population = next_population[:self.population_size]
# Ensure pareto_population contains the correct non-dominated individuals before TOPSIS
if not pareto_population:
print("No Pareto solutions found during optimization.")
return None
# Recalculate pareto front indices based on updated pareto_population
fronts = self.fast_non_dominated_sort()
pareto_front_indices = fronts[0] if fronts else []
pareto_front_indices = [i for i in pareto_front_indices if i < len(pareto_population)]
print(f"Final pareto_front_indices: {pareto_front_indices}, pareto_population size: {len(pareto_population)}")
if not pareto_front_indices:
print("No valid solution found after TOPSIS due to empty pareto front indices.")
return None
global_pareto_front = [pareto_population[i] for i in pareto_front_indices]
# Get the top N solutions with TOPSIS
top_n = 3 # Adjust this value based on how many top solutions you want to explore
self.best_solution = self.topsis_decision_making(global_pareto_front, top_n=top_n)
# Print the top N solutions
if self.best_solution:
print("Top solutions after TOPSIS:")
for idx, solution in enumerate(self.best_solution, 1):
print(f"Solution {idx}: LCC = {solution.individual['lcc']}, "
f"LCE = {solution.individual['total_energy_consumption']}")
else:
print("No valid solutions found after TOPSIS.")
if pareto_population:
self.plot_pareto_front(pareto_population)
return self.best_solution
@staticmethod
def design_period_identification(building):
def get_start_end_indices(max_day_index, total_days):
if max_day_index > 0 and max_day_index < total_days - 1:
start_index = (max_day_index - 1) * 24
end_index = (max_day_index + 2) * 24
elif max_day_index == 0:
start_index = 0
end_index = (max_day_index + 2) * 24
else:
start_index = (max_day_index - 1) * 24
end_index = total_days * 24
return start_index, end_index
# Calculate daily demands
heating_daily_demands = [sum(building.heating_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.heating_demand[cte.HOUR]), 24)]
cooling_daily_demands = [sum(building.cooling_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.cooling_demand[cte.HOUR]), 24)]
dhw_daily_demands = [sum(building.domestic_hot_water_heat_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.domestic_hot_water_heat_demand[cte.HOUR]), 24)]
# Get the day with maximum demand for each type
heating_max_day = heating_daily_demands.index(max(heating_daily_demands))
cooling_max_day = cooling_daily_demands.index(max(cooling_daily_demands))
dhw_max_day = dhw_daily_demands.index(max(dhw_daily_demands))
# Get the start and end indices for each demand type
heating_start, heating_end = get_start_end_indices(heating_max_day, len(heating_daily_demands))
cooling_start, cooling_end = get_start_end_indices(cooling_max_day, len(cooling_daily_demands))
dhw_start, dhw_end = get_start_end_indices(dhw_max_day, len(dhw_daily_demands))
# Return the design period energy demands
return {
f'{cte.HEATING}': {'demands': building.heating_demand[cte.HOUR][heating_start:heating_end],
'start_index': heating_start, 'end_index': heating_end},
f'{cte.COOLING}': {'demands': building.cooling_demand[cte.HOUR][cooling_start:cooling_end],
'start_index': cooling_start, 'end_index': cooling_end},
f'{cte.DOMESTIC_HOT_WATER}': {'demands': building.domestic_hot_water_heat_demand[cte.HOUR][dhw_start:dhw_end],
'start_index': dhw_start, 'end_index': dhw_end}
}
def topsis_decision_making(self, pareto_front, top_n=5):
"""
Perform TOPSIS decision-making to choose the best solutions from the Pareto front.
:param pareto_front: List of individuals in the Pareto front
:param top_n: Number of top solutions to select based on TOPSIS ranking
:return: List of top N individuals based on TOPSIS ranking
"""
# Filter out infinite values from the pareto front before processing
pareto_front = [ind for ind in pareto_front if all(math.isfinite(v) for v in ind.individual['fitness_score'])]
if not pareto_front:
return None
# Step 1: Normalize the objective functions (cost and energy consumption)
min_lcc = min([ind.individual['lcc'] for ind in pareto_front])
max_lcc = max([ind.individual['lcc'] for ind in pareto_front])
min_lce = min([ind.individual['total_energy_consumption'] for ind in pareto_front])
max_lce = max([ind.individual['total_energy_consumption'] for ind in pareto_front])
normalized_pareto_front = []
for ind in pareto_front:
normalized_lcc = (ind.individual['lcc'] - min_lcc) / (max_lcc - min_lcc) if max_lcc > min_lcc else 0
normalized_lce = (ind.individual['total_energy_consumption'] - min_lce) / (
max_lce - min_lce) if max_lce > min_lce else 0
normalized_pareto_front.append((ind, normalized_lcc, normalized_lce))
# Step 2: Calculate the ideal and worst solutions
ideal_solution = (0, 0) # Ideal is minimum LCC and minimum LCE (0, 0 after normalization)
worst_solution = (1, 1) # Worst is maximum LCC and maximum LCE (1, 1 after normalization)
# Step 3: Calculate the distance to the ideal and worst solutions
best_distances = []
worst_distances = []
for ind, normalized_lcc, normalized_lce in normalized_pareto_front:
distance_to_ideal = math.sqrt(
(normalized_lcc - ideal_solution[0]) ** 2 + (normalized_lce - ideal_solution[1]) ** 2)
distance_to_worst = math.sqrt(
(normalized_lcc - worst_solution[0]) ** 2 + (normalized_lce - worst_solution[1]) ** 2)
best_distances.append(distance_to_ideal)
worst_distances.append(distance_to_worst)
# Step 4: Calculate relative closeness to the ideal solution
similarity = [worst / (best + worst) for best, worst in zip(best_distances, worst_distances)]
# Step 5: Select the top N individuals with the highest similarity scores
top_indices = sorted(range(len(similarity)), key=lambda i: similarity[i], reverse=True)[:top_n]
top_solutions = [pareto_front[i] for i in top_indices]
# Plot the similarity scores
self.plot_topsis_similarity(similarity)
return top_solutions
@staticmethod
def plot_topsis_similarity(similarity):
"""
Plot the TOPSIS similarity scores for visualization.
:param similarity: List of similarity scores for each individual in the Pareto front
"""
plt.figure(figsize=(10, 6))
plt.plot(range(len(similarity)), similarity, 'bo-', label='TOPSIS Similarity Scores')
plt.xlabel('Pareto Front Solution Index')
plt.ylabel('Similarity Score')
plt.title('TOPSIS Similarity Scores for Pareto Front Solutions')
plt.legend()
plt.grid(True)
plt.show()
@staticmethod
def plot_pareto_front(pareto_population):
# Extract LCC and LCE for plotting
lcc_values = [individual.individual['lcc'] for individual in pareto_population]
lce_values = [individual.individual['total_energy_consumption'] for individual in pareto_population]
plt.figure(figsize=(10, 6))
plt.scatter(lcc_values, lce_values, color='blue', label='Pareto Front', alpha=0.6, edgecolors='w', s=80)
plt.title('Pareto Front for Life Cycle Cost vs Life Cycle Energy')
plt.xlabel('Life Cycle Cost (LCC)')
plt.ylabel('Life Cycle Energy (LCE)')
plt.grid(True)
plt.legend()
plt.show()

View File

@ -0,0 +1,94 @@
import copy
import math
import random
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.individual import \
Individual
import matplotlib.pyplot as plt
import time
class MultiObjectiveGeneticAlgorithm:
def __init__(self, population_size=100, generations=50, crossover_rate=0.8, mutation_rate=0.1,
optimization_scenario=None, output_path=None):
self.population_size = population_size
self.population = []
self.generations = generations
self.crossover_rate = crossover_rate
self.mutation_rate = mutation_rate
self.optimization_scenario = optimization_scenario
self.list_of_solutions = []
self.best_solution = None
self.best_solution_generation = None
self.output_path = output_path
# Initialize Population
def initialize_population(self, building, energy_system):
design_period_start_time = time.time()
design_period_energy_demands = self.design_period_identification(building)
design_period_time = time.time() - design_period_start_time
print(f"design period identification took {design_period_time:.2f} seconds")
initializing_time_start = time.time()
attempts = 0
max_attempts = self.population_size * 20
while len(self.population) < self.population_size and attempts < max_attempts:
individual = Individual(building=building,
energy_system=energy_system,
design_period_energy_demands=design_period_energy_demands,
optimization_scenario=self.optimization_scenario)
individual.initialization()
individual.score_evaluation()
attempts += 1
if individual.feasibility:
self.population.append(individual)
if len(self.population) < self.population_size:
raise RuntimeError(f"Could not generate a feasible population of size {self.population_size}. "
f"Only {len(self.population)} feasible individuals were generated.")
initializing_time = time.time() - initializing_time_start
print(f"initializing took {initializing_time:.2f} seconds")
@staticmethod
def design_period_identification(building):
def get_start_end_indices(max_day_index, total_days):
if max_day_index > 0 and max_day_index < total_days - 1:
start_index = (max_day_index - 1) * 24
end_index = (max_day_index + 2) * 24
elif max_day_index == 0:
start_index = 0
end_index = (max_day_index + 2) * 24
else:
start_index = (max_day_index - 1) * 24
end_index = total_days * 24
return start_index, end_index
# Calculate daily demands
heating_daily_demands = [sum(building.heating_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.heating_demand[cte.HOUR]), 24)]
cooling_daily_demands = [sum(building.cooling_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.cooling_demand[cte.HOUR]), 24)]
dhw_daily_demands = [sum(building.domestic_hot_water_heat_demand[cte.HOUR][i:i + 24]) for i in
range(0, len(building.domestic_hot_water_heat_demand[cte.HOUR]), 24)]
# Get the day with maximum demand for each type
heating_max_day = heating_daily_demands.index(max(heating_daily_demands))
cooling_max_day = cooling_daily_demands.index(max(cooling_daily_demands))
dhw_max_day = dhw_daily_demands.index(max(dhw_daily_demands))
# Get the start and end indices for each demand type
heating_start, heating_end = get_start_end_indices(heating_max_day, len(heating_daily_demands))
cooling_start, cooling_end = get_start_end_indices(cooling_max_day, len(cooling_daily_demands))
dhw_start, dhw_end = get_start_end_indices(dhw_max_day, len(dhw_daily_demands))
# Return the design period energy demands
return {
f'{cte.HEATING}': {'demands': building.heating_demand[cte.HOUR][heating_start:heating_end],
'start_index': heating_start, 'end_index': heating_end},
f'{cte.COOLING}': {'demands': building.cooling_demand[cte.HOUR][cooling_start:cooling_end],
'start_index': cooling_start, 'end_index': cooling_end},
f'{cte.DOMESTIC_HOT_WATER}': {'demands': building.domestic_hot_water_heat_demand[cte.HOUR][dhw_start:dhw_end],
'start_index': dhw_start, 'end_index': dhw_end}
}
def solve_ga(self, building, energy_system):
self.initialize_population(building, energy_system)
for individual in self.population:
print(individual.individual)
print([ind.fitness_score for ind in self.population])

View File

@ -6,7 +6,7 @@ from energy_system_modelling_package.energy_system_modelling_factories.system_si
Individual
class GeneticAlgorithm:
class SingleObjectiveGeneticAlgorithm:
def __init__(self, population_size=100, generations=20, crossover_rate=0.8, mutation_rate=0.1,
optimization_scenario=None, output_path=None):
self.population_size = population_size
@ -38,10 +38,11 @@ class GeneticAlgorithm:
energy_system=energy_system,
design_period_energy_demands=design_period_energy_demands,
optimization_scenario=self.optimization_scenario)
individual.initialization()
attempts += 1
# Enhanced feasibility check
# Enhanced feasibility check
if self.initial_population_feasibility_check(individual, energy_system.demand_types, design_period_energy_demands):
self.population.append(individual)
@ -117,142 +118,6 @@ class GeneticAlgorithm:
selected.append(copy.deepcopy(self.population[j]))
return selected
def nsga2_selection(self, population, fronts, crowding_distance):
"""
The selection used in multi-objective optimization based on non-dominated sorting and crowding distance
:return:
"""
new_population = []
i = 0
# Go through each front until we have enough individuals for the new population
while len(new_population) + len(fronts[i]) <= self.population_size:
for index in fronts[i]:
new_population.append(population[index])
i += 1
# Ensure we do not go out of bounds
if i >= len(fronts):
break
# If we still need more individuals to fill the population
if len(new_population) < self.population_size and i < len(fronts):
# Sort the individuals in the current front by crowding distance in descending order
fronts[i].sort(key=lambda x: crowding_distance[x], reverse=True)
for index in fronts[i]:
if len(new_population) < self.population_size:
new_population.append(population[index])
else:
break
return new_population
def fast_non_dominated_sort(self):
"""
Perform non-dominated sorting on the population based on multiple objective fitness scores
(e.g., life cycle cost (LCC) and energy consumption (LCE)).
This method assigns each individual in the population to different fronts based on dominance.
Returns:
A list of fronts where each front is a list of individual indices.
Front 0 is the best (non-dominated) front, followed by worse dominated fronts.
"""
# Number of individuals in the population
population_size = self.population_size
# s[p] contains individuals dominated by individual p
s = [[] for _ in range(population_size)]
# front[i] contains individuals in the i-th front (i = 0 is the best front)
front = [[]]
# n[p] is the count of individuals that dominate individual p
n = [0] * population_size
# rank[p] stores the front rank of individual p
rank = [0] * population_size
# Loop through each individual in the population (indexed by p)
for p in range(population_size):
s[p] = [] # Initialize the list of individuals p dominates
n[p] = 0 # Initialize the count of individuals dominating p
for q in range(population_size):
if self.dominates(self.population[p], self.population[q]):
# If individual p dominates individual q, add q to s[p]
s[p].append(q)
elif self.dominates(self.population[q], self.population[p]):
# If individual q dominates individual p, increment n[p]
n[p] += 1
# If no one dominates p, assign p to the first front (rank 0)
if n[p] == 0:
rank[p] = 0
front[0].append(p)
# Begin sorting into subsequent fronts
i = 0
while front[i]:
q = [] # Next front to populate
for p in front[i]: # For each individual p in the current front
for t in s[p]: # For each individual q dominated by p
# Ensure that `t` is a valid index (integer)
if isinstance(t, int) and 0 <= t < population_size:
n[t] -= 1 # Decrement n[q] since p is processed
if n[t] == 0: # If q is not dominated by any other individuals
rank[t] = i + 1 # Assign q to the next front
if t not in q: # Add q to the next front if not already added
q.append(t)
else:
# Debugging log to help identify the issue
print(f"Invalid value for t: {t}, expected an integer.")
i += 1 # Move to the next front
front.append(q) # Add the new front to the list of fronts
# Remove the last empty front (no individuals in it)
del front[-1]
# Return the list of fronts, each front containing indices of non-dominated individuals
return front
@staticmethod
def dominates(individual1, individual2):
"""
Checks if individual1 dominates individual2.
A solution dominates another if it is no worse in all objectives (LCC and LCE)
and better in at least one.
Args:
individual1: First individual (to check if it dominates individual2).
individual2: second individual (to check if it's dominated by individual1).
Returns:
True if individual1 dominates individual2, False otherwise.
"""
# Extract life cycle cost (LCC) and life cycle energy (LCE) from both individuals
lcc1, lce1 = individual1.individual['fitness_score']
lcc2, lce2 = individual2.individual['fitness_score']
# Check if individual1 dominates individual2
return (lcc1 <= lcc2 and lce1 <= lce2) and (lcc1 < lcc2 or lce1 < lce2)
def calculate_crowding_distance(self, front):
"""
Calculate the crowding distance for individuals in a given front based on objective values
(life cycle cost and energy consumption).
:param front: A list of indices of individuals in the current front.
:return crowding_distance: A list of crowding distances for individuals in the front.
"""
# Initialize distance list for the entire population
crowding_distance = [0] * len(self.population)
# There are two objectives: LCC (self.population[i].individual['lcc']) and
# LCE (self.population[i].individual['total_energy_consumption'])
num_objectives = 2
# For each objective (LCC and LCE)
for m, objective in enumerate(['lcc', 'total_energy_consumption']):
# Sort the front based on the objective 'm'
front = sorted(front, key=lambda x: self.population[x].individual[objective])
# Assign infinite distance to boundary individuals
crowding_distance[front[0]] = float('inf')
crowding_distance[front[-1]] = float('inf')
# Calculate crowding distance for individuals in between
for i in range(1, len(front) - 1):
objective_min = self.population[front[0]].individual[objective]
objective_max = self.population[front[-1]].individual[objective]
if objective_max != objective_min: # Avoid division by zero
crowding_distance[front[i]] += (self.population[front[i + 1]].individual[objective] -
self.population[front[i - 1]].individual[objective]) / (
objective_max - objective_min)
return crowding_distance
def crossover(self, parent1, parent2):
"""
Crossover between two parents to produce two children.
@ -264,21 +129,21 @@ class GeneticAlgorithm:
:return: Two child individuals (child1 and child2).
"""
if random.random() < self.crossover_rate:
# Deep copy of the parents to create children
# Deep copy of the parents to create children
child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
# Crossover for Generation Components
# Crossover for Generation Components
for i in range(len(parent1.individual['Generation Components'])):
if random.random() < 0.5:
# swap the entire generation component
# swap the entire generation component
child1.individual['Generation Components'][i], child2.individual['Generation Components'][i] = (
child2.individual['Generation Components'][i],
child1.individual['Generation Components'][i]
)
# Crossover for Energy storage Components
# Crossover for Energy storage Components
for i in range(len(parent1.individual['Energy Storage Components'])):
if random.random() < 0.5:
# swap the entire storage component
# swap the entire storage component
child1.individual['Energy Storage Components'][i], child2.individual['Energy Storage Components'][i] = (
child2.individual['Energy Storage Components'][i],
child1.individual['Energy Storage Components'][i]
@ -286,7 +151,7 @@ class GeneticAlgorithm:
return child1, child2
else:
# If crossover does not happen, return copies of the original parents
# If crossover does not happen, return copies of the original parents
return copy.deepcopy(parent1), copy.deepcopy(parent2)
def mutate(self, individual, building, energy_system):
@ -349,89 +214,46 @@ class GeneticAlgorithm:
# step 2: Evaluate the initial population
for individual in self.population:
individual.score_evaluation()
# Create a list to store all non-dominated solutions from each generation
pareto_population = []
if self.optimization_scenario == 'cost_energy_consumption':
# NSGA-II optimization loop
for generation in range(1, self.generations + 1):
print(f"Generation {generation}")
# Step 3: Perform non-dominated sorting
fronts = self.fast_non_dominated_sort()
# Step 4: Calculate crowding distances for all fronts
crowding_distances = []
for front in fronts:
front = sorted(front, key=lambda x: self.population[x].individual['lcc'])
crowding_distances.extend(self.calculate_crowding_distance(front))
# Step 5: Selection using NSGA-II
self.population = self.nsga2_selection(self.population, fronts, crowding_distances)
print([individual.fitness_score for individual in self.population])
# Step 6: Add non-dominated solutions (front[0]) to the Pareto population
pareto_population.extend([self.population[i] for i in fronts[0]])
print(pareto_population)
# Step 7: Apply crossover and mutation to create offspring
next_population = []
while len(next_population) < self.population_size:
parent1 = random.choice(self.population)
parent2 = random.choice(self.population)
child1, child2 = self.crossover(parent1, parent2)
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
self.population = next_population[:self.population_size]
# Step 8: Perform non-dominated sorting on pareto_population to get the global Pareto front
self.population = pareto_population
pareto_front_indices = self.fast_non_dominated_sort()[0]
global_pareto_front = [pareto_population[i] for i in pareto_front_indices]
# Step 9: Use TOPSIS to find the best solution from the global Pareto front
self.best_solution = self.topsis_decision_making(global_pareto_front)
# Print the best solution
print(f"Best solution after TOPSIS: LCC = {self.best_solution.individual['lcc']}, "
f"LCE = {self.best_solution.individual['total_energy_consumption']}")
return self.best_solution
else:
# step 3: Order population based on fitness scores
# step 3: Order population based on fitness scores
self.order_population()
print([individual.fitness_score for individual in self.population])
# Track the best solution
self.best_solution = self.population[0]
self.best_solution_generation = 0
self.list_of_solutions.append(copy.deepcopy(self.best_solution.individual))
# step 4: Run GA for a fixed number of generations
for generation in range(1, self.generations):
print(f"Generation {generation}")
# selection (using tournament selection)
selected_population = self.tournament_selection()
# Create the next generation through crossover and mutation
next_population = []
for i in range(0, self.population_size, 2):
parent1 = selected_population[i]
parent2 = selected_population[i + 1] if (i + 1) < len(selected_population) else selected_population[0]
# step 5: Apply crossover
child1, child2 = self.crossover(parent1, parent2)
# step 6: Apply mutation
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
# step 7: Evaluate the children
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
# Replace old population with the new one
self.population = next_population
# step 8: sort the new population based on fitness
self.order_population()
print([individual.fitness_score for individual in self.population])
# Track the best solution
self.best_solution = self.population[0]
self.best_solution_generation = 0
self.list_of_solutions.append(copy.deepcopy(self.best_solution.individual))
# step 4: Run GA for a fixed number of generations
for generation in range(1, self.generations):
print(f"Generation {generation}")
# selection (using tournament selection)
selected_population = self.tournament_selection()
# Create the next generation through crossover and mutation
next_population = []
for i in range(0, self.population_size, 2):
parent1 = selected_population[i]
parent2 = selected_population[i + 1] if (i + 1) < len(selected_population) else selected_population[0]
# step 5: Apply crossover
child1, child2 = self.crossover(parent1, parent2)
# step 6: Apply mutation
self.mutate(child1.individual, building, energy_system)
self.mutate(child2.individual, building, energy_system)
# step 7: Evaluate the children
child1.score_evaluation()
child2.score_evaluation()
next_population.extend([child1, child2])
# Replace old population with the new one
self.population = next_population
# step 8: sort the new population based on fitness
self.order_population()
print([individual.fitness_score for individual in self.population])
# Track the best solution found in this generation
if self.population[0].individual['fitness_score'] < self.best_solution.individual['fitness_score']:
self.best_solution = self.population[0]
self.best_solution_generation = generation
# store the best solution in the list of solutions
self.list_of_solutions.append(copy.deepcopy(self.population[0].individual))
print(f"Best solution found in generation {self.best_solution_generation}")
print(f"Best solution: {self.best_solution.individual}")
return self.best_solution
# Track the best solution found in this generation
if self.population[0].individual['fitness_score'] < self.best_solution.individual['fitness_score']:
self.best_solution = self.population[0]
self.best_solution_generation = generation
# store the best solution in the list of solutions
self.list_of_solutions.append(copy.deepcopy(self.population[0].individual))
print(f"Best solution found in generation {self.best_solution_generation}")
print(f"Best solution: {self.best_solution.individual}")
return self.best_solution
@staticmethod
def topsis_decision_making(pareto_front):
@ -517,3 +339,4 @@ class GeneticAlgorithm:
f'{cte.DOMESTIC_HOT_WATER}': {'demands': building.domestic_hot_water_heat_demand[cte.HOUR][dhw_start:dhw_end],
'start_index': dhw_start, 'end_index': dhw_end}
}

View File

@ -1,6 +1,6 @@
import hub.helpers.constants as cte
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.genetic_algorithm import \
GeneticAlgorithm
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.single_objective_genetic_algorithm import \
SingleObjectiveGeneticAlgorithm
class OptimalSizing:
@ -26,7 +26,7 @@ class OptimalSizing:
elif cte.COOLING in energy_system.demand_types:
energy_system.generation_systems[0].nominal_cooling_output = building.cooling_peak_load[cte.YEAR][0]
else:
optimized_system = GeneticAlgorithm(optimization_scenario=self.optimization_scenario).solve_ga(building, energy_system)
optimized_system = SingleObjectiveGeneticAlgorithm(optimization_scenario=self.optimization_scenario).solve_ga(building, energy_system)
for generation_system in energy_system.generation_systems:
system_type = generation_system.system_type
for generation_component in optimized_system.individual['Generation Components']:

View File

@ -22,6 +22,7 @@ class PvGenerationSystem(GenerationSystem):
self._nominal_radiation = None
self._standard_test_condition_cell_temperature = None
self._standard_test_condition_maximum_power = None
self._standard_test_condition_radiation = None
self._cell_temperature_coefficient = None
self._width = None
self._height = None
@ -143,6 +144,22 @@ class PvGenerationSystem(GenerationSystem):
"""
self._standard_test_condition_maximum_power = value
@property
def standard_test_condition_radiation(self):
"""
Get standard test condition radiation in W/m2
:return: float
"""
return self._standard_test_condition_radiation
@standard_test_condition_radiation.setter
def standard_test_condition_radiation(self, value):
"""
Set standard test condition radiation in W/m2
:param value: float
"""
self._standard_test_condition_radiation = value
@property
def cell_temperature_coefficient(self):
"""

View File

@ -3,7 +3,7 @@ import subprocess
from building_modelling.ep_run_enrich import energy_plus_workflow
from energy_system_modelling_package.energy_system_modelling_factories.montreal_energy_system_archetype_modelling_factory import \
MontrealEnergySystemArchetypesSimulationFactory
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.genetic_algorithm import GeneticAlgorithm
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.single_objective_genetic_algorithm import SingleObjectiveGeneticAlgorithm
from hub.imports.geometry_factory import GeometryFactory
from hub.helpers.dictionaries import Dictionaries
from hub.imports.construction_factory import ConstructionFactory

View File

@ -3,7 +3,7 @@ import subprocess
from building_modelling.ep_run_enrich import energy_plus_workflow
from energy_system_modelling_package.energy_system_modelling_factories.montreal_energy_system_archetype_modelling_factory import \
MontrealEnergySystemArchetypesSimulationFactory
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.genetic_algorithm import GeneticAlgorithm
from energy_system_modelling_package.energy_system_modelling_factories.system_sizing_methods.genetic_algorithm.multi_objective_genetic_algorithm_rethinking import MultiObjectiveGeneticAlgorithm
from hub.imports.geometry_factory import GeometryFactory
from hub.helpers.dictionaries import Dictionaries
from hub.imports.construction_factory import ConstructionFactory
@ -54,5 +54,5 @@ energy_plus_workflow(city, energy_plus_output_path)
random_assignation.call_random(city.buildings, random_assignation.residential_new_systems_percentage)
EnergySystemsFactory('montreal_future', city).enrich()
for building in city.buildings:
energy_system = building.energy_systems[-1]
GeneticAlgorithm(optimization_scenario='cost').solve_ga(building, energy_system)
energy_system = building.energy_systems[1]
MultiObjectiveGeneticAlgorithm(optimization_scenario='cost_energy_consumption').solve_ga(building, energy_system)