mirror of
https://github.com/louisleroy5/trnslator.git
synced 2024-11-15 00:30:31 -05:00
2557 lines
92 KiB
Python
2557 lines
92 KiB
Python
################################################################################
|
|
# Module: idfclass.py
|
|
# Description: Various functions for processing of EnergyPlus models and
|
|
# retrieving results in different forms
|
|
# License: MIT, see full license in LICENSE.txt
|
|
# Web: https://github.com/louisleroy5/translater
|
|
################################################################################
|
|
import datetime
|
|
import glob
|
|
import hashlib
|
|
import inspect
|
|
import json
|
|
import logging as lg
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
import time
|
|
from collections import defaultdict, OrderedDict
|
|
from itertools import compress
|
|
from math import isclose
|
|
from sqlite3 import OperationalError
|
|
from subprocess import CalledProcessError
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import eppy
|
|
import eppy.modeleditor
|
|
import geomeppy
|
|
import pandas as pd
|
|
from eppy.EPlusInterfaceFunctions import parse_idd
|
|
from eppy.bunch_subclass import EpBunch
|
|
from eppy.easyopen import getiddfile
|
|
from path import Path, TempDir
|
|
|
|
import translater
|
|
import translater.settings
|
|
from translater import (
|
|
log,
|
|
settings,
|
|
EnergyPlusProcessError,
|
|
cd,
|
|
ReportData,
|
|
EnergySeries,
|
|
close_logger,
|
|
EnergyPlusVersionError,
|
|
get_eplus_dirs,
|
|
)
|
|
from translater.utils import _unpack_tuple
|
|
|
|
|
|
class IDF(geomeppy.IDF):
    """Wrapper over the geomeppy.IDF class and subsequently the
    eppy.modeleditor.IDF class.

    On top of the parent classes, it keeps the latest simulation results
    (``sql``, ``htm``, ``sql_file``), which are produced on first access by
    running EnergyPlus, and offers geometry metrics, energy profiles and
    schedule lookups.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the IDF and its cached simulation results.

        Args:
            *args: positional arguments forwarded to the parent constructor.
            **kwargs: keyword arguments forwarded to the parent constructor.
        """
        super(IDF, self).__init__(*args, **kwargs)
        self._sql_file = None
        self.schedules_dict = self.get_all_schedules()
        self._sql = None
        self._htm = None
        # Default options reused (and updated) by every call to run_eplus().
        self.eplus_run_options = EnergyPlusOptions(
            eplus_file=self.idfname,
            weather_file=getattr(self, "epw", None),
            ep_version="-".join(map(str, self.idd_version)),
        )
        self.OutputPrep = None

    @classmethod
    def setiddname(cls, iddname, testing=False):
        """Set the path to the EnergyPlus IDD for the version of EnergyPlus
        which is to be used by eppy.

        The IDD name is always overwritten and the parsed IDD state
        (``idd_info`` and ``block``) is reset to None.

        Args:
            iddname (str): Path to the IDD file.
            testing (bool): Not used by this implementation.
        """
        cls.iddname = iddname
        cls.idd_info = None
        cls.block = None

    @property
    def name(self):
        """str: The base name of the idf file."""
        return os.path.basename(self.idfname)

    @property
    def sql(self):
        """The "sql" output report; EnergyPlus is run on first access."""
        if self._sql is None:
            log("No sql object for {}. Running EnergyPlus...".format(self.name))
            self._sql = self.run_eplus(
                annual=True, prep_outputs=True, output_report="sql", verbose="q"
            )
        return self._sql

    @property
    def htm(self):
        """The "htm" output report; EnergyPlus is run on first access."""
        if self._htm is None:
            self._htm = self.run_eplus(
                annual=True, prep_outputs=True, output_report="htm"
            )
        return self._htm

    @property
    def sql_file(self):
        """The "sql_file" output report; EnergyPlus is run on first access."""
        if self._sql_file is None:
            log("No sql file for {}. Running EnergyPlus...".format(self.name))
            self._sql_file = self.run_eplus(
                annual=True, prep_outputs=True, output_report="sql_file", verbose="q"
            )
        return self._sql_file

    @property
    def area_conditioned(self):
        """Returns the total conditioned area of a building (taking into account
        zone multipliers).

        Only surfaces with a tilt of exactly 180 degrees are summed, and zones
        whose ``Part_of_Total_Floor_Area`` is "No" contribute nothing.
        """
        area = 0
        zones = self.idfobjects["ZONE"]
        zone: EpBunch
        for zone in zones:
            for surface in zone.zonesurfaces:
                if hasattr(surface, "tilt"):
                    if surface.tilt == 180.0:
                        part_of = int(zone.Part_of_Total_Floor_Area.upper() != "NO")
                        # A blank multiplier field means a multiplier of 1.
                        multiplier = float(
                            zone.Multiplier if zone.Multiplier != "" else 1
                        )

                        area += surface.area * multiplier * part_of
        return area

    @property
    def partition_ratio(self):
        """The number of lineal meters of partitions (Floor to ceiling) present
        in average in the building floor plan by m2.

        Raises:
            ZeroDivisionError: if :attr:`area_conditioned` is zero.
        """
        partition_lineal = 0
        zones = self.idfobjects["ZONE"]
        zone: EpBunch
        for zone in zones:
            for surface in [
                surf
                for surf in zone.zonesurfaces
                if surf.key.upper() not in ["INTERNALMASS", "WINDOWSHADINGCONTROL"]
            ]:
                if hasattr(surface, "tilt"):
                    # Vertical surfaces that do not face the outdoors.
                    if (
                        surface.tilt == 90.0
                        and surface.Outside_Boundary_Condition != "Outdoors"
                    ):
                        multiplier = float(
                            zone.Multiplier if zone.Multiplier != "" else 1
                        )
                        partition_lineal += surface.width * multiplier

        return partition_lineal / self.area_conditioned

    def wwr(self, azimuth_threshold=10, round_to=None):
        """Returns the Window-to-Wall Ratio by major orientation for the IDF
        model. Optionally round the WWR value to the nearest value (eg.:
        nearest 10).

        Args:
            azimuth_threshold (int): Defines the incremental major orientation
                azimuth angle. Due to possible rounding errors, some surface
                azimuth can be rounded to values different than the main
                directions (eg.: 89 degrees instead of 90 degrees). Defaults to
                increments of 10 degrees.
            round_to (float): Optionally round the WWR value to nearest value
                (eg.: nearest 10). If None, this is ignored and the float is
                returned.

        Returns:
            (pd.DataFrame): A DataFrame with the total wall area, total window
                area and WWR for each main orientation of the building.
        """
        import math

        def roundto(x, to=10.0):
            """Rounds to the closest multiple of `to`; NaN and a falsy `to`
            return `x` unchanged."""
            if to and not math.isnan(x):
                return int(round(x / to)) * to
            else:
                return x

        total_wall_area = defaultdict(int)
        total_window_area = defaultdict(int)

        zones = self.idfobjects["ZONE"]
        zone: EpBunch
        for zone in zones:
            multiplier = float(zone.Multiplier if zone.Multiplier != "" else 1)
            for surface in [
                surf
                for surf in zone.zonesurfaces
                if surf.key.upper() not in ["INTERNALMASS", "WINDOWSHADINGCONTROL"]
            ]:
                # Walls: within 10 degrees of vertical and facing outdoors.
                if isclose(surface.tilt, 90, abs_tol=10):
                    if surface.Outside_Boundary_Condition == "Outdoors":
                        surf_azim = roundto(surface.azimuth, to=azimuth_threshold)
                        total_wall_area[surf_azim] += surface.area * multiplier
                for subsurface in surface.subsurfaces:
                    if isclose(subsurface.tilt, 90, abs_tol=10):
                        if subsurface.Surface_Type.lower() == "window":
                            surf_azim = roundto(
                                subsurface.azimuth, to=azimuth_threshold
                            )
                            total_window_area[surf_azim] += subsurface.area * multiplier
        # Fix azimuth = 360 which is the same as azimuth 0
        total_wall_area[0] += total_wall_area.pop(360, 0)
        total_window_area[0] += total_window_area.pop(360, 0)

        # Create dataframe with wall_area, window_area and wwr as columns and azimuth
        # as indexes
        df = pd.DataFrame(
            {"wall_area": total_wall_area, "window_area": total_window_area}
        ).rename_axis("Azimuth")
        df["wwr"] = df.window_area / df.wall_area
        df["wwr_rounded_%"] = (df.window_area / df.wall_area * 100).apply(
            lambda x: roundto(x, to=round_to)
        )
        return df

    def space_heating_profile(
        self,
        units="kWh",
        energy_out_variable_name=None,
        name="Space Heating",
        EnergySeries_kwds=None,
    ):
        """Returns the space heating energy profile of the model.

        Args:
            units (str): Units to convert the energy profile to. Will detect the
                units of the EnergyPlus results.
            energy_out_variable_name (list-like): a list of EnergyPlus Variable
                names. Defaults to "Air System Total Heating Energy" and
                "Zone Ideal Loads Zone Total Heating Energy".
            name (str): Name given to the EnergySeries.
            EnergySeries_kwds (dict, optional): keywords passed to
                :func:`EnergySeries.from_sqlite`

        Returns:
            EnergySeries
        """
        start_time = time.time()
        if energy_out_variable_name is None:
            energy_out_variable_name = (
                "Air System Total Heating Energy",
                "Zone Ideal Loads Zone Total Heating Energy",
            )
        series = self._energy_series(
            energy_out_variable_name, units, name, EnergySeries_kwds=EnergySeries_kwds
        )
        log(
            "Retrieved Space Heating Profile in {:,.2f} seconds".format(
                time.time() - start_time
            )
        )
        return series

    def service_water_heating_profile(
        self,
        units="kWh",
        energy_out_variable_name=None,
        name="Space Heating",
        EnergySeries_kwds=None,
    ):
        """Returns the service water heating energy profile of the model.

        Args:
            units (str): Units to convert the energy profile to. Will detect the
                units of the EnergyPlus results.
            energy_out_variable_name (list-like): a list of EnergyPlus Variable
                names. Defaults to "WaterSystems:EnergyTransfer".
            name (str): Name given to the EnergySeries.
                NOTE(review): the default is "Space Heating", which looks like
                a copy-paste from :meth:`space_heating_profile`; it is kept
                unchanged so existing callers see the same series name.
            EnergySeries_kwds (dict, optional): keywords passed to
                :func:`EnergySeries.from_sqlite`

        Returns:
            EnergySeries
        """
        start_time = time.time()
        if energy_out_variable_name is None:
            energy_out_variable_name = ("WaterSystems:EnergyTransfer",)
        series = self._energy_series(
            energy_out_variable_name, units, name, EnergySeries_kwds=EnergySeries_kwds
        )
        log(
            "Retrieved Service Water Heating Profile in {:,.2f} seconds".format(
                time.time() - start_time
            )
        )
        return series

    def space_cooling_profile(
        self,
        units="kWh",
        energy_out_variable_name=None,
        name="Space Cooling",
        EnergySeries_kwds=None,
    ):
        """Returns the space cooling energy profile of the model.

        Args:
            units (str): Units to convert the energy profile to. Will detect the
                units of the EnergyPlus results.
            energy_out_variable_name (list-like): a list of EnergyPlus Variable
                names. Defaults to "Air System Total Cooling Energy" and
                "Zone Ideal Loads Zone Total Cooling Energy".
            name (str): Name given to the EnergySeries.
            EnergySeries_kwds (dict, optional): keywords passed to
                :func:`EnergySeries.from_sqlite`

        Returns:
            EnergySeries
        """
        start_time = time.time()
        if energy_out_variable_name is None:
            energy_out_variable_name = (
                "Air System Total Cooling Energy",
                "Zone Ideal Loads Zone Total Cooling Energy",
            )
        series = self._energy_series(
            energy_out_variable_name, units, name, EnergySeries_kwds=EnergySeries_kwds
        )
        log(
            "Retrieved Space Cooling Profile in {:,.2f} seconds".format(
                time.time() - start_time
            )
        )
        return series

    def custom_profile(
        self,
        energy_out_variable_name,
        name,
        units="kWh",
        prep_outputs=None,
        EnergySeries_kwds=None,
    ):
        """Returns the energy profile of user-chosen output variables.

        Args:
            energy_out_variable_name (list-like): a list of EnergyPlus Variable
                names.
            name (str): Name given to the EnergySeries.
            units (str): Units to convert the energy profile to. Will detect the
                units of the EnergyPlus results.
            prep_outputs: when truthy, EnergyPlus is run again with this value
                passed as ``prep_outputs`` before reading the results.
            EnergySeries_kwds (dict, optional): keywords passed to
                :func:`EnergySeries.from_sqlite`

        Returns:
            EnergySeries
        """
        start_time = time.time()
        series = self._energy_series(
            energy_out_variable_name,
            units,
            name,
            prep_outputs,
            EnergySeries_kwds=EnergySeries_kwds,
        )
        log("Retrieved {} in {:,.2f} seconds".format(name, time.time() - start_time))
        return series

    def _energy_series(
        self,
        energy_out_variable_name,
        units,
        name,
        prep_outputs=None,
        EnergySeries_kwds=None,
    ):
        """Builds an EnergySeries from the sql file of the model.

        Args:
            energy_out_variable_name (list-like): EnergyPlus variable names,
                passed as ``table_name`` to :func:`ReportData.from_sqlite`.
            units (str): Units to convert the energy profile to.
            name (str): Name given to the EnergySeries.
            prep_outputs (list): when truthy, EnergyPlus is run again with
                these outputs before the sql file is read.
            EnergySeries_kwds (dict, optional): keywords passed to
                :func:`EnergySeries.from_sqlite`. None means no extra keyword.

        Returns:
            EnergySeries
        """
        if prep_outputs:
            # run_eplus() stores the returned path in self._sql_file for
            # "sql_file" reports; the result must not be written to self._sql.
            self.run_eplus(
                annual=True,
                prep_outputs=prep_outputs,
                output_report="sql_file",
                verbose="q",
            )
            # A new simulation ran: drop the parsed sql of the previous one.
            self._sql = None
        rd = ReportData.from_sqlite(self.sql_file, table_name=energy_out_variable_name)
        profile = EnergySeries.from_sqlite(
            rd, to_units=units, name=name, **(EnergySeries_kwds or {})
        )
        return profile

    def run_eplus(self, **kwargs):
        """wrapper around the :func:`translater.idfclass.run_eplus` function.

        If weather file is defined in the IDF object, then this field is
        optional. The given keywords permanently update
        ``self.eplus_run_options``. Results of "sql" and "sql_file" reports
        are also stored on the instance.

        Args:
            kwargs: options overriding the ones in ``self.eplus_run_options``.

        Returns:
            The output report or the sql file loaded as a dict of DataFrames.
        """
        self.eplus_run_options.__dict__.update(kwargs)
        results = run_eplus(**self.eplus_run_options.__dict__)
        report = self.eplus_run_options.output_report
        if report == "sql":
            self._sql = results
        elif report == "sql_file":
            self._sql_file = results
        return results

    def add_object(self, ep_object, save=True, **kwargs):
        """Add a new object to an idf file. The function will test if the object
        exists to prevent duplicates. By default, the idf with the new object is
        saved to disk (save=True)

        Args:
            ep_object (str): the object name to add, eg. 'OUTPUT:METER' (Must be
                in all_caps).
            save (bool): Save the IDF as a text file with the current idfname of
                the IDF.
            **kwargs: keyword arguments to pass to other functions.

        Returns:
            EpBunch: the new object. If an identical object already exists,
                the existing object is looked up and returned when ``save`` is
                False, and None is returned when ``save`` is True.
                NOTE(review): the None case is the original behaviour and is
                preserved; confirm whether callers rely on it.
        """
        # get list of objects
        objs = self.idfobjects[ep_object]  # a list
        # If object is supposed to be 'unique-object', deletes all objects to be
        # sure there is only one of them when creating new object.
        # Iterate over a copy: removeidfobject() shrinks `objs` and would
        # otherwise make the loop skip elements.
        for obj in list(objs):
            if "unique-object" in obj.objidd[0].keys():
                self.removeidfobject(obj)
        # create new object
        new_object = self.newidfobject(ep_object, **kwargs)
        # Check if new object exists in previous list
        # If True, delete the object
        duplicates = sum(str(obj).upper() == str(new_object).upper() for obj in objs)
        if duplicates > 1:
            log('object "{}" already exists in idf file'.format(ep_object), lg.DEBUG)
            # Remove the newly created object since the function
            # `idf.newidfobject()` automatically adds it
            self.removeidfobject(new_object)
            if not save:
                return self.getobject(
                    ep_object,
                    kwargs.get(
                        "Variable_Name",
                        kwargs.get("Key_Name", kwargs.get("Name", None)),
                    ),
                )
        else:
            if save:
                log('object "{}" added to the idf file'.format(ep_object))
                self.save()
            # invalidate the sql statements
            self._sql = None
            self._sql_file = None
            # return the ep_object
            return new_object

    def get_schedule_type_limits_data_by_name(self, schedule_limit_name):
        """Returns the data for a particular 'ScheduleTypeLimits' object

        Args:
            schedule_limit_name (str): name of the ScheduleTypeLimits object.

        Returns:
            tuple: (lower limit, upper limit, numeric type, unit type). The
                unit type falls back to the numeric type when blank. Four
                empty strings are returned if the object does not exist.
        """
        schedule = self.getobject("ScheduleTypeLimits".upper(), schedule_limit_name)

        if schedule is None:
            return "", "", "", ""

        lower_limit = schedule["Lower_Limit_Value"]
        upper_limit = schedule["Upper_Limit_Value"]
        numeric_type = schedule["Numeric_Type"]
        unit_type = schedule["Unit_Type"]

        if unit_type == "":
            unit_type = numeric_type

        return lower_limit, upper_limit, numeric_type, unit_type

    def get_schedule_epbunch(self, name, sch_type=None):
        """Returns the epbunch of a particular schedule name. If the schedule
        type is known, retrieves it quicker.

        Args:
            name (str): The name of the schedule to retrieve in the IDF file.
            sch_type (str): The schedule type, e.g.: "SCHEDULE:YEAR".

        Raises:
            KeyError: if no schedule type is given and the schedule is not
                found in the model.
        """
        if sch_type is not None:
            return self.getobject(sch_type.upper(), name)

        try:
            return self.schedules_dict[name.upper()]
        except (KeyError, AttributeError):
            # The dict built at init may be missing or out of date; look the
            # schedule up again in a freshly built one.
            try:
                schedules_dict = self.get_all_schedules()
                return schedules_dict[name.upper()]
            except KeyError:
                raise KeyError(
                    'Unable to find schedule "{}" of type "{}" '
                    'in idf file "{}"'.format(name, sch_type, self.idfname)
                )

    def get_all_schedules(self, yearly_only=False):
        """Returns all schedule ep_objects in a dict with their name as a key

        Args:
            yearly_only (bool): If True, return only yearly schedules

        Returns:
            (dict of eppy.bunch_subclass.EpBunch): the schedules with their
                upper-cased name as a key
        """
        schedule_types = list(map(str.upper, self.getiddgroupdict()["Schedules"]))
        if yearly_only:
            schedule_types = [
                "Schedule:Year".upper(),
                "Schedule:Compact".upper(),
                "Schedule:Constant".upper(),
                "Schedule:File".upper(),
            ]
        scheds = {}
        for sched_type in schedule_types:
            for sched in self.idfobjects[sched_type]:
                try:
                    if sched.key.upper() in schedule_types:
                        scheds[sched.Name.upper()] = sched
                except Exception:
                    # Best effort: objects that cannot be read are skipped.
                    pass
        return scheds

    def get_used_schedules(self, yearly_only=False):
        """Returns all used schedules

        A schedule is considered used when its name appears as a field value
        of any object that is not itself a schedule.

        Args:
            yearly_only (bool): If True, return only yearly schedules

        Returns:
            (list): the schedules names
        """
        schedule_types = [
            "Schedule:Day:Hourly".upper(),
            "Schedule:Day:Interval".upper(),
            "Schedule:Day:List".upper(),
            "Schedule:Week:Daily".upper(),
            "Schedule:Year".upper(),
            "Schedule:Week:Compact".upper(),
            "Schedule:Compact".upper(),
            "Schedule:Constant".upper(),
            "Schedule:File".upper(),
        ]

        used_schedules = []
        all_schedules = self.get_all_schedules(yearly_only=yearly_only)
        for object_name in self.idfobjects:
            for ep_object in self.idfobjects[object_name]:
                if ep_object.key.upper() in schedule_types:
                    continue
                for fieldvalue in ep_object.fieldvalues:
                    # Numeric fields cannot be schedule names.
                    if not isinstance(fieldvalue, str):
                        continue
                    if (
                        fieldvalue.upper() in all_schedules
                        and fieldvalue not in used_schedules
                    ):
                        used_schedules.append(fieldvalue)
        return used_schedules

    @property
    def day_of_week_for_start_day(self):
        """Get day of week for start day for the first found RUNPERIOD

        Returns:
            int: the matching ``calendar`` weekday constant, or 0 when the
                field holds anything else.
        """
        import calendar

        day = self.idfobjects["RUNPERIOD"][0]["Day_of_Week_for_Start_Day"]

        weekdays = {
            "sunday": calendar.SUNDAY,
            "monday": calendar.MONDAY,
            "tuesday": calendar.TUESDAY,
            "wednesday": calendar.WEDNESDAY,
            "thursday": calendar.THURSDAY,
            "friday": calendar.FRIDAY,
            "saturday": calendar.SATURDAY,
        }
        # The fallback value 0 is also the value of calendar.MONDAY.
        return weekdays.get(day.lower(), 0)

    def building_name(self, use_idfname=False):
        """Returns the name of the building.

        Args:
            use_idfname (bool): If True, return the base name of the idf file
                instead of the name of the BUILDING object.

        Returns:
            str: the name of the first BUILDING object, or the base name of
                the idf file when the model has no BUILDING object.
        """
        if use_idfname:
            return os.path.basename(self.idfname)

        bld = self.idfobjects["BUILDING"]
        # An empty list is not None: test truthiness so that bld[0] is safe.
        if bld:
            return bld[0].Name
        return os.path.basename(self.idfname)

    def rename(self, objkey, objname, newname):
        """rename all the references to this objname

        Function comes from eppy.modeleditor and was modify to compare the
        name to rename as a lower string (see
        idfobject[idfobject.objls[findex]].lower() == objname.lower())

        Args:
            objkey (EpBunch): EpBunch we want to rename and rename all the
                occurrences where this object is in the IDF file
            objname (str): The name of the EpBunch to rename
            newname (str): New name used to rename the EpBunch

        Returns:
            theobject (EpBunch): The IDF objects renameds
        """

        refnames = eppy.modeleditor.getrefnames(self, objkey)
        for refname in refnames:
            objlists = eppy.modeleditor.getallobjlists(self, refname)
            # [('OBJKEY', refname, fieldindexlist), ...]
            for robjkey, _, fieldindexlist in objlists:
                idfobjects = self.idfobjects[robjkey]
                for idfobject in idfobjects:
                    for findex in fieldindexlist:  # for each field
                        fieldname = idfobject.objls[findex]
                        if idfobject[fieldname].lower() == objname.lower():
                            idfobject[fieldname] = newname
        theobject = self.getobject(objkey, objname)
        # The first field whose name ends with "Name" holds the object name.
        fieldname = [item for item in theobject.objls if item.endswith("Name")][0]
        theobject[fieldname] = newname
        return theobject
