feat: pv calculation code added and tested

This commit is contained in:
Saeed Ranjbar 2024-11-26 11:43:11 +01:00
parent 4cd79c2125
commit 6ec598218c
7 changed files with 822 additions and 92 deletions

View File

@ -317,7 +317,8 @@ LATENT = 'Latent'
LITHIUMION = 'Lithium Ion'
NICD = 'NiCd'
LEADACID = 'Lead Acid'
THERMAL = 'thermal'
ELECTRICAL = 'electrical'
# Geometry
EPSILON = 0.0000001

View File

@ -1,103 +1,126 @@
import csv
from pathlib import Path
from hub.helpers.monthly_values import MonthlyValues
import hub.helpers.constants as cte
class ArchetypeBasedDemand:
def __init__(self, city, base_path):
self.city = city
self.archetype_csv_path = Path(base_path)
self.archetype_data = self._load_archetype_data()
def __init__(self, city, base_path):
self.city = city
self.archetype_csv_path = Path(base_path)
self.archetype_data = self._load_archetype_data()
def _load_archetype_data(self):
archetype_data = {}
with open(self.archetype_csv_path, 'r', encoding='utf-8-sig') as csv_file:
csv_reader = csv.DictReader(csv_file)
csv_reader.fieldnames = [field.strip() for field in csv_reader.fieldnames]
for row in csv_reader:
standardized_row = {key.strip(): value.strip() for key, value in row.items()}
usage = standardized_row['Usage']
vintage = standardized_row['Vintage']
full_key = f"{usage} {vintage}"
def _load_archetype_data(self):
archetype_data = {}
with open(self.archetype_csv_path, 'r', encoding='utf-8-sig') as csv_file:
csv_reader = csv.DictReader(csv_file)
csv_reader.fieldnames = [field.strip() for field in csv_reader.fieldnames]
for row in csv_reader:
standardized_row = {key.strip(): value.strip() for key, value in row.items()}
usage = standardized_row['Usage']
vintage = standardized_row['Vintage']
full_key = f"{usage} {vintage}"
# Initialize the archetype entry if not present
if full_key not in archetype_data:
# Initialize dictionaries for each demand type with empty lists
archetype_data[full_key] = {
'Heating': [],
'Cooling': [],
'DHW': [],
'Equipment': [],
'Lighting': [],
}
# Initialize the archetype entry if not present
if full_key not in archetype_data:
# Initialize dictionaries for each demand type with empty lists
archetype_data[full_key] = {
'Heating': [],
'Cooling': [],
'DHW': [],
'Equipment': [],
'Lighting': [],
}
# Append the demand values to the lists
archetype_data[full_key]['Heating'].append(float(standardized_row['Heating']))
archetype_data[full_key]['Cooling'].append(float(standardized_row['Cooling']))
archetype_data[full_key]['DHW'].append(float(standardized_row['DHW']))
archetype_data[full_key]['Equipment'].append(float(standardized_row['Equipment']))
archetype_data[full_key]['Lighting'].append(float(standardized_row['Lighting']))
return archetype_data
# Append the demand values to the lists
archetype_data[full_key]['Heating'].append(float(standardized_row['Heating']))
archetype_data[full_key]['Cooling'].append(float(standardized_row['Cooling']))
archetype_data[full_key]['DHW'].append(float(standardized_row['DHW']))
archetype_data[full_key]['Equipment'].append(float(standardized_row['Equipment']))
archetype_data[full_key]['Lighting'].append(float(standardized_row['Lighting']))
return archetype_data
def _get_archetype_key(self, building):
function = building.function.lower()
year = building.year_of_construction
height = building.eave_height
adjacency = building.adjacency.lower()
def _get_archetype_key(self, building):
function = building.function.lower()
year = building.year_of_construction
height = building.eave_height
adjacency = building.adjacency.lower()
if function in ['residential', 'multifamily house', 'single family house']:
if height < 6 and adjacency == 'detached':
usage = 'Single Family'
elif height < 6 and adjacency == 'attached':
usage = 'Row house'
elif 6 <= height <= 10 and adjacency == 'detached':
usage = 'Duplex/triplex'
elif 6 <= height <= 10 and adjacency == 'attached':
usage = 'Small MURBs'
elif 10 < height <= 15:
usage = 'Medium MURBs'
elif 15 < height:
usage = 'Large MURBs'
else:
usage = "No Archetype!"
elif function in ['office', 'office and administration']:
usage = 'Office'
elif function in ['commercial', 'retail shop without refrigerated food', 'retail shop with refrigerated food',
'stand alone retail', 'strip mall']:
if adjacency == 'attached':
usage = 'Commercial attached'
else:
usage = 'Commercial detached'
else:
usage = "No Archetypes yet"
if function in ['residential', 'multifamily house', 'single family house']:
if height < 6 and adjacency == 'detached':
usage = 'Single Family'
elif height < 6 and adjacency == 'attached':
usage = 'Row house'
elif 6 <= height <= 10 and adjacency == 'detached':
usage = 'Duplex/triplex'
elif 6 <= height <= 10 and adjacency == 'attached':
usage = 'Small MURBs'
elif 10 < height <= 15:
usage = 'Medium MURBs'
elif 15 < height:
usage = 'Large MURBs'
else:
usage = "No Archetype!"
elif function in ['office', 'office and administration']:
usage = 'Office'
elif function in ['commercial', 'retail shop without refrigerated food', 'retail shop with refrigerated food',
'stand alone retail', 'strip mall']:
if adjacency == 'attached':
usage = 'Commercial attached'
else:
usage = 'Commercial detached'
else:
usage = "No Archetypes yet"
if year <= 1947:
vintage = 'Pre 1947'
elif 1947 < year <= 1983:
vintage = '1947-1983'
elif 1983 < year <= 2010:
vintage = '1984-2010'
else:
vintage = 'Post 2010'
if year <= 1947:
vintage = 'Pre 1947'
elif 1947 < year <= 1983:
vintage = '1947-1983'
elif 1983 < year <= 2010:
vintage = '1984-2010'
else:
vintage = 'Post 2010'
if usage:
archetype_key = f"{usage} {vintage}"
return archetype_key
else:
return None
if usage:
archetype_key = f"{usage} {vintage}"
return archetype_key
else:
return None
def _assign_demands(self, building, demand, area):
building.heating_demand = {'hour': [value * area for value in demand['Heating']]}
building.cooling_demand = {'hour': [value * area for value in demand['Cooling']]}
building.domestic_hot_water_heat_demand = {'hour': [value * area for value in demand['DHW']]}
building.appliances_electrical_demand = {'hour': [value * area for value in demand['Equipment']]}
building.lighting_electrical_demand = {'hour': [value * area for value in demand['Lighting']]}
def _assign_demands(self, building, demand, area):
hourly_heating_demand = [value * area for value in demand['Heating']]
building.heating_demand = {cte.HOUR: hourly_heating_demand,
cte.MONTH: MonthlyValues.get_total_month(hourly_heating_demand),
cte.YEAR: [sum(hourly_heating_demand)]
}
hourly_cooling_demand = [value * area for value in demand['Cooling']]
building.cooling_demand = {cte.HOUR: hourly_cooling_demand,
cte.MONTH: MonthlyValues.get_total_month(hourly_cooling_demand),
cte.YEAR: [sum(hourly_cooling_demand)]
}
hourly_dhw_demand = [value * area for value in demand['DHW']]
building.domestic_hot_water_heat_demand = {cte.HOUR: hourly_dhw_demand,
cte.MONTH: MonthlyValues.get_total_month(hourly_dhw_demand),
cte.YEAR: [sum(hourly_dhw_demand)]
}
hourly_appliance_demand = [value * area for value in demand['Equipment']]
building.appliances_electrical_demand = {cte.HOUR: hourly_appliance_demand,
cte.MONTH: MonthlyValues.get_total_month(hourly_appliance_demand),
cte.YEAR: [sum(hourly_appliance_demand)]
}
hourly_lighting_demand = [value * area for value in demand['Lighting']]
building.lighting_electrical_demand = {cte.HOUR: hourly_lighting_demand,
cte.MONTH: MonthlyValues.get_total_month(hourly_lighting_demand),
cte.YEAR: [sum(hourly_lighting_demand)]
}
def enrich(self):
for building in self.city.buildings:
archetype_key = self._get_archetype_key(building)
print(archetype_key)
if archetype_key and archetype_key in self.archetype_data:
demand = self.archetype_data[archetype_key]
area = building.thermal_zones_from_internal_zones[0].total_floor_area
self._assign_demands(building, demand, area)
else:
print(f"No archetype found for building: {building.name} with key: {archetype_key}")
def enrich(self):
for building in self.city.buildings:
archetype_key = self._get_archetype_key(building)
print(archetype_key)
if archetype_key and archetype_key in self.archetype_data:
demand = self.archetype_data[archetype_key]
area = building.thermal_zones_from_internal_zones[0].total_floor_area
self._assign_demands(building, demand, area)
else:
print(f"No archetype found for building: {building.name} with key: {archetype_key}")

