zele-utils/main.py
2024-09-10 20:01:51 -04:00

169 lines
7.1 KiB
Python

import os, typer
from dotenv import load_dotenv
from rich import print
from typing_extensions import Annotated
from typing import Optional
from pathlib import Path
from typing import Tuple
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
import time
from classes import City, DBMode, RTMode
from functions import buffer_creator, process_buffer_population,process_travels, push_population, write_population
from styles import print_help
# Presumably the name of the primary command; it is not referenced anywhere
# else in the visible part of this module.
called = "population"

# Load variables from a .env file into the process environment so that the
# os.getenv("DIVIDER") / os.getenv("CHUNK_SIZE") lookups below can see them.
load_dotenv()

# Typer application object; the @app.command decorators below register the
# CLI commands on it. Rich markup is enabled for help strings.
app = typer.Typer(rich_markup_mode="rich")
def error_printer(text):
    """Print *text* prefixed with an ``ERROR:`` label.

    The label is wrapped in ``[bold red]`` markup and the message in
    ``[bold]`` markup; the module-level ``print`` does the rendering.
    """
    label = '[bold red]ERROR:[/bold red]'
    print(f'{label} [bold]{text}[/bold]')
def success_printer(text):
    """Print *text* prefixed with a ``SUCCESS:`` label.

    The label is wrapped in ``[bold green]`` markup and the message in
    ``[bold]`` markup; the module-level ``print`` does the rendering.
    """
    label = '[bold green]SUCCESS:[/bold green]'
    print(f'{label} [bold]{text}[/bold]')
def info_printer(text):
    """Print *text* prefixed with an ``INFO:`` label.

    The label is wrapped in ``[bold blue]`` markup and the message in
    ``[bold]`` markup; the module-level ``print`` does the rendering.
    """
    label = '[bold blue]INFO:[/bold blue]'
    print(f'{label} [bold]{text}[/bold]')
def notice_printer(text):
    """Print *text* prefixed with a ``NOTICE:`` label.

    The label is wrapped in ``[bold yellow]`` markup and the message in
    ``[bold]`` markup; the module-level ``print`` does the rendering.
    """
    label = '[bold yellow]NOTICE:[/bold yellow]'
    print(f'{label} [bold]{text}[/bold]')
def _scan_chunks(handle, divider):
    """Count lines and divider lines of an open text file in one pass.

    Args:
        handle: An open, iterable text file object.
        divider: The exact (stripped) line content that marks a chunk.

    Returns:
        A ``(line_count, chunk_count)`` tuple; ``(0, 0)`` for an empty file.
    """
    line_count = 0
    chunk_count = 0
    for line_count, line in enumerate(handle, start=1):
        if line.strip() == divider:
            chunk_count += 1
    return line_count, chunk_count


def _read_logged_chunks(log_path):
    """Return the non-empty, stripped lines of the log file as a set of strings."""
    with open(log_path, "r", encoding="utf-8") as log_handle:
        return {entry.strip() for entry in log_handle if entry.strip()}


def _save_table(table, frame, push, mode, file):
    """Push *frame* to the database when *push* is set, otherwise write it out."""
    if push:
        push_population(table, frame, mode)
    else:
        write_population(table, frame, file)


