2022-12-08 20:54:47 -05:00
|
|
|
"""
|
|
|
|
User repository with database CRUD operations
|
|
|
|
SPDX - License - Identifier: LGPL - 3.0 - or -later
|
|
|
|
Copyright © 2022 Concordia CERC group
|
|
|
|
Project Coder Peter Yefi peteryefi@gmail.com
|
|
|
|
"""
|
|
|
|
|
2023-01-24 10:51:50 -05:00
|
|
|
from hub.persistence import BaseRepo
|
2022-12-08 20:54:47 -05:00
|
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from sqlalchemy import select
|
2023-01-24 10:51:50 -05:00
|
|
|
from hub.persistence.models import User
|
|
|
|
from hub.persistence.models import UserRoles
|
|
|
|
from hub.helpers.auth import Auth
|
2022-12-08 20:54:47 -05:00
|
|
|
from typing import Union, Dict
|
2023-01-24 10:51:50 -05:00
|
|
|
from hub.hub_logger import logger
|
2023-01-10 12:26:08 -05:00
|
|
|
import datetime
|
2022-12-08 20:54:47 -05:00
|
|
|
|
|
|
|
|
|
|
|
class UserRepo(BaseRepo):
    """
    Repository offering create, read, update and delete operations on User records.

    The class is a singleton: every ``UserRepo(...)`` call returns the same object.
    Python still runs ``__init__`` on each call, so ``BaseRepo.__init__`` is executed
    again with the newly supplied arguments every time the class is instantiated.

    All database access goes through ``self.session``, which is presumably created by
    ``BaseRepo`` (not visible in this module -- confirm there).
    """

    # The one shared instance, created lazily by __new__.
    _instance = None

    def __init__(self, db_name: str, dotenv_path: str, app_env: str):
        """
        Forwards the connection settings to BaseRepo.
        :param db_name: passed through unchanged to BaseRepo
        :param dotenv_path: passed through unchanged to BaseRepo
        :param app_env: passed through unchanged to BaseRepo
        """
        super().__init__(db_name, dotenv_path, app_env)

    def __new__(cls, db_name, dotenv_path, app_env):
        """
        Implemented for a singleton pattern
        :return: the shared UserRepo instance, created on first use
        """
        if cls._instance is None:
            cls._instance = super(UserRepo, cls).__new__(cls)
        return cls._instance

    def _rollback_after_error(self) -> None:
        """
        Rolls the session back after a failed write so that the shared session can be
        used again by later calls. A failure of the rollback itself is logged and
        swallowed, which keeps the callers' "log and return" error contract intact.
        :return: None
        """
        try:
            self.session.rollback()
        except SQLAlchemyError as err:
            logger.error(f'Error while rolling back the session: {err}')

    def insert(self, name: str, email: str, password: str, role: UserRoles) -> Union[User, Dict, None]:
        """
        Creates a new user unless one with the same email already exists
        :param name: the name of the user
        :param email: the email of the user
        :param password: the plain password; it is hashed with Auth.hash_password before storage
        :param role: the role of the user
        :return: the new User on success, a dict with a 'message' key when the email is
        already taken, or None when Auth.validate_password rejects the password or a
        database error occurs
        """
        # get_by_email also yields None when its own query fails, in which case the
        # insertion is still attempted below.
        if self.get_by_email(email) is not None:
            return {'message': f'user with {email} email already exists'}

        try:
            if not Auth.validate_password(password):
                return None
            user = User(name=name, email=email, password=Auth.hash_password(password), role=role)
            self.session.add(user)
            self.session.flush()
            self.session.commit()
            return user
        except SQLAlchemyError as err:
            logger.error(f'An error occurred while creating user: {err}')
            # Without a rollback the singleton's session stays in a failed state.
            self._rollback_after_error()
            return None

    def update(self, user_id: int, name: str, email: str, password: str, role: UserRoles) -> Union[Dict, None]:
        """
        Updates a user
        :param user_id: the id of the user to be updated
        :param name: the name of the user
        :param email: the email of the user
        :param password: the password of the user
        :param role: the role of the user
        :return: None when no database error occurred (including the case where
        Auth.validate_password rejects the password and nothing is written), or a dict
        with an 'err_msg' key when a database error occurred
        """
        try:
            if Auth.validate_password(password):
                new_values = {
                    'name': name,
                    'email': email,
                    'password': Auth.hash_password(password),
                    'role': role,
                    # NOTE(review): naive UTC timestamp; utcnow() is deprecated since
                    # Python 3.12 but is kept so stored values do not change.
                    'updated': datetime.datetime.utcnow()
                }
                self.session.query(User).filter(User.id == user_id).update(new_values)
                self.session.commit()
        except SQLAlchemyError as err:
            logger.error(f'Error while updating user: {err}')
            # Without a rollback the singleton's session stays in a failed state.
            self._rollback_after_error()
            return {'err_msg': 'Error occurred while updated user'}
        return None

    def get_by_email(self, email: str) -> [User]:
        """
        Fetch user based on the email address
        :param email: the email of the user
        :return: the first result row for the email (the User is its element 0), or
        None when no row matches or a database error occurs
        """
        try:
            return self.session.execute(select(User).where(User.email == email)).first()
        except SQLAlchemyError as err:
            logger.error(f'Error while fetching user by email: {err}')
            return None

    def delete_user(self, user_id: int):
        """
        Deletes a user with the id
        :param user_id: the user id
        :return: None
        """
        try:
            self.session.query(User).filter(User.id == user_id).delete()
            self.session.commit()
        except SQLAlchemyError as err:
            logger.error(f'Error while deleting user: {err}')
            # Without a rollback the singleton's session stays in a failed state.
            self._rollback_after_error()

    def get_user_by_email_and_password(self, email: str, password: str) -> [User]:
        """
        Fetch user based on the email and password
        :param email: the email of the user
        :param password: the plain password, checked against the stored one with Auth.check_password
        :return: the result row for the email (the User is its element 0) when the
        password matches, a dict with a 'message' key when the user is missing or the
        password is wrong, or None when a database error occurs
        """
        try:
            row = self.session.execute(select(User).where(User.email == email)).first()
            if not row:
                return {'message': 'user not found'}
            if not Auth.check_password(password, row[0].password):
                return {'message': 'Wrong email/password combination'}
            return row
        except SQLAlchemyError as err:
            logger.error(f'Error while fetching user by email: {err}')
            return None
|