api_v1.4/hub_api/helpers/auth.py

80 lines
2.4 KiB
Python

"""
HeatPump Service
SPDX-License-Identifier: LGPL-3.0-or-later
Copyright © 2023 Project Author Peter Yefi peteryefi@gmail.com
"""
from datetime import datetime, timedelta, timezone
from typing import Dict
from jwt import JWT, jwk_from_pem
import os
from jwt.utils import get_int_from_datetime
from functools import wraps
from flask import request, g
from hub_logger import logger
from persistence.models import UserRoles
from jwt.exceptions import JWTException
# Module-level JWT object shared by generate_auth_token (encode) and validate_auth_token (decode)
instance = JWT()
def generate_auth_token(user: Dict):
    """
    Generate a signed token for the given payload
    :param user: the claims to encode; the ``exp`` and ``iat`` entries are written
    into this dictionary in place before encoding
    :return: the payload encoded with the RSA key read from ``~/rsa.pem`` using RS256
    """
    private_key = "{}/rsa.pem".format(os.path.expanduser('~'))
    with open(private_key, 'rb') as fh:
        signing_key = jwk_from_pem(fh.read())
    # Read the clock once so that exp and iat are derived from the same instant
    issued_at = datetime.now(timezone.utc)
    user['exp'] = get_int_from_datetime(issued_at + timedelta(hours=24))
    user['iat'] = get_int_from_datetime(issued_at)
    return instance.encode(user, signing_key, alg='RS256')
def validate_auth_token(token: str):
    """
    Decode a token using the RSA key read from ``~/rsa.pub``
    :param token: the encoded token to decode
    :return: the result of decoding the token with the time check enabled
    """
    home_dir = os.path.expanduser('~')
    key_path = "{}/rsa.pub".format(home_dir)
    with open(key_path, 'rb') as key_file:
        pem_bytes = key_file.read()
    return instance.decode(token, jwk_from_pem(pem_bytes), do_time_check=True)
def role_required(roles: [str]):
    """
    A wrapper to authorize specific roles for specific endpoints
    :param roles: a list of roles allowed
    :return: a decorator that validates the token from the Authorization header and
    only calls the endpoint when the user's role is allowed
    """

    def auth_module(user):
        """
        Store the user on flask ``g`` and check its role against the allowed roles
        :param user: the user dictionary taken from the decoded token
        :return: True if the user's role is one of the allowed roles, False otherwise
        """
        g.user = user
        return any(role == user['role'] for role in roles)

    def role_decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                # A missing Authorization header raises KeyError, reported below as 400
                header_parts = request.headers['Authorization'].split()
                if len(header_parts) < 2:
                    # A header without a token part is a client error, not a server failure
                    logger.error('Authorization header does not contain a token')
                    return {'messages': 'Invalid token'}, 401
                user = validate_auth_token(header_parts[1])
                if user is None:
                    return {'messages': 'You have not been authenticated'}, 401
                allowed = auth_module(user['user'])
                # Admin users are rejected unless the Host header contains 'localhost'
                # NOTE(review): this is a substring test on a request header - confirm
                # that it is a sufficient restriction for admin endpoints
                if user['user']['role'] == UserRoles.Admin.value and 'localhost' not in request.headers['Host']:
                    allowed = False
                if not allowed:
                    return {'messages': 'You are not authorized'}, 403
                # The endpoint runs inside the try block, so a KeyError or any other
                # exception it raises is turned into the responses below as well
                return f(*args, **kwargs)
            except KeyError as err:
                # Missing header or missing dictionary key
                logger.error(err)
                return {'messages': 'Invalid payload'}, 400
            except JWTException as err:
                logger.error(err)
                return {'messages': 'Invalid token'}, 401
            except Exception as err:
                logger.error(err)
                return {'messages': 'Sorry, an error occurred while processing your request'}, 500
        return wrapper
    return role_decorator