right-tree/backend/right_tree/api/views.py

317 lines
11 KiB
Python
Raw Normal View History

import json
import stripe
from datetime import timedelta
from django.conf import settings
from django.contrib.gis.geos import Point
from django.http import JsonResponse, HttpResponseNotAllowed, HttpResponseNotFound, HttpResponseBadRequest, FileResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils import timezone
from django.utils.text import slugify
from rest_framework import viewsets, permissions
from rest_framework.response import Response
from .models import Habitat, HabitatImage, Plant, EcologicalDistrictLayer, SoilOrder, Zone, Questionnaire, ActivationKey, ActivationKeySet, Customer, CustomerAddress
from .serializers import HabitatImageSerializer, HabitatSerializer, PlantSerializer, SoilOrderSerializer, EcologicalDistrictLayerSerializer, AddressSerializer, ZoneSerializer, QuestionnaireSerializer
2023-04-20 15:12:22 +12:00
from .filters import get_filtered_plants, is_in_auckland, is_in_christchurch
2023-02-08 14:16:07 +13:00
from .wms_utils import get_address_from_coordinates, search_address
from .resource_generation_utils import generate_csv, get_filter_values, serialize_plants_queryset, create_planting_guide_pdf, PLANTING_GUIDE_PDF_FILENAME, storage
from .redis import redis_client
class PlantViewSet(viewsets.ModelViewSet):
    """Model viewset for plants, filtered by the request's query parameters."""

    serializer_class = PlantSerializer
    queryset = Plant.objects.all()

    def get_queryset(self):
        """Return the plants selected by ``get_filtered_plants`` for this request.

        All filtering logic lives in ``get_filtered_plants``; this method only
        hands it the current request.
        (May want to eventually use django filters to break up the logic...)
        """
        current_request = self.request
        return get_filtered_plants(current_request)
class SoilOrderViewSet(viewsets.ModelViewSet):
    """Model viewset for soil orders, optionally narrowed to one coordinate."""

    serializer_class = SoilOrderSerializer

    def get_queryset(self):
        """Return the soil orders whose soil layer intersects the requested point.

        Reads the ``lat`` and ``lng`` query parameters. When either one is
        missing or is not a valid float, every soil order is returned instead.
        """
        params = self.request.query_params
        try:
            lat, lng = (float(params[name]) for name in ("lat", "lng"))
        except (KeyError, ValueError):
            # No usable coordinate supplied: fall back to the unfiltered set.
            return SoilOrder.objects.all()

        # Point takes (x, y), i.e. (longitude, latitude), with SRID 4326.
        location = Point(lng, lat, srid=4326)
        return SoilOrder.objects.filter(soillayer__geom__intersects=location)
class EcologicalDistrictViewSet(viewsets.ModelViewSet):
    """Model viewset for ecological district layers, optionally narrowed to one coordinate."""

    serializer_class = EcologicalDistrictLayerSerializer

    def get_queryset(self):
        """Return the ecological district layers whose geometry intersects the requested point.

        Reads the ``lat`` and ``lng`` query parameters. When either one is
        missing or is not a valid float, every layer is returned instead.
        """
        params = self.request.query_params
        try:
            lat = float(params["lat"])
            lng = float(params["lng"])
        except (KeyError, ValueError):
            # No usable coordinate supplied: fall back to the unfiltered set.
            return EcologicalDistrictLayer.objects.all()

        # Point takes (x, y), i.e. (longitude, latitude), with SRID 4326.
        location = Point(lng, lat, srid=4326)
        return EcologicalDistrictLayer.objects.filter(geom__intersects=location)
class LINZPropertyViewSet(viewsets.ViewSet):
    """Viewset for address lookups, by free-text search or by coordinate."""

    def list(self, request):
        """Return address search results, or the address at a coordinate.

        Query parameters:
            search: free-text address query. When present it takes priority
                and the result of ``search_address`` is returned as-is.
            lat, lng: coordinate used when ``search`` is absent; the address
                data from ``get_address_from_coordinates`` is serialised with
                ``AddressSerializer``.

        Returns a 400 response when ``search`` is absent and ``lat``/``lng``
        are missing or not valid floats.
        """
        params = request.query_params

        search_term = params.get('search')
        if search_term is not None:
            return Response(search_address(search_term))

        try:
            lat = float(params["lat"])
            lng = float(params["lng"])
        except (KeyError, ValueError):
            return HttpResponseBadRequest("Invalid parameters.")

        # Successfully parsed coordinates are always used, including 0.0: a
        # truthiness check here would wrongly treat a zero latitude or
        # longitude as "not supplied".
        address_data = get_address_from_coordinates(Point(lng, lat, srid=4326))
        return Response(AddressSerializer(address_data).data)
