population added + help improved

This commit is contained in:
Kian 2024-09-10 17:40:50 -04:00
parent 87b0db002a
commit 52be6c060e
7 changed files with 214 additions and 41 deletions

12
classes.py Normal file
View File

@ -0,0 +1,12 @@
from enum import Enum
class City(str, Enum):
mtl = "mtl"
class DBMode(str, Enum):
drop = "drop"
append = "append"
class RTMode(str, Enum):
online = "online"
offline = "offline"

View File

@ -1,11 +1,13 @@
from .population import process_population, push_population,population_write
from .population import process_buffer_population, push_population,write_population
from .network import process_network, push_network, network_write
from .metro import process_metro, push_metro, metro_write
from .bus import process_bus, push_bus, bus_write
from .helpers import buffer_creator
__all__ = [
'process_population', 'push_population', 'population_write',
'process_buffer_population', 'push_population', 'write_population',
'process_network', 'push_network', 'network_write',
'process_metro', 'push_metro', 'metro_write',
'process_bus', 'push_bus', 'bus_write'
'process_bus', 'push_bus', 'bus_write',
'buffer_creator'
]

16
functions/helpers.py Normal file
View File

@ -0,0 +1,16 @@
def buffer_creator(file,divider,start_line, chunk_size):
buffer = []
line_number = start_line
current_line = 0
divider_count = 0
with open(file,'r',encoding='utf-8') as f:
for line in f:
current_line += 1
if (current_line <= line_number): continue
if (line.strip()== divider):
divider_count = divider_count + 1
if divider_count == chunk_size: break
continue
buffer.append(line.strip())
return current_line,(' ').join(buffer)

View File

@ -1,9 +1,46 @@
from bs4 import BeautifulSoup
import pandas,geopandas, pyproj, re, os, datetime
from shapely.geometry import Point
from sqlalchemy import create_engine
from geoalchemy2 import Geometry, WKTElement
def process_population(data, cleandata):
print(data, cleandata)
def camel_to_snake(name):
return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()
def process_buffer_population(data):
transformer = pyproj.Transformer.from_crs('EPSG:2950', 'EPSG:4326', always_xy=True)
PERSON_LIST = []
elements = BeautifulSoup(data,'html.parser')
for person in elements.find_all('person'):
person_obj = {}
person_obj['id'] = person['id']
activity = person.find('plan').find('activity')
lat, lon = transformer.transform(activity['x'], activity['y'])
person_obj['coordinates'] = Point(lon,lat)
person_obj['time'] = activity['end_time']
for attr in person.find_all('attribute'):
person_obj[camel_to_snake(attr['name'])] = attr.get_text()
PERSON_LIST.append(person_obj)
return pandas.DataFrame(PERSON_LIST)
def push_population(data,mode):
print(data,mode)
GDF = geopandas.GeoDataFrame(data, crs='EPSG:4326')
GDF['geom'] = GDF['coordinates'].apply(lambda x: WKTElement(x.wkt, srid=os.getenv("SRID")))
engine = create_engine(f'postgresql://{os.getenv("USER")}:{os.getenv("PASS")}@{os.getenv("HOST_NAME")}/{os.getenv("DATA_BASE")}', echo=False)
GDF.to_sql(
name='agents',
con=engine,
if_exists=mode,
chunksize=os.getenv("CHUNK_SIZE"),
dtype={'geom': Geometry('Point', srid=os.getenv("SRID"))},
index=False
)
def population_write(data):
print(data)
def write_population(data, file):
directory = file.parent
id = datetime.datetime.now().strftime("%Y%m%d")
csv = directory / (file.stem + id +".csv")
if csv.exists():
data.to_csv(csv, mode='a',index=False)
else:
data.to_csv(csv,index=False)

129
main.py
View File

