Added user authentication capabilities to hub
This commit is contained in:
parent
23ce8665a2
commit
4693edd5e8
|
@ -46,16 +46,21 @@ from exports.db_factory import DBFactory
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
dotenv_path = (Path(__file__).parent / '.env').resolve()
|
dotenv_path = (Path(__file__).parent / '.env').resolve()
|
||||||
factory = DBFactory(db_name='hub_db', app_env='PROD', dotenv_path=dotenv_path, city=None)
|
factory = DBFactory(db_name='hub_db', app_env='PROD', dotenv_path=dotenv_path)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
## Create Database Tables ##
|
## Create Database Tables ##
|
||||||
Use the *DBSetUp* class in the persistence package to create the required database tables as described below
|
Use the *DBSetup* class in the persistence package to create the required database tables as described below
|
||||||
```python
|
```python
|
||||||
from persistence import DBSetup
|
from persistence import DBSetup
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
dotenv_path = (Path(__file__).parent / '.env').resolve()
|
dotenv_path = (Path(__file__).parent / '.env').resolve()
|
||||||
DBSetup(db_name='hub_db', app_env='PROD', dotenv_path=dotenv_path)
|
DBSetup(db_name='hub_db', app_env='PROD', dotenv_path=dotenv_path)
|
||||||
```
|
```
|
||||||
|
The *DBSetUp* class also creates a default admin user with default credentials that can be changed.
|
||||||
|
with the import UserFactory class. The admin user (name, email, password and role) is logged into the console after it is created by the
|
||||||
|
*constructor of DBSetup*. Use can also manage users (create, read, update and delete) with user import and export factories.
|
||||||
|
|
||||||
|
**NB: Make sure to change the default admin user credentials**
|
||||||
|
|
31
exports/user_factory.py
Normal file
31
exports/user_factory.py
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
"""
|
||||||
|
User performs user related crud operations
|
||||||
|
SPDX - License - Identifier: LGPL - 3.0 - or -later
|
||||||
|
Copyright © 2022 Concordia CERC group
|
||||||
|
Project CoderPeter Yefi peteryefi@gmail.com
|
||||||
|
"""
|
||||||
|
from persistence import UserRepo
|
||||||
|
|
||||||
|
|
||||||
|
class UserFactory:
|
||||||
|
"""
|
||||||
|
UserFactory class
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, db_name, app_env, dotenv_path):
|
||||||
|
self._user_repo = UserRepo(db_name=db_name, app_env=app_env, dotenv_path=dotenv_path)
|
||||||
|
|
||||||
|
def login_user(self, email: str, password: str):
|
||||||
|
"""
|
||||||
|
Retrieve a single city from postgres
|
||||||
|
:param email: the email of the user
|
||||||
|
:param password: the password of the user
|
||||||
|
"""
|
||||||
|
return self._user_repo.get_user_by_email_and_password(email, password)
|
||||||
|
|
||||||
|
def get_user_by_email(self, email):
|
||||||
|
"""
|
||||||
|
Retrieve a single user
|
||||||
|
:param email: the email of the user to get
|
||||||
|
"""
|
||||||
|
return self._user_repo.get_by_email(email)
|
43
helpers/auth.py
Normal file
43
helpers/auth.py
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
import bcrypt
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
class Auth(object):
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate_password(password: str) -> bool:
|
||||||
|
"""
|
||||||
|
Validates a password
|
||||||
|
:param password: the password to validate
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
pattern = "^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*#?&])[A-Za-z\d@$!#%*?&]{6,20}$"
|
||||||
|
pattern = re.compile(pattern)
|
||||||
|
if not re.search(pattern, password):
|
||||||
|
raise ValueError("Password must be between 6 to 20 characters and must have at least a number, an uppercase "
|
||||||
|
"letter, a lowercase letter, and a special character")
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def hash_password(password: str) -> str:
|
||||||
|
"""
|
||||||
|
Hashes a password
|
||||||
|
:param password: the password to be hashed
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(14)).decode('utf-8')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def check_password(password: str, hashed_password) -> bool:
|
||||||
|
"""
|
||||||
|
Hashes a password
|
||||||
|
:param password: the password to be checked
|
||||||
|
:param hashed_password: the hashed password
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,122 +0,0 @@
|
||||||
from persistence.models import Building
|
|
||||||
from city_model_structure.building_demand.surface import Surface
|
|
||||||
from typing import Dict, List, Union
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
class CityUtil(object):
|
|
||||||
# holds a single instance of this class
|
|
||||||
_instance = None
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __new__(cls):
|
|
||||||
"""
|
|
||||||
Implemented for a singleton pattern
|
|
||||||
"""
|
|
||||||
if cls._instance is None:
|
|
||||||
cls._instance = super(CityUtil, cls).__new__(cls)
|
|
||||||
return cls._instance
|
|
||||||
|
|
||||||
def _class_object_to_dict(self, class_obj) -> Union[Dict, None]:
|
|
||||||
"""
|
|
||||||
converts a class object to a dictionary
|
|
||||||
:param class_obj: the class object
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
if class_obj is None:
|
|
||||||
return None
|
|
||||||
if type(class_obj) is not dict:
|
|
||||||
return vars(class_obj)
|
|
||||||
return class_obj
|
|
||||||
|
|
||||||
def _ndarray_to_list(self, key, building_dict) -> Union[Dict, None]:
|
|
||||||
"""
|
|
||||||
Converts a numpy array to dictionary
|
|
||||||
:param key: the key to the dictionary value
|
|
||||||
:param building_dict: the dictionary
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
if dict is not None:
|
|
||||||
if type(building_dict[key]) is np.ndarray:
|
|
||||||
return building_dict[key].tolist()
|
|
||||||
else:
|
|
||||||
return building_dict[key]
|
|
||||||
return None
|
|
||||||
|
|
||||||
def object_list_to_dict_list(self, object_list):
|
|
||||||
"""
|
|
||||||
converts a list of objects to a list of dictionaries
|
|
||||||
:param object_list: a list of objects
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
if object_list is None:
|
|
||||||
return []
|
|
||||||
return [self._class_object_to_dict(obj) for obj in object_list]
|
|
||||||
|
|
||||||
def _serialize_points(self, points) -> List:
|
|
||||||
"""
|
|
||||||
Deserializes arry of Point objects to array of dictionarier
|
|
||||||
:param points: a Point object
|
|
||||||
:return a list of points []:
|
|
||||||
"""
|
|
||||||
if points is None:
|
|
||||||
return None
|
|
||||||
serialized_points = []
|
|
||||||
for i in range(len(points)):
|
|
||||||
point = self._class_object_to_dict(points[i])
|
|
||||||
point['_coordinates'] = self._ndarray_to_list('_coordinates', point)
|
|
||||||
serialized_points.append(point)
|
|
||||||
return serialized_points
|
|
||||||
|
|
||||||
def extract_building_data(self, building: Building) -> Dict:
|
|
||||||
"""
|
|
||||||
Extracts various values from a building
|
|
||||||
:param building: the building object
|
|
||||||
:return: a dictionary {} of building attributes
|
|
||||||
"""
|
|
||||||
dict_building = {}
|
|
||||||
for key, value in vars(building).items():
|
|
||||||
if key in ['_name', '_year_of_construction', '_function', '_floor_area']:
|
|
||||||
continue
|
|
||||||
if type(value) is list:
|
|
||||||
if len(value) == 0:
|
|
||||||
dict_building[key] = []
|
|
||||||
elif len(value) > 0:
|
|
||||||
if type(value[0]) is Surface:
|
|
||||||
try:
|
|
||||||
surfaces = []
|
|
||||||
for surface in value:
|
|
||||||
surface_dict = vars(surface)
|
|
||||||
perimeter_polygon = self._class_object_to_dict(surface_dict['_perimeter_polygon'])
|
|
||||||
solid_polygon = self._class_object_to_dict(surface_dict['_solid_polygon'])
|
|
||||||
holes_polygon = self._class_object_to_dict(surface_dict['_holes_polygons'])
|
|
||||||
|
|
||||||
if perimeter_polygon is not None:
|
|
||||||
perimeter_polygon['_coordinates'] = self._ndarray_to_list('_coordinates', perimeter_polygon)
|
|
||||||
perimeter_polygon['_points'] = self._serialize_points(perimeter_polygon['_points'])
|
|
||||||
surface_dict['_perimeter_polygon'] = perimeter_polygon
|
|
||||||
|
|
||||||
if holes_polygon is not None:
|
|
||||||
holes_polygon['_coordinates'] = self._ndarray_to_list('_coordinates', holes_polygon)
|
|
||||||
holes_polygon['_points'] = self._serialize_points(holes_polygon['_points'])
|
|
||||||
surface_dict['_holes_polygons'] = holes_polygon
|
|
||||||
|
|
||||||
if solid_polygon is not None:
|
|
||||||
solid_polygon['_coordinates'] = self._ndarray_to_list('_coordinates', solid_polygon)
|
|
||||||
solid_polygon['_points'] = self._serialize_points(solid_polygon['_points'])
|
|
||||||
surface_dict['_solid_polygon'] = solid_polygon
|
|
||||||
|
|
||||||
surfaces.append(surface_dict)
|
|
||||||
dict_building[key] = surfaces
|
|
||||||
except KeyError as err:
|
|
||||||
print(f'Dictionary key error: {err}')
|
|
||||||
elif value is None:
|
|
||||||
dict_building[key] = None
|
|
||||||
elif type(value) in [str, int, dict, np.float64]:
|
|
||||||
dict_building[key] = value
|
|
||||||
elif type(value) is np.ndarray:
|
|
||||||
dict_building[key] = value.tolist()
|
|
||||||
|
|
||||||
return dict_building
|
|
45
imports/user_factory.py
Normal file
45
imports/user_factory.py
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
"""
|
||||||
|
User performs user-related crud operations
|
||||||
|
SPDX - License - Identifier: LGPL - 3.0 - or -later
|
||||||
|
Copyright © 2022 Concordia CERC group
|
||||||
|
Project CoderPeter Yefi peteryefi@gmail.com
|
||||||
|
"""
|
||||||
|
from persistence import UserRepo
|
||||||
|
from persistence import UserRoles
|
||||||
|
|
||||||
|
|
||||||
|
class UserFactory:
|
||||||
|
"""
|
||||||
|
UserFactory class
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, db_name, app_env, dotenv_path):
|
||||||
|
self._user_repo = UserRepo(db_name=db_name, app_env=app_env, dotenv_path=dotenv_path)
|
||||||
|
|
||||||
|
def create_user(self, name: str, email: str, password: str, role: UserRoles):
|
||||||
|
"""
|
||||||
|
Creates a new user
|
||||||
|
:param name: the name of the user
|
||||||
|
:param email: the email of the user
|
||||||
|
:param password: the password of the user
|
||||||
|
:param role: the role of the user
|
||||||
|
"""
|
||||||
|
return self._user_repo.insert(name, email, password, role)
|
||||||
|
|
||||||
|
def update_user(self, user_id: int, name: str, email: str, password: str, role: UserRoles):
|
||||||
|
"""
|
||||||
|
Creates a new user
|
||||||
|
:param user_id: the id of the user
|
||||||
|
:param name: the name of the user
|
||||||
|
:param email: the email of the user
|
||||||
|
:param password: the password of the user
|
||||||
|
:param role: the role of the user
|
||||||
|
"""
|
||||||
|
return self._user_repo.update(user_id, name, email, password, role)
|
||||||
|
|
||||||
|
def delete_user(self, user_id):
|
||||||
|
"""
|
||||||
|
Retrieve a single user
|
||||||
|
:param user_id: the id of the user to delete
|
||||||
|
"""
|
||||||
|
return self._user_repo.delete_user(user_id)
|
|
@ -1,5 +1,6 @@
|
||||||
from .base_repo import BaseRepo
|
from .base_repo import BaseRepo
|
||||||
from .repositories.city_repo import CityRepo
|
from .repositories.city_repo import CityRepo
|
||||||
from .repositories.building_repo import BuildingRepo
|
|
||||||
from .repositories.heat_pump_simulation_repo import HeatPumpSimulationRepo
|
from .repositories.heat_pump_simulation_repo import HeatPumpSimulationRepo
|
||||||
from .db_setup import DBSetup
|
from .db_setup import DBSetup
|
||||||
|
from .repositories.user_repo import UserRepo
|
||||||
|
from .models.user import UserRoles
|
||||||
|
|
|
@ -1,11 +1,31 @@
|
||||||
from persistence.models import City
|
from persistence.models import City
|
||||||
from persistence import BaseRepo
|
from persistence import BaseRepo
|
||||||
from persistence.models import HeatPumpSimulation
|
from persistence.models import HeatPumpSimulation
|
||||||
|
from persistence.models import User
|
||||||
|
from persistence.repositories import UserRepo
|
||||||
|
from persistence.models import UserRoles
|
||||||
|
|
||||||
|
|
||||||
class DBSetup:
|
class DBSetup:
|
||||||
|
|
||||||
def __init__(self, db_name, app_env, dotenv_path):
|
def __init__(self, db_name, app_env, dotenv_path):
|
||||||
|
"""
|
||||||
|
Creates database tables and a default admin user
|
||||||
|
:param db_name:
|
||||||
|
:param app_env:
|
||||||
|
:param dotenv_path:
|
||||||
|
"""
|
||||||
repo = BaseRepo(db_name=db_name, app_env=app_env, dotenv_path=dotenv_path)
|
repo = BaseRepo(db_name=db_name, app_env=app_env, dotenv_path=dotenv_path)
|
||||||
City.__table__.create(bind=repo.engine, checkfirst=True)
|
City.__table__.create(bind=repo.engine, checkfirst=True)
|
||||||
HeatPumpSimulation.__table__.create(bind=repo.engine, checkfirst=True)
|
HeatPumpSimulation.__table__.create(bind=repo.engine, checkfirst=True)
|
||||||
|
User.__table__.create(bind=repo.engine, checkfirst=True)
|
||||||
|
self._user_repo = UserRepo(db_name=db_name, app_env=app_env, dotenv_path=dotenv_path)
|
||||||
|
self._create_admin_user(self._user_repo)
|
||||||
|
|
||||||
|
def _create_admin_user(self, user_repo):
|
||||||
|
email = 'admin@hub.com'git
|
||||||
|
password = 'HubAdmin#!98'
|
||||||
|
print('Creating default admin user...')
|
||||||
|
user_repo.insert('Administrator', email, password, UserRoles.Admin)
|
||||||
|
print(f'Created Admin user with email: {email}, password: {password} and role: {UserRoles.Admin}')
|
||||||
|
print('Remember to change the admin default password and email address with the UserFactory')
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
from .building import Building
|
|
||||||
from .city import City
|
from .city import City
|
||||||
from .heat_pump_simulation import HeatPumpSimulation
|
from .heat_pump_simulation import HeatPumpSimulation
|
||||||
from .heat_pump_simulation import SimulationTypes
|
from .heat_pump_simulation import SimulationTypes
|
||||||
from .heat_pump_simulation import HeatPumpTypes
|
from .heat_pump_simulation import HeatPumpTypes
|
||||||
|
from .user import User, UserRoles
|
||||||
|
|
|
@ -1,31 +0,0 @@
|
||||||
"""
|
|
||||||
Model representation of a building
|
|
||||||
SPDX - License - Identifier: LGPL - 3.0 - or -later
|
|
||||||
Copyright © 2022 Concordia CERC group
|
|
||||||
Project Coder Peter Yefi peteryefi@gmail.com
|
|
||||||
"""
|
|
||||||
|
|
||||||
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey
|
|
||||||
from sqlalchemy.dialects.postgresql import JSONB
|
|
||||||
from persistence.db_config import Base
|
|
||||||
|
|
||||||
|
|
||||||
class Building(Base):
|
|
||||||
"""A model representation of a building
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
city_id A reference to the city which has this building.
|
|
||||||
name The name of the building.
|
|
||||||
construction_year The year of construction of the building.
|
|
||||||
function The function (e.g. residential) of the building.
|
|
||||||
floor_area The computed area of the floor of the building.
|
|
||||||
data A JSON object which contain other data (like roof, walls, zones, etc) of the building
|
|
||||||
"""
|
|
||||||
__tablename__ = "building"
|
|
||||||
id = Column(Integer, Sequence('building_id_seq'), primary_key=True)
|
|
||||||
city_id = Column(Integer, ForeignKey('city.id'), nullable=False)
|
|
||||||
name = Column(String, nullable=False)
|
|
||||||
construction_year = Column(Integer, nullable=False)
|
|
||||||
function = Column(String, nullable=False)
|
|
||||||
floor_area = Column(Float, nullable=False)
|
|
||||||
data = Column(JSONB, nullable=False)
|
|
45
persistence/models/user.py
Normal file
45
persistence/models/user.py
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
"""
|
||||||
|
Model representation of a User
|
||||||
|
SPDX - License - Identifier: LGPL - 3.0 - or -later
|
||||||
|
Copyright © 2022 Concordia CERC group
|
||||||
|
Project Coder Peter Yefi peteryefi@gmail.com
|
||||||
|
"""
|
||||||
|
|
||||||
|
from sqlalchemy import Column, Integer, String, Sequence
|
||||||
|
from sqlalchemy import DateTime, Enum
|
||||||
|
from persistence.db_config import Base
|
||||||
|
import datetime
|
||||||
|
from sqlalchemy.orm import validates
|
||||||
|
import re
|
||||||
|
import enum
|
||||||
|
|
||||||
|
|
||||||
|
class UserRoles(enum.Enum):
|
||||||
|
Admin = 'ADMIN'
|
||||||
|
HubReader = 'HUB_READER'
|
||||||
|
|
||||||
|
|
||||||
|
class User(Base):
|
||||||
|
"""A model representation of a city
|
||||||
|
"""
|
||||||
|
__tablename__ = "user"
|
||||||
|
id = Column(Integer, Sequence('user_id_seq'), primary_key=True)
|
||||||
|
name = Column(String, nullable=False)
|
||||||
|
email = Column(String, nullable=False, unique=True)
|
||||||
|
password = Column(String, nullable=False)
|
||||||
|
role = Column(Enum(UserRoles), nullable=False, default=UserRoles.HubReader)
|
||||||
|
created = Column(DateTime, default=datetime.datetime.utcnow)
|
||||||
|
updated = Column(DateTime, default=datetime.datetime.utcnow)
|
||||||
|
|
||||||
|
@validates("email")
|
||||||
|
def validate_email(self, key, address):
|
||||||
|
pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
|
||||||
|
if not re.match(pattern, address):
|
||||||
|
raise ValueError("failed simple email validation")
|
||||||
|
return address
|
||||||
|
|
||||||
|
def __init__(self, name, email, password, role):
|
||||||
|
self.name = name
|
||||||
|
self.email = email
|
||||||
|
self.password = password
|
||||||
|
self.role = role
|
|
@ -1 +1 @@
|
||||||
|
from .user_repo import UserRepo
|
||||||
|
|
|
@ -1,92 +0,0 @@
|
||||||
"""
|
|
||||||
Building repository with database CRUD operations
|
|
||||||
SPDX - License - Identifier: LGPL - 3.0 - or -later
|
|
||||||
Copyright © 2022 Concordia CERC group
|
|
||||||
Project Coder Peter Yefi peteryefi@gmail.com
|
|
||||||
"""
|
|
||||||
|
|
||||||
from city_model_structure.building import Building as CityBuilding
|
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
|
||||||
from persistence.models import Building
|
|
||||||
from persistence import BaseRepo
|
|
||||||
from helpers.city_util import CityUtil
|
|
||||||
from sqlalchemy import select
|
|
||||||
|
|
||||||
|
|
||||||
class BuildingRepo(BaseRepo):
|
|
||||||
|
|
||||||
def __init__(self, db_name):
|
|
||||||
super().__init__(db_name)
|
|
||||||
self._city_util = CityUtil()
|
|
||||||
|
|
||||||
def insert(self, building: CityBuilding, city_id: int) -> Building:
|
|
||||||
"""
|
|
||||||
Inserts a new building into the database
|
|
||||||
:param building: the building to insert
|
|
||||||
:param city_id: the city the building belongs to
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
|
|
||||||
model_building = Building()
|
|
||||||
model_building.name = building.name
|
|
||||||
model_building.function = building.function
|
|
||||||
model_building.construction_year = building.year_of_construction
|
|
||||||
model_building.floor_area = building.floor_area
|
|
||||||
model_building.city_id = city_id
|
|
||||||
model_building.data = self._city_util.extract_building_data(building)
|
|
||||||
try:
|
|
||||||
self.session.add(model_building)
|
|
||||||
self.session.flush()
|
|
||||||
self.session.commit()
|
|
||||||
return model_building
|
|
||||||
except SQLAlchemyError as err:
|
|
||||||
print(f'Error while adding building: {err}')
|
|
||||||
|
|
||||||
def get_by_id(self, building_id: int) -> Building:
|
|
||||||
"""
|
|
||||||
Fetch a building based on the id
|
|
||||||
:param building_id: the building id
|
|
||||||
:return: a Building
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return self.session.execute(select(Building).where(Building.id == building_id)).first()[0]
|
|
||||||
except SQLAlchemyError as err:
|
|
||||||
print(f'Error while fetching building: {err}')
|
|
||||||
|
|
||||||
def get_by_name(self, building_name: str) -> [Building]:
|
|
||||||
"""
|
|
||||||
Fetch a building based on the name
|
|
||||||
:param building_name: the name of the building
|
|
||||||
:return: [Building] with the provided name
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
result_set = self.session.execute(select(Building).where(Building.name == building_name))
|
|
||||||
return [building[0] for building in result_set]
|
|
||||||
except SQLAlchemyError as err:
|
|
||||||
print(f'Error while fetching buildings: {err}')
|
|
||||||
|
|
||||||
def get_by_construction_year(self, year: int):
|
|
||||||
"""
|
|
||||||
Fetch a building based on the year of construction
|
|
||||||
:param year: the construction year of the building
|
|
||||||
:return: [Building]
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
result_set = self.session.execute(select(Building).where(Building.construction_year == year))
|
|
||||||
return [building[0] for building in result_set]
|
|
||||||
except SQLAlchemyError as err:
|
|
||||||
print(f'Error while fetching buildings: {err}')
|
|
||||||
|
|
||||||
def get_by_city(self, city_id: int):
|
|
||||||
"""
|
|
||||||
Get all the buildings in a city
|
|
||||||
:param city_id: the city id
|
|
||||||
:return: [Building]
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
result_set = self.session.execute(select(Building).where(Building.city_id == city_id))
|
|
||||||
return [building[0] for building in result_set]
|
|
||||||
except SQLAlchemyError as err:
|
|
||||||
print(f'Error while fetching buildings: {err}')
|
|
||||||
|
|
||||||
|
|
101
persistence/repositories/user_repo.py
Normal file
101
persistence/repositories/user_repo.py
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
"""
|
||||||
|
City repository with database CRUD operations
|
||||||
|
SPDX - License - Identifier: LGPL - 3.0 - or -later
|
||||||
|
Copyright © 2022 Concordia CERC group
|
||||||
|
Project Coder Peter Yefi peteryefi@gmail.com
|
||||||
|
"""
|
||||||
|
|
||||||
|
from persistence import BaseRepo
|
||||||
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
from sqlalchemy import select
|
||||||
|
from persistence.models import User
|
||||||
|
from persistence.models import UserRoles
|
||||||
|
from helpers.auth import Auth
|
||||||
|
from typing import Union, Dict
|
||||||
|
|
||||||
|
|
||||||
|
class UserRepo(BaseRepo):
|
||||||
|
_instance = None
|
||||||
|
|
||||||
|
def __init__(self, db_name: str, dotenv_path: str, app_env: str):
|
||||||
|
super().__init__(db_name, dotenv_path, app_env)
|
||||||
|
|
||||||
|
def __new__(cls, db_name, dotenv_path, app_env):
|
||||||
|
"""
|
||||||
|
Implemented for a singleton pattern
|
||||||
|
"""
|
||||||
|
if cls._instance is None:
|
||||||
|
cls._instance = super(UserRepo, cls).__new__(cls)
|
||||||
|
return cls._instance
|
||||||
|
|
||||||
|
def insert(self, name: str, email: str, password: str, role: UserRoles) -> Union[User, Dict]:
|
||||||
|
user = self.get_by_email(email)
|
||||||
|
if user is None:
|
||||||
|
try:
|
||||||
|
if Auth.validate_password(password):
|
||||||
|
user = User(name=name, email=email, password=Auth.hash_password(password), role=role)
|
||||||
|
self.session.add(user)
|
||||||
|
self.session.flush()
|
||||||
|
self.session.commit()
|
||||||
|
return user
|
||||||
|
except SQLAlchemyError as err:
|
||||||
|
print(f'An error occured while creating user: {err}')
|
||||||
|
else:
|
||||||
|
return {'message': f'user with {email} email already exists'}
|
||||||
|
|
||||||
|
def update(self, user_id: int, name: str, email: str, password: str, role: UserRoles):
|
||||||
|
"""
|
||||||
|
Updates a user
|
||||||
|
:param user_id: the id of the user to be updated
|
||||||
|
:param name: the name of the user
|
||||||
|
:param email: the email of the user
|
||||||
|
:param password: the password of the user
|
||||||
|
:param role: the role of the user
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if Auth.validate_password(password):
|
||||||
|
self.session.query(User).filter(User.id == user_id) \
|
||||||
|
.update({'name': name, 'email': email, 'password': Auth.hash_password(password), 'role': role})
|
||||||
|
self.session.commit()
|
||||||
|
except SQLAlchemyError as err:
|
||||||
|
print(f'Error while updating user: {err}')
|
||||||
|
|
||||||
|
def get_by_email(self, email: str) -> [User]:
|
||||||
|
"""
|
||||||
|
Fetch user based on the email address
|
||||||
|
:param email: the email of the user
|
||||||
|
:return: [User] with the provided email
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return self.session.execute(select(User).where(User.email == email)).first()
|
||||||
|
except SQLAlchemyError as err:
|
||||||
|
print(f'Error while fetching user by email: {err}')
|
||||||
|
|
||||||
|
def delete_user(self, user_id: int):
|
||||||
|
"""
|
||||||
|
Deletes a user with the id
|
||||||
|
:param user_id: the user id
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.session.query(User).filter(User.id == user_id).delete()
|
||||||
|
self.session.commit()
|
||||||
|
except SQLAlchemyError as err:
|
||||||
|
print(f'Error while fetching user: {err}')
|
||||||
|
|
||||||
|
def get_user_by_email_and_password(self, email: str, password: str) -> [User]:
|
||||||
|
"""
|
||||||
|
Fetch user based on the email and password
|
||||||
|
:param email: the email of the user
|
||||||
|
:param password: the password of the user
|
||||||
|
:return: [User] with the provided email and password
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
user = self.session.execute(select(User).where(User.email == email)).first()
|
||||||
|
if user:
|
||||||
|
if Auth.check_password(password, user[0].password):
|
||||||
|
return user
|
||||||
|
return {'message': 'user not found'}
|
||||||
|
except SQLAlchemyError as err:
|
||||||
|
print(f'Error while fetching user by email: {err}')
|
|
@ -18,3 +18,4 @@ PyYAML
|
||||||
pyecore==0.12.2
|
pyecore==0.12.2
|
||||||
python-dotenv
|
python-dotenv
|
||||||
SQLAlchemy
|
SQLAlchemy
|
||||||
|
bcrypt==4.0.1
|
||||||
|
|
Loading…
Reference in New Issue
Block a user