import math import csv import hub.helpers.constants as cte from pv_assessment.electricity_demand_calculator import HourlyElectricityDemand from hub.catalog_factories.energy_systems_catalog_factory import EnergySystemsCatalogFactory from hub.helpers.monthly_values import MonthlyValues class PvSystemAssessment: def __init__(self, building=None, pv_system=None, battery=None, electricity_demand=None, tilt_angle=None, solar_angles=None, pv_installation_type=None, simulation_model_type=None, module_model_name=None, inverter_efficiency=None, system_catalogue_handler=None, roof_percentage_coverage=None, facade_coverage_percentage=None, csv_output=False, output_path=None, run_lcoe=False): self.building = building self.electricity_demand = electricity_demand self.tilt_angle = tilt_angle self.solar_angles = solar_angles self.pv_installation_type = pv_installation_type self.simulation_model_type = simulation_model_type self.module_model_name = module_model_name self.inverter_efficiency = inverter_efficiency self.system_catalogue_handler = system_catalogue_handler self.roof_percentage_coverage = roof_percentage_coverage self.facade_coverage_percentage = facade_coverage_percentage self.csv_output = csv_output self.output_path = output_path self.run_lcoe = run_lcoe # Default LCOE parameters self.inflation_rate = 0.03 self.discount_rate = 0.05 self.period = 25 self.degradation_rate = 0.01 self.year_of_replacement_list = [12] self.replacement_ratio = 0.1 self.installation_cost = 0 self.tax_deduct = 0 self.incentive = 0 self.pv_hourly_generation = None self.t_cell = None self.results = {} if pv_system is not None: self.pv_system = pv_system else: self.pv_system = None for energy_system in self.building.energy_systems: for generation_system in energy_system.generation_systems: if generation_system.system_type == cte.PHOTOVOLTAIC: self.pv_system = generation_system if battery is not None: self.battery = battery else: self.battery = None for energy_system in self.building.energy_systems: for generation_system in energy_system.generation_systems: if (generation_system.system_type == cte.PHOTOVOLTAIC and generation_system.energy_storage_systems is not None): for storage_system in generation_system.energy_storage_systems: if storage_system.type_energy_stored == cte.ELECTRICAL: self.battery = storage_system @staticmethod def explicit_model(pv_system, inverter_efficiency, number_of_panels, irradiance, outdoor_temperature): stc_power = float(pv_system.standard_test_condition_maximum_power) stc_irradiance = float(pv_system.standard_test_condition_radiation) cell_temperature_coefficient = (float(pv_system.cell_temperature_coefficient) / 100 if pv_system.cell_temperature_coefficient is not None else 0.0) stc_t_cell = float(pv_system.standard_test_condition_cell_temperature) nominal_condition_irradiance = float(pv_system.nominal_radiation) nominal_condition_cell_temperature = float(pv_system.nominal_cell_temperature) nominal_t_out = float(pv_system.nominal_ambient_temperature) pv_output = [] for i in range(len(irradiance)): g_i = irradiance[i] t_out = outdoor_temperature[i] t_cell = t_out + (g_i / nominal_condition_irradiance) * (nominal_condition_cell_temperature - nominal_t_out) power = (inverter_efficiency * number_of_panels * (stc_power * (g_i / stc_irradiance) * (1 - cell_temperature_coefficient * (t_cell - stc_t_cell)))) pv_output.append(power) return pv_output def rooftop_sizing(self, roof): pv_system = self.pv_system if self.module_model_name is not None: self.system_assignation() module_width = float(pv_system.width) module_height = float(pv_system.height) roof_area = roof.perimeter_area pv_module_area = module_width * module_height available_roof = (self.roof_percentage_coverage * roof_area) winter_solstice = self.solar_angles[(self.solar_angles['AST'].dt.month == 12) & (self.solar_angles['AST'].dt.day == 21) & (self.solar_angles['AST'].dt.hour == 12)] solar_altitude = winter_solstice['solar altitude'].values[0] solar_azimuth = winter_solstice['solar azimuth'].values[0] distance = ((module_height * math.sin(math.radians(self.tilt_angle)) * abs(math.cos(math.radians(solar_azimuth)))) / math.tan(math.radians(solar_altitude))) distance = float(format(distance, '.2f')) space_dimension = math.sqrt(available_roof) space_dimension = float(format(space_dimension, '.2f')) panels_per_row = math.ceil(space_dimension / module_width) number_of_rows = math.ceil(space_dimension / distance) total_number_of_panels = panels_per_row * number_of_rows total_pv_area = total_number_of_panels * pv_module_area roof.installed_solar_collector_area = total_pv_area return panels_per_row, number_of_rows def system_assignation(self): generation_units_catalogue = EnergySystemsCatalogFactory(self.system_catalogue_handler).catalog catalog_pv_generation_equipments = [component for component in generation_units_catalogue.entries('generation_equipments') if component.system_type == 'photovoltaic'] selected_pv_module = None for pv_module in catalog_pv_generation_equipments: if self.module_model_name == pv_module.model_name: selected_pv_module = pv_module if selected_pv_module is None: raise ValueError("No PV module with the provided model name exists in the catalogue") for energy_system in self.building.energy_systems: for idx, generation_system in enumerate(energy_system.generation_systems): if generation_system.system_type == cte.PHOTOVOLTAIC: new_system = selected_pv_module for attr in dir(generation_system): if not attr.startswith('__') and not callable(getattr(generation_system, attr)): if not hasattr(new_system, attr): setattr(new_system, attr, getattr(generation_system, attr)) energy_system.generation_systems[idx] = new_system def grid_tied_system(self): if self.electricity_demand is not None: electricity_demand = self.electricity_demand else: electricity_demand = [d * 1000 for d in HourlyElectricityDemand(self.building).calculate()] rooftops_pv_output = [0] * len(electricity_demand) facades_pv_output = [0] * len(electricity_demand) rooftop_number_of_panels = 0 if self.pv_installation_type is not None and 'rooftop' in self.pv_installation_type.lower(): for roof in self.building.roofs: if roof.perimeter_area > 40: npanels_per_row, nrows = self.rooftop_sizing(roof) single_roof_number_of_panels = npanels_per_row * nrows rooftop_number_of_panels += single_roof_number_of_panels if self.simulation_model_type == 'explicit': single_roof_pv_output = self.explicit_model( pv_system=self.pv_system, inverter_efficiency=self.inverter_efficiency, number_of_panels=single_roof_number_of_panels, irradiance=roof.global_irradiance_tilted[cte.HOUR], outdoor_temperature=self.building.external_temperature[cte.HOUR]) for i in range(len(rooftops_pv_output)): rooftops_pv_output[i] += single_roof_pv_output[i] total_hourly_pv_output = [rooftops_pv_output[i] + facades_pv_output[i] for i in range(len(electricity_demand))] imported_electricity = [] exported_electricity = [] self.building.self_sufficiency['hour'] = [] for i in range(len(electricity_demand)): transfer = total_hourly_pv_output[i] - electricity_demand[i] self.building.self_sufficiency['hour'].append(transfer) if transfer > 0: exported_electricity.append(transfer) imported_electricity.append(0) else: exported_electricity.append(0) imported_electricity.append(abs(transfer)) self.building.self_sufficiency['year'] = sum(self.building.self_sufficiency['hour']) results = { 'building_name': self.building.name, 'total_floor_area_m2': self.building.thermal_zones_from_internal_zones[0].total_floor_area, 'roof_area_m2': self.building.roofs[0].perimeter_area, 'rooftop_panels': rooftop_number_of_panels, 'rooftop_panels_area_m2': self.building.roofs[0].installed_solar_collector_area, 'yearly_rooftop_ghi_kW/m2': self.building.roofs[0].global_irradiance[cte.YEAR][0] / 1000, 'yearly_rooftop_tilted_radiation_{}_degree_kW/m2'.format(self.tilt_angle): self.building.roofs[0].global_irradiance_tilted[cte.YEAR][0] / 1000, 'yearly_rooftop_pv_production_kWh': sum(rooftops_pv_output) / 1000, 'yearly_total_pv_production_kWh': sum(total_hourly_pv_output) / 1000, 'specific_pv_production_kWh/kWp': ( sum(rooftops_pv_output) / (float(self.pv_system.standard_test_condition_maximum_power) * rooftop_number_of_panels) ) if rooftop_number_of_panels > 0 else 0, 'pv_installation': 'possible' if (( sum(rooftops_pv_output) / (float(self.pv_system.standard_test_condition_maximum_power) * rooftop_number_of_panels) ) if rooftop_number_of_panels > 0 else 0) > 760 else 'not possible', 'hourly_rooftop_poa_irradiance_W/m2': self.building.roofs[0].global_irradiance_tilted[cte.HOUR], 'hourly_rooftop_pv_output_W': rooftops_pv_output, 'T_out': self.building.external_temperature[cte.HOUR], 'building_electricity_demand_W': electricity_demand, 'total_hourly_pv_system_output_W': total_hourly_pv_output, 'import_from_grid_W': imported_electricity, 'export_to_grid_W': exported_electricity } if self.run_lcoe: stc_power_w = float(self.pv_system.standard_test_condition_maximum_power) capacity_kW = (results['rooftop_panels'] * stc_power_w) / 1000.0 # Parametric cost function cost_per_kW = 3086.8 * (capacity_kW ** (-0.061)) initial_cost = capacity_kW * cost_per_kW first_year_generation_PV = sum(total_hourly_pv_output) / 1000.0 building_hourly_consumption_kWh = [x / 1000.0 for x in electricity_demand] PV_hourly_generation_kWh = [x / 1000.0 for x in total_hourly_pv_output] lcoe_pv = self._calculate_lcoe( capacity=capacity_kW, cost_per_kW=cost_per_kW, first_year_generation=first_year_generation_PV, period=self.period, discount_rate=self.discount_rate, degradation_rate=self.degradation_rate, year_of_replacement_list=self.year_of_replacement_list, replacement_ratio=self.replacement_ratio, inflation_rate=self.inflation_rate, initial_cost=initial_cost ) results['LCOE_PV'] = lcoe_pv # If needed, you can also calculate system LCOE with the given method: # (If you want to represent Zahra's code exactly, this is how you do it) # lcoe_system = self._calculate_system_lcoe( # PV_hourly_generation=PV_hourly_generation_kWh, # building_hourly_consumption=building_hourly_consumption_kWh, # grid_current_tariff=0.06704, # Example or from building function logic # capacity=capacity_kW, # cost_per_kW=cost_per_kW, # first_year_generation_PV=first_year_generation_PV, # period=self.period, # discount_rate=self.discount_rate, # degradation_rate=self.degradation_rate, # year_of_replacement_list=self.year_of_replacement_list, # replacement_ratio=self.replacement_ratio, # inflation_rate=self.inflation_rate, # initial_cost=initial_cost, # installation_cost=self.installation_cost, # tax_deduct=self.tax_deduct, # incentive=self.incentive # ) # results['LCOE_system'] = lcoe_system return results def enrich(self): system_archetype_name = self.building.energy_systems_archetype_name archetype_name = '_'.join(system_archetype_name.lower().split()) if 'grid_tied' in archetype_name: self.results = self.grid_tied_system() for energy_system in self.building.energy_systems: for generation_system in energy_system.generation_systems: if generation_system.system_type == cte.PHOTOVOLTAIC: generation_system.installed_capacity = (self.results['rooftop_panels'] * float(generation_system.standard_test_condition_maximum_power)) hourly_pv_output = self.results['total_hourly_pv_system_output_W'] self.building.pv_generation[cte.HOUR] = hourly_pv_output self.building.pv_generation[cte.MONTH] = MonthlyValues.get_total_month(hourly_pv_output) self.building.pv_generation[cte.YEAR] = [sum(hourly_pv_output)] self.building.pv_generation['LCOE_PV'] = self.results['LCOE_PV'] self.building.pv_generation['PV_Installation'] = self.results['pv_installation'] if self.csv_output: self.save_to_csv(self.results, self.output_path, f'{self.building.name}_pv_system_analysis.csv') @staticmethod def save_to_csv(data, output_path, filename='rooftop_system_results.csv'): single_value_keys = [key for key, value in data.items() if not isinstance(value, list)] list_value_keys = [key for key, value in data.items() if isinstance(value, list)] if list_value_keys: list_lengths = [len(data[key]) for key in list_value_keys] if not all(length == list_lengths[0] for length in list_lengths): raise ValueError("All lists in the dictionary must have the same length") num_rows = list_lengths[0] else: num_rows = 1 with open(output_path / filename, mode='w', newline='') as csv_file: writer = csv.writer(csv_file) for key in single_value_keys: writer.writerow([key, data[key]]) writer.writerow([]) if list_value_keys: writer.writerow(list_value_keys) for i in range(num_rows): row = [data[key][i] for key in list_value_keys] writer.writerow(row) def _discounted_total_generation_pv(self, first_year_generation_PV, period, discount_rate, degradation_rate=0.01): discounted_total_generation = 0 discounted_generation_per_year = {} for year in range(1, period + 1): generation = (first_year_generation_PV * ((1 - degradation_rate) ** (year - 1)) / ((1 + discount_rate) ** year)) discounted_generation_per_year[year] = generation discounted_total_generation += generation return discounted_generation_per_year, discounted_total_generation def _discounted_total_cost_pv(self, capacity, cost_per_kW, discount_rate, year_of_replacement_list, period, replacement_ratio, inflation_rate, initial_cost): opex_annual = {} discounted_annual_cost = {} discounted_total_cost = 0 replacement_cost = { y: capacity * cost_per_kW * replacement_ratio * ((1 + inflation_rate) ** y) / ((1 + discount_rate) ** y) for y in year_of_replacement_list } for y in range(period + 1): # Annual OPEX with inflation and discounting opex_annual[y] = initial_cost * 0.01 * ((1 + inflation_rate) ** y) / ((1 + discount_rate) ** y) for y in range(period + 1): # Annual OPEX opex_annual[y] = initial_cost * 0.01 * ((1 + inflation_rate) ** y) / ((1 + discount_rate) ** y) # Total annual cost (OPEX + replacement CAPEX if applicable) if y == 0: annual_cost = initial_cost elif y in year_of_replacement_list: annual_cost = opex_annual[y] + replacement_cost[y] else: annual_cost = opex_annual[y] discounted_annual_cost[y] = annual_cost discounted_total_cost += annual_cost return opex_annual, initial_cost, discounted_annual_cost, discounted_total_cost, replacement_cost def _estimate_system_income(self, building_hourly_consumption, PV_hourly_generation, grid_current_tariff, inflation_rate, discount_rate, period, initial_cost, installation_cost, tax_deduct, incentive): total_discounted_income = 0 annual_discounted_income_dict = {} def net_metering_income(PV_hourly_generation, building_hourly_consumption, grid_tariff): PV_hourly_export = [max(PV_hourly_generation[i] - building_hourly_consumption[i], 0) for i in range(len(PV_hourly_generation))] building_hourly_purchase = [max(building_hourly_consumption[i] - PV_hourly_generation[i], 0) for i in range(len(PV_hourly_generation))] annual_PV_export = sum(PV_hourly_export) annual_grid_purchase = sum(building_hourly_purchase) return min(annual_PV_export, annual_grid_purchase) * grid_tariff for year in range(1, period + 1): inflated_grid_tariff = (grid_current_tariff * ((1 + inflation_rate) ** year) / ((1 + discount_rate) ** year)) building_hourly_self_consumption = [min(PV_hourly_generation[i], building_hourly_consumption[i]) for i in range(len(PV_hourly_generation))] PV_hourly_export = [max(PV_hourly_generation[i] - building_hourly_consumption[i], 0) for i in range(len(PV_hourly_generation))] self_consumption_income = sum(building_hourly_self_consumption) * inflated_grid_tariff net_metering_revenue = net_metering_income(PV_hourly_generation, building_hourly_consumption, inflated_grid_tariff) annual_tax_deduction_income = (initial_cost * (1 + tax_deduct) * ((1 - tax_deduct) ** (year - 1)) * tax_deduct) annual_discounted_income = self_consumption_income + net_metering_revenue + annual_tax_deduction_income annual_discounted_income_dict[year] = annual_discounted_income total_discounted_income += annual_discounted_income total_discounted_income += incentive return total_discounted_income, annual_discounted_income_dict def _calculate_lcoe(self, capacity, cost_per_kW, first_year_generation, period, discount_rate, degradation_rate, year_of_replacement_list, replacement_ratio, inflation_rate, initial_cost): _, _, _, discounted_total_cost, _ = self._discounted_total_cost_pv( capacity, cost_per_kW, discount_rate, year_of_replacement_list, period, replacement_ratio, inflation_rate, initial_cost ) _, discounted_total_generation = self._discounted_total_generation_pv( first_year_generation, period, discount_rate, degradation_rate ) if discounted_total_generation == 0: raise ValueError("Discounted generation is zero, cannot calculate LCOE.") lcoe = discounted_total_cost / discounted_total_generation return lcoe def _calculate_system_lcoe(self, PV_hourly_generation, building_hourly_consumption, grid_current_tariff, capacity, cost_per_kW, first_year_generation_PV, period, discount_rate, degradation_rate, year_of_replacement_list, replacement_ratio, inflation_rate, initial_cost, installation_cost, tax_deduct, incentive): PV_hourly_export = [max(PV_hourly_generation[i] - building_hourly_consumption[i], 0) for i in range(len(PV_hourly_generation))] building_hourly_purchase = [max(building_hourly_consumption[i] - PV_hourly_generation[i], 0) for i in range(len(PV_hourly_generation))] building_hourly_self_consumption = [min(PV_hourly_generation[i], building_hourly_consumption[i]) for i in range(len(PV_hourly_generation))] annual_PV_export = sum(PV_hourly_export) annual_grid_purchase = sum(building_hourly_purchase) annual_building_self_consumption = sum(building_hourly_self_consumption) total_energy = annual_building_self_consumption + annual_grid_purchase + annual_PV_export share_self = annual_building_self_consumption / total_energy if total_energy > 0 else 0 share_grid = annual_grid_purchase / total_energy if total_energy > 0 else 0 share_export = annual_PV_export / total_energy if total_energy > 0 else 0 lcoe_pv = self._calculate_lcoe( capacity, cost_per_kW, first_year_generation_PV, period, discount_rate, degradation_rate, year_of_replacement_list, replacement_ratio, inflation_rate, initial_cost ) lcoe_grid = grid_current_tariff pv_export_income, annual_discounted_income_dict = self._estimate_system_income( building_hourly_consumption, PV_hourly_generation, grid_current_tariff, inflation_rate, discount_rate, period, initial_cost, installation_cost, tax_deduct, incentive ) lcoe_export = -pv_export_income / annual_PV_export if annual_PV_export > 0 else 0 lcoe_system = ( share_self * lcoe_pv + share_grid * lcoe_grid + share_export * lcoe_export ) return lcoe_system