"""
Test db factory
SPDX - License - Identifier: LGPL - 3.0 - or -later
Copyright © 2022 Concordia CERC group
Project Coder Peter Yefi peteryefi@gmail.com
"""
import glob
import json
import logging
import os
import shutil
import subprocess
import unittest
from pathlib import Path
from unittest import TestCase

import sqlalchemy.exc
from sqlalchemy import create_engine
from sqlalchemy.exc import ProgrammingError

import hub.helpers.constants as cte
from hub.exports.energy_building_exports_factory import EnergyBuildingsExportsFactory
from hub.exports.exports_factory import ExportsFactory
from hub.helpers.data.montreal_function_to_hub_function import MontrealFunctionToHubFunction
from hub.imports.construction_factory import ConstructionFactory
from hub.imports.energy_systems_factory import EnergySystemsFactory
from hub.imports.geometry_factory import GeometryFactory
from hub.imports.results_factory import ResultFactory
from hub.imports.usage_factory import UsageFactory
from hub.imports.weather_factory import WeatherFactory
from cerc_persistence.db_control import DBControl
from cerc_persistence.models import City, Application, CityObject, SimulationResults
from cerc_persistence.models import User, UserRoles
from cerc_persistence.repository import Repository


class Control:
    """
    Shared fixture for the persistence tests.

    On construction it checks that the ``.env`` file and the database are
    reachable, creates the tables, builds an enriched city from
    ``tests_data/test.geojson`` and registers a test application and user.
    When a prerequisite is missing, ``skip_test`` is set together with
    ``skip_reason`` so the tests can be skipped instead of failing.
    """
    _skip_test = False
    _skip_reason = 'PostgreSQL not properly installed in host machine'

    def __init__(self):
        """
        Test setup
        :return: None
        """
        self._skip_test = False
        # Pre-declare every attribute exposed through a property so that the
        # early-return (skip) paths leave the object in a consistent state.
        self._city = None
        self._database = None
        self._application_uuid = None
        self._application_id = None
        self._user_id = None
        self._pickle_path = None

        # Locate the environment file used to configure the repository.
        dotenv_path = (Path(os.path.expanduser('~')) / '.local/etc/hub/.env').resolve()
        if not dotenv_path.exists():
            self._mark_skipped(f'.env file missing at {dotenv_path}')
            return
        dotenv_path = str(dotenv_path)

        # Check that the database can be reached with a throw-away engine.
        repository = Repository(db_name='test_db', app_env='TEST', dotenv_path=dotenv_path)
        engine = create_engine(repository.configuration.connection_string)
        try:
            with engine.connect():
                pass
        except ProgrammingError:
            logging.info('Database does not exist. Nothing to delete')
        except sqlalchemy.exc.OperationalError as operational_error:
            self._mark_skipped(f'{operational_error}')
            return
        finally:
            # The probe engine is not used again; release its pooled connections.
            engine.dispose()

        # Create tables if they don't exist.
        for model in (Application, User, City, CityObject, SimulationResults):
            model.__table__.create(bind=repository.engine, checkfirst=True)

        # The city generation below shells out to sra; without it
        # subprocess.run would be handed None and raise at import time.
        sra_executable = self.sra
        if sra_executable is None:
            self._mark_skipped('sra is not installed')
            return

        # Generate files for the persistence tests.
        tests_path = Path(__file__).parent
        city_file = (tests_path / 'tests_data/test.geojson').resolve()
        output_path = (tests_path / 'tests_outputs').resolve()
        self._city = GeometryFactory(
            'geojson',
            city_file,
            height_field='citygml_me',
            year_of_construction_field='ANNEE_CONS',
            aliases_field=['ID_UEV', 'CIVIQUE_DE', 'NOM_RUE'],
            function_field='CODE_UTILI',
            function_to_hub=MontrealFunctionToHubFunction().dictionary).city

        ConstructionFactory('nrcan', self._city).enrich()
        UsageFactory('nrcan', self._city).enrich()
        WeatherFactory('epw', self._city).enrich()
        ExportsFactory('sra', self._city, output_path).export()
        sra_file = str((output_path / f'{self._city.name}_sra.xml').resolve())
        subprocess.run([sra_executable, sra_file], stdout=subprocess.DEVNULL)
        ResultFactory('sra', self._city, output_path).enrich()

        for building in self._city.buildings:
            building.energy_systems_archetype_name = 'system 1 gas pv'
        EnergySystemsFactory('montreal_custom', self._city).enrich()
        EnergyBuildingsExportsFactory('insel_monthly_energy_balance', self._city, output_path).export()

        # insel is optional: the only test that needs its results is guarded by
        # skip_insel_test, so a missing executable must not break the fixture.
        insel_executable = self.insel
        if insel_executable is not None:
            for insel_file in glob.glob(f'{output_path}/*.insel'):
                subprocess.run([insel_executable, str(insel_file)], stdout=subprocess.DEVNULL)
            ResultFactory('insel_monthly_energy_balance', self._city, output_path).enrich()

        # Create test application and user.
        self._database = DBControl(
            db_name=repository.configuration.db_name,
            app_env='TEST',
            dotenv_path=dotenv_path)
        self._application_uuid = 'b9e0ce80-1218-410c-8a64-9d9b7026aad8'
        self._application_id = self._database.persist_application(
            'test',
            'test',
            self.application_uuid
        )
        self._user_id = self._database.create_user('test', self._application_id, 'test', UserRoles.Admin)
        self._pickle_path = (tests_path / 'tests_data/pickle_path.bz2').resolve()

    def _mark_skipped(self, reason):
        """Flag the whole suite as skipped and remember why."""
        self._skip_test = True
        self._skip_reason = reason

    @property
    def database(self):
        """DBControl instance used by the tests (None when the suite is skipped)."""
        return self._database

    @property
    def application_uuid(self):
        """UUID of the test application."""
        return self._application_uuid

    @property
    def application_id(self):
        """Identifier returned by persist_application for the test application."""
        return self._application_id

    @property
    def user_id(self):
        """Identifier returned by create_user for the test user."""
        return self._user_id

    @property
    def skip_test(self):
        """True when a prerequisite is missing and the tests must be skipped."""
        return self._skip_test

    @property
    def insel(self):
        """Path of the insel executable, or None when it is not on PATH."""
        return shutil.which('insel')

    @property
    def sra(self):
        """Path of the sra executable, or None when it is not on PATH."""
        return shutil.which('sra')

    @property
    def skip_insel_test(self):
        """True when insel is not installed."""
        return self.insel is None

    @property
    def skip_reason(self):
        """Reason reported for skipped tests."""
        return self._skip_reason

    @property
    def message(self):
        """Same text as skip_reason."""
        return self._skip_reason

    @property
    def city(self):
        """City generated during setup (None when the suite is skipped)."""
        return self._city

    @property
    def pickle_path(self):
        """Path passed to persist_city as the city pickle location."""
        return self._pickle_path