|
|
|
|
|
|
class EnergyPlusOptions:
    """Plain container for the options of an EnergyPlus run.

    Every constructor argument is stored as an attribute of the same name.
    ``IDF.run_eplus`` forwards the whole attribute dict as keyword arguments
    to the module-level ``run_eplus`` function.
    """

    def __init__(
        self,
        eplus_file,
        weather_file,
        output_directory=None,
        ep_version=None,
        output_report=None,
        prep_outputs=False,
        simulname=None,
        keep_data=True,
        annual=False,
        design_day=False,
        epmacro=False,
        expandobjects=True,
        readvars=False,
        output_prefix=False,
        output_suffix="L",
        version=None,
        verbose="v",
        keep_data_err=False,
        include=None,
        process_files=False,
        custom_processes=None,
        return_idf=False,
        return_files=False,
    ):
        """Store the options as attributes.

        Args:
            eplus_file: stored as ``self.eplus_file``.
            weather_file: stored as ``self.weather_file``.
            output_directory: stored as ``self.output_directory``.
            ep_version: stored as ``self.ep_version``.
            output_report: stored as ``self.output_report``.
            prep_outputs: stored as ``self.prep_outputs``.
            simulname: stored as ``self.simulname``.
            keep_data: stored as ``self.keep_data``.
            annual: stored as ``self.annual``.
            design_day: stored as ``self.design_day``.
            epmacro: stored as ``self.epmacro``.
            expandobjects: stored as ``self.expandobjects``.
            readvars: stored as ``self.readvars``.
            output_prefix: stored as ``self.output_prefix``.
            output_suffix: stored as ``self.output_suffix``.
            version: stored as ``self.version``.
            verbose: stored as ``self.verbose``.
            keep_data_err: stored as ``self.keep_data_err``.
            include: stored as ``self.include``.
            process_files: stored as ``self.process_files``.
            custom_processes: stored as ``self.custom_processes``.
            return_idf: stored as ``self.return_idf``.
            return_files: stored as ``self.return_files``.
        """
        # The assignment order defines the key order of str(self).
        self.return_files = return_files
        self.custom_processes = custom_processes
        self.process_files = process_files
        self.include = include
        self.keep_data_err = keep_data_err
        self.version = version
        self.keep_data = keep_data
        self.simulname = simulname
        self.output_suffix = output_suffix
        self.verbose = verbose
        self.output_prefix = output_prefix
        self.readvars = readvars
        self.expandobjects = expandobjects
        self.epmacro = epmacro
        self.design_day = design_day
        self.annual = annual
        self.return_idf = return_idf
        self.prep_outputs = prep_outputs
        self.output_report = output_report
        self.ep_version = ep_version
        self.output_directory = output_directory
        self.weather_file = weather_file
        self.eplus_file = eplus_file

    def __repr__(self):
        """Same text as :meth:`__str__`."""
        return str(self)

    def __str__(self):
        """Return the options as an indented JSON document.

        Values that JSON cannot encode (e.g. callables) are shown through
        their ``str()`` form instead of raising ``TypeError``, so printing
        the options never fails.
        """
        return json.dumps(self.__dict__, indent=2, default=str)
