feat: first commit, project tested successfully

This commit is contained in:
Saeed Ranjbar 2024-11-18 09:58:56 +01:00
commit df98713f81
9 changed files with 831 additions and 0 deletions

16
.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
!.gitignore
**/venv/
.idea/
/development_tests/
/data/energy_systems/heat_pumps/*.csv
/data/energy_systems/heat_pumps/*.insel
.DS_Store
**/.env
**/hub/logs/
**/__pycache__/
**/.idea/
cerc_hub.egg-info
/out_files
/input_files
*/.pyc
*.pyc

View File

@ -0,0 +1,45 @@
import glob
import os
import sys
from pathlib import Path
import csv
from hub.exports.energy_building_exports_factory import EnergyBuildingsExportsFactory
from hub.imports.results_factory import ResultFactory
sys.path.append('../energy_system_modelling_package/')
def energy_plus_workflow(city, output_path):
try:
# city = city
out_path = output_path
files = glob.glob(f'{out_path}/*')
# for file in files:
# if file != '.gitignore':
# os.remove(file)
area = 0
volume = 0
for building in city.buildings:
volume = building.volume
for ground in building.grounds:
area += ground.perimeter_polygon.area
print('exporting:')
_idf = EnergyBuildingsExportsFactory('idf', city, out_path).export()
print(' idf exported...')
_idf.run()
csv_file = str((out_path / f'{city.name}_out.csv').resolve())
eso_file = str((out_path / f'{city.name}_out.eso').resolve())
idf_file = str((out_path / f'{city.name}.idf').resolve())
obj_file = str((out_path / f'{city.name}.obj').resolve())
ResultFactory('energy_plus_multiple_buildings', city, out_path).enrich()
except Exception as ex:
print(ex)
print('error: ', ex)
print('[simulation abort]')
sys.stdout.flush()

View File

@ -0,0 +1,37 @@
import json
from shapely import Polygon
from shapely import Point
from pathlib import Path
def process_geojson(x, y, diff, path, expansion=False):
selection_box = Polygon([[x + diff, y - diff],
[x - diff, y - diff],
[x - diff, y + diff],
[x + diff, y + diff]])
geojson_file = Path(path / 'data/collinear_clean 2.geojson').resolve()
if not expansion:
output_file = Path(path / 'input_files/output_buildings.geojson').resolve()
else:
output_file = Path(path / 'input_files/output_buildings_expanded.geojson').resolve()
buildings_in_region = []
with open(geojson_file, 'r') as file:
city = json.load(file)
buildings = city['features']
for building in buildings:
coordinates = building['geometry']['coordinates'][0]
building_polygon = Polygon(coordinates)
centroid = Point(building_polygon.centroid)
if centroid.within(selection_box):
buildings_in_region.append(building)
output_region = {"type": "FeatureCollection",
"features": buildings_in_region}
with open(output_file, 'w') as file:
file.write(json.dumps(output_region, indent=2))
return output_file

View File

