Best Practices
Guidelines for writing reliable, efficient background jobs.
Idempotency
Design jobs to be safely re-runnable. If a job fails mid-execution and gets rescheduled, it should handle duplicate processing gracefully:
async run({ serverApp, log, job }) {
// Check if the work was already done
const existing = await serverApp.db.select()
.from(processedItems)
.where(eq(processedItems.externalId, job.data.externalId))
.then(rows => rows[0])
if (existing) {
log.info("Already processed, skipping")
return
}
// Do the actual work
await processItem(serverApp, job.data)
}
Use unique constraints in the database to prevent duplicate records even if the check-then-act pattern has a race condition.
Error Handling
Let errors bubble up — the worker system records failures and handles retries based on job configuration. Log context before throwing so the error is debuggable:
async run({ serverApp, log, job }) {
const result = await fetchExternalData(job.data.url)
if (!result.ok) {
log.error("External API returned error", {
status: result.status,
jobId: job.id
})
throw new Error(`API returned ${result.status}`)
}
// Process result...
}
Duration
Keep jobs focused and short. For long-running work, break it into chains of smaller jobs:
// Instead of one massive job, chain them
async run({ serverApp, log, job }) {
const batch = await getNextBatch(serverApp, job.data.cursor)
await processBatch(serverApp, batch)
if (batch.hasMore) {
// Schedule continuation
await serverApp.workerManager.submitJob({
tag: `process-batch-${batch.nextCursor}`,
type: "processBatch",
userId: 0,
data: { cursor: batch.nextCursor }
})
}
}
Database Transactions
Use withTransaction for operations that need atomicity. The transaction wrapper handles serialization conflicts automatically with retries:
import { withTransaction } from "../../db/shared"
async run({ serverApp, log, job }) {
await withTransaction(serverApp.db, async (tx) => {
const [user] = await tx.insert(users).values({}).returning()
await tx.insert(userAuth).values({
userId: user.id,
authType: "email",
authIdentifier: job.data.email
})
})
}
Monitoring
Use structured logging with job IDs for tracing. The log parameter is already scoped to the job:
log.info("Processing started", { itemCount: items.length })
log.debug("Item details", { item })
log.info("Processing complete", { processed: items.length })
Check the workerJobs table to monitor execution patterns:
-- Recent failures
SELECT type, tag, result, finished
FROM worker_jobs
WHERE success = false
ORDER BY finished DESC
LIMIT 20;
-- Average execution time by type
SELECT type, AVG(EXTRACT(EPOCH FROM (finished - started))) as avg_seconds
FROM worker_jobs
WHERE finished IS NOT NULL
GROUP BY type;
Resource Management
Worker processes use smaller database connection pools (2 connections vs 10 for the main server). Be mindful of concurrent database operations within a job — avoid opening many parallel queries.
Job Deduplication
Use tags to prevent scheduling duplicate jobs:
// Check if a job with this tag is already pending
const existing = await serverApp.db.select()
.from(workerJobs)
.where(and(
eq(workerJobs.tag, `sync-user-${userId}`),
isNull(workerJobs.started)
))
.then(rows => rows[0])
if (!existing) {
await serverApp.workerManager.submitJob({
tag: `sync-user-${userId}`,
type: "syncUser",
userId,
data: {}
})
}
Cron Scheduling
For recurring jobs, use cron expressions. Keep frequency appropriate for the task — polling too often wastes resources:
await serverApp.workerManager.submitJob({
tag: "cleanup",
type: "removeOldWorkerJobs",
userId: 0,
cronSchedule: "0 0 * * * *", // Every hour
persistent: true
})
The persistent flag ensures the job survives server restarts. Without it, cron jobs need to be re-submitted on startup.
Testing
Test jobs in isolation using the test helpers:
import { startTestServer } from "../helpers/server"
test("my job processes correctly", async () => {
const { serverApp } = await startTestServer()
// Set up test data
await serverApp.db.insert(items).values({ ... })
// Run the job directly
const log = serverApp.createLogger("test")
await myJob.run({
serverApp,
log,
job: { id: 1, type: "myJob", data: { ... }, userId: 0 }
})
// Verify results
const result = await serverApp.db.select().from(items)
expect(result).toHaveLength(1)
})