JustKalm
Background Tasks

Job Processing

Scalable background job processing with Redis queues, priority scheduling, and automatic retries for reliable async operations.

45K Jobs/Hour (peak throughput)

99.7% Success Rate (last 24h)

1.2s Avg Latency (queue to complete)

8 Active Workers (processing now)

Job Processing Architecture

Redis-backed job queues with ARQ for async processing.

# Job Processing Architecture

┌─────────────────────────────────────┐
│          API Servers                │
│   (FastAPI - Enqueue Jobs)          │
└─────────────────────────────────────┘
            │
            ▼
┌─────────────────────────────────────┐
│        Redis Queue Broker           │
│  ┌─────────────────────────────┐   │
│  │ high_priority    (0-10s)    │   │
│  │ default          (10-60s)   │   │
│  │ low_priority     (1-5m)     │   │
│  │ scheduled        (cron)     │   │
│  │ dead_letter      (failed)   │   │
│  └─────────────────────────────┘   │
└─────────────────────────────────────┘
            │
            ▼
┌─────────────────────────────────────┐
│         Worker Pool (ARQ)           │
│   ├── 4 high_priority workers       │
│   ├── 4 default workers             │
│   └── 2 low_priority workers        │
└─────────────────────────────────────┘
            │
            ▼
┌─────────────────────────────────────┐
│   Results / Side Effects            │
│   └── DB, Webhooks, Notifications   │
└─────────────────────────────────────┘
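The worker functions registered with ARQ (such as `process_valuation` below) follow ARQ's convention of taking a context dict as their first argument; the worker populates it with shared resources and per-attempt metadata like `job_try`. A minimal sketch of one such job, with illustrative field names and the valuation logic omitted:

```python
import asyncio

async def process_valuation(ctx: dict, product_id: str) -> dict:
    """Illustrative ARQ job. `ctx` is supplied by the worker and carries
    shared resources plus metadata such as the current attempt number."""
    attempt = ctx.get("job_try", 1)  # ARQ sets job_try on each attempt
    # ... fetch product, run valuation model (omitted) ...
    return {"product_id": product_id, "status": "valued", "attempt": attempt}

# Standalone demo with a fake ctx (no Redis or worker needed):
result = asyncio.run(process_valuation({"job_try": 1}, "prod_123"))
```

The return value becomes the job result that ARQ stores in Redis, so keeping it a small serializable dict is a reasonable default.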

ARQ Configuration

# ARQ Worker Configuration

from arq import create_pool
from arq.connections import RedisSettings

redis_settings = RedisSettings(
    host="redis.justkalm.internal",
    port=6379,
    database=1,  # Separate from cache
    password=REDIS_PASSWORD,
)

class WorkerSettings:
    redis_settings = redis_settings
    
    # Job functions
    functions = [
        process_valuation,
        send_webhook,
        generate_report,
        sync_product,
        cleanup_expired,
    ]
    
    # Queue configuration
    queue_name = "default"
    max_jobs = 10
    job_timeout = 300  # 5 minutes
    
    # Retry configuration
    max_tries = 3
    retry_jobs = True  # re-enqueue on failure; delay is set per job via arq.worker.Retry(defer=...)
    
    # Health check
    health_check_interval = 30
    health_check_key = "arq:health"
    
    # Graceful shutdown
    timeout_graceful_shutdown = 30

# Enqueue a job (in production, create the pool once at startup and reuse it)
async def enqueue_valuation(product_id: str):
    redis = await create_pool(redis_settings)
    await redis.enqueue_job(
        "process_valuation",
        product_id,
        _queue_name="high_priority",
    )
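The retry path in the architecture above can be made concrete: a failed job is re-enqueued with a growing delay until it exhausts `max_tries`, after which it is parked on the `dead_letter` queue for inspection. The helpers below are a hypothetical sketch of that routing policy, not part of ARQ's API; the base delay and cap mirror the configuration shown earlier:

```python
BASE_DELAY = 60   # seconds; first retry delay
MAX_TRIES = 3     # matches max_tries in WorkerSettings

def next_retry_delay(attempt: int, base: int = BASE_DELAY) -> int:
    """Exponential backoff: 60s, 120s, 240s, ... for attempts 1, 2, 3, ..."""
    return base * 2 ** (attempt - 1)

def route_failed_job(attempt: int, max_tries: int = MAX_TRIES):
    """Return (queue, delay_seconds): retry on the same queue until
    max_tries is reached, then park the job on dead_letter."""
    if attempt < max_tries:
        return ("default", next_retry_delay(attempt))
    return ("dead_letter", 0)
```

With ARQ itself, the equivalent effect is raising `Retry(defer=next_retry_delay(ctx["job_try"]))` inside the job; the dead-letter move would be an application-level step in an `on_job_failure`-style hook.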

Reliable Background Processing

Scalable job queues with guaranteed delivery and automatic retries.

45K Jobs/Hour · 99.7% Success Rate · Priority Queues