# Building attributes that are stored as 'monthly_<name>' / 'yearly_<name>' results.
_INSEL_RESULT_FIELDS = (
    'cooling_peak_load',
    'heating_peak_load',
    'lighting_peak_load',
    'appliances_peak_load',
    'cooling_demand',
    'heating_demand',
    'lighting_electrical_demand',
    'appliances_electrical_demand',
    'domestic_hot_water_heat_demand',
    'heating_consumption',
    'cooling_consumption',
    'domestic_hot_water_consumption',
    'distribution_systems_electrical_consumption',
)


def _expected_insel_results(building):
    """
    Build the results dictionary expected to be stored for a building.

    :param building: building whose result attributes are indexed by cte.MONTH and cte.YEAR
    :return: dict mapping 'monthly_<field>' / 'yearly_<field>' to the building values
    """
    results = {}
    for field in _INSEL_RESULT_FIELDS:
        values = getattr(building, field)
        results[f'monthly_{field}'] = values[cte.MONTH]
        results[f'yearly_{field}'] = values[cte.YEAR]
    # On-site production is scaled by WATTS_HOUR_TO_JULES before being stored,
    # and its result key is spelled 'on_site' unlike the building attribute.
    production = building.onsite_electrical_production
    results['monthly_on_site_electrical_production'] = [
        value * cte.WATTS_HOUR_TO_JULES for value in production[cte.MONTH]
    ]
    results['yearly_on_site_electrical_production'] = [
        value * cte.WATTS_HOUR_TO_JULES for value in production[cte.YEAR]
    ]
    return results


control = Control()


