Update UPRN-load script
This commit is contained in:
parent
d9797385d9
commit
342167f9c9
140
etl/load_uprns.py
Normal file → Executable file
140
etl/load_uprns.py
Normal file → Executable file
@ -1,130 +1,78 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
"""Load buildings from CSV to Postgres
|
"""Load buildings from CSV to Postgres
|
||||||
|
|
||||||
|
- update 'building' record with {
|
||||||
- create 'building' record with {
|
all_uprns: [<uprn>, ...],
|
||||||
geometry_id: <polygon-guid>,
|
uprn: <min_uprn>
|
||||||
all_uprns: [<uprn>, ...],
|
}
|
||||||
uprn: <min_uprn>
|
|
||||||
}
|
|
||||||
"""
|
"""
|
||||||
import csv
|
import csv
|
||||||
|
import glob
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from multiprocessing import Pool
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
|
||||||
|
|
||||||
def main(source_file, config_path):
|
def main(addressbase_dir):
|
||||||
"""Load config, read files and save features to the database
|
"""Read files and save features to the database
|
||||||
"""
|
"""
|
||||||
conf = read_config(config_path)
|
ab_paths = list(glob.glob(os.path.join(addressbase_dir, "*.gml.csv.filtered")))
|
||||||
dbconf = conf['database']
|
|
||||||
conn = psycopg2.connect(**dbconf)
|
|
||||||
|
|
||||||
|
# parallel map over tiles
|
||||||
|
with Pool() as p:
|
||||||
|
p.map(load_file, ab_paths)
|
||||||
|
|
||||||
|
|
||||||
|
def load_file(source_file):
|
||||||
|
"""Load UPRN data from CSVs
|
||||||
|
"""
|
||||||
|
config = {
|
||||||
|
'host': os.environ['PGHOST'],
|
||||||
|
'port': os.environ['PGPORT'],
|
||||||
|
'dbname': os.environ['PGDATABASE'],
|
||||||
|
'user': os.environ['PGUSER'],
|
||||||
|
'password': os.environ['PGPASSWORD'],
|
||||||
|
}
|
||||||
|
conn = psycopg2.connect(**config)
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
with open(source_file, 'r') as source_fh:
|
with open(source_file, 'r') as source_fh:
|
||||||
reader = csv.reader(source_fh)
|
reader = csv.reader(source_fh)
|
||||||
next(reader)
|
next(reader)
|
||||||
for uprn, _, _, lat, lng in reader:
|
for toid, uprn, wkt, uprn_relations in reader:
|
||||||
geometry_id = find_geom(cur, float(lat), float(lng))
|
save_building(cur, int(uprn), toid, json.loads(uprn_relations))
|
||||||
if geometry_id is not None:
|
|
||||||
save_building(cur, int(uprn), geometry_id)
|
|
||||||
else:
|
|
||||||
print("No geometry for", uprn)
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def find_geom(cur, lat, lng):
|
def save_building(cur, uprn, toid, uprn_relations):
|
||||||
"""Find a building geometry
|
|
||||||
"""
|
|
||||||
cur.execute(
|
|
||||||
"""SELECT geometry_id FROM geometries
|
|
||||||
WHERE
|
|
||||||
ST_Within(
|
|
||||||
ST_Transform(
|
|
||||||
ST_SetSRID(ST_Point(%s, %s), 4326),
|
|
||||||
3857
|
|
||||||
),
|
|
||||||
geometry_geom
|
|
||||||
)
|
|
||||||
""", (
|
|
||||||
lng,
|
|
||||||
lat
|
|
||||||
)
|
|
||||||
)
|
|
||||||
result = cur.fetchone()
|
|
||||||
if result is not None:
|
|
||||||
id_, = result
|
|
||||||
return id_
|
|
||||||
else:
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def save_building(cur, uprn, geometry_id):
|
|
||||||
"""Save a building
|
"""Save a building
|
||||||
"""
|
"""
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""SELECT building_id FROM buildings
|
"""UPDATE buildings
|
||||||
WHERE
|
SET uprn = %s, building_doc = %s::jsonb
|
||||||
geometry_id = %s
|
WHERE geometry_id = (
|
||||||
|
SELECT geometry_id FROM geometries
|
||||||
|
WHERE
|
||||||
|
source_id = %s
|
||||||
|
)
|
||||||
""", (
|
""", (
|
||||||
geometry_id,
|
uprn,
|
||||||
|
json.dumps({
|
||||||
|
'uprn_relations': uprn_relations
|
||||||
|
}),
|
||||||
|
toid
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
building = cur.fetchone()
|
|
||||||
if building is None:
|
|
||||||
cur.execute(
|
|
||||||
"""INSERT INTO buildings
|
|
||||||
(
|
|
||||||
building_doc,
|
|
||||||
geometry_id
|
|
||||||
)
|
|
||||||
VALUES
|
|
||||||
(
|
|
||||||
%s::jsonb,
|
|
||||||
%s
|
|
||||||
)
|
|
||||||
""", (
|
|
||||||
json.dumps({
|
|
||||||
'uprns': [uprn]
|
|
||||||
}),
|
|
||||||
geometry_id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
building_id = building[0]
|
|
||||||
cur.execute(
|
|
||||||
"""UPDATE buildings
|
|
||||||
SET
|
|
||||||
building_doc = jsonb_insert(
|
|
||||||
building_doc,
|
|
||||||
'{uprns, -1}', -- insert at end of 'uprns' array
|
|
||||||
'%s'::jsonb,
|
|
||||||
true -- insert after location
|
|
||||||
)
|
|
||||||
WHERE
|
|
||||||
building_id = %s
|
|
||||||
""", (
|
|
||||||
uprn,
|
|
||||||
building_id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def read_config(config_path):
|
|
||||||
"""Read a JSON config file containing database connection details
|
|
||||||
"""
|
|
||||||
with open(config_path, 'r') as fh:
|
|
||||||
conf = json.load(fh)
|
|
||||||
return conf
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
if len(sys.argv) != 2:
|
if len(sys.argv) != 2:
|
||||||
print("Usage: {} ./path/to/source/file.csv".format(
|
print("Usage: {} ./path/to/addressbase_dir/".format(
|
||||||
os.path.basename(__file__)
|
os.path.basename(__file__)
|
||||||
))
|
))
|
||||||
exit()
|
exit()
|
||||||
main(sys.argv[1], sys.argv[2])
|
main(sys.argv[1])
|
||||||
|
Loading…
Reference in New Issue
Block a user