Background Jobs

Cron jobs trigger API routes to process queued operations and scheduled tasks.
Terra has two types of background processing:
  1. Queue Processor — Processes async operations (webhooks, emails, Airtable syncs)
  2. Lifecycle Worker — Handles scheduled form lifecycle events (auto-publish, auto-close)

Queue Processor

The queue processor handles async operations that were enqueued during form submission.
// /api/queue/process/route.ts

export async function GET() {
  const operations = await getPendingOperations(undefined, 10);

  const results = await Promise.allSettled(
    operations.map(processOperation)
  );

  return Response.json({
    processed: operations.length,
    succeeded: results.filter(r => r.status === "fulfilled").length,
    failed: results.filter(r => r.status === "rejected").length,
  });
}

Operation Types

async function processOperation(op: AsyncOperation) {
  const claimed = await claimOperation(op.id);
  if (!claimed) return;

  try {
    switch (op.operation_type) {
      case "webhook":
        await fireWebhook(op.payload);
        break;
      case "airtable_sync":
        await syncToAirtable(op.payload);
        break;
      case "email":
        await sendEmail(op.payload);
        break;
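      case "sms":
        await sendSms(op.payload);
        break;
      default:
        // Fail loudly instead of marking an unrecognized operation as complete.
        throw new Error(`Unknown operation type: ${String(op.operation_type)}`);
    }
    await completeOperation(op.id);
  } catch (error) {
    // `error` is `unknown` in strict TypeScript, so narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    await failOperation(op.id, message);
    // Rethrow so Promise.allSettled in the route counts this operation as failed.
    throw error;
  }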
}
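
The claim step is what keeps two overlapping cron invocations from processing the same operation twice. The source does not show `claimOperation`, so the following is only a minimal in-memory sketch of the idea, assuming a `pending` to `processing` status transition. `InMemoryQueue` and its methods are hypothetical. In a database the same effect would typically come from a conditional update that only matches rows still marked as pending.

```typescript
// Hypothetical in-memory model of the queue; names are illustrative only.
type Status = "pending" | "processing" | "completed" | "failed";

interface QueuedOperation {
  id: string;
  status: Status;
}

class InMemoryQueue {
  private ops = new Map<string, QueuedOperation>();

  enqueue(id: string): void {
    this.ops.set(id, { id, status: "pending" });
  }

  // Returns true only for the first caller that still sees the row as pending.
  claim(id: string): boolean {
    const op = this.ops.get(id);
    if (!op || op.status !== "pending") return false;
    op.status = "processing";
    return true;
  }

  statusOf(id: string): Status | undefined {
    return this.ops.get(id)?.status;
  }
}

const queue = new InMemoryQueue();
queue.enqueue("op-1");
const firstClaim = queue.claim("op-1"); // true: this caller owns the operation
const secondClaim = queue.claim("op-1"); // false: already claimed, so skip it
```

A caller that gets `false` simply returns, which matches the `if (!claimed) return;` guard in `processOperation`.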

Lifecycle Worker

The lifecycle worker handles scheduled form events and cleanup tasks.
// src/lib/worker.ts

export async function runWorkerTasks(): Promise<WorkerResult> {
  // Record the start time so the reported duration can be computed.
  const startTime = Date.now();
  const [
    formsPublished,
    formsClosed,
    formsFrozen,
    sessionsCleanedUp,
    softDeleteCleanup,
  ] = await Promise.all([
    processScheduledPublishes(),
    processScheduledCloses(),
    processAutoFreeze(),
    cleanupExpiredSessions(),
    cleanupSoftDeletedRecords(),
  ]);

  return {
    formsPublished,
    formsClosed,
    formsFrozen,
    sessionsCleanedUp,
    softDeleteCleanup,
    duration: Date.now() - startTime,
  };
}
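
Note that `Promise.all` rejects as soon as any one task throws, so a failure in one task would discard the counts from the others. If per-task isolation is wanted, one option is to wrap each task so that it resolves to a fallback value instead of rejecting. The following is a sketch under that assumption; `runIsolated` and the `taskErrors` array are hypothetical and not part of the worker shown above.

```typescript
// Hypothetical helper: runs a task and resolves to `fallback` if it throws,
// recording the failure instead of rejecting the surrounding Promise.all.
async function runIsolated<T>(
  name: string,
  task: () => Promise<T>,
  fallback: T,
  errors: string[],
): Promise<T> {
  try {
    return await task();
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    errors.push(`${name}: ${message}`);
    return fallback;
  }
}

// Usage with stand-in tasks (the real tasks would be the worker functions).
const taskErrors: string[] = [];
const isolatedRun = Promise.all([
  runIsolated("publish", async (): Promise<number> => 2, 0, taskErrors),
  runIsolated(
    "cleanup",
    async (): Promise<number> => {
      throw new Error("db timeout");
    },
    0,
    taskErrors,
  ),
]);
// isolatedRun resolves to [2, 0]; taskErrors holds "cleanup: db timeout".
```

The collected messages could then populate the optional `error` field of `WorkerResult`.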

Task Breakdown

| Task | Description | Frequency |
| --- | --- | --- |
| processScheduledPublishes | Publishes forms where published_at has passed | Every minute |
| processScheduledCloses | Closes forms where closes_at has passed | Every minute |
| processAutoFreeze | Freezes forms after first submission | Every minute |
| cleanupExpiredSessions | Removes abandoned draft sessions (7+ days) | Every minute |
| cleanupSoftDeletedRecords | Permanently deletes 30+ day old soft-deleted records | Every minute |

Scheduled Publishing

Forms can be scheduled to publish automatically:
async function processScheduledPublishes(): Promise<number> {
  const result = await formsRepository.getFormsDueForPublish();
  if (!result.success || !result.data.length) return 0;

  let published = 0;
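  for (const form of result.data) {
    const { error } = await supabaseAdmin
      .from("forms")
      .update({
        status: "published",
        published_schema: form.schema,
      })
      .eq("id", form.id);

    // Count a form only when the update succeeded; log and move on otherwise.
    if (error) {
      logger.error("Auto-publish failed", { formId: form.id, error });
      continue;
    }

    published++;
    logger.info("Auto-published form", { formId: form.id });
  }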

  return published;
}
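
`getFormsDueForPublish` is not shown in the source. As a rough sketch of the selection rule it presumably applies, the function below treats a form as due when it is still a draft and its `published_at` timestamp is at or before the current time. The `SchedulableForm` type, the `isDueForPublish` name, and the status values are assumptions made for illustration.

```typescript
// Hypothetical shape; only `published_at` is named in the task table.
interface SchedulableForm {
  id: string;
  status: "draft" | "published" | "closed"; // assumed status values
  published_at: string | null; // ISO 8601 timestamp, or null when unscheduled
}

// A form is due when it is unpublished and its scheduled time has passed.
function isDueForPublish(form: SchedulableForm, now: Date): boolean {
  if (form.status !== "draft" || form.published_at === null) return false;
  return new Date(form.published_at).getTime() <= now.getTime();
}
```

Passing `now` as a parameter rather than calling `new Date()` inside keeps the rule easy to test with fixed timestamps.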

Form Auto-Freezing

Forms are automatically frozen after receiving their first submission:
async function processAutoFreeze(): Promise<number> {
  // Find published, non-frozen forms
  const { data: forms, error } = await supabaseAdmin
    .from("forms")
    .select("id, title")
    .eq("status", "published")
    .eq("frozen", false);

  // `data` is null when the query fails, so stop before iterating.
  if (error || !forms) return 0;

  let frozen = 0;
  for (const form of forms) {
    // Check if form has any submissions
    const { count } = await supabaseAdmin
      .from("submissions")
      .select("*", { count: "exact", head: true })
      .eq("form_id", form.id);

    // `count` can be null if the count query fails.
    if (count !== null && count > 0) {
      await supabaseAdmin
        .from("forms")
        .update({
          frozen: true,
          frozen_at: new Date().toISOString(),
          frozen_reason: "First submission received",
        })
        .eq("id", form.id);

      frozen++;
    }
  }

  return frozen;
}
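
The loop above issues one count query per published form. A possible alternative, assuming the `form_id` values of existing submissions can be fetched in a single query, is to work out which forms to freeze in memory. `selectFormsToFreeze` and `FreezeCandidate` are hypothetical names used only for this sketch.

```typescript
// Hypothetical minimal shape of a row from the forms table.
interface FreezeCandidate {
  id: string;
  frozen: boolean;
}

// Returns the ids of non-frozen forms that have at least one submission.
function selectFormsToFreeze(
  forms: FreezeCandidate[],
  submissionFormIds: string[],
): string[] {
  const withSubmissions = new Set(submissionFormIds);
  return forms
    .filter((form) => !form.frozen && withSubmissions.has(form.id))
    .map((form) => form.id);
}

const toFreeze = selectFormsToFreeze(
  [
    { id: "f1", frozen: false },
    { id: "f2", frozen: false },
    { id: "f3", frozen: true },
  ],
  ["f1", "f1", "f3"],
);
// toFreeze is ["f1"]: f2 has no submissions and f3 is already frozen.
```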

Cron Configuration

Both workers are triggered via Vercel Cron Jobs:
// vercel.json
{
  "crons": [
    {
      "path": "/api/queue/process",
      "schedule": "* * * * *"
    },
    {
      "path": "/api/cron/worker",
      "schedule": "* * * * *"
    }
  ]
}
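
The cron routes are plain HTTP endpoints, so anyone who knows the path can call them. Vercel documents a `CRON_SECRET` environment variable: when it is set, cron invocations carry an `Authorization: Bearer <secret>` header. The source does not say whether Terra checks this header, so the following is only a sketch of such a check, and `isAuthorizedCronRequest` is a hypothetical helper.

```typescript
// Hypothetical guard for cron routes. Fails closed when no secret is configured.
function isAuthorizedCronRequest(
  authorizationHeader: string | null,
  cronSecret: string | undefined,
): boolean {
  if (!cronSecret) return false;
  return authorizationHeader === `Bearer ${cronSecret}`;
}
```

In a route handler this would be called with `request.headers.get("authorization")` and `process.env.CRON_SECRET`, returning a 401 response when the check fails. Local `curl` calls would then need to send the same header.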

API Endpoints

| Endpoint | Purpose | Method |
| --- | --- | --- |
| /api/queue/process | Process async operations queue | GET |
| /api/cron/worker | Run lifecycle and cleanup tasks | GET/POST |

Monitoring

Queue Health

-- Check queue status
SELECT operation_type, status, COUNT(*)
FROM async_operations
GROUP BY operation_type, status;

-- Find stuck operations
SELECT * FROM async_operations
WHERE status = 'processing'
AND updated_at < NOW() - INTERVAL '10 minutes';
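
The same stuck-operation rule can be expressed in application code, for example in an alerting check or a unit test. The sketch below mirrors the query above; the `OperationRow` shape and the `findStuckOperations` name are assumptions, as is the `completed` status value.

```typescript
// Hypothetical subset of an async_operations row.
interface OperationRow {
  id: string;
  status: "pending" | "processing" | "completed" | "failed"; // assumed status values
  updated_at: string; // ISO 8601 timestamp
}

// An operation is stuck when it has been in "processing" longer than the threshold.
function findStuckOperations(
  rows: OperationRow[],
  now: Date,
  thresholdMs: number = 10 * 60 * 1000,
): OperationRow[] {
  const cutoff = now.getTime() - thresholdMs;
  return rows.filter(
    (row) =>
      row.status === "processing" &&
      new Date(row.updated_at).getTime() < cutoff,
  );
}
```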

Worker Health

The worker returns detailed results:
interface WorkerResult {
  formsPublished: number;
  formsClosed: number;
  formsFrozen: number;
  sessionsCleanedUp: number;
  softDeleteCleanup: { forms: number; submissions: number };
  duration: number;
  error?: string;
}
Check logs for worker activity:
Worker completed: formsPublished=2, formsClosed=0, formsFrozen=1, duration=234ms

Error Handling

Both workers handle errors gracefully:
  1. Per-item isolation: One failed operation doesn’t block others
  2. Logging: All errors are logged with context
  3. Retry logic: Queue operations retry with exponential backoff
  4. Dead letter: Persistently failing operations are marked as failed
try {
  await processOperation(op);
  await completeOperation(op.id);
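} catch (error) {
  // `error` is `unknown` in strict TypeScript, so narrow before reading `.message`.
  const message = error instanceof Error ? error.message : String(error);
  logger.error("Operation failed", { operationId: op.id, error });
  await failOperation(op.id, message);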
}
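
The retry and dead-letter rules in the list above are not spelled out in the source. As an illustration only, the sketch below doubles the delay on each attempt up to a cap and gives up after a fixed number of attempts. The `RetryPolicy` values (1 second base, 60 second cap, 5 attempts) and the function names are assumptions, not Terra's actual settings.

```typescript
// Hypothetical retry settings; the real values are not given in the source.
interface RetryPolicy {
  baseDelayMs: number;
  maxDelayMs: number;
  maxAttempts: number;
}

const defaultPolicy: RetryPolicy = {
  baseDelayMs: 1000,
  maxDelayMs: 60000,
  maxAttempts: 5,
};

// Delay before the retry that follows failed attempt `attempt` (1-based):
// base * 2^(attempt - 1), capped at maxDelayMs.
function backoffDelayMs(attempt: number, policy: RetryPolicy = defaultPolicy): number {
  const delay = policy.baseDelayMs * Math.pow(2, attempt - 1);
  return Math.min(delay, policy.maxDelayMs);
}

type NextStep =
  | { action: "retry"; delayMs: number }
  | { action: "dead_letter" };

// Decides what to do after an operation has failed `attemptsSoFar` times.
function nextStepAfterFailure(
  attemptsSoFar: number,
  policy: RetryPolicy = defaultPolicy,
): NextStep {
  if (attemptsSoFar >= policy.maxAttempts) return { action: "dead_letter" };
  return { action: "retry", delayMs: backoffDelayMs(attemptsSoFar, policy) };
}
```

With these assumed defaults the delays run 1 s, 2 s, 4 s, 8 s, and the fifth failure marks the operation as permanently failed. Adding random jitter to the delay is a common refinement when many operations fail at once.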

Local Development

For local testing, you can trigger workers manually:
# Trigger queue processor
curl http://localhost:3000/api/queue/process

# Trigger lifecycle worker
curl http://localhost:3000/api/cron/worker
Or run the worker loop directly:
import { startWorkerLoop } from "@/lib/worker";

// Runs continuously, checking every minute
await startWorkerLoop();