# zele-utils/main.py
import math
import os
import time

import typer
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pathlib import Path
from rich import print
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from typing import Optional, Tuple
from typing_extensions import Annotated

from classes import City, DBMode, RTMode
from functions import file_validator, buffer_creator, process_buffer_population, process_travels, push_to_db_coords, write_to_csv, push_to_db
from functions import process_nodes, process_links, process_links_attr
from functions import error_printer, success_printer, info_printer, notice_printer
from functions import metro_processing, bus_processing
from styles import print_help

called = "population"
app = typer.Typer(rich_markup_mode="rich")
load_dotenv()
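# NOTE: The commands below expect DIVIDER and CHUNK_SIZE in the environment
# (loaded from .env by load_dotenv() above); both must be set before running
# the population and network commands.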


@app.command(print_help())
def population(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    range: Tuple[int, int] = typer.Option(None, "--range", "-r", help="Specify the start and end of the chunk range to be processed.", show_default=False),
    log: bool = typer.Option(False, "--log", "-l", help="Create a log file in the same directory to track progress. Useful for large files that might be interrupted."),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database when provided. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory."),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data.", show_default=False),
):
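    """Parse a population XML file chunk by chunk and export the agents/travels tables."""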
    console = Console()
    all_tables = ["agents", "travels"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if len(common_tables) == 0:
        error_printer("Incorrect table input")
        raise typer.Exit()
    elif "all" in common_tables:
        common_tables = all_tables
    info_printer(f"Tables to include: {common_tables}")
    file_validator(file)
    max_chunk = 0  # count chunks by tallying divider lines in the input file
    with open(file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip() == os.getenv("DIVIDER"):
                max_chunk = max_chunk + 1
    if max_chunk > 0:
        success_printer(f"{max_chunk} Chunks found")
    elif max_chunk == 0:
        error_printer("Unable to find Chunks")
        raise typer.Exit()
    if not range:
        range = [0, max_chunk - 2]
    info_printer(f"Chunk Range: {range}")
    directory = file.parent
    log_file = directory / (file.stem + ".log")
    if not log:
        notice_printer("Log file not created")
    else:
        if log_file.exists():
            notice_printer(f"Log file {log_file} already exists")
        else:
            log_file.touch()
            info_printer(f"Log file {log_file} created")
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        if max_chunk > 2:
            task = progress.add_task("[cyan]Processing chunks...", total=max_chunk)
        else:
            task = progress.add_task("[cyan]Processing chunks...", total=max_chunk, visible=False)
        current_chunk = 0
        processed_line = 0
        if log:
            with open(log_file, 'r', encoding='utf-8') as l:
                log_list = l.read().splitlines()
        while current_chunk < max_chunk:
            if current_chunk < range[0] or current_chunk > range[1]:
                processed_line, buffer = buffer_creator(file, os.getenv("DIVIDER"), processed_line, int(os.getenv("CHUNK_SIZE")))
                current_chunk += 1
                continue
            # log_list holds strings, so compare against str(current_chunk);
            # consume the chunk and advance, otherwise the loop never terminates
            if log and str(current_chunk) in log_list:
                processed_line, buffer = buffer_creator(file, os.getenv("DIVIDER"), processed_line, int(os.getenv("CHUNK_SIZE")))
                current_chunk += 1
                continue
            processed_line, buffer = buffer_creator(file, os.getenv("DIVIDER"), processed_line, int(os.getenv("CHUNK_SIZE")))
if "agents" in common_tables:
dataframe = process_buffer_population(buffer)
if cleandata:
dataframe = dataframe.dropna()
if push:
push_to_db_coords("agents",dataframe, mode)
else:
write_to_csv("agents",dataframe, file)
if "travels" in common_tables:
dataframe_travels = process_travels(buffer)
if push:
push_to_db_coords("travels",dataframe_travels, mode)
else:
write_to_csv("travels",dataframe_travels, file)
if log:
f = open(log_file, "a")
f.write(f"\n{current_chunk}")
f.close()
current_chunk += 1
time.sleep(2)
progress.update(task, advance=1)
progress.update(task, visible=False)
console.print("[green]Processing complete![/green]")
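

# Example invocation for the population command (hypothetical paths):
#   python main.py population ./data/population.xml all --range 0 10 --log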
@app.command()
def network(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]nodes[/underline bold], [underline bold]links[/underline bold], [underline bold]links_attr[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database when provided. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory."),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data.", show_default=False),
):
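    """Parse a network XML file and export the nodes/links/links_attr tables."""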
    console = Console()
    all_tables = ["nodes", "links", "links_attr"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if len(common_tables) == 0:
        error_printer("Incorrect table input")
        raise typer.Exit()
    elif "all" in common_tables:
        common_tables = all_tables
    info_printer(f"Tables to include: {common_tables}")
    file_validator(file)
    BUFFER = []
    DIVIDER_COUNT = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip() == os.getenv("DIVIDER"):
                    DIVIDER_COUNT = DIVIDER_COUNT + 1
                    if DIVIDER_COUNT == 1 or DIVIDER_COUNT > 3:
                        continue
                    if DIVIDER_COUNT == 2:
                        if "nodes" not in common_tables:
                            BUFFER.clear()
                            continue
                        else:
                            try:
                                element = BeautifulSoup(' '.join(BUFFER), 'lxml-xml')
                                total_nodes = element.find_all("node")
                            except Exception:
                                error_printer("node processing failed")
                                raise typer.Exit()
                            total_chunks = math.ceil(len(total_nodes) / int(os.getenv("CHUNK_SIZE")))
                            if total_chunks > 2:
                                node_task = progress.add_task("[cyan]Processing [bold]nodes[/bold] chunks...", total=total_chunks)
                            else:
                                node_task = progress.add_task("[cyan]Processing [bold]nodes[/bold] chunks...", total=total_chunks, visible=False)
                            success_printer(f"Chunk count: {total_chunks}")
                            current_chunks = 0
                            while current_chunks < total_chunks:
                                size = int(os.getenv("CHUNK_SIZE"))
                                new_chunk = total_nodes[current_chunks * size:min(len(total_nodes), (current_chunks + 1) * size)]
                                dataframe = process_nodes(new_chunk)
                                if cleandata:
                                    dataframe = dataframe.dropna()
                                if push:
                                    push_to_db_coords("nodes", dataframe, mode)
                                else:
                                    write_to_csv("nodes", dataframe, file)
                                progress.update(node_task, advance=1)
                                current_chunks += 1
                            BUFFER.clear()
                    if DIVIDER_COUNT == 3:
                        try:
                            element = BeautifulSoup(' '.join(BUFFER), 'lxml-xml')
                            total_links = element.find_all("link")
                        except Exception:
                            error_printer("link processing failed")
                            raise typer.Exit()
                        total_chunks = math.ceil(len(total_links) / int(os.getenv("CHUNK_SIZE")))
                        success_printer(f"Chunk count: {total_chunks}")
                        if total_chunks > 2:
                            link_task = progress.add_task("[cyan]Processing [bold]links[/bold] chunks...", total=total_chunks)
                        else:
                            link_task = progress.add_task("[cyan]Processing [bold]links[/bold] chunks...", total=total_chunks, visible=False)
                        current_chunks = 0
                        while current_chunks < total_chunks:
                            size = int(os.getenv("CHUNK_SIZE"))
                            new_chunk = total_links[current_chunks * size:min(len(total_links), (current_chunks + 1) * size)]
                            if "links" in common_tables:
                                dataframe = process_links(new_chunk)
                                if cleandata:
                                    dataframe = dataframe.dropna()
                                if push:
                                    push_to_db_coords("links", dataframe, mode)
                                else:
                                    write_to_csv("links", dataframe, file)
                            if "links_attr" in common_tables:
                                dataframe = process_links_attr(new_chunk)
                                if cleandata:
                                    dataframe = dataframe.dropna()
                                if push:
                                    push_to_db_coords("links_attr", dataframe, mode)
                                else:
                                    write_to_csv("links_attr", dataframe, file)
                            progress.update(link_task, advance=1)
                            current_chunks += 1
                        BUFFER.clear()
                        continue
                    continue
                BUFFER.append(line.strip())
    console.print("[green]Processing complete![/green]")
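

# Example invocation for the network command (hypothetical paths):
#   python main.py network ./data/network.xml nodes links --cleandata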
@app.command()
def metro(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Argument(..., help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database when provided. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory."),
    pushmode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data.", show_default=False),
):
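    """Process metro shapefiles for a city and export the station/line tables."""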
    required_files = ["stm_arrets_sig", "stm_lignes_sig"]
    for file in files:
        table_name = "metro-stations" if file.stem == required_files[0] else "metro-lines"
        if file.suffix != '.shp':
            error_printer(f"File [bold]{file}[/bold] is not a .shp file.")
            raise typer.Exit()
        success_printer(f"Validated: [bold]{file}[/bold]")
        df = metro_processing(city, file)
        if df.empty:
            error_printer(f"Processing [bold]{file}[/bold] failed")
            raise typer.Exit()
        if cleandata:
            df = df.dropna()
        if push:
            push_to_db(table_name, df, pushmode)
            continue
        write_to_csv(table_name, df, file)
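

# Example invocation for the metro command (hypothetical city value and paths):
#   python main.py metro <city> ./data/stm_arrets_sig.shp ./data/stm_lignes_sig.shp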
@app.command()
def bus(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Argument(..., help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database when provided. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory."),
    pushmode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data.", show_default=False),
):
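    """Process bus shapefiles for a city and export the station/line tables."""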
    required_files = ["stm_arrets_sig", "stm_lignes_sig"]
    for file in files:
        table_name = "bus-stations" if file.stem == required_files[0] else "bus-lines"
        if file.suffix != '.shp':
            error_printer(f"File {file} is not a .shp file")
            raise typer.Exit()
        success_printer("Shapefiles validated successfully")
        df = bus_processing(city, file)
        if df.empty:
            error_printer(f"Processing [bold]{file}[/bold] failed")
            raise typer.Exit()
        if cleandata:
            df = df.dropna()  # dropna() returns a new DataFrame; reassign so the cleaning takes effect
        if push:
            push_to_db(table_name, df, pushmode)
            continue
        write_to_csv(table_name, df, file)
        success_printer(f"Processing {file} complete.")

if __name__ == "__main__":
    app()