mirror of
https://github.com/knejadshamsi/zele-utils.git
synced 2024-11-14 17:40:28 -05:00
152 lines
6.4 KiB
Python
152 lines
6.4 KiB
Python
import os, typer
|
|
from dotenv import load_dotenv
|
|
from rich import print
|
|
from typing_extensions import Annotated
|
|
from typing import Optional
|
|
from pathlib import Path
|
|
from typing import Tuple
|
|
|
|
from classes import City, DBMode, RTMode
|
|
from functions import buffer_creator, process_buffer_population,process_travels, push_population, write_population
|
|
from styles import print_help
|
|
|
|
# NOTE(review): `called` is not referenced anywhere in the visible part of
# this file; kept because other modules may read it.
called = "population"

# Typer application object that every command below registers on.
# rich_markup_mode="rich" lets the help strings use Rich markup tags.
app = typer.Typer(rich_markup_mode="rich")

# Load variables from a .env file into the process environment; the
# population command reads DIVIDER and CHUNK_SIZE through os.getenv.
load_dotenv()
|
|
|
|
def error_printer(text):
    """Print *text* in bold, prefixed with a bold red ``ERROR:`` tag."""
    message = f'[bold red]ERROR:[/bold red] [bold]{text}[/bold]'
    print(message)
|
|
def success_printer(text):
    """Print *text* in bold, prefixed with a bold green ``SUCCESS:`` tag."""
    message = f'[bold green]SUCCESS:[/bold green] [bold]{text}[/bold]'
    print(message)
|
|
def info_printer(text):
    """Print *text* in bold, prefixed with a bold blue ``INFO:`` tag."""
    message = f'[bold blue]INFO:[/bold blue] [bold]{text}[/bold]'
    print(message)
|
|
def notice_printer(text):
    """Print *text* in bold, prefixed with a bold yellow ``NOTICE:`` tag."""
    message = f'[bold yellow]NOTICE:[/bold yellow] [bold]{text}[/bold]'
    print(message)
|
|
|
|
# CLI command: walk a chunked population file and export the requested tables.
#
# Parameters
#   file      -- path of the input file; it is scanned once to count its lines
#                and the lines equal to the DIVIDER environment variable.
#   tables    -- any of "agents", "travels" or "all"; unknown names are ignored
#                and an input with no known name aborts the command.
#   range     -- inclusive (start, end) chunk indices to process; defaults to
#                [0, number_of_dividers - 2].
#   log       -- when set, a "<file stem>.log" file next to the input records
#                every finished chunk index so a later run can skip them.
#   cleandata -- when set, rows with missing values are dropped from the
#                agents dataframe (the travels dataframe is left untouched).
#   push      -- when set, results go through push_population; otherwise
#                through write_population.
#   mode      -- forwarded unchanged to push_population.
#
# Exits through typer.Exit() after printing an error when: no valid table was
# given, the file is missing/unreadable/empty, no divider line is found, or
# the DIVIDER / CHUNK_SIZE environment variables are unusable.
#
# NOTE: documented with comments instead of a docstring on purpose -- Typer
# shows a command's docstring as its --help text, which would alter the CLI.
@app.command(print_help())
def population(
    file: Annotated[
        Path,
        typer.Argument(
            help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].",
            show_default=False,
        ),
    ],
    tables: list[str] = typer.Argument(
        ...,
        help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.",
        show_default=False,
    ),
    range: Tuple[int, int] = typer.Option(
        None,
        "--range",
        "-r",
        help="Specify the start and end of the chunk range to be processed.",
        show_default=False,
    ),
    log: bool = typer.Option(
        False,
        "--log",
        "-l",
        help="Creates a Log file in the same directory to track the progress. Useful for large files that might be intrupted.",
    ),
    cleandata: bool = typer.Option(
        False,
        "--cleandata",
        "-cd",
        help="Drop the rows that have missing values.",
    ),
    push: bool = typer.Option(
        False,
        "--push",
        "-p",
        help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]",
    ),
    mode: Optional[DBMode] = typer.Option(
        None,
        help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data",
        show_default=False,
    ),
):
    # --- Resolve which tables were requested -------------------------------
    all_tables = ["agents", "travels"]
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if not common_tables:
        error_printer("Incorrect table input")
        raise typer.Exit()
    if "all" in common_tables:
        common_tables = all_tables
    info_printer(f"Tables to inlude: {common_tables}")

    if not file.exists():
        error_printer("File not found")
        raise typer.Exit()

    # --- Read the environment configuration once, up front -----------------
    # Both values are loop invariants; validating them here replaces a
    # TypeError from int(None) in the middle of processing.
    divider = os.getenv("DIVIDER")
    if divider is None:
        error_printer("DIVIDER environment variable is not set")
        raise typer.Exit()
    try:
        chunk_size = int(os.getenv("CHUNK_SIZE"))
    except (TypeError, ValueError):
        error_printer("CHUNK_SIZE environment variable must be an integer")
        raise typer.Exit() from None

    # --- Single pass over the file: count lines and divider lines ----------
    line_count = 0
    max_chunk = 0
    try:
        with open(file, 'r', encoding='utf-8') as source:
            success_printer("File Opened")
            for line in source:
                line_count += 1
                if line.strip() == divider:
                    max_chunk += 1
    except (OSError, UnicodeDecodeError):
        error_printer("Unable to read file")
        raise typer.Exit() from None

    if line_count == 0:
        error_printer("File empty")
        raise typer.Exit()
    success_printer(f"{line_count} lines read")

    if max_chunk == 0:
        error_printer("Unable to find Chunks")
        raise typer.Exit()
    success_printer(f"{max_chunk} Chunks found")

    # --- Chunk range --------------------------------------------------------
    if not range:
        range = [0, max_chunk - 2]
    info_printer(f"Chunk Range: {range}")
    first_chunk, last_chunk = range

    # --- Progress log -------------------------------------------------------
    log_file = file.parent / (file.stem + ".log")
    completed_chunks = set()
    if not log:
        notice_printer("Log file not created")
    else:
        if log_file.exists():
            notice_printer(f"Log file {log_file} already exists")
        else:
            log_file.touch()
            info_printer(f"Log file {log_file} created")
        # Entries are stored as text, one chunk index per line; blank lines
        # (the log is appended to with a leading newline) are ignored.
        with open(log_file, 'r', encoding='utf-8') as log_handle:
            completed_chunks = {
                entry.strip() for entry in log_handle if entry.strip()
            }

    # --- Process chunk by chunk --------------------------------------------
    current_chunk = 0
    processed_line = 0
    while current_chunk < max_chunk:
        # The chunk is fetched even when it will be skipped so that
        # processed_line keeps advancing.
        # NOTE(review): presumably buffer_creator returns the position to
        # resume from plus the chunk content -- confirm in functions.py.
        processed_line, buffer = buffer_creator(
            file, divider, processed_line, chunk_size
        )

        in_range = first_chunk <= current_chunk <= last_chunk
        # Compare as text: the log holds strings, current_chunk is an int.
        already_logged = str(current_chunk) in completed_chunks

        if in_range and not already_logged:
            if "agents" in common_tables:
                dataframe = process_buffer_population(buffer)
                if cleandata:
                    dataframe = dataframe.dropna()
                if push:
                    push_population("agents", dataframe, mode)
                else:
                    write_population("agents", dataframe, file)

            if "travels" in common_tables:
                dataframe_travels = process_travels(buffer)
                if push:
                    push_population("travels", dataframe_travels, mode)
                else:
                    write_population("travels", dataframe_travels, file)

            if log:
                with open(log_file, "a", encoding="utf-8") as log_handle:
                    log_handle.write(f"\n{current_chunk}")

        # Single increment shared by processed and skipped chunks, so a
        # skipped chunk can never stall the loop.
        current_chunk += 1