class AuckCHCHRegionInformation(viewsets.ViewSet):
    """Viewset reporting whether a coordinate falls inside the Auckland or Christchurch regions."""

    def list(self, request):
        """Return ``{"in_chch": bool, "in_auckland": bool}`` for the requested point.

        Reads the ``lat`` and ``lng`` query parameters and returns a 400
        response when either is missing or not a valid float. At most one of
        the two flags is ever true, because Auckland is only tested when the
        point is not in Christchurch.
        """
        params = self.request.query_params
        try:
            lat, lng = float(params["lat"]), float(params["lng"])
        except (KeyError, ValueError):
            return HttpResponseBadRequest("Missing or invalid coordinates.")

        location = Point(lng, lat, srid=4326)

        in_chch = bool(is_in_christchurch(location))
        # Short-circuit so the Auckland intersection is skipped whenever the
        # point is already known to be in Christchurch.
        in_auckland = not in_chch and bool(is_in_auckland(location))

        return Response({"in_chch": in_chch, "in_auckland": in_auckland})
class HabitatViewSet(viewsets.ModelViewSet):
    """Model viewset over every ``Habitat`` record, serialised with ``HabitatSerializer``."""

    queryset = Habitat.objects.all()
    serializer_class = HabitatSerializer
class ZoneViewSet(viewsets.ModelViewSet):
    """Model viewset over every ``Zone`` record, serialised with ``ZoneSerializer``."""

    queryset = Zone.objects.all()
    serializer_class = ZoneSerializer
class HabitatImageViewSet(viewsets.ViewSet):
    """Read-only viewset for habitat images (list and retrieve only)."""

    def list(self, request):
        """Return every habitat image, serialised."""
        all_images = HabitatImage.objects.all()
        return Response(HabitatImageSerializer(all_images, many=True).data)

    def retrieve(self, request, pk=None):
        """Return the habitat image with primary key ``pk``, or a 404 if none exists."""
        image = get_object_or_404(HabitatImage.objects.all(), pk=pk)
        return Response(HabitatImageSerializer(image).data)
2021-11-08 17:00:04 +13:00
class CSVDownloadView(viewsets.ViewSet):
    """Viewset for downloading the filtered plant list as a CSV file."""

    def list(self, request, *args, **kwargs):
        """Generate a CSV of the plants matching the request and return it as a download.

        The file is generated under a timestamped name and then opened from
        ``storage``; the client always receives it as ``plants.csv``.
        """
        plant_data = serialize_plants_queryset(get_filtered_plants(request))

        # Timestamped name for the generated file; presumably this keeps
        # separate requests from overwriting each other's output (confirm
        # against generate_csv).
        filename = f"plants_{slugify(timezone.now())}.csv"
        generate_csv(plant_data, filename)

        csv_file = storage.open(filename, 'rb')
        return FileResponse(csv_file, filename='plants.csv', content_type='text/csv')
class PDFDownloadView(viewsets.ViewSet):
    """Viewset for downloading a PDF planting guide built from the request's filters and plant list."""

    def list(self, request, *args, **kwargs):
        """Generate the planting guide PDF for this request and return it as a download.

        The filter values and the serialised filtered plants are both passed
        to ``create_planting_guide_pdf``. The file is generated under a
        timestamped name and served to the client as
        ``PLANTING_GUIDE_PDF_FILENAME``.
        """
        filter_data = get_filter_values(request.query_params)
        plant_data = serialize_plants_queryset(get_filtered_plants(request))

        filename = f"planting_guide_{slugify(timezone.now())}.pdf"
        create_planting_guide_pdf(filter_data, plant_data, filename)

        pdf_file = storage.open(filename, 'rb')
        return FileResponse(
            pdf_file,
            filename=PLANTING_GUIDE_PDF_FILENAME,
            content_type='application/pdf',
        )
class QuestionnaireViewSet(viewsets.ModelViewSet):
    """Create-only endpoint for questionnaires.

    Only the POST method is accepted, and any client may call it
    (``AllowAny``).
    """

    queryset = Questionnaire.objects.all()
    serializer_class = QuestionnaireSerializer
    permission_classes = [permissions.AllowAny]
    http_method_names = ("post",)
def validate_key(request):
    """Check whether a given activation key value is valid.

    The key is read from the ``key`` field of the query string (GET), or of
    the form data or JSON object body (POST).

    Returns:
        * 200 JSON ``{"type": <key set name>}`` when the key has activations
          remaining.
        * 200 JSON with the serialised questionnaire when the key has no
          activations remaining and ``activations == 1``.
        * 400 when the body is not a JSON object, ``key`` is missing, or the
          key has no activations remaining and ``activations != 1``.
        * 404 when no such key exists.
        * 405 for any method other than GET or POST.
    """
    if request.method == "GET":
        data = request.GET
    elif request.method == "POST":
        try:
            # Form data wins; an empty form falls back to a JSON body.
            data = request.POST or json.loads(request.body)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            return HttpResponseBadRequest(str(e))
        if not isinstance(data, dict):
            # Valid JSON that is not an object (list, number, string, ...)
            # has no fields to look "key" up in.
            return HttpResponseBadRequest("Request body must be a JSON object")
    else:
        # HttpResponseNotAllowed requires the list of permitted methods; it
        # is sent back to the client in the Allow header.
        return HttpResponseNotAllowed(["GET", "POST"])

    key = data.get("key")
    if not key:
        return HttpResponseBadRequest("'key' not specified")

    try:
        ak = ActivationKey.objects.get(key=key)
    except ActivationKey.DoesNotExist:
        return HttpResponseNotFound()

    if ak.remaining_activations > 0:
        # valid key, permit entry
        return JsonResponse({"type": ak.key_set.name})
    if ak.activations == 1:
        # key has been activated, but can return the existing data from that activation
        # NOTE(review): assumes a questionnaire is linked to the key at this
        # point; .first() returns None otherwise -- confirm.
        return JsonResponse(QuestionnaireSerializer(ak.questionnaire_set.first()).data)

    # key has multiple activations, but all are expended
    # could return most recent questionnaire but user who uses many-use keys probably doesn't care
    return HttpResponseBadRequest()