|
|
|
|
|
|
def load_idf(
    eplus_file,
    idd_filename=None,
    output_folder=None,
    include=None,
    weather_file=None,
    ep_version=None,
):
    """Returns a parsed IDF object from file. If *translater.settings.use_cache*
    is true, then the idf object is loaded from cache.

    When a cached object is found it is returned as is; the remaining
    arguments are only used when the file has to be parsed.

    Args:
        eplus_file (str): Either the absolute or relative path to the idf file.
        idd_filename (str, optional): Either the absolute or relative path to
            the EnergyPlus IDD file. If None, the function tries to find it at
            the default EnergyPlus install location.
        output_folder (Path, optional): Either the absolute or relative path of
            the output folder. Specify if the cache location is different than
            translater.settings.cache_folder.
        include (str, optional): List input files that need to be copied to the
            simulation directory. Those can be, for example, schedule files read
            by the idf file. If a string is provided, it should be in a glob
            form (see pathlib.Path.glob).
        weather_file: Either the absolute or relative path to the weather epw
            file.
        ep_version (str, optional): EnergyPlus version number to use, eg.:
            "9-2-0". Defaults to `settings.ep_version` .

    Returns:
        IDF: The IDF object.
    """
    eplus_file = Path(eplus_file)
    t0 = time.time()

    # Fast path: a previously parsed object is available in the cache.
    cached = load_idf_object_from_cache(eplus_file)
    if cached:
        return cached

    # Slow path: parse the file with eppy.
    if ep_version is None:
        ep_version = settings.ep_version
    parsed = _eppy_load(
        eplus_file,
        idd_filename,
        output_folder=output_folder,
        include=include,
        epw=weather_file,
        ep_version=ep_version,
    )
    message = 'Loaded "{}" in {:,.2f} seconds\n'.format(
        eplus_file.basename(), time.time() - t0
    )
    log(message)
    return parsed
|
|
|
|
|
|
def _eppy_load(
    file, idd_filename, output_folder=None, include=None, epw=None, ep_version=None
):
    """Uses package eppy to parse an idf file. Will also try to upgrade the idf
    file using the EnergyPlus Transition executables if the version of
    EnergyPlus is not installed on the machine.

    The idf file is first copied to the output folder (unless a file of the
    same name is already there) and the copy is the one that is parsed.

    Args:
        file (str): path of the idf file.
        idd_filename: path of the EnergyPlus IDD file. If None, it is derived
            from the version written in the idf file.
        output_folder (str): path to the output folder. Will default to a
            folder named after the hash of the file in settings.cache_folder.
        include (str, optional): List input files that need to be copied to the
            simulation directory. If a string is provided, it should be in a
            glob form (see pathlib.Path.glob).
        epw (str, optional): path of the epw weather file.
        ep_version (str): EnergyPlus version number to upgrade the file to when
            a transition is needed.

    Returns:
        eppy.modeleditor.IDF: IDF object
    """
    file = Path(file)
    file_hash = hash_file(file)

    try:
        # Work inside the output folder, creating it if needed.
        output_folder = (
            Path(output_folder) if output_folder else settings.cache_folder / file_hash
        )
        output_folder.makedirs_p()

        names_in_folder = [idf.basename() for idf in output_folder.glob("*.idf")]
        if file.basename() in names_in_folder:
            # A file of that name is already in the folder: use it.
            file = output_folder / file.basename()
        else:
            # Otherwise copy the file over and point to the copy.
            file = Path(file.copy(output_folder))

        # Without an explicit IDD, pick the one matching the idf version.
        if idd_filename is None:
            idd_filename = getiddfile(get_idf_version(file))

        IDF.setiddname(idd_filename, testing=True)
        model = IDF(file, epw=epw)
        # Versions are only used by the debug message of the else clause.
        idf_version = model.idfobjects["VERSION"][0].Version_Identifier
        idd_major, idd_minor = model.idd_version[0], model.idd_version[1]
        idd_version = "{}.{}".format(idd_major, idd_minor)
    except FileNotFoundError:
        # Raised while loading when the EnergyPlus version of the file is not
        # installed: upgrade the file, then parse the upgraded file.
        log("Transitioning idf file {}".format(file))
        file = idf_version_updater(file, out_dir=output_folder, to_version=ep_version)
        idd_filename = getiddfile(get_idf_version(file))
        IDF.setiddname(idd_filename, testing=True)
        model = IDF(file, epw=epw)
    else:
        # Parsing succeeded on the first attempt.
        log(
            'The version of the IDF file "{}", version "{}", matched the '
            'version of EnergyPlus {}, version "{}", used to parse it.'.format(
                file.basename(), idf_version, model.getiddname(), idd_version
            ),
            level=lg.DEBUG,
        )

    # Persist the parsed object, then bring along the extra input files.
    save_idf_object_to_cache(model, model.idfname, output_folder)
    if isinstance(include, str):
        include = Path().abspath().glob(include)
    if include is not None:
        for extra_file in include:
            Path(extra_file).copy(output_folder)
    return model
