"""CLI for converting MATSim-style XML exports (population / network) into
database tables or CSV files, processed in chunks delimited by the DIVIDER
environment variable."""

import math
import os
import time
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional, Tuple

import typer
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from rich import print
from rich.console import Console
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
)
from typing_extensions import Annotated

from classes import City, DBMode, RTMode
from functions import (
    buffer_creator,
    process_buffer_population,
    process_links,
    process_links_attr,
    process_nodes,
    process_travels,
    push_to_db_coords,
    write_to_csv,
)
from styles import print_help

called = "population"

app = typer.Typer(rich_markup_mode="rich")
load_dotenv()


def error_printer(text):
    """Print a red ERROR-prefixed message."""
    print(f'[bold red]ERROR:[/bold red] [bold]{text}[/bold]')


def success_printer(text):
    """Print a green SUCCESS-prefixed message."""
    print(f'[bold green]SUCCESS:[/bold green] [bold]{text}[/bold]')


def info_printer(text):
    """Print a blue INFO-prefixed message."""
    print(f'[bold blue]INFO:[/bold blue] [bold]{text}[/bold]')


def notice_printer(text):
    """Print a yellow NOTICE-prefixed message."""
    print(f'[bold yellow]NOTICE:[/bold yellow] [bold]{text}[/bold]')


def _validate_tables(tables, all_tables):
    """Normalize the user's table selection against *all_tables*.

    Exits the CLI when no valid table name was given; expands "all".
    """
    common_tables = [item for item in tables if item in ["all"] + all_tables]
    if not common_tables:
        error_printer("Incorrect table input")
        raise typer.Exit()
    if "all" in common_tables:
        common_tables = all_tables
    info_printer(f"Tables to include: {common_tables}")
    return common_tables


def _count_lines(file: Path) -> int:
    """Return the number of lines in *file*, exiting the CLI on failure or
    on an empty file."""
    if not file.exists():
        error_printer("File not found")
        raise typer.Exit()
    try:
        with open(file, 'r', encoding='utf-8') as f:
            success_printer("File Opened")
            count = sum(1 for _ in f)
    except OSError:
        error_printer("Unable to read file")
        raise typer.Exit()
    if count == 0:
        error_printer("File empty")
        raise typer.Exit()
    return count


