From 181e8502255ccbe28a9a4732ba1610ec22a3aa07 Mon Sep 17 00:00:00 2001 From: Tom Russell Date: Fri, 21 Sep 2018 11:10:39 +0100 Subject: [PATCH] Parallel extract/filter OS data --- .gitignore | 1 + etl/0_extract_addressbase.py | 54 ---------------------- etl/0_extract_addressbase.sh | 40 +++++++++++++++++ etl/0_extract_mastermap.sh | 30 ++++++++----- etl/check_ab_mm_match.py | 56 +++++++++++++++++++++++ etl/filter_addressbase_csv.py | 85 +++++++++++++++++++++++++++++++++++ 6 files changed, 202 insertions(+), 64 deletions(-) delete mode 100644 etl/0_extract_addressbase.py create mode 100755 etl/0_extract_addressbase.sh mode change 100644 => 100755 etl/0_extract_mastermap.sh create mode 100644 etl/check_ab_mm_match.py create mode 100755 etl/filter_addressbase_csv.py diff --git a/.gitignore b/.gitignore index e76920ba..a5744ad6 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ app/start.sh etl/cache/* etl/images/* etl/*.geojson +etl/*.txt diff --git a/etl/0_extract_addressbase.py b/etl/0_extract_addressbase.py deleted file mode 100644 index 13cfb550..00000000 --- a/etl/0_extract_addressbase.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Extract address points from CSV in *.zip - -Relevant CSV columns:: -0 - UPRN -1 - TOID (may not match given version of OS MasterMap) -16 - OSGB Easting -17 - OSGB Northing -18 - Latitude -19 - Longitude -""" -import csv -import glob -import io -import os -import sys -from zipfile import ZipFile - -def main(source_dir, output_file): - with open(output_file, 'w', encoding='utf8', newline='') as fh: - w = csv.writer(fh) - w.writerow(('UPRN', 'easting', 'northing', 'lat', 'lng')) - for address in read_addresses(source_dir): - w.writerow(address) - - -def read_addresses(source_dir): - zips = glob.glob(os.path.join(source_dir, '*.zip')) - n = len(zips) - for i, zipname in enumerate(zips): - with ZipFile(zipname) as zipfile: - names = zipfile.namelist() - csvname = names[0] - print("Processing {} ({} of {})".format(csvname, i+1, n)) - - with zipfile.open(csvname) as csvfile: - fh = io.TextIOWrapper(csvfile) - r = csv.reader(fh) - for line in r: - uprn = line[0] - # toid = line[1] # skip - we do our own matching to geometries - easting = line[16] - northing = line[17] - lat = line[18] - lng = float(line[19]) - yield uprn, easting, northing, lat, lng - - -if __name__ == '__main__': - if len(sys.argv) != 3: - print("Usage: {} ./path/to/source/dir ./path/to/output/file".format( - os.path.basename(__file__) - )) - exit() - main(sys.argv[1], sys.argv[2]) diff --git a/etl/0_extract_addressbase.sh b/etl/0_extract_addressbase.sh new file mode 100755 index 00000000..70949d8b --- /dev/null +++ b/etl/0_extract_addressbase.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +# +# Extract address points from OS Addressbase GML +# - as supplied in 5km tiles, zip/gz archives +# +: ${1?"Usage: $0 ./path/to/data/dir"} + +data_dir=$1 + +# +# Unzip to GML +# + +# find $data_dir -name '*.zip' -print0 | xargs -0 -P 4 -n 1 unzip + +# +# Extract (subset) to CSV +# +# Relevant fields: +# WKT +# crossReference (list of TOID/other references) +# source (list of cross-reference sources: 7666MT refers to MasterMap Topo) +# uprn +# parentUPRN +# logicalStatus: 1 (one) is approved (otherwise historical, provisional) +# + +# find $data_dir -type f -name '*.gml' -printf "%f\n" | \ +# parallel \ +# ogr2ogr -f CSV \ +# -select crossReference,source,uprn,parentUPRN,logicalStatus \ +# {}.csv {} BasicLandPropertyUnit \ +# -lco GEOMETRY=AS_WKT + +rm $data_dir/*.gml + +find $data_dir -type f -name '*.gml.csv' -printf "$data_dir%f\n" | \ +parallel \ +python filter_addressbase_csv.py {} diff --git a/etl/0_extract_mastermap.sh b/etl/0_extract_mastermap.sh old mode 100644 new mode 100755 index f6c06238..38ce6771 --- a/etl/0_extract_mastermap.sh +++ b/etl/0_extract_mastermap.sh @@ -1,20 +1,30 @@ -# Extract buildings from *.gml.gz +#!/usr/bin/env bash + +# +# Extract buildings from *.gz to CSV # # Features where:: # descriptiveGroup = '(1:Building)' # # Use `fid` as source ID, aka TOID. +# -: ${1?"Usage: $0 ./path/to/input/dir ./path/to/ouput/dir"} +: ${1?"Usage: $0 ./path/to/data/dir"} -: ${2?"Usage: $0 ./path/to/input/dir ./path/to/ouput/dir"} +data_dir=$1 -find $1 -type f -name '*.gz' -printf "%f\n" | \ +find $data_dir -type f -name '*.gz' -printf "%f\n" | \ parallel \ ogr2ogr \ - -select fid \ - -where "\"descriptiveGroup='(1:Building)'\"" \ - -t_srs "EPSG:3857" \ - -f "GeoJSON" $2/{}.geojson \ - $1/{} \ - TopographicArea + -select fid,descriptiveGroup \ + -f CSV $data_dir/{}.csv \ + $data_dir/{} \ + TopographicArea \ + -lco GEOMETRY=AS_WKT + +# then filter +# -where "\"descriptiveGroup='(1:Building)'\"" \ +# OR toid in addressbase_toids + +# finally load +# -t_srs "EPSG:3857" \ diff --git a/etl/check_ab_mm_match.py b/etl/check_ab_mm_match.py new file mode 100644 index 00000000..bd260b43 --- /dev/null +++ b/etl/check_ab_mm_match.py @@ -0,0 +1,56 @@ +"""Check if AddressBase TOIDs will match MasterMap +""" +import csv +import glob +import os +import sys + +from multiprocessing import Pool + +csv.field_size_limit(sys.maxsize) + +def main(ab_path, mm_path): + ab_paths = sorted(glob.glob(os.path.join(ab_path, "*.gml.csv.filtered"))) + mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gz.csv"))) + + assert len(ab_paths) == len(mm_paths) + zipped_paths = zip(ab_paths, mm_paths) + + with Pool(4) as p: + p.starmap(check, zipped_paths) + +def check(ab_path, mm_path): + tile = str(os.path.basename(ab_path)).split(".")[0] + print(tile) + ab_toids = set() + mm_toids = set() + + with open(ab_path, 'r') as fh: + r = csv.DictReader(fh) + for line in r: + ab_toids.add(line['toid']) + + with open(mm_path, 'r') as fh: + r = csv.DictReader(fh) + for line in r: + mm_toids.add(line['fid']) + + print("MasterMap", len(mm_toids)) + print("Addressbase", len(ab_toids)) + missing = ab_toids - mm_toids + print("in AB but not MM", len(missing)) + + with open('missing_toids_{}.txt'.format(tile), 'w') as fh: + for toid in missing: + fh.write("{}\n".format(toid)) + + with open('ab_toids_{}.txt'.format(tile), 'w') as fh: + for toid in ab_toids: + fh.write("{}\n".format(toid)) + + +if __name__ == '__main__': + if len(sys.argv) != 3: + print("Usage: filter_addressbase_csv.py ./path/to/addressbase/dir ./path/to/mastermap/dir") + exit(-1) + main(sys.argv[1], sys.argv[2]) diff --git a/etl/filter_addressbase_csv.py b/etl/filter_addressbase_csv.py new file mode 100755 index 00000000..ce70b9c6 --- /dev/null +++ b/etl/filter_addressbase_csv.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +"""Read ogr2ogr-converted CSV, filter to get OSMM TOID reference, only active addresses +""" +import csv +import json +import sys + +from collections import defaultdict + +def main(input_path): + output_path = "{}.filtered".format(input_path) + fieldnames = ( + 'toid', 'uprn', 'wkt', 'uprn_relations' + ) + by_toid = defaultdict(list) + + with open(input_path) as input_fh: + r = csv.DictReader(input_fh) + for line in r: + if line['logicalStatus'] != "1": + continue + + refs = eval(line['crossReference']) + sources = eval(line['source']) + toid = "" + for ref, source in zip(refs, sources): + if source == "7666MT": + toid = ref + + by_toid[toid].append({ + 'uprn': line['uprn'], + 'parent': line['parentUPRN'], + 'wkt': line['WKT'] + }) + + with open(output_path, 'w') as output_fh: + w = csv.DictWriter(output_fh, fieldnames=fieldnames) + w.writeheader() + for toid, uprns in by_toid.items(): + if toid == "": + print(len(uprns), "not matched") + continue + if len(uprns) == 1: + # if there's only one, pick that as the 'primary' uprn for the toid + uprn = uprns[0]['uprn'] + else: + # else try picking a top-level match (i.e. uprn with no parent) + orphans = set(u['uprn'] for u in uprns if not u['parent']) + if orphans: + uprn = orphans.pop() + # else climb to a root of the current tree (forest?) + else: + uprn_tree = {} + for u in uprns: + uprn_tree[u['uprn']] = u['parent'] + + uprn = uprns[0]['uprn'] + while True: + if uprn in uprn_tree and uprn_tree[uprn]: + uprn = uprn_tree[uprn] + else: + break + + # pick out wkt + wkt = '' + for item in uprns: + if item['uprn'] == uprn: + wkt = item['wkt'] + + w.writerow({ + 'toid': toid, + 'wkt': wkt, + 'uprn': uprn, + 'uprn_relations': json.dumps([{ + 'uprn': u['uprn'], + 'parent': u['parent'] + } for u in uprns]) + }) + + +if __name__ == '__main__': + if len(sys.argv) != 2: + print("Usage: filter_addressbase_csv.py ./path/to/data.csv") + exit(-1) + main(sys.argv[1])