Queue Architecture

When an applicant submits a form, we save it immediately. Everything else—Airtable sync, email, webhooks—happens asynchronously.
Form submissions are sacred. If Airtable is down or an email fails, the applicant shouldn’t see an error. Terra uses a persistent async queue to decouple submission success from integration success.

Why Async?

Consider what happens on submission:
  1. Save submission to database
  2. Sync to Airtable
  3. Send confirmation email
  4. Fire webhooks
  5. Enrich with Plaid data
If any of these steps runs synchronously and fails, the applicant sees an error even though their data was saved. That creates support tickets and anxiety. With async processing, only step 1 is synchronous: the applicant sees success in ~300ms, and the integrations run in the background.
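As a sketch (not Terra's actual route handler), the flow might look like this, assuming hypothetical saveSubmission and getFormConfig helpers:
// Hypothetical submission handler: persist first, defer the rest
export async function handleSubmission(formId: string, data: Record<string, unknown>) {
  const submission = await saveSubmission(formId, data);  // the only step that may fail the request
  const form = await getFormConfig(formId);               // webhook URL, Airtable settings, schema

  // Enqueue integrations; a failure here never reaches the applicant
  await enqueueWebhook(form.webhookUrl, formId, submission.id, data);
  await enqueueAirtableSync(formId, submission.id, data, form.schema, form.airtableSettings);

  return Response.json({ success: true, id: submission.id }); // responds in ~300ms
}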

The async_operations Table

The queue is a database table, not an external service like Redis or SQS:
CREATE TABLE async_operations (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  operation_type TEXT NOT NULL,  -- 'webhook', 'airtable_sync', etc.
  payload JSONB NOT NULL,        -- Operation-specific data

  -- Linking
  submission_id UUID REFERENCES form_submissions(id),
  form_id UUID REFERENCES forms(id),
  workspace_id UUID REFERENCES folders(id),

  -- State
  status TEXT NOT NULL DEFAULT 'pending',
  attempts INT NOT NULL DEFAULT 0,
  max_attempts INT NOT NULL DEFAULT 3,

  -- Timing
  last_attempt_at TIMESTAMPTZ,
  next_retry_at TIMESTAMPTZ DEFAULT NOW(),
  completed_at TIMESTAMPTZ,

  -- Error tracking
  last_error TEXT,
  error_history JSONB DEFAULT '[]',
  result JSONB,

  -- Timestamps
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Index for queue processor
CREATE INDEX idx_pending_operations
ON async_operations(status, next_retry_at)
WHERE status IN ('pending', 'failed');

Operation Types

Type             | Purpose                        | Max Attempts
---------------- | ------------------------------ | ------------
webhook          | Fire webhook to external URL   | 5
airtable_sync    | Sync submission to Airtable    | 3
email            | Send email via Resend          | 3
sms              | Send SMS via Twilio            | 3
plaid_enrichment | Fetch Plaid verification data  | 3
notification     | Combined email + SMS           | 3
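The table maps naturally onto a union type and a defaults map. The actual definitions aren't shown in the source, but a sketch might look like:
// Sketch: operation types and their retry budgets, mirroring the table above
export type OperationType =
  | "webhook"
  | "airtable_sync"
  | "email"
  | "sms"
  | "plaid_enrichment"
  | "notification";

export const DEFAULT_MAX_ATTEMPTS: Record<OperationType, number> = {
  webhook: 5, // external URLs are flaky; give them extra attempts
  airtable_sync: 3,
  email: 3,
  sms: 3,
  plaid_enrichment: 3,
  notification: 3,
};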

Status Lifecycle

pending → processing → completed on success. On failure the operation returns to failed and is retried with backoff; once max_attempts is exhausted it becomes dead. Operations can also be cancelled manually.

Enqueueing Operations

The enqueueOperation function adds jobs to the queue:
// src/lib/async-queue.ts

export async function enqueueOperation(
  operationType: OperationType,
  payload: Record<string, unknown>,
  options: EnqueueOptions = {}
): Promise<string | null> {
  const { data, error } = await supabaseAdmin.rpc("enqueue_async_operation", {
    p_operation_type: operationType,
    p_payload: payload,
    p_submission_id: options.submissionId || null,
    p_form_id: options.formId || null,
    p_workspace_id: options.workspaceId || null,
    p_max_attempts: options.maxAttempts || 3,
  });

  if (error) {
    console.error("[AsyncQueue] Failed to enqueue:", error);
    return null;
  }

  return data as string; // Returns operation ID
}
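EnqueueOptions isn't shown in the source; a plausible shape, inferred from the RPC parameters above:
// Assumed shape -- each field maps onto one p_* parameter of the RPC
interface EnqueueOptions {
  submissionId?: string;
  formId?: string;
  workspaceId?: string;
  maxAttempts?: number; // defaults to 3 when omitted
}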

Convenience Functions

// Webhook with 5 retries
await enqueueWebhook(webhookUrl, formId, submissionId, data);

// Airtable sync
await enqueueAirtableSync(formId, submissionId, data, schema, settings);

// Email + SMS notification
await enqueueNotification({
  formId,
  submissionId,
  eventType: "submission_receipt",
  email: { to: "user@example.com", subject: "...", templateParams: {...} },
  sms: { to: "+1234567890", body: "..." },
});
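These are presumably thin wrappers over enqueueOperation; a sketch of the webhook variant, using the WebhookPayload shape documented below:
// Sketch, not the actual implementation
export async function enqueueWebhook(
  webhookUrl: string,
  formId: string,
  submissionId: string,
  data: Record<string, unknown>
): Promise<string | null> {
  return enqueueOperation(
    "webhook",
    { webhookUrl, formId, submissionId, data },
    { formId, submissionId, maxAttempts: 5 } // webhooks get 5 attempts
  );
}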

Queue Processing

A cron-triggered API route processes the queue:
// /api/queue/process/route.ts

export async function GET() {
  // Get pending operations ready for processing
  const operations = await getPendingOperations(undefined, 10);

  for (const op of operations) {
    // Atomic claim (prevents double-processing)
    const claimed = await claimOperation(op.id);
    if (!claimed) continue;

    try {
      // Execute based on type
      switch (op.operation_type) {
        case "webhook":
          await processWebhook(op);
          break;
        case "airtable_sync":
          await processAirtableSync(op);
          break;
        case "email":
          await processEmail(op);
          break;
        // ...
      }

      // Mark complete
      await completeOperation(op.id, { processedAt: new Date().toISOString() });
    } catch (error) {
      // Mark failed (will retry if under max attempts)
      const message = error instanceof Error ? error.message : String(error);
      await failOperation(op.id, message);
    }
  }

  return Response.json({ processed: operations.length });
}
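getPendingOperations isn't shown either; one way to write it against the same partial index, sketched with supabase-js:
// Sketch: fetch up to `limit` operations that are due for processing
export async function getPendingOperations(operationType?: OperationType, limit = 10) {
  let query = supabaseAdmin
    .from("async_operations")
    .select("*")
    .in("status", ["pending", "failed"])            // matches idx_pending_operations
    .lte("next_retry_at", new Date().toISOString()); // only operations that are due

  if (operationType) query = query.eq("operation_type", operationType);

  const { data, error } = await query
    .order("next_retry_at", { ascending: true }) // oldest due first
    .limit(limit);

  if (error) throw error;
  return data ?? [];
}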

Claiming Operations

The claim_async_operation function uses atomic updates to prevent double-processing:
-- Stored procedure
CREATE OR REPLACE FUNCTION claim_async_operation(p_operation_id UUID)
RETURNS BOOLEAN AS $$
DECLARE
  v_claimed BOOLEAN;
BEGIN
  UPDATE async_operations
  SET status = 'processing',
      last_attempt_at = NOW(),
      attempts = attempts + 1,
      updated_at = NOW()
  WHERE id = p_operation_id
    AND status IN ('pending', 'failed')
    AND (next_retry_at IS NULL OR next_retry_at <= NOW())
  RETURNING TRUE INTO v_claimed;

  RETURN COALESCE(v_claimed, FALSE);
END;
$$ LANGUAGE plpgsql;
This ensures only one processor handles each operation, even with concurrent workers.
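On the TypeScript side, claimOperation is a thin call to that procedure; a sketch:
// Sketch: returns true only if this worker won the claim
export async function claimOperation(operationId: string): Promise<boolean> {
  const { data, error } = await supabaseAdmin.rpc("claim_async_operation", {
    p_operation_id: operationId,
  });
  if (error) {
    console.error("[AsyncQueue] Failed to claim:", error);
    return false; // treat errors as "another worker claimed it"
  }
  return data === true;
}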

Retry Strategy

Failed operations retry with exponential backoff:
// Calculated in the fail_async_operation procedure (shown as TypeScript for readability)
const backoffMinutes = Math.pow(2, attempts - 1) * 5; // 5, 10, 20, 40 minutes...
const nextRetryAt = new Date(Date.now() + backoffMinutes * 60 * 1000);
Attempt | Backoff | Total Wait
------- | ------- | ----------
1       | 5 min   | 5 min
2       | 10 min  | 15 min
3       | 20 min  | 35 min
4       | 40 min  | 75 min
5       | 80 min  | ~2.5 hours
After max_attempts, the operation moves to dead status.
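The TypeScript-side failOperation is a thin RPC call; a sketch (the parameter names are assumptions; only the procedure name appears above):
// Sketch: delegate retry scheduling and dead-lettering to the procedure
export async function failOperation(operationId: string, errorMessage: string): Promise<void> {
  const { error } = await supabaseAdmin.rpc("fail_async_operation", {
    p_operation_id: operationId, // assumed parameter name
    p_error: errorMessage,       // assumed parameter name
  });
  if (error) console.error("[AsyncQueue] Failed to record failure:", error);
}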

Error History

Each failure is recorded:
{
  "error_history": [
    { "error": "Connection timeout", "timestamp": "2025-01-02T10:30:00Z", "attempt": 1 },
    { "error": "Rate limited", "timestamp": "2025-01-02T10:35:00Z", "attempt": 2 },
    { "error": "Rate limited", "timestamp": "2025-01-02T10:45:00Z", "attempt": 3 }
  ]
}
This helps debug persistent failures.

Dead-Letter Handling

Operations that fail all retries need manual attention:
// Get dead operations
const deadOps = await getDeadLetterOperations(50);

// In admin UI, display for review
for (const op of deadOps) {
  console.log(`${op.operation_type}: ${op.last_error}`);
  console.log(`Attempts: ${op.attempts}, Created: ${op.created_at}`);
}

// Manual retry after fixing the issue
await retryOperation(operationId);

// Or cancel if it's no longer needed
await cancelOperation(operationId);
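retryOperation presumably just resets the row so the processor picks it up again; a sketch via a direct update (the real helper may be an RPC):
// Sketch: give a dead operation a fresh retry budget
export async function retryOperation(operationId: string): Promise<void> {
  const { error } = await supabaseAdmin
    .from("async_operations")
    .update({
      status: "pending",
      attempts: 0,
      next_retry_at: new Date().toISOString(),
      updated_at: new Date().toISOString(),
    })
    .eq("id", operationId)
    .eq("status", "dead"); // only dead operations need a manual retry
  if (error) throw error;
}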

Monitoring

Queue Status View

A database view summarizes queue health:
CREATE VIEW async_operations_status AS
SELECT
  operation_type,
  status,
  COUNT(*) as count,
  MIN(created_at) as oldest,
  MAX(created_at) as newest,
  AVG(attempts) as avg_attempts
FROM async_operations
GROUP BY operation_type, status;

// Get status summary
const status = await getQueueStatus();
// Returns: [
//   { operation_type: 'webhook', status: 'pending', count: 5, ... },
//   { operation_type: 'airtable_sync', status: 'completed', count: 142, ... },
// ]
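getQueueStatus itself can read the view directly, since PostgREST exposes views like tables; a sketch:
// Sketch: one row per (operation_type, status) pair
export async function getQueueStatus() {
  const { data, error } = await supabaseAdmin
    .from("async_operations_status")
    .select("*");
  if (error) throw error;
  return data;
}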

Notification Statistics

const stats = await getNotificationQueueStats();
// {
//   pending: 3,
//   processing: 1,
//   completed: 1250,
//   failed: 12,
//   todaysSent: 45
// }

Operation Payloads

Each operation type has a specific payload structure:

Webhook

interface WebhookPayload {
  webhookUrl: string;
  formId: string;
  submissionId: string;
  data: Record<string, unknown>;
  headers?: Record<string, string>;
}

Airtable Sync

interface AirtableSyncPayload {
  formId: string;
  submissionId: string;
  data: Record<string, unknown>;
  schema: Record<string, unknown>;
  airtableSettings: {
    base_id: string;
    table_name: string;
    api_key: string;
  };
}

Notification

interface NotificationPayload {
  formId: string;
  submissionId: string;
  eventType: "submission_receipt" | "status_change" | "reminder";
  email?: {
    to: string;
    subject: string;
    templateParams: { applicantName: string; programName: string; ... };
    from?: string;
    replyTo?: string;
  };
  sms?: {
    to: string;
    body: string;
  };
  metadata?: Record<string, unknown>;
}

Why Database, Not Redis/SQS?

We use PostgreSQL instead of dedicated queue services because:
  1. Transactional consistency — Enqueue in the same transaction as the submission (see the sketch after this list)
  2. No extra infrastructure — One less service to manage
  3. Easy querying — SQL for debugging and monitoring
  4. Persistence — Survives restarts without configuration
  5. ACID guarantees — Claims are atomic, no lost messages
For our volume (thousands of submissions/day), PostgreSQL handles it easily.
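Point 1 is the decisive one: because the queue lives in the same database as submissions, both inserts can share one transaction via a stored procedure. A sketch, with an illustrative procedure name (not Terra's actual API):
// Hypothetical RPC that inserts into form_submissions and async_operations
// in one transaction -- either both rows exist or neither does
const { data: submissionId, error } = await supabaseAdmin.rpc(
  "submit_with_operations", // illustrative name
  { p_form_id: formId, p_data: data }
);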

Edge Cases

Duplicate Prevention

Webhook idempotency keys prevent double-delivery:
const idempotencyKey = `${formId}-${submissionId}-${webhookId}`;
// External system can use this to deduplicate

Long-Running Operations

Some operations (like large Airtable syncs) may time out. The processor sets a maximum execution time and fails gracefully:
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 30000); // 30s timeout

try {
  await processWithTimeout(operation, controller.signal);
} finally {
  clearTimeout(timeout);
}

Stuck Processing

If a processor crashes mid-operation, the operation stays in processing status. A cleanup job resets stale processing operations:
-- Reset operations stuck in processing for > 5 minutes
UPDATE async_operations
SET status = 'pending',
    next_retry_at = NOW()
WHERE status = 'processing'
  AND last_attempt_at < NOW() - INTERVAL '5 minutes';