@app.command(print_help())
def population(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    chunk_range: Tuple[int, int] = typer.Option(None, "--range", "-r", help="Specify the start and end of the chunk range to be processed.", show_default=False),
    log: bool = typer.Option(False, "--log", "-l", help="Creates a Log file in the same directory to track the progress. Useful for large files that might be interrupted."),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory"),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process a population XML file chunk-by-chunk into the *agents* and/or
    *travels* tables, writing to the database (--push) or to CSV files.

    Chunks are delimited by the DIVIDER env var; chunk size comes from
    CHUNK_SIZE.  A .log file (--log) records finished chunk indices so an
    interrupted run can be resumed.
    """
    console = Console()
    common_tables = _validate_tables(tables, ["agents", "travels"])

    count = _count_lines(file)
    success_printer(f"{count} lines read")

    divider = os.getenv("DIVIDER")
    chunk_size = int(os.getenv("CHUNK_SIZE"))

    # Count divider lines to determine how many chunks the file holds.
    max_chunk = 0
    with open(file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip() == divider:
                max_chunk += 1
    if max_chunk == 0:
        error_printer("Unable to find Chunks")
        raise typer.Exit()
    success_printer(f"{max_chunk} Chunks found")

    if not chunk_range:
        # NOTE(review): original default excludes the final chunk
        # (max_chunk - 2); kept as-is — confirm this is intended.
        chunk_range = [0, max_chunk - 2]
    info_printer(f"Chunk Range: {chunk_range}")

    directory = file.parent
    log_file = directory / (file.stem + ".log")
    if not log:
        notice_printer("Log file not created")
    elif log_file.exists():
        notice_printer(f"Log file {log_file} already exists")
    else:
        log_file.touch()
        info_printer(f"Log file {log_file} created")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        # Hide the bar for trivially small jobs.
        task = progress.add_task(
            "[cyan]Processing chunks...", total=max_chunk, visible=max_chunk > 2
        )
        current_chunk = 0
        processed_line = 0
        log_list: list[str] = []
        if log:
            with open(log_file, 'r', encoding='utf-8') as l:
                log_list = l.read().splitlines()
        while current_chunk < max_chunk:
            if current_chunk < chunk_range[0] or current_chunk > chunk_range[1]:
                # Outside the requested range: still consume the chunk so the
                # file cursor stays aligned with the chunk index.
                processed_line, buffer = buffer_creator(
                    file, divider, processed_line, chunk_size
                )
                current_chunk += 1
                continue
            # The log stores chunk indices as text; compare as strings.
            if log and str(current_chunk) in log_list:
                # Already done in a previous run: consume and advance so the
                # loop cannot stall on this chunk.
                processed_line, buffer = buffer_creator(
                    file, divider, processed_line, chunk_size
                )
                current_chunk += 1
                continue
            processed_line, buffer = buffer_creator(
                file, divider, processed_line, chunk_size
            )
            if "agents" in common_tables:
                dataframe = process_buffer_population(buffer)
                if cleandata:
                    dataframe = dataframe.dropna()
                if push:
                    push_to_db_coords("agents", dataframe, mode)
                else:
                    write_to_csv("agents", dataframe, file)
            if "travels" in common_tables:
                # NOTE(review): --cleandata is not applied to travels in the
                # original logic; preserved — confirm whether intended.
                dataframe_travels = process_travels(buffer)
                if push:
                    push_to_db_coords("travels", dataframe_travels, mode)
                else:
                    write_to_csv("travels", dataframe_travels, file)
            if log:
                with open(log_file, "a", encoding='utf-8') as lf:
                    lf.write(f"\n{current_chunk}")
            current_chunk += 1
            time.sleep(2)  # NOTE(review): deliberate throttle? confirm.
            progress.update(task, advance=1)
        progress.update(task, visible=False)
    console.print("[green]Processing complete![/green]")


@app.command()
def network(
    file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
    tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]nodes[/underline bold], [underline bold]links[/underline bold], [underline bold]links_attr[/underline bold]. Use [underline bold]all[/underline bold] for everything.", show_default=False),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory"),
    mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
    """Process a network XML file into *nodes*, *links* and/or *links_attr*
    tables.  The file's sections are separated by DIVIDER lines: section 2
    holds nodes, section 3 holds links.
    """
    console = Console()
    common_tables = _validate_tables(tables, ["nodes", "links", "links_attr"])

    count = _count_lines(file)
    success_printer(f"{count} lines found")

    divider = os.getenv("DIVIDER")
    chunk_size = int(os.getenv("CHUNK_SIZE"))

    buffer: list[str] = []
    divider_count = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                if stripped != divider:
                    buffer.append(stripped)
                    continue
                divider_count += 1
                if divider_count == 2:
                    # Section between divider 1 and 2: <node> elements.
                    if "nodes" in common_tables:
                        try:
                            element = BeautifulSoup(" ".join(buffer), 'lxml-xml')
                            total_nodes = element.find_all("node")
                        except Exception:
                            error_printer("node process failed")
                            raise typer.Exit()
                        total_chunks = math.ceil(len(total_nodes) / chunk_size)
                        node_task = progress.add_task(
                            "[cyan]Processing [bold]nodes[/bold] chunks...",
                            total=total_chunks,
                            visible=total_chunks > 2,
                        )
                        success_printer(f"Chunk count: {total_chunks}")
                        for start in range(0, len(total_nodes), chunk_size):
                            chunk = total_nodes[start:start + chunk_size]
                            dataframe = process_nodes(chunk)
                            if cleandata:
                                dataframe = dataframe.dropna()
                            if push:
                                push_to_db_coords("nodes", dataframe, mode)
                            else:
                                write_to_csv("nodes", dataframe, file)
                            progress.update(node_task, advance=1)
                    buffer.clear()
                elif divider_count == 3:
                    # Section between divider 2 and 3: <link> elements.
                    try:
                        element = BeautifulSoup(" ".join(buffer), 'lxml-xml')
                        total_links = element.find_all("link")
                    except Exception:
                        error_printer("link process failed")
                        raise typer.Exit()
                    total_chunks = math.ceil(len(total_links) / chunk_size)
                    success_printer(f"Chunk count: {total_chunks}")
                    link_task = progress.add_task(
                        "[cyan]Processing [bold]links[/bold] chunks...",
                        total=total_chunks,
                        visible=total_chunks > 2,
                    )
                    for start in range(0, len(total_links), chunk_size):
                        chunk = total_links[start:start + chunk_size]
                        if "links" in common_tables:
                            dataframe = process_links(chunk)
                            if cleandata:
                                dataframe = dataframe.dropna()
                            if push:
                                push_to_db_coords("links", dataframe, mode)
                            else:
                                write_to_csv("links", dataframe, file)
                        if "links_attr" in common_tables:
                            dataframe = process_links_attr(chunk)
                            if cleandata:
                                dataframe = dataframe.dropna()
                            if push:
                                push_to_db_coords("links_attr", dataframe, mode)
                            else:
                                write_to_csv("links_attr", dataframe, file)
                        progress.update(link_task, advance=1)
                    buffer.clear()
                # divider_count == 1 or > 3: content is ignored.
    console.print("[green]Processing complete![/green]")


@app.command()
def metro(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    mode: Annotated[RTMode, typer.Argument(..., help="Choose a mode", show_default=False)],
    address: Annotated[str, typer.Argument(..., help="enter a relative path or URL", show_default=False)],
):
    """Placeholder command for metro data (not yet implemented)."""
    print(f"Hello {city}")


@app.command()
def bus(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    mode: Annotated[RTMode, typer.Argument(..., help="Choose a mode", show_default=False)],
    address: Annotated[str, typer.Argument(..., help="enter a relative path or URL", show_default=False)],
):
    """Placeholder command for bus data (not yet implemented)."""
    print(f"Hello {city}")


if __name__ == "__main__":
    app()