@ -0,0 +1,129 @@
"""
This project aims to assign energy systems archetype names to Montreal buildings.
The random assignation is based on statistical information extracted from different sources, being:
- For residential buildings:
- SHEU 2015: https://oee.nrcan.gc.ca/corporate/statistics/neud/dpa/menus/sheu/2015/tables.cfm
- For non-residential buildings:
- Montreal dataportal: https://dataportalforcities.org/north-america/canada/quebec/montreal
- https://www.eia.gov/consumption/commercial/data/2018/
"""
import json
import random
from hub.city_model_structure.building import Building
energy_systems_format = 'montreal_custom'
# parameters:
residential_systems_percentage = {
'Central Hydronic Air and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 100,
'Central Hydronic Air and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 0,
'Central Hydronic Ground and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 0,
'Central Hydronic Ground and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW '
'and Grid Tied PV': 0,
'Central Hydronic Water and Gas Source Heating System with Unitary Split Cooling and Air Source HP DHW and Grid Tied PV': 0,
'Central Hydronic Water and Electricity Source Heating System with Unitary Split Cooling and Air Source HP DHW '
'and Grid Tied PV': 0,
'Central Hydronic Air and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Air and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Ground and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Ground and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Water and Gas Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Central Hydronic Water and Electricity Source Heating System with Unitary Split and Air Source HP DHW': 0,
'Grid Tied PV System': 0,
'system 1 gas': 0,
'system 1 gas grid tied pv': 0,
'system 1 electricity': 0,
'system 1 electricity grid tied pv': 0,
'system 2 gas': 0,
'system 2 gas grid tied pv': 0,
'system 2 electricity': 0,
'system 2 electricity grid tied pv': 0,
'system 3 and 4 gas': 0,
'system 3 and 4 gas grid tied pv': 0,
'system 3 and 4 electricity': 0,
'system 3 and 4 electricity grid tied pv': 0,
'system 6 gas': 0,
'system 6 gas grid tied pv': 0,
'system 6 electricity': 0,
'system 6 electricity grid tied pv': 0,
'system 8 gas': 0,
'system 8 gas grid tied pv': 0,
'system 8 electricity': 0,
'system 8 electricity grid tied pv': 0,
}
non_residential_systems_percentage = {'system 1 gas': 0,
'system 1 electricity': 0,
'system 2 gas': 0,
'system 2 electricity': 0,
'system 3 and 4 gas': 39,
'system 3 and 4 electricity': 36,
'system 5 gas': 0,
'system 5 electricity': 0,
'system 6 gas': 13,
'system 6 electricity': 12,
'system 8 gas': 0,
'system 8 electricity': 0}
def _retrieve_buildings(path, year_of_construction_field=None,
function_field=None, function_to_hub=None, aliases_field=None):
_buildings = []
with open(path, 'r', encoding='utf8') as json_file:
_geojson = json.loads(json_file.read())
for feature in _geojson['features']:
_building = {}
year_of_construction = None
if year_of_construction_field is not None:
year_of_construction = int(feature['properties'][year_of_construction_field])
function = None
if function_field is not None:
function = feature['properties'][function_field]
if function_to_hub is not None:
# use the transformation dictionary to retrieve the proper function
if function in function_to_hub:
function = function_to_hub[function]
building_name = ''
building_aliases = []
if 'id' in feature:
building_name = feature['id']
if aliases_field is not None:
for alias_field in aliases_field:
building_aliases.append(feature['properties'][alias_field])
_building['year_of_construction'] = year_of_construction
_building['function'] = function
_building['building_name'] = building_name
_building['building_aliases'] = building_aliases
_buildings.append(_building)
return _buildings
def call_random(_buildings: [Building], _systems_percentage):
_buildings_with_systems = []
_systems_distribution = []
_selected_buildings = list(range(0, len(_buildings)))
random.shuffle(_selected_buildings)
total = 0
maximum = 0
add_to = 0
for _system in _systems_percentage:
if _systems_percentage[_system] > 0:
number_of_buildings = round(_systems_percentage[_system] / 100 * len(_selected_buildings))
_systems_distribution.append({'system': _system, 'number': _systems_percentage[_system],
'number_of_buildings': number_of_buildings})
if number_of_buildings > maximum:
maximum = number_of_buildings
add_to = len(_systems_distribution) - 1
total += number_of_buildings
missing = 0
if total != len(_selected_buildings):
missing = len(_selected_buildings) - total
if missing != 0:
_systems_distribution[add_to]['number_of_buildings'] += missing
_position = 0
for case in _systems_distribution:
for i in range(0, case['number_of_buildings']):
_buildings[_selected_buildings[_position]].energy_systems_archetype_name = case['system']
_position += 1
return _buildings

View File

View File

