city_retrofit/hub/persistence/repositories/user_repo.py

107 lines
3.7 KiB
Python
Raw Normal View History

"""
City repository with database CRUD operations
SPDX - License - Identifier: LGPL - 3.0 - or -later
Copyright © 2022 Concordia CERC group
Project Coder Peter Yefi peteryefi@gmail.com
"""
2023-01-24 10:51:50 -05:00
from hub.persistence import BaseRepo
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import select
2023-01-24 10:51:50 -05:00
from hub.persistence.models import User
from hub.persistence.models import UserRoles
from hub.helpers.auth import Auth
from typing import Union, Dict
2023-01-24 10:51:50 -05:00
from hub.hub_logger import logger
import datetime
class UserRepo(BaseRepo):
_instance = None
def __init__(self, db_name: str, dotenv_path: str, app_env: str):
super().__init__(db_name, dotenv_path, app_env)
def __new__(cls, db_name, dotenv_path, app_env):
"""
Implemented for a singleton pattern
"""
if cls._instance is None:
cls._instance = super(UserRepo, cls).__new__(cls)
return cls._instance
2023-01-31 13:11:39 -05:00
def insert(self, name: str, password: str, role: UserRoles, application_id: int) -> Union[User, Dict]:
user = self.get_by_name_and_application(name, application_id)
if user is None:
try:
2023-01-31 13:11:39 -05:00
user = User(name=name, password=Auth.hash_password(password), role=role, application_id=application_id)
self.session.add(user)
self.session.flush()
self.session.commit()
return user
except SQLAlchemyError as err:
logger.error(f'An error occurred while creating user: {err}')
else:
return {'message': f'user with {email} email already exists'}
2023-01-10 20:30:55 -05:00
def update(self, user_id: int, name: str, email: str, password: str, role: UserRoles) -> Union[Dict, None]:
"""
Updates a user
:param user_id: the id of the user to be updated
:param name: the name of the user
:param email: the email of the user
:param password: the password of the user
:param role: the role of the user
:return:
"""
try:
if Auth.validate_password(password):
self.session.query(User).filter(User.id == user_id) \
.update({'name': name, 'email': email, 'password': Auth.hash_password(password), 'role': role,
2023-01-10 20:30:55 -05:00
'updated': datetime.datetime.utcnow()})
self.session.commit()
except SQLAlchemyError as err:
2022-12-14 17:49:29 -05:00
logger.error(f'Error while updating user: {err}')
return {'err_msg': 'Error occurred while updated user'}
def get_by_email(self, email: str) -> [User]:
"""
Fetch user based on the email address
:param email: the email of the user
:return: [User] with the provided email
"""
try:
return self.session.execute(select(User).where(User.email == email)).first()
except SQLAlchemyError as err:
2022-12-14 17:49:29 -05:00
logger.error(f'Error while fetching user by email: {err}')
def delete_user(self, user_id: int):
"""
Deletes a user with the id
:param user_id: the user id
:return: None
"""
try:
self.session.query(User).filter(User.id == user_id).delete()
self.session.commit()
except SQLAlchemyError as err:
2022-12-14 17:49:29 -05:00
logger.error(f'Error while fetching user: {err}')
def get_user_by_email_and_password(self, email: str, password: str) -> [User]:
"""
Fetch user based on the email and password
:param email: the email of the user
:param password: the password of the user
:return: [User] with the provided email and password
"""
try:
user = self.session.execute(select(User).where(User.email == email)).first()
if user:
if Auth.check_password(password, user[0].password):
return user
else:
return {'message': 'Wrong email/password combination'}
return {'message': 'user not found'}
except SQLAlchemyError as err:
2022-12-14 17:49:29 -05:00
logger.error(f'Error while fetching user by email: {err}')