diff --git a/etl/0_extract_addressbase.sh b/etl/0_extract_addressbase.sh
index 70949d8b..e1d87be9 100755
--- a/etl/0_extract_addressbase.sh
+++ b/etl/0_extract_addressbase.sh
@@ -12,10 +12,12 @@ data_dir=$1
 #
 # Unzip to GML
 #
-# find $data_dir -name '*.zip' -print0 | xargs -0 -P 4 -n 1 unzip
+find $data_dir -type f -name '*.zip' -printf "%f\n" | \
+parallel \
+unzip -u $data_dir/{} -d $data_dir
 
 #
-# Extract (subset) to CSV
+# Extract to CSV
 #
 # Relevant fields:
 #   WKT
@@ -26,15 +28,17 @@ data_dir=$1
 #   logicalStatus: 1 (one) is approved (otherwise historical, provisional)
 #
 
-# find $data_dir -type f -name '*.gml' -printf "%f\n" | \
-# parallel \
-# ogr2ogr -f CSV \
-# -select crossReference,source,uprn,parentUPRN,logicalStatus \
-# {}.csv {} BasicLandPropertyUnit \
-# -lco GEOMETRY=AS_WKT
-
-rm $data_dir/*.gml
-
-find $data_dir -type f -name '*.gml.csv' -printf "$data_dir%f\n" | \
+find $data_dir -type f -name '*.gml' -printf "%f\n" | \
 parallel \
-python filter_addressbase_csv.py {}
+ogr2ogr -f CSV \
+    -select crossReference,source,uprn,parentUPRN,logicalStatus \
+    $data_dir/{}.csv $data_dir/{} BasicLandPropertyUnit \
+    -lco GEOMETRY=AS_WKT
+
+#
+# Filter, grouping by TOID
+#
+
+find $data_dir -type f -name '*.gml.csv' -printf "%f\n" | \
+parallel \
+python filter_addressbase_csv.py $data_dir/{}
diff --git a/etl/0_extract_mastermap.sh b/etl/0_extract_mastermap.sh
index 38ce6771..904978a1 100755
--- a/etl/0_extract_mastermap.sh
+++ b/etl/0_extract_mastermap.sh
@@ -1,5 +1,13 @@
 #!/usr/bin/env bash
 
+#
+# Extract MasterMap
+#
+
+: ${1?"Usage: $0 ./path/to/mastermap/dir"}
+
+data_dir=$1
+
 #
 # Extract buildings from *.gz to CSV
 #
@@ -9,12 +17,14 @@
 # Use `fid` as source ID, aka TOID.
 #
 
-: ${1?"Usage: $0 ./path/to/data/dir"}
-
-data_dir=$1
-
 find $data_dir -type f -name '*.gz' -printf "%f\n" | \
 parallel \
+gunzip $data_dir/{} -k -S gml
+
+rename 's/$/.gml/' $data_dir/*[^gzt]
+
+find $data_dir -type f -name '*.gml' -printf "%f\n" | \
+parallel \
 ogr2ogr \
 -select fid,descriptiveGroup \
 -f CSV $data_dir/{}.csv \
@@ -22,9 +32,4 @@ ogr2ogr \
 TopographicArea \
 -lco GEOMETRY=AS_WKT
 
-# then filter
-# -where "\"descriptiveGroup='(1:Building)'\"" \
-# OR toid in addressbase_toids
-
-# finally load
-# -t_srs "EPSG:3857" \
+rm $data_dir/*.gfs
diff --git a/etl/0_filter_for_loading.sh b/etl/0_filter_for_loading.sh
new file mode 100755
index 00000000..d149f316
--- /dev/null
+++ b/etl/0_filter_for_loading.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+#
+# Filter and transform for loading
+#
+: ${1?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir"}
+: ${2?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir"}
+
+addressbase_dir=$1
+mastermap_dir=$2
+
+#
+# Check which TOIDs are matched against UPRNs
+#
+python check_ab_mm_match.py $addressbase_dir $mastermap_dir
+
+#
+# Filter
+# - WHERE descriptiveGroup = '(1:Building)'
+# - OR toid in addressbase_toids
+#
+python filter_mastermap.py $addressbase_dir $mastermap_dir
+
+#
+# Transform to 3857 (web mercator)
+#
+find $mastermap_dir -type f -name '*.filtered.csv' -printf "%f\n" | \
+parallel \
+ogr2ogr \
+    -f CSV $mastermap_dir/{}.3857.csv \
+    -select fid \
+    -s_srs "EPSG:27700" \
+    -t_srs "EPSG:3857" \
+    $mastermap_dir/{} \
+    -lco GEOMETRY=AS_WKT
+
+#
+# Update to EWKT (with SRID indicator for loading to Postgres)
+#
+sed -i 's/^"POLYGON/"SRID=3857;POLYGON/' $mastermap_dir/*.3857.csv
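For reference, the `sed` EWKT step in `0_filter_for_loading.sh` amounts to the per-file rewrite below. This is a minimal Python sketch, not part of the patch, assuming ogr2ogr wrote the geometry as the first quoted CSV column; `to_ewkt` is a hypothetical helper name.

```python
import csv

def to_ewkt(in_path, out_path, srid=3857):
    # Prefix each WKT polygon with an SRID marker (EWKT) so Postgres can
    # COPY the value straight into a geometry column.
    with open(in_path, newline='') as src, open(out_path, 'w', newline='') as dst:
        reader = csv.reader(src)
        writer = csv.writer(dst, quoting=csv.QUOTE_NONNUMERIC)
        writer.writerow(next(reader))  # header row passes through unchanged
        for row in reader:
            if row and row[0].startswith('POLYGON'):
                row[0] = 'SRID={};{}'.format(srid, row[0])
            writer.writerow(row)
```

Unlike `sed -i`, the sketch writes to a second file rather than editing in place, which makes the transformation easier to test on a single tile.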
diff --git a/etl/1_load_geometries.py b/etl/1_load_geometries.py
deleted file mode 100644
index 93ac196d..00000000
--- a/etl/1_load_geometries.py
+++ /dev/null
@@ -1,105 +0,0 @@
-"""Load geometries from GeoJSON to Postgres
-
-- create 'geometry' record with {
-    id: <generated id>,
-    doc: {source_id: <toid>},
-    geom: <geom>
-}
-- create corresponding 'building' record with {
-    id: <generated id>,
-    doc: {},
-    geom_id: <geometry id>
-}
-"""
-import glob
-import json
-import os
-import sys
-
-import fiona
-import psycopg2
-from shapely.geometry import shape
-
-
-def main(source_dir, config_path):
-    """Load config, read files and save features to the database
-    """
-    conf = read_config(config_path)
-    dbconf = conf['database']
-    conn = psycopg2.connect(**dbconf)
-
-    source_files = glob.glob("{}/*.geojson".format(source_dir))
-
-    loaded = {}
-    for source_file in source_files:
-        with fiona.open(source_file, 'r') as source:
-            with conn.cursor() as cur:
-                for feature in source:
-                    fid = feature['properties']['fid']
-                    if fid not in loaded:
-                        save_feature(cur, feature)
-                        loaded[fid] = True
-                    else:
-                        print("Skipping", fid)
-            conn.commit()
-    conn.close()
-
-
-
-def save_feature(cur, feature):
-    """Save a feature with geometry and source id
-    """
-    cur.execute(
-        """INSERT INTO geometries
-        (
-            geometry_doc,
-            geometry_geom
-        )
-        VALUES
-        (
-            %s::jsonb,
-            ST_SetSRID(%s::geometry, %s)
-        )
-        RETURNING geometry_id
-        """, (
-            json.dumps({
-                'source_id': feature['properties']['fid']
-            }),
-            shape(feature['geometry']).wkb_hex,
-            3857
-        )
-    )
-    geom_id, = cur.fetchone()
-    cur.execute(
-        """INSERT INTO buildings
-        (
-            building_doc,
-            geometry_id
-        )
-        VALUES
-        (
-            %s::jsonb,
-            %s
-        )
-        """, (
-            json.dumps({}),
-            geom_id
-        )
-    )
-
-
-def read_config(config_path):
-    """Read a JSON config file containing database connection details
-    """
-    with open(config_path, 'r') as conf_fh:
-        conf = json.load(conf_fh)
-    return conf
-
-
-if __name__ == '__main__':
-    if len(sys.argv) != 3:
-        print("Usage: {} ./path/to/source/dir ./path/to/dbconfig.json".format(
-            os.path.basename(__file__)
-        ))
-        exit()
-    main(sys.argv[1], sys.argv[2])
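The deleted per-feature loader above is replaced by the bulk COPY script that follows. For comparison, the same load driven from Python would look roughly like this sketch (assuming the PG* environment variables are set, as the shell script does, and that the filtered CSV columns are geometry then source_id; `copy_tile` is a hypothetical name):

```python
import psycopg2

def copy_tile(csv_path):
    # Bulk-load one tile's EWKT CSV into the geometries table.
    conn = psycopg2.connect("")  # empty DSN: connection comes from PGHOST, PGUSER, etc.
    with conn, conn.cursor() as cur, open(csv_path) as fh:
        cur.copy_expert(
            "COPY geometries ( geometry_geom, source_id ) FROM STDIN WITH CSV HEADER",
            fh,
        )
    conn.close()
```

COPY avoids a round-trip per feature, which is the main reason the row-by-row INSERT loader was dropped.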
diff --git a/etl/1_load_geometries.sh b/etl/1_load_geometries.sh
new file mode 100755
index 00000000..0342e73b
--- /dev/null
+++ b/etl/1_load_geometries.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+#
+# Load geometries from CSV (EWKT) to Postgres
+# - assume postgres connection details are set in the environment using PGUSER, PGHOST etc.
+#
+: ${1?"Usage: $0 ./path/to/mastermap/dir"}
+
+mastermap_dir=$1
+
+#
+# Create 'geometry' record with
+#     id: <generated id>,
+#     source_id: <toid>,
+#     geom: <geom>
+#
+find $mastermap_dir -type f -name '*.3857.csv' \
+-printf "COPY geometries ( geometry_geom, source_id ) FROM '$mastermap_dir/%f' WITH CSV HEADER;\n" | \
+parallel \
+psql -c {}
+
+#
+# Delete any duplicated geometries (by TOID)
+#
+psql -c "DELETE FROM geometries a USING (
+    SELECT MIN(ctid) as ctid, source_id
+    FROM geometries
+    GROUP BY source_id
+    HAVING COUNT(*) > 1
+) b
+WHERE a.source_id = b.source_id
+AND a.ctid <> b.ctid;"
+
+#
+# Create corresponding 'building' record with
+#     id: <generated id>,
+#     doc: {},
+#     geom_id: <geometry id>
+#
+psql -c "INSERT INTO buildings ( geometry_id ) SELECT geometry_id from geometries;"
diff --git a/etl/check_ab_mm_match.py b/etl/check_ab_mm_match.py
index bd260b43..76c59686 100644
--- a/etl/check_ab_mm_match.py
+++ b/etl/check_ab_mm_match.py
@@ -11,17 +11,23 @@ csv.field_size_limit(sys.maxsize)
 
 def main(ab_path, mm_path):
     ab_paths = sorted(glob.glob(os.path.join(ab_path, "*.gml.csv.filtered")))
-    mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gz.csv")))
+    mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gml.csv")))
+
+    try:
+        assert len(ab_paths) == len(mm_paths)
+    except AssertionError:
+        print(ab_paths)
+        print(mm_paths)
 
-    assert len(ab_paths) == len(mm_paths)
     zipped_paths = zip(ab_paths, mm_paths)
 
+    # parallel map over tiles
     with Pool(4) as p:
         p.starmap(check, zipped_paths)
 
 
 def check(ab_path, mm_path):
     tile = str(os.path.basename(ab_path)).split(".")[0]
-    print(tile)
+    output_base = os.path.dirname(ab_path)
     ab_toids = set()
     mm_toids = set()
@@ -35,22 +41,20 @@ def check(ab_path, mm_path):
             for line in r:
                 mm_toids.add(line['fid'])
 
-    print("MasterMap", len(mm_toids))
-    print("Addressbase", len(ab_toids))
     missing = ab_toids - mm_toids
-    print("in AB but not MM", len(missing))
+    print(tile, "MasterMap:", len(mm_toids), "Addressbase:", len(ab_toids), "AB but not MM:", len(missing))
 
-    with open('missing_toids_{}.txt'.format(tile), 'w') as fh:
+    with open(os.path.join(output_base, 'missing_toids_{}.txt'.format(tile)), 'w') as fh:
         for toid in missing:
             fh.write("{}\n".format(toid))
 
-    with open('ab_toids_{}.txt'.format(tile), 'w') as fh:
+    with open(os.path.join(output_base, 'ab_toids_{}.txt'.format(tile)), 'w') as fh:
         for toid in ab_toids:
             fh.write("{}\n".format(toid))
 
 
 if __name__ == '__main__':
     if len(sys.argv) != 3:
-        print("Usage: filter_addressbase_csv.py ./path/to/addressbase/dir ./path/to/mastermap/dir")
+        print("Usage: check_ab_mm_match.py ./path/to/addressbase/dir ./path/to/mastermap/dir")
         exit(-1)
     main(sys.argv[1], sys.argv[2])
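One caveat on the tile pairing in `check_ab_mm_match.py` (the same pattern appears in `filter_mastermap.py` below): zipping two independently sorted globs can silently mis-pair tiles when either directory is missing a file, and the try/except only reports the count mismatch before carrying on. A stricter pairing keyed on tile name would avoid that; a hypothetical sketch, not part of the patch:

```python
import glob
import os

def pair_by_tile(ab_dir, mm_dir):
    # Index each directory by tile name (the basename up to the first dot),
    # then pair only the tiles present in both.
    ab = {os.path.basename(p).split(".")[0]: p
          for p in glob.glob(os.path.join(ab_dir, "*.gml.csv.filtered"))}
    mm = {os.path.basename(p).split(".")[0]: p
          for p in glob.glob(os.path.join(mm_dir, "*.gml.csv"))}
    return [(ab[t], mm[t]) for t in sorted(ab.keys() & mm.keys())]
```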
diff --git a/etl/filter_mastermap.py b/etl/filter_mastermap.py
new file mode 100644
index 00000000..a32bde15
--- /dev/null
+++ b/etl/filter_mastermap.py
@@ -0,0 +1,60 @@
+"""Filter MasterMap to buildings and addressbase-matches
+
+- WHERE descriptiveGroup = '(1:Building)'
+- OR toid in addressbase_toids
+"""
+import csv
+import glob
+import json
+import os
+import sys
+
+from multiprocessing import Pool
+
+csv.field_size_limit(sys.maxsize)
+
+def main(ab_path, mm_path):
+    mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gml.csv")))
+    toid_paths = sorted(glob.glob(os.path.join(ab_path, "ab_toids_*.txt")))
+
+    try:
+        assert len(mm_paths) == len(toid_paths)
+    except AssertionError:
+        print(mm_paths)
+        print(toid_paths)
+    zipped_paths = zip(mm_paths, toid_paths)
+
+    # parallel map over tiles
+    with Pool(4) as p:
+        p.starmap(filter, zipped_paths)
+
+def filter(mm_path, toid_path):
+    with open(toid_path, 'r') as fh:
+        r = csv.reader(fh)
+        toids = set(line[0] for line in r)
+
+    output_path = "{}.filtered.csv".format(str(mm_path).replace(".gml.csv", ""))
+    alt_output_path = "{}.filtered_not_building.csv".format(str(mm_path).replace(".gml.csv", ""))
+    output_fieldnames = ('WKT', 'fid', 'descriptiveGroup')
+    with open(mm_path, 'r') as fh:
+        r = csv.DictReader(fh)
+        with open(output_path, 'w') as output_fh:
+            w = csv.DictWriter(output_fh, fieldnames=output_fieldnames)
+            w.writeheader()
+            with open(alt_output_path, 'w') as alt_output_fh:
+                alt_w = csv.DictWriter(alt_output_fh, fieldnames=output_fieldnames)
+                alt_w.writeheader()
+                for line in r:
+                    if 'Building' in line['descriptiveGroup']:
+                        w.writerow(line)
+
+                    elif line['fid'] in toids:
+                        alt_w.writerow(line)
+
+
+
+if __name__ == '__main__':
+    if len(sys.argv) != 3:
+        print("Usage: filter_mastermap.py ./path/to/addressbase/dir ./path/to/mastermap/dir")
+        exit(-1)
+    main(sys.argv[1], sys.argv[2])
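To make the branch logic in `filter` concrete: building features always go to the `.filtered.csv` output, while non-building features are kept in the `.filtered_not_building.csv` output only if their TOID was matched in AddressBase; everything else is dropped. Note the substring test also catches composite values such as '(2:Building,Structure)'. A toy run with purely illustrative values:

```python
toids = {'osgb1000000000002'}  # TOIDs known to AddressBase
rows = [
    {'WKT': '...', 'fid': 'osgb1000000000001', 'descriptiveGroup': '(1:Building)'},
    {'WKT': '...', 'fid': 'osgb1000000000002', 'descriptiveGroup': '(1:General Surface)'},
    {'WKT': '...', 'fid': 'osgb1000000000003', 'descriptiveGroup': '(1:General Surface)'},
]
for line in rows:
    if 'Building' in line['descriptiveGroup']:
        print('building    ->', line['fid'])  # kept: ...001
    elif line['fid'] in toids:
        print('addressbase ->', line['fid'])  # kept: ...002
    # ...003 fails both tests and is dropped
```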