@ -0,0 +1,83 @@
from pathlib import Path
import subprocess
from building_modelling.ep_run_enrich import energy_plus_workflow
from building_modelling import random_assignation
from pv_assessment.pv_system_assessment import PvSystemAssessment
from pv_assessment.solar_calculator import SolarCalculator
from hub.imports.energy_systems_factory import EnergySystemsFactory
from hub.imports.geometry_factory import GeometryFactory
from hub.helpers.dictionaries import Dictionaries
from hub.imports.construction_factory import ConstructionFactory
from hub.imports.usage_factory import UsageFactory
from hub.imports.weather_factory import WeatherFactory
from hub.imports.results_factory import ResultFactory
from building_modelling.geojson_creator import process_geojson
from hub.exports.exports_factory import ExportsFactory
# Define paths for input and output directories, ensuring directories are created if they do not exist
main_path = Path(__file__).parent.parent.resolve()
input_files_path = (Path(__file__).parent.parent / 'input_files')
input_files_path.mkdir(parents=True, exist_ok=True)
output_path = (Path(__file__).parent.parent / 'out_files').resolve()
output_path.mkdir(parents=True, exist_ok=True)
# Define specific paths for outputs from EnergyPlus and SRA (Simplified Radiosity Algorith) and PV calculation processes
energy_plus_output_path = output_path / 'energy_plus_outputs'
energy_plus_output_path.mkdir(parents=True, exist_ok=True)
sra_output_path = output_path / 'sra_outputs'
sra_output_path.mkdir(parents=True, exist_ok=True)
pv_assessment_path = output_path / 'pv_outputs'
pv_assessment_path.mkdir(parents=True, exist_ok=True)
# Generate a GeoJSON file for city buildings based on latitude, longitude, and building dimensions
geojson_file_path = input_files_path / 'output_buildings.geojson'
# Initialize a city object from the geojson file, mapping building functions using a predefined dictionary
city = GeometryFactory(file_type='geojson',
path=geojson_file_path,
height_field='height',
year_of_construction_field='year_of_construction',
function_field='function',
function_to_hub=Dictionaries().montreal_function_to_hub_function).city
# Enrich city data with construction, usage, and weather information specific to the location
ConstructionFactory('nrcan', city).enrich()
UsageFactory('nrcan', city).enrich()
WeatherFactory('epw', city).enrich()
# Execute the EnergyPlus workflow to simulate building energy performance and generate output
energy_plus_workflow(city, energy_plus_output_path)
# Export the city data in SRA-compatible format to facilitate solar radiation assessment
ExportsFactory('sra', city, sra_output_path).export()
# Run SRA simulation using an external command, passing the generated SRA XML file path as input
sra_path = (sra_output_path / f'{city.name}_sra.xml').resolve()
subprocess.run(['sra', str(sra_path)])
# Enrich city data with SRA simulation results for subsequent analysis
ResultFactory('sra', city, sra_output_path).enrich()
# Assign PV system archetype name to the buildings in city
random_assignation.call_random(city.buildings, random_assignation.residential_systems_percentage)
# Enrich city model with Montreal future systems parameters
EnergySystemsFactory('montreal_future', city).enrich()
# # Initialize solar calculation parameters (e.g., azimuth, altitude) and compute irradiance and solar angles
tilt_angle = 37
solar_parameters = SolarCalculator(city=city,
surface_azimuth_angle=180,
tilt_angle=tilt_angle,
standard_meridian=-75)
solar_angles = solar_parameters.solar_angles # Obtain solar angles for further analysis
solar_parameters.tilted_irradiance_calculator() # Calculate the solar radiation on a tilted surface
# # PV modelling building by building
#List of available PV modules ['RE400CAA Pure 2', 'RE410CAA Pure 2', 'RE420CAA Pure 2', 'RE430CAA Pure 2',
# 'REC600AA Pro M', 'REC610AA Pro M', 'REC620AA Pro M', 'REC630AA Pro M', 'REC640AA Pro M']
for building in city.buildings:
PvSystemAssessment(building=building,
pv_system=None,
battery=None,
tilt_angle=tilt_angle,
solar_angles=solar_angles,
pv_installation_type='rooftop',
simulation_model_type='explicit',
module_model_name=None,
inverter_efficiency=0.95,
system_catalogue_handler=None,
roof_percentage_coverage=0.75,
facade_coverage_percentage=0,
csv_output=False,
output_path=pv_assessment_path).enrich()

View File

@ -0,0 +1,75 @@
import hub.helpers.constants as cte
class HourlyElectricityDemand:
def __init__(self, building):
self.building = building
def calculate(self):
hourly_electricity_consumption = []
energy_systems = self.building.energy_systems
appliance = self.building.appliances_electrical_demand[cte.HOUR] if self.building.appliances_electrical_demand else 0
lighting = self.building.lighting_electrical_demand[cte.HOUR] if self.building.lighting_electrical_demand else 0
elec_heating = 0
elec_cooling = 0
elec_dhw = 0
if cte.HEATING in self.building.energy_consumption_breakdown[cte.ELECTRICITY]:
elec_heating = 1
if cte.COOLING in self.building.energy_consumption_breakdown[cte.ELECTRICITY]:
elec_cooling = 1
if cte.DOMESTIC_HOT_WATER in self.building.energy_consumption_breakdown[cte.ELECTRICITY]:
elec_dhw = 1
heating = None
cooling = None
dhw = None
if elec_heating == 1:
for energy_system in energy_systems:
if cte.HEATING in energy_system.demand_types:
for generation_system in energy_system.generation_systems:
if generation_system.fuel_type == cte.ELECTRICITY:
if cte.HEATING in generation_system.energy_consumption:
heating = generation_system.energy_consumption[cte.HEATING][cte.HOUR]
else:
if len(energy_system.generation_systems) > 1:
heating = [x / 2 for x in self.building.heating_consumption[cte.HOUR]]
else:
heating = self.building.heating_consumption[cte.HOUR]
if elec_dhw == 1:
for energy_system in energy_systems:
if cte.DOMESTIC_HOT_WATER in energy_system.demand_types:
for generation_system in energy_system.generation_systems:
if generation_system.fuel_type == cte.ELECTRICITY:
if cte.DOMESTIC_HOT_WATER in generation_system.energy_consumption:
dhw = generation_system.energy_consumption[cte.DOMESTIC_HOT_WATER][cte.HOUR]
else:
if len(energy_system.generation_systems) > 1:
dhw = [x / 2 for x in self.building.domestic_hot_water_consumption[cte.HOUR]]
else:
dhw = self.building.domestic_hot_water_consumption[cte.HOUR]
if elec_cooling == 1:
for energy_system in energy_systems:
if cte.COOLING in energy_system.demand_types:
for generation_system in energy_system.generation_systems:
if cte.COOLING in generation_system.energy_consumption:
cooling = generation_system.energy_consumption[cte.COOLING][cte.HOUR]
else:
if len(energy_system.generation_systems) > 1:
cooling = [x / 2 for x in self.building.cooling_consumption[cte.HOUR]]
else:
cooling = self.building.cooling_consumption[cte.HOUR]
for i in range(8760):
hourly = 0
if isinstance(appliance, list):
hourly += appliance[i]
if isinstance(lighting, list):
hourly += lighting[i]
if heating is not None:
hourly += heating[i]
if cooling is not None:
hourly += cooling[i]
if dhw is not None:
hourly += dhw[i]
hourly_electricity_consumption.append(hourly)
return hourly_electricity_consumption

