import math
import os
import time
from pathlib import Path
from typing import Optional
from typing import Tuple

import typer
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from rich import print
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from typing_extensions import Annotated

from classes import City, DBMode, RTMode
from functions import (
    file_validator,
    buffer_creator,
    process_buffer_population,
    process_travels,
    push_to_db_coords,
    write_to_csv,
    push_to_db,
)
from functions import process_nodes, process_links, process_links_attr
from functions import error_printer, success_printer, info_printer, notice_printer
from functions import metro_processing, bus_processing
from styles import print_help

called = "population"
app = typer.Typer(rich_markup_mode="rich")
load_dotenv()


def _select_tables(tables, all_tables):
    """Return the requested table names that are valid, expanding ``all``.

    Prints an error and exits the command when none of the requested names
    is either ``all`` or a member of *all_tables*.
    """
    selected = [item for item in tables if item in ["all"] + all_tables]
    if not selected:
        error_printer("Incorrect table input")
        raise typer.Exit()
    if "all" in selected:
        selected = all_tables
    info_printer(f"Tables to inlude: {selected}")
    return selected


def _chunk_size():
    """Read ``CHUNK_SIZE`` from the environment as a positive integer.

    Exits with a readable message instead of letting ``int(None)`` raise a
    ``TypeError`` (or a zero value raise ``ZeroDivisionError``) mid-run.
    """
    try:
        size = int(os.getenv("CHUNK_SIZE"))
    except (TypeError, ValueError):
        size = 0
    if size <= 0:
        error_printer("CHUNK_SIZE must be set to a positive integer")
        raise typer.Exit()
    return size


def _make_progress(console):
    """Build the spinner/bar progress display shared by the commands."""
    return Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    )


def _store_coords(table, dataframe, file, push, mode):
    """Push *dataframe* to the database when *push* is set, else write a CSV."""
    if push:
        push_to_db_coords(table, dataframe, mode)
    else:
        write_to_csv(table, dataframe, file)


def _parse_elements(buffer, tag, label):
    """Parse the buffered lines as XML and return every ``<tag>`` element.

    On any parser failure the error is reported and the command exits;
    continuing would only fail later on the missing result.
    """
    try:
        return BeautifulSoup(' '.join(buffer), 'lxml-xml').find_all(tag)
    except Exception:
        error_printer(f"{label} process failed")
        raise typer.Exit()


