BullMQ
BullMQ configuration and job processing in CodePilot's worker service.
CodePilot uses BullMQ as its job queue library. BullMQ is a Node.js library for managing distributed jobs and messages backed by Redis.
Why BullMQ?
- Redis-backed — Persistent, distributed, and battle-tested
- Priority queues — Ensure critical jobs are processed first
- Retry logic — Exponential backoff with configurable attempts
- Concurrency control — Limit parallel job execution
- Stall detection — Detect and recover from stuck jobs
- Job events — Track job lifecycle with events and listeners
Queue Configuration
import { Queue } from "bullmq";
// Redis connection settings, shared by the queue and the worker
const connection = {
  host: process.env.REDIS_HOST || "localhost",
  port: parseInt(process.env.REDIS_PORT || "6379", 10),
};
export const githubRepoQueue = new Queue("github-repository", {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 5000,
    },
    removeOnComplete: true,
    removeOnFail: false,
  },
});
Worker Configuration
import { Worker } from "bullmq";
// `connection` is the Redis connection object from the queue configuration.
// processIngestion and processIncrementalIngestion are assumed to be defined
// elsewhere in the worker service.
const worker = new Worker(
  "github-repository",
  async (job) => {
    // Dispatch on the job name
    console.log(`Processing job ${job.id}: ${job.name}`);
    switch (job.name) {
      case "INGEST":
        await processIngestion(job.data);
        break;
      case "INCREMENTAL_INGEST":
        await processIncrementalIngestion(job.data);
        break;
      default:
        throw new Error(`Unknown job type: ${job.name}`);
    }
  },
  {
    connection,
    concurrency: 1, // process one job at a time
  }
);
// Event handlers
worker.on("completed", (job) => {
  console.log(`Job ${job.id} completed`);
});
worker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} failed:`, err.message);
});
worker.on("stalled", (jobId) => {
  console.warn(`Job ${jobId} stalled`);
});
Job Options
| Option | Default | Description |
|---|---|---|
| attempts | 3 | Max retry attempts |
| backoff.type | "exponential" | Retry delay strategy |
| backoff.delay | 5000 | Base delay in milliseconds |
| removeOnComplete | true | Remove job data after success |
| removeOnFail | false | Keep failed jobs for debugging |
| priority | 0 | Lower number = higher priority |
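To make the retry settings in the table concrete, the sketch below lists how long a job waits before each retry. It assumes BullMQ's built-in exponential strategy waits `delay * 2^(attemptsMade - 1)` milliseconds with no jitter; the helper name `retryDelays` is hypothetical and is not part of BullMQ or CodePilot.

```typescript
// Hypothetical helper (not a BullMQ API): lists the wait before each retry,
// assuming the built-in "exponential" backoff of delay * 2^(attemptsMade - 1).
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job with `attempts` total tries can be retried at most attempts - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs));
  }
  return delays;
}

// With the defaults from the table (attempts: 3, backoff.delay: 5000):
console.log(retryDelays(3, 5000)); // [ 5000, 10000 ]
```

Under these assumptions a job is retried after roughly 5 seconds and then 10 seconds, and is marked as failed if the third attempt also fails.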
Internal Queue
The worker uses an internal queue for orchestrating sub-tasks within a single ingestion job:
// Internal queue for processing files within an ingestion job
class InternalQueue {
  private tasks: Array<() => Promise<void>> = [];
  add(task: () => Promise<void>) {
    this.tasks.push(task);
  }
  async processAll(concurrency: number = 5) {
    // Run tasks in batches of `concurrency`; each batch finishes before the next starts
    for (let i = 0; i < this.tasks.length; i += concurrency) {
      const batch = this.tasks.slice(i, i + concurrency);
      await Promise.all(batch.map((task) => task()));
    }
  }
}
This allows file-level parallelism within a single ingestion job, while jobs themselves are still processed sequentially (the worker runs with concurrency: 1).
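A small usage sketch may help show the batching behaviour. The class is repeated so the snippet runs on its own; `runDemo`, the fake file tasks, and the in-flight counter are illustrative assumptions and not part of CodePilot.

```typescript
// Same class as above, repeated so this sketch is self-contained.
class InternalQueue {
  private tasks: Array<() => Promise<void>> = [];

  add(task: () => Promise<void>): void {
    this.tasks.push(task);
  }

  async processAll(concurrency: number = 5): Promise<void> {
    for (let i = 0; i < this.tasks.length; i += concurrency) {
      const batch = this.tasks.slice(i, i + concurrency);
      await Promise.all(batch.map((task) => task()));
    }
  }
}

// Hypothetical demo: 12 fake "file" tasks, processed 5 at a time.
async function runDemo(): Promise<{ processed: number; maxInFlight: number }> {
  const queue = new InternalQueue();
  let inFlight = 0;
  let maxInFlight = 0;
  let processed = 0;

  for (let i = 0; i < 12; i++) {
    queue.add(async () => {
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      // Stand-in for real file processing work.
      await new Promise<void>((resolve) => setTimeout(resolve, 10));
      inFlight--;
      processed++;
    });
  }

  await queue.processAll(5);
  return { processed, maxInFlight };
}

runDemo().then(({ processed, maxInFlight }) => {
  console.log(`processed=${processed}, maxInFlight=${maxInFlight}`);
  // processed=12, maxInFlight=5
});
```

The 12 tasks run as batches of 5, 5, and 2. Because each batch waits for its slowest task, one slow file delays the start of the next batch; that is the trade-off of this simple batching approach.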
Monitoring
BullMQ stores all queue and job state in Redis, so Redis Stack's monitoring tools can be used to inspect queues, jobs, and worker status:
- RedisInsight — Available at http://localhost:8001 when using the provided Docker Compose
- BullMQ Dashboard — third-party dashboards like Bull Board can be added for richer monitoring
Queue Monitoring
RedisInsight provides a visual interface for inspecting BullMQ queues, including job counts, processing times, and failure rates.
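Job counts can also be read from code rather than a dashboard. The sketch below is one possible approach, not CodePilot's implementation: it relies on the `getJobCounts` method that BullMQ's `Queue` provides, but types it through a small structural interface so the example runs against a stub without Redis. The names `JobCountSource`, `summarizeQueue`, and `stubQueue` are hypothetical.

```typescript
// Minimal structural type for the part of a queue this sketch needs.
// BullMQ's Queue provides getJobCounts; a stub works too, so no Redis is required here.
interface JobCountSource {
  getJobCounts(...types: string[]): Promise<{ [state: string]: number }>;
}

// Hypothetical helper: returns a one-line summary of job counts per state.
async function summarizeQueue(name: string, source: JobCountSource): Promise<string> {
  const states = ["waiting", "active", "completed", "failed", "delayed"];
  const counts = await source.getJobCounts(...states);
  const parts = states.map((state) => `${state}=${counts[state] ?? 0}`);
  return `${name}: ${parts.join(" ")}`;
}

// Stub standing in for a real queue such as githubRepoQueue (sample numbers only).
const stubQueue: JobCountSource = {
  async getJobCounts(...types: string[]) {
    const sample: { [state: string]: number } = { waiting: 2, active: 1, failed: 3 };
    const result: { [state: string]: number } = {};
    for (const type of types) result[type] = sample[type] ?? 0;
    return result;
  },
};

summarizeQueue("github-repository", stubQueue).then((line) => console.log(line));
// github-repository: waiting=2 active=1 completed=0 failed=3 delayed=0
```

With `removeOnComplete: true`, completed jobs are removed from Redis, so the completed count would normally stay at 0, while failed jobs remain visible because `removeOnFail` is `false`.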