mirror of
https://github.com/knejadshamsi/zele-utils.git
synced 2024-11-14 09:40:26 -05:00
261 lines
14 KiB
Python
261 lines
14 KiB
Python
import os,math,typer
|
|
from bs4 import BeautifulSoup
|
|
from dotenv import load_dotenv
|
|
from rich import print
|
|
from typing_extensions import Annotated
|
|
from typing import Optional
|
|
from pathlib import Path
|
|
from typing import Tuple
|
|
from rich.console import Console
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
|
|
import time
|
|
|
|
from classes import City, DBMode, RTMode
|
|
from functions import file_validator,buffer_creator, process_buffer_population,process_travels, push_to_db_coords, write_to_csv, push_to_db
|
|
from functions import process_nodes,process_links,process_links_attr
|
|
from functions import error_printer,success_printer,info_printer,notice_printer
|
|
from functions import metro_processing, bus_processing
|
|
from styles import print_help
|
|
|
|
# Module-level CLI setup.
# NOTE(review): `called` looks like a default-command marker consumed by a
# helper elsewhere (e.g. styles.print_help) — confirm before renaming.
called = "population"

# Typer application; rich markup mode lets help strings use [bold]/[color] tags.
app = typer.Typer(rich_markup_mode="rich")

# Load a local .env so os.getenv("DIVIDER") / os.getenv("CHUNK_SIZE") below resolve.
load_dotenv()
|
|
|
|
@app.command(print_help())
def population(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    range: Tuple[int, int] = typer.Option(None, "--range", "-r", help="Specify the start and end of the chunk range to be processed.", show_default=False),
    log: bool = typer.Option(False, "--log", "-l", help="Creates a Log file in the same directory to track the progress. Useful for large files that might be interrupted."),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]"),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Split a population XML file into DIVIDER-separated chunks and load the
    'agents' and/or 'travels' tables from each chunk.

    Each chunk is parsed via buffer_creator/process_* helpers and either pushed
    to the database (--push) or written to CSV next to the input file.  With
    --log, finished chunk indices are appended to <file stem>.log so an
    interrupted run can be resumed without re-processing completed chunks.
    """
    console = Console()
    all_tables = ["agents", "travels"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if not common_tables:
        error_printer("Incorrect table input")
        raise typer.Exit()
    if "all" in common_tables:
        common_tables = all_tables
    # FIX: message typo ("inlude" -> "include").
    info_printer(f"Tables to include: {common_tables}")
    file_validator(file)

    divider = os.getenv("DIVIDER")
    # BUG FIX: max_chunk was never initialized, so the first divider hit raised
    # NameError ("max_chunk = max_chunk + 1").
    max_chunk = 0
    with open(file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip() == divider:
                max_chunk += 1
    if max_chunk > 0:
        success_printer(f"{max_chunk} Chunks found")
    else:
        error_printer("Unable to find Chunks")
        raise typer.Exit()

    if not range:
        # NOTE(review): default upper bound is max_chunk - 2, i.e. the last
        # chunk is excluded — preserved as-is, but confirm this is intentional.
        range = [0, max_chunk - 2]
    info_printer(f"Chunk Range: {range}")

    directory = file.parent
    log_file = directory / (file.stem + ".log")
    if not log:
        notice_printer("Log file not created")
    elif log_file.exists():
        notice_printer(f"Log file {log_file} already exists")
    else:
        log_file.touch()
        info_printer(f"Log file {log_file} created")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        # Hide the bar entirely for trivially small jobs.
        task = progress.add_task("[cyan]Processing chunks...", total=max_chunk,
                                 visible=max_chunk > 2)

        current_chunk = 0
        processed_line = 0
        done_chunks: set[int] = set()
        if log:
            with open(log_file, 'r', encoding='utf-8') as l:
                # BUG FIX: the log holds chunk indices as *strings*; the
                # original tested `current_chunk in log_list` (int vs str),
                # so completed chunks were never actually skipped.
                done_chunks = {int(x) for x in l.read().splitlines() if x.strip()}

        chunk_size = int(os.getenv("CHUNK_SIZE"))
        while current_chunk < max_chunk:
            skip = (current_chunk < range[0] or current_chunk > range[1]
                    or current_chunk in done_chunks)
            # Always advance through the file, even for skipped chunks, so the
            # read offset stays aligned with the chunk counter.
            # BUG FIX: the log-skip path previously `continue`d without
            # incrementing current_chunk or consuming the chunk -> infinite loop.
            processed_line, buffer = buffer_creator(file, divider, processed_line, chunk_size)
            if skip:
                current_chunk += 1
                continue

            if "agents" in common_tables:
                dataframe = process_buffer_population(buffer)
                if cleandata:
                    dataframe = dataframe.dropna()
                if push:
                    push_to_db_coords("agents", dataframe, mode)
                else:
                    write_to_csv("agents", dataframe, file)

            if "travels" in common_tables:
                # NOTE(review): --cleandata is applied to 'agents' only, as in
                # the original — confirm whether travels should be cleaned too.
                dataframe_travels = process_travels(buffer)
                if push:
                    push_to_db_coords("travels", dataframe_travels, mode)
                else:
                    write_to_csv("travels", dataframe_travels, file)

            if log:
                # Record the finished chunk; context manager replaces the
                # manual open/close and guarantees the handle is released.
                with open(log_file, "a") as lf:
                    lf.write(f"\n{current_chunk}")

            current_chunk += 1
            time.sleep(2)  # NOTE(review): deliberate throttle? preserved as-is.
            progress.update(task, advance=1)

        progress.update(task, visible=False)
    console.print("[green]Processing complete![/green]")
|
|
|
|
@app.command()
def network(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]nodes[/underline bold], [underline bold]links[/underline bold], [underline bold]links_attr[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]"),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Parse a network XML file into 'nodes', 'links' and 'links_attr' tables.

    The file is split by the DIVIDER env marker: the section between the 2nd
    divider boundary holds <node> elements, the section ending at the 3rd
    holds <link> elements.  Each section is parsed with BeautifulSoup, cut
    into CHUNK_SIZE pieces, and each piece is pushed to the DB (--push) or
    written to CSV next to the input file.
    """
    console = Console()
    all_tables = ["nodes", "links", "links_attr"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if not common_tables:
        error_printer("Incorrect table input")
        raise typer.Exit()
    if "all" in common_tables:
        common_tables = all_tables
    # FIX: message typo ("inlude" -> "include").
    info_printer(f"Tables to include: {common_tables}")
    file_validator(file)

    buffer: list[str] = []
    divider_count = 0  # FIX: local renamed from misspelled DEVIDER_COUNT
    divider = os.getenv("DIVIDER")
    chunk_size = int(os.getenv("CHUNK_SIZE"))

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip() != divider:
                    # Accumulate the current section; divider lines themselves
                    # are never appended.
                    buffer.append(line.strip())
                    continue

                divider_count += 1
                if divider_count == 1 or divider_count > 3:
                    continue

                if divider_count == 2:
                    # ---- nodes section ----
                    # BUG FIX: original tested `"node" not in common_tables`,
                    # but the valid table name is "nodes", so this branch
                    # always skipped and nodes were never processed.
                    if "nodes" not in common_tables:
                        buffer.clear()
                        continue
                    try:
                        element = BeautifulSoup(' '.join(buffer), 'lxml-xml')
                        total_nodes = element.find_all("node")
                    except Exception:
                        error_printer("node process failed")
                        # BUG FIX: original fell through and hit a NameError
                        # on total_nodes; abort explicitly instead.
                        raise typer.Exit()
                    total_chunks = math.ceil(len(total_nodes) / chunk_size)
                    node_task = progress.add_task(
                        "[cyan]Processing [bold]nodes[/bold] chunks...",
                        total=total_chunks, visible=total_chunks > 2)
                    success_printer(f"Chunk count: {total_chunks}")
                    current = 0
                    while current < total_chunks:
                        # Slicing clamps at the sequence end, so no min() needed.
                        chunk = total_nodes[current * chunk_size:(current + 1) * chunk_size]
                        dataframe = process_nodes(chunk)
                        if cleandata:
                            dataframe = dataframe.dropna()
                        if push:
                            push_to_db_coords("nodes", dataframe, mode)
                        else:
                            write_to_csv("nodes", dataframe, file)
                        progress.update(node_task, advance=1)
                        # BUG FIX: original wrote `currernt_chunks =+ 1`
                        # (assign +1), looping forever past the first chunk.
                        current += 1
                    buffer.clear()

                elif divider_count == 3:
                    # ---- links section ----
                    try:
                        element = BeautifulSoup(' '.join(buffer), 'lxml-xml')
                        total_links = element.find_all("link")
                    except Exception:
                        # BUG FIX: message previously said "node process failed".
                        error_printer("link process failed")
                        raise typer.Exit()
                    total_chunks = math.ceil(len(total_links) / chunk_size)
                    success_printer(f"Chunk count: {total_chunks}")
                    link_task = progress.add_task(
                        "[cyan]Processing [bold]links[/bold] chunks...",
                        total=total_chunks, visible=total_chunks > 2)
                    current = 0
                    while current < total_chunks:
                        chunk = total_links[current * chunk_size:(current + 1) * chunk_size]
                        if "links" in common_tables:
                            dataframe = process_links(chunk)
                            if cleandata:
                                dataframe = dataframe.dropna()
                            if push:
                                push_to_db_coords("links", dataframe, mode)
                            else:
                                write_to_csv("links", dataframe, file)
                        if "links_attr" in common_tables:
                            dataframe = process_links_attr(chunk)
                            if cleandata:
                                dataframe = dataframe.dropna()
                            if push:
                                push_to_db_coords("links_attr", dataframe, mode)
                            else:
                                write_to_csv("links_attr", dataframe, file)
                        progress.update(link_task, advance=1)
                        current += 1
                    buffer.clear()

    console.print("[green]Processing complete![/green]")
|
|
|
|
@app.command()
def metro(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Argument(..., help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory"),
    pushmode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process metro shapefiles for a city into station/line tables.

    Files whose stem matches 'stm_arrets_sig' become 'metro-stations';
    everything else becomes 'metro-lines'.  Each file is validated to be a
    .shp, run through metro_processing, optionally cleaned, then pushed to
    the database (--push) or written to CSV.
    """
    required_files = ["stm_arrets_sig", "stm_lignes_sig"]
    for file in files:
        # Station file vs line file, keyed on the expected STM stem.
        if file.stem == required_files[0]:
            table_name = "metro-stations"
        else:
            table_name = "metro-lines"

        if file.suffix != '.shp':
            error_printer(f"File [bold]{file}[/bold] is not a .shp file.")
            raise typer.Exit()
        success_printer(f"Validated: [bold]{file}[/bold]")

        df = metro_processing(city, file)
        if df.empty:
            error_printer(f"processing [bold]{file}[/bold] failed")
            raise typer.Exit()
        if cleandata:
            df = df.dropna()

        if push:
            push_to_db(table_name, df, pushmode)
        else:
            write_to_csv(table_name, df, file)
|
|
|
|
@app.command()
def bus(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Argument(..., help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database when mentioned. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory"),
    pushmode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process bus shapefiles for a city into station/line tables.

    Files whose stem matches 'stm_arrets_sig' become 'bus-stations';
    everything else becomes 'bus-lines'.  Each file is validated to be a
    .shp, run through bus_processing, optionally cleaned, then pushed to
    the database (--push) or written to CSV.
    """
    required_files = ["stm_arrets_sig", "stm_lignes_sig"]
    for file in files:
        table_name = "bus-stations" if file.stem == required_files[0] else "bus-lines"
        if file.suffix != '.shp':
            error_printer(f"File {file} is not a .shp file")
            raise typer.Exit()
        success_printer("Shapefiles validated successfully")

        df = bus_processing(city, file)
        if df.empty:
            error_printer(f"processing [bold]{file}[/bold] failed")
            raise typer.Exit()
        if cleandata:
            # BUG FIX: original called `df.dropna()` without assigning the
            # result, so --cleandata silently did nothing (pandas dropna
            # returns a new frame; cf. the metro command, which assigns).
            df = df.dropna()

        if push:
            push_to_db(table_name, df, pushmode)
            continue
        write_to_csv(table_name, df, file)
        success_printer(f"Processing {file} complete.")
|
|
|
|
# Script entry point: run the typer CLI (population / network / metro / bus).
if __name__ == "__main__":
    app()