zele-utils/functions/helpers.py

90 lines
2.9 KiB
Python
Raw Permalink Normal View History

2024-09-15 17:04:21 -04:00
import geopandas, os, datetime, typer
2024-09-11 10:56:17 -04:00
from sqlalchemy import create_engine
from geoalchemy2 import Geometry, WKTElement
2024-09-15 17:04:21 -04:00
from .printers import error_printer, success_printer
def file_validator(file):
    """Check that *file* exists, can be read as UTF-8 text and is not empty.

    Progress is reported through ``success_printer`` / ``error_printer``.
    The first failed check aborts by raising ``typer.Exit``.

    Args:
        file: Path-like object exposing ``exists()`` (e.g. ``pathlib.Path``).

    Raises:
        typer.Exit: If the file is missing, cannot be opened or decoded,
            or contains no lines.
    """
    if not file.exists():
        error_printer("File not found")
        raise typer.Exit()
    try:
        # The context manager closes the handle on every path, including the
        # early exits below (the manual close() was skipped for empty files).
        with open(file, 'r', encoding='utf-8') as f:
            success_printer("File Opened")
            # Counting is inside the try so that a decode error while reading
            # is reported as an unreadable file rather than as a traceback.
            count = sum(1 for _ in f)
    except (OSError, UnicodeDecodeError):
        error_printer("Unable to read file")
        raise typer.Exit()
    if count == 0:
        error_printer("File empty")
        raise typer.Exit()
    # NOTE(review): the message reports count + 1 although `count` is already
    # the number of lines iterated; kept as-is to preserve the existing output.
    success_printer(f"{count + 1} lines found")
2024-09-11 10:56:17 -04:00
2024-09-10 17:40:50 -04:00
def buffer_creator(file, divider, start_line, chunk_size):
    """Collect one chunk of stripped lines from *file*.

    Lines numbered up to and including ``start_line`` (1-based) are skipped.
    After that every line is stripped and collected, except lines equal to
    ``divider``: those are counted and not collected, and reading stops at
    the ``chunk_size``-th divider.

    Args:
        file: Path of the UTF-8 text file to read.
        divider: Stripped line content that separates records.
        start_line: Number of leading lines to skip.
        chunk_size: Number of dividers to consume before stopping.

    Returns:
        A tuple ``(last_line_read, text)`` where ``last_line_read`` is the
        1-based number of the last line read (0 for an empty file) and
        ``text`` is the collected lines joined with single spaces.
    """
    collected = []
    dividers_seen = 0
    last_read = 0
    with open(file, 'r', encoding='utf-8') as handle:
        for last_read, raw in enumerate(handle, start=1):
            if last_read <= start_line:
                continue
            text = raw.strip()
            if text != divider:
                collected.append(text)
                continue
            # Divider line: count it, and stop once the chunk is complete.
            dividers_seen += 1
            if dividers_seen == chunk_size:
                break
    return last_read, ' '.join(collected)
2024-09-11 10:56:17 -04:00
2024-09-15 17:04:21 -04:00
def push_to_db_coords(name, data, mode):
    """Write records with point coordinates to a PostgreSQL table.

    ``data`` is wrapped in a ``GeoDataFrame`` (CRS ``EPSG:4326``) unless it
    already is one. A ``geom`` column is built from the WKT of each value in
    the ``coordinates`` column and stored as a geoalchemy2 ``Point``
    geometry.

    Connection and write settings are read from the environment variables
    ``USER``, ``PASS``, ``HOST_NAME``, ``DATA_BASE``, ``SRID`` and
    ``CHUNK_SIZE``.

    Args:
        name: Target table name.
        data: ``GeoDataFrame`` or data accepted by its constructor; must
            contain a ``coordinates`` column whose values expose ``.wkt``.
            When a ``GeoDataFrame`` is passed, the ``geom`` column is added
            to that same object.
        mode: Passed to ``to_sql`` as ``if_exists``.

    Raises:
        TypeError: If ``SRID`` or ``CHUNK_SIZE`` is unset (``int(None)``).
    """
    if not isinstance(data, geopandas.GeoDataFrame):
        GDF = geopandas.GeoDataFrame(data, crs='EPSG:4326')
    else:
        GDF = data
    # Read and convert the SRID once instead of once per row in the lambda.
    srid = int(os.getenv("SRID"))
    GDF['geom'] = GDF['coordinates'].apply(lambda x: WKTElement(x.wkt, srid=srid))
    # NOTE(review): credentials are interpolated verbatim, so special
    # characters in USER/PASS must already be URL-encoded in the environment.
    engine = create_engine(
        f'postgresql://{os.getenv("USER")}:{os.getenv("PASS")}@{os.getenv("HOST_NAME")}/{os.getenv("DATA_BASE")}',
        echo=False
    )
    try:
        GDF.to_sql(
            name=name,
            con=engine,
            if_exists=mode,
            chunksize=int(os.getenv("CHUNK_SIZE")),
            dtype={'geom': Geometry('Point', srid=srid)},
            index=False
        )
    finally:
        # A new engine is created on every call; release its connection pool
        # so repeated calls do not accumulate open connections.
        engine.dispose()