|
|
|
|
|
|
def save_idf_object_to_cache(idf_object, idf_file, output_folder=None, how=None):
    """Saves the object to disk. Essentially uses the pickling functions of
    python.

    Nothing is written when ``settings.use_cache`` is false. The cache file is
    named after the cache folder: ``<folder>/<folder name>idfs.<ext>``.

    Todo:
        * Json dump does not work yet.

    Args:
        idf_object (eppy.modeleditor.IDF): an eppy IDF object
        idf_file (str): file path of idf file. Only used to name the cache
            folder when `output_folder` is None.
        output_folder (Path): temporary output directory (default: a folder
            named after the hash of `idf_file` in settings.cache_folder)
        how (str, optional): How the pickling is done. Choices are 'json' or
            'pickle'. json dump does not quite work yet. 'pickle' will save to a
            gzip'ed file instead of a regular binary file (.dat).

    Returns:
        None
    """
    if not settings.use_cache:
        return

    # upper() can't take NoneType as input.
    how = "" if how is None else how.upper()

    # Resolve the cache folder. The default location is only used when no
    # folder is given; a given folder is used as is. Wrapping in Path makes
    # `.basename()` and the `/` operator available for plain strings too.
    if output_folder is None:
        cache_dir = Path(os.path.join(settings.cache_folder, hash_file(idf_file)))
    else:
        cache_dir = Path(output_folder)

    # create the folder on the disk if it doesn't already exist
    os.makedirs(cache_dir, exist_ok=True)

    # "/" binds tighter than "+": the suffix is appended to the joined path.
    cache_stem = cache_dir / cache_dir.basename()

    if how == "JSON":
        cache_fullpath_filename = cache_stem + "idfs.json"
        with open(cache_fullpath_filename, "w") as file_handle:
            json.dump(
                {
                    key: value.__dict__
                    for key, value in idf_object.idfobjects.items()
                },
                file_handle,
                sort_keys=True,
                indent=4,
                check_circular=True,
            )
        return

    import pickle

    if how == "PICKLE":
        import gzip

        # gzip'ed pickle written with protocol 0
        cache_fullpath_filename = cache_stem + "idfs.gzip"
        opener, protocol = gzip.GzipFile, 0
    else:
        # regular binary pickle written with the highest protocol
        cache_fullpath_filename = cache_stem + "idfs.dat"
        opener, protocol = open, -1

    start_time = time.time()
    with opener(cache_fullpath_filename, "wb") as file_handle:
        pickle.dump(idf_object, file_handle, protocol=protocol)
    log(
        "Saved pickle to file in {:,.2f} seconds".format(time.time() - start_time)
    )
|
|
|
|
|
|
def load_idf_object_from_cache(idf_file, how=None):
    """Load an idf instance from cache.

    The cache file is looked up in the folder named after the hash of
    `idf_file` in ``settings.cache_folder``.

    Args:
        idf_file (str): Either the absolute or relative path to the idf file.
        how (str, optional): How the pickling is done. Choices are 'json' or
            'pickle' or 'idf'. json dump doesn't quite work yet. 'pickle' will
            load from a gzip'ed file (.gzip) and 'idf' will parse the idf file
            saved in cache (.idf). Any other value loads from a regular binary
            file (.dat).

    Returns:
        IDF: The IDF object, or None when ``settings.use_cache`` is false, when
            the cache file is missing or empty, or when the pickle is
            truncated or corrupted. With 'json', the decoded JSON is returned.
    """
    if not settings.use_cache:
        return None

    # upper() can't take NoneType as input.
    how = "" if how is None else how.upper()

    cache_filename = hash_file(idf_file)

    def cache_path(stem, extension):
        """Path of a file of the cache folder of `idf_file`."""
        return os.path.join(
            settings.cache_folder,
            cache_filename,
            os.extsep.join([stem, extension]),
        )

    def has_content(path):
        """True if `path` is an existing, non-empty file."""
        return os.path.isfile(path) and os.path.getsize(path) > 0

    def log_loaded(start_time):
        log(
            'Loaded "{}" from pickled file in {:,.2f} seconds'.format(
                os.path.basename(idf_file), time.time() - start_time
            )
        )

    if how == "IDF":
        cache_fullpath_filename = cache_path(cache_filename, "idf")
        if not os.path.isfile(cache_fullpath_filename):
            return None
        version = get_idf_version(cache_fullpath_filename, doted=True)
        iddfilename = getiddfile(version)
        return _eppy_load(cache_fullpath_filename, iddfilename)

    if how == "JSON":
        cache_fullpath_filename = cache_path(cache_filename + "idfs", "json")
        start_time = time.time()
        if not has_content(cache_fullpath_filename):
            return None
        with open(cache_fullpath_filename, "rb") as file_handle:
            idf = json.load(file_handle)
        log_loaded(start_time)
        return idf

    import pickle

    if how == "PICKLE":
        import gzip

        cache_fullpath_filename = cache_path(cache_filename + "idfs", "gzip")
        opener = gzip.GzipFile
        read_after_setidd = False
    else:
        cache_fullpath_filename = cache_path(cache_filename + "idfs", "dat")
        opener = open
        # NOTE(review): indentation was lost in this copy of the file; read()
        # is assumed to run only when the IDD name had to be restored.
        read_after_setidd = True

    start_time = time.time()
    if not has_content(cache_fullpath_filename):
        return None
    # NOTE(security): unpickling runs arbitrary code from the file; the cache
    # folder must only contain files written by this package.
    with opener(cache_fullpath_filename, "rb") as file_handle:
        try:
            idf = pickle.load(file_handle)
        except (EOFError, pickle.UnpicklingError):
            # A truncated or corrupted cache file is treated as a cache miss.
            return None
    if idf.iddname is None:
        # Restore the IDD from the version stored in the model.
        idf.setiddname(getiddfile(idf.model.dt["VERSION"][0][1]))
        if read_after_setidd:
            idf.read()
    log_loaded(start_time)
    return idf
|
|
|
|
|
|
class OutputPrep:
    """Handles preparation of EnergyPlus outputs.

    Every ``add_*`` method returns the instance itself, so methods can be
    chained to add several predefined bundles of outputs in one go.

    For example:
        >>> OutputPrep(idf=idf_obj).add_output_control().add_umi_ouputs().add_profile_gas_elect_ouputs()
    """

    # Output:Variable names needed to create an UMI template.
    _TEMPLATE_VARIABLES = (
        "Air System Total Heating Energy",
        "Air System Total Cooling Energy",
        "Zone Ideal Loads Zone Total Cooling Energy",
        "Zone Ideal Loads Zone Total Heating Energy",
        "Zone Thermostat Heating Setpoint Temperature",
        "Zone Thermostat Cooling Setpoint Temperature",
        "Heat Exchanger Total Heating Rate",
        "Heat Exchanger Sensible Effectiveness",
        "Heat Exchanger Latent Effectiveness",
        "Water Heater Heating Energy",
    )

    # Output:Meter keys needed to create an UMI template. Each key is listed
    # once; requesting the same meter twice adds nothing.
    _TEMPLATE_METERS = (
        "HeatRejection:EnergyTransfer",
        "Heating:EnergyTransfer",
        "Cooling:EnergyTransfer",
        "Heating:DistrictHeating",
        "Heating:Electricity",
        "Heating:Gas",
        "Cooling:DistrictCooling",
        "Cooling:Electricity",
        "Cooling:Gas",
        "WaterSystems:EnergyTransfer",
    )

    # Output:Variable names needed to reproduce the UMI energy profile.
    _UMI_VARIABLES = (
        "Air System Total Heating Energy",
        "Air System Total Cooling Energy",
        "Zone Ideal Loads Zone Total Cooling Energy",
        "Zone Ideal Loads Zone Total Heating Energy",
        "Water Heater Heating Energy",
    )

    # Output:Meter keys of the gas/electricity profile bundle.
    _PROFILE_METERS = (
        "Electricity:Facility",
        "Gas:Facility",
        "WaterSystems:Electricity",
        "Heating:Electricity",
        "Cooling:Electricity",
    )

    def __init__(self, idf, save=True):
        """Initialize an OutputPrep object.

        Args:
            idf (IDF): the IDF object for which this OutputPrep object is
                created.
            save (bool): whether or not to save changes after adding outputs
                to the IDF file.
        """
        self.idf = idf
        self.save = save
        self.outputs = []

    @staticmethod
    def _variable(variable_name, save, frequency="hourly"):
        """Build the output description of one ``OUTPUT:VARIABLE`` object."""
        return {
            "ep_object": "OUTPUT:VARIABLE",
            "kwargs": dict(
                Variable_Name=variable_name,
                Reporting_Frequency=frequency,
                save=save,
            ),
        }

    @staticmethod
    def _meter(key_name, save, frequency="hourly"):
        """Build the output description of one ``OUTPUT:METER`` object."""
        return {
            "ep_object": "OUTPUT:METER",
            "kwargs": dict(
                Key_Name=key_name, Reporting_Frequency=frequency, save=save
            ),
        }

    def _register(self, outputs):
        """Add ``outputs`` to the idf, record them and return ``self``."""
        prepare_outputs(self.idf, outputs=outputs, save=self.save)
        self.outputs.extend(outputs)
        return self

    def add_custom(self, outputs):
        """Add custom-defined outputs as a list of objects.

        Anything that is not a list (e.g. ``True`` or ``None``) is ignored, so
        this method can safely be part of a chain.

        Examples:
            >>> outputs = [
            >>>     {
            >>>         "ep_object": "OUTPUT:METER",
            >>>         "kwargs": dict(
            >>>             Key_Name="Electricity:Facility",
            >>>             Reporting_Frequency="hourly",
            >>>             save=True,
            >>>         ),
            >>>     },
            >>> ]
            >>> OutputPrep(idf=idf_obj).add_custom(outputs)

        Args:
            outputs (list): Pass a list of ep-objects defined as dictionary. See
                examples.

        Returns:
            OutputPrep: this instance.
        """
        if isinstance(outputs, list):
            self._register(outputs)
        return self

    def add_basics(self):
        """Adds the summary report, the table style, the sql file and the
        schedules report to the idf outputs.
        """
        return self.add_summary_report().add_output_control().add_sql().add_schedules()

    def add_schedules(self):
        """Adds Schedules object"""
        outputs = [
            {
                "ep_object": "OUTPUT:SCHEDULES",
                "kwargs": dict(Key_Field="Hourly", save=self.save),
            }
        ]
        return self._register(outputs)

    def add_summary_report(self, summary="AllSummary"):
        """Adds the Output:Table:SummaryReports object.

        Args:
            summary (str): Choices are AllSummary, AllMonthly,
                AllSummaryAndMonthly, AllSummaryAndSizingPeriod,
                AllSummaryMonthlyAndSizingPeriod,
                AnnualBuildingUtilityPerformanceSummary,
                InputVerificationandResultsSummary,
                SourceEnergyEndUseComponentsSummary, ClimaticDataSummary,
                EnvelopeSummary, SurfaceShadowingSummary, ShadingSummary,
                LightingSummary, EquipmentSummary, HVACSizingSummary,
                ComponentSizingSummary, CoilSizingDetails, OutdoorAirSummary,
                SystemSummary, AdaptiveComfortSummary, SensibleHeatGainSummary,
                Standard62.1Summary, EnergyMeters, InitializationSummary,
                LEEDSummary, TariffReport, EconomicResultSummary,
                ComponentCostEconomicsSummary, LifeCycleCostReport,
                HeatEmissionsSummary,
        """
        outputs = [
            {
                "ep_object": "OUTPUT:TABLE:SUMMARYREPORTS",
                "kwargs": dict(Report_1_Name=summary, save=self.save),
            }
        ]
        return self._register(outputs)

    def add_sql(self, sql_output_style="SimpleAndTabular"):
        """Adds the `Output:SQLite` object. This object will produce an sql file
        that contains the simulation results in a database format. See
        `eplusout.sql
        <https://bigladdersoftware.com/epx/docs/9-2/output-details-and
        -examples/eplusout-sql.html#eplusout.sql>`_ for more details.

        Args:
            sql_output_style (str): The *Simple* option will include all of the
                predefined database tables as well as time series related data.
                Using the *SimpleAndTabular* choice adds database tables related
                to the tabular reports that are already output by EnergyPlus in
                other formats.
        """
        outputs = [
            {
                "ep_object": "OUTPUT:SQLITE",
                "kwargs": dict(Option_Type=sql_output_style, save=self.save),
            }
        ]
        return self._register(outputs)

    def add_output_control(self, output_control_table_style="CommaAndHTML"):
        """Sets the `OutputControl:Table:Style` object.

        Args:
            output_control_table_style (str): Choices are: Comma, Tab, Fixed,
                HTML, XML, CommaAndHTML, TabAndHTML, XMLAndHTML, All
        """
        outputs = [
            {
                "ep_object": "OUTPUTCONTROL:TABLE:STYLE",
                "kwargs": dict(
                    Column_Separator=output_control_table_style, save=self.save
                ),
            }
        ]
        return self._register(outputs)

    def add_template_outputs(self):
        """Adds the necessary outputs in order to create an UMI template."""
        outputs = [
            self._variable(name, self.save) for name in self._TEMPLATE_VARIABLES
        ]
        outputs.extend(self._meter(key, self.save) for key in self._TEMPLATE_METERS)
        return self._register(outputs)

    def add_umi_ouputs(self):
        """Adds the necessary outputs in order to return the same energy profile
        as in UMI.
        """
        outputs = [self._variable(name, self.save) for name in self._UMI_VARIABLES]
        return self._register(outputs)

    def add_profile_gas_elect_ouputs(self):
        """Adds the following meters: Electricity:Facility, Gas:Facility,
        WaterSystems:Electricity, Heating:Electricity, Cooling:Electricity
        """
        # Honor the instance-level ``save`` flag like every other bundle.
        outputs = [self._meter(key, self.save) for key in self._PROFILE_METERS]
        return self._register(outputs)