57
main.py
View File

@ -1,11 +1,26 @@
from hub.imports.energy_systems_factory import EnergySystemsFactory
from hub.imports.geometry_factory import GeometryFactory
from hub.helpers.dictionaries import Dictionaries
from hub.imports.construction_factory import ConstructionFactory
from hub.imports.results_factory import ResultFactory
from hub.exports.exports_factory import ExportsFactory
import subprocess
from pathlib import Path
from hub.imports.weather_factory import WeatherFactory
from pv_assessment.electricity_demand_calculator import HourlyElectricityDemand
from pv_assessment.pv_system_assessment import PvSystemAssessment
from pv_assessment.solar_calculator import SolarCalculator
input_file = "data/cmm_test_corrected.geojson"
demand_file = "data/energy_demand_data.csv"
# Define specific paths for outputs from SRA (Simplified Radiosity Algorith) and PV calculation processes
output_path = (Path(__file__).parent.parent / 'out_files').resolve()
output_path.mkdir(parents=True, exist_ok=True)
sra_output_path = output_path / 'sra_outputs'
sra_output_path.mkdir(parents=True, exist_ok=True)
pv_assessment_path = output_path / 'pv_outputs'
pv_assessment_path.mkdir(parents=True, exist_ok=True)
city = GeometryFactory(
"geojson",
input_file,
@ -15,5 +30,45 @@ city = GeometryFactory(
adjacency_field="adjacency",
function_to_hub=Dictionaries().montreal_function_to_hub_function).city
ConstructionFactory('nrcan', city).enrich()
WeatherFactory('epw', city).enrich()
ResultFactory('archetypes', city, demand_file).enrich()
# Export the city data in SRA-compatible format to facilitate solar radiation assessment
ExportsFactory('sra', city, sra_output_path).export()
# Run SRA simulation using an external command, passing the generated SRA XML file path as input
sra_path = (sra_output_path / f'{city.name}_sra.xml').resolve()
subprocess.run(['sra', str(sra_path)])
# Enrich city data with SRA simulation results for subsequent analysis
ResultFactory('sra', city, sra_output_path).enrich()
# # Initialize solar calculation parameters (e.g., azimuth, altitude) and compute irradiance and solar angles
tilt_angle = 37
solar_parameters = SolarCalculator(city=city,
surface_azimuth_angle=180,
tilt_angle=tilt_angle,
standard_meridian=-75)
solar_angles = solar_parameters.solar_angles # Obtain solar angles for further analysis
solar_parameters.tilted_irradiance_calculator() # Calculate the solar radiation on a tilted surface
# Assignation of Energy System Archetypes to Buildings
#TODO this needs to be modified. We should either use the existing percentages or assign systems based on building
# functions
for building in city.buildings:
building.energy_systems_archetype_name = 'Grid Tied PV System'
EnergySystemsFactory('montreal_future', city).enrich()
for building in city.buildings:
electricity_demand = HourlyElectricityDemand(building).calculate()
PvSystemAssessment(building=building,
pv_system=None,
battery=None,
electricity_demand=electricity_demand,
tilt_angle=tilt_angle,
solar_angles=solar_angles,
pv_installation_type='rooftop',
simulation_model_type='explicit',
module_model_name=None,
inverter_efficiency=0.95,
system_catalogue_handler=None,
roof_percentage_coverage=0.75,
facade_coverage_percentage=0,
csv_output=False,
output_path=pv_assessment_path).enrich()
print("done")

View File

@ -0,0 +1,75 @@
import hub.helpers.constants as cte
class HourlyElectricityDemand:
def __init__(self, building):
self.building = building
def calculate(self):
hourly_electricity_consumption = []
energy_systems = self.building.energy_systems
appliance = self.building.appliances_electrical_demand[cte.HOUR] if self.building.appliances_electrical_demand else 0
lighting = self.building.lighting_electrical_demand[cte.HOUR] if self.building.lighting_electrical_demand else 0
elec_heating = 0
elec_cooling = 0
elec_dhw = 0
if cte.HEATING in self.building.energy_consumption_breakdown[cte.ELECTRICITY]:
elec_heating = 1
if cte.COOLING in self.building.energy_consumption_breakdown[cte.ELECTRICITY]:
elec_cooling = 1
if cte.DOMESTIC_HOT_WATER in self.building.energy_consumption_breakdown[cte.ELECTRICITY]:
elec_dhw = 1
heating = None
cooling = None
dhw = None
if elec_heating == 1:
for energy_system in energy_systems:
if cte.HEATING in energy_system.demand_types:
for generation_system in energy_system.generation_systems:
if generation_system.fuel_type == cte.ELECTRICITY:
if cte.HEATING in generation_system.energy_consumption:
heating = generation_system.energy_consumption[cte.HEATING][cte.HOUR]
else:
if len(energy_system.generation_systems) > 1:
heating = [x / 2 for x in self.building.heating_consumption[cte.HOUR]]
else:
heating = self.building.heating_consumption[cte.HOUR]
if elec_dhw == 1:
for energy_system in energy_systems:
if cte.DOMESTIC_HOT_WATER in energy_system.demand_types:
for generation_system in energy_system.generation_systems:
if generation_system.fuel_type == cte.ELECTRICITY:
if cte.DOMESTIC_HOT_WATER in generation_system.energy_consumption:
dhw = generation_system.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR]
else:
if len(energy_system.generation_systems) > 1:
dhw = [x / 2 for x in self.building.domestic_hot_water_consumption[cte.HOUR]]
else:
dhw = self.building.domestic_hot_water_consumption[cte.HOUR]
if elec_cooling == 1:
for energy_system in energy_systems:
if cte.COOLING in energy_system.demand_types:
for generation_system in energy_system.generation_systems:
if cte.COOLING in generation_system.energy_consumption:
cooling = generation_system.energy_consumption[cte.COOLING][cte.HOUR]
else:
if len(energy_system.generation_systems) > 1:
cooling = [x / 2 for x in self.building.cooling_consumption[cte.HOUR]]
else:
cooling = self.building.cooling_consumption[cte.HOUR]
for i in range(8760):
hourly = 0
if isinstance(appliance, list):
hourly += appliance[i]
if isinstance(lighting, list):
hourly += lighting[i]
if heating is not None:
hourly += heating[i]
if cooling is not None:
hourly += cooling[i]
if dhw is not None:
hourly += dhw[i]
hourly_electricity_consumption.append(hourly)
return hourly_electricity_consumption

