zele-utils/main.py
2024-09-15 17:04:21 -04:00

289 lines
15 KiB
Python

import os,math,typer
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from rich import print
from typing_extensions import Annotated
from typing import Optional
from pathlib import Path
from typing import Tuple
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
import time
from classes import City, DBMode, RTMode
from functions import file_validator,buffer_creator, process_buffer_population,process_travels, push_to_db_coords, write_to_csv, push_to_db_linestring
from functions import process_nodes,process_links,process_links_attr
from functions import error_printer,success_printer,info_printer,notice_printer
from functions import metro_processing, bus_processing
from styles import print_help
# Module-level marker string; it is not referenced in the visible part of this
# file (presumably consumed elsewhere — verify before removing).
called = "population"

# The Typer application that the @app.command() functions below register with.
# rich_markup_mode="rich" lets the help strings use Rich markup tags.
app = typer.Typer(rich_markup_mode="rich")

# Load environment variables from a .env file; the commands below read
# DIVIDER and CHUNK_SIZE through os.getenv().
load_dotenv()
@app.command(print_help())
def population(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    range: Tuple[int, int] = typer.Option(None, "--range", "-r", help="Specify the start and end of the chunk range to be processed.", show_default=False),
    log: bool = typer.Option(False, "--log", "-l", help="Creates a Log file in the same directory to track the progress. Useful for large files that might be intrupted."),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]"),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process a chunked population XML file into the agents and travels tables.

    Chunks are delimited by lines equal to the DIVIDER environment variable.
    Each selected chunk is pushed to the database (--push) or written to CSV.
    """
    # NOTE: Typer uses this docstring as the command's help text, so it is
    # kept short and user-facing; implementation notes live in comments.
    console = Console()
    all_tables = ["agents", "travels"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if not common_tables:
        error_printer("Incorrect table input")
        raise typer.Exit()
    if "all" in common_tables:
        common_tables = all_tables
    info_printer(f"Tables to inlude: {common_tables}")
    file_validator(file)

    # Count divider lines once; this count drives the loop and the progress bar.
    # (The original incremented an unassigned variable here.)
    divider = os.getenv("DIVIDER")
    with open(file, "r", encoding="utf-8") as f:
        max_chunk = sum(1 for line in f if line.strip() == divider)
    if max_chunk == 0:
        error_printer("Unable to find Chunks")
        raise typer.Exit()
    success_printer(f"{max_chunk} Chunks found")

    # `range` is the CLI parameter name (it shadows the builtin inside this
    # function), so the builtin range() must not be used below.
    if not range:
        # Default bounds kept from the original; the "- 2" presumably accounts
        # for leading/trailing dividers — TODO confirm against buffer_creator.
        range = [0, max_chunk - 2]
    info_printer(f"Chunk Range: {range}")
    first_chunk, last_chunk = range

    log_file = file.parent / (file.stem + ".log")
    logged_chunks = set()
    if not log:
        notice_printer("Log file not created")
    else:
        if log_file.exists():
            notice_printer(f"Log file {log_file} already exists")
        else:
            log_file.touch()
            info_printer(f"Log file {log_file} created")
        # The log stores one chunk index per line as text, so the membership
        # test below has to compare strings, not ints.
        with open(log_file, "r", encoding="utf-8") as log_handle:
            logged_chunks = set(log_handle.read().splitlines())

    chunk_size = int(os.getenv("CHUNK_SIZE"))

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        # The bar is only shown when there are more than two chunks.
        task = progress.add_task(
            "[cyan]Processing chunks...", total=max_chunk, visible=max_chunk > 2
        )
        current_chunk = 0
        processed_line = 0
        while current_chunk < max_chunk:
            # Always read the next buffer, even for skipped chunks, so that
            # processed_line keeps moving forward through the file.
            processed_line, buffer = buffer_creator(file, divider, processed_line, chunk_size)

            out_of_range = current_chunk < first_chunk or current_chunk > last_chunk
            already_logged = str(current_chunk) in logged_chunks
            if out_of_range or already_logged:
                # The original `continue` for logged chunks never advanced the
                # counter and therefore never terminated.
                current_chunk += 1
                progress.update(task, advance=1)
                continue

            if "agents" in common_tables:
                dataframe = process_buffer_population(buffer)
                if cleandata:
                    dataframe = dataframe.dropna()
                if push:
                    push_to_db_coords("agents", dataframe, mode)
                else:
                    write_to_csv("agents", dataframe, file)
            if "travels" in common_tables:
                dataframe_travels = process_travels(buffer)
                if push:
                    push_to_db_coords("travels", dataframe_travels, mode)
                else:
                    write_to_csv("travels", dataframe_travels, file)

            if log:
                # Record the finished chunk so an interrupted run can resume.
                with open(log_file, "a", encoding="utf-8") as log_handle:
                    log_handle.write(f"\n{current_chunk}")

            current_chunk += 1
            # Pause kept from the original; its purpose is not evident from
            # this file — TODO confirm whether it is still required.
            time.sleep(2)
            progress.update(task, advance=1)
        progress.update(task, visible=False)
    console.print("[green]Processing complete![/green]")
@app.command()
def network(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]nodes[/underline bold], [underline bold]links[/underline bold], [underline bold]links_attr[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]"),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process a network XML file into the nodes, links and links_attr tables.

    Sections are delimited by lines equal to the DIVIDER environment variable.
    Each section is processed in chunks of CHUNK_SIZE elements and pushed to
    the database (--push) or written to CSV.
    """
    # NOTE: Typer uses this docstring as the command's help text, so it is
    # kept short and user-facing; implementation notes live in comments.
    console = Console()
    all_tables = ["nodes", "links", "links_attr"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if not common_tables:
        error_printer("Incorrect table input")
        raise typer.Exit()
    if "all" in common_tables:
        common_tables = all_tables
    info_printer(f"Tables to inlude: {common_tables}")
    file_validator(file)

    divider = os.getenv("DIVIDER")
    buffer = []
    divider_count = 0

    def store(table, dataframe):
        # Optionally clean one chunk, then send it to the database or to CSV.
        if cleandata:
            dataframe = dataframe.dropna()
        if push:
            push_to_db_coords(table, dataframe, mode)
        else:
            write_to_csv(table, dataframe, file)

    def parse_elements(tag):
        # Parse the buffered lines and return every <tag> element. A parse
        # failure aborts the command: the original printed an error and then
        # crashed with a NameError on the never-assigned result.
        try:
            return BeautifulSoup(" ".join(buffer), "lxml-xml").find_all(tag)
        except Exception as exc:
            error_printer(f"{tag} process failed")
            raise typer.Exit() from exc

    def process_in_chunks(progress, label, elements, processors):
        # Run every (table, processor) pair over `elements`, CHUNK_SIZE
        # elements at a time, advancing a dedicated progress task per chunk.
        size = int(os.getenv("CHUNK_SIZE"))
        total_chunks = math.ceil(len(elements) / size)
        success_printer(f"Chunk count: {total_chunks}")
        # The bar is only shown when there are more than two chunks.
        task = progress.add_task(
            f"[cyan]Processing [bold]{label}[/bold] chunks...",
            total=total_chunks,
            visible=total_chunks > 2,
        )
        for index in range(total_chunks):
            chunk = elements[index * size:(index + 1) * size]
            for table, processor in processors:
                store(table, processor(chunk))
            progress.update(task, advance=1)

    # Only the link tables the user actually asked for.
    link_processors = [
        (table, processor)
        for table, processor in (("links", process_links), ("links_attr", process_links_attr))
        if table in common_tables
    ]

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        with open(file, "r", encoding="utf-8") as f:
            for line in f:
                stripped = line.strip()
                if stripped != divider:
                    buffer.append(stripped)
                    continue

                divider_count += 1
                if divider_count == 1:
                    # As in the original, content before the first divider
                    # stays in the buffer and is parsed with the node section.
                    continue
                if divider_count == 2:
                    # The second divider closes the node section. The table is
                    # named "nodes"; the original tested "node" and so never
                    # processed it.
                    if "nodes" in common_tables:
                        process_in_chunks(
                            progress, "nodes", parse_elements("node"), [("nodes", process_nodes)]
                        )
                    buffer.clear()
                    continue

                # The third divider closes the link section. Nothing after it
                # is ever processed, so stop reading the file here.
                if link_processors:
                    process_in_chunks(progress, "links", parse_elements("link"), link_processors)
                buffer.clear()
                break
    console.print("[green]Processing complete![/green]")
@app.command()
def metro(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Option(None, "--files", "-f", help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory"),
    pushmode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process metro shapefiles for a city into station and line tables.

    The results are pushed to the database (--push) or written to CSV.
    """
    # NOTE: Typer uses this docstring as the command's help text, so it is
    # kept short and user-facing; implementation notes live in comments.

    # --files defaults to None; without this guard the validation loop below
    # raises a TypeError and the CSV branch has no file to refer to.
    if not files:
        error_printer("No shapefiles provided.")
        raise typer.Exit()
    for file in files:
        if not file.exists():
            error_printer(f"Shapefile {file} does not exist.")
            raise typer.Exit()
        if file.suffix != '.shp':
            error_printer(f"File {file} is not a .shp file.")
            raise typer.Exit()
    success_printer("Shapefiles validated successfully.")

    metro_stations_df, metro_lines_df = metro_processing(city, files)

    # Compare against None explicitly: the results are used like pandas
    # DataFrames (.dropna()), and truth-testing a DataFrame raises ValueError.
    # Either result may be missing; only bail out when both are, which matches
    # the `bus` command.
    has_stations = metro_stations_df is not None
    has_lines = metro_lines_df is not None
    if not has_stations and not has_lines:
        error_printer("No dataframes were processed successfully.")
        raise typer.Exit()

    if cleandata:
        if has_stations:
            metro_stations_df = metro_stations_df.dropna()
        if has_lines:
            metro_lines_df = metro_lines_df.dropna()

    if push:
        if has_stations:
            push_to_db_coords("metro-stations", metro_stations_df, pushmode)
        if has_lines:
            push_to_db_linestring("metro-lines", metro_lines_df, pushmode)
    else:
        # The original passed the leaked loop variable, i.e. the last
        # shapefile; keep that choice but make it explicit.
        reference_file = files[-1]
        if has_stations:
            write_to_csv("metro-stations", metro_stations_df, reference_file)
        if has_lines:
            write_to_csv("metro-lines", metro_lines_df, reference_file)
    success_printer("Processing complete.")
@app.command()
def bus(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Option(None, "--files", "-f", help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database when mentioned. Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory"),
    pushmode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process bus shapefiles for a city into station and line tables.

    The results are pushed to the database (--push) or written to CSV.
    """
    # NOTE: Typer uses this docstring as the command's help text, so it is
    # kept short and user-facing; implementation notes live in comments.

    # --files defaults to None; without this guard the validation loop below
    # raises a TypeError and the CSV branch has no file to refer to.
    if not files:
        error_printer("No shapefiles provided.")
        raise typer.Exit()
    for file in files:
        if not file.exists():
            error_printer(f"Shapefile {file} does not exist.")
            raise typer.Exit()
        if file.suffix != '.shp':
            error_printer(f"File {file} is not a .shp file.")
            raise typer.Exit()
    success_printer("Shapefiles validated successfully.")

    bus_stations_df, bus_lines_df = bus_processing(city, files)

    # Compare against None explicitly: the results are used like pandas
    # DataFrames (.dropna()), and truth-testing a DataFrame raises ValueError.
    has_stations = bus_stations_df is not None
    has_lines = bus_lines_df is not None
    if not has_stations and not has_lines:
        error_printer("No dataframes were processed successfully.")
        raise typer.Exit()

    if cleandata:
        if has_stations:
            bus_stations_df = bus_stations_df.dropna()
        if has_lines:
            bus_lines_df = bus_lines_df.dropna()

    if push:
        if has_stations:
            push_to_db_coords("bus-stations", bus_stations_df, pushmode)
        if has_lines:
            push_to_db_linestring("bus-lines", bus_lines_df, pushmode)
    else:
        # The original passed the leaked loop variable, i.e. the last
        # shapefile; keep that choice but make it explicit.
        reference_file = files[-1]
        if has_stations:
            write_to_csv("bus-stations", bus_stations_df, reference_file)
        if has_lines:
            write_to_csv("bus-lines", bus_lines_df, reference_file)
    success_printer("Processing complete.")
# Start the Typer CLI only when this file is executed as a script, so that
# importing the module does not trigger command-line parsing.
if __name__ == "__main__":
    app()