|
|
|
|
|
|
def prepare_outputs(
    idf, outputs=None, idd_filename=None, output_directory=None, save=True, epw=None
):
    """Add additional epobjects to the idf file.

    Users can pass in a list of output descriptions. Each description is a
    dict with the keys ``"ep_object"`` (the object type) and ``"kwargs"`` (the
    field values forwarded to ``idf.add_object``). A ``"save"`` entry inside
    ``"kwargs"`` overrides the ``save`` argument for that object only.

    Examples:
        >>> objects = [{'ep_object':'OUTPUT:DIAGNOSTICS',
        >>>             'kwargs':{'Key_1':'DisplayUnusedSchedules'}}]
        >>> prepare_outputs(idf, outputs=objects)

    Args:
        idf (IDF or Path): The IDF object or the path to the file describing the
            model (.idf).
        outputs (bool or list): list of output descriptions. Any value that is
            not a list is ignored.
        idd_filename: forwarded to ``load_idf`` when ``idf`` is a path.
        output_directory: forwarded to ``load_idf`` (as ``output_folder``) when
            ``idf`` is a path.
        save (bool): if True, saves the idf inplace to disk with added objects
        epw: forwarded to ``load_idf`` (as ``weather_file``) when ``idf`` is a
            path.
    """
    if isinstance(idf, (Path, str)):
        log("first, loading the idf file")
        idf = load_idf(
            idf,
            idd_filename=idd_filename,
            output_folder=output_directory,
            weather_file=epw,
        )

    if isinstance(outputs, list):
        for output in outputs:
            # Work on a copy so the caller's description is left untouched and
            # can be reused, and resolve ``save`` per object so that one
            # object's override does not leak to the objects that follow it.
            object_kwargs = dict(output["kwargs"])
            object_save = object_kwargs.pop("save", save)
            idf.add_object(output["ep_object"], **object_kwargs, save=object_save)
|
|
|
|
|
|
def cache_runargs(eplus_file, runargs):
    """Write the run arguments of a simulation to ``runargs.json``.

    The file is written in ``runargs["output_directory"] /
    runargs["output_prefix"]``, which must already exist. ``runargs`` is
    updated in place with the ``run_time`` (ISO formatted current time) and
    ``idf_file`` entries before being serialized.

    Args:
        eplus_file: path of the idf file that was simulated.
        runargs (dict): the arguments of the run. Must contain the
            ``output_directory`` and ``output_prefix`` keys.
    """
    import json

    output_directory = runargs["output_directory"] / runargs["output_prefix"]

    runargs.update({"run_time": datetime.datetime.now().isoformat()})
    runargs.update({"idf_file": eplus_file})
    with open(os.path.join(output_directory, "runargs.json"), "w") as fp:
        # Run arguments may hold values json cannot encode natively (e.g. the
        # callbacks of ``custom_processes``). This file is only an
        # informational record, so those values are stored as their string
        # representation instead of aborting the run with a TypeError.
        json.dump(runargs, fp, sort_keys=True, indent=4, default=str)
|
|
|
|
|
|
def run_eplus(
    eplus_file,
    weather_file,
    output_directory=None,
    ep_version=None,
    output_report=None,
    prep_outputs=False,
    simulname=None,
    keep_data=True,
    annual=False,
    design_day=False,
    epmacro=False,
    expandobjects=True,
    readvars=False,
    output_prefix=None,
    output_suffix=None,
    version=None,
    verbose="v",
    keep_data_err=False,
    include=None,
    process_files=False,
    custom_processes=None,
    return_idf=False,
    return_files=False,
    **kwargs,
):
    """Run an EnergyPlus file using the EnergyPlus executable.

    Specify run options:
        Run options are specified in the same way as the E+ command line
        interface: annual, design_day, epmacro, expandobjects, etc. are all
        supported.

    Specify outputs:
        Optionally define the desired outputs by specifying the
        :attr:`prep_outputs` attribute.

        With the :attr:`prep_outputs` attribute, specify additional outputs
        objects to append to the energy plus file. If True is specified, a
        selection of useful options will be appended by default (see:
        :class:`OutputPrep` for more details).

    Args:
        eplus_file (str): path to the idf file.
        weather_file (str): path to the EPW weather file.
        output_directory (str, optional): path to the output folder. Will
            default to the settings.cache_folder.
        ep_version (str, optional): EnergyPlus version to use, eg: 9-2-0
        output_report: 'sql' or 'htm'.
        prep_outputs (bool or list, optional): if True, meters and variable
            outputs will be appended to the idf files. Can also specify custom
            outputs as list of ep-object outputs.
        simulname (str): The name of the simulation. (Todo: Currently not
            implemented).
        keep_data (bool): If True, files created by EnergyPlus are saved to the
            output_directory.
        annual (bool): If True then force annual simulation (default: False)
        design_day (bool): Force design-day-only simulation (default: False)
        epmacro (bool): Run EPMacro prior to simulation (default: False)
        expandobjects (bool): Run ExpandObjects prior to simulation (default:
            True)
        readvars (bool): Run ReadVarsESO after simulation (default: False)
        output_prefix (str, optional): Prefix for output file names.
        output_suffix (str, optional): Suffix style for output file names
            (default: L) Choices are:
                - L: Legacy (e.g., eplustbl.csv)
                - C: Capital (e.g., eplusTable.csv)
                - D: Dash (e.g., eplus-table.csv)
        version (bool, optional): Display version information (default: False)
        verbose (str): Set verbosity of runtime messages (default: v) v: verbose
            q: quiet
        keep_data_err (bool): If True, errored directory where simulation
            occurred is kept.
        include (str, optional): List input files that need to be copied to the
            simulation directory. If a string is provided, it should be in a
            glob form (see :meth:`pathlib.Path.glob`).
        process_files (bool): If True, process the output files and load to a
            :class:`~pandas.DataFrame`. Custom processes can be passed using the
            :attr:`custom_processes` attribute.
        custom_processes (dict(Callback)): if provided, it has to be a
            dictionary with the keys being a glob (see
            :meth:`pathlib.Path.glob`), and the value a Callback taking as
            signature `callback(file: str, working_dir, simulname) -> Any` All
            the file matching this glob will be processed by this callback.
            Note: they will still be processed by pandas.read_csv (if they are
            csv files), resulting in duplicate. The only way to bypass this
            behavior is to add the key "*.csv" to that dictionary.
        return_idf (bool): If True, returns the :class:`IDF` object part of the
            return tuple.
        return_files (bool): If True, all files paths created by the EnergyPlus
            run are returned.
        **kwargs: accepted for compatibility; not used by this function.

    Returns:
        The requested report alone, or a tuple made of the report followed by
        the optional elements, in this order:
            - dict: dict of [(title, table), .....]
            - IDF: The IDF object. Only provided if return_idf is True.
            - list: The files of the run. Only provided if return_files is
              True.

    Raises:
        EnergyPlusProcessError.
    """
    eplus_file = Path(eplus_file)
    weather_file = Path(weather_file)

    # Snapshot of the named arguments; it is hashed together with the idf
    # file to build the name of the cache directory.
    frame = inspect.currentframe()
    args, _, _, values = inspect.getargvalues(frame)
    args = {arg: values[arg] for arg in args}

    cache_filename = hash_file(eplus_file)
    if not output_prefix:
        output_prefix = cache_filename
    if not output_directory:
        output_directory = settings.cache_folder / cache_filename
    else:
        output_directory = Path(output_directory)
    args["output_directory"] = output_directory
    # <editor-fold desc="Try to get cached results">
    start_time = time.time()
    # Errors raised while reading the cache propagate unchanged so that their
    # type and traceback are not lost.
    cached_run_results = get_from_cache(args)
    if cached_run_results:
        # if cached run found, simply return it
        log(
            "Successfully parsed cached idf run in {:,.2f} seconds".format(
                time.time() - start_time
            ),
            name=eplus_file.basename(),
        )
        # return_idf
        if return_idf:
            filepath = os.path.join(
                output_directory,
                hash_file(output_directory / eplus_file.basename(), args),
                eplus_file.basename(),
            )
            idf = load_idf(
                filepath,
                weather_file=weather_file,
                output_folder=output_directory,
                include=include,
            )
        else:
            idf = None
        if return_files:
            files = Path(
                os.path.join(
                    output_directory,
                    hash_file(output_directory / eplus_file.basename(), args),
                )
            ).files()
        else:
            files = None
        return_elements = list(
            compress(
                [cached_run_results, idf, files], [True, return_idf, return_files]
            )
        )
        return _unpack_tuple(return_elements)

    runs_not_found = eplus_file
    # </editor-fold>

    # <editor-fold desc="Upgrade the file version if needed">
    if ep_version:
        # if users specifies version, make sure dots are replaced with "-".
        ep_version = ep_version.replace(".", "-")
    else:
        # if no version is specified, take the package default version
        ep_version = translater.settings.ep_version
    eplus_file = idf_version_updater(
        upgraded_file(eplus_file, output_directory),
        to_version=ep_version,
        out_dir=output_directory,
    )
    # In case the file has been updated, update the versionid of the file
    # and the idd_file
    versionid = get_idf_version(eplus_file, doted=False)
    idd_file = Path(getiddfile(get_idf_version(eplus_file, doted=True)))
    # </editor-fold>

    # Prepare outputs e.g. sql table
    if prep_outputs:
        # Check if idf file has necessary objects (eg specific outputs)
        idf_obj = load_idf(
            eplus_file,
            idd_filename=idd_file,
            output_folder=output_directory,
            weather_file=weather_file,
        )
        # Call the OutputPrep class with chained instance methods to add all
        # necessary outputs + custom ones defined in the parameters of this
        # function.
        OutputPrep(
            idf=idf_obj, save=True
        ).add_basics().add_template_outputs().add_custom(
            outputs=prep_outputs
        ).add_profile_gas_elect_ouputs()

    if runs_not_found:
        # continue with simulation of other files
        log(
            "no cached run for {}. Running EnergyPlus...".format(
                os.path.basename(eplus_file)
            ),
            name=eplus_file.basename(),
        )

        start_time = time.time()
        if isinstance(include, str):
            include = Path().abspath().glob(include)
        elif include is not None:
            include = [Path(file) for file in include]
        # run the EnergyPlus Simulation
        with TempDir(
            prefix="eplus_run_", suffix=output_prefix, dir=output_directory
        ) as tmp:
            log(
                "temporary dir (%s) created" % tmp, lg.DEBUG, name=eplus_file.basename()
            )
            if include:
                include = [file.copy(tmp) for file in include]
            tmp_file = Path(eplus_file.copy(tmp))
            runargs = {
                "tmp": tmp,
                "eplus_file": tmp_file,
                "weather": Path(weather_file.copy(tmp)),
                "verbose": verbose,
                "output_directory": output_directory,
                "ep_version": versionid,
                "output_prefix": hash_file(eplus_file, args),
                "idd": Path(idd_file.copy(tmp)),
                "annual": annual,
                "epmacro": epmacro,
                "readvars": readvars,
                "output_suffix": output_suffix,
                "version": version,
                "expandobjects": expandobjects,
                "design_day": design_day,
                "keep_data_err": keep_data_err,
                "output_report": output_report,
                "include": include,
                "custom_processes": custom_processes,
            }

            _run_exec(**runargs)

            log(
                "EnergyPlus Completed in {:,.2f} seconds".format(
                    time.time() - start_time
                ),
                name=eplus_file.basename(),
            )

            processes = {"*.csv": _process_csv}  # output_prefix +
            if custom_processes is not None:
                processes.update(custom_processes)

            # NOTE: the processed results are collected but are not part of
            # the returned value; the callbacks run for their side effects.
            results = []
            if process_files:
                # ``pattern`` (not ``glob``) so the imported ``glob`` module
                # is not shadowed inside this function.
                for pattern, process in processes.items():
                    results.extend(
                        [
                            (
                                file.basename(),
                                process(
                                    file,
                                    working_dir=os.getcwd(),
                                    simulname=output_prefix,
                                ),
                            )
                            for file in tmp.files(pattern)
                        ]
                    )

            save_dir = output_directory / hash_file(eplus_file, args)
            # Defined up front so the return statement below never hits an
            # unbound name when the run data is not kept.
            files = None
            if keep_data:
                save_dir.rmtree_p()
                tmp.copytree(save_dir)

                log(
                    "Files generated at the end of the simulation: %s"
                    % "\n".join((save_dir).files()),
                    lg.DEBUG,
                    name=eplus_file.basename(),
                )
                if return_files:
                    files = save_dir.files()

                # save runargs
                cache_runargs(tmp_file, runargs.copy())

            # Return summary DataFrames
            runargs["output_directory"] = save_dir
            cached_run_results = get_report(**runargs)
            if return_idf:
                idf = load_idf(
                    eplus_file,
                    output_folder=output_directory,
                    include=include,
                    weather_file=weather_file,
                )
                runargs["output_report"] = "sql"
                idf._sql = get_report(**runargs)
                runargs["output_report"] = "sql_file"
                idf._sql_file = get_report(**runargs)
                runargs["output_report"] = "htm"
                idf._htm = get_report(**runargs)
            else:
                idf = None
            return_elements = list(
                compress(
                    [cached_run_results, idf, files], [True, return_idf, return_files]
                )
            )
            return _unpack_tuple(return_elements)
