diff --git a/etl/load_uprns.py b/etl/load_uprns.py old mode 100644 new mode 100755 index 2e47d792..3552efcb --- a/etl/load_uprns.py +++ b/etl/load_uprns.py @@ -1,130 +1,78 @@ +#!/usr/bin/env python """Load buildings from CSV to Postgres - - - create 'building' record with { - geometry_id: , - all_uprns: [, ...], - uprn: - } +- update 'building' record with { + all_uprns: [, ...], + uprn: +} """ import csv +import glob import json import os import sys +from multiprocessing import Pool + import psycopg2 -def main(source_file, config_path): - """Load config, read files and save features to the database +def main(addressbase_dir): + """Read files and save features to the database """ - conf = read_config(config_path) - dbconf = conf['database'] - conn = psycopg2.connect(**dbconf) + ab_paths = list(glob.glob(os.path.join(addressbase_dir, "*.gml.csv.filtered"))) + # parallel map over tiles + with Pool() as p: + p.map(load_file, ab_paths) + + +def load_file(source_file): + """Load UPRN data from CSVs + """ + config = { + 'host': os.environ['PGHOST'], + 'port': os.environ['PGPORT'], + 'dbname': os.environ['PGDATABASE'], + 'user': os.environ['PGUSER'], + 'password': os.environ['PGPASSWORD'], + } + conn = psycopg2.connect(**config) with conn.cursor() as cur: with open(source_file, 'r') as source_fh: reader = csv.reader(source_fh) next(reader) - for uprn, _, _, lat, lng in reader: - geometry_id = find_geom(cur, float(lat), float(lng)) - if geometry_id is not None: - save_building(cur, int(uprn), geometry_id) - else: - print("No geometry for", uprn) + for toid, uprn, wkt, uprn_relations in reader: + save_building(cur, int(uprn), toid, json.loads(uprn_relations)) conn.commit() conn.close() -def find_geom(cur, lat, lng): - """Find a building geometry - """ - cur.execute( - """SELECT geometry_id FROM geometries - WHERE - ST_Within( - ST_Transform( - ST_SetSRID(ST_Point(%s, %s), 4326), - 3857 - ), - geometry_geom - ) - """, ( - lng, - lat - ) - ) - result = cur.fetchone() - if result is not None: - id_, = result - return id_ - else: - return result - - -def save_building(cur, uprn, geometry_id): +def save_building(cur, uprn, toid, uprn_relations): """Save a building """ cur.execute( - """SELECT building_id FROM buildings - WHERE - geometry_id = %s + """UPDATE buildings + SET uprn = %s, building_doc = %s::jsonb + WHERE geometry_id = ( + SELECT geometry_id FROM geometries + WHERE + source_id = %s + ) """, ( - geometry_id, + uprn, + json.dumps({ + 'uprn_relations': uprn_relations + }), + toid ) ) - building = cur.fetchone() - if building is None: - cur.execute( - """INSERT INTO buildings - ( - building_doc, - geometry_id - ) - VALUES - ( - %s::jsonb, - %s - ) - """, ( - json.dumps({ - 'uprns': [uprn] - }), - geometry_id - ) - ) - else: - building_id = building[0] - cur.execute( - """UPDATE buildings - SET - building_doc = jsonb_insert( - building_doc, - '{uprns, -1}', -- insert at end of 'uprns' array - '%s'::jsonb, - true -- insert after location - ) - WHERE - building_id = %s - """, ( - uprn, - building_id - ) - ) - - -def read_config(config_path): - """Read a JSON config file containing database connection details - """ - with open(config_path, 'r') as fh: - conf = json.load(fh) - return conf if __name__ == '__main__': if len(sys.argv) != 2: - print("Usage: {} ./path/to/source/file.csv".format( + print("Usage: {} ./path/to/addressbase_dir/".format( os.path.basename(__file__) )) exit() - main(sys.argv[1], sys.argv[2]) + main(sys.argv[1])