right-tree/backend/right_tree/api/tasks.py
Matthew Northcott 3f9f816a7e [#40] Bulk PDF export
- backend changes
2023-02-22 15:08:30 +13:00

53 lines
1.7 KiB
Python

import json
import logging
import pathlib
import zipfile
from celery import shared_task
from django.utils import timezone
from .models import Questionnaire, Export
from .resource_generation_utils import create_planting_guide_pdf, get_filter_values, serialize_plants_queryset, storage
@shared_task
def generate_pdf(questionnaire_id, export_id):
q = Questionnaire.objects.get(pk=questionnaire_id)
e = Export.objects.get(pk=export_id)
z = q.zone
filename = f"export_{e.pk}/{q.slug}.pdf"
try:
create_planting_guide_pdf(
get_filter_values({ # awful hack to reuse some logic we already have
'coordinates': {'lat': q.location.y, 'lng': q.location.x},
'soilVariant': q.soil_variant.name[0],
'habitat': json.dumps({'name': z.habitat.name}),
'zone': json.dumps({'name': z.name, 'variant': z.variant, 'refined_variant': z.refined_variant}),
}),
serialize_plants_queryset(q.plants),
filename,
)
except Exception as e:
logging.warning(e)
else:
if not storage.exists(filename):
raise FileNotFoundError(f"There was an error creating file: {filename}")
finally:
if e.completion >= 1:
generate_zip.delay(export_id)
@shared_task
def generate_zip(export_id):
export = Export.objects.get(pk=export_id)
zfilepath = storage.path(f"export_{export_id}/export.zip")
with zipfile.ZipFile(zfilepath, 'w', zipfile.ZIP_DEFLATED) as zf:
for q in export.questionnaires.all():
fpath = pathlib.Path(storage.path(f"export_{export_id}/{q.slug}.pdf"))
zf.write(fpath, fpath.name)
export.completion_date = timezone.now()
export.save()