Last Week
Successfully set up the email outbox infrastructure in the database - a queue table that holds all emails waiting to be sent. Connected the app to this outbox, so guest notification emails now queue properly instead of attempting immediate delivery. The architecture is solid: emails queue correctly and the drainer function processes them when manually invoked, but the cron job automation isn't triggering.
The Cron Job Mystery
The email infrastructure is 90% complete: the outbox pattern is implemented and the drainer function successfully processes queued emails when manually invoked, but the automated cron job that should fire every 60 seconds isn't. This should be a straightforward fix, yet it's blocking the entire email automation pipeline.
Also spent time squashing UI bugs in the guest list details pane where states weren’t updating correctly and error messages weren’t displaying. The goal is to have a polished user experience from the first interaction.
What does it mean in English?
The app can queue emails to send, and the sending function works perfectly when I manually trigger it. But the automatic timer that should trigger it every minute isn’t working. It’s like having a working dishwasher that requires you to manually press start every 60 seconds instead of running its cycle automatically.
Nerdy Details
Let me dive deep into the cron job debugging process, the intricacies of email queue processing, and the UI state management issues I encountered this week.
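Before the cron saga, here's the simplified shape of the outbox table that everything below drains from. Columns here are representative for illustration rather than the exact production schema:
-- Simplified sketch of the outbox table (assumed columns, for illustration)
CREATE TABLE email_outbox (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  created_at TIMESTAMPTZ DEFAULT NOW(),
  scheduled_for TIMESTAMPTZ DEFAULT NOW(), -- when the email becomes eligible to send
  priority SMALLINT DEFAULT 5,             -- lower number = more urgent
  status TEXT DEFAULT 'pending',           -- pending, sending, sent, retry, failed
  attempts SMALLINT DEFAULT 0,             -- delivery attempts so far
  email_data JSONB NOT NULL                -- recipient, subject, body, etc.
);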
The Cron Job Architecture Problem
Supabase’s cron implementation uses pg_cron under the hood, which has specific limitations and quirks that aren’t immediately obvious. Here’s what the current non-working setup looks like:
-- Current cron job configuration (NOT WORKING)
SELECT cron.schedule(
  'drain-email-outbox',
  '* * * * *', -- Every minute
  $$
  SELECT net.http_post(
    url := 'https://[project-ref].functions.supabase.co/drain-email-outbox',
    headers := jsonb_build_object(
      'Authorization', 'Bearer ' || current_setting('app.service_key')
    )
  );
  $$
);
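Before changing anything, it's worth checking pg_cron's own bookkeeping - the job registry and, on recent pg_cron versions, the per-run log, which often contains the actual error message:
-- Is the job registered, and what happened on recent runs?
SELECT jobid, jobname, schedule, active FROM cron.job;

SELECT jobid, status, return_message, start_time
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 20;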
The issue? Several potential culprits:
- Service key not available in cron context
- Network functions not enabled for cron
- Incorrect function URL formation
- Missing error logging
Here’s the comprehensive debugging approach:
-- Step 1: Verify cron is actually running
CREATE TABLE cron_debug_log (
  id SERIAL PRIMARY KEY,
  executed_at TIMESTAMPTZ DEFAULT NOW(),
  message TEXT,
  error_details JSONB
);

-- Step 2: Create a debug cron job
SELECT cron.schedule(
  'debug-cron-test',
  '* * * * *',
  $$
  INSERT INTO cron_debug_log (message)
  VALUES ('Cron executed at ' || NOW());
  $$
);
-- Step 3: Check if the network extension is available in cron context
-- NOTE: nested $$ quoting breaks here, so the inner DO block needs its own tag.
-- Also, pg_net's net.http_post is asynchronous: it returns a request id (BIGINT),
-- and the eventual response lands in net._http_response.
SELECT cron.schedule(
  'test-network-access',
  '* * * * *',
  $cron$
  DO $do$
  DECLARE
    request_id BIGINT;
    error_msg TEXT;
  BEGIN
    -- Test if net.http_post is accessible
    BEGIN
      SELECT net.http_post(
        url := 'https://httpbin.org/post',
        body := jsonb_build_object('test', true)
      ) INTO request_id;
      INSERT INTO cron_debug_log (message, error_details)
      VALUES ('Network test queued', jsonb_build_object('request_id', request_id));
    EXCEPTION WHEN OTHERS THEN
      GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
      INSERT INTO cron_debug_log (message, error_details)
      VALUES ('Network test failed', jsonb_build_object('error', error_msg));
    END;
  END $do$;
  $cron$
);
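After letting these run for a few minutes, the debug log tells you which layer is failing (assuming the table and jobs above are in place):
-- Did the cron fire at all? Did the network test get queued?
SELECT executed_at, message, error_details
FROM cron_debug_log
ORDER BY executed_at DESC
LIMIT 20;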
Alternative Cron Implementation Strategies
Given the potential issues with direct HTTP calls from pg_cron, here are alternative approaches:
Option 1: Database-Trigger Based Processing
-- Create a processing request table
CREATE TABLE email_processing_requests (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  requested_at TIMESTAMPTZ DEFAULT NOW(),
  processed_at TIMESTAMPTZ,
  batch_size INTEGER DEFAULT 50,
  emails_processed INTEGER DEFAULT 0,
  status TEXT DEFAULT 'pending' -- pending, processing, completed, failed
);

-- Cron job just inserts a processing request
SELECT cron.schedule(
  'request-email-processing',
  '* * * * *',
  $$
  INSERT INTO email_processing_requests (batch_size)
  VALUES (50);
  $$
);
-- Database trigger invokes the Edge Function
-- NOTE: pg_background is a separate extension and isn't available everywhere,
-- so treat this as a sketch. Distinct dollar-quote tags are required because
-- the format() template nests inside the function body.
CREATE OR REPLACE FUNCTION trigger_email_processing()
RETURNS TRIGGER AS $fn$
DECLARE
  auth_token TEXT;
  function_url TEXT;
BEGIN
  -- Get configuration
  SELECT value INTO auth_token
  FROM app_config
  WHERE key = 'edge_function_service_key';

  SELECT value INTO function_url
  FROM app_config
  WHERE key = 'drain_outbox_url';

  -- Use pg_background to make an async HTTP call
  PERFORM pg_background_launch(
    format(
      $sql$
      SELECT net.http_post(
        url := %L,
        headers := jsonb_build_object('Authorization', 'Bearer ' || %L),
        body := jsonb_build_object('request_id', %L)
      );
      $sql$,
      function_url,
      auth_token,
      NEW.id
    )
  );
  RETURN NEW;
END;
$fn$ LANGUAGE plpgsql;

CREATE TRIGGER process_email_request
AFTER INSERT ON email_processing_requests
FOR EACH ROW
EXECUTE FUNCTION trigger_email_processing();
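The trigger assumes an app_config key/value table that I haven't shown; a minimal version looks like this (the key names are just my convention, and storing a service key in a table has security implications - shown purely for illustration):
-- Assumed configuration table backing the trigger above
CREATE TABLE app_config (
  key TEXT PRIMARY KEY,
  value TEXT NOT NULL
);

INSERT INTO app_config (key, value) VALUES
  ('edge_function_service_key', '<service-role-key>'),
  ('drain_outbox_url', 'https://[project-ref].functions.supabase.co/drain-email-outbox');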
Option 2: External Cron Service
// Using Cloudflare Workers with Cron Triggers (worker script, not wrangler.toml)
interface Env {
  SUPABASE_URL: string;
  SUPABASE_SERVICE_KEY: string;
}

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    // This runs every minute (per the crons pattern in wrangler.toml below)
    const response = await fetch(
      `${env.SUPABASE_URL}/functions/v1/drain-email-outbox`,
      {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${env.SUPABASE_SERVICE_KEY}`,
          'Content-Type': 'application/json'
        },
        body: JSON.stringify({
          source: 'cloudflare-cron',
          timestamp: new Date().toISOString()
        })
      }
    );

    if (!response.ok) {
      // Log to an error tracking service (logError is a placeholder helper)
      await logError({
        message: 'Failed to trigger email drainer',
        status: response.status,
        body: await response.text()
      });
    }
  }
};

// Configuration in wrangler.toml:
// [triggers]
// crons = ["* * * * *"]
Option 3: Durable Objects for Sub-Minute Processing
// Cloudflare Durable Object for more frequent processing
export class EmailQueueProcessor {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request) {
    const url = new URL(request.url);
    switch (url.pathname) {
      case '/start':
        return this.startProcessing();
      case '/stop':
        return this.stopProcessing();
      case '/status':
        return this.getStatus();
      default:
        return new Response('Not Found', { status: 404 });
    }
  }

  private async startProcessing() {
    // An in-memory flag wouldn't survive eviction; check the stored alarm instead
    const existingAlarm = await this.state.storage.getAlarm();
    if (existingAlarm !== null) {
      return new Response('Already processing', { status: 200 });
    }
    // Set up alarm to run every 10 seconds
    await this.state.storage.setAlarm(Date.now() + 10000);
    return new Response('Processing started', { status: 200 });
  }

  private async stopProcessing() {
    await this.state.storage.deleteAlarm();
    return new Response('Processing stopped', { status: 200 });
  }

  private async getStatus() {
    const nextAlarm = await this.state.storage.getAlarm();
    return new Response(
      JSON.stringify({ running: nextAlarm !== null, nextAlarm }),
      { headers: { 'Content-Type': 'application/json' } }
    );
  }

  async alarm() {
    // Process email queue
    await this.drainEmailQueue();
    // Schedule next alarm
    await this.state.storage.setAlarm(Date.now() + 10000);
  }

  private async drainEmailQueue() {
    const startTime = Date.now();
    try {
      const response = await fetch(
        `${this.env.SUPABASE_URL}/functions/v1/drain-email-outbox`,
        {
          method: 'POST',
          headers: {
            'Authorization': `Bearer ${this.env.SUPABASE_SERVICE_KEY}`,
            'Content-Type': 'application/json'
          },
          body: JSON.stringify({
            source: 'durable-object',
            max_processing_time: 8000, // 8 seconds max
            batch_size: 20 // Smaller batch for frequent processing
          })
        }
      );
      const result = await response.json() as { processed?: number };
      // Store metrics
      await this.state.storage.put({
        [`processing_${Date.now()}`]: {
          duration: Date.now() - startTime,
          emails_processed: result.processed,
          success: response.ok
        }
      });
    } catch (error) {
      // Store error
      const err = error as Error;
      await this.state.storage.put({
        [`error_${Date.now()}`]: {
          message: err.message,
          stack: err.stack
        }
      });
    }
  }
}
Email Queue Processing Optimizations
The drainer function works when manually invoked, but let’s optimize it for production:
// Enhanced drainer with better error handling and metrics
interface DrainerConfig {
  batchSize: number;
  maxProcessingTime: number;
  priorityLevels: number[];
  retryStrategy: RetryStrategy;
}

interface RetryStrategy {
  maxAttempts: number;
  backoffMultiplier: number;
  maxBackoffMs: number;
}
// NOTE: supporting types (DrainResult, EmailResult, EmailOutboxRow, DrainerMetrics),
// helper methods (acquireAdvisoryLock, shouldStopProcessing, rateLimitPause,
// updateEmailStatus, sendViaProvider, triggerWebhook), and the shared `supabase`
// client are elided here; the lock helpers would wrap pg_try_advisory_lock via an RPC.
class EmailDrainer {
  private config: DrainerConfig;
  private metrics: DrainerMetrics;

  constructor(config: DrainerConfig) {
    this.config = config;
    this.metrics = new DrainerMetrics();
  }

  async drain(): Promise<DrainResult> {
    const startTime = Date.now();
    const results: EmailResult[] = [];
    let lockAcquired = false;
    try {
      // Use an advisory lock to prevent concurrent draining
      lockAcquired = await this.acquireAdvisoryLock();
      if (!lockAcquired) {
        return {
          status: 'skipped',
          reason: 'Another drainer is running',
          duration: Date.now() - startTime
        };
      }

      // Process each priority level
      for (const priority of this.config.priorityLevels) {
        if (this.shouldStopProcessing(startTime)) {
          break;
        }
        const batch = await this.fetchBatch(priority);
        if (batch.length === 0) continue;

        const batchResults = await this.processBatch(batch);
        results.push(...batchResults);

        // Update metrics
        this.metrics.recordBatch(priority, batchResults);
      }

      return {
        status: 'success',
        processed: results.length,
        successful: results.filter(r => r.success).length,
        failed: results.filter(r => !r.success).length,
        duration: Date.now() - startTime,
        metrics: this.metrics.getSnapshot()
      };
    } catch (error) {
      return {
        status: 'error',
        error: (error as Error).message,
        processed: results.length,
        duration: Date.now() - startTime
      };
    } finally {
      // Only release a lock we actually hold
      if (lockAcquired) {
        await this.releaseAdvisoryLock();
      }
    }
  }
  private async fetchBatch(priority: number): Promise<EmailOutboxRow[]> {
    // supabase-js can't express FOR UPDATE SKIP LOCKED, so row claiming has to
    // happen inside Postgres. claim_email_batch is an assumed RPC (sketched
    // after this class) that selects and locks eligible rows, skipping any
    // already locked by a concurrent drainer.
    const { data, error } = await supabase.rpc('claim_email_batch', {
      p_priority: priority,
      p_max_attempts: this.config.retryStrategy.maxAttempts,
      p_batch_size: this.config.batchSize
    });
    if (error) throw error;
    return data || [];
  }
  private async processBatch(emails: EmailOutboxRow[]): Promise<EmailResult[]> {
    // Process with controlled concurrency
    const concurrency = 5;
    const results: EmailResult[] = [];

    for (let i = 0; i < emails.length; i += concurrency) {
      const chunk = emails.slice(i, i + concurrency);
      const chunkPromises = chunk.map(email => this.processEmail(email));
      const chunkResults = await Promise.allSettled(chunkPromises);

      results.push(...chunkResults.map((result, index) => ({
        emailId: chunk[index].id,
        success: result.status === 'fulfilled',
        error: result.status === 'rejected' ? result.reason : null,
        attempts: chunk[index].attempts + 1
      })));

      // Rate limiting between chunks
      await this.rateLimitPause();
    }
    return results;
  }
  private async processEmail(email: EmailOutboxRow): Promise<void> {
    const startTime = Date.now();
    try {
      // Update status to sending
      await this.updateEmailStatus(email.id, 'sending');

      // Send via provider
      const result = await this.sendViaProvider(email);

      // Update with success
      await this.updateEmailStatus(email.id, 'sent', {
        provider_id: result.id,
        sent_at: new Date().toISOString(),
        processing_time: Date.now() - startTime
      });

      // Trigger success webhook
      await this.triggerWebhook('email.sent', email, result);
    } catch (error) {
      const shouldRetry = this.shouldRetry(error, email);
      if (shouldRetry) {
        const nextAttempt = this.calculateNextAttemptTime(email.attempts);
        await this.updateEmailStatus(email.id, 'retry', {
          error: (error as Error).message,
          next_attempt: nextAttempt.toISOString(),
          attempts: email.attempts + 1
        });
      } else {
        await this.updateEmailStatus(email.id, 'failed', {
          error: (error as Error).message,
          failed_at: new Date().toISOString(),
          attempts: email.attempts + 1
        });
        // Trigger failure webhook
        await this.triggerWebhook('email.failed', email, error);
      }
      throw error;
    }
  }
  private calculateNextAttemptTime(attempts: number): Date {
    const { backoffMultiplier, maxBackoffMs } = this.config.retryStrategy;
    // Exponential backoff: multiplier^attempts seconds, capped at maxBackoffMs
    const backoffMs = Math.min(
      Math.pow(backoffMultiplier, attempts) * 1000,
      maxBackoffMs
    );
    return new Date(Date.now() + backoffMs);
  }

  private shouldRetry(error: any, email: EmailOutboxRow): boolean {
    // Don't retry if max attempts reached
    if (email.attempts >= this.config.retryStrategy.maxAttempts) {
      return false;
    }
    // Retry on network errors
    if (error.code === 'ECONNREFUSED' || error.code === 'ETIMEDOUT') {
      return true;
    }
    // Retry on rate limiting
    if (error.status === 429) {
      return true;
    }
    // Don't retry on other permanent client failures
    if (error.status >= 400 && error.status < 500) {
      return false;
    }
    // Retry on server errors
    return error.status >= 500;
  }
}
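fetchBatch above leans on a Postgres function for the actual row claiming, since FOR UPDATE SKIP LOCKED isn't expressible through the supabase-js query builder. Here's a sketch of that assumed claim_email_batch function - the name and parameters are illustrative:
-- Assumed RPC backing fetchBatch: claim a batch of eligible rows atomically
CREATE OR REPLACE FUNCTION claim_email_batch(
  p_priority SMALLINT,
  p_max_attempts SMALLINT,
  p_batch_size INTEGER
) RETURNS SETOF email_outbox AS $$
  UPDATE email_outbox
  SET status = 'sending'
  WHERE id IN (
    SELECT id FROM email_outbox
    WHERE priority = p_priority
      AND status IN ('pending', 'retry')
      AND scheduled_for <= NOW()
      AND attempts < p_max_attempts
    ORDER BY scheduled_for
    LIMIT p_batch_size
    FOR UPDATE SKIP LOCKED -- concurrent drainers skip rows already claimed
  )
  RETURNING *;
$$ LANGUAGE sql;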
Database Queue Optimization
Optimizing the database for queue operations is crucial:
-- Optimized queue table with better indexes
-- NOTE: Postgres has no inline INDEX clauses in CREATE TABLE, a generated column
-- can't be used as a partition key, and a primary key on a partitioned table must
-- include the partition column - so this partitions directly on scheduled_for and
-- creates the indexes separately.
CREATE TABLE email_outbox_optimized (
  id UUID DEFAULT gen_random_uuid(),
  created_at TIMESTAMPTZ DEFAULT NOW(),
  scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  priority SMALLINT DEFAULT 5,
  status TEXT DEFAULT 'pending',
  attempts SMALLINT DEFAULT 0,
  -- Email content (large JSONB values are TOAST-compressed automatically;
  -- PostgreSQL 14+ also supports per-column COMPRESSION lz4)
  email_data JSONB,
  -- Processing metadata
  locked_at TIMESTAMPTZ,
  locked_by TEXT,
  processed_at TIMESTAMPTZ,
  PRIMARY KEY (id, scheduled_for)
) PARTITION BY RANGE (scheduled_for);

-- Indexes for queue operations (partial indexes stay small and hot)
CREATE INDEX idx_queue_processing
  ON email_outbox_optimized (status, priority, scheduled_for)
  WHERE status IN ('pending', 'retry');
CREATE INDEX idx_queue_locked
  ON email_outbox_optimized (locked_by, locked_at)
  WHERE locked_by IS NOT NULL;

-- Create partitions for efficient querying
CREATE TABLE email_outbox_y2025m02 PARTITION OF email_outbox_optimized
  FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Function to automatically create new partitions
CREATE OR REPLACE FUNCTION create_monthly_partition()
RETURNS void AS $$
DECLARE
  start_date DATE;
  end_date DATE;
  partition_name TEXT;
BEGIN
  -- Calculate next month's partition bounds
  start_date := date_trunc('month', NOW() + INTERVAL '1 month')::DATE;
  end_date := date_trunc('month', NOW() + INTERVAL '2 months')::DATE;
  partition_name := 'email_outbox_' || to_char(start_date, '"y"YYYY"m"MM');

  -- Create the partition if it doesn't exist
  IF NOT EXISTS (
    SELECT 1 FROM pg_class
    WHERE relname = partition_name
  ) THEN
    EXECUTE format(
      'CREATE TABLE %I PARTITION OF email_outbox_optimized FOR VALUES FROM (%L) TO (%L)',
      partition_name, start_date, end_date
    );
  END IF;
END;
$$ LANGUAGE plpgsql;

-- Schedule partition creation
SELECT cron.schedule(
  'create-email-partitions',
  '0 0 1 * *', -- First day of each month
  'SELECT create_monthly_partition()'
);
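A quick catalog query confirms the partition job keeps up over time:
-- List the partitions currently attached to the queue table
SELECT inhrelid::regclass AS partition
FROM pg_inherits
WHERE inhparent = 'email_outbox_optimized'::regclass
ORDER BY 1;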
UI State Management Debugging
The guest list details pane had several state management issues:
// Problem: State updates not reflecting in UI
class GuestListDetailsViewModel {
    private val _state = MutableStateFlow<GuestListDetailsState>(
        GuestListDetailsState.Loading
    )

    // BUG: Direct mutation doesn't trigger recomposition
    fun updateGuestStatus(guestId: String, newStatus: GuestStatus) {
        val currentState = _state.value
        if (currentState is GuestListDetailsState.Success) {
            // This doesn't work - modifying the list in place
            currentState.guests.find { it.id == guestId }?.status = newStatus
            _state.value = currentState // No actual change detected
        }
    }

    // FIX: Create a new state object
    fun updateGuestStatusCorrectly(guestId: String, newStatus: GuestStatus) {
        _state.update { currentState ->
            when (currentState) {
                is GuestListDetailsState.Success -> {
                    currentState.copy(
                        guests = currentState.guests.map { guest ->
                            if (guest.id == guestId) {
                                guest.copy(status = newStatus)
                            } else {
                                guest
                            }
                        }
                    )
                }
                else -> currentState
            }
        }
    }
}
// Problem: Error messages not displaying
sealed class GuestListDetailsState {
    object Loading : GuestListDetailsState()
    data class Success(
        val guests: List<Guest>,
        val waitlist: Waitlist
    ) : GuestListDetailsState()
    // BUG: Error state was missing context
    data class Error(val message: String) : GuestListDetailsState()
}

// FIX: Enhanced error state with recovery options
sealed class GuestListDetailsState {
    object Loading : GuestListDetailsState()
    data class Success(
        val guests: List<Guest>,
        val waitlist: Waitlist,
        val lastError: UiError? = null // Persist last error for toast
    ) : GuestListDetailsState()
    data class Error(
        val error: UiError,
        val canRetry: Boolean = true,
        val lastKnownData: Success? = null // Allow showing stale data
    ) : GuestListDetailsState()
}

data class UiError(
    val message: String,
    val code: String,
    val debugInfo: String? = null,
    val userAction: UserAction? = null
)

enum class UserAction {
    RETRY,
    REFRESH,
    CONTACT_SUPPORT,
    DISMISS
}
// Enhanced error handling in the UI
@Composable
fun GuestListDetailsScreen(
    state: GuestListDetailsState,
    onRetry: () -> Unit
) {
    when (state) {
        is GuestListDetailsState.Error -> {
            if (state.lastKnownData != null) {
                // Show stale data with an error banner
                Column {
                    ErrorBanner(
                        error = state.error,
                        onAction = { action ->
                            when (action) {
                                UserAction.RETRY -> onRetry()
                                UserAction.REFRESH -> onRetry()
                                UserAction.DISMISS -> { /* Dismiss banner */ }
                                else -> { /* Handle other actions */ }
                            }
                        }
                    )
                    GuestList(guests = state.lastKnownData.guests)
                }
            } else {
                // Full error screen
                ErrorScreen(
                    error = state.error,
                    canRetry = state.canRetry,
                    onRetry = onRetry
                )
            }
        }
        is GuestListDetailsState.Success -> {
            // Show success with an optional error toast
            state.lastError?.let { error ->
                LaunchedEffect(error) {
                    showErrorToast(error)
                }
            }
            GuestList(guests = state.guests)
        }
        is GuestListDetailsState.Loading -> {
            LoadingScreen()
        }
    }
}
Testing the Complete Email Pipeline
Comprehensive testing ensures reliability:
class EmailPipelineIntegrationTest {
    @Test
    fun `complete email flow from queue to delivery`() = runTest {
        // 1. Setup
        val testGuest = createTestGuest()
        val testWaitlist = createTestWaitlist()

        // 2. Queue email
        val emailId = emailService.queueNotificationEmail(
            guestId = testGuest.id,
            waitlistId = testWaitlist.id,
            message = "Your table is ready!",
            priority = EmailPriority.URGENT
        ).getOrThrow()

        // 3. Verify queued
        val queuedEmail = supabase.from("email_outbox")
            .select()
            .eq("id", emailId)
            .single<EmailOutboxRow>()
        assertEquals("pending", queuedEmail.status)
        assertEquals(1, queuedEmail.priority)

        // 4. Trigger drainer manually (since cron isn't working)
        val drainResult = triggerDrainerFunction()
        assertTrue(drainResult.success)
        assertEquals(1, drainResult.processed)

        // 5. Verify sent
        val sentEmail = supabase.from("email_outbox")
            .select()
            .eq("id", emailId)
            .single<EmailOutboxRow>()
        assertEquals("sent", sentEmail.status)
        assertNotNull(sentEmail.resend_id)

        // 6. Simulate webhook callback
        val webhookPayload = createResendWebhook(
            type = "email.delivered",
            email_id = sentEmail.resend_id!!
        )
        val webhookResult = processWebhook(webhookPayload)
        assertTrue(webhookResult.success)

        // 7. Verify final status
        val deliveredEmail = supabase.from("email_outbox")
            .select()
            .eq("id", emailId)
            .single<EmailOutboxRow>()
        assertEquals("delivered", deliveredEmail.resend_status)
    }

    @Test
    fun `cron job debugging test`() = runBlocking {
        // NOTE: runBlocking rather than runTest - runTest fast-forwards delay(),
        // and this test needs to wait in real time for the cron to fire.
        val testStartTime = Clock.System.now() // kotlinx-datetime

        // Insert debug marker
        supabase.from("cron_debug_log")
            .insert(mapOf("message" to "Test started"))

        // Wait for cron execution
        delay(65000) // Wait 65 seconds

        // Check for cron execution
        val logs = supabase.from("cron_debug_log")
            .select()
            .gte("executed_at", testStartTime)
            .order("executed_at", ascending = false)

        assertTrue(
            logs.size > 1,
            "Expected the test marker plus at least one cron execution within 65 seconds"
        )

        // Check for network call attempts
        val networkLogs = logs.filter {
            it.message.contains("Network")
        }
        if (networkLogs.isEmpty()) {
            fail("Cron cannot make network calls - need alternative approach")
        }
    }
}
Performance Monitoring
Setting up monitoring for the email system:
-- Email system health dashboard
CREATE VIEW email_system_health AS
WITH queue_stats AS (
  SELECT
    COUNT(*) FILTER (WHERE status = 'pending') AS pending,
    COUNT(*) FILTER (WHERE status = 'sending') AS sending,
    COUNT(*) FILTER (WHERE status = 'sent' AND created_at > NOW() - INTERVAL '1 hour') AS sent_last_hour,
    COUNT(*) FILTER (WHERE status = 'failed') AS failed,
    MIN(scheduled_for) FILTER (WHERE status = 'pending') AS oldest_pending,
    AVG(EXTRACT(EPOCH FROM (processed_at - created_at))) FILTER (WHERE status = 'sent') AS avg_processing_time
  FROM email_outbox
  WHERE created_at > NOW() - INTERVAL '24 hours'
),
cron_stats AS (
  SELECT
    MAX(executed_at) AS last_cron_execution,
    COUNT(*) AS executions_last_hour
  FROM cron_debug_log
  WHERE executed_at > NOW() - INTERVAL '1 hour'
)
SELECT
  q.*,
  c.*,
  CASE
    -- NULL means no execution in the last hour, which is also critical
    WHEN c.last_cron_execution IS NULL
      OR c.last_cron_execution < NOW() - INTERVAL '5 minutes' THEN 'CRITICAL'
    WHEN q.oldest_pending < NOW() - INTERVAL '10 minutes' THEN 'WARNING'
    WHEN q.failed > 10 THEN 'WARNING'
    ELSE 'HEALTHY'
  END AS system_status
FROM queue_stats q, cron_stats c;

-- Alert function
CREATE OR REPLACE FUNCTION check_email_system_health()
RETURNS void AS $$
DECLARE
  health_status TEXT;
  pending_count INTEGER;
  last_cron TIMESTAMPTZ;
BEGIN
  SELECT
    system_status,
    pending,
    last_cron_execution
  INTO health_status, pending_count, last_cron
  FROM email_system_health;

  IF health_status != 'HEALTHY' THEN
    -- Send alert
    PERFORM net.http_post(
      url := current_setting('app.alert_webhook'),
      body := jsonb_build_object(
        'alert_type', 'email_system_health',
        'status', health_status,
        'pending_emails', pending_count,
        'last_cron', last_cron,
        'timestamp', NOW()
      )
    );
  END IF;
END;
$$ LANGUAGE plpgsql;
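The alert function still needs a schedule of its own - assuming pg_cron cooperates, which is of course this week's open question - every five minutes seems reasonable:
-- Run the health check every 5 minutes
SELECT cron.schedule(
  'email-health-check',
  '*/5 * * * *',
  'SELECT check_email_system_health()'
);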
Next Week
Priority one: fix the cron job, whether that means debugging the pg_cron configuration, implementing an external cron service, or using Cloudflare Durable Objects for sub-minute processing. Once the automated email pipeline is working, focus shifts to comprehensive UI testing and bug fixes. The goal is a polished user experience - a functional app that frustrates users with bugs is worse than no app at all.