
Queues

How CodePilot uses job queues to process repository ingestion and AI tasks asynchronously.

CodePilot uses a producer-consumer queue architecture to handle long-running tasks like repository ingestion, embedding generation, and AI code review asynchronously.

Architecture

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│   Express    │ enqueue │    Redis     │ dequeue │    Worker    │
│   API        │────────▶│   (BullMQ)   │────────▶│  (BullMQ)    │
│  (Producer)  │         │    Queue     │         │  (Consumer)  │
└──────────────┘         └──────────────┘         └──────────────┘

The API server acts as a producer, enqueueing jobs when users connect repositories or GitHub webhooks fire. The worker service acts as a consumer, processing jobs from the queue.
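The producer-consumer split can be illustrated without Redis. The sketch below is a simplified in-memory stand-in, not CodePilot's code: InMemoryQueue, enqueue, and drain are hypothetical names, and unlike a BullMQ queue nothing here survives a process restart or is shared between processes.

```typescript
// Hypothetical in-memory stand-in for the Redis-backed queue; illustration only.
type Job<T> = { name: string; data: T };

class InMemoryQueue<T> {
  private jobs: Job<T>[] = [];

  // Producer side: the API appends a job and returns immediately.
  enqueue(name: string, data: T): void {
    this.jobs.push({ name, data });
  }

  // Consumer side: the worker takes jobs in FIFO order, one at a time.
  async drain(handler: (job: Job<T>) => Promise<void>): Promise<number> {
    let processed = 0;
    let job = this.jobs.shift();
    while (job !== undefined) {
      await handler(job);
      processed++;
      job = this.jobs.shift();
    }
    return processed;
  }

  get size(): number {
    return this.jobs.length;
  }
}

// The producer returns before any work happens; the consumer catches up later.
const queue = new InMemoryQueue<{ repositoryId: string }>();
queue.enqueue("INGEST", { repositoryId: "repo-1" });
queue.enqueue("INGEST", { repositoryId: "repo-2" });
```

The point of the split is that the enqueue calls finish instantly, while the slow work in drain runs on its own schedule, which is what lets the API respond to users and webhooks without waiting for ingestion.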

Job Types

Job Type           | Trigger                    | Description
INGEST             | User connects a repository | Full repository clone, parse, chunk, embed, and store
INCREMENTAL_INGEST | GitHub push webhook        | Re-process only changed files
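The two job types carry different payloads. One way to type them is a discriminated union keyed on the job name; the sketch below infers the fields from the producer calls on this page, while the interface names, the describeJob helper, and the field types (string for repositoryId, number for installationId) are assumptions.

```typescript
// Payload shapes inferred from the producer calls on this page; field types are assumptions.
interface IngestJobData {
  repositoryId: string;
  installationId: number;
  repoFullName: string; // "owner/repo-name"
  defaultBranch: string;
}

interface IncrementalIngestJobData {
  repositoryId: string;
  installationId: number;
  repoFullName: string;
  commitSha: string; // commit reported by the push webhook
}

// Discriminated union keyed on the job name, so each branch sees the right payload type.
type CodePilotJob =
  | { name: "INGEST"; data: IngestJobData }
  | { name: "INCREMENTAL_INGEST"; data: IncrementalIngestJobData };

// Hypothetical helper. The switch is exhaustive: adding a job type to the union
// without handling it here makes the `never` assignment a compile error.
function describeJob(job: CodePilotJob): string {
  switch (job.name) {
    case "INGEST":
      return `full ingest of ${job.data.repoFullName}@${job.data.defaultBranch}`;
    case "INCREMENTAL_INGEST":
      return `incremental ingest of ${job.data.repoFullName} at ${job.data.commitSha}`;
    default: {
      const unreachable: never = job;
      return unreachable;
    }
  }
}
```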

Producer (API)

The API enqueues jobs via producer classes:

// Full ingestion
await GithubRepositoryProducer.ingest({
  repositoryId: repo.id,
  installationId: installation.id,
  repoFullName: "owner/repo-name",
  defaultBranch: "main",
});

// Incremental ingestion (webhook-triggered)
await GithubRepositoryProducer.incrementalIngest({
  repositoryId: repo.id,
  installationId: installation.id,
  repoFullName: "owner/repo-name",
  commitSha: "abc123",
});

// Bulk ingestion (multi-repo onboarding)
await GithubRepositoryProducer.ingestBulk(repositories);
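This page does not show how ingestBulk builds its jobs. As a hypothetical sketch, assuming each repository carries the same fields as the single ingest() call, the jobs could be shaped for BullMQ's Queue.addBulk, which takes an array of { name, data, opts } objects. The deterministic jobId is an added assumption, not something the source describes: BullMQ ignores an add whose jobId matches a job that still exists in the queue, so connecting the same repository twice in quick succession would not enqueue two ingestions.

```typescript
// Hypothetical sketch of how bulk onboarding could batch jobs; not CodePilot's actual code.
// The input shape mirrors the single-repo ingest() call and is an assumption.
interface RepoToIngest {
  repositoryId: string;
  installationId: number;
  repoFullName: string;
  defaultBranch: string;
}

// Same shape that BullMQ's Queue.addBulk() accepts: { name, data, opts }.
interface BulkJob {
  name: "INGEST";
  data: RepoToIngest;
  opts: { jobId: string };
}

function buildBulkIngestJobs(repositories: RepoToIngest[]): BulkJob[] {
  return repositories.map((repo): BulkJob => ({
    name: "INGEST",
    data: repo,
    // Deterministic id: a duplicate add is ignored while this repository's job still exists.
    opts: { jobId: `ingest-${repo.repositoryId}` },
  }));
}

// With a real queue: await queue.addBulk(buildBulkIngestJobs(repositories));
```

Because the retry options later on this page use removeOnComplete: true, a finished job is deleted, so the same jobId can be reused for a later re-ingestion.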

Consumer (Worker)

The worker processes jobs with concurrency control and error handling:

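import { Worker } from "bullmq";

// QUEUE_NAME, redisConnection, processIngestion and processIncrementalIngestion
// come from the worker's own modules (not shown here)
const worker = new Worker(
  QUEUE_NAME,
  async (job) => {
    switch (job.name) {
      case "INGEST":
        await processIngestion(job.data);
        break;
      case "INCREMENTAL_INGEST":
        await processIncrementalIngestion(job.data);
        break;
      default:
        // Fail unknown job types instead of silently marking them completed
        throw new Error(`Unknown job type: ${job.name}`);
    }
  },
  {
    connection: redisConnection,
    concurrency: 1, // Process one job at a time
  }
);

// Log failures so they show up in the worker's output
worker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} (${job?.name}) failed:`, err.message);
});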
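As the number of job types grows, the switch can be replaced by a handler map. The sketch below is a hypothetical alternative, not CodePilot's code: createProcessor, Handler, and JobLike are illustrative names, and JobLike models only the two Job fields the processor reads so the example runs without Redis.

```typescript
// JobLike models only the two BullMQ Job fields this processor reads.
type JobLike = { name: string; data: unknown };
type Handler = (data: unknown) => Promise<void>;

// Builds a processor function that dispatches on job.name.
// A Map avoids accidentally matching inherited keys such as "toString".
function createProcessor(handlers: Map<string, Handler>) {
  return async (job: JobLike): Promise<void> => {
    const handler = handlers.get(job.name);
    if (!handler) {
      // Throwing makes BullMQ record the job as failed instead of completed.
      throw new Error(`No handler registered for job "${job.name}"`);
    }
    await handler(job.data);
  };
}

// e.g. new Worker(QUEUE_NAME, createProcessor(handlers), { connection, concurrency: 1 })
```

The trade-off is that a map loses the per-branch payload typing a switch over a union can give, in exchange for registering new job types without touching the dispatch code.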

Concurrency

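Ingestion jobs run with concurrency: 1 so that the worker never sends Ollama parallel embedding requests. Processing one job at a time keeps memory usage stable and throughput predictable, at the cost of other jobs waiting while a large repository is ingested. The limit applies per worker instance: running several worker processes would again send requests to Ollama in parallel.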
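To make the effect of the limit concrete, here is a self-contained sketch of running async tasks under a concurrency cap and measuring the peak number in flight. BullMQ enforces the limit itself, so this only illustrates the behavior; runWithLimit and fakeEmbed are hypothetical, and the timer stands in for an Ollama embedding call.

```typescript
// Runs async tasks with at most `limit` in flight and reports the observed peak.
async function runWithLimit<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<{ results: T[]; peak: number }> {
  const results: T[] = new Array(tasks.length);
  let next = 0;
  let inFlight = 0;
  let peak = 0;

  // Each lane pulls the next task index until none remain.
  async function lane(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      inFlight++;
      peak = Math.max(peak, inFlight);
      try {
        results[index] = await tasks[index]();
      } finally {
        inFlight--;
      }
    }
  }

  const lanes = Array.from({ length: Math.min(limit, tasks.length) }, () => lane());
  await Promise.all(lanes);
  return { results, peak };
}

// Hypothetical stand-in for an embedding request: resolves after a short delay.
const fakeEmbed = (text: string) => (): Promise<number> =>
  new Promise((resolve) => setTimeout(() => resolve(text.length), 10));
```

With a limit of 1 the peak stays at 1, meaning the embedding calls are strictly serialized; raising the limit lets them overlap, which is exactly the load the concurrency: 1 setting keeps away from Ollama.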

Job Lifecycle

  1. Waiting: the job is in the queue, waiting for a worker to pick it up
  2. Active: a worker is processing the job
  3. Completed: the job finished successfully
  4. Failed: the job threw an error and has no retry attempts left; it can still be retried manually
  5. Delayed: the job is scheduled to run later, for example a retry waiting out its backoff delay after an error
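The states above can be summarized as a small transition table. This is a simplified model using the state names on this page, not BullMQ's internal representation; it assumes backoff is configured (as in the retry options below), so a failed attempt with retries left goes to Delayed, and it ignores edge cases such as stalled jobs.

```typescript
// Simplified lifecycle model; state names follow this page, not BullMQ internals.
type JobState = "waiting" | "active" | "completed" | "failed" | "delayed";

const transitions: Record<JobState, readonly JobState[]> = {
  waiting: ["active"],
  active: ["completed", "failed", "delayed"], // delayed: error with attempts remaining
  delayed: ["waiting"], // backoff elapsed, back in line
  failed: ["waiting"], // manual retry
  completed: [], // terminal
};

function canTransition(from: JobState, to: JobState): boolean {
  return transitions[from].indexOf(to) !== -1;
}

// Checks that every step in a recorded history is an allowed transition.
function isValidHistory(history: JobState[]): boolean {
  return history.every((state, i) => i === 0 || canTransition(history[i - 1], state));
}
```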

Error Handling

Retries are configured per job through the options passed to queue.add:

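import { Queue } from "bullmq";

// QUEUE_NAME and redisConnection must match the values the worker uses
const queue = new Queue(QUEUE_NAME, { connection: redisConnection });

await queue.add("INGEST", jobData, {
  attempts: 3, // 1 initial run + up to 2 retries
  backoff: {
    type: "exponential",
    delay: 5000, // retries after 5s, then 10s
  },
  removeOnComplete: true, // delete successful jobs from Redis
  removeOnFail: false, // keep failed jobs for inspection
});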

Because removeOnFail is false, jobs that exhaust all attempts stay in Redis for debugging and can be retried manually.
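Note that attempts counts the first run, so attempts: 3 allows two retries. BullMQ documents its exponential strategy as waiting 2^(attemptsMade - 1) * delay milliseconds before each retry; the hypothetical helper below reproduces that formula to show the resulting schedule for the options above.

```typescript
// Reproduces BullMQ's documented exponential backoff: delay * 2^(attemptsMade - 1).
function exponentialBackoffSchedule(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  // attemptsMade counts failures so far; a retry is only scheduled while attempts remain.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(delayMs * 2 ** (attemptsMade - 1));
  }
  return delays;
}

console.log(exponentialBackoffSchedule(3, 5000)); // [ 5000, 10000 ]
```

A third failure moves the job to Failed rather than scheduling a 20s retry; raising attempts to 4 would add that third delay.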