def push_to_db_linestring(name, data, mode):
    """Write line geometries to a PostgreSQL table.

    ``data`` is reprojected to ``EPSG:4326``. A ``geom`` column is built from
    the WKT of each value in the ``geometry`` column, the original
    ``geometry`` column is dropped, and the result is stored with ``geom``
    typed as a geoalchemy2 ``LINESTRING`` geometry.

    Connection and write settings are read from the environment variables
    ``USER``, ``PASS``, ``HOST_NAME``, ``DATA_BASE``, ``SRID`` and
    ``CHUNK_SIZE``.

    Args:
        name: Target table name.
        data: Object providing ``to_crs`` and a ``geometry`` column whose
            values expose ``.wkt`` (presumably a GeoDataFrame; confirm
            against callers).
        mode: Passed to ``to_sql`` as ``if_exists``.

    Raises:
        TypeError: If ``SRID`` or ``CHUNK_SIZE`` is unset (``int(None)``).
    """
    # Read and convert the SRID once instead of once per row in the lambda.
    srid = int(os.getenv("SRID"))
    data = data.to_crs('EPSG:4326')
    data['geom'] = data['geometry'].apply(lambda x: WKTElement(x.wkt, srid=srid))
    data = data.drop(columns=['geometry'])
    # NOTE(review): credentials are interpolated verbatim, so special
    # characters in USER/PASS must already be URL-encoded in the environment.
    engine = create_engine(
        f'postgresql://{os.getenv("USER")}:{os.getenv("PASS")}@{os.getenv("HOST_NAME")}/{os.getenv("DATA_BASE")}',
        echo=False
    )
    try:
        data.to_sql(
            name=name,
            con=engine,
            if_exists=mode,
            chunksize=int(os.getenv("CHUNK_SIZE")),
            dtype={'geom': Geometry('LINESTRING', srid=srid)},
            index=False
        )
    finally:
        # A new engine is created on every call; release its connection pool
        # so repeated calls do not accumulate open connections.
        engine.dispose()
def write_to_csv(name, data, file):
    """Write *data* to a dated CSV file next to *file*.

    The output path is ``<file.parent>/<file.stem>-<name>-<YYYYMMDD>.csv``,
    using today's local date. If that file already exists the rows are
    appended without repeating the header row; otherwise the file is created
    with a header.

    Args:
        name: Label embedded in the output file name.
        data: Object providing ``to_csv`` (e.g. a pandas ``DataFrame``).
        file: ``pathlib.Path`` of the source file; only its parent directory
            and stem are used.
    """
    print(file)
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    target = file.parent / (file.stem + f"-{name}-{stamp}.csv")
    if target.exists():
        # Appending with the default header=True would insert a second
        # header line in the middle of the existing file.
        data.to_csv(target, mode='a', header=False, index=False)
    else:
        data.to_csv(target, index=False)
def push_to_db(name, df, pushmode):
    """Route *df* to the database writer(s) selected by *name*.

    ``name`` is split on ``-``. If one of the resulting parts is
    ``"stations"`` the data goes to ``push_to_db_coords``; if one is
    ``"lines"`` it goes to ``push_to_db_linestring``. Both run when both
    parts are present, and nothing happens when neither is.

    Args:
        name: Dash-separated table name, forwarded unchanged to the writer.
        df: Data to store, forwarded unchanged.
        pushmode: Write mode, forwarded unchanged.
    """
    parts = name.split("-")
    wants_points = "stations" in parts
    wants_lines = "lines" in parts
    if wants_points:
        push_to_db_coords(name, df, pushmode)
    if wants_lines:
        push_to_db_linestring(name, df, pushmode)