|
|
|
|
|
|
def upgraded_file(eplus_file, output_directory):
    """Return the idf file to use for a run.

    When the cache is enabled (``settings.use_cache``), the first ``*.idf``
    file found in ``output_directory`` is returned; otherwise, or when the
    directory holds no idf file, ``eplus_file`` is returned unchanged.

    Args:
        eplus_file: path of the original idf file.
        output_directory: directory searched for an already copied idf file.
    """
    if not settings.use_cache:
        return eplus_file
    for cached_idf in output_directory.glob("*.idf"):
        # Only the first match matters.
        return cached_idf
    return eplus_file
|
|
|
|
|
|
def _process_csv(file, working_dir, simulname):
    """Process one csv file produced by an EnergyPlus run (best effort).

    Files whose name contains "table" are copied to ``<working_dir>/tables``
    as ``<file name without extension>_<simulname>.csv`` and nothing is
    returned. Any other file is loaded with :func:`pandas.read_csv`.

    Args:
        file (Path): the csv file to process.
        working_dir (str or Path): directory in which the ``tables`` folder is
            created.
        simulname (str): name appended to the copied table file name.

    Returns:
        pandas.DataFrame or None: the content of the file; None for "table"
        files and when the file could not be processed.
    """
    try:
        log("looking for csv output, return the csv files in DataFrames if any")
        if "table" in file.basename():
            # ``working_dir`` may be a plain string (e.g. ``os.getcwd()``),
            # which has no ``abspath`` method: wrap it first.
            tables_out = Path(working_dir).abspath() / "tables"
            tables_out.makedirs_p()
            # Format the file name on its own so that a "%" present in the
            # directory path is never interpreted as a format specifier.
            table_name = "%s_%s.csv" % (file.basename().stripext(), simulname)
            file.copy(tables_out / table_name)
            return None
        log("try to store file %s in DataFrame" % (file))
        df = pd.read_csv(file, sep=",", encoding="us-ascii")
        log("file %s stored" % file)
        return df
    except Exception as e:
        # Processing is best effort: report the problem instead of hiding it,
        # but do not interrupt the run.
        log("could not process csv file %s: %s" % (file, e), lg.WARNING)
        return None
|
|
|
|
|
|
def _run_exec(
    tmp,
    eplus_file,
    weather,
    output_directory,
    annual,
    design_day,
    idd,
    epmacro,
    expandobjects,
    readvars,
    output_prefix,
    output_suffix,
    version,
    verbose,
    ep_version,
    keep_data_err,
    output_report,
    include,
    custom_processes,
):
    """Wrapper around the EnergyPlus command line interface.

    Adapted from :func:`eppy.runner.runfunctions.run`.

    The parameters are captured with ``locals()``; the ones that are not
    command line options are popped out and every remaining truthy entry is
    turned into a ``--option [value]`` pair (underscores replaced by dashes),
    followed by the path of the idf file.

    Args:
        tmp: directory in which the simulation is run.
        eplus_file: the idf file; either an object exposing ``idfname`` or a
            path.
        weather: weather file; when it is not an existing file, it is looked
            up in the weather folder of the EnergyPlus installation.
        output_directory: parent of the ``failed`` folder used when
            ``keep_data_err`` is True.
        annual: forwarded as ``--annual`` when truthy.
        design_day: forwarded as ``--design-day`` when truthy.
        idd: idd file, passed to ``install_paths``.
        epmacro: forwarded as ``--epmacro`` when truthy.
        expandobjects: forwarded as ``--expandobjects`` when truthy.
        readvars: forwarded as ``--readvars`` when truthy.
        output_prefix: forwarded as ``--output-prefix``; also the prefix of
            the error file read when the run fails.
        output_suffix: forwarded as ``--output-suffix`` when truthy.
        version: if truthy, only ``<exe> --version`` is executed.
        verbose: "v" to log the output of the process.
        ep_version: EnergyPlus version; if falsy, it is read from
            ``eplus_file.idd_version``.
        keep_data_err: if True, the run directory is copied to
            ``output_directory / "failed" / output_prefix`` on failure.
        output_report: not used by this function.
        include: not used by this function.
        custom_processes: not used by this function.

    Raises:
        AttributeError: if ``ep_version`` is falsy and ``eplus_file`` has no
            ``idd_version`` attribute.
        EnergyPlusProcessError: if the EnergyPlus process returns a non-zero
            exit code.
    """

    # Must stay the first statement: at this point locals() only holds the
    # parameters of the function.
    args = locals().copy()
    # get unneeded params out of args ready to pass the rest to energyplus.exe
    verbose = args.pop("verbose")
    eplus_file = args.pop("eplus_file")
    iddname = args.get("idd")
    tmp = args.pop("tmp")
    keep_data_err = args.pop("keep_data_err")
    output_directory = args.pop("output_directory")
    output_report = args.pop("output_report")
    idd = args.pop("idd")
    include = args.pop("include")
    custom_processes = args.pop("custom_processes")
    # eplus_file is either an IDF-like object (has ``idfname``) or a path
    try:
        idf_path = os.path.abspath(eplus_file.idfname)
    except AttributeError:
        idf_path = os.path.abspath(eplus_file)
    ep_version = args.pop("ep_version")
    # get version from IDF object or by parsing the IDF file for it
    if not ep_version:
        try:
            ep_version = "-".join(str(x) for x in eplus_file.idd_version[:3])
        except AttributeError:
            raise AttributeError(
                "The ep_version must be set when passing an IDF path. \
Alternatively, use IDF.run()"
            )

    eplus_exe_path, eplus_weather_path = eppy.runner.run_functions.install_paths(
        ep_version, iddname
    )
    if version:
        # just get EnergyPlus version number and return
        cmd = [eplus_exe_path, "--version"]
        subprocess.check_call(cmd)
        return

    # convert paths to absolute paths if required
    if os.path.isfile(args["weather"]):
        args["weather"] = os.path.abspath(args["weather"])
    else:
        args["weather"] = os.path.join(eplus_weather_path, args["weather"])
    # args['output_directory'] = tmp.abspath()

    # NOTE(review): presumably the path object's context manager changes the
    # working directory to ``tmp`` for the duration of the block (the error
    # file below is opened by relative name) -- confirm against path.py.
    with tmp.abspath() as tmp:
        # build a list of command line arguments
        cmd = [eplus_exe_path]
        for arg in args:
            if args[arg]:
                # boolean flags are passed without a value
                if isinstance(args[arg], bool):
                    args[arg] = ""
                cmd.extend(["--{}".format(arg.replace("_", "-"))])
                if args[arg] != "":
                    cmd.extend([args[arg]])
        cmd.extend([idf_path])

        # stderr is merged into stdout so a single pipe has to be drained
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        ) as process:
            _log_subprocess_output(
                process.stdout, name=eplus_file.basename(), verbose=verbose
            )
            if process.wait() != 0:
                # the content of the error file becomes the message of the
                # raised exception
                error_filename = output_prefix + "out.err"
                with open(error_filename, "r") as stderr:
                    stderr_r = stderr.read()
                if keep_data_err:
                    failed_dir = output_directory / "failed"
                    failed_dir.mkdir_p()
                    tmp.copytree(failed_dir / output_prefix)
                raise EnergyPlusProcessError(
                    cmd=cmd, idf=eplus_file.basename(), stderr=stderr_r
                )
