network functions added pt.1

This commit is contained in:
Kian 2024-09-11 10:56:17 -04:00
parent 3541f9f10e
commit e8dda40c9f
5 changed files with 132 additions and 48 deletions

View File

@ -1,11 +1,11 @@
from .population import process_buffer_population, process_travels, push_population,write_population from .population import process_buffer_population, process_travels, push_to_db_coords,write_to_csv
from .network import process_network, push_network, network_write from .network import process_network, push_network, network_write
from .metro import process_metro, push_metro, metro_write from .metro import process_metro, push_metro, metro_write
from .bus import process_bus, push_bus, bus_write from .bus import process_bus, push_bus, bus_write
from .helpers import buffer_creator from .helpers import buffer_creator
__all__ = [ __all__ = [
'process_buffer_population', 'process_travels','push_population', 'write_population', 'process_buffer_population', 'process_travels','push_to_db_coords', 'write_to_csv',
'process_network', 'push_network', 'network_write', 'process_network', 'push_network', 'network_write',
'process_metro', 'push_metro', 'metro_write', 'process_metro', 'push_metro', 'metro_write',
'process_bus', 'push_bus', 'bus_write', 'process_bus', 'push_bus', 'bus_write',

View File

@ -1,3 +1,7 @@
import geopandas, os, datetime
from sqlalchemy import create_engine
from geoalchemy2 import Geometry, WKTElement
def buffer_creator(file,divider,start_line, chunk_size): def buffer_creator(file,divider,start_line, chunk_size):
buffer = [] buffer = []
line_number = start_line line_number = start_line
@ -14,3 +18,24 @@ def buffer_creator(file,divider,start_line, chunk_size):
buffer.append(line.strip()) buffer.append(line.strip())
return current_line,(' ').join(buffer) return current_line,(' ').join(buffer)
def push_to_db_coords(name,data,mode):
    """Push a coordinate-bearing DataFrame into PostGIS as a table of points.

    Parameters
    ----------
    name : str
        Target table name.
    data : pandas.DataFrame
        Must contain a 'coordinates' column of shapely geometries
        (each exposing a ``.wkt`` attribute).
    mode : str
        Forwarded to ``to_sql``'s ``if_exists`` (e.g. 'append', 'replace').
    """
    # os.getenv returns a string (or None); WKTElement/Geometry require an
    # integer SRID. Default to 4326 to match the CRS assigned below.
    srid = int(os.getenv("SRID", "4326"))
    GDF = geopandas.GeoDataFrame(data, crs='EPSG:4326')
    GDF['geom'] = GDF['coordinates'].apply(lambda x: WKTElement(x.wkt, srid=srid))
    engine = create_engine(
        f'postgresql://{os.getenv("USER")}:{os.getenv("PASS")}@{os.getenv("HOST_NAME")}/{os.getenv("DATA_BASE")}',
        echo=False,
    )
    GDF.to_sql(
        name=name,
        con=engine,
        if_exists=mode,
        # pandas requires an int chunksize; the env var is a string.
        chunksize=int(os.getenv("CHUNK_SIZE", "1000")),
        dtype={'geom': Geometry('Point', srid=srid)},
        index=False,
    )
def write_to_csv(name,data, file):
    """Write *data* to a dated CSV placed next to *file*.

    The output name is ``<input-stem>-<name>-<YYYYMMDD>.csv``, so repeated
    runs on the same day accumulate into one file per table.

    Parameters
    ----------
    name : str
        Logical table name embedded in the output filename.
    data : pandas.DataFrame
        Rows to write.
    file : pathlib.Path
        The input file; its parent directory and stem anchor the output path.
    """
    directory = file.parent
    # One file per table name per calendar day.
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    csv = directory / (file.stem + f"-{name}-{stamp}.csv")
    if csv.exists():
        # Bug fix: suppress the header when appending, otherwise every
        # chunk repeats the column row in the middle of the data.
        data.to_csv(csv, mode='a', index=False, header=False)
    else:
        data.to_csv(csv, index=False)

View File

@ -1,9 +1,32 @@
from bs4 import BeautifulSoup
import pandas,pyproj, re
from shapely.geometry import Point
def process_network(data, cleandata): def process_nodes(data):
print(data, cleandata) ELEMENT_LIST = []
elements = BeautifulSoup(data,'lxml-xml')
for element in elements.find_all("node"):
ELEMENT_LIST.append(dict(element.attrs))
return pandas.DataFrame(ELEMENT_LIST)
def push_network(data,mode): def process_links(data):
print(data,mode) ELEMENT_LIST = []
elements = BeautifulSoup(data,'lxml-xml')
for element in elements.find_all("link"):
ELEMENT_LIST.append(dict(element.attrs))
return pandas.DataFrame(ELEMENT_LIST)
def network_write(data): def process_links_attr(data):
print(data) ELEMENT_LIST = []
elements = BeautifulSoup(data,'lxml-xml')
for element in elements.find_all("link"):
ELEMENT_DICT = {}
if element.find_all("attribute"):
for attr in element.find_all("attribute"):
ELEMENT_DICT.update({attr["name"]: attr.get_text()})
else:
continue
ELEMENT_DICT["id"]=element.getattr("id")
ELEMENT_LIST.append(ELEMENT_DICT)
return pandas.DataFrame(ELEMENT_LIST)

View File

@ -1,8 +1,6 @@
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import pandas,geopandas, pyproj, re, os, datetime import pandas,pyproj, re
from shapely.geometry import Point from shapely.geometry import Point
from sqlalchemy import create_engine
from geoalchemy2 import Geometry, WKTElement
def camel_to_snake(name): def camel_to_snake(name):
return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower() return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()
@ -52,24 +50,3 @@ def process_travels(data):
return pandas.DataFrame(activities_list) return pandas.DataFrame(activities_list)
def push_population(name,data,mode):
    """Push a coordinate-bearing DataFrame into PostGIS as a table of points.

    Parameters
    ----------
    name : str
        Target table name.
    data : pandas.DataFrame
        Must contain a 'coordinates' column of shapely geometries
        (each exposing a ``.wkt`` attribute).
    mode : str
        Forwarded to ``to_sql``'s ``if_exists`` (e.g. 'append', 'replace').
    """
    # os.getenv returns a string (or None); WKTElement/Geometry require an
    # integer SRID. Default to 4326 to match the CRS assigned below.
    srid = int(os.getenv("SRID", "4326"))
    GDF = geopandas.GeoDataFrame(data, crs='EPSG:4326')
    GDF['geom'] = GDF['coordinates'].apply(lambda x: WKTElement(x.wkt, srid=srid))
    engine = create_engine(
        f'postgresql://{os.getenv("USER")}:{os.getenv("PASS")}@{os.getenv("HOST_NAME")}/{os.getenv("DATA_BASE")}',
        echo=False,
    )
    GDF.to_sql(
        name=name,
        con=engine,
        if_exists=mode,
        # pandas requires an int chunksize; the env var is a string.
        chunksize=int(os.getenv("CHUNK_SIZE", "1000")),
        dtype={'geom': Geometry('Point', srid=srid)},
        index=False,
    )
def write_population(name,data, file):
    """Write *data* to a dated CSV placed next to *file*.

    The output name is ``<input-stem>-<name>-<YYYYMMDD>.csv``, so repeated
    runs on the same day accumulate into one file per table.

    Parameters
    ----------
    name : str
        Logical table name embedded in the output filename.
    data : pandas.DataFrame
        Rows to write.
    file : pathlib.Path
        The input file; its parent directory and stem anchor the output path.
    """
    directory = file.parent
    # One file per table name per calendar day.
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    csv = directory / (file.stem + f"-{name}-{stamp}.csv")
    if csv.exists():
        # Bug fix: suppress the header when appending, otherwise every
        # chunk repeats the column row in the middle of the data.
        data.to_csv(csv, mode='a', index=False, header=False)
    else:
        data.to_csv(csv, index=False)

89
main.py
View File

@ -10,7 +10,8 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskPr
import time import time
from classes import City, DBMode, RTMode from classes import City, DBMode, RTMode
from functions import buffer_creator, process_buffer_population,process_travels, push_population, write_population from functions import buffer_creator, process_buffer_population,process_travels, push_to_db_coords, write_to_csv
from functions import process_nodes,process_links,process_links_attr
from styles import print_help from styles import print_help
called= "population" called= "population"
@ -112,16 +113,16 @@ def population(
if cleandata: if cleandata:
dataframe = dataframe.dropna() dataframe = dataframe.dropna()
if push: if push:
push_population("agents",dataframe, mode) push_to_db_coords("agents",dataframe, mode)
else: else:
write_population("agents",dataframe, file) write_to_csv("agents",dataframe, file)
if "travels" in common_tables: if "travels" in common_tables:
dataframe_travels = process_travels(buffer) dataframe_travels = process_travels(buffer)
if push: if push:
push_population("travels",dataframe_travels, mode) push_to_db_coords("travels",dataframe_travels, mode)
else: else:
write_population("travels",dataframe_travels, file) write_to_csv("travels",dataframe_travels, file)
if log: if log:
f = open(log_file, "a") f = open(log_file, "a")
@ -135,19 +136,77 @@ def population(
@app.command() @app.command()
def network( def network(
file: Annotated[Path, typer.Argument(help="Relative path to the file.", show_default=False)], file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Clean the data if this flag is used."), tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]nodes[/underline bold], [underline bold]links[/underline bold], [underline bold]links_attr[/underline bold]. Use [underline bold]all[/underline bold] for everything.",show_default=False),
push: bool = typer.Option(False, "--push", "-p", help="Push the data into Database.\nIf you want the output to be saved in [green bold].csv[/green bold] format, do not mention this flag."), cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
mode: Optional[DBMode] = typer.Option(None, help="Specify either 'amend' or 'drop' when pushing data", show_default=False), push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]"),
mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
): ):
if not file.exists(): console = Console()
error_parser("File did does not exist!") all_tables = ["nodes","links","links_attr"]
common_tables = [item for item in tables if item in ["all"] + all_tables]
if len(common_tables) == 0:
error_printer("Incorrect table input")
raise typer.Exit()
elif "all" in common_tables:
common_tables = all_tables
info_printer(f"Tables to inlude: {common_tables}")
if not file.exists():
error_printer("File not found")
raise typer.Exit()
try:
f = open(file, 'r', encoding='utf-8')
success_printer("File Opened")
except:
error_printer("Unable to read file")
raise typer.Exit()
count = sum(1 for _ in f)
if count == 0:
error_printer("File empty")
raise typer.Exit() raise typer.Exit()
data = process_network(file,cleandata)
if push:
push_network(data, mode)
else: else:
network_write(data) success_printer(f"{count + 1} lines read")
f.close()
BUFFER = []
DEVIDER_COUNT = 0
with open(file,'r',encoding='utf-8') as f:
for line in f:
if line.strip() == os.getenv("DIVIDER"):
DEVIDER_COUNT = DEVIDER_COUNT + 1
if DEVIDER_COUNT == 2 and "nodes" in common_tables:
dataframe = process_nodes(BUFFER)
if cleandata:
dataframe = dataframe.dropna()
if push:
push_to_db_coords("nodes", dataframe, mode)
else:
write_to_csv("nodes", dataframe,file)
BUFFER = []
if DEVIDER_COUNT == 3:
if "links" in common_tables:
dataframe = process_links(BUFFER)
if cleandata:
dataframe = dataframe.dropna()
if push:
push_to_db_coords("links", dataframe, mode)
else:
write_to_csv("links", dataframe, file)
if "links_attr" in common_tables:
dataframe = process_links_attr(BUFFER)
if cleandata:
dataframe = dataframe.dropna()
if push:
push_to_db_coords("links_attr", dataframe, mode)
else:
write_to_csv("links_attr", dataframe, file)
BUFFER = []
if DEVIDER_COUNT < 1:
continue
if DEVIDER_COUNT > 2:
continue
BUFFER.append(line)
console.print("[green]Processing complete![/green]")
@app.command() @app.command()
def metro( def metro(