right-tree/backend/right_tree/api/tasks.py

55 lines
1.7 KiB
Python
Raw Permalink Normal View History

import json
import logging
import pathlib
import zipfile
from celery import shared_task
from django.utils import timezone
from .models import Questionnaire, Export
from .resource_generation_utils import create_planting_guide_pdf, get_filter_values, serialize_plants_queryset, storage
@shared_task
def generate_pdf(questionnaire_id, export_id):
q = Questionnaire.objects.get(pk=questionnaire_id)
z = q.zone
2023-03-30 15:29:23 +13:00
export = Export.objects.get(pk=export_id)
filename = f"export_{export.pk}/{q.slug}.pdf"
try:
create_planting_guide_pdf(
get_filter_values({ # awful hack to reuse some logic we already have
'coordinates': {'lat': q.location.y, 'lng': q.location.x},
'soilVariant': q.soil_variant.name[0],
'habitat': json.dumps({'name': z.habitat.name}),
'zone': json.dumps({'name': z.name, 'variant': z.variant, 'refined_variant': z.refined_variant}),
}),
serialize_plants_queryset(q.plants),
filename,
)
except Exception as e:
logging.warning(e)
else:
if not storage.exists(filename):
raise FileNotFoundError(f"There was an error creating file: {filename}")
2023-03-30 15:29:23 +13:00
if export.completion >= 1:
generate_zip.delay(export_id)
@shared_task
def generate_zip(export_id):
export = Export.objects.get(pk=export_id)
zfilepath = storage.path(f"export_{export_id}/export.zip")
with zipfile.ZipFile(zfilepath, 'w', zipfile.ZIP_DEFLATED) as zf:
for q in export.questionnaires.all():
fpath = pathlib.Path(storage.path(f"export_{export_id}/{q.slug}.pdf"))
zf.write(fpath, fpath.name)
export.completion_date = timezone.now()
export.save()