@app.command(print_help())
def population(
    file: Annotated[
        Path,
        typer.Argument(
            help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].",
            show_default=False,
        ),
    ],
    tables: list[str] = typer.Argument(
        ...,
        help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.",
        show_default=False,
    ),
    range: Tuple[int, int] = typer.Option(
        None,
        "--range",
        "-r",
        help="Specify the start and end of the chunk range to be processed.",
        show_default=False,
    ),
    log: bool = typer.Option(
        False,
        "--log",
        "-l",
        help="Creates a Log file in the same directory to track the progress. Useful for large files that might be intrupted.",
    ),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(
        False,
        "--push",
        "-p",
        help="Save the output directly to the database When mentioned. "
        "Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]",
    ),
    mode: Optional[DBMode] = typer.Option(
        None,
        help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data",
        show_default=False,
    ),
):
    """Process a chunked population XML file into the agents and travels tables."""
    # NOTE: Typer shows the docstring above as this command's help text.
    console = Console()
    common_tables = _select_tables(tables, ["agents", "travels"])
    file_validator(file)

    divider = os.getenv("DIVIDER")
    chunk_size = _chunk_size()

    # Count the divider lines; the counter must start at zero before the scan.
    max_chunk = 0
    with open(file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip() == divider:
                max_chunk += 1
    if max_chunk > 0:
        success_printer(f"{max_chunk} Chunks found")
    else:
        error_printer("Unable to find Chunks")
        raise typer.Exit()

    if not range:
        range = [0, max_chunk - 2]
    info_printer(f"Chunk Range: {range}")

    log_file = file.parent / (file.stem + ".log")
    if not log:
        notice_printer("Log file not created")
    elif log_file.exists():
        notice_printer(f"Log file {log_file} already exists")
    else:
        log_file.touch()
        info_printer(f"Log file {log_file} created")

    # Chunk indices already recorded in the log. They are kept as strings
    # because that is how they are read back from the file.
    finished = set()
    if log:
        with open(log_file, 'r', encoding='utf-8') as log_handle:
            finished = {entry.strip() for entry in log_handle.read().splitlines() if entry.strip()}

    with _make_progress(console) as progress:
        task = progress.add_task("[cyan]Processing chunks...", total=max_chunk, visible=max_chunk > 2)
        current_chunk = 0
        processed_line = 0
        while current_chunk < max_chunk:
            # Always read the chunk, even when it is skipped, so the file
            # position keeps moving forward.
            processed_line, buffer = buffer_creator(file, divider, processed_line, chunk_size)
            outside_range = current_chunk < range[0] or current_chunk > range[1]
            if outside_range or str(current_chunk) in finished:
                current_chunk += 1
                continue

            if "agents" in common_tables:
                dataframe = process_buffer_population(buffer)
                if cleandata:
                    dataframe = dataframe.dropna()
                _store_coords("agents", dataframe, file, push, mode)
            if "travels" in common_tables:
                dataframe_travels = process_travels(buffer)
                _store_coords("travels", dataframe_travels, file, push, mode)

            if log:
                with open(log_file, "a", encoding="utf-8") as log_handle:
                    log_handle.write(f"\n{current_chunk}")
            current_chunk += 1
            # NOTE(review): fixed pause after every chunk kept from the
            # original; its purpose is not visible here - confirm it is needed.
            time.sleep(2)
            progress.update(task, advance=1)
        progress.update(task, visible=False)
    console.print("[green]Processing complete![/green]")


@app.command()
def network(
    file: Annotated[
        Path,
        typer.Argument(
            help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].",
            show_default=False,
        ),
    ],
    tables: list[str] = typer.Argument(
        ...,
        help="Tables to include: [underline bold]nodes[/underline bold], [underline bold]links[/underline bold], [underline bold]links_attr[/underline bold]. Use [underline bold]all[/underline bold] for everything.",
        show_default=False,
    ),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(
        False,
        "--push",
        "-p",
        help="Save the output directly to the database When mentioned. "
        "Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]",
    ),
    mode: Optional[DBMode] = typer.Option(
        None,
        help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data",
        show_default=False,
    ),
):
    """Process a network XML file into the nodes, links and links_attr tables."""
    # NOTE: Typer shows the docstring above as this command's help text.
    console = Console()
    common_tables = _select_tables(tables, ["nodes", "links", "links_attr"])
    file_validator(file)

    divider = os.getenv("DIVIDER")
    chunk_size = _chunk_size()
    buffer = []
    divider_count = 0

    with _make_progress(console) as progress:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                if stripped != divider:
                    buffer.append(stripped)
                    continue

                divider_count += 1
                # The first divider is only counted: lines read before it
                # stay in the buffer and are parsed with the node section.
                if divider_count == 2:
                    if "nodes" in common_tables:
                        total_nodes = _parse_elements(buffer, "node", "node")
                        total_chunks = math.ceil(len(total_nodes) / chunk_size)
                        node_task = progress.add_task(
                            "[cyan]Processing [bold]nodes[/bold] chunks...",
                            total=total_chunks,
                            visible=total_chunks > 2,
                        )
                        success_printer(f"Chunk count: {total_chunks}")
                        for start in range(0, len(total_nodes), chunk_size):
                            dataframe = process_nodes(total_nodes[start:start + chunk_size])
                            if cleandata:
                                dataframe = dataframe.dropna()
                            _store_coords("nodes", dataframe, file, push, mode)
                            progress.update(node_task, advance=1)
                    buffer.clear()
                elif divider_count == 3:
                    wants_links = "links" in common_tables
                    wants_attr = "links_attr" in common_tables
                    # Skip parsing when no link table was requested, as the
                    # node section already does.
                    if wants_links or wants_attr:
                        total_links = _parse_elements(buffer, "link", "link")
                        total_chunks = math.ceil(len(total_links) / chunk_size)
                        success_printer(f"Chunk count: {total_chunks}")
                        link_task = progress.add_task(
                            "[cyan]Processing [bold]links[/bold] chunks...",
                            total=total_chunks,
                            visible=total_chunks > 2,
                        )
                        for start in range(0, len(total_links), chunk_size):
                            new_chunk = total_links[start:start + chunk_size]
                            if wants_links:
                                dataframe = process_links(new_chunk)
                                if cleandata:
                                    dataframe = dataframe.dropna()
                                _store_coords("links", dataframe, file, push, mode)
                            if wants_attr:
                                dataframe = process_links_attr(new_chunk)
                                if cleandata:
                                    dataframe = dataframe.dropna()
                                _store_coords("links_attr", dataframe, file, push, mode)
                            progress.update(link_task, advance=1)
                    buffer.clear()
                    # Nothing after the third divider is ever consumed, so
                    # stop reading instead of buffering the rest of the file.
                    break
    console.print("[green]Processing complete![/green]")


