118 lines
4.8 KiB
Python
118 lines
4.8 KiB
Python
import json
|
|
import os
|
|
import shutil
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
from flask import Response, request, send_file, make_response
|
|
from flask_restful import Resource
|
|
from hub.exports.energy_building_exports_factory import EnergyBuildingsExportsFactory
|
|
from hub.imports.construction_factory import ConstructionFactory
|
|
from hub.imports.geometry_factory import GeometryFactory
|
|
from hub.imports.usage_factory import UsageFactory
|
|
from hub.imports.weather_factory import WeatherFactory
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from hub_api.config import Config
|
|
from hub_api.helpers.session_helper import refresh_session
|
|
|
|
|
|
class EnergyPlus(Resource, Config):
    """
    REST resource that turns an uploaded geometry file into simulation results.

    A POST request uploads a ``.geojson`` or ``.gml`` file. The file is
    imported as a hub city, enriched with weather, construction and usage
    data, exported as an IDF model and run. The ``.idf`` file and the
    ``_out.csv`` / ``_out.eso`` outputs are returned as a zip archive.
    """

    def __init__(self):
        super().__init__()
        # Accepted upload suffixes; compared against the lower-cased suffix.
        self._extensions = ['.geojson', '.gml']
        # Per-request working directories are created below these two roots.
        self._tmp_path = (Path(__file__).parent / 'tmp').resolve()
        self._response_path = (Path(__file__).parent.parent / 'response_files').resolve()
        self._city = None
        # Suffix of the last filename checked by _allowed_extensions().
        self._file_extension = None

    def _allowed_extensions(self, filename):
        """
        Check whether the upload's suffix is supported and remember it.

        The lower-cased suffix is stored in ``self._file_extension`` so that
        ``post`` can pick the matching importer afterwards.

        :param filename: client-supplied file name; ``None`` or empty is
          treated as having no suffix
        :return: True if the suffix is one of ``self._extensions``
        """
        self._file_extension = Path(filename or '').suffix.lower()
        return self._file_extension in self._extensions

    def _geojson(self, file_path):
        """
        Import a GeoJSON file as a city using the field names sent in the form.

        :param file_path: path of the saved upload
        :return: the imported city, or None when a KeyError is raised, e.g.
          when the ``function_to_hub`` form value is not a key of
          ``self._dictionaries``
        """
        # NOTE(review): self._dictionaries is not defined in this class;
        # presumably it is provided by Config - confirm.
        try:
            height_field = request.form.get('height_field')
            year_of_construction_field = request.form.get('year_of_construction_field')
            function_field = request.form.get('function_field')
            function_dictionary = self._dictionaries[request.form.get('function_to_hub')]
            return GeometryFactory('geojson',
                                   path=file_path,
                                   height_field=height_field,
                                   year_of_construction_field=year_of_construction_field,
                                   function_field=function_field,
                                   function_to_hub=function_dictionary).city
        except KeyError:
            # The try deliberately spans the factory call as well, so a
            # KeyError raised while importing is also reported as "no city".
            return None

    def _citygml(self, file_path):
        """
        Import a CityGML file as a city using the field names sent in the form.

        Unlike ``_geojson``, the dictionary name is read from the
        ``function_dictionary_name`` form field.

        :param file_path: path of the saved upload
        :return: the imported city, or None when a KeyError is raised, e.g.
          when the dictionary name is not a key of ``self._dictionaries``
        """
        try:
            year_of_construction_field = request.form.get('year_of_construction_field')
            function_field = request.form.get('function_field')
            function_dictionary = self._dictionaries[request.form.get('function_dictionary_name')]
            hub_crs = request.form.get('hub_crs')
            return GeometryFactory('citygml',
                                   path=file_path,
                                   year_of_construction_field=year_of_construction_field,
                                   function_field=function_field,
                                   function_to_hub=function_dictionary,
                                   hub_crs=hub_crs).city
        except KeyError:
            return None

    def _load_city(self, file_path):
        """Import the saved upload with the importer matching its suffix."""
        if self._file_extension == '.geojson':
            return self._geojson(file_path)
        return self._citygml(file_path)

    def _run_simulation(self, tmp_path):
        """Enrich ``self._city``, export it as IDF into ``tmp_path`` and run it."""
        construction_handler = request.form.get('construction_handler')
        usage_handler = request.form.get('usage_handler')
        WeatherFactory('epw', self._city).enrich()
        ConstructionFactory(construction_handler, self._city).enrich()
        UsageFactory(usage_handler, self._city).enrich()
        _idf = EnergyBuildingsExportsFactory('idf', self._city, tmp_path).export()
        _idf.run()

    def _zip_results(self, tmp_path, result_zip):
        """Write the simulation outputs and the IDF model into ``result_zip``."""
        result_files = [
            (tmp_path / f'{self._city.name}_out.csv').resolve(),
            (tmp_path / f'{self._city.name}_out.eso').resolve(),
            (tmp_path / f'{self._city.name}.idf').resolve(),
        ]
        with zipfile.ZipFile(result_zip, 'w') as zf:
            for result_file in result_files:
                # Store each file flat, without its directory part.
                zf.write(str(result_file), result_file.name)

    def post(self):
        """
        API call for performing the EnergyPlus simulation workflow.

        Request headers: ``session-id``, ``token``, ``application-uuid``.
        Request form: ``geometry_file`` (upload), ``construction_handler``,
        ``usage_handler`` and the importer-specific fields read by
        ``_geojson`` / ``_citygml``.

        :return: the zip archive with the results on success; otherwise a JSON
          error with status 403 (session not valid), 415 (unsupported file
          suffix) or 400 (missing upload or city could not be imported).
          Every response after authentication carries the refreshed session
          token in the ``token`` header.
        """
        session_id = request.headers.get('session-id', None)
        token = request.headers.get('token', None)
        application_uuid = request.headers.get('application-uuid', None)
        _session = refresh_session(session_id, token, application_uuid)
        if _session is None:
            return Response(json.dumps({'error': 'unauthorized'}), status=403)
        response_token = {'token': _session['token']}

        # NOTE(review): token and session_id are header values used as path
        # components; this relies on refresh_session() having validated them.
        tmp_path = (self._tmp_path / token).resolve()
        response_path = (self._response_path / session_id).resolve()
        tmp_path.mkdir(parents=True, exist_ok=True)
        response_path.mkdir(parents=True, exist_ok=True)

        try:
            geometry_file = request.files.get('geometry_file')
            if geometry_file is None:
                return Response(json.dumps({'error': 'Bad request'}), status=400, headers=response_token)
            if not self._allowed_extensions(geometry_file.filename):
                return Response(json.dumps({'error': 'Unsupported media type'}), status=415, headers=response_token)

            # Never trust the client-supplied name when building a path.
            filename = secure_filename(geometry_file.filename)
            file_path = str(tmp_path / filename)
            geometry_file.save(file_path)

            self._city = self._load_city(file_path)
            if self._city is None:
                return Response(json.dumps({'error': 'Bad request'}), status=400, headers=response_token)

            self._run_simulation(tmp_path)
            result_zip = (response_path / f'{token}.zip').resolve()
            self._zip_results(tmp_path, result_zip)
        finally:
            # Always drop the working directory, including when a step above
            # raised; ignore_errors keeps cleanup from masking that exception.
            shutil.rmtree(tmp_path, ignore_errors=True)

        response = make_response(send_file(result_zip))
        # Return the refreshed token, consistent with the error responses.
        response.headers['token'] = _session['token']
        return response
|