Update etl to load UPRNs to table

Tom Russell 2018-10-02 21:12:46 +01:00
parent fe8e6f7737
commit 4696e3e079
15 changed files with 137 additions and 157 deletions

View File

@@ -29,7 +29,7 @@ create_building_records.sh
 # add UPRNs where they match
 load_uprns.py ./addressbase_dir
 # index building records
-psql < ../migrations/002.index-buildings.sql
+psql < ../migrations/003.index-buildings.sql
 ```
 To help test the Colouring London application, `get_test_polygons.py` will attempt to save a

View File

@ -10,7 +10,7 @@ from multiprocessing import Pool
csv.field_size_limit(sys.maxsize) csv.field_size_limit(sys.maxsize)
def main(ab_path, mm_path): def main(ab_path, mm_path):
ab_paths = sorted(glob.glob(os.path.join(ab_path, "*.gml.csv.filtered"))) ab_paths = sorted(glob.glob(os.path.join(ab_path, "*.gml.csv.filtered.csv")))
mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gml.csv"))) mm_paths = sorted(glob.glob(os.path.join(mm_path, "*.gml.csv")))
try: try:

View File

@@ -6,4 +6,4 @@
 #   doc: {},
 #   geom_id: <polygon-guid>
 #
-psql -c "INSERT INTO buildings ( geometry_id ) SELECT geometry_id from geometries;"
+psql -c "INSERT INTO buildings ( geometry_id, ref_toid ) SELECT geometry_id, source_id from geometries;"
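The INSERT above can equally be driven from Python. A minimal sketch with psycopg2, assuming (as the rest of these scripts do) that connection details are set via the PG* environment variables:

```
import psycopg2

# Connection details come from PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD
conn = psycopg2.connect("")
with conn, conn.cursor() as cur:
    # One building record per geometry; carrying source_id across as ref_toid
    # is what lets UPRNs be matched to buildings by TOID later in the pipeline.
    cur.execute(
        "INSERT INTO buildings ( geometry_id, ref_toid ) "
        "SELECT geometry_id, source_id FROM geometries;"
    )
    print(cur.rowcount, "building records created")
conn.close()
```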

View File

@@ -36,9 +36,28 @@ ogr2ogr -f CSV \
     -lco GEOMETRY=AS_WKT

 #
-# Filter, grouping by TOID
+# Filter
 #
 find $data_dir -type f -name '*.gml.csv' -printf "%f\n" | \
 parallel \
 python filter_addressbase_csv.py $data_dir/{}
+
+#
+# Transform to 3857 (web mercator)
+#
+find $data_dir -type f -name '*.filtered.csv' -printf "%f\n" | \
+parallel \
+ogr2ogr \
+    -f CSV $data_dir/{}.3857.csv \
+    -s_srs "EPSG:4326" \
+    -t_srs "EPSG:3857" \
+    $data_dir/{} \
+    -lco GEOMETRY=AS_WKT
+
+#
+# Update to EWKT (with SRID indicator for loading to Postgres)
+#
+find $data_dir -type f -name '*.3857.csv' -printf "%f\n" | \
+parallel \
+cat $data_dir/{} "|" sed "'s/^\"POINT/\"SRID=3857;POINT/'" "|" cut -f 1,3,4,5 -d "','" ">" $data_dir/{}.loadable
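The final sed/cut stage is dense, so here is the same transformation sketched in Python. It assumes, as the `cut -f 1,3,4,5` implies, that the reprojected CSV has the point in column 1 and toid, uprn, parent_uprn in columns 3 to 5; the tile filename is hypothetical.

```
import csv

def to_loadable(path):
    """Rewrite POINT rows as EWKT and keep columns 1, 3, 4 and 5."""
    with open(path) as src, open(path + ".loadable", "w", newline="") as dst:
        writer = csv.writer(dst)
        for row in csv.reader(src):
            wkt = row[0]
            if wkt.startswith("POINT"):
                # EWKT embeds the SRID, so Postgres parses the point
                # without a separate ST_SetSRID step
                wkt = "SRID=3857;" + wkt
            writer.writerow([wkt] + row[2:5])

to_loadable("./addressbase_dir/tile_example.gml.csv.filtered.csv.3857.csv")  # hypothetical
```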

View File

@@ -23,7 +23,7 @@ find $data_dir -type f -name '*.gz' -printf "%f\n" | \
 parallel \
 gunzip $data_dir/{} -k -S gml

-rename 's/$/.gml/' $data_dir/*[^gzt]
+rename 's/$/.gml/' $data_dir/*[^gzvt]

 find $data_dir -type f -name '*.gml' -printf "%f\n" | \
 parallel \

View File

@@ -5,76 +5,33 @@ import csv
 import json
 import sys
-from collections import defaultdict


 def main(input_path):
-    output_path = "{}.filtered".format(input_path)
+    output_path = "{}.filtered.csv".format(input_path)
     fieldnames = (
-        'toid', 'uprn', 'wkt', 'uprn_relations'
+        'wkt', 'toid', 'uprn', 'parent_uprn'
     )
-    by_toid = defaultdict(list)
     with open(input_path) as input_fh:
+        with open(output_path, 'w') as output_fh:
+            w = csv.DictWriter(output_fh, fieldnames=fieldnames)
+            w.writeheader()
             r = csv.DictReader(input_fh)
             for line in r:
                 if line['logicalStatus'] != "1":
                     continue
-                refs = eval(line['crossReference'])
-                sources = eval(line['source'])
+                refs = json.loads(line['crossReference'])
+                sources = json.loads(line['source'])
                 toid = ""
                 for ref, source in zip(refs, sources):
                     if source == "7666MT":
                         toid = ref
-                by_toid[toid].append({
-                    'uprn': line['uprn'],
-                    'parent': line['parentUPRN'],
-                    'wkt': line['WKT']
-                })
-
-    with open(output_path, 'w') as output_fh:
-        w = csv.DictWriter(output_fh, fieldnames=fieldnames)
-        w.writeheader()
-        for toid, uprns in by_toid.items():
-            if toid == "":
-                print(len(uprns), "not matched")
-                continue
-            if len(uprns) == 1:
-                # if there's only one, pick that as the 'primary' uprn for the toid
-                uprn = uprns[0]['uprn']
-            else:
-                # else try picking a top-level match (i.e. uprn with no parent)
-                orphans = set(u['uprn'] for u in uprns if not u['parent'])
-                if orphans:
-                    uprn = orphans.pop()
-                # else climb to a root of the current tree (forest?)
-                else:
-                    uprn_tree = {}
-                    for u in uprns:
-                        uprn_tree[u['uprn']] = u['parent']
-                    uprn = uprns[0]['uprn']
-                    while True:
-                        if uprn in uprn_tree and uprn_tree[uprn]:
-                            uprn = uprn_tree[uprn]
-                        else:
-                            break
-            # pick out wkt
-            wkt = ''
-            for item in uprns:
-                if item['uprn'] == uprn:
-                    wkt = item['wkt']
                 w.writerow({
+                    'uprn': line['uprn'],
+                    'parent_uprn': line['parentUPRN'],
                     'toid': toid,
-                    'wkt': wkt,
-                    'uprn': uprn,
-                    'uprn_relations': json.dumps([{
-                        'uprn': u['uprn'],
-                        'parent': u['parent']
-                    } for u in uprns])
+                    'wkt': line['WKT'],
                 })
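A worked example of the rewritten matching step (all values are illustrative, not real records): AddressBase supplies parallel crossReference and source arrays, and the entry whose source code is 7666MT is the MasterMap TOID. Swapping eval for json.loads also stops the script executing arbitrary text from the input files.

```
import json

line = {
    "uprn": "100000000001",        # illustrative
    "parentUPRN": "",
    "logicalStatus": "1",
    "crossReference": '["osgb1000000000001", "E05000001"]',  # illustrative
    "source": '["7666MT", "7666MA"]',  # 7666MT marks the TOID entry
    "WKT": "POINT (530000 180000)",
}

refs = json.loads(line["crossReference"])
sources = json.loads(line["source"])
toid = ""
for ref, source in zip(refs, sources):
    if source == "7666MT":
        toid = ref

assert toid == "osgb1000000000001"
```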

View File

@@ -1,6 +1,6 @@
 """Filter MasterMap to buildings and addressbase-matches

-- WHERE descriptiveGroup = '(1:Building)'
+- WHERE descriptiveGroup includes 'Building'
 - OR toid in addressbase_toids
 """
 import csv

View File

@@ -37,4 +37,10 @@ ogr2ogr \
 #
 # Update to EWKT (with SRID indicator for loading to Postgres)
 #
-sed -i 's/^"POLYGON/"SRID=3857;POLYGON/' $mastermap_dir/*.3857.csv
+find $mastermap_dir -type f -name '*.3857.csv' -printf "%f\n" | \
+parallel \
+sed -i 's/^"POLYGON/"SRID=3857;POLYGON/' $mastermap_dir/{}
+
+find $mastermap_dir -type f -name '*.3857.csv' -printf "%f\n" | \
+parallel \
+sed -i 's/^"MULTIPOLYGON/"SRID=3857;MULTIPOLYGON/' $mastermap_dir/{}

View File

@@ -17,7 +17,7 @@ mastermap_dir=$1
 find $mastermap_dir -type f -name '*.3857.csv' \
     -printf "$mastermap_dir/%f\n" | \
 parallel \
-cat {} | psql -c "COPY geometries ( geometry_geom, source_id ) FROM stdin WITH CSV HEADER;"
+cat {} '|' psql -c "\"COPY geometries ( geometry_geom, source_id ) FROM stdin WITH CSV HEADER;\""

 #
 # Delete any duplicated geometries (by TOID)
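The quoted COPY above can also be run from Python; a sketch using psycopg2's copy_expert, again assuming the PG* environment variables and an illustrative file path:

```
import psycopg2

def load_geometries(csv_path):
    conn = psycopg2.connect("")  # PGHOST, PGUSER etc. from the environment
    with conn, conn.cursor() as cur, open(csv_path) as fh:
        # Stream the CSV straight into the geometries table, as psql does above
        cur.copy_expert(
            "COPY geometries ( geometry_geom, source_id ) FROM stdin WITH CSV HEADER;",
            fh,
        )
    conn.close()

load_geometries("./mastermap_dir/tile_example.gml.csv.3857.csv")  # hypothetical
```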

View File

@@ -1,78 +0,0 @@
-#!/usr/bin/env python
-"""Load buildings from CSV to Postgres
-
-- update 'building' record with {
-      all_uprns: [<uprn>, ...],
-      uprn: <min_uprn>
-  }
-"""
-import csv
-import glob
-import json
-import os
-import sys
-from multiprocessing import Pool
-
-import psycopg2
-
-
-def main(addressbase_dir):
-    """Read files and save features to the database
-    """
-    ab_paths = list(glob.glob(os.path.join(addressbase_dir, "*.gml.csv.filtered")))
-    # parallel map over tiles
-    with Pool() as p:
-        p.map(load_file, ab_paths)
-
-
-def load_file(source_file):
-    """Load UPRN data from CSVs
-    """
-    config = {
-        'host': os.environ['PGHOST'],
-        'port': os.environ['PGPORT'],
-        'dbname': os.environ['PGDATABASE'],
-        'user': os.environ['PGUSER'],
-        'password': os.environ['PGPASSWORD'],
-    }
-    conn = psycopg2.connect(**config)
-    with conn.cursor() as cur:
-        with open(source_file, 'r') as source_fh:
-            reader = csv.reader(source_fh)
-            next(reader)
-            for toid, uprn, wkt, uprn_relations in reader:
-                save_building(cur, int(uprn), toid, json.loads(uprn_relations))
-    conn.commit()
-    conn.close()
-
-
-def save_building(cur, uprn, toid, uprn_relations):
-    """Save a building
-    """
-    cur.execute(
-        """UPDATE buildings
-        SET uprn = %s, building_doc = %s::jsonb
-        WHERE geometry_id = (
-            SELECT geometry_id FROM geometries
-            WHERE
-                source_id = %s
-        )
-        """, (
-            uprn,
-            json.dumps({
-                'uprn_relations': uprn_relations
-            }),
-            toid
-        )
-    )
-
-
-if __name__ == '__main__':
-    if len(sys.argv) != 2:
-        print("Usage: {} ./path/to/addressbase_dir/".format(
-            os.path.basename(__file__)
-        ))
-        exit()
-    main(sys.argv[1])

etl/load_uprns.sh (new executable file)
View File

@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#
+# Load UPRNs from CSV to Postgres
+# - assume postgres connection details are set in the environment using PGUSER, PGHOST etc.
+#
+: ${1?"Usage: $0 ./path/to/addressbase/dir"}
+
+data_dir=$1
+
+#
+# Create 'building_properties' record with
+#   uprn: <uprn>,
+#   parent_uprn: <parent_uprn>,
+#   toid: <toid>,
+#   uprn_geom: <point>
+#
+# find $data_dir -type f -name '*.3857.csv.loadable' \
+#     -printf "$data_dir/%f\n" | \
+#     parallel \
+#     cat {} '|' psql -c "\"COPY building_properties ( uprn_geom, toid, uprn, parent_uprn ) FROM stdin WITH CSV HEADER;\""
+
+#
+# Create references
+#
+psql -c "UPDATE building_properties
+SET building_id = (
+    SELECT b.building_id
+    FROM buildings as b
+    WHERE
+        building_properties.toid = b.ref_toid
+);"

etl/run_all.sh (new executable file)
View File

@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+
+#
+# Run all ETL steps: extract, filter and transform, load
+#
+: ${1?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir ./path/to/boundary"}
+: ${2?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir ./path/to/boundary"}
+: ${3?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir ./path/to/boundary"}
+
+addressbase_dir=$1
+mastermap_dir=$2
+boundary_file=$3
+script_dir=${0%/*}
+
+# extract both datasets
+$script_dir/extract_addressbase.sh $addressbase_dir
+$script_dir/extract_mastermap.sh $mastermap_dir $boundary_file
+# filter mastermap ('building' polygons and any others referenced by addressbase)
+$script_dir/filter_transform_mastermap_for_loading.sh $addressbase_dir $mastermap_dir
+# load all building outlines
+$script_dir/load_geometries.sh $mastermap_dir
+# index geometries (should be faster after loading)
+psql < $script_dir/../migrations/002.index-geometries.up.sql
+# create a building record per outline
+$script_dir/create_building_records.sh
+# add UPRNs where they match
+$script_dir/load_uprns.sh $addressbase_dir
+# index building records
+psql < $script_dir/../migrations/003.index-buildings.up.sql

etl/run_clean.sh (new executable file)
View File

@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+
+#
+# Remove intermediate files generated by the filter/transform steps
+#
+: ${1?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir"}
+: ${2?"Usage: $0 ./path/to/addressbase/dir ./path/to/mastermap/dir"}
+
+addressbase_dir=$1
+mastermap_dir=$2
+
+rm -f $addressbase_dir/*.{csv,gml,txt,filtered,gfs}
+rm -f $mastermap_dir/*.{csv,gml,txt,filtered,gfs}

View File

@@ -9,7 +9,7 @@ CREATE TABLE IF NOT EXISTS geometries (
     -- cross-reference to data source id
     source_id varchar(30),
     -- geometry as EPSG:3857 avoiding reprojection for tiles
-    geometry_geom geometry(POLYGON, 3857)
+    geometry_geom geometry(GEOMETRY, 3857)
 );

 --
@@ -44,7 +44,9 @@ CREATE TABLE IF NOT EXISTS building_properties (
     -- Building ID may be null for failed matches
     building_id integer REFERENCES buildings,
     -- TOID match provided by AddressBase
-    toid varchar
+    toid varchar,
+    -- Geometry (for verification if loaded, not for public access)
+    uprn_geom geometry(POINT, 3857)
 );

 --
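Widening the column type from POLYGON to GEOMETRY is what lets both the POLYGON and MULTIPOLYGON rows produced by the EWKT step load into one column. A sketch of a post-load check, assuming PostGIS and the PG* environment:

```
import psycopg2

conn = psycopg2.connect("")
with conn, conn.cursor() as cur:
    # Expect ST_Polygon and ST_MultiPolygon rows after loading
    cur.execute(
        "SELECT ST_GeometryType(geometry_geom), count(*) "
        "FROM geometries GROUP BY 1;"
    )
    for geom_type, n in cur.fetchall():
        print(geom_type, n)
conn.close()
```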

View File

@@ -1,13 +1,13 @@
 -- Create building indexes after bulk loading

 -- Building index over UPRNs (given a building, find UPRNs)
-CREATE INDEX uprn_building_idx ON building_properties ( building_id );
+CREATE INDEX IF NOT EXISTS uprn_building_idx ON building_properties ( building_id );

 -- UPRN index (given a UPRN, find buildings or parents)
-CREATE INDEX uprn_uprn_idx ON building_properties ( uprn );
+CREATE INDEX IF NOT EXISTS uprn_uprn_idx ON building_properties ( uprn );

 -- Parent index over UPRNs (given a UPRN, find children)
-CREATE INDEX uprn_parent_idx ON building_properties ( parent_uprn );
+CREATE INDEX IF NOT EXISTS uprn_parent_idx ON building_properties ( parent_uprn );

 -- TOID index over buildings
-CREATE INDEX building_toid_idx ON buildings ( ref_toid );
+CREATE INDEX IF NOT EXISTS building_toid_idx ON buildings ( ref_toid );