View File

@ -0,0 +1,225 @@
import math
import csv
import hub.helpers.constants as cte
from pv_assessment.electricity_demand_calculator import HourlyElectricityDemand
from hub.catalog_factories.energy_systems_catalog_factory import EnergySystemsCatalogFactory
from hub.helpers.monthly_values import MonthlyValues
class PvSystemAssessment:
def __init__(self, building=None, pv_system=None, battery=None, tilt_angle=None, solar_angles=None,
pv_installation_type=None, simulation_model_type=None, module_model_name=None,
inverter_efficiency=None, system_catalogue_handler=None, roof_percentage_coverage=None,
facade_coverage_percentage=None, csv_output=False, output_path=None):
"""
:param building:
:param tilt_angle:
:param solar_angles:
:param simulation_model_type:
:param module_model_name:
:param inverter_efficiency:
:param system_catalogue_handler:
:param roof_percentage_coverage:
:param facade_coverage_percentage:
"""
self.building = building
self.tilt_angle = tilt_angle
self.solar_angles = solar_angles
self.pv_installation_type = pv_installation_type
self.simulation_model_type = simulation_model_type
self.module_model_name = module_model_name
self.inverter_efficiency = inverter_efficiency
self.system_catalogue_handler = system_catalogue_handler
self.roof_percentage_coverage = roof_percentage_coverage
self.facade_coverage_percentage = facade_coverage_percentage
self.pv_hourly_generation = None
self.t_cell = None
self.results = {}
self.csv_output = csv_output
self.output_path = output_path
if pv_system is not None:
self.pv_system = pv_system
else:
for energy_system in self.building.energy_systems:
for generation_system in energy_system.generation_systems:
if generation_system.system_type == cte.PHOTOVOLTAIC:
self.pv_system = generation_system
if battery is not None:
self.battery = battery
else:
for energy_system in self.building.energy_systems:
for generation_system in energy_system.generation_systems:
if generation_system.system_type == cte.PHOTOVOLTAIC and generation_system.energy_storage_systems is not None:
for storage_system in generation_system.energy_storage_systems:
if storage_system.type_energy_stored == cte.ELECTRICAL:
self.battery = storage_system
@staticmethod
def explicit_model(pv_system, inverter_efficiency, number_of_panels, irradiance, outdoor_temperature):
inverter_efficiency = inverter_efficiency
stc_power = float(pv_system.standard_test_condition_maximum_power)
stc_irradiance = float(pv_system.standard_test_condition_radiation)
cell_temperature_coefficient = float(pv_system.cell_temperature_coefficient) / 100 if (
pv_system.cell_temperature_coefficient is not None) else None
stc_t_cell = float(pv_system.standard_test_condition_cell_temperature)
nominal_condition_irradiance = float(pv_system.nominal_radiation)
nominal_condition_cell_temperature = float(pv_system.nominal_cell_temperature)
nominal_t_out = float(pv_system.nominal_ambient_temperature)
g_i = irradiance
t_out = outdoor_temperature
t_cell = []
pv_output = []
for i in range(len(g_i)):
t_cell.append((t_out[i] + (g_i[i] / nominal_condition_irradiance) *
(nominal_condition_cell_temperature - nominal_t_out)))
pv_output.append((inverter_efficiency * number_of_panels * (stc_power * (g_i[i] / stc_irradiance) *
(1 - cell_temperature_coefficient *
(t_cell[i] - stc_t_cell)))))
return pv_output
def rooftop_sizing(self):
pv_system = self.pv_system
if self.module_model_name is not None:
self.system_assignation()
# System Sizing
module_width = float(pv_system.width)
module_height = float(pv_system.height)
roof_area = 0
for roof in self.building.roofs:
roof_area += roof.perimeter_area
pv_module_area = module_width * module_height
available_roof = (self.roof_percentage_coverage * roof_area)
# Inter-Row Spacing
winter_solstice = self.solar_angles[(self.solar_angles['AST'].dt.month == 12) &
(self.solar_angles['AST'].dt.day == 21) &
(self.solar_angles['AST'].dt.hour == 12)]
solar_altitude = winter_solstice['solar altitude'].values[0]
solar_azimuth = winter_solstice['solar azimuth'].values[0]
distance = ((module_height * math.sin(math.radians(self.tilt_angle)) * abs(
math.cos(math.radians(solar_azimuth)))) / math.tan(math.radians(solar_altitude)))
distance = float(format(distance, '.2f'))
# Calculation of the number of panels
space_dimension = math.sqrt(available_roof)
space_dimension = float(format(space_dimension, '.2f'))
panels_per_row = math.ceil(space_dimension / module_width)
number_of_rows = math.ceil(space_dimension / distance)
total_number_of_panels = panels_per_row * number_of_rows
total_pv_area = total_number_of_panels * pv_module_area
self.building.roofs[0].installed_solar_collector_area = total_pv_area
return panels_per_row, number_of_rows
def system_assignation(self):
generation_units_catalogue = EnergySystemsCatalogFactory(self.system_catalogue_handler).catalog
catalog_pv_generation_equipments = [component for component in
generation_units_catalogue.entries('generation_equipments') if
component.system_type == 'photovoltaic']
selected_pv_module = None
for pv_module in catalog_pv_generation_equipments:
if self.module_model_name == pv_module.model_name:
selected_pv_module = pv_module
if selected_pv_module is None:
raise ValueError("No PV module with the provided model name exists in the catalogue")
for energy_system in self.building.energy_systems:
for idx, generation_system in enumerate(energy_system.generation_systems):
if generation_system.system_type == cte.PHOTOVOLTAIC:
new_system = selected_pv_module
# Preserve attributes that exist in the original but not in the new system
for attr in dir(generation_system):
# Skip private attributes and methods
if not attr.startswith('__') and not callable(getattr(generation_system, attr)):
if not hasattr(new_system, attr):
setattr(new_system, attr, getattr(generation_system, attr))
# Replace the old generation system with the new one
energy_system.generation_systems[idx] = new_system
def grid_tied_system(self):
building_hourly_electricity_demand = [demand / cte.WATTS_HOUR_TO_JULES for demand in
HourlyElectricityDemand(self.building).calculate()]
rooftop_pv_output = [0] * 8760
facade_pv_output = [0] * 8760
rooftop_number_of_panels = 0
if 'rooftop' in self.pv_installation_type.lower():
np, ns = self.rooftop_sizing()
if self.simulation_model_type == 'explicit':
rooftop_number_of_panels = np * ns
rooftop_pv_output = self.explicit_model(pv_system=self.pv_system,
inverter_efficiency=self.inverter_efficiency,
number_of_panels=rooftop_number_of_panels,
irradiance=self.building.roofs[0].global_irradiance_tilted[
cte.HOUR],
outdoor_temperature=self.building.external_temperature[
cte.HOUR])
total_hourly_pv_output = [rooftop_pv_output[i] + facade_pv_output[i] for i in range(8760)]
imported_electricity = [0] * 8760
exported_electricity = [0] * 8760
for i in range(8760):
transfer = total_hourly_pv_output[i] - building_hourly_electricity_demand[i]
if transfer > 0:
exported_electricity[i] = transfer
else:
imported_electricity[i] = abs(transfer)
results = {'building_name': self.building.name,
'total_floor_area_m2': self.building.thermal_zones_from_internal_zones[0].total_floor_area,
'roof_area_m2': self.building.roofs[0].perimeter_area, 'rooftop_panels': rooftop_number_of_panels,
'rooftop_panels_area_m2': self.building.roofs[0].installed_solar_collector_area,
'yearly_rooftop_ghi_kW/m2': self.building.roofs[0].global_irradiance[cte.YEAR][0] / 1000,
f'yearly_rooftop_tilted_radiation_{self.tilt_angle}_degree_kW/m2':
self.building.roofs[0].global_irradiance_tilted[cte.YEAR][0] / 1000,
'yearly_rooftop_pv_production_kWh': sum(rooftop_pv_output) / 1000,
'yearly_total_pv_production_kWh': sum(total_hourly_pv_output) / 1000,
'specific_pv_production_kWh/kWp': sum(rooftop_pv_output) / (
float(self.pv_system.standard_test_condition_maximum_power) * rooftop_number_of_panels),
'hourly_rooftop_poa_irradiance_W/m2': self.building.roofs[0].global_irradiance_tilted[cte.HOUR],
'hourly_rooftop_pv_output_W': rooftop_pv_output, 'T_out': self.building.external_temperature[cte.HOUR],
'building_electricity_demand_W': building_hourly_electricity_demand,
'total_hourly_pv_system_output_W': total_hourly_pv_output, 'import_from_grid_W': imported_electricity,
'export_to_grid_W': exported_electricity}
return results
def enrich(self):
system_archetype_name = self.building.energy_systems_archetype_name
archetype_name = '_'.join(system_archetype_name.lower().split())
if 'grid_tied' in archetype_name:
self.results = self.grid_tied_system()
hourly_pv_output = self.results['total_hourly_pv_system_output_W']
if self.building.onsite_electrical_production:
self.building.pv_generation[cte.HOUR] = hourly_pv_output
self.building.pv_generation[cte.MONTH] = MonthlyValues.get_total_month(hourly_pv_output)
self.building.pv_generation[cte.YEAR] = [sum(hourly_pv_output)]
else:
self.building.onsite_electrical_production[cte.HOUR] = hourly_pv_output
self.building.onsite_electrical_production[cte.MONTH] = MonthlyValues.get_total_month(hourly_pv_output)
self.building.onsite_electrical_production[cte.YEAR] = [sum(hourly_pv_output)]
if self.csv_output:
self.save_to_csv(self.results, self.output_path, f'{self.building.name}_pv_system_analysis.csv')
@staticmethod
def save_to_csv(data, output_path, filename='rooftop_system_results.csv'):
# Separate keys based on whether their values are single values or lists
single_value_keys = [key for key, value in data.items() if not isinstance(value, list)]
list_value_keys = [key for key, value in data.items() if isinstance(value, list)]
# Check if all lists have the same length
list_lengths = [len(data[key]) for key in list_value_keys]
if not all(length == list_lengths[0] for length in list_lengths):
raise ValueError("All lists in the dictionary must have the same length")
# Get the length of list values (assuming all lists are of the same length, e.g., 8760 for hourly data)
num_rows = list_lengths[0] if list_value_keys else 1
# Open the CSV file for writing
with open(output_path / filename, mode='w', newline='') as csv_file:
writer = csv.writer(csv_file)
# Write single-value data as a header section
for key in single_value_keys:
writer.writerow([key, data[key]])
# Write an empty row for separation
writer.writerow([])
# Write the header for the list values
writer.writerow(list_value_keys)
# Write each row for the lists
for i in range(num_rows):
row = [data[key][i] for key in list_value_keys]
writer.writerow(row)

