right-tree/backend/right_tree/api/wms_utils.py

88 lines
2.6 KiB
Python
Raw Normal View History

import json
import os
import requests
from urllib.parse import urlencode
from django.contrib.gis.geos import Point, GEOSGeometry
from django.conf import settings
# LINZ API key, read from the environment once at import time.
# It is None when the LINZ_API_KEY variable is not set.
LINZ_API_KEY = os.environ.get("LINZ_API_KEY")

# LINZ WFS endpoint; the API key is embedded in the URL path itself.
LINZ_WFS_ENDPOINT = f"https://data.linz.govt.nz/services;key={LINZ_API_KEY}/wfs"

# Layer queried by intersecting_property() to find the feature at a point.
PROPERTY_TILE_LAYER = "layer-50804"

# Layer queried by get_address() to fetch a feature's properties.
PROPERTY_INFO_LAYER = "layer-53353"
class WFSError(Exception):
    """Raised when a WFS request does not yield a usable JSON response."""
def get_point_from_coordinates(coordinates):
    """Parse a coordinates JSON string into a Point.

    The JSON object must contain "lng" and "lat" keys; they are passed to
    Point as its first and second arguments respectively, with SRID 4326.
    """
    parsed = json.loads(coordinates)
    longitude, latitude = parsed["lng"], parsed["lat"]
    return Point(longitude, latitude, srid=4326)
def wfs_getfeature(endpoint, **kwargs):
    """Perform a WFS GetFeature request and return the decoded JSON response.

    Every keyword argument is added to the URL query string alongside the
    fixed ``service``, ``request`` and ``outputFormat`` parameters. A keyword
    argument using one of those names overrides the default value.

    Args:
        endpoint: Base URL of the WFS service, without a query string.
        **kwargs: Additional query parameters (e.g. ``typeNames``, ``count``).

    Returns:
        The response body decoded from JSON.

    Raises:
        WFSError: If the response body cannot be decoded as JSON.
        requests.exceptions.RequestException: If the HTTP request itself
            fails, including when it times out.
    """
    params = {
        'service': "WFS",
        'request': "GetFeature",
        'outputFormat': "application/json",
        **kwargs,
    }
    url = f"{endpoint}?{urlencode(params)}"

    # requests applies no timeout by default, so an unresponsive server would
    # block the calling thread forever. 30 seconds bounds both the connect
    # and the read phase.
    response = requests.get(url, timeout=30)

    try:
        return response.json()
    except ValueError as e:
        # ValueError is the common base of json.JSONDecodeError, simplejson's
        # JSONDecodeError and requests.exceptions.JSONDecodeError, so this
        # works whichever decoder requests ends up using.
        # NOTE(review): when called through linz_wfs_getfeature the URL
        # embeds the LINZ API key, so this message exposes it to anything
        # that logs or displays the exception.
        raise WFSError(
            f"Failed to make WFS request to {url}: {response.content}") from e
def linz_wfs_getfeature(**kwargs):
    """Issue a WFS 2.0.0 GetFeature request against the LINZ endpoint.

    All keyword arguments are forwarded to wfs_getfeature as query
    parameters, and its decoded JSON response is returned unchanged.
    """
    response_json = wfs_getfeature(
        LINZ_WFS_ENDPOINT,
        version="2.0.0",
        **kwargs,
    )
    return response_json
def intersecting_property(coordinates):
    """Find the property feature that intersects the given coordinates.

    Args:
        coordinates: Coordinates JSON string, in the form accepted by
            ``get_point_from_coordinates``.

    Returns:
        The first feature returned by the ``PROPERTY_TILE_LAYER`` query as a
        dictionary, or ``None`` if the query matched no features.

    Raises:
        WFSError: If the response is not a JSON object with a "features"
            member.
    """
    point = get_point_from_coordinates(coordinates)
    cql_filter = f"Intersects(shape,{point})"
    # Deliberately not named ``json``: that would shadow the json module
    # inside this function.
    response = linz_wfs_getfeature(
        typeNames=PROPERTY_TILE_LAYER, cql_filter=cql_filter, count=1)

    features = response.get("features") if isinstance(response, dict) else None
    if features is None:
        # A response without a feature list is an error payload, not an empty
        # result; report it instead of failing later with a TypeError on len().
        raise WFSError(
            f"WFS response for {PROPERTY_TILE_LAYER} has no feature list: {response}")

    return features[0] if features else None
def get_address(polygon):
    """Fetch the properties of the first feature lying within a polygon.

    Args:
        polygon: Geometry whose string form is inserted into the CQL
            ``Within`` filter.

    Returns:
        The ``properties`` value of the first feature returned by the
        ``PROPERTY_INFO_LAYER`` query, or ``None`` when no feature matched or
        when the polygon text is too long to send.

    Raises:
        WFSError: If the response is not a JSON object with a "features"
            member.
    """
    polygon_text = str(polygon)

    # The whole request URL is limited to 2048 characters. Capping the polygon
    # text at 1500 leaves room for the endpoint and the other query
    # parameters, so skip the request when the text is longer than that.
    if len(polygon_text) > 1500:
        return None

    cql_filter = f"Within(shape,{polygon_text})"
    # Deliberately not named ``json``: that would shadow the json module
    # inside this function.
    response = linz_wfs_getfeature(
        typeNames=PROPERTY_INFO_LAYER, cql_filter=cql_filter, count=1)

    features = response.get("features") if isinstance(response, dict) else None
    if features is None:
        # A response without a feature list is an error payload, not an empty
        # result; report it instead of failing later with a TypeError on len().
        raise WFSError(
            f"WFS response for {PROPERTY_INFO_LAYER} has no feature list: {response}")

    if not features:
        return None
    return features[0]['properties']
def get_address_from_coordinates(coordinates):
    """Look up address details for the property at the given coordinates.

    Finds the property feature intersecting the coordinates, converts its
    geometry to a GEOSGeometry and passes that to get_address. Returns None
    when no intersecting property is found.
    """
    property_feature = intersecting_property(coordinates)
    if property_feature is None:
        return None

    geometry_geojson = json.dumps(property_feature['geometry'])
    return get_address(GEOSGeometry(geometry_geojson))