class TestDBFactory(TestCase):
    """
    TestDBFactory
    """

    @unittest.skipIf(control.skip_test, control.skip_reason)
    def test_create_city(self):
        """Persist a city and check the name stored for it."""
        control.city.name = "Montreal"
        city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            control.city.name,
            control.application_id,
            control.user_id)
        try:
            # Check the city was updated
            # NOTE(review): passes vacuously when city_id is not listed; consider asserting it was found.
            cities = control.database.cities_by_user_and_application(
                control.user_id,
                control.application_id)
            for updated_city in cities:
                if updated_city.id == city_id:
                    self.assertEqual(updated_city.name, control.city.name)
                    break
        finally:
            # Tear down after test, even when an assertion fails
            control.database.delete_city(city_id)

    @unittest.skipIf(control.skip_test, control.skip_reason)
    def test_update_city(self):
        """Persist a city, rename it through update_city and check the stored name."""
        # Create and update city name
        city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            control.city.name,
            control.application_id,
            control.user_id)
        try:
            # Update city
            control.city.name = "Ottawa"
            control.city.pickle_path = "new_pickle_path"
            control.database.update_city(city_id, control.city)
            # Check the city was updated
            # NOTE(review): passes vacuously when city_id is not listed; consider asserting it was found.
            cities = control.database.cities_by_user_and_application(
                control.user_id,
                control.application_id)
            for updated_city in cities:
                if updated_city.id == city_id:
                    self.assertEqual(updated_city.name, control.city.name)
                    break
        finally:
            # Delete extra city created
            control.database.delete_city(city_id)

    @unittest.skipIf(control.skip_test, control.skip_reason)
    def test_get_deleted_city(self):
        """Persist and delete a city, then check it is no longer listed."""
        control.city.name = "Montreal"
        city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            control.city.name,
            control.application_id,
            control.user_id)
        control.database.delete_city(city_id)
        # Check the city was deleted properly
        cities = control.database.cities_by_user_and_application(
            control.user_id,
            control.application_id)
        city_id_found = any(listed_city.id == city_id for listed_city in cities)
        self.assertFalse(city_id_found, "The city_id was not deleted successfully")

    @unittest.skipIf(control.skip_test, control.skip_reason)
    @unittest.skipIf(control.skip_insel_test, 'insel is not installed')
    def test_save_results(self):
        """Store simulation results for every calculated building and read them back."""
        # Create city
        city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            'current status',
            control.application_id,
            control.user_id)
        # Create city objects
        city_objects_id = []
        expected_results = {}
        try:
            for building in control.city.buildings:
                building_info = control.database.building_info(building.name, city_id)
                if cte.MONTH not in building.cooling_demand:
                    print(f'building {building.name} not calculated')
                    continue
                building_results = _expected_insel_results(building)
                expected_results[building.name] = building_results
                db_building_id = building_info.id
                city_objects_id.append(db_building_id)
                control.database.add_simulation_results(
                    cte.INSEL_MEB,
                    {cte.INSEL_MEB: building_results},
                    city_object_id=db_building_id)

            # Verify 17 city objects have been created
            self.assertEqual(17, len(city_objects_id), 'wrong number of results')
            # Verify results have been saved
            scenario_name = "current status"
            request_values = {
                "scenarios": [
                    {
                        scenario_name: list(expected_results)
                    },
                ]
            }
            results = control.database.results(control.user_id, control.application_id, request_values)
            for result in results[scenario_name]:
                self.assertEqual(
                    expected_results[result['building']],
                    result[cte.INSEL_MEB],
                    f"building '{result['building']}' saved results does not match expected results")
        finally:
            # Remove everything this test stored, even when an assertion fails
            for _id in city_objects_id:
                control.database.delete_results_by_name(cte.INSEL_MEB, city_object_id=_id)
            control.database.delete_city(city_id)

    @unittest.skipIf(control.skip_test, control.skip_reason)
    def test_get_building(self):
        """Retrieve the same building through the different lookup methods."""
        # Insert two cities with same buildings but different city names
        control.city.name = 'Montreal'
        mtl_city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            control.city.name,
            control.application_id,
            control.user_id)
        control.city.name = 'Ottawa'
        ott_city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            control.city.name,
            control.application_id,
            control.user_id)
        try:
            # Retrieve building from database
            test_building = control.city.buildings[0]
            city_object = control.database.building(
                test_building.name, control.user_id, control.application_id, 'Montreal')
            self.assertEqual(city_object.name, test_building.name, "Building name does not match")
            city_object = control.database.building_info(test_building.name, mtl_city_id)
            self.assertEqual(city_object.name, test_building.name, "Building name does not match")
            city_object = control.database.building_info_in_cities(
                test_building.name, [mtl_city_id, ott_city_id])
            self.assertEqual(city_object.name, test_building.name, "Building name does not match")
            # TODO: Waiting for the code to be finished to complete this test.
            city_objects = control.database.buildings_info(
                control.user_id,
                control.application_id,
                [control.city.buildings[0].name, control.city.buildings[1].name])
            # for city_obj in city_objects:
        finally:
            control.database.delete_city(mtl_city_id)
            control.database.delete_city(ott_city_id)

    @unittest.skipIf(control.skip_test, control.skip_reason)
    def test_get_non_existing_building(self):
        """Check that building lookups return nothing once the cities are deleted."""
        # Create and delete cities
        control.city.name = 'Montreal'
        mtl_city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            control.city.name,
            control.application_id,
            control.user_id)
        control.city.name = 'Ottawa'
        ott_city_id = control.database.persist_city(
            control.city,
            control.pickle_path,
            control.city.name,
            control.application_id,
            control.user_id)
        control.database.delete_city(mtl_city_id)
        control.database.delete_city(ott_city_id)
        # Retrieve building from database
        test_building = control.city.buildings[0]
        result = control.database.building(
            test_building.name, control.user_id, control.application_id, 'Montreal')
        self.assertIsNone(result, "City object exists.")
        result = control.database.building_info(test_building.name, mtl_city_id)
        self.assertIsNone(result, "City object exists.")
        result = control.database.building_info_in_cities(test_building.name, [mtl_city_id, ott_city_id])
        self.assertIsNone(result, "City object exists.")
        city_objects = control.database.buildings_info(
            control.user_id,
            control.application_id,
            [control.city.buildings[0].name, control.city.buildings[1].name])
        self.assertEqual(len(city_objects), 0, "Found a city object with given values")

    @classmethod
    @unittest.skipIf(control.skip_test, control.skip_reason)
    def tearDownClass(cls):
        """Remove the test application, the test user and the pickle file."""
        control.database.delete_application(control.application_uuid)
        control.database.delete_user(control.user_id)
        try:
            os.unlink(control.pickle_path)
        except FileNotFoundError:
            # Nothing to clean up when no test got as far as writing the pickle.
            pass