View File

@ -0,0 +1,221 @@
import math
import pandas as pd
from datetime import datetime
import hub.helpers.constants as cte
from hub.helpers.monthly_values import MonthlyValues
class SolarCalculator:
def __init__(self, city, tilt_angle, surface_azimuth_angle, standard_meridian=-75,
solar_constant=1366.1, maximum_clearness_index=1, min_cos_zenith=0.065, maximum_zenith_angle=87):
"""
A class to calculate the solar angles and solar irradiance on a tilted surface in the City
:param city: An object from the City class -> City
:param tilt_angle: tilt angle of surface -> float
:param surface_azimuth_angle: The orientation of the surface. 0 is North -> float
:param standard_meridian: A standard meridian is the meridian whose mean solar time is the basis of the time of day
observed in a time zone -> float
:param solar_constant: The amount of energy received by a given area one astronomical unit away from the Sun. It is
constant and must not be changed
:param maximum_clearness_index: This is used to calculate the diffuse fraction of the solar irradiance -> float
:param min_cos_zenith: This is needed to avoid unrealistic values in tilted irradiance calculations -> float
:param maximum_zenith_angle: This is needed to avoid negative values in tilted irradiance calculations -> float
"""
self.city = city
self.location_latitude = city.latitude
self.location_longitude = city.longitude
self.location_latitude_rad = math.radians(self.location_latitude)
self.surface_azimuth_angle = surface_azimuth_angle
self.surface_azimuth_rad = math.radians(surface_azimuth_angle)
self.tilt_angle = tilt_angle
self.tilt_angle_rad = math.radians(tilt_angle)
self.standard_meridian = standard_meridian
self.longitude_correction = (self.location_longitude - standard_meridian) * 4
self.solar_constant = solar_constant
self.maximum_clearness_index = maximum_clearness_index
self.min_cos_zenith = min_cos_zenith
self.maximum_zenith_angle = maximum_zenith_angle
timezone_offset = int(-standard_meridian / 15)
self.timezone = f'Etc/GMT{"+" if timezone_offset < 0 else "-"}{abs(timezone_offset)}'
self.eot = []
self.ast = []
self.hour_angles = []
self.declinations = []
self.solar_altitudes = []
self.solar_azimuths = []
self.zeniths = []
self.incidents = []
self.i_on = []
self.i_oh = []
self.times = pd.date_range(start='2023-01-01', end='2023-12-31 23:00', freq='h', tz=self.timezone)
self.solar_angles = pd.DataFrame(index=self.times)
self.day_of_year = self.solar_angles.index.dayofyear
def solar_time(self, datetime_val, day_of_year):
b = (day_of_year - 81) * 2 * math.pi / 364
eot = 9.87 * math.sin(2 * b) - 7.53 * math.cos(b) - 1.5 * math.sin(b)
self.eot.append(eot)
# Calculate Local Solar Time (LST)
lst_hour = datetime_val.hour
lst_minute = datetime_val.minute
lst_second = datetime_val.second
lst = lst_hour + lst_minute / 60 + lst_second / 3600
# Calculate Apparent Solar Time (AST) in decimal hours
ast_decimal = lst + eot / 60 + self.longitude_correction / 60
ast_hours = int(ast_decimal) % 24 # Adjust hours to fit within 023 range
ast_minutes = round((ast_decimal - ast_hours) * 60)
# Ensure ast_minutes is within valid range
if ast_minutes == 60:
ast_hours += 1
ast_minutes = 0
elif ast_minutes < 0:
ast_minutes = 0
ast_time = datetime(year=datetime_val.year, month=datetime_val.month, day=datetime_val.day,
hour=ast_hours, minute=ast_minutes)
self.ast.append(ast_time)
return ast_time
def declination_angle(self, day_of_year):
declination = 23.45 * math.sin(math.radians(360 / 365 * (284 + day_of_year)))
declination_radian = math.radians(declination)
self.declinations.append(declination)
return declination_radian
def hour_angle(self, ast_time):
hour_angle = ((ast_time.hour * 60 + ast_time.minute) - 720) / 4
hour_angle_radian = math.radians(hour_angle)
self.hour_angles.append(hour_angle)
return hour_angle_radian
def solar_altitude(self, declination_radian, hour_angle_radian):
solar_altitude_radians = math.asin(math.cos(self.location_latitude_rad) * math.cos(declination_radian) *
math.cos(hour_angle_radian) + math.sin(self.location_latitude_rad) *
math.sin(declination_radian))
solar_altitude = math.degrees(solar_altitude_radians)
self.solar_altitudes.append(solar_altitude)
return solar_altitude_radians
def zenith(self, solar_altitude_radians):
solar_altitude = math.degrees(solar_altitude_radians)
zenith_degree = 90 - solar_altitude
zenith_radian = math.radians(zenith_degree)
self.zeniths.append(zenith_degree)
return zenith_radian
def solar_azimuth_analytical(self, hourangle, declination, zenith):
numer = (math.cos(zenith) * math.sin(self.location_latitude_rad) - math.sin(declination))
denom = (math.sin(zenith) * math.cos(self.location_latitude_rad))
if math.isclose(denom, 0.0, abs_tol=1e-8):
cos_azi = 1.0
else:
cos_azi = numer / denom
cos_azi = max(-1.0, min(1.0, cos_azi))
sign_ha = math.copysign(1, hourangle)
solar_azimuth_radians = sign_ha * math.acos(cos_azi) + math.pi
solar_azimuth_degrees = math.degrees(solar_azimuth_radians)
self.solar_azimuths.append(solar_azimuth_degrees)
return solar_azimuth_radians
def incident_angle(self, solar_altitude_radians, solar_azimuth_radians):
incident_radian = math.acos(math.cos(solar_altitude_radians) *
math.cos(abs(solar_azimuth_radians - self.surface_azimuth_rad)) *
math.sin(self.tilt_angle_rad) + math.sin(solar_altitude_radians) *
math.cos(self.tilt_angle_rad))
incident_angle_degrees = math.degrees(incident_radian)
self.incidents.append(incident_angle_degrees)
return incident_radian
def dni_extra(self, day_of_year, zenith_radian):
i_on = self.solar_constant * (1 + 0.033 * math.cos(math.radians(360 * day_of_year / 365)))
i_oh = i_on * max(math.cos(zenith_radian), self.min_cos_zenith)
self.i_on.append(i_on)
self.i_oh.append(i_oh)
return i_on, i_oh
def clearness_index(self, ghi, i_oh):
k_t = ghi / i_oh
k_t = max(0, k_t)
k_t = min(self.maximum_clearness_index, k_t)
return k_t
def diffuse_fraction(self, k_t, zenith):
if k_t <= 0.22:
fraction_diffuse = 1 - 0.09 * k_t
elif k_t <= 0.8:
fraction_diffuse = (0.9511 - 0.1604 * k_t + 4.388 * k_t ** 2 - 16.638 * k_t ** 3 + 12.336 * k_t ** 4)
else:
fraction_diffuse = 0.165
if zenith > self.maximum_zenith_angle:
fraction_diffuse = 1
return fraction_diffuse
def radiation_components_horizontal(self, ghi, fraction_diffuse, zenith):
diffuse_horizontal = ghi * fraction_diffuse
dni = (ghi - diffuse_horizontal) / math.cos(math.radians(zenith))
if zenith > self.maximum_zenith_angle or dni < 0:
dni = 0
return diffuse_horizontal, dni
def radiation_components_tilted(self, diffuse_horizontal, dni, incident_angle):
beam_tilted = dni * math.cos(math.radians(incident_angle))
beam_tilted = max(beam_tilted, 0)
diffuse_tilted = diffuse_horizontal * ((1 + math.cos(math.radians(self.tilt_angle))) / 2)
total_radiation_tilted = beam_tilted + diffuse_tilted
return total_radiation_tilted
def solar_angles_calculator(self, csv_output=False):
for i in range(len(self.times)):
datetime_val = self.times[i]
day_of_year = self.day_of_year[i]
declination_radians = self.declination_angle(day_of_year)
ast_time = self.solar_time(datetime_val, day_of_year)
hour_angle_radians = self.hour_angle(ast_time)
solar_altitude_radians = self.solar_altitude(declination_radians, hour_angle_radians)
zenith_radians = self.zenith(solar_altitude_radians)
solar_azimuth_radians = self.solar_azimuth_analytical(hour_angle_radians, declination_radians, zenith_radians)
self.incident_angle(solar_altitude_radians, solar_azimuth_radians)
self.dni_extra(day_of_year=day_of_year, zenith_radian=zenith_radians)
self.solar_angles['DateTime'] = self.times
self.solar_angles['AST'] = self.ast
self.solar_angles['hour angle'] = self.hour_angles
self.solar_angles['eot'] = self.eot
self.solar_angles['declination angle'] = self.declinations
self.solar_angles['solar altitude'] = self.solar_altitudes
self.solar_angles['zenith'] = self.zeniths
self.solar_angles['solar azimuth'] = self.solar_azimuths
self.solar_angles['incident angle'] = self.incidents
self.solar_angles['extraterrestrial normal radiation (Wh/m2)'] = self.i_on
self.solar_angles['extraterrestrial radiation on horizontal (Wh/m2)'] = self.i_oh
if csv_output:
self.solar_angles.to_csv('solar_angles_new.csv')
def tilted_irradiance_calculator(self):
if self.solar_angles.empty:
self.solar_angles_calculator()
for building in self.city.buildings:
hourly_tilted_irradiance = []
roof_ghi = building.roofs[0].global_irradiance[cte.HOUR]
for i in range(len(roof_ghi)):
k_t = self.clearness_index(ghi=roof_ghi[i], i_oh=self.i_oh[i])
fraction_diffuse = self.diffuse_fraction(k_t, self.zeniths[i])
diffuse_horizontal, dni = self.radiation_components_horizontal(ghi=roof_ghi[i],
fraction_diffuse=fraction_diffuse,
zenith=self.zeniths[i])
hourly_tilted_irradiance.append(int(self.radiation_components_tilted(diffuse_horizontal=diffuse_horizontal,
dni=dni,
incident_angle=self.incidents[i])))
building.roofs[0].global_irradiance_tilted[cte.HOUR] = hourly_tilted_irradiance
building.roofs[0].global_irradiance_tilted[cte.MONTH] = (MonthlyValues.get_total_month(
building.roofs[0].global_irradiance_tilted[cte.HOUR]))
building.roofs[0].global_irradiance_tilted[cte.YEAR] = [sum(building.roofs[0].global_irradiance_tilted[cte.MONTH])]