right-tree/backend/right_tree/api/wms_utils.py
2023-02-08 14:48:24 +13:00

134 lines
3.8 KiB
Python

import json
import os
import re
import requests
from urllib.parse import urlencode
from unicodedata import normalize
from django.contrib.gis.geos import Point, GEOSGeometry
from django.conf import settings
LINZ_API_KEY = os.getenv("LINZ_API_KEY")
LINZ_WFS_ENDPOINT = f"https://data.linz.govt.nz/services;key={LINZ_API_KEY}/wfs"
PROPERTY_TILE_LAYER = "layer-50804"
PROPERTY_INFO_LAYER = "layer-53353"
search_num = re.compile(r'((?<!\S)\d+)')
search_str = re.compile(r'((?<!\S)[a-zA-Z\-]+)')
class WFSError(Exception):
pass
def get_point_from_coordinates(coordinates):
"""Given a coordinates json string, returns the coordinates as a Point object"""
coordinates_json = json.loads(coordinates)
pnt = Point(coordinates_json["lng"],
coordinates_json["lat"], srid=4326)
return pnt
def wfs_getfeature(endpoint, **kwargs):
"""Perform a WFS request with the with all keyword arguments as parameters
within the URL query, returning the JSON-formatted response"""
params = {
'service': "WFS",
'request': "GetFeature",
'outputFormat': "application/json",
**kwargs,
}
query = urlencode(params)
url = f"{endpoint}?{query}"
response = requests.get(url)
try:
return response.json()
except json.JSONDecodeError as e:
raise WFSError(
f"Failed to make WFS request to {url}: {response.content}")
def linz_wfs_getfeature(**kwargs):
"""Perform a WFS request via the LINZ endpoint"""
return wfs_getfeature(LINZ_WFS_ENDPOINT, version="2.0.0", **kwargs)
def intersecting_property(coordinates):
"""Finds the property that intersects with the coordinates and returns its definition as a dictionary"""
point = get_point_from_coordinates(coordinates)
cql_filter = f"Intersects(shape,{point})"
json = linz_wfs_getfeature(
typeNames=PROPERTY_TILE_LAYER, cql_filter=cql_filter, count=1)
features = json.get("features")
if len(features) > 0:
return features[0]
def get_address(polygon):
"""Fetch the address string that intersects with the specified polygon"""
# Don't perform request if the input polygon WKT is too long to fit into URL
# Limit is 2048 characters
if len(str(polygon)) > 1500:
return None
cql_filter = f"Within(shape,{polygon})"
json = linz_wfs_getfeature(
typeNames=PROPERTY_INFO_LAYER, cql_filter=cql_filter, count=1)
features = json.get("features")
if len(features) > 0:
feature = features[0]
return feature['properties']
def get_address_from_coordinates(coordinates):
propertyPolygon = intersecting_property(coordinates)
if propertyPolygon is not None:
prop_geom = GEOSGeometry(json.dumps(propertyPolygon['geometry']))
return get_address(prop_geom)
def search_address(address):
# normalize accent characters etc.
address = normalize(
"NFKD",
address.strip().lower(),
).encode("ascii", "ignore").decode()
nums = search_num.findall(address)
strings = search_str.findall(address)
num_terms = [f"address_number = {n}" for n in nums]
string_terms = []
for s in strings:
string_terms += [
f"full_road_name_ascii ILIKE '{s}%'",
f"town_city_ascii ILIKE '{s}%'",
f"suburb_locality_ascii ILIKE '{s}%'",
]
num_expr = " OR ".join(f"({term})" for term in num_terms)
string_expr = " OR ".join(f"({term})" for term in string_terms)
cql_filter = f"({num_expr}) AND ({string_expr})"
resp = linz_wfs_getfeature(
typeNames=PROPERTY_INFO_LAYER,
cql_filter=cql_filter,
count=10,
)
features = resp.get("features", [])
return [
{
'coordinates': feature['geometry']['coordinates'],
'address': feature['properties']['full_address'],
} for feature in features
]