|
|
|
|
|
|
def _log_subprocess_output(pipe, name, verbose):
|
|
"""
|
|
Args:
|
|
pipe:
|
|
name:
|
|
verbose:
|
|
"""
|
|
logger = None
|
|
for line in iter(pipe.readline, b""):
|
|
if verbose == "v":
|
|
logger = log(
|
|
line.decode().strip("\n"),
|
|
level=lg.DEBUG,
|
|
name="eplus_run_" + name,
|
|
filename="eplus_run_" + name,
|
|
log_dir=os.getcwd(),
|
|
)
|
|
if logger:
|
|
close_logger(logger)
|
|
|
|
|
|
def hash_file(eplus_file, kwargs=None):
    """Simple function to hash a file and return it as a string. Will also hash
    the :func:`eppy.runner.run_functions.run()` arguments so that correct
    results are returned when different run arguments are used.

    The ``kwargs`` mapping passed by the caller is never modified.

    Todo:
        Hashing should include the external files used an idf file. For example,
        if a model uses a csv file as an input and that file changes, the
        hashing will currently not pickup that change. This could result in
        loading old results without the user knowing.

    Args:
        eplus_file (str): path of the idf file.
        kwargs (dict): keywargs to serialize in addition to the file content.

    Returns:
        str: The digest value as a string of hexadecimal digits
    """
    if kwargs:
        # Before we hash the kwargs, leave out the ones that don't have an
        # impact on simulation results and so should not change the cache
        # dirname. A new mapping is built (instead of popping the keys) so the
        # caller's dictionary keeps all of its entries.
        no_impact = ("keep_data", "keep_data_err", "return_idf", "return_files")

        # sorting keys for serialization of dictionary
        kwargs = OrderedDict(
            sorted(
                (key, value) for key, value in kwargs.items() if key not in no_impact
            )
        )

    # create hasher
    hasher = hashlib.md5()
    with open(eplus_file, "rb") as afile:
        # Feed the file by chunks: same digest, bounded memory use.
        for chunk in iter(lambda: afile.read(65536), b""):
            hasher.update(chunk)
    hasher.update(kwargs.__str__().encode("utf-8"))  # Hashing the kwargs as well
    return hasher.hexdigest()
|
|
|
|
|
|
def get_report(
    eplus_file, output_directory=None, output_report="sql", output_prefix=None, **kwargs
):
    """Returns the specified report format (html or sql)

    The report file is looked up as
    ``output_directory / output_prefix + <suffix>`` where the suffix is
    ``tbl.htm`` for html reports and ``out.sql`` for sql reports.

    Args:
        eplus_file (str): path of the idf file
        output_directory (Path): path to the folder holding the report files.
        output_report: any string containing 'htm' for the parsed html report,
            'sql' for the parsed sql report or 'sql_file' for the path of the
            sql file (case insensitive). None or any other value yields None.
        output_prefix: prefix of the report file names. If None, it is the
            hash of ``eplus_file`` and ``kwargs``.
        **kwargs: keyword arguments to pass to hasher.

    Returns:
        dict: a dict of DataFrames; the path of the sql file for 'sql_file';
        None when no (known) report is requested.

    Raises:
        FileNotFoundError: if the requested report file does not exist.
    """
    # Hash the idf file with any kwargs used in the function
    if output_prefix is None:
        output_prefix = hash_file(eplus_file, kwargs)
    if output_report is None:
        return None

    report_kind = output_report.lower()
    if "htm" in report_kind:
        suffix = "tbl.htm"
    elif report_kind == "sql" or report_kind == "sql_file":
        suffix = "out.sql"
    else:
        return None

    fullpath_filename = output_directory / output_prefix + suffix
    if not fullpath_filename.exists():
        raise FileNotFoundError(
            'File "{}" does not exist'.format(fullpath_filename)
        )

    if suffix == "tbl.htm":
        # Get the html report
        return get_html_report(fullpath_filename)
    if report_kind == "sql":
        # Get the sql report
        return get_sqlite_report(fullpath_filename)
    # "sql_file": only the location of the sql file is wanted
    return fullpath_filename
|
|
|
|
|
|
def get_from_cache(kwargs):
    """Retrieve a EPlus Tabulated Summary run result from the cache

    The cached run is looked up in
    ``output_directory / <hash>`` where ``<hash>`` is the hash of the first
    ``*.idf`` file found in ``output_directory`` and of ``kwargs``.

    Args:
        kwargs (dict): Args used to create the cache name. The
            ``output_directory`` and ``output_report`` entries select where
            and what to look for.

    Returns:
        dict: dict of DataFrames for the 'htm' and 'sql' reports; the path of
        the sql file for 'sql_file'; the path of the cached run directory when
        no report is requested; None when nothing is cached or the cache is
        disabled.
    """
    output_directory = Path(kwargs.get("output_directory"))
    output_report = kwargs.get("output_report")

    eplus_file = next(iter(output_directory.glob("*.idf")), None)
    if not eplus_file:
        return None
    if not settings.use_cache:
        return None

    # determine the filename by hashing the eplus_file
    cache_filename_prefix = hash_file(eplus_file, kwargs)

    if output_report is None:
        # No report is expected but we should still return the path if it exists.
        cached_run_dir = output_directory / cache_filename_prefix
        return cached_run_dir if cached_run_dir.exists() else None

    report_kind = output_report.lower()
    if "htm" in report_kind:
        # Get the html report
        html_report = (
            output_directory / cache_filename_prefix / cache_filename_prefix
            + "tbl.htm"
        )
        if html_report.exists():
            return get_html_report(html_report)
        return None

    if report_kind not in ("sql", "sql_file"):
        return None

    # get the SQL report
    if not output_directory:
        output_directory = settings.cache_folder / cache_filename_prefix
    sql_report = (
        output_directory / cache_filename_prefix / cache_filename_prefix + "out.sql"
    )
    if not sql_report.exists():
        return None
    if report_kind == "sql_file":
        return sql_report
    # get reports from passed-in report names or from
    # settings.available_sqlite_tables if None are given
    return get_sqlite_report(
        sql_report,
        kwargs.get("report_tables", settings.available_sqlite_tables),
    )
|
|
|
|
|
|
def get_html_report(report_fullpath):
    """Read an html Summary Report and return its tables as DataFrames.

    Args:
        report_fullpath (str): full path to the html report file

    Returns:
        dict: dict of {title : table <DataFrame>,...}
    """
    from eppy.results import readhtml  # eppy helpers for reading html reports

    # Read the whole report as text; the file can be closed before parsing.
    with open(report_fullpath, "r", encoding="utf-8") as html_file:
        html_text = html_file.read()

    # The parsed result is handed over as-is to the DataFrame converter,
    # which expects a list of (title, table) pairs.
    titled_tables = readhtml.titletable(html_text)

    log('Retrieved response from cache file "{}"'.format(report_fullpath))
    return summary_reports_to_dataframes(titled_tables)
|
|
|
|
|
|
def summary_reports_to_dataframes(reports_list):
    """Convert a list of [(title, table),...] to a dict of DataFrames.

    The first row of every table is used as the column header of its
    DataFrame and is removed from the data. Titles that occur more than once
    are made unique by appending underscores ("Title", "Title_", "Title__",
    ...), so that no table overwrites another one in the output dict.

    Args:
        reports_list (list): a list of [(title, table),...], each table being
            a list of rows whose first row holds the column names.

    Returns:
        dict: a dict of {title: table <DataFrame>}
    """
    results_dict = {}
    for table in reports_list:
        key = str(table[0])
        # Keep extending the name until it is free. A single "_" suffix is
        # not enough when the same title shows up three times or more.
        while key in results_dict:
            key += "_"
        df = pd.DataFrame(table[1])
        # Promote the first row to column labels, then drop it from the data.
        df = df.rename(columns=df.iloc[0]).drop(df.index[0])
        results_dict[key] = df
    return results_dict
|
|
|
|
|
|
def get_sqlite_report(report_file, report_tables=None):
    """Connect to the EnergyPlus SQL output file and retrieve its tables.

    Args:
        report_file (str): path of report file
        report_tables (dict, optional): mapping of the table names to
            retrieve to a dict holding the "PrimaryKey" (used as the
            DataFrame index) and "ParseDates" (columns parsed as dates)
            entries of that table. Defaults to
            settings.available_sqlite_tables

    Returns:
        dict: dict of {table name: DataFrame}, or None when `report_file` is
        not an existing file.
    """
    import sqlite3
    from contextlib import closing

    # set list of report tables
    if not report_tables:
        report_tables = settings.available_sqlite_tables

    if not os.path.isfile(report_file):
        return None

    def read_table(conn, table):
        # Table names cannot be bound as SQL parameters; they come from the
        # report_tables configuration, not from the database content.
        return pd.read_sql_query(
            "select * from {};".format(table),
            conn,
            index_col=report_tables[table]["PrimaryKey"],
            parse_dates=report_tables[table]["ParseDates"],
            coerce_float=True,
        )

    all_tables = {}
    # `with sqlite3.connect(...)` only manages the transaction; `closing`
    # makes sure the connection itself is released.
    with closing(sqlite3.connect(report_file)) as conn:
        for table in report_tables:
            try:
                # Try regular str read, could fail if wrong encoding
                conn.text_factory = str
                df = read_table(conn, table)
            except sqlite3.OperationalError:
                # Wrong encoding found: load raw bytes instead and decode
                # the object columns only.
                conn.text_factory = bytes
                df = read_table(conn, table)
                # The builtin `object` replaces the `np.object` alias that
                # was removed from NumPy.
                for col in df.select_dtypes([object]):
                    df[col] = df[col].str.decode("8859")
            all_tables[table] = df

    log(
        "SQL query parsed {} tables as DataFrames from {}".format(
            len(all_tables), report_file
        )
    )
    return all_tables
