api_v1.4/hub_api/helpers/session_helper.py

90 lines
2.6 KiB
Python
Raw Normal View History

"""
Session helper
SPDX - License - Identifier: LGPL - 3.0 - or -later
Copyright © 2022 Project Author name guillermo.gutierrezmorote@concordia.ca
"""
import datetime
2024-05-13 02:03:26 -04:00
import shutil
import time
2024-05-13 02:03:26 -04:00
import uuid
2024-05-10 13:34:36 -04:00
from copy import deepcopy
2024-06-10 00:31:49 -04:00
from glob import glob
2024-05-02 11:28:53 -04:00
from pathlib import Path
sessions = {}
begin_time = None
2023-01-23 12:29:41 -05:00
swagger_data = None
city = None
greenery_catalog = None
construction_catalog = None
usage_catalog = None
debug_mode = False
2023-07-20 20:15:24 -04:00
def expired_sessions_collector(session_timeout_duration):
"""
Goes through each session in sessions and removes expired session(s)
"""
while True:
if bool(sessions):
2024-05-10 13:34:36 -04:00
_sessions = deepcopy(sessions)
for session_uuid in _sessions:
_expire = datetime.datetime.strptime(_sessions[session_uuid]['expire'], '%Y-%m-%d %H:%M:%S.%f')
if _expire < datetime.datetime.now():
2024-05-10 13:34:36 -04:00
print("session for user: ", _sessions[session_uuid]['user'], "expired.")
response_path = (Path(__file__).parent.parent / f'response_files/{session_uuid}').resolve()
2024-06-10 00:31:49 -04:00
if response_path.exists():
shutil.rmtree(response_path)
2023-08-03 10:42:41 -04:00
del sessions[session_uuid]
2024-06-10 00:31:49 -04:00
existing_directories = glob(f'{Path(__file__).parent.parent.resolve()}/response_files/*')
for directory in existing_directories:
if directory not in _sessions.keys():
shutil.rmtree(directory)
time.sleep(60 * int(session_timeout_duration))
def _validate_session(session_id, token, application_uuid):
"""
Checks if session is valid
"""
try:
_session = session(session_id)
if debug_mode:
token = _session['token']
return bool(_session) and (_session['token'] == token) and str(_session['application_uuid']) == application_uuid
except KeyError:
return False
def remove_session(session_id, token, application_uuid):
"""
Remove a session from the sessions array
"""
if _validate_session(session_id, token, application_uuid):
del sessions[session_id]
return True
return False
def refresh_session(session_id, token, application_uuid):
"""
Validate and extend current session
:return: valid session
"""
if _validate_session(session_id, token, application_uuid):
sessions[session_id]['expire'] = str(datetime.datetime.now() + datetime.timedelta(minutes=5))
sessions[session_id]['token'] = str(uuid.uuid4())
return session(session_id)
return None
def active_session(session_id, token, application_uuid):
_is_valid = _validate_session(session_id=session_id, token=token, application_uuid=application_uuid)
if _is_valid:
refresh_session(session_id, token, application_uuid)
return _is_valid
def session(session_id) -> {}:
return sessions[session_id]