Extract and load addressbase/mastermap using id, gml, parallel
parent 7c3b9d222e
commit c6b3d3d5ca
@@ -12,10 +12,12 @@ data_dir=$1
# Unzip to GML
#

# find $data_dir -name '*.zip' -print0 | xargs -0 -P 4 -n 1 unzip
find $data_dir -type f -name '*.zip' -printf "%f\n" | \
parallel \
unzip -u $data_dir/{} -d $data_dir

#
# Extract (subset) to CSV
# Extract to CSV
#
# Relevant fields:
# WKT
@@ -26,15 +28,17 @@ data_dir=$1
# logicalStatus: 1 (one) is approved (otherwise historical, provisional)
#

# find $data_dir -type f -name '*.gml' -printf "%f\n" | \
# parallel \
# ogr2ogr -f CSV \
# -select crossReference,source,uprn,parentUPRN,logicalStatus \
# {}.csv {} BasicLandPropertyUnit \
# -lco GEOMETRY=AS_WKT

rm $data_dir/*.gml

find $data_dir -type f -name '*.gml.csv' -printf "$data_dir%f\n" | \
find $data_dir -type f -name '*.gml' -printf "%f\n" | \
parallel \
python filter_addressbase_csv.py {}
ogr2ogr -f CSV \
-select crossReference,source,uprn,parentUPRN,logicalStatus \
$data_dir/{}.csv $data_dir/{} BasicLandPropertyUnit \
-lco GEOMETRY=AS_WKT

#
# Filter, grouping by TOID
#

find $data_dir -type f -name '*.gml.csv' -printf "%f\n" | \
parallel \
python filter_addressbase_csv.py $data_dir/{}
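As a quick sanity check on the extraction step above, the header of each per-tile CSV should reflect the -select list plus the WKT geometry column, roughly as sketched here (the tile filename is a placeholder):

# Placeholder filename; expected header given -select plus -lco GEOMETRY=AS_WKT
head -n 1 $data_dir/example_tile.gml.csv
# WKT,crossReference,source,uprn,parentUPRN,logicalStatus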
@@ -1,5 +1,13 @@
#!/usr/bin/env bash

#
# Extract MasterMap
#

: ${1?"Usage: $0 ./path/to/mastermap/dir"}

data_dir=$1

#
# Extract buildings from *.gz to CSV
#
@@ -9,12 +17,14 @@
# Use `fid` as source ID, aka TOID.
#

: ${1?"Usage: $0 ./path/to/data/dir"}

data_dir=$1

find $data_dir -type f -name '*.gz' -printf "%f\n" | \
parallel \
gunzip $data_dir/{} -k -S gml

rename 's/$/.gml/' $data_dir/*[^gzt]

find $data_dir -type f -name '*.gml' -printf "%f\n" | \
parallel \
ogr2ogr \
-select fid,descriptiveGroup \
-f CSV $data_dir/{}.csv \
@@ -22,9 +32,4 @@ ogr2ogr \
TopographicArea \
-lco GEOMETRY=AS_WKT

# then filter
# -where "\"descriptiveGroup='(1:Building)'\"" \
# OR toid in addressbase_toids

# finally load
# -t_srs "EPSG:3857" \
rm $data_dir/*.gfs
etl/0_filter_for_loading.sh (new executable file, 40 lines)
@@ -0,0 +1,40 @@
#!/usr/bin/env bash

#
# Filter and transform for loading
#
: ${1?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir"}
: ${2?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir"}

addressbase_dir=$1
mastermap_dir=$2

#
# Check which TOIDs are matched against UPRNs
#
python check_ab_mm_match.py $addressbase_dir $mastermap_dir

#
# Filter
# - WHERE descriptiveGroup = '(1:Building)'
# - OR toid in addressbase_toids
#
python filter_mastermap.py $addressbase_dir $mastermap_dir

#
# Transform to 3857 (web mercator)
#
find $mastermap_dir -type f -name '*.filtered.csv' -printf "%f\n" | \
parallel \
ogr2ogr \
-f CSV $mastermap_dir/{}.3857.csv \
-select fid \
-s_srs "EPSG:27700" \
-t_srs "EPSG:3857" \
$mastermap_dir/{} \
-lco GEOMETRY=AS_WKT

#
# Update to EWKT (with SRID indicator for loading to Postgres)
#
sed -i 's/^"POLYGON/"SRID=3857;POLYGON/' $mastermap_dir/*.3857.csv
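For reference, the sed step rewrites the plain WKT that ogr2ogr emits into EWKT with an embedded SRID, which PostGIS parses directly when the CSV is copied into a geometry column. A minimal before/after sketch (the coordinates and TOID are invented):

# Illustrative only; coordinates and TOID are made up
echo '"POLYGON ((0 0,0 1,1 1,0 0))",osgb1000012345678' \
  | sed 's/^"POLYGON/"SRID=3857;POLYGON/'
# "SRID=3857;POLYGON ((0 0,0 1,1 1,0 0))",osgb1000012345678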
@@ -1,105 +0,0 @@
"""Load geometries from GeoJSON to Postgres

- create 'geometry' record with {
      id: <polygon-guid>,
      doc: {source_id: <toid>},
      geom: <geom-wkb_hex>
  }
- create corresponding 'building' record with {
      id: <building-guid>,
      doc: {},
      geom_id: <polygon-guid>
  }
"""
import glob
import json
import os
import sys

import fiona
import psycopg2
from shapely.geometry import shape


def main(source_dir, config_path):
    """Load config, read files and save features to the database
    """
    conf = read_config(config_path)
    dbconf = conf['database']
    conn = psycopg2.connect(**dbconf)

    source_files = glob.glob("{}/*.geojson".format(source_dir))

    loaded = {}
    for source_file in source_files:
        with fiona.open(source_file, 'r') as source:
            with conn.cursor() as cur:
                for feature in source:
                    fid = feature['properties']['fid']
                    if fid not in loaded:
                        save_feature(cur, feature)
                        loaded[fid] = True
                    else:
                        print("Skipping", fid)
        conn.commit()
    conn.close()


def save_feature(cur, feature):
    """Save a feature with geometry and source id
    """
    cur.execute(
        """INSERT INTO geometries
        (
            geometry_doc,
            geometry_geom
        )
        VALUES
        (
            %s::jsonb,
            ST_SetSRID(%s::geometry, %s)
        )
        RETURNING geometry_id
        """, (
            json.dumps({
                'source_id': feature['properties']['fid']
            }),
            shape(feature['geometry']).wkb_hex,
            3857
        )
    )
    geom_id, = cur.fetchone()
    cur.execute(
        """INSERT INTO buildings
        (
            building_doc,
            geometry_id
        )
        VALUES
        (
            %s::jsonb,
            %s
        )
        """, (
            json.dumps({}),
            geom_id
        )
    )


def read_config(config_path):
    """Read a JSON config file containing database connection details
    """
    with open(config_path, 'r') as conf_fh:
        conf = json.load(conf_fh)
    return conf


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: {} ./path/to/source/dir ./path/to/dbconfig.json".format(
            os.path.basename(__file__)
        ))
        exit()
    main(sys.argv[1], sys.argv[2])
etl/1_load_geometries.sh (new executable file, 40 lines)
@@ -0,0 +1,40 @@
#!/usr/bin/env bash

#
# Load geometries from GeoJSON to Postgres
# - assume postgres connection details are set in the environment using PGUSER, PGHOST etc.
#
: ${1?"Usage: $0 ./path/to/mastermap/dir"}

mastermap_dir=$1

#
# Create 'geometry' record with
# id: <polygon-guid>,
# source_id: <toid>,
# geom: <geom>
#
find $mastermap_dir -type f -name '*.3857.csv' \
-printf "COPY geometries ( geometry_geom, source_id ) FROM '$mastermap_dir/%f' WITH CSV HEADER;\n" | \
parallel \
psql -c {}
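The COPY statements name only geometry_geom and source_id, so the geometries table is assumed to generate its own geometry_id. The target schema is not part of this commit; a rough sketch of the assumed shape, with column names taken from the columns referenced in this diff and types guessed:

# Assumed target schema (not in this commit); types and id defaults are guesses
psql -c "CREATE TABLE IF NOT EXISTS geometries (
    geometry_id serial PRIMARY KEY,
    source_id varchar,
    geometry_doc jsonb,
    geometry_geom geometry
);"
psql -c "CREATE TABLE IF NOT EXISTS buildings (
    building_id serial PRIMARY KEY,
    building_doc jsonb,
    geometry_id integer REFERENCES geometries ( geometry_id )
);"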
#
# Delete any duplicated geometries (by TOID)
#
psql -c "DELETE FROM geometries a USING (
    SELECT MIN(ctid) as ctid, source_id
    FROM geometries
    GROUP BY source_id
    HAVING COUNT(*) > 1
) b
WHERE a.source_id = b.source_id
AND a.ctid <> b.ctid;"

#
# Create corresponding 'building' record with
# id: <building-guid>,
# doc: {},
# geom_id: <polygon-guid>
#
psql -c "INSERT INTO buildings ( geometry_id ) SELECT geometry_id from geometries;"
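Taken together with 0_filter_for_loading.sh, a plausible end-to-end run of the two stages added here would look like the sketch below; the data directories and Postgres connection values are placeholders, and the upstream extract scripts are assumed to have already produced the per-tile CSVs:

# Hypothetical invocation; paths and PG* values are placeholders
export PGHOST=localhost PGDATABASE=buildings PGUSER=loader
./etl/0_filter_for_loading.sh ./data/addressbase ./data/mastermap
./etl/1_load_geometries.sh ./data/mastermap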
@@ -11,17 +11,23 @@ csv.field_size_limit(sys.maxsize)

def main(ab_path, mm_path):
    ab_paths = sorted(glob.glob(os.path.join(ab_path, "*.gml.csv.filtered")))
    mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gz.csv")))
    mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gml.csv")))

    try:
        assert len(ab_paths) == len(mm_paths)
    except AssertionError:
        print(ab_paths)
        print(mm_paths)

    zipped_paths = zip(ab_paths, mm_paths)

    # parallel map over tiles
    with Pool(4) as p:
        p.starmap(check, zipped_paths)

def check(ab_path, mm_path):
    tile = str(os.path.basename(ab_path)).split(".")[0]
    print(tile)
    output_base = os.path.dirname(ab_path)
    ab_toids = set()
    mm_toids = set()

@@ -35,22 +41,20 @@ def check(ab_path, mm_path):
        for line in r:
            mm_toids.add(line['fid'])

    print("MasterMap", len(mm_toids))
    print("Addressbase", len(ab_toids))
    missing = ab_toids - mm_toids
    print("in AB but not MM", len(missing))
    print(tile, "MasterMap:", len(mm_toids), "Addressbase:", len(ab_toids), "AB but not MM:", len(missing))

    with open('missing_toids_{}.txt'.format(tile), 'w') as fh:
    with open(os.path.join(output_base, 'missing_toids_{}.txt'.format(tile)), 'w') as fh:
        for toid in missing:
            fh.write("{}\n".format(toid))

    with open('ab_toids_{}.txt'.format(tile), 'w') as fh:
    with open(os.path.join(output_base, 'ab_toids_{}.txt'.format(tile)), 'w') as fh:
        for toid in ab_toids:
            fh.write("{}\n".format(toid))


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: filter_addressbase_csv.py ./path/to/addressbase/dir ./path/to/mastermap/dir")
        print("Usage: check_ab_mm_match.py ./path/to/addressbase/dir ./path/to/mastermap/dir")
        exit(-1)
    main(sys.argv[1], sys.argv[2])
etl/filter_mastermap.py (new file, 60 lines)
@@ -0,0 +1,60 @@
"""Filter MasterMap to buildings and addressbase-matches
|
||||
|
||||
- WHERE descriptiveGroup = '(1:Building)'
|
||||
- OR toid in addressbase_toids
|
||||
"""
|
||||
import csv
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
from multiprocessing import Pool
|
||||
|
||||
csv.field_size_limit(sys.maxsize)
|
||||
|
||||
def main(ab_path, mm_path):
|
||||
mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gml.csv")))
|
||||
toid_paths = sorted(glob.glob(os.path.join(ab_path, "ab_toids_*.txt")))
|
||||
|
||||
try:
|
||||
assert len(mm_paths) == len(toid_paths)
|
||||
except AssertionError:
|
||||
print(mm_paths)
|
||||
print(toid_paths)
|
||||
zipped_paths = zip(mm_paths, toid_paths)
|
||||
|
||||
# parallel map over tiles
|
||||
with Pool(4) as p:
|
||||
p.starmap(filter, zipped_paths)
|
||||
|
||||
def filter(mm_path, toid_path):
|
||||
with open(toid_path, 'r') as fh:
|
||||
r = csv.reader(fh)
|
||||
toids = set(line[0] for line in r)
|
||||
|
||||
output_path = "{}.filtered.csv".format(str(mm_path).replace(".gml.csv", ""))
|
||||
alt_output_path = "{}.filtered_not_building.csv".format(str(mm_path).replace(".gml.csv", ""))
|
||||
output_fieldnames = ('WKT', 'fid', 'descriptiveGroup')
|
||||
with open(mm_path, 'r') as fh:
|
||||
r = csv.DictReader(fh)
|
||||
with open(output_path, 'w') as output_fh:
|
||||
w = csv.DictWriter(output_fh, fieldnames=output_fieldnames)
|
||||
w.writeheader()
|
||||
with open(alt_output_path, 'w') as alt_output_fh:
|
||||
alt_w = csv.DictWriter(alt_output_fh, fieldnames=output_fieldnames)
|
||||
alt_w.writeheader()
|
||||
for line in r:
|
||||
if 'Building' in line['descriptiveGroup']:
|
||||
w.writerow(line)
|
||||
|
||||
elif line['fid'] in toids:
|
||||
alt_w.writerow(line)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) != 3:
|
||||
print("Usage: filter_mastermap.py ./path/to/addressbase/dir ./path/to/mastermap/dir")
|
||||
exit(-1)
|
||||
main(sys.argv[1], sys.argv[2])
|