CodePilot
Background Workers

BullMQ

BullMQ configuration and job processing in CodePilot's worker service.

CodePilot uses BullMQ as its job queue library. BullMQ is a Node.js library for managing distributed jobs and messages backed by Redis.

Why BullMQ?

  • Redis-backed — Persistent, distributed, and battle-tested
  • Priority queues — Ensure critical jobs are processed first
  • Retry logic — Exponential backoff with configurable attempts
  • Concurrency control — Limit parallel job execution
  • Stall detection — Detect and recover from stuck jobs
  • Job events — Track job lifecycle with events and listeners

Queue Configuration

import { Queue } from "bullmq";

const connection = {
  host: process.env.REDIS_HOST || "localhost",
  port: parseInt(process.env.REDIS_PORT || "6379"),
};

export const githubRepoQueue = new Queue("github-repository", {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 5000,
    },
    removeOnComplete: true,
    removeOnFail: false,
  },
});

Worker Configuration

import { Worker } from "bullmq";

const worker = new Worker(
  "github-repository",
  async (job) => {
    // Job processing logic
    console.log(`Processing job ${job.id}: ${job.name}`);

    switch (job.name) {
      case "INGEST":
        await processIngestion(job.data);
        break;
      case "INCREMENTAL_INGEST":
        await processIncrementalIngestion(job.data);
        break;
      default:
        throw new Error(`Unknown job type: ${job.name}`);
    }
  },
  {
    connection,
    concurrency: 1,
  }
);

// Event handlers
worker.on("completed", (job) => {
  console.log(`Job ${job.id} completed`);
});

worker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} failed:`, err.message);
});

worker.on("stalled", (jobId) => {
  console.warn(`Job ${jobId} stalled`);
});

Job Options

OptionDefaultDescription
attempts3Max retry attempts
backoff.type"exponential"Retry delay strategy
backoff.delay5000Base delay in milliseconds
removeOnCompletetrueRemove job data after success
removeOnFailfalseKeep failed jobs for debugging
priority0Lower = higher priority

Internal Queue

The worker uses an internal queue for orchestrating sub-tasks within a single ingestion job:

// Internal queue for processing files within an ingestion job
class InternalQueue {
  private tasks: Array<() => Promise<void>> = [];

  add(task: () => Promise<void>) {
    this.tasks.push(task);
  }

  async processAll(concurrency: number = 5) {
    // Process tasks with controlled concurrency
    for (let i = 0; i < this.tasks.length; i += concurrency) {
      const batch = this.tasks.slice(i, i + concurrency);
      await Promise.all(batch.map(task => task()));
    }
  }
}

This allows file-level parallelism within a single ingestion job while maintaining job-level sequential processing.

Monitoring

BullMQ integrates with Redis Stack's monitoring capabilities. You can inspect queues, jobs, and worker status using:

  • RedisInsight — Available at http://localhost:8001 when using the provided Docker Compose
  • BullMQ Dashboard — third-party dashboards like Bull Board can be added for richer monitoring

Queue Monitoring

RedisInsight provides a visual interface for inspecting BullMQ queues, including job counts, processing times, and failure rates.

On this page