def activate_key(request):
    """Create the activation key for a pending purchase once Stripe reports it paid.

    Reads the ``key`` query parameter, looks up the Stripe checkout session
    id stored under that key in Redis, and retrieves the session from Stripe.

    * Missing ``key`` parameter: 400.
    * No pending session stored for the key (unknown, expired, or already
      activated): redirect to ``/`` without activating anything.
    * Session not paid: redirect to ``/``; nothing is written to the
      database and the Redis entry is kept.
    * Session paid: create the ``ActivationKey`` (and, for physical orders,
      the customer and address records), delete the Redis entry, and
      redirect to ``/?key=<key>``.
    """
    stripe.api_key = settings.STRIPE_API_KEY
    redirect_url = "/"

    key = request.GET.get("key")
    if not key:
        return HttpResponseBadRequest("'key' not specified")

    pending_session_id = redis_client.get(key)
    if pending_session_id is None:
        # No stored session for this key, so there is nothing to activate.
        return redirect(redirect_url)

    stripe_session = stripe.checkout.Session.retrieve(pending_session_id.decode())

    # This view receives both successful and cancelled checkouts, so check
    # payment before reading customer details or writing anything.
    if stripe_session.payment_status != "paid":
        return redirect(redirect_url)

    is_physical = stripe_session.metadata.get("physical") == "true"
    kwargs = {}
    if is_physical:
        details = stripe_session.customer_details
        # NOTE(review): this reads customer_details.address, while the
        # checkout collects a shipping address -- confirm this is the address
        # the physical copy should be sent to.
        address, _ = CustomerAddress.objects.get_or_create(
            city=details.address['city'],
            line1=details.address['line1'],
            line2=details.address['line2'],
            postal_code=int(details.address['postal_code']),
        )
        customer, _ = Customer.objects.get_or_create(
            email=details.email,
            name=details.name,
            address=address,
        )
        kwargs["customer"] = customer
        key_set_name = settings.STRIPE_PHYSICAL_KEY_SET
    else:
        key_set_name = settings.STRIPE_DIGITAL_KEY_SET

    key_set, _ = ActivationKeySet.objects.get_or_create(name=key_set_name, size=0)

    ActivationKey.objects.create(key=key, key_set=key_set, **kwargs)
    # Remove the pending entry so the same purchase cannot be activated twice.
    redis_client.delete(key)
    return redirect(redirect_url + "?key=" + key)
def purchase_key(request):
    """Generate a prospective key and redirect to the Stripe payment portal.

    A new key value is generated and embedded in the URL of ``activate_key``,
    which is used as both the success and the cancel URL of the checkout
    session. The session id is stored in Redis under the key for 8 hours.

    Query parameters:
        physical: when one of ``t``, ``true``, ``y``, ``yes`` or ``1``
            (case-insensitive), the physical price is charged, a shipping
            address restricted to NZ is collected, and the session is tagged
            with ``physical: 'true'`` metadata.
    """
    stripe.api_key = settings.STRIPE_API_KEY

    # Digital purchase is the default; overridden below for physical copies.
    price_id = settings.STRIPE_DIGITAL_PRICE_ID
    extra_kwargs = {}

    key = ActivationKey.key_default()
    return_url = request.build_absolute_uri(reverse(activate_key)) + f"?key={key}"

    # requesting checkout for physical copy
    physical_flag = request.GET.get("physical", "").lower()
    if physical_flag in {"t", "true", "y", "yes", "1"}:
        price_id = settings.STRIPE_PHYSICAL_PRICE_ID
        extra_kwargs = {
            'shipping_address_collection': {'allowed_countries': ['NZ']},
            'metadata': {'physical': 'true'},
        }

    invoice_data = {
        'description': f'Your activation key is {key}',
        'rendering_options': {'amount_tax_display': 'include_inclusive_tax'},
        'footer': 'BioSphere Capital Limited',
    }
    session_params = {
        'line_items': [{"price": price_id, "quantity": 1}],
        'automatic_tax': {'enabled': True},
        'invoice_creation': {'enabled': True, 'invoice_data': invoice_data},
        'mode': 'payment',
        'success_url': return_url,
        'cancel_url': return_url,
    }
    stripe_session = stripe.checkout.Session.create(**session_params, **extra_kwargs)

    # Remember which Stripe session belongs to this key so that activate_key
    # can look it up; the entry expires after 8 hours.
    redis_client.setex(key, timedelta(hours=8), stripe_session.id)
    return redirect(stripe_session.url)