Fix lints

Tom Russell 2023-08-15 14:10:19 +01:00
parent 5378fac326
commit af54a67051
9 changed files with 393 additions and 320 deletions
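
The diffs below are mechanical lint and formatting fixes across the ETL scripts: unused imports are removed, single-quoted strings become double-quoted, long calls and literals are re-wrapped with trailing commas, and comparisons against None move from ==/!= to is/is not. The quoting and wrapping style is consistent with an autoformatter such as black, though the commit does not say which tools were run. A minimal sketch of the None-comparison rule (flake8's E711) that recurs in every file below:

value = {"a": 1}.get("b")  # lookup that may return None

if value != None:  # flagged by the linter (E711): equality comparison with None
    print(value)

if value is not None:  # preferred identity check, as applied throughout this commit
    print(value)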

View File

@@ -1 +1,3 @@
 from .filter_mastermap import filter_mastermap
+
+__all__ = ["filter_mastermap"]

View File

@@ -20,24 +20,24 @@ def main(mastermap_path):
 def filter_mastermap(mm_path):
     output_path = str(mm_path).replace(".gml.csv", "")
     output_path = "{}.filtered.csv".format(output_path)
-    output_fieldnames = ('WKT', 'fid', 'descriptiveGroup')
+    output_fieldnames = ("WKT", "fid", "descriptiveGroup")
     # Open the input csv with all polygons, buildings and others
-    with open(mm_path, 'r') as fh:
+    with open(mm_path, "r") as fh:
         r = csv.DictReader(fh)
         # Open a new output csv that will contain just buildings
-        with open(output_path, 'w') as output_fh:
+        with open(output_path, "w") as output_fh:
             w = csv.DictWriter(output_fh, fieldnames=output_fieldnames)
             w.writeheader()
             for line in r:
                 try:
-                    if 'Building' in line['descriptiveGroup']:
+                    if "Building" in line["descriptiveGroup"]:
                         w.writerow(line)
                 # when descriptiveGroup is missing, ignore this Polygon
                 except TypeError:
                     pass

-if __name__ == '__main__':
+if __name__ == "__main__":
     if len(sys.argv) != 2:
         print("Usage: filter_mastermap.py ./path/to/mastermap/dir")
         exit(-1)

View File

@@ -17,7 +17,6 @@ Then with this script:
 """
-import json
 import csv
 import os
 import subprocess

@@ -28,50 +27,49 @@ from tqdm import tqdm
 def main(base_url, api_key, source_file):
-    """Read from file, update buildings
-    """
-    with open(source_file, 'r') as source_fh:
+    """Read from file, update buildings"""
+    with open(source_file, "r") as source_fh:
         source = csv.DictReader(source_fh)
         for feature in tqdm(source, total=line_count(source_file)):
             building_id, data = process_ca(feature)
-            if building_id and building_id != 'building_id':
+            if building_id and building_id != "building_id":
                 save_data(building_id, data, api_key, base_url)

 def line_count(fname):
-    """Count lines - relies on 'wc'
-    """
-    p = subprocess.run(['wc', '-l', fname], stdout=subprocess.PIPE)
+    """Count lines - relies on 'wc'"""
+    p = subprocess.run(["wc", "-l", fname], stdout=subprocess.PIPE)
     if p.returncode != 0:
-        raise IOError(err)
+        raise IOError(p.returncode)
     return int(p.stdout.strip().split()[0])

 def process_ca(props):
-    building_id = props['building_id']
+    building_id = props["building_id"]
     data = {
-        'planning_in_conservation_area': True,
-        'planning_conservation_area_name': props['conservation_area_name']
+        "planning_in_conservation_area": True,
+        "planning_conservation_area_name": props["conservation_area_name"],
     }
     return building_id, data

 def save_data(building_id, data, api_key, base_url):
-    """Save data to a building
-    """
-    r = requests.post(
+    """Save data to a building"""
+    requests.post(
         "{}/buildings/{}.json?api_key={}".format(base_url, building_id, api_key),
-        json=data
+        json=data,
     )

-if __name__ == '__main__':
+if __name__ == "__main__":
     try:
         url, api_key, filename = sys.argv[1], sys.argv[2], sys.argv[3]
     except IndexError:
         print(
             "Usage: {} <URL> <api_key> ./path/to/conservation_areas.csv".format(
                 os.path.basename(__file__)
-            ))
+            )
+        )
         exit()
     main(url, api_key, filename)

View File

@@ -44,8 +44,6 @@ TODO extend to allow latitude,longitude or easting,northing columns and lookup b
 """
 import csv
 import json
-import os
-import sys
 import argparse
 import requests

@@ -53,9 +51,8 @@ from retrying import retry
 def main(base_url, api_key, source_file, json_columns, no_overwrite=False, debug=False):
-    """Read from file, update buildings
-    """
-    with open(source_file, 'r') as source:
+    """Read from file, update buildings"""
+    with open(source_file, "r") as source:
         reader = csv.DictReader(source)
         for line in reader:
             building_id = find_building(line, base_url)

@@ -64,78 +61,86 @@ def main(base_url, api_key, source_file, json_columns, no_overwrite=False, debug
             if building_id is None:
                 continue
-            if 'sust_dec' in line and line['sust_dec'] == '':
-                del line['sust_dec']
+            if "sust_dec" in line and line["sust_dec"] == "":
+                del line["sust_dec"]
             if no_overwrite:
                 try:
                     if check_data_present(building_id, line.keys(), base_url):
-                        print(f'Building {building_id}: Not updating to avoid overwriting existing data')
+                        print(
+                            f"Building {building_id}: Not updating to avoid overwriting existing data"
+                        )
                         continue
                 except ApiRequestError as e:
-                    print(f'Error checking existing data for building {building_id}: status {e.code}, data: {e.data}')
+                    print(
+                        f"Error checking existing data for building {building_id}: status {e.code}, data: {e.data}"
+                    )
                     raise
-            response_code, response_data = update_building(building_id, line, api_key, base_url)
+            response_code, response_data = update_building(
+                building_id, line, api_key, base_url
+            )
             if response_code != 200:
-                print('ERROR', building_id, response_code, response_data)
+                print("ERROR", building_id, response_code, response_data)
             elif debug:
-                print('DEBUG', building_id, response_code, response_data)
+                print("DEBUG", building_id, response_code, response_data)

 class ApiRequestError(Exception):
-    def __init__(self, code, data, message=''):
+    def __init__(self, code, data, message=""):
         self.code = code
         self.data = data
         super().__init__(message)

 def check_data_present(building_id, fields, base_url):
     response_code, current_state = get_building(building_id, base_url)
     if response_code != 200:
         raise ApiRequestError(response_code, current_state)
     else:
-        id_fields = set(['building_id', 'toid', 'uprn'])
+        id_fields = set(["building_id", "toid", "uprn"])
         field_names_without_ids = [k for k in fields if k not in id_fields]
-        return any([current_state.get(k, None) != None for k in field_names_without_ids])
+        return any(
+            [current_state.get(k, None) is not None for k in field_names_without_ids]
+        )

 @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
 def get_building(building_id, base_url):
-    """Get data for a building
-    """
+    """Get data for a building"""
     r = requests.get(f"{base_url}/api/buildings/{building_id}.json")
     return r.status_code, r.json()

 @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
 def update_building(building_id, data, api_key, base_url):
-    """Save data to a building
-    """
+    """Save data to a building"""
     r = requests.post(
         "{}/api/buildings/{}.json".format(base_url, building_id),
-        params={'api_key': api_key},
-        json=data
+        params={"api_key": api_key},
+        json=data,
     )
     return r.status_code, r.json()

 def find_building(data, base_url):
-    if 'building_id' in data:
-        building_id = data['building_id']
+    if "building_id" in data:
+        building_id = data["building_id"]
         if building_id is not None:
             print("match_by_building_id", building_id)
             return building_id
-    if 'toid' in data:
-        building_id = find_by_reference(base_url, 'toid', data['toid'])
+    if "toid" in data:
+        building_id = find_by_reference(base_url, "toid", data["toid"])
         if building_id is not None:
-            print("match_by_toid", data['toid'], building_id)
+            print("match_by_toid", data["toid"], building_id)
             return building_id
-    if 'uprn' in data:
-        building_id = find_by_reference(base_url, 'uprn', data['uprn'])
+    if "uprn" in data:
+        building_id = find_by_reference(base_url, "uprn", data["uprn"])
         if building_id is not None:
-            print("match_by_uprn", data['uprn'], building_id)
+            print("match_by_uprn", data["uprn"], building_id)
             return building_id
     print("no_match", data)

@@ -144,21 +149,21 @@ def find_building(data, base_url):
 @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
 def find_by_reference(base_url, ref_key, ref_id):
-    """Find building_id by TOID or UPRN
-    """
-    r = requests.get("{}/api/buildings/reference".format(base_url), params={
-        'key': ref_key,
-        'id': ref_id
-    })
+    """Find building_id by TOID or UPRN"""
+    r = requests.get(
+        "{}/api/buildings/reference".format(base_url),
+        params={"key": ref_key, "id": ref_id},
+    )
     buildings = r.json()
-    if buildings and 'error' not in buildings and len(buildings) == 1:
-        building_id = buildings[0]['building_id']
+    if buildings and "error" not in buildings and len(buildings) == 1:
+        building_id = buildings[0]["building_id"]
     else:
         building_id = None
     return building_id

 def parse_json_columns(row, json_columns):
     for col in json_columns:
         row[col] = json.loads(row[col])

@@ -167,28 +172,41 @@ def parse_json_columns(row, json_columns):
 def list_str(values):
-    return values.split(',')
+    return values.split(",")

-if __name__ == '__main__':
+if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument('url', help='URL for the app')
-    parser.add_argument('api_key', help='API key for the user')
-    parser.add_argument('path', help='Path to data CSV file')
-    parser.add_argument('json_columns',
-        nargs='?',
+    parser.add_argument("url", help="URL for the app")
+    parser.add_argument("api_key", help="API key for the user")
+    parser.add_argument("path", help="Path to data CSV file")
+    parser.add_argument(
+        "json_columns",
+        nargs="?",
         type=list_str,
         default=[],
-        help='A comma-separated list of columns which should be parsed as JSON')
-    parser.add_argument('--no-overwrite', '-n',
-        action='store_true',
-        dest='no_overwrite',
-        help='Don\'t overwrite building data if any of the fields supplied is already set')
-    parser.add_argument('--debug', '-d',
-        action='store_true',
-        help='Print debug messages')
+        help="A comma-separated list of columns which should be parsed as JSON",
+    )
+    parser.add_argument(
+        "--no-overwrite",
+        "-n",
+        action="store_true",
+        dest="no_overwrite",
+        help="Don't overwrite building data if any of the fields supplied is already set",
+    )
+    parser.add_argument(
+        "--debug", "-d", action="store_true", help="Print debug messages"
+    )
     args = parser.parse_args()
-    main(args.url, args.api_key, args.path, args.json_columns, args.no_overwrite, args.debug)
+    main(
+        args.url,
+        args.api_key,
+        args.path,
+        args.json_columns,
+        args.no_overwrite,
+        args.debug,
+    )

View File

@@ -23,18 +23,18 @@ The process:
 TODO extend to allow latitude,longitude or easting,northing columns and lookup by location.
 """
 import csv
-import json
 import os
 import sys
 import requests

 session = requests.Session()
 session.verify = False

 def main(base_url, api_key, source_file):
-    """Read from file, update buildings
-    """
-    with open(source_file, 'r') as source:
+    """Read from file, update buildings"""
+    with open(source_file, "r") as source:
         reader = csv.DictReader(source)
         for line in reader:
             building_id = find_building(line, base_url)

@@ -42,40 +42,41 @@ def main(base_url, api_key, source_file):
             if building_id is None:
                 continue
-            response_code, response_data = update_building(building_id, line, api_key, base_url)
+            response_code, response_data = update_building(
+                building_id, line, api_key, base_url
+            )
             if response_code != 200:
-                print('ERROR', building_id, response_code, response_data)
+                print("ERROR", building_id, response_code, response_data)

 def update_building(building_id, data, api_key, base_url):
-    """Save data to a building
-    """
+    """Save data to a building"""
     r = requests.post(
         "{}/api/buildings/{}.json".format(base_url, building_id),
-        params={'api_key': api_key},
+        params={"api_key": api_key},
         json=data,
-        verify=False
+        verify=False,
     )
     print(r)
     return r.status_code, r.json()

 def find_building(data, base_url):
-    if 'building_id' in data:
-        building_id = data['building_id']
+    if "building_id" in data:
+        building_id = data["building_id"]
         if building_id is not None:
             print("match_by_building_id", building_id)
             return building_id
-    if 'toid' in data:
-        building_id = find_by_reference(base_url, 'toid', data['toid'])
+    if "toid" in data:
+        building_id = find_by_reference(base_url, "toid", data["toid"])
         if building_id is not None:
-            print("match_by_toid", data['toid'], building_id)
+            print("match_by_toid", data["toid"], building_id)
             return building_id
-    if 'uprn' in data:
-        building_id = find_by_reference(base_url, 'uprn', data['uprn'])
+    if "uprn" in data:
+        building_id = find_by_reference(base_url, "uprn", data["uprn"])
         if building_id is not None:
-            print("match_by_uprn", data['uprn'], building_id)
+            print("match_by_uprn", data["uprn"], building_id)
             return building_id
     print("no_match", data)

@@ -83,32 +84,34 @@ def find_building(data, base_url):
 def find_by_reference(base_url, ref_key, ref_id):
-    """Find building_id by TOID or UPRN
-    """
-    r = requests.get("{}/api/buildings/reference".format(base_url), params={
-        'key': ref_key,
-        'id': ref_id,
-    },
-    verify=False
+    """Find building_id by TOID or UPRN"""
+    r = requests.get(
+        "{}/api/buildings/reference".format(base_url),
+        params={
+            "key": ref_key,
+            "id": ref_id,
+        },
+        verify=False,
     )
     buildings = r.json()
-    if buildings and 'error' not in buildings and len(buildings) == 1:
-        building_id = buildings[0]['building_id']
+    if buildings and "error" not in buildings and len(buildings) == 1:
+        building_id = buildings[0]["building_id"]
     else:
         building_id = None
     return building_id

-if __name__ == '__main__':
+if __name__ == "__main__":
     try:
         url, api_key, filename = sys.argv[1], sys.argv[2], sys.argv[3]
     except IndexError:
         print(
             "Usage: {} <URL> <api_key> ./path/to/data.csv".format(
                 os.path.basename(__file__)
-            ))
+            )
+        )
         exit()
     main(url, api_key, filename)

View File

@@ -8,7 +8,6 @@ datasets for Camden (age data) and Fitzrovia (number of storeys).
 - else locate building by representative point
 - update building with data
 """
-import json
 import os
 import sys
 from functools import partial

@@ -21,18 +20,15 @@ from shapely.ops import transform
 osgb_to_ll = partial(
-    pyproj.transform,
-    pyproj.Proj(init='epsg:27700'),
-    pyproj.Proj(init='epsg:4326')
+    pyproj.transform, pyproj.Proj(init="epsg:27700"), pyproj.Proj(init="epsg:4326")
 )

 def main(base_url, api_key, process, source_file):
-    """Read from file, update buildings
-    """
-    with fiona.open(source_file, 'r') as source:
+    """Read from file, update buildings"""
+    with fiona.open(source_file, "r") as source:
         for feature in source:
-            props = feature['properties']
+            props = feature["properties"]
             if process == "camden":
                 toid, data = process_camden(props)

@@ -42,7 +38,7 @@ def main(base_url, api_key, process, source_file):
             if data is None:
                 continue
-            building_id = find_building(toid, feature['geometry'], base_url)
+            building_id = find_building(toid, feature["geometry"], base_url)
             if not building_id:
                 print("no_match", toid, "-")
                 continue

@@ -51,31 +47,22 @@ def main(base_url, api_key, process, source_file):
 def process_camden(props):
-    toid = osgb_toid(props['TOID'])
-    data = {
-        'date_year': props['Year_C'],
-        'date_source_detail': props['Date_sou_1']
-    }
+    toid = osgb_toid(props["TOID"])
+    data = {"date_year": props["Year_C"], "date_source_detail": props["Date_sou_1"]}
     return toid, data

 def process_fitzrovia(props):
-    toid = osgb_toid(props['TOID'])
-    storeys = props['Storeys']
+    toid = osgb_toid(props["TOID"])
+    storeys = props["Storeys"]
     if storeys is None:
         return toid, None
-    if props['Basement'] == 'Yes':
-        data = {
-            'size_storeys_core': int(storeys) - 1,
-            'size_storeys_basement': 1
-        }
+    if props["Basement"] == "Yes":
+        data = {"size_storeys_core": int(storeys) - 1, "size_storeys_basement": 1}
     else:
-        data = {
-            'size_storeys_core': int(storeys),
-            'size_storeys_basement': 0
-        }
+        data = {"size_storeys_core": int(storeys), "size_storeys_basement": 0}
     return toid, data

@@ -86,24 +73,21 @@ def osgb_toid(toid):
 def save_data(building_id, data, api_key, base_url):
-    """Save data to a building
-    """
-    r = requests.post(
+    """Save data to a building"""
+    requests.post(
         "{}/buildings/{}.json?api_key={}".format(base_url, building_id, api_key),
-        json=data
+        json=data,
     )

 def find_building(toid, geom, base_url):
-    """Find building_id by TOID or location
-    """
-    r = requests.get(base_url + "/buildings/reference", params={
-        'key': 'toid',
-        'id': toid
-    })
+    """Find building_id by TOID or location"""
+    r = requests.get(
+        base_url + "/buildings/reference", params={"key": "toid", "id": toid}
+    )
     buildings = r.json()
     if buildings and len(buildings) == 1:
-        bid = buildings[0]['building_id']
+        bid = buildings[0]["building_id"]
         print("match_by_toid", toid, bid)
         return bid

@@ -114,27 +98,32 @@ def find_building(toid, geom, base_url):
     point_osgb = poly.representative_point()
     point_ll = transform(osgb_to_ll, point_osgb)
-    r = requests.get(base_url + "/buildings/locate", params={
-        'lng': point_ll.x,
-        'lat': point_ll.y
-    })
+    r = requests.get(
+        base_url + "/buildings/locate", params={"lng": point_ll.x, "lat": point_ll.y}
+    )
     buildings = r.json()
     if buildings and len(buildings) == 1:
-        bid = buildings[0]['building_id']
+        bid = buildings[0]["building_id"]
         print("match_by_location", toid, bid)
         return bid
     return None

-if __name__ == '__main__':
+if __name__ == "__main__":
     try:
-        url, api_key, process, filename = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]
+        url, api_key, process, filename = (
+            sys.argv[1],
+            sys.argv[2],
+            sys.argv[3],
+            sys.argv[4],
+        )
     except IndexError:
         print(
             "Usage: {} <URL> <api_key> <camden|fitzrovia> ./path/to/camden.shp".format(
                 os.path.basename(__file__)
-            ))
+            )
+        )
         exit()
     main(url, api_key, process, filename)

View File

@@ -8,7 +8,6 @@ datasets for Camden (age data) and Fitzrovia (number of storeys).
 - else locate building by representative point
 - update building with data
 """
-import json
 import os
 import sys
 from functools import partial

@@ -21,18 +20,15 @@ from shapely.ops import transform
 osgb_to_ll = partial(
-    pyproj.transform,
-    pyproj.Proj(init='epsg:27700'),
-    pyproj.Proj(init='epsg:4326')
+    pyproj.transform, pyproj.Proj(init="epsg:27700"), pyproj.Proj(init="epsg:4326")
 )

 def main(base_url, api_key, process, source_file):
-    """Read from file, update buildings
-    """
-    with fiona.open(source_file, 'r') as source:
+    """Read from file, update buildings"""
+    with fiona.open(source_file, "r") as source:
         for feature in source:
-            props = feature['properties']
+            props = feature["properties"]
             if process == "camden":
                 toid, data = process_camden(props)

@@ -42,7 +38,7 @@ def main(base_url, api_key, process, source_file):
             if data is None:
                 continue
-            building_id = find_building(toid, feature['geometry'], base_url)
+            building_id = find_building(toid, feature["geometry"], base_url)
             if not building_id:
                 print("no_match", toid, "-")
                 continue

@@ -51,31 +47,22 @@ def main(base_url, api_key, process, source_file):
 def process_camden(props):
-    toid = osgb_toid(props['TOID'])
-    data = {
-        'date_year': props['Year_C'],
-        'date_source_detail': props['Date_sou_1']
-    }
+    toid = osgb_toid(props["TOID"])
+    data = {"date_year": props["Year_C"], "date_source_detail": props["Date_sou_1"]}
     return toid, data

 def process_fitzrovia(props):
-    toid = osgb_toid(props['TOID'])
-    storeys = props['Storeys']
+    toid = osgb_toid(props["TOID"])
+    storeys = props["Storeys"]
     if storeys is None:
         return toid, None
-    if props['Basement'] == 'Yes':
-        data = {
-            'size_storeys_core': int(storeys) - 1,
-            'size_storeys_basement': 1
-        }
+    if props["Basement"] == "Yes":
+        data = {"size_storeys_core": int(storeys) - 1, "size_storeys_basement": 1}
     else:
-        data = {
-            'size_storeys_core': int(storeys),
-            'size_storeys_basement': 0
-        }
+        data = {"size_storeys_core": int(storeys), "size_storeys_basement": 0}
     return toid, data

@@ -86,24 +73,21 @@ def osgb_toid(toid):
 def save_data(building_id, data, api_key, base_url):
-    """Save data to a building
-    """
-    r = requests.post(
+    """Save data to a building"""
+    requests.post(
         "{}/buildings/{}.json?api_key={}".format(base_url, building_id, api_key),
-        json=data
+        json=data,
     )

 def find_building(toid, geom, base_url):
-    """Find building_id by TOID or location
-    """
-    r = requests.get(base_url + "/buildings/reference", params={
-        'key': 'toid',
-        'id': toid
-    })
+    """Find building_id by TOID or location"""
+    r = requests.get(
+        base_url + "/buildings/reference", params={"key": "toid", "id": toid}
+    )
     buildings = r.json()
     if buildings and len(buildings) == 1:
-        bid = buildings[0]['building_id']
+        bid = buildings[0]["building_id"]
         print("match_by_toid", toid, bid)
         return bid

@@ -114,27 +98,32 @@ def find_building(toid, geom, base_url):
     point_osgb = poly.representative_point()
     point_ll = transform(osgb_to_ll, point_osgb)
-    r = requests.get(base_url + "/buildings/locate", params={
-        'lng': point_ll.x,
-        'lat': point_ll.y
-    })
+    r = requests.get(
+        base_url + "/buildings/locate", params={"lng": point_ll.x, "lat": point_ll.y}
+    )
     buildings = r.json()
     if buildings and len(buildings) == 1:
-        bid = buildings[0]['building_id']
+        bid = buildings[0]["building_id"]
         print("match_by_location", toid, bid)
         return bid
     return None

-if __name__ == '__main__':
+if __name__ == "__main__":
     try:
-        url, api_key, process, filename = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]
+        url, api_key, process, filename = (
+            sys.argv[1],
+            sys.argv[2],
+            sys.argv[3],
+            sys.argv[4],
+        )
     except IndexError:
         print(
             "Usage: {} <URL> <api_key> <camden|fitzrovia> ./path/to/camden.shp".format(
                 os.path.basename(__file__)
-            ))
+            )
+        )
         exit()
     main(url, api_key, process, filename)

View File

@@ -1,9 +1,14 @@
 def planning_data_entry_to_address(element):
     site_name = element["_source"].get("site_name")
     site_number = element["_source"].get("site_number")
-    street_name = element["_source"].get("street_name") # seems often misused - say "31 COPTHALL ROAD EAST" site_name getting Ickenham street_name
+    street_name = element["_source"].get("street_name")
+    # seems often misused - say "31 COPTHALL ROAD EAST" site_name
+    # getting Ickenham street_name
     secondary_street_name = element["_source"].get("secondary_street_name")
-    return generate_address(site_name, site_number, street_name, secondary_street_name)['result']
+    return generate_address(site_name, site_number, street_name, secondary_street_name)[
+        "result"
+    ]

 def generate_address(site_name, site_number, street_name, secondary_street_name):
     """

@@ -11,13 +16,13 @@ def generate_address(site_name, site_number, street_name, secondary_street_name)
     sadly it does not always works well and relies on many heursitics as data quality is limited
     """
-    if site_name != None:
+    if site_name is not None:
         site_name = site_name.strip()
-    if site_number != None:
+    if site_number is not None:
         site_number = site_number.strip()
-    if street_name != None:
+    if street_name is not None:
         street_name = street_name.strip()
-    if secondary_street_name != None:
+    if secondary_street_name is not None:
         secondary_street_name = secondary_street_name.strip()

     if site_name == "":

@@ -29,68 +34,80 @@ def generate_address(site_name, site_number, street_name, secondary_street_name)
     if secondary_street_name == "":
         secondary_street_name = None

     data = {
-        'site_name': site_name,
-        'site_number': site_number,
-        'street_name': street_name,
-        'secondary_street_name': secondary_street_name,
+        "site_name": site_name,
+        "site_number": site_number,
+        "street_name": street_name,
+        "secondary_street_name": secondary_street_name,
     }
-    if site_name == site_number == street_name == secondary_street_name == None:
-        return {'result': None, 'data': data}
+    if site_name == site_number == street_name == secondary_street_name is None:
+        return {"result": None, "data": data}

-    if secondary_street_name != None:
-        if street_name == None:
-            print('"secondary_street_name != None, street_name == None"')
-            show_data(site_name, site_number, street_name, secondary_street_name, "???????")
+    if secondary_street_name is not None:
+        if street_name is None:
+            print('"secondary_street_name is not None, street_name is None"')
+            show_data(
+                site_name, site_number, street_name, secondary_street_name, "???????"
+            )
         else:
             street_name += " - with secondary road name: " + secondary_street_name

-    if site_number != None and street_name != None:
+    if site_number is not None and street_name is not None:
         address = site_number + " " + street_name
-        if site_name != None:
-            print('"site_name != None and site_number != None and street_name != None"')
-            show_data(site_name, site_number, street_name, secondary_street_name, address)
-        return {'result': address, 'data': data}
+        if site_name is not None:
+            print(
+                '"site_name is not None and site_number is not None and street_name is not None"'
+            )
+            show_data(
+                site_name, site_number, street_name, secondary_street_name, address
+            )
+        return {"result": address, "data": data}

-    if site_name != None:
-        if street_name != None:
+    if site_name is not None:
+        if street_name is not None:
             try:
-                if site_number == None and int(site_name):
-                    return {'result': site_name + " " + street_name, 'data': data}
+                if site_number is None and int(site_name):
+                    return {"result": site_name + " " + street_name, "data": data}
             except ValueError:
                 pass
             if street_name in site_name:
-                site_name_without_street_name = site_name.replace(street_name, "").strip()
+                site_name_without_street_name = site_name.replace(
+                    street_name, ""
+                ).strip()
                 try:
-                    house_number = int(site_name_without_street_name)
+                    _ = int(site_name_without_street_name)
                     # so it appears to be case like
                     # site_name: 5 Warwick Road
                     # street_name: Warwick Road
                     # no other info provided
                     # in such case just returning site_name will work fine...
-                    return {'result': site_name, 'data': data}
+                    return {"result": site_name, "data": data}
                 except ValueError:
                     pass
-            print('"site_name != None and street_name != None"')
-            show_data(site_name, site_number, street_name, secondary_street_name, site_name)
-        if site_number != None:
-            print('"site_name != None and site_number != None"')
-            show_data(site_name, site_number, street_name, secondary_street_name, site_name)
-        return {'result': site_name, 'data': data}
+            print('"site_name is not None and street_name is not None"')
+            show_data(
+                site_name, site_number, street_name, secondary_street_name, site_name
+            )
+        if site_number is not None:
+            print('"site_name is not None and site_number is not None"')
+            show_data(
+                site_name, site_number, street_name, secondary_street_name, site_name
+            )
+        return {"result": site_name, "data": data}
     else:
-        if street_name != None:
-            if site_number != None:
-                return {'result': site_number + " " + street_name, 'data': data}
-        if street_name != None and site_number == None:
-            print('"street_name != None or site_number == None"')
+        if street_name is not None:
+            if site_number is not None:
+                return {"result": site_number + " " + street_name, "data": data}
+        if street_name is not None and site_number is None:
+            print('"street_name is not None or site_number is None"')
             show_data(site_name, site_number, street_name, secondary_street_name, None)
-            return {'result': None, 'data': data}
-        if street_name == None and site_number != None:
-            print('"street_name == None or site_number != None"')
+            return {"result": None, "data": data}
+        if street_name is None and site_number is not None:
+            print('"street_name is None or site_number is not None"')
             show_data(site_name, site_number, street_name, secondary_street_name, None)
-            return {'result': None, 'data': data}
-    return {'result': None, 'data': data}
+            return {"result": None, "data": data}
+    return {"result": None, "data": data}

 def show_data(site_name, site_number, street_name, secondary_street_name, address):

View File

@@ -5,6 +5,7 @@ import requests
 import psycopg2
 import address_data

+
 def main():
     connection = get_connection()
     cursor = connection.cursor()

@@ -16,10 +17,12 @@ def main():
     while True:
         data = query(search_after).json()
         load_data_into_database(cursor, data)
-        for entry in data['hits']['hits']:
+        for entry in data["hits"]["hits"]:
             downloaded += 1
-            last_sort = entry['sort']
-            print("downloaded", downloaded, "last_sort", last_sort, "previous", search_after)
+            last_sort = entry["sort"]
+            print(
+                "downloaded", downloaded, "last_sort", last_sort, "previous", search_after
+            )
         if search_after == last_sort:
             break
         search_after = last_sort

@@ -31,24 +34,30 @@ def load_data_into_database(cursor, data):
         print(json.dumps(data, indent=4))
         print("timed_out field missing in provided data")
     else:
-        if data['timed_out']:
+        if data["timed_out"]:
             raise Exception("query getting livestream data has failed")
-    for entry in data['hits']['hits']:
+    for entry in data["hits"]["hits"]:
         try:
             description = None
-            if entry['_source']['description'] != None:
-                description = entry['_source']['description'].strip()
-            application_id = entry['_source']['lpa_app_no']
-            application_id_with_borough_identifier = entry['_source']['id']
-            decision_date = parse_date_string_into_date_object(entry['_source']['decision_date'])
-            last_synced_date = parse_date_string_into_date_object(entry['_source']['last_synced'])
-            uprn = entry['_source']['uprn']
-            status_before_aliasing = entry['_source']['status']
+            if entry["_source"]["description"] is not None:
+                description = entry["_source"]["description"].strip()
+            application_id = entry["_source"]["lpa_app_no"]
+            application_id_with_borough_identifier = entry["_source"]["id"]
+            decision_date = parse_date_string_into_date_object(
+                entry["_source"]["decision_date"]
+            )
+            last_synced_date = parse_date_string_into_date_object(
+                entry["_source"]["last_synced"]
+            )
+            uprn = entry["_source"]["uprn"]
+            status_before_aliasing = entry["_source"]["status"]
             status_info = process_status(status_before_aliasing, decision_date)
             status = status_info["status"]
             status_explanation_note = status_info["status_explanation_note"]
-            planning_url = obtain_entry_link(entry['_source']['url_planning_app'], application_id)
-            if uprn == None:
+            planning_url = obtain_entry_link(
+                entry["_source"]["url_planning_app"], application_id
+            )
+            if uprn is None:
                 continue
             try:
                 uprn = int(uprn)

@@ -61,7 +70,9 @@ def load_data_into_database(cursor, data):
                 "last_synced_date": last_synced_date,
                 "application_id": application_id,
                 "application_url": planning_url,
-                "registered_with_local_authority_date": parse_date_string_into_date_object(entry['_source']['valid_date']),
+                "registered_with_local_authority_date": parse_date_string_into_date_object(
+                    entry["_source"]["valid_date"]
+                ),
                 "uprn": uprn,
                 "status": status,
                 "status_before_aliasing": status_before_aliasing,

@@ -70,13 +81,16 @@ def load_data_into_database(cursor, data):
                 "data_source_link": "https://www.london.gov.uk/programmes-strategies/planning/digital-planning/planning-london-datahub",
                 "address": address_data.planning_data_entry_to_address(entry),
             }
-            if entry["address"] != None:
+            if entry["address"] is not None:
                 maximum_address_length = 300
                 if len(entry["address"]) > maximum_address_length:
                     print("address is too long, shortening", entry["address"])
                     entry["address"] = entry["address"][0:maximum_address_length]
             if date_in_future(entry["registered_with_local_authority_date"]):
-                print("registered_with_local_authority_date is treated as invalid:", entry["registered_with_local_authority_date"])
+                print(
+                    "registered_with_local_authority_date is treated as invalid:",
+                    entry["registered_with_local_authority_date"],
+                )
                 # Brent-87_0946 has "valid_date": "23/04/9187"
                 entry["registered_with_local_authority_date"] = None

@@ -85,13 +99,17 @@ def load_data_into_database(cursor, data):
                 entry["decision_date"] = None
             if date_in_future(entry["last_synced_date"]):
-                print("last_synced_date is treated as invalid:", entry["last_synced_date"])
+                print(
+                    "last_synced_date is treated as invalid:", entry["last_synced_date"]
+                )
                 entry["last_synced_date"] = None
             if "Hackney" in application_id_with_borough_identifier:
-                if entry["application_url"] != None:
+                if entry["application_url"] is not None:
                     if "https://" not in entry["application_url"]:
-                        entry["application_url"] = "https://developmentandhousing.hackney.gov.uk" + entry["application_url"]
+                        entry[
+                            "application_url"
+                        ] = f"https://developmentandhousing.hackney.gov.uk{entry['application_url']}"
             insert_entry(cursor, entry)
         except TypeError as e:
             print()

@@ -104,40 +122,40 @@ def load_data_into_database(cursor, data):
 def date_in_future(date):
-    if date == None:
+    if date is None:
         return False
     return date > datetime.datetime.now()

 def query(search_after):
     headers = {
-        'X-API-AllowRequest': os.environ['PLANNNING_DATA_API_ALLOW_REQUEST_CODE'],
+        "X-API-AllowRequest": os.environ["PLANNNING_DATA_API_ALLOW_REQUEST_CODE"],
         # Already added when you pass json= but not when you pass data=
         # 'Content-Type': 'application/json',
     }
     json_data = {
-        'size': 10000,
-        'sort': [
+        "size": 10000,
+        "sort": [
             {
-                'last_updated': {
-                    'order': 'desc',
-                    'unmapped_type': 'boolean',
+                "last_updated": {
+                    "order": "desc",
+                    "unmapped_type": "boolean",
                 },
             },
         ],
-        'stored_fields': [
-            '*',
+        "stored_fields": [
+            "*",
         ],
-        '_source': {
-            'excludes': [],
+        "_source": {
+            "excludes": [],
        },
-        'query': {
-            'bool': {
-                'must': [
+        "query": {
+            "bool": {
+                "must": [
                     {
-                        'range': {
-                            'valid_date': {
-                                'gte': '01/01/1021',
+                        "range": {
+                            "valid_date": {
+                                "gte": "01/01/1021",
                             },
                         },
                     },

@@ -147,18 +165,22 @@ def query(search_after):
     }
     if search_after != []:
-        json_data['search_after'] = search_after
+        json_data["search_after"] = search_after
     print(json_data)
-    return requests.post('https://planningdata.london.gov.uk/api-guest/applications/_search', headers=headers, json=json_data)
+    return requests.post(
+        "https://planningdata.london.gov.uk/api-guest/applications/_search",
+        headers=headers,
+        json=json_data,
+    )

 def get_connection():
     return psycopg2.connect(
-        host=os.environ['PGHOST'],
-        dbname=os.environ['PGDATABASE'],
-        user=os.environ['PGUSER'],
-        password=os.environ['PGPASSWORD']
+        host=os.environ["PGHOST"],
+        dbname=os.environ["PGDATABASE"],
+        user=os.environ["PGUSER"],
+        password=os.environ["PGPASSWORD"],
     )

@@ -170,28 +192,31 @@ def insert_entry(cursor, e):
     try:
         now = datetime.datetime.now()
         application_url = None
-        if e["application_url"] != None:
+        if e["application_url"] is not None:
             application_url = e["application_url"]
-        cursor.execute('''INSERT INTO
+        cursor.execute(
+            """INSERT INTO
             planning_data (planning_application_id, planning_application_link, description, registered_with_local_authority_date, days_since_registration_cached, decision_date, days_since_decision_date_cached, last_synced_date, status, status_before_aliasing, status_explanation_note, data_source, data_source_link, address, uprn)
             VALUES
             (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
-        ''', (
-            e["application_id"],
-            application_url, e["description"],
-            date_object_into_date_string(e["registered_with_local_authority_date"]),
-            days_since(e["registered_with_local_authority_date"], now),
-            date_object_into_date_string(e["decision_date"]),
-            days_since(e["decision_date"], now),
-            date_object_into_date_string(e["last_synced_date"]),
-            e["status"],
-            e["status_before_aliasing"],
-            e["status_explanation_note"],
-            e["data_source"],
-            e["data_source_link"],
-            e["address"],
-            e["uprn"],
-            )
+            """,
+            (
+                e["application_id"],
+                application_url,
+                e["description"],
+                date_object_into_date_string(e["registered_with_local_authority_date"]),
+                days_since(e["registered_with_local_authority_date"], now),
+                date_object_into_date_string(e["decision_date"]),
+                days_since(e["decision_date"], now),
+                date_object_into_date_string(e["last_synced_date"]),
+                e["status"],
+                e["status_before_aliasing"],
+                e["status_explanation_note"],
+                e["data_source"],
+                e["data_source_link"],
+                e["address"],
+                e["uprn"],
+            ),
         )
     except psycopg2.errors.Error as error:
         show_dictionary(e)

@@ -204,30 +229,32 @@ def show_dictionary(data):
 def days_since(date, now):
-    if(date == None):
+    if date is None:
         return None
     return (now - date).days

 def date_object_into_date_string(date):
-    if(date == None):
+    if date is None:
         return None
     return datetime.datetime.strftime(date, "%Y-%m-%d")

 def parse_date_string_into_date_object(incoming):
-    if incoming == None:
+    if incoming is None:
         return None
     date = None
     try:
         date = datetime.datetime.strptime(incoming, "%d/%m/%Y")  # '21/07/2022'
     except ValueError:
-        date = datetime.datetime.strptime(incoming, "%Y-%m-%dT%H:%M:%S.%fZ")  # '2022-08-08T20:07:22.238Z'
+        date = datetime.datetime.strptime(
+            incoming, "%Y-%m-%dT%H:%M:%S.%fZ"
+        )  # '2022-08-08T20:07:22.238Z'
     return date

 def obtain_entry_link(provided_link, application_id):
-    if provided_link != None:
+    if provided_link is not None:
         if "Ealing" in application_id:
             if ";" == provided_link[-1]:
                 return provided_link[:-1]

@@ -237,7 +264,7 @@ def obtain_entry_link(provided_link, application_id):
         # Planning application ID: Hackney-2021_2491
         # https://developmentandhousing.hackney.gov.uk/planning/index.html?fa=getApplication&reference=2021/2491
         ref_for_link = application_id.replace("Hackney-", "").replace("_", "/")
-        return "https://developmentandhousing.hackney.gov.uk/planning/index.html?fa=getApplication&reference=" + ref_for_link
+        return f"https://developmentandhousing.hackney.gov.uk/planning/index.html?fa=getApplication&reference={ref_for_link}"
     if "Lambeth" in application_id:
         # sadly, specific links seems impossible
         return "https://planning.lambeth.gov.uk/online-applications/refineSearch.do?action=refine"

@@ -282,9 +309,16 @@ def obtain_entry_link(provided_link, application_id):
 def process_status(status, decision_date):
     status_length_limit = 50  # see migrations/034.planning_livestream_data.up.sql
     if status in ["Application Under Consideration", "Application Received"]:
-        if decision_date == None:
+        if decision_date is None:
             status = "Submitted"
-    if status in ["Refused", "Refusal", "Refusal (P)", "Application Invalid", "Insufficient Fee", "Dismissed"]:
+    if status in [
+        "Refused",
+        "Refusal",
+        "Refusal (P)",
+        "Application Invalid",
+        "Insufficient Fee",
+        "Dismissed",
+    ]:
         status = "Rejected"
     if status == "Appeal Received":
         status = "Appeal In Progress"

@@ -296,16 +330,39 @@ def process_status(status, decision_date):
         status = "Withdrawn"
     if len(status) > status_length_limit:
         print("Status was too long and was skipped:", status)
-        return {"status": "Processing failed", "status_explanation_note": "status was unusally long and it was imposible to save it"}
-    if (status in ["Submitted", "Approved", "Rejected", "Appeal In Progress", "Withdrawn", "Unknown"]):
+        return {
+            "status": "Processing failed",
+            "status_explanation_note": "status was unusally long and it was imposible to save it",
+        }
+    if status in [
+        "Submitted",
+        "Approved",
+        "Rejected",
+        "Appeal In Progress",
+        "Withdrawn",
+        "Unknown",
+    ]:
         return {"status": status, "status_explanation_note": None}
-    if status in ["No Objection to Proposal (OBS only)", "Objection Raised to Proposal (OBS only)"]:
-        return {"status": "Approved", "status_explanation_note": "preapproved application, local authority is unable to reject it"}
+    if status in [
+        "No Objection to Proposal (OBS only)",
+        "Objection Raised to Proposal (OBS only)",
+    ]:
+        return {
+            "status": "Approved",
+            "status_explanation_note": "preapproved application, local authority is unable to reject it",
+        }
     print("Unexpected status " + status)
-    if status not in ["Not Required", "SECS", "Comment Issued", "ALL DECISIONS ISSUED", "Closed", "Declined to Determine"]:
+    if status not in [
+        "Not Required",
+        "SECS",
+        "Comment Issued",
+        "ALL DECISIONS ISSUED",
+        "Closed",
+        "Declined to Determine",
+    ]:
         print("New unexpected status " + status)
     return {"status": status, "status_explanation_note": None}

-if __name__ == '__main__':
+if __name__ == "__main__":
     main()