# NOTE(review): the value returned by print_help() is passed as the first
# positional argument of app.command(), which Typer treats as the command
# name - confirm that this is intended.
@app.command(print_help())
def population(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.",show_default=False),
    range: Tuple[int, int] = typer.Option(None, "--range", "-r", help="Specify the start and end of the chunk range to be processed.", show_default=False),
    log: bool = typer.Option(False, "--log", "-l", help="Creates a Log file in the same directory to track the progress. Useful for large files that might be intrupted."),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]"),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    # Process the input file chunk by chunk (chunks are delimited by lines
    # equal to the DIVIDER environment variable) and, for every requested
    # table, either push the resulting dataframe or write it out.
    #
    # Deliberately documented with comments rather than a docstring: Typer
    # would show a docstring as the command's help text.
    #
    # Every error path exits with status 1 so that shells and scripts can
    # detect the failure.
    console = Console()

    # Keep only recognised table names; "all" expands to every known table.
    all_tables = ["agents", "travels"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if not common_tables:
        error_printer("Incorrect table input")
        raise typer.Exit(code=1)
    if "all" in common_tables:
        common_tables = all_tables
    info_printer(f"Tables to inlude: {common_tables}")

    if not file.exists():
        error_printer("File not found")
        raise typer.Exit(code=1)

    # Read and validate the environment configuration once, up front, instead
    # of on every loop iteration (a missing CHUNK_SIZE used to surface as a
    # raw TypeError in the middle of processing).
    divider = os.getenv("DIVIDER")
    if divider is None:
        error_printer("DIVIDER environment variable is not set")
        raise typer.Exit(code=1)
    try:
        chunk_size = int(os.getenv("CHUNK_SIZE"))
    except (TypeError, ValueError):
        error_printer("CHUNK_SIZE environment variable must be an integer")
        raise typer.Exit(code=1)

    # Single pass over the file: count the lines and the divider lines. The
    # context manager closes the handle on every path, including the error
    # exits below.
    try:
        with open(file, "r", encoding="utf-8") as handle:
            success_printer("File Opened")
            count, max_chunk = _scan_chunks(handle, divider)
    except (OSError, UnicodeDecodeError):
        error_printer("Unable to read file")
        raise typer.Exit(code=1)

    if count == 0:
        error_printer("File empty")
        raise typer.Exit(code=1)
    success_printer(f"{count} lines read")

    if max_chunk == 0:
        error_printer("Unable to find Chunks")
        raise typer.Exit(code=1)
    success_printer(f"{max_chunk} Chunks found")

    # The parameter name `range` is part of the CLI interface and shadows the
    # builtin, so the builtin range() is unavailable inside this function.
    # NOTE(review): the default upper bound is max_chunk - 2, which leaves the
    # last chunk index out - confirm that this is intended.
    if not range:
        range = [0, max_chunk - 2]
    info_printer(f"Chunk Range: {range}")
    first_chunk, last_chunk = range

    # Optional progress log stored next to the input file; it holds one
    # processed chunk index per line.
    log_file = file.parent / (file.stem + ".log")
    logged_chunks = set()
    if not log:
        notice_printer("Log file not created")
    else:
        if log_file.exists():
            notice_printer(f"Log file {log_file} already exists")
        else:
            log_file.touch()
            info_printer(f"Log file {log_file} created")
        logged_chunks = _read_logged_chunks(log_file)

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console
    ) as progress:
        task = progress.add_task("[cyan]Processing chunks...", total=max_chunk)
        current_chunk = 0
        processed_line = 0
        while current_chunk < max_chunk:
            # buffer_creator is called for every chunk, including skipped
            # ones, so that processed_line keeps advancing through the file.
            processed_line, buffer = buffer_creator(file, divider, processed_line, chunk_size)

            in_range = first_chunk <= current_chunk <= last_chunk
            # Log entries are text, so compare against the string form of the
            # index.
            already_logged = str(current_chunk) in logged_chunks

            if in_range and not already_logged:
                if "agents" in common_tables:
                    dataframe = process_buffer_population(buffer)
                    if cleandata:
                        dataframe = dataframe.dropna()
                    _save_table("agents", dataframe, push, mode, file)
                if "travels" in common_tables:
                    dataframe_travels = process_travels(buffer)
                    _save_table("travels", dataframe_travels, push, mode, file)
                if log:
                    with open(log_file, "a", encoding="utf-8") as log_handle:
                        log_handle.write(f"\n{current_chunk}")
                # NOTE(review): fixed 2-second pause after every processed
                # chunk, kept from the original - confirm it is still needed.
                time.sleep(2)

            # Skipped chunks also advance the counter and the progress bar, so
            # the loop always terminates and the bar reaches its total.
            current_chunk += 1
            progress.update(task, advance=1)
        progress.update(task, visible=False)
    console.print("[green]Processing complete![/green]")
@app.command()
def network(
    file: Annotated[Path, typer.Argument(help="Relative path to the file.", show_default=False)],
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Clean the data if this flag is used."),
    push: bool = typer.Option(False, "--push", "-p", help="Push the data into Database.\nIf you want the output to be saved in [green bold].csv[/green bold] format, do not mention this flag."),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either 'amend' or 'drop' when pushing data", show_default=False),
):
    # Process the given network file and either push the result (with the
    # selected mode) or write it out.
    #
    # Deliberately documented with comments rather than a docstring: Typer
    # would show a docstring as the command's help text.
    #
    # NOTE(review): process_network, push_network and network_write are not
    # imported in the visible part of this module - confirm they are brought
    # into scope, otherwise this command fails with NameError.
    if not file.exists():
        # Use the module's error_printer helper (the previous call targeted an
        # undefined name) and exit with a non-zero status.
        error_printer("File does not exist!")
        raise typer.Exit(code=1)

    data = process_network(file, cleandata)
    if push:
        push_network(data, mode)
    else:
        network_write(data)
@app.command()
def metro(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    mode: Annotated[RTMode, typer.Argument(..., help="Choose a mode", show_default=False)],
    address: Annotated[str, typer.Argument(..., help="enter a relative path or URL", show_default=False)],
):
    # Placeholder command: it only greets with the selected city; `mode` and
    # `address` are accepted but not used yet.
    #
    # Deliberately documented with comments rather than a docstring: Typer
    # would show a docstring as the command's help text.
    print(f"Hello {city}")
@app.command()
def bus(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    mode: Annotated[RTMode, typer.Argument(..., help="Choose a mode", show_default=False)],
    address: Annotated[str, typer.Argument(..., help="enter a relative path or URL", show_default=False)],
):
    # Placeholder command: it only greets with the selected city; `mode` and
    # `address` are accepted but not used yet.
    #
    # Deliberately documented with comments rather than a docstring: Typer
    # would show a docstring as the command's help text.
    print(f"Hello {city}")
# Script entry point: hand control to the Typer application only when this
# module is executed directly, not when it is imported.
if __name__ == "__main__":
    app()