Last Week

Successfully set up the email outbox infrastructure in the database - a queue table that holds all emails waiting to be sent. Connected the app to this outbox, so guest notification emails now queue properly instead of attempting immediate delivery. The architecture is solid: emails queue correctly and the drainer function works when invoked manually, but the cron job automation isn’t triggering.

The Cron Job Mystery

The email infrastructure is 90% complete. The outbox pattern is implemented and the drainer function processes queued emails without issue when invoked manually, but the automated cron job that should fire every 60 seconds isn’t running. This should be a straightforward fix, but it’s blocking the entire email automation pipeline.

Also spent time squashing UI bugs in the guest list details pane where states weren’t updating correctly and error messages weren’t displaying. The goal is to have a polished user experience from the first interaction.

What does it mean in English?

The app can queue emails to send, and the sending function works perfectly when I manually trigger it. But the automatic timer that should trigger it every minute isn’t working. It’s like having a working dishwasher that requires you to manually press start every 60 seconds instead of running its cycle automatically.

Nerdy Details

Let me dive deep into the cron job debugging process, the intricacies of email queue processing, and the UI state management issues I encountered this week.

The Cron Job Architecture Problem

Supabase’s cron implementation uses pg_cron under the hood, which has specific limitations and quirks that aren’t immediately obvious. Here’s what the current non-working setup looks like:

-- Current cron job configuration (NOT WORKING)
SELECT cron.schedule(
  'drain-email-outbox',
  '* * * * *', -- Every minute
  $$
  SELECT net.http_post(
    url := 'https://[project-ref].functions.supabase.co/drain-email-outbox',
    headers := jsonb_build_object(
      'Authorization', 'Bearer ' || current_setting('app.service_key')
    )
  );
  $$
);

The issue? Several potential culprits:

  1. Service key not available in cron context
  2. Network functions not enabled for cron
  3. Incorrect function URL formation
  4. Missing error logging
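
Before building anything custom, two cheap checks narrow things down: are the relevant extensions even enabled, and what does pg_cron’s own bookkeeping say about each run? (cron.job_run_details ships with recent pg_cron versions.)

-- Are pg_cron and pg_net actually enabled?
SELECT extname, extversion FROM pg_extension
WHERE extname IN ('pg_cron', 'pg_net');

-- Is the job registered and active?
SELECT jobid, jobname, schedule, active FROM cron.job;

-- What did recent runs actually do?
SELECT jobid, status, return_message, start_time
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 20;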

If those checks don’t surface the answer, here’s the heavier-weight debugging approach:

-- Step 1: Verify cron is actually running
CREATE TABLE cron_debug_log (
  id SERIAL PRIMARY KEY,
  executed_at TIMESTAMPTZ DEFAULT NOW(),
  message TEXT,
  error_details JSONB
);

-- Step 2: Create a debug cron job
SELECT cron.schedule(
  'debug-cron-test',
  '* * * * *',
  $$
  INSERT INTO cron_debug_log (message)
  VALUES ('Cron executed at ' || NOW());
  $$
);

-- Step 3: Check if network extension is available in cron context
-- (distinct dollar-quote tags matter here: nesting $$ inside $$ ends the
-- outer string early, which is itself a sneaky way for a job to never run)
SELECT cron.schedule(
  'test-network-access',
  '* * * * *',
  $job$
  DO $do$
  DECLARE
    request_id BIGINT;
    error_msg TEXT;
  BEGIN
    -- net.http_post is asynchronous: it queues the request and returns a
    -- request id right away (responses land in net._http_response)
    BEGIN
      SELECT net.http_post(
        url := 'https://httpbin.org/post',
        body := jsonb_build_object('test', true)
      ) INTO request_id;

      INSERT INTO cron_debug_log (message, error_details)
      VALUES ('Network test queued', jsonb_build_object('request_id', request_id));
    EXCEPTION WHEN OTHERS THEN
      GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
      INSERT INTO cron_debug_log (message, error_details)
      VALUES ('Network test failed', jsonb_build_object('error', error_msg));
    END;
  END $do$;
  $job$
);
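
Both debug jobs are throwaway, so once the mystery is solved they should be unscheduled rather than left spamming the log:

-- Clean up the debug jobs when the investigation is done
SELECT cron.unschedule('debug-cron-test');
SELECT cron.unschedule('test-network-access');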

Alternative Cron Implementation Strategies

Given the potential issues with direct HTTP calls from pg_cron, here are alternative approaches:

Option 1: Database-Trigger Based Processing

-- Create a processing request table
CREATE TABLE email_processing_requests (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  requested_at TIMESTAMPTZ DEFAULT NOW(),
  processed_at TIMESTAMPTZ,
  batch_size INTEGER DEFAULT 50,
  emails_processed INTEGER DEFAULT 0,
  status TEXT DEFAULT 'pending' -- pending, processing, completed, failed
);

-- Cron job just inserts a processing request
SELECT cron.schedule(
  'request-email-processing',
  '* * * * *',
  $$
  INSERT INTO email_processing_requests (batch_size)
  VALUES (50);
  $$
);

-- Database trigger invokes Edge Function
-- (again, distinct dollar-quote tags, so the nested SQL string doesn't
-- terminate the function body early)
CREATE OR REPLACE FUNCTION trigger_email_processing()
RETURNS TRIGGER AS $fn$
DECLARE
  auth_token TEXT;
  function_url TEXT;
BEGIN
  -- Get configuration
  SELECT value INTO auth_token
  FROM app_config
  WHERE key = 'edge_function_service_key';

  SELECT value INTO function_url
  FROM app_config
  WHERE key = 'drain_outbox_url';

  -- Use pg_background (a separate extension) to make an async HTTP call
  PERFORM pg_background_launch(
    format(
      $sql$
      SELECT net.http_post(
        url := %L,
        headers := jsonb_build_object('Authorization', 'Bearer ' || %L),
        body := jsonb_build_object('request_id', %L)
      );
      $sql$,
      function_url,
      auth_token,
      NEW.id
    )
  );

  RETURN NEW;
END;
$fn$ LANGUAGE plpgsql;

CREATE TRIGGER process_email_request
AFTER INSERT ON email_processing_requests
FOR EACH ROW
EXECUTE FUNCTION trigger_email_processing();
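
One caveat on the sketch above: pg_background is a separate extension, and as far as I can tell Supabase doesn’t ship it. The saving grace is that pg_net’s net.http_post is already asynchronous (it queues the request and returns a request id immediately), so the trigger can skip the background worker and call it directly:

-- Simpler variant without pg_background: net.http_post is async by design
CREATE OR REPLACE FUNCTION trigger_email_processing_simple()
RETURNS TRIGGER AS $fn$
BEGIN
  PERFORM net.http_post(
    url := (SELECT value FROM app_config WHERE key = 'drain_outbox_url'),
    headers := jsonb_build_object(
      'Authorization', 'Bearer ' ||
        (SELECT value FROM app_config WHERE key = 'edge_function_service_key')
    ),
    body := jsonb_build_object('request_id', NEW.id)
  );
  RETURN NEW;
END;
$fn$ LANGUAGE plpgsql;

And since this design inserts a request row every minute forever, the table needs a housekeeping job of its own (the seven-day window is arbitrary):

-- Prune old processing requests so the table doesn't grow unbounded
SELECT cron.schedule(
  'prune-email-processing-requests',
  '0 3 * * 0', -- Sundays at 03:00
  $$
  DELETE FROM email_processing_requests
  WHERE requested_at < NOW() - INTERVAL '7 days';
  $$
);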

Option 2: External Cron Service

// Using Cloudflare Workers with Cron Triggers
// (the worker script itself; the cron schedule lives in wrangler.toml, below)
export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    // This runs every minute
    const response = await fetch(
      `${env.SUPABASE_URL}/functions/v1/drain-email-outbox`,
      {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${env.SUPABASE_SERVICE_KEY}`,
          'Content-Type': 'application/json'
        },
        body: JSON.stringify({
          source: 'cloudflare-cron',
          timestamp: new Date().toISOString()
        })
      }
    );

    if (!response.ok) {
      // Log to error tracking service (logError is a stand-in for whatever is wired up)
      await logError({
        message: 'Failed to trigger email drainer',
        status: response.status,
        body: await response.text()
      });
    }
  }
};

// Configuration in wrangler.toml
// [triggers]
// crons = ["* * * * *"]

Option 3: Durable Objects for Sub-Minute Processing

// Cloudflare Durable Object for more frequent processing
export class EmailQueueProcessor {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request) {
    const url = new URL(request.url);

    switch (url.pathname) {
      case '/start':
        return this.startProcessing();
      case '/stop':
        return this.stopProcessing();
      case '/status':
        return this.getStatus();
      default:
        return new Response('Not Found', { status: 404 });
    }
  }

  private async startProcessing() {
    // An already-scheduled alarm means the loop is running
    if ((await this.state.storage.getAlarm()) !== null) {
      return new Response('Already processing', { status: 200 });
    }

    // Set up alarm to run every 10 seconds
    await this.state.storage.setAlarm(Date.now() + 10000);

    return new Response('Processing started', { status: 200 });
  }

  private async stopProcessing() {
    // Cancelling the pending alarm stops the loop
    await this.state.storage.deleteAlarm();
    return new Response('Processing stopped', { status: 200 });
  }

  private async getStatus() {
    const nextAlarm = await this.state.storage.getAlarm();
    return Response.json({ running: nextAlarm !== null, nextAlarm });
  }

  async alarm() {
    // Process email queue
    await this.drainEmailQueue();

    // Schedule next alarm
    await this.state.storage.setAlarm(Date.now() + 10000);
  }

  private async drainEmailQueue() {
    const startTime = Date.now();

    try {
      const response = await fetch(
        `${this.env.SUPABASE_URL}/functions/v1/drain-email-outbox`,
        {
          method: 'POST',
          headers: {
            'Authorization': `Bearer ${this.env.SUPABASE_SERVICE_KEY}`,
            'Content-Type': 'application/json'
          },
          body: JSON.stringify({
            source: 'durable-object',
            max_processing_time: 8000, // 8 seconds max
            batch_size: 20 // Smaller batch for frequent processing
          })
        }
      );

      const result = await response.json();

      // Store metrics
      await this.state.storage.put({
        [`processing_${Date.now()}`]: {
          duration: Date.now() - startTime,
          emails_processed: result.processed,
          success: response.ok
        }
      });

    } catch (error) {
      // Store error
      await this.state.storage.put({
        [`error_${Date.now()}`]: {
          message: error.message,
          stack: error.stack
        }
      });
    }
  }
}

Email Queue Processing Optimizations

The drainer function works when manually invoked, but let’s optimize it for production:

// Enhanced drainer with better error handling and metrics
interface DrainerConfig {
  batchSize: number;
  maxProcessingTime: number;
  priorityLevels: number[];
  retryStrategy: RetryStrategy;
}

interface RetryStrategy {
  maxAttempts: number;
  backoffMultiplier: number;
  maxBackoffMs: number;
}

class EmailDrainer {
  private config: DrainerConfig;
  private metrics: DrainerMetrics;

  constructor(config: DrainerConfig) {
    this.config = config;
    this.metrics = new DrainerMetrics();
  }

  async drain(): Promise<DrainResult> {
    const startTime = Date.now();
    const results: EmailResult[] = [];

    try {
      // Use advisory lock to prevent concurrent draining
      const lockAcquired = await this.acquireAdvisoryLock();
      if (!lockAcquired) {
        return {
          status: 'skipped',
          reason: 'Another drainer is running',
          duration: Date.now() - startTime
        };
      }

      // Process each priority level
      for (const priority of this.config.priorityLevels) {
        if (this.shouldStopProcessing(startTime)) {
          break;
        }

        const batch = await this.fetchBatch(priority);
        if (batch.length === 0) continue;

        const batchResults = await this.processBatch(batch);
        results.push(...batchResults);

        // Update metrics
        this.metrics.recordBatch(priority, batchResults);
      }

      return {
        status: 'success',
        processed: results.length,
        successful: results.filter(r => r.success).length,
        failed: results.filter(r => !r.success).length,
        duration: Date.now() - startTime,
        metrics: this.metrics.getSnapshot()
      };

    } catch (error) {
      return {
        status: 'error',
        error: error.message,
        processed: results.length,
        duration: Date.now() - startTime
      };

    } finally {
      await this.releaseAdvisoryLock();
    }
  }

  private async fetchBatch(priority: number): Promise<EmailOutboxRow[]> {
    // PostgREST can't express FOR UPDATE SKIP LOCKED through the query
    // builder, so batch claiming goes through an RPC (the SQL side is
    // sketched just after this class)
    const { data, error } = await supabase.rpc('fetch_email_batch', {
      p_priority: priority,
      p_max_attempts: this.config.retryStrategy.maxAttempts,
      p_limit: this.config.batchSize,
      p_worker: 'drainer'
    });

    if (error) throw error;
    return (data as EmailOutboxRow[]) || [];
  }

  private async processBatch(emails: EmailOutboxRow[]): Promise<EmailResult[]> {
    // Process with controlled concurrency
    const concurrency = 5;
    const results: EmailResult[] = [];

    for (let i = 0; i < emails.length; i += concurrency) {
      const chunk = emails.slice(i, i + concurrency);
      const chunkPromises = chunk.map(email => this.processEmail(email));
      const chunkResults = await Promise.allSettled(chunkPromises);

      results.push(...chunkResults.map((result, index) => ({
        emailId: chunk[index].id,
        success: result.status === 'fulfilled',
        error: result.status === 'rejected' ? result.reason : null,
        attempts: chunk[index].attempts + 1
      })));

      // Rate limiting between chunks
      await this.rateLimitPause();
    }

    return results;
  }

  private async processEmail(email: EmailOutboxRow): Promise<void> {
    const startTime = Date.now();

    try {
      // Update status to sending
      await this.updateEmailStatus(email.id, 'sending');

      // Send via provider
      const result = await this.sendViaProvider(email);

      // Update with success
      await this.updateEmailStatus(email.id, 'sent', {
        provider_id: result.id,
        sent_at: new Date().toISOString(),
        processing_time: Date.now() - startTime
      });

      // Trigger success webhook
      await this.triggerWebhook('email.sent', email, result);

    } catch (error) {
      const shouldRetry = this.shouldRetry(error, email);

      if (shouldRetry) {
        const nextAttempt = this.calculateNextAttemptTime(email.attempts);
        await this.updateEmailStatus(email.id, 'retry', {
          error: error.message,
          next_attempt: nextAttempt.toISOString(),
          attempts: email.attempts + 1
        });
      } else {
        await this.updateEmailStatus(email.id, 'failed', {
          error: error.message,
          failed_at: new Date().toISOString(),
          attempts: email.attempts + 1
        });

        // Trigger failure webhook
        await this.triggerWebhook('email.failed', email, error);
      }

      throw error;
    }
  }

  private calculateNextAttemptTime(attempts: number): Date {
    const { backoffMultiplier, maxBackoffMs } = this.config.retryStrategy;
    const backoffMs = Math.min(
      Math.pow(backoffMultiplier, attempts) * 1000,
      maxBackoffMs
    );
    return new Date(Date.now() + backoffMs);
  }

  private shouldRetry(error: any, email: EmailOutboxRow): boolean {
    // Don't retry if max attempts reached
    if (email.attempts >= this.config.retryStrategy.maxAttempts) {
      return false;
    }

    // Retry on network errors
    if (error.code === 'ECONNREFUSED' || error.code === 'ETIMEDOUT') {
      return true;
    }

    // Retry on rate limiting
    if (error.status === 429) {
      return true;
    }

    // Don't retry on permanent failures
    if (error.status >= 400 && error.status < 500) {
      return false;
    }

    // Retry on server errors
    return error.status >= 500;
  }
}
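
A note on fetchBatch above: supabase-js can’t express FOR UPDATE SKIP LOCKED through its query builder, so the batch claim goes through an RPC. Here’s a sketch of the Postgres side, assuming a function named fetch_email_batch and the locked_by/locked_at columns from the optimized schema below (the names are mine, not an existing API):

-- Claim a batch of due emails atomically; concurrent drainers skip
-- whatever rows another transaction has already locked
CREATE OR REPLACE FUNCTION fetch_email_batch(
  p_priority SMALLINT,
  p_max_attempts SMALLINT,
  p_limit INTEGER,
  p_worker TEXT
)
RETURNS SETOF email_outbox AS $fn$
  UPDATE email_outbox
  SET status = 'sending',
      locked_by = p_worker,
      locked_at = NOW()
  WHERE id IN (
    SELECT id
    FROM email_outbox
    WHERE priority = p_priority
      AND status IN ('pending', 'retry')
      AND scheduled_for <= NOW()
      AND attempts < p_max_attempts
    ORDER BY scheduled_for
    LIMIT p_limit
    FOR UPDATE SKIP LOCKED
  )
  RETURNING *;
$fn$ LANGUAGE sql;

Because rows are claimed (status flips to 'sending') inside the same transaction that selects them, the claim survives after the RPC returns, so two overlapping drainer runs can’t grab the same email.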

Database Queue Optimization

Optimizing the database for queue operations is crucial:

-- Optimized queue table with better indexes.
-- Two Postgres gotchas fixed here: a primary key on a partitioned table
-- must include the partition key, and CREATE TABLE has no inline INDEX
-- clause, so the indexes are created separately.
CREATE TABLE email_outbox_optimized (
  id UUID DEFAULT gen_random_uuid(),
  created_at TIMESTAMPTZ DEFAULT NOW(),
  scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  priority SMALLINT DEFAULT 5,
  status TEXT DEFAULT 'pending',
  attempts SMALLINT DEFAULT 0,

  -- Email content (lz4 TOAST compression, Postgres 14+)
  email_data JSONB COMPRESSION lz4,

  -- Processing metadata
  locked_at TIMESTAMPTZ,
  locked_by TEXT,
  processed_at TIMESTAMPTZ,

  PRIMARY KEY (id, scheduled_for)
) PARTITION BY RANGE (scheduled_for);

-- Indexes for queue operations
CREATE INDEX idx_queue_processing
  ON email_outbox_optimized (status, priority, scheduled_for)
  WHERE status IN ('pending', 'retry');
CREATE INDEX idx_queue_locked
  ON email_outbox_optimized (locked_by, locked_at)
  WHERE locked_by IS NOT NULL;

-- Create partitions for efficient querying
CREATE TABLE email_outbox_2025_02 PARTITION OF email_outbox_optimized
  FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Function to automatically create new partitions
CREATE OR REPLACE FUNCTION create_monthly_partition()
RETURNS void AS $$
DECLARE
  start_date DATE;
  end_date DATE;
  partition_name TEXT;
BEGIN
  -- Calculate next month's partition bounds
  start_date := date_trunc('month', NOW() + INTERVAL '1 month')::DATE;
  end_date := date_trunc('month', NOW() + INTERVAL '2 months')::DATE;
  partition_name := 'email_outbox_' || to_char(start_date, 'YYYY_MM');

  -- Create partition if it doesn't exist
  IF NOT EXISTS (
    SELECT 1 FROM pg_class
    WHERE relname = partition_name
  ) THEN
    EXECUTE format(
      'CREATE TABLE %I PARTITION OF email_outbox_optimized FOR VALUES FROM (%L) TO (%L)',
      partition_name, start_date, end_date
    );
  END IF;
END;
$$ LANGUAGE plpgsql;

-- Schedule partition creation
SELECT cron.schedule(
  'create-email-partitions',
  '0 0 1 * *', -- First day of each month
  'SELECT create_monthly_partition()'
);
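
Partitioning also pays off for retention: expiring a month of old email rows becomes a metadata-only DROP TABLE instead of a long-running DELETE. A sketch, with an arbitrary three-month window:

-- Drop the partition that has aged out of the retention window
DO $do$
DECLARE
  old_partition TEXT := 'email_outbox_' ||
    to_char(NOW() - INTERVAL '3 months', 'YYYY_MM');
BEGIN
  IF EXISTS (SELECT 1 FROM pg_class WHERE relname = old_partition) THEN
    EXECUTE format('DROP TABLE %I', old_partition);
  END IF;
END;
$do$;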

UI State Management Debugging

The guest list details pane had several state management issues:

// Problem: State updates not reflecting in UI
class GuestListDetailsViewModel {
  private val _state = MutableStateFlow<GuestListDetailsState>(
    GuestListDetailsState.Loading
  )

  // BUG: Direct mutation doesn't trigger recomposition
  fun updateGuestStatus(guestId: String, newStatus: GuestStatus) {
    val currentState = _state.value
    if (currentState is GuestListDetailsState.Success) {
      // This doesn't work - modifying list in place
      currentState.guests.find { it.id == guestId }?.status = newStatus
      _state.value = currentState // No actual change detected
    }
  }

  // FIX: Create new state object
  fun updateGuestStatusCorrectly(guestId: String, newStatus: GuestStatus) {
    _state.update { currentState ->
      when (currentState) {
        is GuestListDetailsState.Success -> {
          currentState.copy(
            guests = currentState.guests.map { guest ->
              if (guest.id == guestId) {
                guest.copy(status = newStatus)
              } else {
                guest
              }
            }
          )
        }
        else -> currentState
      }
    }
  }
}

// Problem: Error messages not displaying
sealed class GuestListDetailsState {
  object Loading : GuestListDetailsState()
  data class Success(
    val guests: List<Guest>,
    val waitlist: Waitlist
  ) : GuestListDetailsState()

  // BUG: Error state was missing context
  data class Error(val message: String) : GuestListDetailsState()
}

// FIX: Enhanced error state with recovery options
sealed class GuestListDetailsState {
  object Loading : GuestListDetailsState()

  data class Success(
    val guests: List<Guest>,
    val waitlist: Waitlist,
    val lastError: UiError? = null // Persist last error for toast
  ) : GuestListDetailsState()

  data class Error(
    val error: UiError,
    val canRetry: Boolean = true,
    val lastKnownData: Success? = null // Allow showing stale data
  ) : GuestListDetailsState()
}

data class UiError(
  val message: String,
  val code: String,
  val debugInfo: String? = null,
  val userAction: UserAction? = null
)

enum class UserAction {
  RETRY,
  REFRESH,
  CONTACT_SUPPORT,
  DISMISS
}

// Enhanced error handling in UI
@Composable
fun GuestListDetailsScreen(
  state: GuestListDetailsState,
  onRetry: () -> Unit
) {
  when (state) {
    is GuestListDetailsState.Error -> {
      if (state.lastKnownData != null) {
        // Show stale data with error banner
        Column {
          ErrorBanner(
            error = state.error,
            onAction = { action ->
              when (action) {
                UserAction.RETRY -> onRetry()
                UserAction.REFRESH -> onRetry()
                UserAction.DISMISS -> { /* Dismiss banner */ }
                else -> { /* Handle other actions */ }
              }
            }
          )
          GuestList(guests = state.lastKnownData.guests)
        }
      } else {
        // Full error screen
        ErrorScreen(
          error = state.error,
          canRetry = state.canRetry,
          onRetry = onRetry
        )
      }
    }
    is GuestListDetailsState.Success -> {
      // Show success with optional error toast
      state.lastError?.let { error ->
        LaunchedEffect(error) {
          showErrorToast(error)
        }
      }
      GuestList(guests = state.guests)
    }
    is GuestListDetailsState.Loading -> {
      LoadingScreen()
    }
  }
}

Testing the Complete Email Pipeline

Comprehensive testing ensures reliability:

class EmailPipelineIntegrationTest {
  @Test
  fun `complete email flow from queue to delivery`() = runTest {
    // 1. Setup
    val testGuest = createTestGuest()
    val testWaitlist = createTestWaitlist()

    // 2. Queue email
    val emailId = emailService.queueNotificationEmail(
      guestId = testGuest.id,
      waitlistId = testWaitlist.id,
      message = "Your table is ready!",
      priority = EmailPriority.URGENT
    ).getOrThrow()

    // 3. Verify queued
    val queuedEmail = supabase.from("email_outbox")
      .select()
      .eq("id", emailId)
      .single<EmailOutboxRow>()

    assertEquals("pending", queuedEmail.status)
    assertEquals(1, queuedEmail.priority)

    // 4. Trigger drainer manually (since cron isn't working)
    val drainResult = triggerDrainerFunction()

    assertTrue(drainResult.success)
    assertEquals(1, drainResult.processed)

    // 5. Verify sent
    val sentEmail = supabase.from("email_outbox")
      .select()
      .eq("id", emailId)
      .single<EmailOutboxRow>()

    assertEquals("sent", sentEmail.status)
    assertNotNull(sentEmail.resend_id)

    // 6. Simulate webhook callback
    val webhookPayload = createResendWebhook(
      type = "email.delivered",
      email_id = sentEmail.resend_id!!
    )

    val webhookResult = processWebhook(webhookPayload)
    assertTrue(webhookResult.success)

    // 7. Verify final status
    val deliveredEmail = supabase.from("email_outbox")
      .select()
      .eq("id", emailId)
      .single<EmailOutboxRow>()

    assertEquals("delivered", deliveredEmail.resend_status)
  }

  @Test
  fun `cron job debugging test`() = runBlocking {
    // Real wall-clock wait, so runBlocking rather than runTest,
    // which uses virtual time and would skip the delay instantly
    val testStartTime = Clock.System.now()

    // Insert debug marker
    supabase.from("cron_debug_log")
      .insert(mapOf("message" to "Test started"))

    // Wait for cron execution
    delay(65000) // Wait 65 seconds

    // Check for cron execution
    val logs = supabase.from("cron_debug_log")
      .select()
      .gte("executed_at", testStartTime)
      .order("executed_at", ascending = false)

    assertTrue(
      logs.size > 1,
      "Cron should have executed at least once in 65 seconds"
    )

    // Check for network call attempts
    val networkLogs = logs.filter {
      it.message.contains("Network")
    }

    if (networkLogs.isEmpty()) {
      fail("Cron cannot make network calls - need alternative approach")
    }
  }
}

Performance Monitoring

Setting up monitoring for the email system:

-- Email system health dashboard
CREATE VIEW email_system_health AS
WITH queue_stats AS (
  SELECT
    COUNT(*) FILTER (WHERE status = 'pending') as pending,
    COUNT(*) FILTER (WHERE status = 'sending') as sending,
    COUNT(*) FILTER (WHERE status = 'sent' AND created_at > NOW() - INTERVAL '1 hour') as sent_last_hour,
    COUNT(*) FILTER (WHERE status = 'failed') as failed,
    MIN(scheduled_for) FILTER (WHERE status = 'pending') as oldest_pending,
    AVG(EXTRACT(EPOCH FROM (processed_at - created_at))) FILTER (WHERE status = 'sent') as avg_processing_time
  FROM email_outbox
  WHERE created_at > NOW() - INTERVAL '24 hours'
),
cron_stats AS (
  SELECT
    MAX(executed_at) as last_cron_execution,
    COUNT(*) as executions_last_hour
  FROM cron_debug_log
  WHERE executed_at > NOW() - INTERVAL '1 hour'
)
SELECT
  q.*,
  c.*,
  CASE
    WHEN c.last_cron_execution < NOW() - INTERVAL '5 minutes' THEN 'CRITICAL'
    WHEN q.oldest_pending < NOW() - INTERVAL '10 minutes' THEN 'WARNING'
    WHEN q.failed > 10 THEN 'WARNING'
    ELSE 'HEALTHY'
  END as system_status
FROM queue_stats q, cron_stats c;

-- Alert function
CREATE OR REPLACE FUNCTION check_email_system_health()
RETURNS void AS $$
DECLARE
  health_status TEXT;
  pending_count INTEGER;
  last_cron TIMESTAMPTZ;
BEGIN
  SELECT
    system_status,
    pending,
    last_cron_execution
  INTO health_status, pending_count, last_cron
  FROM email_system_health;

  IF health_status != 'HEALTHY' THEN
    -- Send alert
    PERFORM net.http_post(
      url := current_setting('app.alert_webhook'),
      body := jsonb_build_object(
        'alert_type', 'email_system_health',
        'status', health_status,
        'pending_emails', pending_count,
        'last_cron', last_cron,
        'timestamp', NOW()
      )
    );
  END IF;
END;
$$ LANGUAGE plpgsql;
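
The alert function doesn’t run itself, so it gets its own schedule; every five minutes feels like the right balance between alert latency and noise. (Yes, there’s some irony in using cron to watch cron - an external uptime check against the view is the belt-and-braces version.)

-- Run the health check every five minutes
SELECT cron.schedule(
  'email-health-check',
  '*/5 * * * *',
  'SELECT check_email_system_health()'
);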

Next Week

Priority one: fix the cron job, whether that means debugging the pg_cron configuration, wiring up an external cron service, or reaching for Cloudflare Durable Objects for sub-minute processing. Once the automated email pipeline is working, focus shifts to comprehensive UI testing and bug fixes. The goal is a polished user experience - a functional app that frustrates users with bugs is worse than no app at all.