fix: multi objective optimization work continued
This commit is contained in:
parent
227b70b451
commit
57c23f290f
|
@ -48,6 +48,8 @@ class Individual:
|
|||
self.costs = self.costs_archetype()
|
||||
self.feasibility = True
|
||||
self.fitness_score = 0
|
||||
self.rank = 0
|
||||
self.crowding_distance = 0
|
||||
self.individual = {}
|
||||
|
||||
def system_components(self):
|
||||
|
|
|
@ -9,7 +9,7 @@ import time
|
|||
|
||||
|
||||
class MultiObjectiveGeneticAlgorithm:
|
||||
def __init__(self, population_size=100, generations=50, crossover_rate=0.8, mutation_rate=0.1,
|
||||
def __init__(self, population_size=100, generations=50, crossover_rate=0.9, mutation_rate=0.1,
|
||||
optimization_scenario=None, output_path=None):
|
||||
self.population_size = population_size
|
||||
self.population = []
|
||||
|
@ -20,6 +20,7 @@ class MultiObjectiveGeneticAlgorithm:
|
|||
self.list_of_solutions = []
|
||||
self.best_solution = None
|
||||
self.best_solution_generation = None
|
||||
self.crowding_distances = None
|
||||
self.output_path = output_path
|
||||
|
||||
# Initialize Population
|
||||
|
@ -47,7 +48,6 @@ class MultiObjectiveGeneticAlgorithm:
|
|||
initializing_time = time.time() - initializing_time_start
|
||||
print(f"initializing took {initializing_time:.2f} seconds")
|
||||
|
||||
|
||||
@staticmethod
|
||||
def design_period_identification(building):
|
||||
def get_start_end_indices(max_day_index, total_days):
|
||||
|
@ -87,8 +87,228 @@ class MultiObjectiveGeneticAlgorithm:
|
|||
'start_index': dhw_start, 'end_index': dhw_end}
|
||||
}
|
||||
|
||||
def sbx_crossover(self, parent1, parent2):
|
||||
eta_c = 2 # Distribution index for SBX
|
||||
if random.random() < self.crossover_rate:
|
||||
child1, child2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
|
||||
# Crossover for Generation Components
|
||||
for i in range(len(parent1.individual['Generation Components'])):
|
||||
for cap_type in ['heating_capacity', 'cooling_capacity']:
|
||||
if parent1.individual['Generation Components'][i][cap_type] is not None:
|
||||
x1 = parent1.individual['Generation Components'][i][cap_type]
|
||||
x2 = parent2.individual['Generation Components'][i][cap_type]
|
||||
if abs(x1 - x2) > 1e-14: # Avoid division by zero
|
||||
u = random.random()
|
||||
if u <= 0.5:
|
||||
beta = (2 * u) ** (1 / (eta_c + 1))
|
||||
else:
|
||||
beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1))
|
||||
|
||||
child1.individual['Generation Components'][i][cap_type] = max(0.1,
|
||||
0.5 * ((1 + beta) * x1 + (1 - beta) * x2))
|
||||
child2.individual['Generation Components'][i][cap_type] = max(0.1,
|
||||
0.5 * ((1 - beta) * x1 + (1 + beta) * x2))
|
||||
# Crossover for Energy Storage Components
|
||||
for i in range(len(parent1.individual['Energy Storage Components'])):
|
||||
for item in ['capacity', 'volume', 'heating_coil_capacity']:
|
||||
if parent1.individual['Energy Storage Components'][i][item] is not None:
|
||||
x1 = parent1.individual['Energy Storage Components'][i][item]
|
||||
x2 = parent2.individual['Energy Storage Components'][i][item]
|
||||
if abs(x1 - x2) > 1e-14: # Avoid division by zero
|
||||
u = random.random()
|
||||
if u <= 0.5:
|
||||
beta = (2 * u) ** (1 / (eta_c + 1))
|
||||
else:
|
||||
beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1))
|
||||
threshold1 = random.uniform(0, 0.01) * child1.available_space
|
||||
child1.individual['Energy Storage Components'][i][item] = max(threshold1,
|
||||
0.5 * ((1 + beta) * x1 + (1 - beta) * x2))
|
||||
threshold2 = random.uniform(0, 0.01) * child1.available_space
|
||||
child2.individual['Energy Storage Components'][i][item] = max(threshold2,
|
||||
0.5 * ((1 - beta) * x1 + (1 + beta) * x2))
|
||||
return child1, child2
|
||||
else:
|
||||
return copy.deepcopy(parent1), copy.deepcopy(parent2)
|
||||
|
||||
def polynomial_mutation(self, individual, building, energy_system):
|
||||
"""
|
||||
Mutates the individual's generation and storage components using the Polynomial Mutation Operator.
|
||||
|
||||
- `individual`: The individual to mutate (contains generation and storage components).
|
||||
- `building`: Building data that contains constraints such as peak heating load and available space.
|
||||
|
||||
Returns the mutated individual.
|
||||
"""
|
||||
eta_m = 20 # Mutation distribution index (can be adjusted)
|
||||
design_period_energy_demands = self.design_period_identification(building)
|
||||
|
||||
def polynomial_mutation_operator(value, lower_bound, upper_bound):
|
||||
delta = (value - lower_bound) / (upper_bound - lower_bound)
|
||||
u = random.random()
|
||||
|
||||
if u < 0.5:
|
||||
delta_q = (2 * u) ** (1 / (eta_m + 1)) - 1
|
||||
else:
|
||||
delta_q = 1 - (2 * (1 - u)) ** (1 / (eta_m + 1))
|
||||
|
||||
mutated_value = value + delta_q * (upper_bound - lower_bound)
|
||||
return max(lower_bound, min(mutated_value, upper_bound)) # Ensure it's within bounds
|
||||
|
||||
# Mutate Generation Components
|
||||
for generation_component in individual['Generation Components']:
|
||||
if random.random() < self.mutation_rate:
|
||||
if (generation_component['nominal_heating_efficiency'] is not None and
|
||||
(cte.HEATING or cte.DOMESTIC_HOT_WATER in energy_system.demand_types)):
|
||||
# Mutate heating capacity
|
||||
if cte.HEATING in energy_system.demand_types:
|
||||
max_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
|
||||
else:
|
||||
max_demand = max(design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
|
||||
|
||||
generation_component['heating_capacity'] = polynomial_mutation_operator(
|
||||
generation_component['heating_capacity'], 0, max_demand)
|
||||
|
||||
if generation_component['nominal_cooling_efficiency'] is not None and cte.COOLING in energy_system.demand_types:
|
||||
# Mutate cooling capacity
|
||||
max_cooling_demand = max(design_period_energy_demands[cte.COOLING]['demands']) / cte.WATTS_HOUR_TO_JULES
|
||||
generation_component['cooling_capacity'] = polynomial_mutation_operator(
|
||||
generation_component['cooling_capacity'], 0, max_cooling_demand)
|
||||
|
||||
# Mutate Storage Components
|
||||
for storage_component in individual['Energy Storage Components']:
|
||||
if random.random() < self.mutation_rate:
|
||||
if storage_component['type'] == f'{cte.THERMAL}_storage':
|
||||
# Mutate the volume of thermal storage
|
||||
max_available_space = 0.01 * building.volume / building.storeys_above_ground
|
||||
storage_component['volume'] = polynomial_mutation_operator(
|
||||
storage_component['volume'], 0, max_available_space)
|
||||
|
||||
if storage_component['heating_coil_capacity'] is not None:
|
||||
if cte.HEATING in energy_system.demand_types:
|
||||
max_heating_demand = max(design_period_energy_demands[cte.HEATING]['demands']) / cte.WATTS_HOUR_TO_JULES
|
||||
else:
|
||||
max_heating_demand = max(
|
||||
design_period_energy_demands[cte.DOMESTIC_HOT_WATER]['demands']) / cte.WATTS_HOUR_TO_JULES
|
||||
|
||||
storage_component['heating_coil_capacity'] = polynomial_mutation_operator(
|
||||
storage_component['heating_coil_capacity'], 0, max_heating_demand)
|
||||
|
||||
return individual
|
||||
|
||||
def fast_non_dominated_sorting(self):
|
||||
population = self.population
|
||||
s = [[] for _ in range(len(population))]
|
||||
n = [0] * len(population)
|
||||
rank = [0] * len(population)
|
||||
fronts = [[]]
|
||||
for p in range(len(population)):
|
||||
for q in range(len(population)):
|
||||
if self.domination_status(population[p], population[q]):
|
||||
s[p].append(q)
|
||||
elif self.domination_status(population[q], population[p]):
|
||||
n[p] += 1
|
||||
if n[p] == 0:
|
||||
rank[p] = 0
|
||||
fronts[0].append(p)
|
||||
i = 0
|
||||
while fronts[i]:
|
||||
next_front = set()
|
||||
for p in fronts[i]:
|
||||
for q in s[p]:
|
||||
n[q] -= 1
|
||||
if n[q] == 0:
|
||||
rank[q] = i + 1
|
||||
next_front.add(q)
|
||||
i += 1
|
||||
fronts.append(list(next_front))
|
||||
del fronts[-1]
|
||||
for (j, front) in enumerate(fronts):
|
||||
for i in front:
|
||||
self.population[i].rank = j + 1
|
||||
return fronts
|
||||
|
||||
def calculate_crowding_distance(self, front, crowding_distance):
|
||||
for j in range(len(self.population[0].fitness_score)):
|
||||
sorted_front = sorted(front, key=lambda x: self.population[x].individual['fitness_score'][j])
|
||||
print(sorted_front)
|
||||
# Set distances to finite large numbers rather than `inf`
|
||||
crowding_distance[sorted_front[0]] = float(1e12)
|
||||
crowding_distance[sorted_front[-1]] = float(1e12)
|
||||
objective_min = self.population[sorted_front[0]].individual['fitness_score'][j]
|
||||
objective_max = self.population[sorted_front[-1]].individual['fitness_score'][j]
|
||||
if objective_max != objective_min:
|
||||
for i in range(1, len(sorted_front) - 1):
|
||||
crowding_distance[sorted_front[i]] += (
|
||||
(self.population[sorted_front[i + 1]].individual['fitness_score'][j] -
|
||||
self.population[sorted_front[i - 1]].individual['fitness_score'][j]) /
|
||||
(objective_max - objective_min))
|
||||
return crowding_distance
|
||||
|
||||
@staticmethod
|
||||
def domination_status(individual1, individual2):
|
||||
# Extract the list of objectives for both individuals
|
||||
objectives1 = individual1.individual['fitness_score']
|
||||
objectives2 = individual2.individual['fitness_score']
|
||||
|
||||
# Ensure both individuals have the same number of objectives
|
||||
assert len(objectives1) == len(objectives2), "Both individuals must have the same number of objectives"
|
||||
|
||||
# Flags to check if one dominates the other
|
||||
better_in_all = True # Whether individual1 is better in all objectives
|
||||
strictly_better_in_at_least_one = False # Whether individual1 is strictly better in at least one objective
|
||||
|
||||
for obj1, obj2 in zip(objectives1, objectives2):
|
||||
if obj1 > obj2:
|
||||
better_in_all = False # individual1 is worse in at least one objective
|
||||
if obj1 < obj2:
|
||||
strictly_better_in_at_least_one = True # individual1 is better in at least one objective
|
||||
result = True if better_in_all and strictly_better_in_at_least_one else False
|
||||
return result
|
||||
|
||||
def solve_ga(self, building, energy_system):
|
||||
self.initialize_population(building, energy_system)
|
||||
for individual in self.population:
|
||||
print(individual.individual)
|
||||
progeny_population = []
|
||||
progeny_creation_time_start = time.time()
|
||||
while len(progeny_population) < self.population_size:
|
||||
parent1, parent2 = random.choice(self.population), random.choice(self.population)
|
||||
child1, child2 = self.sbx_crossover(parent1, parent2)
|
||||
self.polynomial_mutation(child1.individual, building, energy_system)
|
||||
self.polynomial_mutation(child2.individual, building, energy_system)
|
||||
child1.score_evaluation()
|
||||
child2.score_evaluation()
|
||||
progeny_population.extend([child1, child2])
|
||||
progeny_creation_duration = time.time() - progeny_creation_time_start
|
||||
print(f"progeny population creation took {progeny_creation_duration:.2f} seconds")
|
||||
self.population.extend(progeny_population)
|
||||
print([ind.fitness_score for ind in self.population])
|
||||
ns_start_time = time.time()
|
||||
fronts = self.fast_non_dominated_sorting()
|
||||
ns_duration = time.time() - ns_start_time
|
||||
print(f"non-dominated sorting took {ns_duration:.2f} seconds")
|
||||
self.crowding_distances = [0] * len(self.population)
|
||||
crowding_d_start_time = time.time()
|
||||
for front in fronts:
|
||||
print(front)
|
||||
self.crowding_distance = self.calculate_crowding_distance(front=front, crowding_distance=self.crowding_distances)
|
||||
print(self.crowding_distance)
|
||||
crowding_duration = time.time() - crowding_d_start_time
|
||||
print(f"crowding d calculation took {crowding_duration:.2f} seconds")
|
||||
self.plot_pareto_front(self.population)
|
||||
|
||||
@staticmethod
|
||||
def plot_pareto_front(pareto_population):
|
||||
# Extract LCC and LCE for plotting
|
||||
lcc_values = [individual.individual['lcc'] for individual in pareto_population]
|
||||
lce_values = [individual.individual['total_energy_consumption'] for individual in pareto_population]
|
||||
plt.figure(figsize=(10, 6))
|
||||
plt.scatter(lcc_values, lce_values, color='blue', label='Pareto Front', alpha=0.6, edgecolors='w', s=80)
|
||||
plt.title('Pareto Front for Life Cycle Cost vs Life Cycle Energy')
|
||||
plt.xlabel('Life Cycle Cost (LCC)')
|
||||
plt.ylabel('Life Cycle Energy (LCE)')
|
||||
plt.grid(True)
|
||||
plt.legend()
|
||||
plt.show()
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user