View File

@ -0,0 +1,225 @@
import math
import csv
import hub.helpers.constants as cte
from pv_assessment.electricity_demand_calculator import HourlyElectricityDemand
from hub.catalog_factories.energy_systems_catalog_factory import EnergySystemsCatalogFactory
from hub.helpers.monthly_values import MonthlyValues
class PvSystemAssessment:
def __init__(self, building=None, pv_system=None, battery=None, electricity_demand=None, tilt_angle=None,
solar_angles=None, pv_installation_type=None, simulation_model_type=None, module_model_name=None,
inverter_efficiency=None, system_catalogue_handler=None, roof_percentage_coverage=None,
facade_coverage_percentage=None, csv_output=False, output_path=None):
"""
:param building:
:param tilt_angle:
:param solar_angles:
:param simulation_model_type:
:param module_model_name:
:param inverter_efficiency:
:param system_catalogue_handler:
:param roof_percentage_coverage:
:param facade_coverage_percentage:
"""
self.building = building
self.electricity_demand = electricity_demand
self.tilt_angle = tilt_angle
self.solar_angles = solar_angles
self.pv_installation_type = pv_installation_type
self.simulation_model_type = simulation_model_type
self.module_model_name = module_model_name
self.inverter_efficiency = inverter_efficiency
self.system_catalogue_handler = system_catalogue_handler
self.roof_percentage_coverage = roof_percentage_coverage
self.facade_coverage_percentage = facade_coverage_percentage
self.pv_hourly_generation = None
self.t_cell = None
self.results = {}
self.csv_output = csv_output
self.output_path = output_path
if pv_system is not None:
self.pv_system = pv_system
else:
for energy_system in self.building.energy_systems:
for generation_system in energy_system.generation_systems:
if generation_system.system_type == cte.PHOTOVOLTAIC:
self.pv_system = generation_system
if battery is not None:
self.battery = battery
else:
for energy_system in self.building.energy_systems:
for generation_system in energy_system.generation_systems:
if generation_system.system_type == cte.PHOTOVOLTAIC and generation_system.energy_storage_systems is not None:
for storage_system in generation_system.energy_storage_systems:
if storage_system.type_energy_stored == cte.ELECTRICAL:
self.battery = storage_system
@staticmethod
def explicit_model(pv_system, inverter_efficiency, number_of_panels, irradiance, outdoor_temperature):
inverter_efficiency = inverter_efficiency
stc_power = float(pv_system.standard_test_condition_maximum_power)
stc_irradiance = float(pv_system.standard_test_condition_radiation)
cell_temperature_coefficient = float(pv_system.cell_temperature_coefficient) / 100 if (
pv_system.cell_temperature_coefficient is not None) else None
stc_t_cell = float(pv_system.standard_test_condition_cell_temperature)
nominal_condition_irradiance = float(pv_system.nominal_radiation)
nominal_condition_cell_temperature = float(pv_system.nominal_cell_temperature)
nominal_t_out = float(pv_system.nominal_ambient_temperature)
g_i = irradiance
t_out = outdoor_temperature
t_cell = []
pv_output = []
for i in range(len(g_i)):
t_cell.append((t_out[i] + (g_i[i] / nominal_condition_irradiance) *
(nominal_condition_cell_temperature - nominal_t_out)))
pv_output.append((inverter_efficiency * number_of_panels * (stc_power * (g_i[i] / stc_irradiance) *
(1 - cell_temperature_coefficient *
(t_cell[i] - stc_t_cell)))))
return pv_output
def rooftop_sizing(self, roof):
pv_system = self.pv_system
if self.module_model_name is not None:
self.system_assignation()
# System Sizing
module_width = float(pv_system.width)
module_height = float(pv_system.height)
roof_area = roof.perimeter_area
pv_module_area = module_width * module_height
available_roof = (self.roof_percentage_coverage * roof_area)
# Inter-Row Spacing
winter_solstice = self.solar_angles[(self.solar_angles['AST'].dt.month == 12) &
(self.solar_angles['AST'].dt.day == 21) &
(self.solar_angles['AST'].dt.hour == 12)]
solar_altitude = winter_solstice['solar altitude'].values[0]
solar_azimuth = winter_solstice['solar azimuth'].values[0]
distance = ((module_height * math.sin(math.radians(self.tilt_angle)) * abs(
math.cos(math.radians(solar_azimuth)))) / math.tan(math.radians(solar_altitude)))
distance = float(format(distance, '.2f'))
# Calculation of the number of panels
space_dimension = math.sqrt(available_roof)
space_dimension = float(format(space_dimension, '.2f'))
panels_per_row = math.ceil(space_dimension / module_width)
number_of_rows = math.ceil(space_dimension / distance)
total_number_of_panels = panels_per_row * number_of_rows
total_pv_area = total_number_of_panels * pv_module_area
roof.installed_solar_collector_area = total_pv_area
return panels_per_row, number_of_rows
def system_assignation(self):
generation_units_catalogue = EnergySystemsCatalogFactory(self.system_catalogue_handler).catalog
catalog_pv_generation_equipments = [component for component in
generation_units_catalogue.entries('generation_equipments') if
component.system_type == 'photovoltaic']
selected_pv_module = None
for pv_module in catalog_pv_generation_equipments:
if self.module_model_name == pv_module.model_name:
selected_pv_module = pv_module
if selected_pv_module is None:
raise ValueError("No PV module with the provided model name exists in the catalogue")
for energy_system in self.building.energy_systems:
for idx, generation_system in enumerate(energy_system.generation_systems):
if generation_system.system_type == cte.PHOTOVOLTAIC:
new_system = selected_pv_module
# Preserve attributes that exist in the original but not in the new system
for attr in dir(generation_system):
# Skip private attributes and methods
if not attr.startswith('__') and not callable(getattr(generation_system, attr)):
if not hasattr(new_system, attr):
setattr(new_system, attr, getattr(generation_system, attr))
# Replace the old generation system with the new one
energy_system.generation_systems[idx] = new_system
def grid_tied_system(self):
if self.electricity_demand is not None:
electricity_demand = self.electricity_demand
else:
electricity_demand = [demand / cte.WATTS_HOUR_TO_JULES for demand in
HourlyElectricityDemand(self.building).calculate()]
rooftops_pv_output = [0] * len(electricity_demand)
facades_pv_output = [0] * len(electricity_demand)
rooftop_number_of_panels = 0
if 'rooftop' in self.pv_installation_type.lower():
for roof in self.building.roofs:
if roof.perimeter_area > 40:
np, ns = self.rooftop_sizing(roof)
single_roof_number_of_panels = np * ns
rooftop_number_of_panels += single_roof_number_of_panels
if self.simulation_model_type == 'explicit':
single_roof_pv_output = self.explicit_model(pv_system=self.pv_system,
inverter_efficiency=self.inverter_efficiency,
number_of_panels=single_roof_number_of_panels,
irradiance=roof.global_irradiance_tilted[cte.HOUR],
outdoor_temperature=self.building.external_temperature[
cte.HOUR])
for i in range(len(rooftops_pv_output)):
rooftops_pv_output[i] += single_roof_pv_output[i]
total_hourly_pv_output = [rooftops_pv_output[i] + facades_pv_output[i] for i in range(8760)]
imported_electricity = [0] * 8760
exported_electricity = [0] * 8760
for i in range(len(electricity_demand)):
transfer = total_hourly_pv_output[i] - electricity_demand[i]
if transfer > 0:
exported_electricity[i] = transfer
else:
imported_electricity[i] = abs(transfer)
results = {'building_name': self.building.name,
'total_floor_area_m2': self.building.thermal_zones_from_internal_zones[0].total_floor_area,
'roof_area_m2': self.building.roofs[0].perimeter_area, 'rooftop_panels': rooftop_number_of_panels,
'rooftop_panels_area_m2': self.building.roofs[0].installed_solar_collector_area,
'yearly_rooftop_ghi_kW/m2': self.building.roofs[0].global_irradiance[cte.YEAR][0] / 1000,
f'yearly_rooftop_tilted_radiation_{self.tilt_angle}_degree_kW/m2':
self.building.roofs[0].global_irradiance_tilted[cte.YEAR][0] / 1000,
'yearly_rooftop_pv_production_kWh': sum(rooftops_pv_output) / 1000,
'yearly_total_pv_production_kWh': sum(total_hourly_pv_output) / 1000,
'specific_pv_production_kWh/kWp': sum(rooftops_pv_output) / (
float(self.pv_system.standard_test_condition_maximum_power) * rooftop_number_of_panels),
'hourly_rooftop_poa_irradiance_W/m2': self.building.roofs[0].global_irradiance_tilted[cte.HOUR],
'hourly_rooftop_pv_output_W': rooftops_pv_output, 'T_out': self.building.external_temperature[cte.HOUR],
'building_electricity_demand_W': electricity_demand,
'total_hourly_pv_system_output_W': total_hourly_pv_output, 'import_from_grid_W': imported_electricity,
'export_to_grid_W': exported_electricity}
return results
def enrich(self):
system_archetype_name = self.building.energy_systems_archetype_name
archetype_name = '_'.join(system_archetype_name.lower().split())
if 'grid_tied' in archetype_name:
self.results = self.grid_tied_system()
hourly_pv_output = self.results['total_hourly_pv_system_output_W']
self.building.pv_generation[cte.HOUR] = hourly_pv_output
self.building.pv_generation[cte.MONTH] = MonthlyValues.get_total_month(hourly_pv_output)
self.building.pv_generation[cte.YEAR] = [sum(hourly_pv_output)]
if self.csv_output:
self.save_to_csv(self.results, self.output_path, f'{self.building.name}_pv_system_analysis.csv')
@staticmethod
def save_to_csv(data, output_path, filename='rooftop_system_results.csv'):
# Separate keys based on whether their values are single values or lists
single_value_keys = [key for key, value in data.items() if not isinstance(value, list)]
list_value_keys = [key for key, value in data.items() if isinstance(value, list)]
# Check if all lists have the same length
list_lengths = [len(data[key]) for key in list_value_keys]
if not all(length == list_lengths[0] for length in list_lengths):
raise ValueError("All lists in the dictionary must have the same length")
# Get the length of list values (assuming all lists are of the same length, e.g., 8760 for hourly data)
num_rows = list_lengths[0] if list_value_keys else 1
# Open the CSV file for writing
with open(output_path / filename, mode='w', newline='') as csv_file:
writer = csv.writer(csv_file)
# Write single-value data as a header section
for key in single_value_keys:
writer.writerow([key, data[key]])
# Write an empty row for separation
writer.writerow([])
# Write the header for the list values
writer.writerow(list_value_keys)
# Write each row for the lists
for i in range(num_rows):
row = [data[key][i] for key in list_value_keys]
writer.writerow(row)

