Building a Job Queue System with PostgreSQL and Go: A Practical Guide
A Simple But Powerful Alternative to RabbitMQ for Small to Medium Applications
--
In this guide, we’ll build a robust job queue system using PostgreSQL and Go. We’ll create something similar to popular job queues like Sidekiq or Bull, but tailored for Go applications and leveraging PostgreSQL’s powerful features.
Imagine you’re building an e-commerce platform where customers can upload their product catalogs. Processing these catalogs involves several steps: image resizing, data validation, and updating inventory. Running these tasks during the HTTP request would make the website slow and frustrating for users. This is where a job queue system comes in handy!
A Job’s Lifecycle
It is important to understand the lifecycle of a job, so let's walk through it from start to finish in plain terms.
Think of a job like a task you submit on a food delivery app. From the moment you place your order until it arrives at your door, it goes through different stages. Here’s how jobs work in our queue system:
When you first submit a job, it can start in one of two ways:
- Regular Wait: Most jobs start here. Like being in line at a coffee shop — you wait your turn until a barista (worker) is free to handle your order.
- Delayed Start: This is like scheduling a food delivery for later. The job waits until its scheduled time, then moves to the front of the line.
Once a worker picks up your job, it enters the “active” state. Just like when a delivery driver picks up your food and starts driving to your address. The job stays active until it’s done processing.
Finally, one of two things happens:
- Success: The job completes, like getting your food delivery
- Failure: Something goes wrong, like when a delivery can’t be completed
If a job fails, the system might try again, just like a delivery app might send a new driver if the first attempt failed.
That’s it! Every job moves through these stages: wait/delay → active → complete/fail. Simple as that!
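Later in this guide these stages become values of a status column in the database. The constants below are only an illustration of that mapping on the Go side (the code in this guide uses the plain strings directly):

const (
    statusPending   = "pending"   // regular wait (or delayed start, via a future scheduled_at)
    statusRetry     = "retry"     // failed earlier and waiting for its next attempt
    statusRunning   = "running"   // a worker is actively processing the job
    statusCompleted = "completed" // finished successfully
    statusFailed    = "failed"    // gave up after exhausting its retries
)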
Understanding Job Queues with a Real Example
Let’s say a store owner uploads a CSV file with 1000 products. Instead of processing it immediately, our system will:
- Accept the file and respond quickly to the user
- Create a job in the queue
- Process the file in the background
- Update the user through notifications when it’s done
This flow illustrates how jobs move through the system and how workers interact with the queue. Before walking through the scenarios, let's sketch the request side of that flow.
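To make the first two steps concrete, here is a rough sketch of what the upload endpoint could look like. It is an illustration only: saveUploadedFile is a hypothetical helper that persists the upload and returns its URL, and the jobs table it inserts into is defined in the next section.

// Hypothetical upload endpoint: store the file, enqueue a job, respond at once.
func uploadCatalogHandler(db *sqlx.DB) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // saveUploadedFile is an assumed helper that stores the upload
        // somewhere durable (S3, local disk, ...) and returns its URL.
        fileURL, err := saveUploadedFile(r)
        if err != nil {
            http.Error(w, "could not store upload", http.StatusBadRequest)
            return
        }

        payload, err := json.Marshal(map[string]any{
            "file_url":     fileURL,
            "store_id":     123,
            "notify_email": "store@example.com",
        })
        if err != nil {
            http.Error(w, "could not build payload", http.StatusInternalServerError)
            return
        }

        // Creating the job is all the request does; workers handle the heavy lifting later.
        if _, err := db.ExecContext(r.Context(),
            `INSERT INTO jobs (queue_name, payload) VALUES ($1, $2)`,
            "catalog_processing", string(payload)); err != nil {
            http.Error(w, "could not enqueue job", http.StatusInternalServerError)
            return
        }

        // 202 Accepted: the upload was taken and will be processed in the background.
        w.WriteHeader(http.StatusAccepted)
    }
}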
Here's how three typical scenarios play out:
1. Successful Job Processing
- Client submits a job through the API
- Job gets inserted into PostgreSQL with 'pending' status
- Worker 1 claims the job using FOR UPDATE SKIP LOCKED
- Job is processed and marked as 'completed'
2. Concurrent Processing Attempt
- While Worker 1 is processing, Worker 2 tries to claim a job
- Thanks to SKIP LOCKED, Worker 2 doesn't block; it skips the locked row and immediately sees that no other jobs are available
3. Job Failure Scenario
- Client submits another job
- Worker 2 claims it but encounters an error
- Job is marked for retry with an incremented retry count
Designing Our Queue Table
First, let’s create a table that will store our jobs:
CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'failed', 'retry');

CREATE TABLE jobs (
    id BIGSERIAL PRIMARY KEY,
    queue_name TEXT NOT NULL,
    payload JSONB NOT NULL,
    status job_status DEFAULT 'pending',
    priority INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3,
    retry_count INTEGER DEFAULT 0,
    error_message TEXT,
    scheduled_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    locked_by TEXT,
    locked_at TIMESTAMPTZ,
    -- Never record more retries than the job allows
    CONSTRAINT valid_retry_count CHECK (retry_count <= max_retries)
);

-- Partial index for finding jobs that are ready to process
CREATE INDEX idx_jobs_status_scheduled ON jobs (status, scheduled_at)
WHERE status = 'pending' OR status = 'retry';
Let’s break down why we chose these columns:
- queue_name: Different queues for different types of jobs (e.g., "email", "catalog_processing")
- payload: Stores job-specific data as JSON (flexible for different job types)
- priority: Higher-priority jobs get processed first
- locked_by: Prevents multiple workers from processing the same job
- scheduled_at: Allows delayed job execution
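For example, priority and scheduled_at let you enqueue a high-priority job that should not run before a certain time. The values below are made up for illustration:

-- Hypothetical example: a high-priority email job delayed by one hour
INSERT INTO jobs (queue_name, payload, priority, scheduled_at)
VALUES (
    'email',
    '{"to": "store@example.com", "template": "catalog_ready"}',
    10,
    CURRENT_TIMESTAMP + INTERVAL '1 hour'
);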
Job Processing in Go
Here’s how we implement the worker system in Go:
// Job represents a task in our queue
type Job struct {
    ID          int64           `db:"id"`
    QueueName   string          `db:"queue_name"`
    Payload     json.RawMessage `db:"payload"`
    Status      string          `db:"status"`
    RetryCount  int             `db:"retry_count"`
    MaxRetries  int             `db:"max_retries"`
    ScheduledAt time.Time       `db:"scheduled_at"`
}

// Worker handles job processing
type Worker struct {
    db       *sqlx.DB
    workerID string
    handlers map[string]JobHandler
}

// JobHandler processes a job's payload and reports success or failure
type JobHandler func(context.Context, json.RawMessage) error

// FetchJob attempts to get and lock a job for processing
func (w *Worker) FetchJob(ctx context.Context) (*Job, error) {
    var job Job

    // Use a transaction to ensure atomic job claiming
    tx, err := w.db.BeginTxx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    // Try to find and lock a job. We return only the columns mapped on the
    // Job struct so StructScan doesn't fail on unmapped columns.
    err = tx.QueryRowxContext(ctx, `
        UPDATE jobs
        SET status = 'running',
            locked_by = $1,
            locked_at = CURRENT_TIMESTAMP,
            started_at = CURRENT_TIMESTAMP
        WHERE id = (
            SELECT id
            FROM jobs
            WHERE (status = 'pending' OR status = 'retry')
              AND scheduled_at <= CURRENT_TIMESTAMP
              AND locked_by IS NULL
            ORDER BY priority DESC, scheduled_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, queue_name, payload, status, retry_count, max_retries, scheduled_at
    `, w.workerID).StructScan(&job)

    if err == sql.ErrNoRows {
        return nil, nil // No jobs available
    }
    if err != nil {
        return nil, err
    }

    return &job, tx.Commit()
}
The FOR UPDATE SKIP LOCKED clause is crucial here: it allows multiple workers to fetch jobs concurrently without conflicts. When one worker locks a job, others will skip it and move on to the next available job.
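If you want to see this behavior for yourself, you can reproduce it with two psql sessions against a table that contains a single pending job (a standalone sketch, not part of the worker code):

-- Session 1: claim the only pending job and keep the transaction open
BEGIN;
SELECT id FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, scheduled_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;  -- returns the job's row and locks it

-- Session 2 (run while session 1 is still open): the locked row is skipped,
-- so this returns zero rows immediately instead of blocking
SELECT id FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, scheduled_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;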
Implementing Job Handlers
Now let’s implement a real handler for processing product catalogs:
type CatalogProcessingPayload struct {
    FileURL     string `json:"file_url"`
    StoreID     int64  `json:"store_id"`
    NotifyEmail string `json:"notify_email"`
}

func ProcessCatalogHandler(ctx context.Context, payload json.RawMessage) error {
    var data CatalogProcessingPayload
    if err := json.Unmarshal(payload, &data); err != nil {
        return fmt.Errorf("invalid payload: %w", err)
    }

    // Download and process the catalog file
    products, err := downloadAndParseCatalog(data.FileURL)
    if err != nil {
        return fmt.Errorf("failed to process catalog: %w", err)
    }

    // Update inventory in batches
    if err := updateInventory(ctx, data.StoreID, products); err != nil {
        return fmt.Errorf("failed to update inventory: %w", err)
    }

    // Notify user
    if err := sendNotificationEmail(data.NotifyEmail); err != nil {
        // Log but don't fail the job for notification errors
        log.Printf("Failed to send notification: %v", err)
    }

    return nil
}
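downloadAndParseCatalog, updateInventory, and sendNotificationEmail are application-specific helpers rather than part of the queue itself. To make the example easier to follow, here is one possible sketch of downloadAndParseCatalog, assuming a simple three-column CSV of SKU, name, and price; your real parsing logic will differ:

// Product is an assumed minimal shape for one catalog row.
type Product struct {
    SKU   string
    Name  string
    Price float64
}

// downloadAndParseCatalog fetches the CSV file and turns each row into a Product.
func downloadAndParseCatalog(fileURL string) ([]Product, error) {
    resp, err := http.Get(fileURL)
    if err != nil {
        return nil, fmt.Errorf("download catalog: %w", err)
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("download catalog: unexpected status %s", resp.Status)
    }

    records, err := csv.NewReader(resp.Body).ReadAll()
    if err != nil {
        return nil, fmt.Errorf("parse csv: %w", err)
    }

    products := make([]Product, 0, len(records))
    for i, rec := range records {
        if i == 0 || len(rec) < 3 {
            continue // skip the header row and malformed rows
        }
        price, err := strconv.ParseFloat(rec[2], 64)
        if err != nil {
            return nil, fmt.Errorf("row %d: invalid price %q", i, rec[2])
        }
        products = append(products, Product{SKU: rec[0], Name: rec[1], Price: price})
    }
    return products, nil
}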
Error Handling and Retries
When jobs fail, we want to retry them with increasing delays. Here’s how we handle that:
func (w *Worker) handleJobFailure(ctx context.Context, job *Job, jobErr error) error {
    // Calculate the next retry delay using exponential backoff (1m, 2m, 4m, ...)
    retryDelay := time.Minute * time.Duration(math.Pow(2, float64(job.RetryCount)))

    status := "failed"
    if job.RetryCount < job.MaxRetries {
        status = "retry"
    }

    // Only bump retry_count when we are actually going to retry, so the
    // valid_retry_count check constraint is never violated.
    _, err := w.db.ExecContext(ctx, `
        UPDATE jobs
        SET status = $1,
            error_message = $2,
            retry_count = CASE
                WHEN retry_count < max_retries THEN retry_count + 1
                ELSE retry_count
            END,
            scheduled_at = CASE
                WHEN retry_count < max_retries
                THEN CURRENT_TIMESTAMP + $3::interval
                ELSE scheduled_at
            END,
            locked_by = NULL,
            locked_at = NULL
        WHERE id = $4
    `, status, jobErr.Error(), retryDelay.String(), job.ID)

    return err
}
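The worker pool in the next section also calls a completeJob method that hasn't been defined yet. A minimal version simply marks the job as done and releases the lock columns:

// completeJob marks a job as successfully finished and clears its lock.
func (w *Worker) completeJob(ctx context.Context, job *Job) error {
    _, err := w.db.ExecContext(ctx, `
        UPDATE jobs
        SET status = 'completed',
            completed_at = CURRENT_TIMESTAMP,
            locked_by = NULL,
            locked_at = NULL
        WHERE id = $1
    `, job.ID)
    return err
}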
Creating a Worker Pool
Finally, let’s create a pool of workers to process jobs concurrently:
type WorkerPool struct {
    workers []*Worker
    quit    chan struct{}
}

func NewWorkerPool(db *sqlx.DB, numWorkers int) *WorkerPool {
    pool := &WorkerPool{
        workers: make([]*Worker, numWorkers),
        quit:    make(chan struct{}),
    }
    for i := 0; i < numWorkers; i++ {
        pool.workers[i] = &Worker{
            db:       db,
            workerID: fmt.Sprintf("worker-%d", i),
            handlers: make(map[string]JobHandler),
        }
    }
    return pool
}

func (p *WorkerPool) Start(ctx context.Context) {
    for _, worker := range p.workers {
        go func(w *Worker) {
            for {
                select {
                case <-ctx.Done():
                    return
                case <-p.quit:
                    return
                default:
                    if job, err := w.FetchJob(ctx); err != nil {
                        log.Printf("Error fetching job: %v", err)
                        time.Sleep(time.Second)
                    } else if job != nil {
                        if handler, ok := w.handlers[job.QueueName]; ok {
                            if err := handler(ctx, job.Payload); err != nil {
                                w.handleJobFailure(ctx, job, err)
                            } else {
                                w.completeJob(ctx, job)
                            }
                        } else {
                            // No handler registered for this queue: fail the job
                            // instead of leaving it stuck in 'running'
                            w.handleJobFailure(ctx, job, fmt.Errorf("no handler for queue %q", job.QueueName))
                        }
                    } else {
                        // No jobs available, wait a bit before polling again
                        time.Sleep(time.Second)
                    }
                }
            }
        }(worker)
    }
}
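The usage example in the next section calls pool.RegisterHandler, which wasn't shown above. A minimal version just registers the handler with every worker in the pool:

// RegisterHandler wires a handler function to a queue name on all workers.
func (p *WorkerPool) RegisterHandler(queueName string, handler JobHandler) {
    for _, w := range p.workers {
        w.handlers[queueName] = handler
    }
}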
Using the Queue in Your Application
Here’s how to use this system in your application:
func main() {
    db := setupDatabase()

    // Create a worker pool with 5 workers
    pool := NewWorkerPool(db, 5)

    // Register job handlers
    pool.RegisterHandler("catalog_processing", ProcessCatalogHandler)

    // Start the worker pool
    ctx := context.Background()
    pool.Start(ctx)

    // Example: Queue a new job
    _, err := db.ExecContext(ctx, `
        INSERT INTO jobs (queue_name, payload)
        VALUES ($1, $2)
    `, "catalog_processing", `{
        "file_url": "https://example.com/catalog.csv",
        "store_id": 123,
        "notify_email": "store@example.com"
    }`)
    if err != nil {
        log.Fatalf("failed to enqueue job: %v", err)
    }

    // Block so the worker goroutines keep running; a real service would
    // wait for a shutdown signal here instead.
    select {}
}
Best Practices and Tips
- Monitor Your Queue: Keep track of job processing times, failure rates, and queue depth. Create alerts for when things go wrong.
- Handle Graceful Shutdown: Make sure your workers can finish their current jobs before shutting down (see the sketch after this list).
- Clean Up Old Jobs: Periodically archive or delete completed and failed jobs to keep the table size manageable.
- Use Transactions: Always use transactions when updating job status to prevent race conditions.
- Consider Job Dependencies: For complex workflows, you might want to add support for job dependencies where one job can only start after others complete.
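On the graceful shutdown point: the quit channel declared on WorkerPool is never closed in the code above. One minimal way to wire it up is a Stop method like the sketch below; waitForShutdownSignal is a hypothetical helper (for example, blocking on a signal.Notify channel).

// Stop signals every worker goroutine to exit once it finishes its current
// loop iteration, so an in-flight job is allowed to complete first.
func (p *WorkerPool) Stop() {
    close(p.quit)
}

// Example wiring in main (waitForShutdownSignal is hypothetical):
//
//   pool.Start(ctx)
//   waitForShutdownSignal() // e.g. block on a signal.Notify channel
//   pool.Stop()             // workers exit after finishing their current job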
When to Use This System
This PostgreSQL-based job queue is perfect for:
- Small to medium-sized applications
- Teams already using PostgreSQL
- Applications with basic job processing needs
- Situations where operational simplicity is important
Consider using specialized job queue systems like RabbitMQ or Redis-based solutions when you need:
- Very high throughput (thousands of jobs per second)
- Complex routing patterns
- Built-in monitoring and management UI
- Message persistence guarantees
Summary
We’ve built a robust job queue system that:
- Handles concurrent job processing
- Provides reliable job execution with retries
- Supports scheduled and prioritized jobs
- Uses PostgreSQL’s powerful features for job locking and atomic updates
The best part? It's built using tools you probably already have in your stack! Feel free to extend this system with features like:
- Job cancellation
- Progress reporting
- Job dependencies
- A web dashboard for monitoring
Remember to always monitor your job queue and set up appropriate alerting to catch issues early!
If you’re interested in diving deeper into system design and backend development, be sure to follow me for more insights, tips, and practical examples. Together, we can explore the intricacies of creating efficient systems, optimizing database performance, and mastering the tools that drive modern applications. Join me on this journey to enhance your skills and stay updated on the latest trends in the tech world! 🚀
Read this system design writeup in Bahasa Indonesia on iniakunhuda.com