Included authorization token

This commit is contained in:
Peter Yefi 2023-01-11 19:56:51 -05:00
parent bcf5666af6
commit 2f59c83b12
6 changed files with 130 additions and 7 deletions

View File

@ -30,7 +30,7 @@ from hub_api.energy_demand import EnergyDemand
from hub_api.session import SessionStart, SessionEnd, KeepSessionAlive
from hub_api.uptime import Uptime
from hub_api.greenery import Greenery
from hub_api.user import User
from hub_api.user import User, UserLogin
app = flask.Flask('gamification')
api = Api(app)
@ -55,6 +55,7 @@ api.add_resource(MaterialLCACatalog, '/v1.4/material_lca_catalog/entries')
api.add_resource(MaterialLCACalculations, '/v1.4/material_lca_catalog/calculations')
api.add_resource(HeatPump, '/v1.4/heat-pump')
api.add_resource(User, '/v1.4/user')
api.add_resource(UserLogin, '/v1.4/user/login')
api.add_resource(SessionStart, '/v1.4/session/start')
api.add_resource(SessionEnd, '/v1.4/session/end')
api.add_resource(KeepSessionAlive, '/v1.4/session/keep_alive')
@ -70,8 +71,9 @@ app.config.update({
openapi_version='2.0.0'
),
'APISPEC_SWAGGER_URL': '/swagger/', # URI to access API Doc JSON
'APISPEC_SWAGGER_UI_URL': '/swagger-ui/' # URI to access UI of API Doc
'APISPEC_SWAGGER_UI_URL': '/api-docs/' # URI to access UI of API Doc
})
docs = FlaskApiSpec(app)
docs.register(HeatPump)
docs.register(User)
docs.register(UserLogin)

Binary file not shown.

73
hub_api/helpers/auth.py Normal file
View File

@ -0,0 +1,73 @@
from datetime import datetime, timedelta, timezone
from typing import Dict
from jwt import JWT, jwk_from_pem
import os
from jwt.utils import get_int_from_datetime
from functools import wraps
from flask import request
import json
from hub_logger import logger
from persistence.models import UserRoles
from jwt.exceptions import JWTException
instance = JWT()
def generate_auth_token(user: Dict):
private_key = "{}/rsa.pem".format(os.path.expanduser('~'))
with open(private_key, 'rb') as fh:
signing_key = jwk_from_pem(fh.read())
user['exp'] = get_int_from_datetime(datetime.now(timezone.utc) + timedelta(hours=24))
user['iat'] = get_int_from_datetime(datetime.now(timezone.utc))
return instance.encode(user, signing_key, alg='RS256')
def validate_auth_token(token: str):
public_key = "{}/rsa.pub".format(os.path.expanduser('~'))
with open(public_key, 'rb') as fh:
verifying_key = jwk_from_pem(fh.read())
return instance.decode(token, verifying_key, do_time_check=True)
def role_required(role: str):
def auth_module(user):
return user['role'] == role
"""
A wrapper to authorize specific roles for specific endpoints
:param roles: a list of roles allowed
:return:
"""
def role_decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
token = request.headers['Authorization']
user = validate_auth_token(token)
if user is None:
return {'messages': 'You have not been authenticated'}, 401
allowed = auth_module(user['user'])
if user['user']['role'] == UserRoles.Admin.value and 'localhost' not in request.headers['Host']:
allowed = False
if not allowed:
return {'messages': 'You are not authorized'}, 403
return f(*args, **kwargs)
except KeyError as err:
# logger Error
logger.error(err)
return {'messages': 'Invalid payload'}, 400
except JWTException as err:
logger.error(err)
return {'messages': 'Invalid token'}, 401
except Exception as err:
logger.error(err)
return {'messages': 'Sorry, an error occurred while processing your request'}, 500
return wrapper
return role_decorator

View File

@ -11,17 +11,31 @@ from flask_apispec.views import MethodResource
from flask_restful import Resource
from marshmallow import Schema, fields
from imports.user_factory import UserFactory
from exports.user_factory import UserFactory as ExUserFactory
import os
from hub_logger import logger
from hub_api.helpers.auth import generate_auth_token, role_required
from persistence.models import UserRoles
class UserPostData(Schema):
class AuthorizationHeader(Schema):
Authorization = fields.Str(required=True, description='Authorization Token')
AppID = fields.Str(required=True, description='ID of Application Accessing API')
class LoginPostData(Schema):
"""
Defines post data for users
"""
password = fields.String(required=True, description='Password of user')
email = fields.String(required=True, description='Email of user')
class UserPostData(LoginPostData):
"""
Defines post data for users
"""
name = fields.String(required=True, description='Name of user')
email = fields.String(required=True, description='Email of user')
password = fields.String(required=True, description='Password of user')
role = fields.String(required=True, description='Allowed user roles', enum=['Admin', 'Hub_Reader'])
@ -46,6 +60,8 @@ class User(MethodResource, Resource):
dotenv_path="{}/.env".format(os.path.expanduser('~')))
@doc(description='Create users', tags=['CreateUser'])
@role_required(UserRoles.Admin.value)
@use_kwargs(AuthorizationHeader, location='headers')
@use_kwargs(UserPostData)
def post(self, **kwargs):
try:
@ -53,7 +69,7 @@ class User(MethodResource, Resource):
user = self.user_factory.create_user(name=kwargs["name"], email=kwargs["email"], password=kwargs["password"],
role=kwargs["role"])
if type(user) is dict:
return Response(response=json.dumps(user), status=200)
return Response(response=json.dumps(user), status=400)
return Response(response=json.dumps({'user': {'id': user.id, 'name': user.name, 'email': user.email,
'password': user.password, 'role': user.role.value}}), status=201)
except Exception as err:
@ -62,6 +78,8 @@ class User(MethodResource, Resource):
@doc(description='Get all users', tags=['UpdateUsers'])
@use_kwargs(UserPutData)
@role_required(UserRoles.Admin.value)
@use_kwargs(AuthorizationHeader, location='headers')
def put(self, **kwargs):
try:
res = self.user_factory.update_user(user_id=kwargs['id'], name=kwargs['name'], password=kwargs['password'],
@ -73,3 +91,32 @@ class User(MethodResource, Resource):
logger.error(err)
return Response(response=json.dumps({'err_msg': 'Sorry, an error occurred while updating user'}),
status=400)
class UserLogin(MethodResource, Resource):
def __init__(self):
self.user_factory = ExUserFactory(db_name='hub_prod', app_env='PROD',
dotenv_path="{}/.env".format(os.path.expanduser('~')))
@doc(description='Create users', tags=['LoginUser'])
@use_kwargs(LoginPostData)
def post(self, **kwargs):
try:
user = self.user_factory.login_user(email=kwargs["email"], password=kwargs["password"])
if type(user) is dict:
return Response(response=json.dumps(user), status=400)
user = user[0]
user_dict = {
'user': {
'id': user.id,
'name': user.name,
'email': user.email,
'password': user.password,
'role': user.role.value,
}
}
user_dict['token'] = generate_auth_token(user_dict)
return Response(response=json.dumps(user_dict), status=201)
except Exception as err:
logger.error(err)
return Response(response=json.dumps({'err_msg': 'Sorry an error occurred while authenticating user'}), status=400)

View File

@ -20,4 +20,5 @@ ply
rhino3dm==7.7.0
scipy
PyYAML
pyecore==0.12.2
pyecore==0.12.2
jwt==1.3.1