View File

@ -0,0 +1,222 @@
import math
import pandas as pd
from datetime import datetime
import hub.helpers.constants as cte
from hub.helpers.monthly_values import MonthlyValues
class SolarCalculator:
def __init__(self, city, tilt_angle, surface_azimuth_angle, standard_meridian=-75,
solar_constant=1366.1, maximum_clearness_index=1, min_cos_zenith=0.065, maximum_zenith_angle=87):
"""
A class to calculate the solar angles and solar irradiance on a tilted surface in the City
:param city: An object from the City class -> City
:param tilt_angle: tilt angle of surface -> float
:param surface_azimuth_angle: The orientation of the surface. 0 is North -> float
:param standard_meridian: A standard meridian is the meridian whose mean solar time is the basis of the time of day
observed in a time zone -> float
:param solar_constant: The amount of energy received by a given area one astronomical unit away from the Sun. It is
constant and must not be changed
:param maximum_clearness_index: This is used to calculate the diffuse fraction of the solar irradiance -> float
:param min_cos_zenith: This is needed to avoid unrealistic values in tilted irradiance calculations -> float
:param maximum_zenith_angle: This is needed to avoid negative values in tilted irradiance calculations -> float
"""
self.city = city
self.location_latitude = city.latitude
self.location_longitude = city.longitude
self.location_latitude_rad = math.radians(self.location_latitude)
self.surface_azimuth_angle = surface_azimuth_angle
self.surface_azimuth_rad = math.radians(surface_azimuth_angle)
self.tilt_angle = tilt_angle
self.tilt_angle_rad = math.radians(tilt_angle)
self.standard_meridian = standard_meridian
self.longitude_correction = (self.location_longitude - standard_meridian) * 4
self.solar_constant = solar_constant
self.maximum_clearness_index = maximum_clearness_index
self.min_cos_zenith = min_cos_zenith
self.maximum_zenith_angle = maximum_zenith_angle
timezone_offset = int(-standard_meridian / 15)
self.timezone = f'Etc/GMT{"+" if timezone_offset < 0 else "-"}{abs(timezone_offset)}'
self.eot = []
self.ast = []
self.hour_angles = []
self.declinations = []
self.solar_altitudes = []
self.solar_azimuths = []
self.zeniths = []
self.incidents = []
self.i_on = []
self.i_oh = []
self.times = pd.date_range(start='2023-01-01', end='2023-12-31 23:00', freq='h', tz=self.timezone)
self.solar_angles = pd.DataFrame(index=self.times)
self.day_of_year = self.solar_angles.index.dayofyear
def solar_time(self, datetime_val, day_of_year):
b = (day_of_year - 81) * 2 * math.pi / 364
eot = 9.87 * math.sin(2 * b) - 7.53 * math.cos(b) - 1.5 * math.sin(b)
self.eot.append(eot)
# Calculate Local Solar Time (LST)
lst_hour = datetime_val.hour
lst_minute = datetime_val.minute
lst_second = datetime_val.second
lst = lst_hour + lst_minute / 60 + lst_second / 3600
# Calculate Apparent Solar Time (AST) in decimal hours
ast_decimal = lst + eot / 60 + self.longitude_correction / 60
ast_hours = int(ast_decimal) % 24 # Adjust hours to fit within 023 range
ast_minutes = round((ast_decimal - ast_hours) * 60)
# Ensure ast_minutes is within valid range
if ast_minutes == 60:
ast_hours += 1
ast_minutes = 0
elif ast_minutes < 0:
ast_minutes = 0
ast_time = datetime(year=datetime_val.year, month=datetime_val.month, day=datetime_val.day,
hour=ast_hours, minute=ast_minutes)
self.ast.append(ast_time)
return ast_time
def declination_angle(self, day_of_year):
declination = 23.45 * math.sin(math.radians(360 / 365 * (284 + day_of_year)))
declination_radian = math.radians(declination)
self.declinations.append(declination)
return declination_radian
def hour_angle(self, ast_time):
hour_angle = ((ast_time.hour * 60 + ast_time.minute) - 720) / 4
hour_angle_radian = math.radians(hour_angle)
self.hour_angles.append(hour_angle)
return hour_angle_radian
def solar_altitude(self, declination_radian, hour_angle_radian):
solar_altitude_radians = math.asin(math.cos(self.location_latitude_rad) * math.cos(declination_radian) *
math.cos(hour_angle_radian) + math.sin(self.location_latitude_rad) *
math.sin(declination_radian))
solar_altitude = math.degrees(solar_altitude_radians)
self.solar_altitudes.append(solar_altitude)
return solar_altitude_radians
def zenith(self, solar_altitude_radians):
solar_altitude = math.degrees(solar_altitude_radians)
zenith_degree = 90 - solar_altitude
zenith_radian = math.radians(zenith_degree)
self.zeniths.append(zenith_degree)
return zenith_radian
def solar_azimuth_analytical(self, hourangle, declination, zenith):
numer = (math.cos(zenith) * math.sin(self.location_latitude_rad) - math.sin(declination))
denom = (math.sin(zenith) * math.cos(self.location_latitude_rad))
if math.isclose(denom, 0.0, abs_tol=1e-8):
cos_azi = 1.0
else:
cos_azi = numer / denom
cos_azi = max(-1.0, min(1.0, cos_azi))
sign_ha = math.copysign(1, hourangle)
solar_azimuth_radians = sign_ha * math.acos(cos_azi) + math.pi
solar_azimuth_degrees = math.degrees(solar_azimuth_radians)
self.solar_azimuths.append(solar_azimuth_degrees)
return solar_azimuth_radians
def incident_angle(self, solar_altitude_radians, solar_azimuth_radians):
incident_radian = math.acos(math.cos(solar_altitude_radians) *
math.cos(abs(solar_azimuth_radians - self.surface_azimuth_rad)) *
math.sin(self.tilt_angle_rad) + math.sin(solar_altitude_radians) *
math.cos(self.tilt_angle_rad))
incident_angle_degrees = math.degrees(incident_radian)
self.incidents.append(incident_angle_degrees)
return incident_radian
def dni_extra(self, day_of_year, zenith_radian):
i_on = self.solar_constant * (1 + 0.033 * math.cos(math.radians(360 * day_of_year / 365)))
i_oh = i_on * max(math.cos(zenith_radian), self.min_cos_zenith)
self.i_on.append(i_on)
self.i_oh.append(i_oh)
return i_on, i_oh
def clearness_index(self, ghi, i_oh):
k_t = ghi / i_oh
k_t = max(0, k_t)
k_t = min(self.maximum_clearness_index, k_t)
return k_t
def diffuse_fraction(self, k_t, zenith):
if k_t <= 0.22:
fraction_diffuse = 1 - 0.09 * k_t
elif k_t <= 0.8:
fraction_diffuse = (0.9511 - 0.1604 * k_t + 4.388 * k_t ** 2 - 16.638 * k_t ** 3 + 12.336 * k_t ** 4)
else:
fraction_diffuse = 0.165
if zenith > self.maximum_zenith_angle:
fraction_diffuse = 1
return fraction_diffuse
def radiation_components_horizontal(self, ghi, fraction_diffuse, zenith):
diffuse_horizontal = ghi * fraction_diffuse
dni = (ghi - diffuse_horizontal) / math.cos(math.radians(zenith))
if zenith > self.maximum_zenith_angle or dni < 0:
dni = 0
return diffuse_horizontal, dni
def radiation_components_tilted(self, diffuse_horizontal, dni, incident_angle):
beam_tilted = dni * math.cos(math.radians(incident_angle))
beam_tilted = max(beam_tilted, 0)
diffuse_tilted = diffuse_horizontal * ((1 + math.cos(math.radians(self.tilt_angle))) / 2)
total_radiation_tilted = beam_tilted + diffuse_tilted
return total_radiation_tilted
def solar_angles_calculator(self, csv_output=False):
for i in range(len(self.times)):
datetime_val = self.times[i]
day_of_year = self.day_of_year[i]
declination_radians = self.declination_angle(day_of_year)
ast_time = self.solar_time(datetime_val, day_of_year)
hour_angle_radians = self.hour_angle(ast_time)
solar_altitude_radians = self.solar_altitude(declination_radians, hour_angle_radians)
zenith_radians = self.zenith(solar_altitude_radians)
solar_azimuth_radians = self.solar_azimuth_analytical(hour_angle_radians, declination_radians, zenith_radians)
self.incident_angle(solar_altitude_radians, solar_azimuth_radians)
self.dni_extra(day_of_year=day_of_year, zenith_radian=zenith_radians)
self.solar_angles['DateTime'] = self.times
self.solar_angles['AST'] = self.ast
self.solar_angles['hour angle'] = self.hour_angles
self.solar_angles['eot'] = self.eot
self.solar_angles['declination angle'] = self.declinations
self.solar_angles['solar altitude'] = self.solar_altitudes
self.solar_angles['zenith'] = self.zeniths
self.solar_angles['solar azimuth'] = self.solar_azimuths
self.solar_angles['incident angle'] = self.incidents
self.solar_angles['extraterrestrial normal radiation (Wh/m2)'] = self.i_on
self.solar_angles['extraterrestrial radiation on horizontal (Wh/m2)'] = self.i_oh
if csv_output:
self.solar_angles.to_csv('solar_angles_new.csv')
def tilted_irradiance_calculator(self):
if self.solar_angles.empty:
self.solar_angles_calculator()
for building in self.city.buildings:
for roof in building.roofs:
hourly_tilted_irradiance = []
roof_ghi = roof.global_irradiance[cte.HOUR]
for i in range(len(roof_ghi)):
k_t = self.clearness_index(ghi=roof_ghi[i], i_oh=self.i_oh[i])
fraction_diffuse = self.diffuse_fraction(k_t, self.zeniths[i])
diffuse_horizontal, dni = self.radiation_components_horizontal(ghi=roof_ghi[i],
fraction_diffuse=fraction_diffuse,
zenith=self.zeniths[i])
hourly_tilted_irradiance.append(int(self.radiation_components_tilted(diffuse_horizontal=diffuse_horizontal,
dni=dni,
incident_angle=self.incidents[i])))
roof.global_irradiance_tilted[cte.HOUR] = hourly_tilted_irradiance
roof.global_irradiance_tilted[cte.MONTH] = (MonthlyValues.get_total_month(
roof.global_irradiance_tilted[cte.HOUR]))
roof.global_irradiance_tilted[cte.YEAR] = [sum(roof.global_irradiance_tilted[cte.MONTH])]

