106 lines
3.6 KiB
Python
106 lines
3.6 KiB
Python
"""
|
|
User repository with database CRUD operations
|
|
SPDX-License-Identifier: LGPL-3.0-or-later
|
|
Copyright © 2022 Concordia CERC group
|
|
Project Coder Peter Yefi peteryefi@gmail.com
|
|
"""
|
|
|
|
from persistence import BaseRepo
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy import select
|
|
from persistence.models import User
|
|
from persistence.models import UserRoles
|
|
from helpers.auth import Auth
|
|
from typing import Union, Dict
|
|
from hub_logger import logger
|
|
import datetime
|
|
|
|
|
|
class UserRepo(BaseRepo):
    """
    Repository with create, read, update and delete operations for User records.

    All database access goes through ``self.session``, which this class does not
    create itself (presumably it is provided by BaseRepo - confirm).
    The class is a singleton: ``__new__`` always hands back the same instance.
    """
    _instance = None

    def __init__(self, db_name: str, dotenv_path: str, app_env: str):
        """
        Forwards the connection settings to BaseRepo
        :param db_name: forwarded unchanged to BaseRepo
        :param dotenv_path: forwarded unchanged to BaseRepo
        :param app_env: forwarded unchanged to BaseRepo
        """
        # NOTE(review): Python runs __init__ on every UserRepo(...) call, even when
        # __new__ returns the cached instance, so BaseRepo is re-initialised each
        # time - confirm this is intended.
        super().__init__(db_name, dotenv_path, app_env)

    def __new__(cls, db_name, dotenv_path, app_env):
        """
        Implemented for a singleton pattern
        """
        if cls._instance is None:
            cls._instance = super(UserRepo, cls).__new__(cls)
        return cls._instance

    def insert(self, name: str, email: str, password: str, role: UserRoles) -> Union[User, Dict, None]:
        """
        Inserts a new user
        :param name: the name of the user
        :param email: the email of the user, no other user may already have it
        :param password: the plain password, it is validated and hashed before being stored
        :param role: the role of the user
        :return: the created User, a dict with a 'message' key when the email is already
        in use, or None when the password is rejected or a database error occurred
        """
        if self.get_by_email(email) is not None:
            return {'message': f'user with {email} email already exists'}

        try:
            if not Auth.validate_password(password):
                return None
            user = User(name=name, email=email, password=Auth.hash_password(password), role=role)
            self.session.add(user)
            self.session.flush()
            self.session.commit()
            return user
        except SQLAlchemyError as err:
            # A failed flush/commit leaves the session unusable until it is rolled back.
            self.session.rollback()
            logger.error(f'An error occured while creating user: {err}')
        return None

    def update(self, user_id: int, name: str, email: str, password: str, role: UserRoles) -> Union[Dict, None]:
        """
        Updates a user
        :param user_id: the id of the user to be updated
        :param name: the name of the user
        :param email: the email of the user
        :param password: the plain password, it is validated and hashed before being stored
        :param role: the role of the user
        :return: None when no database error occurred (also when the password is rejected
        and nothing is written), a dict with an 'err_msg' key on database error
        """
        try:
            if Auth.validate_password(password):
                changes = {
                    'name': name,
                    'email': email,
                    'password': Auth.hash_password(password),
                    'role': role,
                    # Same naive UTC value as the deprecated datetime.utcnow().
                    'updated': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
                }
                self.session.query(User).filter(User.id == user_id).update(changes)
                self.session.commit()
        except SQLAlchemyError as err:
            # A failed commit leaves the session unusable until it is rolled back.
            self.session.rollback()
            logger.error(f'Error while updating user: {err}')
            return {'err_msg': 'Error occured while updated user'}
        return None

    def get_by_email(self, email: str) -> [User]:
        """
        Fetch user based on the email address
        :param email: the email of the user
        :return: the first result row of the query (the User is its element 0),
        or None when no row matches or a database error occurred
        """
        try:
            return self.session.execute(select(User).where(User.email == email)).first()
        except SQLAlchemyError as err:
            logger.error(f'Error while fetching user by email: {err}')
        return None

    def delete_user(self, user_id: int):
        """
        Deletes a user with the id
        :param user_id: the user id
        :return: None
        """
        try:
            self.session.query(User).filter(User.id == user_id).delete()
            self.session.commit()
        except SQLAlchemyError as err:
            # A failed commit leaves the session unusable until it is rolled back.
            self.session.rollback()
            logger.error(f'Error while deleting user: {err}')

    def get_user_by_email_and_password(self, email: str, password: str) -> [User]:
        """
        Fetch user based on the email and password
        :param email: the email of the user
        :param password: the plain password, checked against the stored one
        :return: the first result row of the query (the User is its element 0) when the
        password matches, a dict with a 'message' key when there is no such user or the
        password does not match, or None when a database error occurred
        """
        try:
            row = self.session.execute(select(User).where(User.email == email)).first()
            if row and Auth.check_password(password, row[0].password):
                return row
            return {'message': 'user not found'}
        except SQLAlchemyError as err:
            logger.error(f'Error while fetching user by email: {err}')
        return None
|