@app.command()
def metro(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Argument(
        ...,
        help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].",
        show_default=False,
    ),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(
        False,
        "--push",
        "-p",
        help="Save the output directly to the database When mentioned. "
        "Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory",
    ),
    pushmode: Optional[DBMode] = typer.Option(
        None,
        help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data",
        show_default=False,
    ),
):
    """Process metro shapefiles into the metro-stations and metro-lines tables."""
    # NOTE: Typer shows the docstring above as this command's help text.
    required_files = ["stm_arrets_sig", "stm_lignes_sig"]
    for file in files:
        # Only the first known stem maps to stations; any other name is
        # treated as the lines file.
        table_name = "metro-stations" if file.stem == required_files[0] else "metro-lines"
        if file.suffix != '.shp':
            error_printer(f"File [bold]{file}[/bold] is not a .shp file.")
            raise typer.Exit()
        success_printer(f"Validated: [bold]{file}[/bold]")
        df = metro_processing(city, file)
        if df.empty:
            error_printer(f"processing [bold]{file}[/bold] failed")
            raise typer.Exit()
        if cleandata:
            df = df.dropna()
        if push:
            push_to_db(table_name, df, pushmode)
            continue
        write_to_csv(table_name, df, file)


@app.command()
def bus(
    city: Annotated[City, typer.Argument(..., help="Choose a city", show_default=False)],
    files: list[Path] = typer.Argument(
        ...,
        help="Provide the relative path to [yellow bold underline]shape files[/yellow bold underline].",
        show_default=False,
    ),
    cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
    push: bool = typer.Option(
        False,
        "--push",
        "-p",
        help="Save the output directly to the database when mentioned. "
        "Otherwise, saves as a [green bold]CSV file[/green bold] in the input directory",
    ),
    pushmode: Optional[DBMode] = typer.Option(
        None,
        help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data",
        show_default=False,
    ),
):
    """Process bus shapefiles into the bus-stations and bus-lines tables."""
    # NOTE: Typer shows the docstring above as this command's help text.
    required_files = ["stm_arrets_sig", "stm_lignes_sig"]
    for file in files:
        # Only the first known stem maps to stations; any other name is
        # treated as the lines file.
        table_name = "bus-stations" if file.stem == required_files[0] else "bus-lines"
        if file.suffix != '.shp':
            error_printer(f"File {file} is not a .shp file")
            raise typer.Exit()
        success_printer("Shapefiles validated successfully")
        df = bus_processing(city, file)
        if df.empty:
            error_printer(f"processing [bold]{file}[/bold] failed")
            raise typer.Exit()
        if cleandata:
            # dropna() returns a new frame; the result must be kept.
            df = df.dropna()
        if push:
            push_to_db(table_name, df, pushmode)
            continue
        write_to_csv(table_name, df, file)
        success_printer(f"Processing {file} complete.")


if __name__ == "__main__":
    app()