change data source

This commit is contained in:
Mateusz Konieczny 2022-10-31 08:49:13 +01:00
parent d1cc8c7a16
commit 0f689217c3
8 changed files with 204 additions and 8804 deletions

View File

@ -16,6 +16,10 @@ const LongText = ({ content,limit}) => {
const showMore = () => setShowAll(true);
const showLess = () => setShowAll(false);
if (content == null) {
return <div>{MissingData}</div>
}
if (content.length <= limit) {
return <div>{content}</div>
}
@ -36,6 +40,12 @@ const LongText = ({ content,limit}) => {
const Disclaimer = () => { return <Fragment><div><i><u>Disclaimer</u>: these data are currently incomplete and also often do not provide information on minor alterations. For comprehensive information on all applications please visit the local authorities' planning websites.</i></div></Fragment> }
const MissingData = "not available"
function ShowIfAvailable(data) {
return <>{data ? data.toString() : MissingData }</>
}
const PlanningDataOfficialDataEntry: React.FC<PlanningDataOfficialDataEntryProps> = (props) => {
const data = props.value || [];
@ -54,13 +64,13 @@ const PlanningDataOfficialDataEntry: React.FC<PlanningDataOfficialDataEntryProps
<Fragment>
<div><i>Planning application status is streamed using live data uploaded by local authorities to the <a href={data[0]["data_source_link"]}>{data[0]["data_source"]}</a>.</i></div>
<br/>
<div><b>Current planning application status for this site:</b> {data[0]["status"]}</div>
<div><b>Planning application ID:</b> {data[0]["planning_application_id"]}</div>
<div><b>Date registered by the planning authority (validation date)</b>: {data[0]["registered_with_local_authority_date"]}</div>
<div><b>Decision date</b>: {data[0]["decision_date"].toString()}</div>
<div><b>Planning application link</b>: TODO move here</div>
<div><b>Description of proposed work</b>: <LongText content = {data[0]["description"]} limit = {400}/></div>
<div><b>Most recent update by data provider:</b> {data[0]["decision_date"]}</div>
<div><b>Current planning application status for this site:</b> {ShowIfAvailable(data[0]["status"])}</div>
<div><b>Planning application ID:</b> {ShowIfAvailable(data[0]["planning_application_id"])}</div>
<div><b>Date registered by the planning authority (validation date)</b>: {ShowIfAvailable(data[0]["registered_with_local_authority_date"])}</div>
<div><b>Decision date</b>: {ShowIfAvailable(data[0]["decision_date"])}</div>
<div><b>Planning application link</b>: {ShowIfAvailable(data[0]["planning_application_link"])}</div>
<div><b>Description of proposed work</b>: {data[0]["description"] ? <LongText content = {data[0]["description"]} limit = {400}/> : MissingData}</div>
<div><b>Most recent update by data provider:</b> {ShowIfAvailable(data[0]["decision_date"])}</div>
<br/>
<Disclaimer />
</Fragment>

View File

@ -1,11 +1,12 @@
Following instructions assume that code is placed within `~/colouring-london/etl/planning_data/`
To install necessary dependecies use `cd ~/colouring-london/etl/planning_data/ && pip3 install -r requirements.txt`
Following scripts should be scheduled to run regularly to load livestream data into database.
```
# querying API to obtain data
python3 obtain_livestream_data.py > all_data.json
# loading data into Colouring database
python3 load_into_database
# querying API to obtain data & loading data into Colouring database
python3 obtain_livestream_data_and_load_into_database.py
# removing tile cache for planning_applications_status layer - note that location of cache depends on your configuration
rm /srv/colouring-london/tilecache/planning_applications_status/* -rf
@ -14,7 +15,7 @@ rm /srv/colouring-london/tilecache/planning_applications_status/* -rf
As loading into databases expects environment variables to be set, one option to actually schedule it in a cron is something like
```
export $(cat ~/scripts/.env | xargs) && /usr/bin/python3 ~/colouring-london/etl/planning_data/load_into_database.py
export $(cat ~/scripts/.env | xargs) && /usr/bin/python3 ~/colouring-london/etl/planning_data/obtain_livestream_data_and_load_into_database.py
```
with
@ -30,4 +31,5 @@ PGHOST=localhost
PGDATABASE=colouringlondondb
PGUSER=cldbadmin
PGPASSWORD=actualpassword
PLANNNING_DATA_API_ALLOW_REQUEST_CODE=requestcode
```

View File

@ -1,93 +0,0 @@
import json
import datetime
import psycopg2
import os
def get_connection():
return psycopg2.connect(
host=os.environ['PGHOST'],
dbname=os.environ['PGDATABASE'],
user=os.environ['PGUSER'],
password=os.environ['PGPASSWORD']
)
def filepath():
return os.path.dirname(os.path.realpath(__file__)) + os.sep + "data.json"
def insert_entry(connection, e):
elements = []
application_url = "NULL"
if e["application_url"] != None:
application_url = "'" + e["application_url"] + "'"
with connection.cursor() as cur:
cur.execute('''INSERT INTO
planning_data (planning_application_id, planning_application_link, description, registered_with_local_authority_date, decision_date, last_synced_date, status, data_source, data_source_link, uprn)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
''', (e["application_id"], application_url, e["description"], e["registered_with_local_authority_date"], e["decision_date"], e["last_synced_date"], e["status"], e["data_source"], e["data_source_link"], e["uprn"]))
connection.commit()
def parse_date_string_into_datestring(incoming):
date = None
try:
date = datetime.datetime.strptime(incoming, "%d/%m/%Y") # '21/07/2022'
except ValueError:
date = datetime.datetime.strptime(incoming, "%Y-%m-%dT%H:%M:%S.%fZ") # '2022-08-08T20:07:22.238Z'
return datetime.datetime.strftime(date, "%Y-%m-%d")
def process_status(status):
"""return None if status is invalid"""
if status == "Refused":
status = "Rejected"
if status == "Appeal Received":
status = "Appeal In Progress"
if status == None:
status = "Unknown"
if (status in ["Approved", "Rejected", "Appeal In Progress", "Withdrawn", "Unknown"]):
return status
print("Unexpected status " + status)
if status not in ["No Objection to Proposal (OBS only)", "Objection Raised to Proposal (OBS only)", "Not Required", "Unknown", "Lapsed", "SECS", "Comment Issued", "ALL DECISIONS ISSUED", "Closed", "Declined to Determine"]:
print("New unexpected status " + status)
status_length_limit = 50 # see migrations/033.planning_livestream_data.up.sql
if len(status) > 50:
print("Status was too long and was skipped:", status)
return None
return status
def main():
connection = get_connection()
with connection.cursor() as cur:
cur.execute("TRUNCATE planning_data")
with open(filepath(), 'r') as content_file:
data = json.load(content_file)
if data['rawResponse']['timed_out']:
raise Exception("query getting livestream data has failed")
if data['is_partial']:
raise Exception("query getting livestream data has failed")
if data['is_running']:
raise Exception("query getting livestream data has failed")
for entry in data['rawResponse']['hits']['hits']:
description = entry['_source']['description'].strip()
application_id = entry['_source']['id']
decision_date = parse_date_string_into_datestring(entry['_source']['decision_date'])
last_synced_date = parse_date_string_into_datestring(entry['_source']['last_synced'])
uprn = entry['_source']['uprn']
status = process_status(entry['_source']['status'])
if uprn == None:
continue
entry = {
"description": description,
"decision_date": decision_date,
"last_synced_date": last_synced_date,
"application_id": application_id,
"application_url": entry['_source']['url_planning_app'],
"registered_with_local_authority_date": parse_date_string_into_datestring(entry['_source']['valid_date']),
"uprn": uprn,
"status": status,
"data_source": "Greater London Authority's Planning London DataHub",
"data_source_link": "https://data.london.gov.uk/dataset/planning-london-datahub?_gl=1%2aprwpc%2a_ga%2aMzQyOTg0MjcxLjE2NTk0NDA4NTM", # TODO test
}
insert_entry(connection, entry)
if __name__ == '__main__':
main()

View File

@ -1,105 +0,0 @@
import json
import datetime
import psycopg2
import os
def get_connection():
return psycopg2.connect(
host=os.environ['PGHOST'],
dbname=os.environ['PGDATABASE'],
user=os.environ['PGUSER'],
password=os.environ['PGPASSWORD']
)
def filepath():
return os.path.dirname(os.path.realpath(__file__)) + os.sep + "recovered.geojson"
def insert_entry(connection, e):
print(e)
elements = []
application_url = "NULL"
if e["application_url"] != None:
application_url = "'" + e["application_url"] + "'"
with connection.cursor() as cur:
cur.execute('''INSERT INTO
planning_data (planning_application_id, planning_application_link, description, registered_with_local_authority_date, decision_date, last_synced_date, status, data_source, data_source_link, uprn)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
''', (e["application_id"], application_url, e["description"], e["registered_with_local_authority_date"], e["decision_date"], e["last_synced_date"], e["status"], e["data_source"], e["data_source_link"], e["uprn"]))
connection.commit()
def parse_date_string_into_datestring(incoming):
date = None
try:
date = datetime.datetime.strptime(incoming, "%d/%m/%Y") # '21/07/2022'
except ValueError:
date = datetime.datetime.strptime(incoming, "%Y-%m-%dT%H:%M:%S.%fZ") # '2022-08-08T20:07:22.238Z'
return datetime.datetime.strftime(date, "%Y-%m-%d")
def shorten_description(original_description):
description = original_description.strip()
limit = 400
if len(description) > limit:
description = ""
for entry in original_description.split():
extended = description
if extended != "":
extended += " "
extended += entry
if len(extended) <= limit:
description = extended
if description == "":
description = description[0:limit]
description += "... <i>(show more)</i>"
return description
def process_status(status):
"""return None if status is invalid"""
if status == "Refused":
status = "Rejected"
if status == "Appeal Received":
status = "Appeal In Progress"
if status == None:
status = "Unknown"
if (status in ["Approved", "Rejected", "Appeal In Progress", "Withdrawn", "Unknown"]):
return status
print("Unexpected status " + status)
if status not in ["No Objection to Proposal (OBS only)", "Objection Raised to Proposal (OBS only)", "Not Required", "Unknown", "Lapsed", "SECS", "Comment Issued", "ALL DECISIONS ISSUED", "Closed", "Declined to Determine"]:
print("New unexpected status " + status)
status_length_limit = 50 # see migrations/033.planning_livestream_data.up.sql
if len(status) > 50:
print("Status was too long and was skipped:", status)
return None
return status
def main():
connection = get_connection()
with connection.cursor() as cur:
cur.execute("TRUNCATE planning_data")
with open(filepath(), 'r') as content_file:
data = json.load(content_file)
for entry in data['features']:
description = entry['properties']['description']
application_id = "not available"
decision_date = parse_date_string_into_datestring(entry['properties']['decision_date'])
last_synced_date = parse_date_string_into_datestring(entry['properties']['decision_date'])
uprn = entry['properties']['uprn']
status = process_status(entry['properties']['status'])
if uprn == None:
continue
entry = {
"description": description,
"decision_date": decision_date,
"last_synced_date": last_synced_date,
"application_id": application_id,
"application_url": None,
"registered_with_local_authority_date": None,
"uprn": uprn,
"status": status,
"data_source": "Greater London Authority's Planning London DataHub",
"data_source_link": "https://data.london.gov.uk/dataset/planning-london-datahub?_gl=1%2aprwpc%2a_ga%2aMzQyOTg0MjcxLjE2NTk0NDA4NTM", # TODO test
}
insert_entry(connection, entry)
if __name__ == '__main__':
main()

View File

@ -1,87 +0,0 @@
import json
import jsbeautifier
import make_query
def main():
output = make_query.obtain_data(get_query())
# print(json.dumps(output))
opts = jsbeautifier.default_options()
opts.indent_size = 2
print(jsbeautifier.beautify(json.dumps(output), opts))
def get_query():
true = True # makes possible to copy JSON into Python code
return {
"params": {
"ignoreThrottled": true,
"index": "applications",
"body": {
"version": true,
"size": 500,
"sort": [
{
"last_updated": {
"order": "desc",
"unmapped_type": "boolean"
}
}
],
"aggs": {
"2": {
"date_histogram": {
"field": "last_updated",
"calendar_interval": "1d",
"time_zone": "Europe/London",
"min_doc_count": 1
}
}
},
"stored_fields": [
"*"
],
"script_fields": {},
"docvalue_fields": [],
"_source": {
"excludes": []
},
"query": {
"bool": {
"must": [],
"filter": [
{
"range": {
"decision_date": {
"gte": "1922-01-01T00:00:00.000Z",
"format": "strict_date_optional_time"
}
}
}
],
"should": [],
"must_not": []
}
},
"highlight": {
"pre_tags": [
"@kibana-highlighted-field@"
],
"post_tags": [
"@/kibana-highlighted-field@"
],
"fields": {
"*": {}
},
"fragment_size": 2147483647
}
},
"rest_total_hits_as_int": true,
"ignore_unavailable": true,
"ignore_throttled": true,
"timeout": "30000ms"
}
}
if __name__ == '__main__':
main()

View File

@ -0,0 +1,175 @@
import requests
import json
import json
import datetime
import psycopg2
import os
def main():
connection = get_connection()
cursor = connection.cursor()
cursor.execute("TRUNCATE planning_data")
downloaded = 0
last_sort = None
search_after = []
while True:
data = query(search_after).json()
load_data_into_database(cursor, data)
for entry in data['hits']['hits']:
downloaded += 1
last_sort = entry['sort']
print("downloaded", downloaded, "last_sort", last_sort, "previous", search_after)
if search_after == last_sort:
break
search_after = last_sort
connection.commit()
def load_data_into_database(cursor, data):
if "timed_out" not in data:
print(json.dumps(data, indent = 4))
print("timed_out field missing in provided data")
else:
if data['timed_out']:
raise Exception("query getting livestream data has failed")
for entry in data['hits']['hits']:
try:
description = None
if entry['_source']['description'] != None:
description = entry['_source']['description'].strip()
application_id = entry['_source']['id']
decision_date = parse_date_string_into_datestring(entry['_source']['decision_date'])
last_synced_date = parse_date_string_into_datestring(entry['_source']['last_synced'])
uprn = entry['_source']['uprn']
status = process_status(entry['_source']['status'])
if uprn == None:
continue
try:
uprn = int(uprn)
except ValueError as e:
print(e)
continue
entry = {
"description": description,
"decision_date": decision_date,
"last_synced_date": last_synced_date,
"application_id": application_id,
"application_url": entry['_source']['url_planning_app'],
"registered_with_local_authority_date": parse_date_string_into_datestring(entry['_source']['valid_date']),
"uprn": uprn,
"status": status,
"data_source": "Greater London Authority's Planning London DataHub",
"data_source_link": "https://data.london.gov.uk/dataset/planning-london-datahub?_gl=1%2aprwpc%2a_ga%2aMzQyOTg0MjcxLjE2NTk0NDA4NTM", # TODO test
}
insert_entry(cursor, entry)
except TypeError as e:
print()
print()
print()
print(e)
print()
print(json.dumps(entry, indent = 4))
def query(search_after):
headers = {
'X-API-AllowRequest': os.environ['PLANNNING_DATA_API_ALLOW_REQUEST_CODE'],
# Already added when you pass json= but not when you pass data=
# 'Content-Type': 'application/json',
}
json_data = {
'size': 10000,
'sort': [
{
'last_updated': {
'order': 'desc',
'unmapped_type': 'boolean',
},
},
],
'stored_fields': [
'*',
],
'_source': {
'excludes': [],
},
'query': {
'bool': {
'must': [
{
'range': {
'valid_date': {
'gte': '01/01/1021',
},
},
},
],
},
},
}
if search_after != []:
json_data['search_after'] = search_after
print(json_data)
return requests.post('https://planningdata.london.gov.uk/api-guest/applications/_search', headers=headers, json=json_data)
def get_connection():
return psycopg2.connect(
host=os.environ['PGHOST'],
dbname=os.environ['PGDATABASE'],
user=os.environ['PGUSER'],
password=os.environ['PGPASSWORD']
)
def filepath():
return os.path.dirname(os.path.realpath(__file__)) + os.sep + "data.json"
def insert_entry(cursor, e):
elements = []
application_url = None
if e["application_url"] != None:
application_url = "'" + e["application_url"] + "'"
cursor.execute('''INSERT INTO
planning_data (planning_application_id, planning_application_link, description, registered_with_local_authority_date, decision_date, last_synced_date, status, data_source, data_source_link, uprn)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
''', (e["application_id"], application_url, e["description"], e["registered_with_local_authority_date"], e["decision_date"], e["last_synced_date"], e["status"], e["data_source"], e["data_source_link"], e["uprn"]))
def parse_date_string_into_datestring(incoming):
if incoming == None:
return None
date = None
try:
date = datetime.datetime.strptime(incoming, "%d/%m/%Y") # '21/07/2022'
except ValueError:
date = datetime.datetime.strptime(incoming, "%Y-%m-%dT%H:%M:%S.%fZ") # '2022-08-08T20:07:22.238Z'
return datetime.datetime.strftime(date, "%Y-%m-%d")
def process_status(status):
"""return None if status is invalid"""
if status in ["Application Under Consideration", "Application Received"]:
status = "Submitted"
if status in ["Refused", "Refusal", "Refusal (P)", "Application Invalid", "Insufficient Fee"]:
status = "Rejected"
if status == "Appeal Received":
status = "Appeal In Progress"
if status == "Completed":
status = "Approved"
if status in [None, "NOT_MAPPED"]:
status = "Unknown"
if (status in ["Submitted", "Approved", "Rejected", "Appeal In Progress", "Withdrawn", "Unknown"]):
return status
print("Unexpected status " + status)
if status not in ["No Objection to Proposal (OBS only)", "Objection Raised to Proposal (OBS only)", "Not Required", "Unknown", "Lapsed", "SECS", "Comment Issued", "ALL DECISIONS ISSUED", "Closed", "Declined to Determine"]:
print("New unexpected status " + status)
status_length_limit = 50 # see migrations/033.planning_livestream_data.up.sql
if len(status) > 50:
print("Status was too long and was skipped:", status)
return None
return status
if __name__ == '__main__':
main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,3 @@
# Python packages for planning data import
psycopg2==2.8.6
requests==2.27.1