|
|
|
|
# CLI command: process a network file and either push the result to the
# database (--push) or write it out.
#
# Parameters
#   file      -- path of the input file; the command aborts if it is missing.
#   cleandata -- forwarded to process_network.
#   push      -- when set, the result goes to push_network together with
#                `mode`; otherwise to network_write.
#   mode      -- forwarded unchanged to push_network.
#
# NOTE: documented with comments instead of a docstring on purpose -- Typer
# shows a command's docstring as its --help text, which would alter the CLI.
@app.command()
def network(
    file: Annotated[
        Path,
        typer.Argument(help="Relative path to the file.", show_default=False),
    ],
    cleandata: bool = typer.Option(
        False,
        "--cleandata",
        "-cd",
        help="Clean the data if this flag is used.",
    ),
    push: bool = typer.Option(
        False,
        "--push",
        "-p",
        help="Push the data into Database.\nIf you want the output to be saved in [green bold].csv[/green bold] format, do not mention this flag.",
    ),
    mode: Optional[DBMode] = typer.Option(
        None,
        help="Specify either 'amend' or 'drop' when pushing data",
        show_default=False,
    ),
):
    if not file.exists():
        # Uses the module's error_printer helper (the previous name,
        # error_parser, is not defined anywhere in this file).
        error_printer("File does not exist!")
        raise typer.Exit()

    # NOTE(review): process_network, push_network and network_write are not
    # among the names imported from `functions` at the top of this file, so
    # these calls raise NameError until they are implemented and imported.
    data = process_network(file, cleandata)
    if push:
        push_network(data, mode)
    else:
        network_write(data)
|
|
|
|
# CLI command placeholder: accepts a city, a mode and an address, but for now
# only prints a greeting containing the chosen city (mode and address are
# parsed and otherwise unused).
#
# NOTE: documented with comments instead of a docstring on purpose -- Typer
# shows a command's docstring as its --help text, which would alter the CLI.
@app.command()
def metro(
    city: Annotated[
        City,
        typer.Argument(..., help="Choose a city", show_default=False),
    ],
    # Help text previously repeated "Choose a city" (copy-paste slip).
    mode: Annotated[
        RTMode,
        typer.Argument(..., help="Choose a mode", show_default=False),
    ],
    address: Annotated[
        str,
        typer.Argument(..., help="enter a relative path or URL", show_default=False),
    ],
):
    print(f"Hello {city}")
|
|
|
|
# CLI command placeholder: accepts a city, a mode and an address, but for now
# only prints a greeting containing the chosen city (mode and address are
# parsed and otherwise unused).
#
# NOTE: documented with comments instead of a docstring on purpose -- Typer
# shows a command's docstring as its --help text, which would alter the CLI.
@app.command()
def bus(
    city: Annotated[
        City,
        typer.Argument(..., help="Choose a city", show_default=False),
    ],
    # Help text previously repeated "Choose a city" (copy-paste slip).
    mode: Annotated[
        RTMode,
        typer.Argument(..., help="Choose a mode", show_default=False),
    ],
    address: Annotated[
        str,
        typer.Argument(..., help="enter a relative path or URL", show_default=False),
    ],
):
    print(f"Hello {city}")
|
|
|
|
# Run the Typer application only when this file is executed as a script.
if __name__ == "__main__":
    app()