import json import stripe from datetime import timedelta from django.conf import settings from django.contrib.gis.geos import Point from django.http import JsonResponse, HttpResponseNotAllowed, HttpResponseNotFound, HttpResponseBadRequest, FileResponse from django.shortcuts import get_object_or_404, redirect from django.urls import reverse from django.utils import timezone from django.utils.text import slugify from rest_framework import viewsets, permissions from rest_framework.response import Response from .models import Habitat, HabitatImage, Plant, EcologicalDistrictLayer, SoilOrder, Zone, Questionnaire, ActivationKey, ActivationKeySet, Customer, CustomerAddress from .serializers import HabitatImageSerializer, HabitatSerializer, PlantSerializer, SoilOrderSerializer, EcologicalDistrictLayerSerializer, AddressSerializer, ZoneSerializer, QuestionnaireSerializer from .filters import * from .wms_utils import get_address_from_coordinates, search_address from .resource_generation_utils import generate_csv, get_filter_values, serialize_plants_queryset, create_planting_guide_pdf, PLANTING_GUIDE_PDF_FILENAME, storage from .redis import redis_client class PlantViewSet(viewsets.ModelViewSet): """ Filtered viewset for plants. """ queryset = Plant.objects.all() serializer_class = PlantSerializer def get_queryset(self): """ Filtering plant query set by query parameters in the URL. (May want to eventually use django filters to break up the logic...) """ return get_filtered_plants(self.request) class SoilOrderViewSet(viewsets.ModelViewSet): """ Filtered viewset for soil details. """ serializer_class = SoilOrderSerializer def get_queryset(self): """ Filtering soil order query set by coordinate parameters in the URL. """ try: lat = float(self.request.query_params["lat"]) lng = float(self.request.query_params["lng"]) except (KeyError, ValueError): return SoilOrder.objects.all() return SoilOrder.objects.filter(soillayer__geom__intersects=Point(lng, lat, srid=4326)) class EcologicalDistrictViewSet(viewsets.ModelViewSet): """ Filtered viewset for ecological district/region details. """ serializer_class = EcologicalDistrictLayerSerializer def get_queryset(self): """ Filtering ecological district/region query set by coordinate parameters in the URL. """ try: lat = float(self.request.query_params["lat"]) lng = float(self.request.query_params["lng"]) except (KeyError, ValueError): return EcologicalDistrictLayer.objects.all() return EcologicalDistrictLayer.objects.filter(geom__intersects=Point(lng, lat, srid=4326)) class LINZPropertyViewSet(viewsets.ViewSet): """ Filtered viewset for ecological district/region details. """ def list(self, request): try: lat = float(self.request.query_params["lat"]) lng = float(self.request.query_params["lng"]) except (KeyError, ValueError): lat = lng = None address = self.request.query_params.get('search') if address is not None: results = search_address(address) return Response(results) elif lat and lng: address_data = get_address_from_coordinates(Point(lng, lat, srid=4326)) serializer = AddressSerializer(address_data) return Response(serializer.data) return HttpResponseBadRequest("Invalid parameters.") class AuckCHCHRegionInformation(viewsets.ViewSet): """ Filtered viewset defining if coordinate falls inside auckland and chch regions. """ def list(self, request): try: lat = float(self.request.query_params["lat"]) lng = float(self.request.query_params["lng"]) except (KeyError, ValueError): return HttpResponseBadRequest("Missing or invalid coordinates.") p = Point(lng, lat, srid=4326) in_chch = False in_auckland = False # can avoid computing intersections for Auckland if we use a conditional here if is_in_christchurch(p): in_chch = True elif is_in_auckland(p): in_auckland = True return Response({"in_chch": in_chch, "in_auckland": in_auckland}) class HabitatViewSet(viewsets.ModelViewSet): """ Viewset for all habitats. """ serializer_class = HabitatSerializer queryset = Habitat.objects.all() class ZoneViewSet(viewsets.ModelViewSet): """ Viewset for all habitats. """ serializer_class = ZoneSerializer queryset = Zone.objects.all() class HabitatImageViewSet(viewsets.ViewSet): """ Viewset for a habitat image. """ def list(self, request): queryset = HabitatImage.objects.all() serializer = HabitatImageSerializer(queryset, many=True) return Response(serializer.data) def retrieve(self, request, pk=None): queryset = HabitatImage.objects.all() habitat_image = get_object_or_404(queryset, pk=pk) serializer = HabitatImageSerializer(habitat_image) return Response(serializer.data) class CSVDownloadView(viewsets.ViewSet): """ Viewset for a downloading a CSV plant list and filters. """ def list(self, request, *args, **kwargs): filtered_plants = get_filtered_plants(request) plant_data = serialize_plants_queryset(filtered_plants) filename = f"plants_{slugify(timezone.now())}.csv" generate_csv(plant_data, filename) return FileResponse( storage.open(filename, 'rb'), filename='plants.csv', content_type='text/csv', ) class PDFDownloadView(viewsets.ViewSet): """ Viewset for a downloading a PDF planting guide with appended filter and plant list info. """ def list(self, request, *args, **kwargs): filter_data = get_filter_values(request.query_params) filtered_plants = get_filtered_plants(request) plant_data = serialize_plants_queryset(filtered_plants) filename = f"planting_guide_{slugify(timezone.now())}.pdf" create_planting_guide_pdf(filter_data, plant_data, filename) return FileResponse( storage.open(filename, 'rb'), filename=PLANTING_GUIDE_PDF_FILENAME, content_type='application/pdf', ) class QuestionnaireViewSet(viewsets.ModelViewSet): serializer_class = QuestionnaireSerializer queryset = Questionnaire.objects.all() http_method_names = ("post",) permission_classes = [permissions.AllowAny] def validate_key(request): """Checks if a given key value is valid""" if request.method == "GET": data = request.GET elif request.method != "POST": return HttpResponseNotAllowed() else: try: data = request.POST or json.loads(request.body) except json.JSONDecodeError as e: return HttpResponseBadRequest(e) key = data.get("key") if not key: return HttpResponseBadRequest("'key' not specified") try: ak = ActivationKey.objects.get(key=key) except ActivationKey.DoesNotExist: return HttpResponseNotFound() if ak.remaining_activations > 0: # valid key, permit entry return JsonResponse({"type": ak.key_set.name}) elif ak.activations == 1: # key has been activated, but can return the existing data from that activation return JsonResponse(QuestionnaireSerializer(ak.questionnaire_set.first()).data) # key has multiple activations, but all are expended # could return most recent questionnaire but user who uses many-use keys probably doesn't care return HttpResponseBadRequest() def activate_key(request): """Adds a single activation to a given key if a Stripe payment has succeeded""" stripe.api_key = settings.STRIPE_API_KEY redirect_url = "/" key = request.GET['key'] stripe_session_id = redis_client.get(key).decode() stripe_session = stripe.checkout.Session.retrieve(stripe_session_id) is_physical = stripe_session.metadata.get("physical") == "true" kwargs = {} if is_physical: address, _ = CustomerAddress.objects.get_or_create( city=stripe_session.customer_details.address['city'], line1=stripe_session.customer_details.address['line1'], line2=stripe_session.customer_details.address['line2'], postal_code=int(stripe_session.customer_details.address['postal_code']), ) customer, _ = Customer.objects.get_or_create( email=stripe_session.customer_details.email, name=stripe_session.customer_details.name, address=address, ) key_set, _ = ActivationKeySet.objects.get_or_create( name=settings.STRIPE_PHYSICAL_KEY_SET, size=0, ) kwargs["customer"] = customer else: key_set, _ = ActivationKeySet.objects.get_or_create( name=settings.STRIPE_DIGITAL_KEY_SET, size=0, ) if stripe_session.payment_status == "paid": ActivationKey.objects.create(key=key, key_set=key_set, **kwargs) redis_client.delete(key) redirect_url += "?key=" + key return redirect(redirect_url) def purchase_key(request): """Generate a prospective key and redirect to the Stripe payment portal""" stripe.api_key = settings.STRIPE_API_KEY price_id = settings.STRIPE_DIGITAL_PRICE_ID extra_kwargs = {} key = ActivationKey.key_default() redirect_url = request.build_absolute_uri(reverse(activate_key)) + f"?key={key}" # requesting checkout for physical copy if request.GET.get("physical", "").lower() in {"t", "true", "y", "yes", "1"}: price_id = settings.STRIPE_PHYSICAL_PRICE_ID extra_kwargs = { 'shipping_address_collection': { 'allowed_countries': ['NZ'], }, 'metadata': { 'physical': 'true', }, } stripe_session = stripe.checkout.Session.create( line_items=[ { "price": price_id, "quantity": 1, }, ], automatic_tax={'enabled': True}, invoice_creation={ 'enabled': True, 'invoice_data': { 'description': f'Your activation key is {key}', 'rendering_options': {'amount_tax_display': 'include_inclusive_tax'}, 'footer': 'BioSphere Capital Limited', }, }, mode='payment', success_url=redirect_url, cancel_url=redirect_url, **extra_kwargs, ) redis_client.setex(key, timedelta(hours=8), stripe_session.id) return redirect(stripe_session.url)