|
|
|
|
|
|
def idf_version_updater(idf_file, to_version=None, out_dir=None, simulname=None):
    """EnergyPlus idf version updater using local transition program.

    Update the EnergyPlus simulation file (.idf) to the latest available
    EnergyPlus version installed on this machine. Optionally specify a version
    (eg.: "9-2-0") to aim for a specific version. The output will be the path of
    the updated file. Each run works on a copy of the file placed in its own
    temporary directory created inside `out_dir`.

    Hint:
        If attempting to upgrade an earlier version of EnergyPlus ( pre-v7.2.0),
        specific binaries need to be downloaded and copied to the
        EnergyPlus*/PreProcess/IDFVersionUpdater folder. More info at
        `Converting older version files
        <http://energyplus.helpserve.com/Knowledgebase/List/Index/46
        /converting-older-version-files>`_ .

    Args:
        idf_file (Path): path of idf file
        to_version (str, optional): EnergyPlus version in the form "X-X-X".
            If None, the latest version found by `find_eplus_installs` is
            used.
        out_dir (Path): path of the output_dir. Defaults to the directory of
            `idf_file`; it is created if it does not exist.
        simulname (str or None, optional): suffix of the name of the
            temporary directory. Be careful to avoid naming collision : the
            run will always be done in separated folders, but the output
            files can overwrite each other since they are named after
            `idf_file`. (default: None)

    Returns:
        Path: The path of the new transitioned idf file.

    Raises:
        EnergyPlusVersionError: if the version of the file is more recent
            than `to_version`.
        EnergyPlusProcessError: if a required transition program is missing
            from the IDFVersionUpdater folder.
    """

    def version_tuple(dashed_version):
        # "9-2-0" -> (9, 2, 0), so that versions compare numerically
        return tuple(map(int, dashed_version.split("-")))

    idf_file = Path(idf_file)
    # if no directory is provided, use directory of file
    out_dir = Path(out_dir) if out_dir else idf_file.dirname()
    if not out_dir.isdir() and out_dir != "":
        # check if dir exists
        out_dir.makedirs_p()

    with TemporaryDirectory(
        prefix="transition_run_", suffix=simulname, dir=out_dir
    ) as tmp:
        log("temporary dir (%s) created" % tmp, lg.DEBUG)
        idf_file = Path(idf_file.copy(tmp)).abspath()  # copy and return abspath

        # Keep the "major-minor-patch" part only. Splitting on "-" (rather
        # than slicing the first 5 characters) also works for multi-digit
        # version numbers.
        versionid = "-".join(get_idf_version(idf_file, doted=False).split("-")[:3])
        doted_version = get_idf_version(idf_file, doted=True)
        iddfile = getiddfile(doted_version)
        if os.path.exists(iddfile):
            # if a E+ exists, means there is an E+ install that can be used
            if versionid == to_version:
                # if version of idf file is equal to intended version, copy file
                # from temp transition folder into out_dir and return path
                return idf_file.copy(out_dir / idf_file.basename())
        elif tuple(map(int, doted_version.split("."))) < (8, 0):
            # the version is an old E+ version (< 8.0)
            iddfile = getoldiddfile(doted_version)
            if versionid == to_version:
                return idf_file.copy(out_dir / idf_file.basename())

        if to_version is None:
            # What is the latest E+ installed version
            to_version = find_eplus_installs(iddfile)

        current_version = version_tuple(versionid)
        target_version = version_tuple(to_version)
        if current_version > target_version:
            # Downgrading a file is not possible
            raise EnergyPlusVersionError(idf_file, versionid, to_version)

        vupdater_path = (
            get_eplus_dirs(settings.ep_version) / "PreProcess" / "IDFVersionUpdater"
        )
        exe = ".exe" if platform.system() == "Windows" else ""
        # Chain of versions handled by the transition programs. Each program
        # upgrades a file from one version to the next one in this sequence.
        versions = (
            "1-0-0", "1-0-1", "1-0-2", "1-0-3", "1-1-0", "1-1-1",
            "1-2-0", "1-2-1", "1-2-2", "1-2-3", "1-3-0", "1-4-0",
            "2-0-0", "2-1-0", "2-2-0", "3-0-0", "3-1-0", "4-0-0",
            "5-0-0", "6-0-0", "7-0-0", "7-1-0", "7-2-0", "8-0-0",
            "8-1-0", "8-2-0", "8-3-0", "8-4-0", "8-5-0", "8-6-0",
            "8-7-0", "8-8-0", "8-9-0", "9-0-0", "9-1-0", "9-2-0",
        )
        trans_exec = {
            start: vupdater_path / "Transition-V{}-to-V{}".format(start, end) + exe
            for start, end in zip(versions, versions[1:])
        }

        # check the file version, if it corresponds to the latest version found
        # on the machine, means its already upgraded to the correct version.
        if versionid == to_version:
            log(
                'file {} already upgraded to latest version "{}"'.format(
                    idf_file, versionid
                )
            )
            idf_file = Path(idf_file.copy(out_dir))
            return idf_file

        # Otherwise, run in order every transition program between the
        # version of the file (included) and the target version (excluded).
        transitions = [
            key
            for key in trans_exec
            if current_version <= version_tuple(key) < target_version
        ]
        with cd(vupdater_path):
            for trans in transitions:
                if not trans_exec[trans].exists():
                    raise EnergyPlusProcessError(
                        cmd=trans_exec[trans],
                        stderr="The specified EnergyPlus version (v{}) does not have"
                        " the required transition program '{}' in the "
                        "PreProcess folder. See the documentation "
                        "(translater.readthedocs.io/troubleshooting.html#missing-transition-programs) "
                        "to solve this issue".format(to_version, trans_exec[trans]),
                        idf=idf_file.basename(),
                    )
                process = subprocess.run(
                    [trans_exec[trans], idf_file],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                )
                # errors="replace": logging must not fail on odd console bytes
                process_output = process.stdout.decode("utf-8", errors="replace")
                log(process_output, lg.DEBUG)
                if process.returncode != 0:
                    # A failed transition is reported but does not abort the
                    # run, as before.
                    log(
                        "{} failed with error\n{}".format(
                            idf_version_updater.__name__,
                            "'{}' exited with code {}: {}".format(
                                trans_exec[trans], process.returncode, process_output
                            ),
                        ),
                        lg.ERROR,
                    )

        # The transitioned file is collected from the "*.idfnew" files of the
        # temporary directory and saved under the name of the original file.
        for f in Path(tmp).files("*.idfnew"):
            f.copy(out_dir / idf_file.basename())
        return Path(out_dir / idf_file.basename())
|
|
|
|
|
|
def find_eplus_installs(iddfile):
    """Finds all installed versions of EnergyPlus in the default location and
    returns the latest version number

    The install location is the folder that contains the EnergyPlus
    installation `iddfile` belongs to. Every sibling folder named
    "EnergyPlus*" whose name ends with a "X-X-X" version number is considered
    as an installation; other folders are ignored.

    Args:
        iddfile (str): path of the "Energy+.idd" file of an EnergyPlus
            installation.

    Returns:
        (str): The version number of the latest E+ install, in the form
        "X-X-X".

    Raises:
        ValueError: if `iddfile` is not the path of an "Energy+" file located
            inside an "EnergyPlus*" folder.
        Exception: if no EnergyPlus installation is found.
    """
    import re

    # rpartition keeps working when a marker occurs more than once in the path
    install_dir, found, _ = iddfile.rpartition("Energy+")
    if not found:
        raise ValueError('"{}" is not the path of an Energy+ file'.format(iddfile))
    path_to_eplus, found, _ = install_dir.rpartition("EnergyPlus")
    if not found:
        raise ValueError(
            '"{}" is not located in an EnergyPlus folder'.format(iddfile)
        )

    # Find all EnergyPlus folders
    list_eplus_dir = glob.glob(os.path.join(path_to_eplus, "EnergyPlus*"))

    # Read the version number at the end of each folder name. A pattern is
    # used instead of a fixed-width slice so that multi-digit numbers (e.g.
    # "22-1-0") are read entirely and unrelated folders are skipped.
    version_pattern = re.compile(r"(\d+)-(\d+)-(\d+)$")
    versions = []
    for eplus_dir in list_eplus_dir:
        match = version_pattern.search(eplus_dir)
        if match:
            versions.append(tuple(map(int, match.groups())))

    # check if any EnergyPlus install exists
    if not versions:
        raise Exception(
            "No EnergyPlus installation found. Make sure you have EnergyPlus installed. "
            "Go to https://energyplus.net/downloads to download the latest version of EnergyPlus."
        )

    return "-".join(map(str, max(versions)))
|
|
|
|
|
|
def get_idf_version(file, doted=True):
    """Return the version id found in the 'VERSION' object of an idf file.

    Args:
        file (str): Absolute or relative Path to the idf file
        doted (bool, optional): If True, the version is returned as written
            in the file. Otherwise, dots are replaced by dashes and "-0" is
            appended to the version.

    Returns:
        str: the version id
    """
    with open(os.path.abspath(file), "r", encoding="latin-1") as idf_handle:
        try:
            # Drop the "!" comments, then split the text in ";"-terminated
            # objects and each object in its ","-separated fields.
            uncommented = parse_idd.nocomment(idf_handle.read(), "!")
            idf_objects = [
                [field.strip() for field in idf_object.strip().split(",")]
                for idf_object in uncommented.split(";")
            ]
            version_objects = [
                idf_object
                for idf_object in idf_objects
                if idf_object[0].upper() == "VERSION"
            ]
            # The version is the field following the VERSION keyword of the
            # first matching object.
            version_value = version_objects[0][1]
            if not doted:
                version_value = version_value.replace(".", "-") + "-0"
            return version_value
        except Exception as e:
            log('Version id for file "{}" cannot be found'.format(file))
            log("{}".format(e))
            raise
|
|
|
|
|
|
def getoldiddfile(versionid):
    """Return the path of the IDD file of an old E+ installation. E+ version 7
    and earlier have the idd in /EnergyPlus-7-2-0/bin/Energy+.idd

    Args:
        versionid (str): version number with dots as separators. Missing
            trailing numbers are filled with "0".

    Returns:
        str: path of the IDD file, built from the folder of the EnergyPlus
        executable reported by eppy for that version.
    """
    # Pad the version to three components: "7" -> 7-0-0, "7.2" -> 7-2-0
    numbers = versionid.split(".")
    numbers += ["0"] * (3 - len(numbers))
    dashed_version = "-".join(numbers)

    eplus_exe, _ = eppy.runner.run_functions.install_paths(dashed_version)
    return "{}/bin/Energy+.idd".format(os.path.dirname(eplus_exe))
|
|
|
|
|
|
# Script entry point: nothing is executed when this module is run directly.
if __name__ == "__main__":
    pass
|