80 lines
2.4 KiB
Python
80 lines
2.4 KiB
Python
"""
|
|
HeatPump Service
|
|
SPDX-License-Identifier: LGPL-3.0-or-later
|
|
Copyright © 2023 Project Author Peter Yefi peteryefi@gmail.com
|
|
"""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Dict
|
|
from jwt import JWT, jwk_from_pem
|
|
import os
|
|
from jwt.utils import get_int_from_datetime
|
|
from functools import wraps
|
|
from flask import request, g
|
|
from hub.hub_logger import logger
|
|
from hub.persistence.models import UserRoles
|
|
from jwt.exceptions import JWTException
|
|
|
|
instance = JWT()
|
|
|
|
|
|
def generate_auth_token(user: Dict):
    """
    Create a signed token carrying the given claims.

    The RSA private key is read from ``rsa.pem`` in the current user's home
    directory and the token is signed with RS256. An ``exp`` claim 24 hours
    in the future and an ``iat`` claim for the current UTC time are added.

    :param user: claims to embed in the token; the dict itself is not modified
    :return: the value produced by ``instance.encode`` for the claims
    """
    private_key = "{}/rsa.pem".format(os.path.expanduser('~'))
    with open(private_key, 'rb') as fh:
        signing_key = jwk_from_pem(fh.read())

    # Take a single timestamp so 'iat' and 'exp' are exactly 24 hours apart.
    issued_at = datetime.now(timezone.utc)

    # Build the claims on a shallow copy so the caller's dict is not
    # silently extended with 'exp' and 'iat'.
    payload = dict(user)
    payload['exp'] = get_int_from_datetime(issued_at + timedelta(hours=24))
    payload['iat'] = get_int_from_datetime(issued_at)
    return instance.encode(payload, signing_key, alg='RS256')
|
|
|
|
|
|
def validate_auth_token(token: str):
    """
    Decode a token using the RSA public key.

    The key is loaded from ``rsa.pub`` in the current user's home directory
    and the token is decoded with ``do_time_check=True``.

    :param token: the encoded token to decode
    :return: whatever ``instance.decode`` returns for the token
    """
    home_dir = os.path.expanduser('~')
    key_path = "{}/rsa.pub".format(home_dir)
    with open(key_path, 'rb') as key_file:
        pem_bytes = key_file.read()
    return instance.decode(token, jwk_from_pem(pem_bytes), do_time_check=True)
|
|
|
|
|
|
def role_required(roles: [str]):
    """
    A wrapper to authorize specific roles for specific endpoints.

    The returned decorator reads the token from the second whitespace-separated
    part of the ``Authorization`` header, decodes it with
    ``validate_auth_token`` and stores the decoded ``user`` entry on
    ``flask.g``. The wrapped endpoint is only called when the user's role is
    one of ``roles``; a user with the Admin role is additionally rejected
    unless the ``Host`` header contains ``localhost``.

    Responses produced by the wrapper itself:
      * 400 - missing/malformed ``Authorization`` header or a missing key
      * 401 - token could not be validated
      * 403 - role not allowed
      * 500 - any other error

    The endpoint call is inside the same ``try`` block, so exceptions raised
    by the endpoint are mapped to these responses as well.

    :param roles: a list of roles allowed
    :return: a decorator that enforces the role check on an endpoint
    """

    def auth_module(user):
        # Make the authenticated user available for the rest of the request.
        g.user = user
        return any(role == user['role'] for role in roles)

    def role_decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                auth_parts = request.headers['Authorization'].split()
                # A header without a second part (e.g. just "Bearer") used to
                # raise IndexError and surface as a 500; treat it like a
                # missing header instead.
                if len(auth_parts) < 2:
                    logger.error('Malformed Authorization header')
                    return {'messages': 'Invalid payload'}, 400
                token = auth_parts[1]

                user = validate_auth_token(token)
                if user is None:
                    return {'messages': 'You have not been authenticated'}, 401

                allowed = auth_module(user['user'])
                # NOTE(review): the Host header is supplied by the client and
                # this is a substring match, so it is not a reliable proof
                # that the request is local - confirm this is acceptable.
                if user['user']['role'] == UserRoles.Admin.value and 'localhost' not in request.headers['Host']:
                    allowed = False
                if not allowed:
                    return {'messages': 'You are not authorized'}, 403

                return f(*args, **kwargs)
            except KeyError as err:
                # Missing header or missing key in the decoded token.
                logger.error(err)
                return {'messages': 'Invalid payload'}, 400
            except JWTException as err:
                logger.error(err)
                return {'messages': 'Invalid token'}, 401
            except Exception as err:
                logger.error(err)
                return {'messages': 'Sorry, an error occurred while processing your request'}, 500

        return wrapper

    return role_decorator
|
|
|