"""Helpers for validating input files, chunking them, and exporting GeoDataFrames
to PostGIS tables or CSV files.

Database settings are read from the environment: USER, PASS, HOST_NAME,
DATA_BASE, SRID and CHUNK_SIZE.
"""
import datetime
import os
from urllib.parse import quote

import geopandas
import typer
from geoalchemy2 import Geometry, WKTElement
from sqlalchemy import create_engine

from .printers import error_printer, success_printer


def file_validator(file):
    """Check that *file* exists, is readable as UTF-8 text, and is not empty.

    Progress and failures are reported through success_printer/error_printer.

    Args:
        file: a pathlib-style path (must provide ``.exists()``).

    Raises:
        typer.Exit: with exit code 1 when the file is missing, unreadable,
            not valid UTF-8, or empty.
    """
    if not file.exists():
        error_printer("File not found")
        raise typer.Exit(code=1)
    try:
        f = open(file, 'r', encoding='utf-8')
    except OSError:
        error_printer("Unable to read file")
        raise typer.Exit(code=1) from None
    # The context manager guarantees the handle is closed on every exit path,
    # including the typer.Exit raised for an empty file.
    with f:
        success_printer("File Opened")
        try:
            count = sum(1 for _ in f)
        except UnicodeDecodeError:
            # Decoding happens lazily while iterating, so a non-UTF-8 file
            # only fails here, not at open().
            error_printer("Unable to read file")
            raise typer.Exit(code=1) from None
    if count == 0:
        error_printer("File empty")
        raise typer.Exit(code=1)
    success_printer(f"{count} lines found")


def buffer_creator(file, divider, start_line, chunk_size):
    """Collect the text of the next *chunk_size* divider-separated records.

    Lines 1..start_line of *file* are skipped. Every following line is
    stripped; a line equal to *divider* after stripping is counted instead of
    kept, and reading stops as soon as *chunk_size* dividers have been seen.

    Returns:
        tuple: ``(last_line, text)``. ``last_line`` is the 1-based number of
        the last line read: the closing divider, or the final line if the file
        ended first (0 for an empty file). ``text`` is the kept lines joined by
        single spaces.
    """
    # NOTE(review): each call re-reads the file from the start to reach
    # start_line, so walking a whole file chunk by chunk is quadratic in lines.
    kept = []
    dividers_seen = 0
    current_line = 0
    with open(file, 'r', encoding='utf-8') as f:
        for current_line, line in enumerate(f, start=1):
            if current_line <= start_line:
                continue
            stripped = line.strip()
            if stripped == divider:
                dividers_seen += 1
                if dividers_seen == chunk_size:
                    break
                continue
            kept.append(stripped)
    return current_line, ' '.join(kept)


def _srid():
    """Return the SRID from the SRID environment variable as an int."""
    return int(os.getenv("SRID"))


def _make_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database configured in the env."""
    # User name and password are percent-encoded, so characters such as
    # '@', ':' or '/' cannot break the URL structure.
    user = quote(os.getenv("USER", ""), safe="")
    password = quote(os.getenv("PASS", ""), safe="")
    return create_engine(
        f'postgresql://{user}:{password}@{os.getenv("HOST_NAME")}/{os.getenv("DATA_BASE")}',
        echo=False,
    )


def _write_table(frame, name, mode, geometry_type):
    """Write *frame* to table *name* with ``to_sql``, typing 'geom' as *geometry_type*."""
    engine = _make_engine()
    try:
        frame.to_sql(
            name=name,
            con=engine,
            if_exists=mode,
            chunksize=int(os.getenv("CHUNK_SIZE")),
            dtype={'geom': geometry_type},
            index=False,
        )
    finally:
        # Release the engine's pooled connections once the write is done.
        engine.dispose()


def push_to_db_coords(name, data, mode):
    """Write point data to table *name*, adding a PostGIS 'geom' column.

    Args:
        name: target table name.
        data: a GeoDataFrame, or anything ``GeoDataFrame()`` accepts. It must
            have a 'coordinates' column whose values expose ``.wkt``
            (presumably shapely Points; this is not checked here).
        mode: forwarded to ``to_sql(if_exists=...)``.

    If *data* is already a GeoDataFrame, the 'geom' column is added to it in place.
    """
    if isinstance(data, geopandas.GeoDataFrame):
        gdf = data
    else:
        gdf = geopandas.GeoDataFrame(data, crs='EPSG:4326')
    srid = _srid()
    gdf['geom'] = gdf['coordinates'].apply(lambda shape: WKTElement(shape.wkt, srid=srid))
    _write_table(gdf, name, mode, Geometry('Point', srid=srid))


def push_to_db_linestring(name, data, mode):
    """Reproject *data* to EPSG:4326 and write it to table *name* as LINESTRINGs.

    The GeoDataFrame's 'geometry' column is replaced by a 'geom' column of
    WKTElements. *mode* is forwarded to ``to_sql(if_exists=...)``. The caller's
    frame is not modified, because ``to_crs`` returns a new frame.
    """
    data = data.to_crs('EPSG:4326')
    srid = _srid()
    data['geom'] = data['geometry'].apply(lambda shape: WKTElement(shape.wkt, srid=srid))
    data = data.drop(columns=['geometry'])
    _write_table(data, name, mode, Geometry('LINESTRING', srid=srid))


def write_to_csv(name, data, file):
    """Write *data* to a dated CSV next to *file*.

    The output is ``<file.stem>-<name>-<YYYYMMDD>.csv`` in ``file.parent``. If
    that file already exists, rows are appended without repeating the header.
    """
    # NOTE(review): looks like leftover debug output; kept to preserve behavior.
    print(file)
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    csv_path = file.parent / f"{file.stem}-{name}-{stamp}.csv"
    if csv_path.exists():
        data.to_csv(csv_path, mode='a', header=False, index=False)
    else:
        data.to_csv(csv_path, index=False)


def push_to_db(name, df, pushmode):
    """Route *df* to the matching writer based on the '-'-separated parts of *name*.

    A 'stations' part sends *df* to push_to_db_coords and a 'lines' part sends
    it to push_to_db_linestring. Both run if both parts are present; neither
    runs if neither is present.
    """
    parts = name.split("-")
    if "stations" in parts:
        push_to_db_coords(name, df, pushmode)
    if "lines" in parts:
        push_to_db_linestring(name, df, pushmode)