Queues
How CodePilot uses job queues to process repository ingestion and AI tasks asynchronously.
CodePilot uses a producer-consumer queue architecture to handle long-running tasks like repository ingestion, embedding generation, and AI code review asynchronously.
Architecture
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Express │ enqueue │ Redis │ dequeue │ Worker │
│ API │────────▶│ (BullMQ) │────────▶│ (BullMQ) │
│ (Producer) │ │ Queue │ │ (Consumer) │
└──────────────┘ └──────────────┘ └──────────────┘The API server acts as a producer, enqueueing jobs when users connect repositories or GitHub webhooks fire. The worker service acts as a consumer, processing jobs from the queue.
Job Types
| Job Type | Trigger | Description |
|---|---|---|
INGEST | User connects a repository | Full repository clone, parse, chunk, embed, and store |
INCREMENTAL_INGEST | GitHub push webhook | Re-process only changed files |
Producer (API)
The API enqueues jobs via producer classes:
// Full ingestion
await GithubRepositoryProducer.ingest({
repositoryId: repo.id,
installationId: installation.id,
repoFullName: "owner/repo-name",
defaultBranch: "main",
});
// Incremental ingestion (webhook-triggered)
await GithubRepositoryProducer.incrementalIngest({
repositoryId: repo.id,
installationId: installation.id,
repoFullName: "owner/repo-name",
commitSha: "abc123",
});
// Bulk ingestion (multi-repo onboarding)
await GithubRepositoryProducer.ingestBulk(repositories);Consumer (Worker)
The worker processes jobs with concurrency control and error handling:
const worker = new Worker(
QUEUE_NAME,
async (job) => {
switch (job.name) {
case "INGEST":
await processIngestion(job.data);
break;
case "INCREMENTAL_INGEST":
await processIncrementalIngestion(job.data);
break;
}
},
{
connection: redisConnection,
concurrency: 1, // Process one job at a time
}
);Concurrency
Ingestion jobs run with concurrency: 1 to avoid overwhelming Ollama with parallel embedding requests. This ensures stable memory usage and consistent throughput.
Job Lifecycle
- Waiting — Job is in the queue, waiting to be picked up
- Active — Worker is processing the job
- Completed — Job finished successfully
- Failed — Job encountered an error (may be retried)
- Delayed — Job is scheduled for retry after a delay
Error Handling
Jobs include built-in retry logic:
await queue.add("INGEST", jobData, {
attempts: 3,
backoff: {
type: "exponential",
delay: 5000, // 5s, 10s, 20s
},
removeOnComplete: true,
removeOnFail: false,
});Failed jobs are preserved for debugging and can be retried manually.