129
random_assignation.py Normal file
View File

@ -0,0 +1,129 @@
"""
This project aims to assign energy systems archetype names to Montreal buildings.
The random assignation is based on statistical information extracted from different sources, being:
- For residential buildings:
- SHEU 2015: https://oee.nrcan.gc.ca/corporate/statistics/neud/dpa/menus/sheu/2015/tables.cfm
- For non-residential buildings:
- Montreal dataportal: https://dataportalforcities.org/north-america/canada/quebec/montreal
- https://www.eia.gov/consumption/commercial/data/2018/
"""
import json
import random
from hub.city_model_structure.building import Building
energy_systems_format = 'montreal_future'
# parameters:
residential_systems_percentage = {
'Central Hydronic Air and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 100,
'Central Hydronic Air and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 0,
'Central Hydronic Ground and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 0,
'Central Hydronic Ground and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW '
'and Grid Tied PV': 0,
'Central Hydronic Water and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 0,
'Central Hydronic Water and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW '
'and Grid Tied PV': 0,
'Central Hydronic Air and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Air and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Ground and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Ground and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Water and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Water and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Grid Tied PV System': 0,
'system 1 gas': 0,
'system 1 gas grid tied pv': 0,
'system 1 electricity': 0,
'system 1 electricity grid tied pv': 0,
'system 2 gas': 0,
'system 2 gas grid tied pv': 0,
'system 2 electricity': 0,
'system 2 electricity grid tied pv': 0,
'system 3 and 4 gas': 0,
'system 3 and 4 gas grid tied pv': 0,
'system 3 and 4 electricity': 0,
'system 3 and 4 electricity grid tied pv': 0,
'system 6 gas': 0,
'system 6 gas grid tied pv': 0,
'system 6 electricity': 0,
'system 6 electricity grid tied pv': 0,
'system 8 gas': 0,
'system 8 gas grid tied pv': 0,
'system 8 electricity': 0,
'system 8 electricity grid tied pv': 0,
}
non_residential_systems_percentage = {'system 1 gas': 0,
'system 1 electricity': 0,
'system 2 gas': 0,
'system 2 electricity': 0,
'system 3 and 4 gas': 39,
'system 3 and 4 electricity': 36,
'system 5 gas': 0,
'system 5 electricity': 0,
'system 6 gas': 13,
'system 6 electricity': 12,
'system 8 gas': 0,
'system 8 electricity': 0}
def _retrieve_buildings(path, year_of_construction_field=None,
function_field=None, function_to_hub=None, aliases_field=None):
_buildings = []
with open(path, 'r', encoding='utf8') as json_file:
_geojson = json.loads(json_file.read())
for feature in _geojson['features']:
_building = {}
year_of_construction = None
if year_of_construction_field is not None:
year_of_construction = int(feature['properties'][year_of_construction_field])
function = None
if function_field is not None:
function = feature['properties'][function_field]
if function_to_hub is not None:
# use the transformation dictionary to retrieve the proper function
if function in function_to_hub:
function = function_to_hub[function]
building_name = ''
building_aliases = []
if 'id' in feature:
building_name = feature['id']
if aliases_field is not None:
for alias_field in aliases_field:
building_aliases.append(feature['properties'][alias_field])
_building['year_of_construction'] = year_of_construction
_building['function'] = function
_building['building_name'] = building_name
_building['building_aliases'] = building_aliases
_buildings.append(_building)
return _buildings
def call_random(_buildings: [Building], _systems_percentage):
_buildings_with_systems = []
_systems_distribution = []
_selected_buildings = list(range(0, len(_buildings)))
random.shuffle(_selected_buildings)
total = 0
maximum = 0
add_to = 0
for _system in _systems_percentage:
if _systems_percentage[_system] > 0:
number_of_buildings = round(_systems_percentage[_system] / 100 * len(_selected_buildings))
_systems_distribution.append({'system': _system, 'number': _systems_percentage[_system],
'number_of_buildings': number_of_buildings})
if number_of_buildings > maximum:
maximum = number_of_buildings
add_to = len(_systems_distribution) - 1
total += number_of_buildings
missing = 0
if total != len(_selected_buildings):
missing = len(_selected_buildings) - total
if missing != 0:
_systems_distribution[add_to]['number_of_buildings'] += missing
_position = 0
for case in _systems_distribution:
for i in range(0, case['number_of_buildings']):
_buildings[_selected_buildings[_position]].energy_systems_archetype_name = case['system']
_position += 1
return _buildings