@ -1,55 +1,118 @@
import typer, geopandas, requests, shapely
from bs4 import BeautifulSoup
from zipfile import ZipFile
import os, typer
from dotenv import load_dotenv
from rich import print
from typing_extensions import Annotated
from typing import Optional
from pathlib import Path
from enum import Enum
from typing import Tuple
from functions import process_population, push_population, population_write
from functions import process_network, push_network, network_write
from functions import process_metro, push_metro, metro_write
from functions import process_bus, push_bus, bus_write
from classes import City, DBMode, RTMode
from functions import buffer_creator, process_buffer_population, push_population, write_population
from styles import print_help
app = typer.Typer()
called= "population"
app = typer.Typer(rich_markup_mode="rich")
load_dotenv()
class City(str, Enum):
mtl = "mtl"
def error_printer(text):
print(f'[bold red]ERROR:[/bold red] [bold]{text}[/bold]')
def success_printer(text):
print(f'[bold green]SUCCESS:[/bold green] [bold]{text}[/bold]')
def info_printer(text):
print(f'[bold blue]INFO:[/bold blue] [bold]{text}[/bold]')
def notice_printer(text):
print(f'[bold yellow]NOTICE:[/bold yellow] [bold]{text}[/bold]')
class DBMode(str, Enum):
drop = "drop"
amend = "amend"
class RTMode(str, Enum):
online = "online"
offline = "offline"
@app.command()
@app.command(print_help())
def population(
file: Annotated[Path, typer.Argument(help="Relative path to the file.", show_default=False)],
cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Clean the data if this flag is used."),
push: bool = typer.Option(False, "--push", "-p", help="Push the data into Database."),
mode: Optional[DBMode] = typer.Option(None, help="Specify either 'amend' or 'drop' when pushing data", show_default=False),
file: Annotated[Path, typer.Argument(help="Provide the relative path to the [yellow bold underline]XML file[/yellow bold underline].", show_default=False)],
tables: list[str] = typer.Argument(..., help="Tables to include: [underline bold]agents[/underline bold], [underline bold]travels[/underline bold]. Use [underline bold]all[/underline bold] for everything.",show_default=False),
range: Tuple[int, int] = typer.Option(None, "--range", "-r", help="Specify the start and end of the chunk range to be processed.", show_default=False),
log: bool = typer.Option(False, "--log", "-l", help="Creates a Log file in the same directory to track the progress. Useful for large files that might be intrupted."),
cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Drop the rows that have missing values."),
push: bool = typer.Option(False, "--push", "-p", help="Save the output directly to the database When mentioned. Otherwise, Saves as a [green bold]CSV file[/green bold] in the input directory]"),
mode: Optional[DBMode] = typer.Option(None, help="Specify either [underline]'append'[/underline] or [underline]'drop'[/underline] when pushing data", show_default=False),
):
if not file.exists():
print("File did does not exist!")
all_tables = ["agents","travels"]
common_tables = [item for item in tables if item in ["all"] + all_tables]
if len(common_tables) == 0:
error_printer("Incorrect table input")
raise typer.Exit()
elif "all" in common_tables:
common_tables = all_tables
info_printer(f"Tables to inlude: {common_tables}")
if not file.exists():
error_printer("File not found")
raise typer.Exit()
try:
f = open(file, 'r', encoding='utf-8')
success_printer("File Opened")
except:
error_printer("Unable to read file")
raise typer.Exit()
count = sum(1 for _ in f)
if count == 0:
error_printer("File empty")
raise typer.Exit()
data = process_population(file,cleandata)
if push:
push_population(data, mode)
else:
population_write(data)
success_printer(f"{count + 1} lines read")
f.close()
max_chunk = 0
with open(file,'r',encoding='utf-8') as f:
for line in f:
if line.strip() == os.getenv("DIVIDER"):
max_chunk = max_chunk + 1
if max_chunk > 0:
success_printer(f"{max_chunk} Chunks found")
elif max_chunk == 0:
error_printer("Unable to find Chunks")
raise typer.Exit()
if not range:
range = [0,max_chunk-2]
info_printer(f"Chunk Range: {range}")
directory = file.parent
log_file = directory / (file.stem + ".log")
if not log:
notice_printer("Log file not created")
else:
if log_file.exists():
notice_printer(f"Log file {log_file} already exists")
else:
log_file.touch()
info_printer(f"Log file {log_file} created")
current_chunk = 0
processed_line = 0
if log:
with open(log_file,'r',encoding='utf-8') as l:
log_list = l.read().splitlines()
while current_chunk < max_chunk:
if log and current_chunk in log_list: continue
processed_line, buffer = buffer_creator(file, os.getenv("DIVIDER"), processed_line, int(os.getenv("CHUNK_SIZE")))
dataframe = process_buffer_population(buffer)
print(dataframe)
if cleandata:
dataframe = dataframe.dropna()
if push:
push_population(dataframe, mode)
else:
write_population(dataframe,file)
if log:
f = open(log_file, "a")
f.write(f"\n{current_chunk}")
f.close()
current_chunk += 1
@app.command()
def network(
file: Annotated[Path, typer.Argument(help="Relative path to the file.", show_default=False)],
cleandata: bool = typer.Option(False, "--cleandata", "-cd", help="Clean the data if this flag is used."),
push: bool = typer.Option(False, "--push", "-p", help="Push the data into Database."),
push: bool = typer.Option(False, "--push", "-p", help="Push the data into Database.\nIf you want the output to be saved in [green bold].csv[/green bold] format, do not mention this flag."),
mode: Optional[DBMode] = typer.Option(None, help="Specify either 'amend' or 'drop' when pushing data", show_default=False),
):
if not file.exists():
print("File did does not exist!")
error_parser("File did does not exist!")
raise typer.Exit()
data = process_network(file,cleandata)
if push:

