Parallel extract/filter OS data

Tom Russell 2018-09-21 11:10:39 +01:00
parent e774ed1693
commit 181e850225
6 changed files with 202 additions and 64 deletions

1 .gitignore vendored

@@ -11,3 +11,4 @@ app/start.sh
 etl/cache/*
 etl/images/*
 etl/*.geojson
+etl/*.txt

@@ -1,54 +0,0 @@
"""Extract address points from CSV in *.zip
Relevant CSV columns::
0 - UPRN
1 - TOID (may not match given version of OS MasterMap)
16 - OSGB Easting
17 - OSGB Northing
18 - Latitude
19 - Longitude
"""
import csv
import glob
import io
import os
import sys
from zipfile import ZipFile
def main(source_dir, output_file):
with open(output_file, 'w', encoding='utf8', newline='') as fh:
w = csv.writer(fh)
w.writerow(('UPRN', 'easting', 'northing', 'lat', 'lng'))
for address in read_addresses(source_dir):
w.writerow(address)
def read_addresses(source_dir):
zips = glob.glob(os.path.join(source_dir, '*.zip'))
n = len(zips)
for i, zipname in enumerate(zips):
with ZipFile(zipname) as zipfile:
names = zipfile.namelist()
csvname = names[0]
print("Processing {} ({} of {})".format(csvname, i+1, n))
with zipfile.open(csvname) as csvfile:
fh = io.TextIOWrapper(csvfile)
r = csv.reader(fh)
for line in r:
uprn = line[0]
# toid = line[1] # skip - we do our own matching to geometries
easting = line[16]
northing = line[17]
lat = line[18]
lng = float(line[19])
yield uprn, easting, northing, lat, lng
if __name__ == '__main__':
if len(sys.argv) != 3:
print("Usage: {} ./path/to/source/dir ./path/to/output/file".format(
os.path.basename(__file__)
))
exit()
main(sys.argv[1], sys.argv[2])

40 etl/0_extract_addressbase.sh Executable file

@@ -0,0 +1,40 @@
#!/usr/bin/env bash
#
# Extract address points from OS AddressBase GML
# - as supplied in 5km tiles, zip/gz archives
#
: ${1?"Usage: $0 ./path/to/data/dir"}
data_dir=$1

#
# Unzip to GML
#
# find $data_dir -name '*.zip' -print0 | xargs -0 -P 4 -n 1 unzip
#
# Extract (subset) to CSV
#
# Relevant fields:
#   WKT
#   crossReference (list of TOID/other references)
#   source (list of cross-reference sources: 7666MT refers to MasterMap Topo)
#   uprn
#   parentUPRN
#   logicalStatus: 1 (one) is approved (otherwise historical, provisional)
#
# find $data_dir -type f -name '*.gml' -printf "%f\n" | \
# parallel \
# ogr2ogr -f CSV \
#   -select crossReference,source,uprn,parentUPRN,logicalStatus \
#   {}.csv {} BasicLandPropertyUnit \
#   -lco GEOMETRY=AS_WKT

rm $data_dir/*.gml

find $data_dir -type f -name '*.gml.csv' -printf "$data_dir/%f\n" | \
parallel \
    python filter_addressbase_csv.py {}
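
The crossReference and source fields exported above are parallel lists: the entry at index i of source describes the reference at index i of crossReference. A minimal sketch of decoding one exported row, with illustrative placeholder values rather than real data:

import ast

# One ogr2ogr-exported row; field values here are invented for illustration.
row = {
    'crossReference': "['osgb1000012345678', '201']",
    'source': "['7666MT', '7666MI']",
    'logicalStatus': '1',
}

# ogr2ogr writes list fields as strings, so parse them back into lists.
refs = ast.literal_eval(row['crossReference'])
sources = ast.literal_eval(row['source'])

# Keep approved records only, and take the MasterMap Topo (7666MT)
# cross-reference as the TOID.
if row['logicalStatus'] == '1':
    toid = next((r for r, s in zip(refs, sources) if s == '7666MT'), '')
    print(toid)  # osgb1000012345678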

30 etl/0_extract_mastermap.sh Normal file → Executable file

@@ -1,20 +1,30 @@
-# Extract buildings from *.gml.gz
+#!/usr/bin/env bash
+#
+# Extract buildings from *.gz to CSV
 #
 # Features where::
 #   descriptiveGroup = '(1:Building)'
 #
 # Use `fid` as source ID, aka TOID.
-: ${1?"Usage: $0 ./path/to/input/dir ./path/to/ouput/dir"}
-: ${2?"Usage: $0 ./path/to/input/dir ./path/to/ouput/dir"}
+#
+: ${1?"Usage: $0 ./path/to/data/dir"}
+data_dir=$1
-find $1 -type f -name '*.gz' -printf "%f\n" | \
+find $data_dir -type f -name '*.gz' -printf "%f\n" | \
 parallel \
 ogr2ogr \
-    -select fid \
-    -where "\"descriptiveGroup='(1:Building)'\"" \
-    -t_srs "EPSG:3857" \
-    -f "GeoJSON" $2/{}.geojson \
-    $1/{} \
-    TopographicArea
+    -select fid,descriptiveGroup \
+    -f CSV $data_dir/{}.csv \
+    $data_dir/{} \
+    TopographicArea \
+    -lco GEOMETRY=AS_WKT
+
+# then filter
+# -where "\"descriptiveGroup='(1:Building)'\"" \
+# OR toid in addressbase_toids
+
+# finally load
+# -t_srs "EPSG:3857" \

56 etl/check_ab_mm_match.py Normal file

@@ -0,0 +1,56 @@
"""Check if AddressBase TOIDs will match MasterMap
"""
import csv
import glob
import os
import sys
from multiprocessing import Pool

csv.field_size_limit(sys.maxsize)


def main(ab_path, mm_path):
    ab_paths = sorted(glob.glob(os.path.join(ab_path, "*.gml.csv.filtered")))
    mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gz.csv")))
    assert len(ab_paths) == len(mm_paths)
    zipped_paths = zip(ab_paths, mm_paths)
    with Pool(4) as p:
        p.starmap(check, zipped_paths)


def check(ab_path, mm_path):
    tile = str(os.path.basename(ab_path)).split(".")[0]
    print(tile)
    ab_toids = set()
    mm_toids = set()

    with open(ab_path, 'r') as fh:
        r = csv.DictReader(fh)
        for line in r:
            ab_toids.add(line['toid'])

    with open(mm_path, 'r') as fh:
        r = csv.DictReader(fh)
        for line in r:
            mm_toids.add(line['fid'])

    print("MasterMap", len(mm_toids))
    print("AddressBase", len(ab_toids))

    missing = ab_toids - mm_toids
    print("in AB but not MM", len(missing))

    with open('missing_toids_{}.txt'.format(tile), 'w') as fh:
        for toid in missing:
            fh.write("{}\n".format(toid))

    with open('ab_toids_{}.txt'.format(tile), 'w') as fh:
        for toid in ab_toids:
            fh.write("{}\n".format(toid))


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: check_ab_mm_match.py ./path/to/addressbase/dir ./path/to/mastermap/dir")
        exit(-1)
    main(sys.argv[1], sys.argv[2])
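
A hypothetical invocation, assuming the two directories hold the filtered AddressBase CSVs and the extracted MasterMap CSVs produced by the scripts above: `python check_ab_mm_match.py ./addressbase/dir ./mastermap/dir`. Alongside its per-tile counts, the script writes missing_toids_<tile>.txt and ab_toids_<tile>.txt to the working directory.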

85 etl/filter_addressbase_csv.py Executable file

@@ -0,0 +1,85 @@
#!/usr/bin/env python
"""Read ogr2ogr-converted CSV, filter to active addresses with an OSMM TOID reference
"""
import ast
import csv
import json
import sys
from collections import defaultdict


def main(input_path):
    output_path = "{}.filtered".format(input_path)
    fieldnames = (
        'toid', 'uprn', 'wkt', 'uprn_relations'
    )
    by_toid = defaultdict(list)

    with open(input_path) as input_fh:
        r = csv.DictReader(input_fh)
        for line in r:
            if line['logicalStatus'] != "1":
                continue
            # ogr2ogr writes list fields as Python-style string literals
            refs = ast.literal_eval(line['crossReference'])
            sources = ast.literal_eval(line['source'])
            toid = ""
            for ref, source in zip(refs, sources):
                if source == "7666MT":
                    toid = ref
            by_toid[toid].append({
                'uprn': line['uprn'],
                'parent': line['parentUPRN'],
                'wkt': line['WKT']
            })

    with open(output_path, 'w') as output_fh:
        w = csv.DictWriter(output_fh, fieldnames=fieldnames)
        w.writeheader()
        for toid, uprns in by_toid.items():
            if toid == "":
                print(len(uprns), "not matched")
                continue

            if len(uprns) == 1:
                # if there's only one, pick that as the 'primary' uprn for the toid
                uprn = uprns[0]['uprn']
            else:
                # else try picking a top-level match (i.e. uprn with no parent)
                orphans = set(u['uprn'] for u in uprns if not u['parent'])
                if orphans:
                    uprn = orphans.pop()
                # else climb to a root of the current tree (forest?)
                else:
                    uprn_tree = {}
                    for u in uprns:
                        uprn_tree[u['uprn']] = u['parent']

                    uprn = uprns[0]['uprn']
                    while True:
                        if uprn in uprn_tree and uprn_tree[uprn]:
                            uprn = uprn_tree[uprn]
                        else:
                            break

            # pick out wkt
            wkt = ''
            for item in uprns:
                if item['uprn'] == uprn:
                    wkt = item['wkt']

            w.writerow({
                'toid': toid,
                'wkt': wkt,
                'uprn': uprn,
                'uprn_relations': json.dumps([{
                    'uprn': u['uprn'],
                    'parent': u['parent']
                } for u in uprns])
            })


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: filter_addressbase_csv.py ./path/to/data.csv")
        exit(-1)
    main(sys.argv[1])
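
The primary-UPRN rules above (a single record wins; otherwise an orphan with no parent; otherwise climb the parent chain to its root) can be checked with a small worked example. The UPRN values are invented:

# Worked example of the primary-UPRN selection, with invented UPRNs.
uprns = [
    {'uprn': '100', 'parent': '200'},
    {'uprn': '200', 'parent': '300'},
    {'uprn': '300', 'parent': '400'},  # every record has a parent, so no orphan
]

orphans = set(u['uprn'] for u in uprns if not u['parent'])
assert not orphans  # no top-level record here, so climb to a root instead

uprn_tree = {u['uprn']: u['parent'] for u in uprns}
uprn = uprns[0]['uprn']
while uprn in uprn_tree and uprn_tree[uprn]:
    uprn = uprn_tree[uprn]
print(uprn)  # '400' - the root of the chain, even though it has no record of its own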