5
styles/__Init__.py Normal file
View File

@ -0,0 +1,5 @@
from .help import print_help
__all__ = [
'print_help',
]

38
styles/help.py Normal file
View File

@ -0,0 +1,38 @@
import sys
from rich.table import Table
from rich.console import Console
from rich.console import Group
from rich.padding import Padding
from rich.panel import Panel
def population_help():
line1 = "This CLI tool processes [yellow bold]MATSim population XML files[/yellow bold] and prepares the data for storage in either [green bold].csv[/green bold] file or in [blue bold]PostgreSQL[/blue bold] database with [blue bold]PostGIS[/blue bold] integration."
line2 = "It extracts key data such as coordinates, converting them into a format ready for geospatial.\nUse the available [underline bold]options[/underline bold] to modify the behavior, such as [underline bold]cleaning[/underline bold] the data with missing values, or choosing to either [underline bold]replace[/underline bold] or [underline bold]append[/underline bold] data in the target table."
line3 = "The resulting table structure includes columns such as the following:"
line4 = "[red bold]NOTE:[/red bold] Ensure PostgreSQL connection details are provided via a [underline bold bright_cyan].env[/underline bold bright_cyan] file.\n[red bold]NOTE:[/red bold] By default if a [underline].log[/underline] exist with the same name in the same directory of file, It will use that to prcoess the file"
table = Table("id","lon","lat","geom","time","age","sex","person_id","economic_sector","household_id","household_income")
table.add_row("1","45.89977111012078","-73.26605847316777","0101000020E61000005CED21B32BF3464013451E1A075152C0","07:00:00","4","1","1","0","1","4")
table.add_row("2","45.89977111012078","-73.26605847316777","0101000020E61000005CED21B32BF3464013451E1A075152C0","08:00:00","4","2","2","0","1","4")
lines = f"{line1} \n{line2} \n{line3}"
panel = Padding(Panel(Padding(Group(lines,Padding(table, (1,0)),line4), (1,1)), title="About",title_align="left"), (1,0,0,0))
return panel
def network_help():
test = Padding("Hello network", (1,1))
return Panel(test, title="About",title_align="left")
def metro_help():
test = Padding("Hello network", (1,1))
return Panel(test, title="About",title_align="left")
def bus_help():
test = Padding("Hello network", (1,1))
return Panel(test, title="About",title_align="left")
def print_help():
console = Console()
if "--help" in sys.argv or "-h" in sys.argv:
if (sys.argv[1] == "population"):
console.print(population_help())
elif (sys.argv[1] == "network"):
console.print(network_help())