Batch Processing
Async batch API lifecycle -- submitting requests, polling for results, cancellation, database records, and cost savings.
The Anthropic Message Batches API processes requests asynchronously at 50% of the standard cost. Trovella's @repo/ai package wraps this API with database-backed tracking through the ai_batch table and automatic per-result usage recording.
When to Use Batch
Batch processing is appropriate when:
- The results are not needed in real time (no user waiting)
- You have multiple independent requests that can be submitted together
- Cost savings justify the processing delay (up to 24 hours, typically minutes)
Current and planned use cases:
- Re-embedding content -- when the embedding model or chunking strategy changes, all existing content needs re-processing
- Bulk extraction -- extracting structured data from a large set of documents
- Batch analysis -- running the same analysis prompt across many research artifacts
Lifecycle
submitBatch()          Anthropic API        pollBatchResults()
      |                      |                       |
      |-- create batch -----→|                       |
      |←- batch ID ----------|                       |
      |                      |                       |
      |-- insert ai_batch -->|                       |
      |   (status: pending)  |                       |
      |                      |-- processing ---------|
      |                      |                       |
      |                      |←- pollBatch() --------|
      |                      |-- status: processing →|
      |                      |                       |
      |                      |←- pollBatch() --------|
      |                      |-- status: ended -----→|
      |                      |-- stream results ----→|
      |                      |                       |-- recordCompletion()
      |                      |                       |   per succeeded result
      |                      |                       |-- update ai_batch
Status Transitions
pending --> processing --> ended
                       --> canceled (via cancelBatch)
                       --> expired (Anthropic timeout)
The canceling intermediate state occurs between calling cancelBatch() and Anthropic confirming the cancellation.
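The transitions above can be captured as a small lookup table. This is a minimal sketch, not the package's implementation: the `BatchStatus` union mirrors the status values documented for ai_batch, while `ALLOWED_TRANSITIONS`, `canTransition`, and `isTerminal` are hypothetical names, and the exact edges are assumptions read off the diagram. A real implementation may also need to allow a poll to skip intermediate states.

```typescript
// Status values as listed for the ai_batch.status column.
type BatchStatus =
  | "pending"
  | "processing"
  | "ended"
  | "canceling"
  | "canceled"
  | "expired";

// Hypothetical transition table; the edges are assumptions based on the diagram.
const ALLOWED_TRANSITIONS: Record<BatchStatus, readonly BatchStatus[]> = {
  pending: ["processing", "canceling"],
  processing: ["ended", "canceling", "expired"],
  canceling: ["canceled"],
  ended: [],
  canceled: [],
  expired: [],
};

// True when the table lists `to` as a direct successor of `from`.
function canTransition(from: BatchStatus, to: BatchStatus): boolean {
  return ALLOWED_TRANSITIONS[from].some((next) => next === to);
}

// A status with no outgoing edges needs no further polling.
function isTerminal(status: BatchStatus): boolean {
  return ALLOWED_TRANSITIONS[status].length === 0;
}
```

A poller could use `isTerminal` to decide which rows to skip, and `canTransition` to reject out-of-order status updates.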
Submitting a Batch
const { batchId, anthropicBatchId } = await ctx.ai.submitBatch({
  requests: documents.map((doc) => ({
    customId: doc.id,
    chatOptions: {
      messages: [{ role: "user", content: doc.text }],
      system: "Extract key entities from this document.",
      model: "claude-haiku-4-5",
    },
  })),
  feature: "batch-extraction",
});
What submitBatch() Does
- Calls buildRequestParams() for each request to translate ChatOptions into Anthropic API format
- Sends the batch to messages.batches.create()
- Inserts an ai_batch row with status: "pending" and the anthropicBatchId
- Returns both the internal batchId (UUID) and the Anthropic batch ID
The feature field applies to all requests in the batch. Individual requests use Omit<ChatOptions, "feature"> -- the feature is specified once at the batch level.
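The request shape described above could be typed roughly as follows. This is a sketch under assumptions: `ChatOptions` here is a simplified stand-in containing only the fields used in the submit example, and `BatchRequest`, `SourceDocument`, and `buildBatchRequests` are hypothetical names, not exports of @repo/ai. The duplicate check reflects that results are matched back to inputs by customId, so each one should be unique within a batch.

```typescript
// Simplified stand-in for the package's ChatOptions (assumed subset of fields).
interface ChatOptions {
  messages: { role: "user" | "assistant"; content: string }[];
  system?: string;
  model: string;
  feature: string;
}

// Per-request options omit `feature`, which is set once at the batch level.
interface BatchRequest {
  customId: string;
  chatOptions: Omit<ChatOptions, "feature">;
}

interface SourceDocument {
  id: string;
  text: string;
}

// Hypothetical helper: builds one request per document and rejects duplicate customIds.
function buildBatchRequests(
  docs: SourceDocument[],
  base: Pick<ChatOptions, "system" | "model">,
): BatchRequest[] {
  const seen = new Set<string>();
  return docs.map((doc): BatchRequest => {
    if (seen.has(doc.id)) {
      throw new Error(`Duplicate customId: ${doc.id}`);
    }
    seen.add(doc.id);
    return {
      customId: doc.id,
      chatOptions: {
        messages: [{ role: "user", content: doc.text }],
        system: base.system,
        model: base.model,
      },
    };
  });
}
```

The returned array could then be passed as the `requests` field, with `feature` supplied alongside it.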
Polling for Results
Batch processing is asynchronous. You must poll for completion:
const result = await ctx.ai.pollBatch(batchId, anthropicBatchId, "batch-extraction");
if (result.status === "ended") {
  for (const item of result.results ?? []) {
    switch (item.type) {
      case "succeeded":
        // item.message is a full Anthropic Message
        processResult(item.customId, item.message);
        break;
      case "errored":
        // item.error has { type, message }
        handleError(item.customId, item.error);
        break;
      case "expired":
        // Request was not processed before timeout
        requeueItem(item.customId);
        break;
    }
  }
}
What pollBatchResults() Does
- Calls messages.batches.retrieve() to check the batch status
- Maps the Anthropic status to the internal status enum and updates ai_batch
- If the batch has ended:
  - Streams all results via messages.batches.results()
  - For each succeeded result, calls recordCompletion() to write ai_usage and ai_call_details rows (with batchId linking them to the batch)
  - Returns the full results array with typed discriminated unions
- If not ended, returns just the current status
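To illustrate how the typed results relate to the counters on ai_batch, here is a minimal sketch. The `BatchResultItem` union is an assumed shape based on the three result types handled in the polling example, and `tallyResults` is a hypothetical helper rather than the package's actual update logic.

```typescript
// Assumed result shape, mirroring the result types in the polling example.
type BatchResultItem =
  | { type: "succeeded"; customId: string; message: unknown }
  | { type: "errored"; customId: string; error: { type: string; message: string } }
  | { type: "expired"; customId: string };

// Counter names match the ai_batch columns.
interface BatchCounts {
  succeededCount: number;
  erroredCount: number;
  expiredCount: number;
}

// Hypothetical helper: counts results by type in a single pass.
function tallyResults(results: readonly BatchResultItem[]): BatchCounts {
  const counts: BatchCounts = { succeededCount: 0, erroredCount: 0, expiredCount: 0 };
  for (const item of results) {
    switch (item.type) {
      case "succeeded":
        counts.succeededCount += 1;
        break;
      case "errored":
        counts.erroredCount += 1;
        break;
      case "expired":
        counts.expiredCount += 1;
        break;
    }
  }
  return counts;
}
```

Because the union is discriminated on `type`, the compiler narrows each case, so `item.error` is only accessible in the errored branch.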
Polling from Inngest
In practice, batch polling is driven by an Inngest job that runs on a schedule:
// Simplified example; `ctx` (the context passed to pollBatchResults) is assumed to be in scope
export const pollBatches = inngest.createFunction(
  { id: "poll-ai-batches" },
  { cron: "*/5 * * * *" }, // Every 5 minutes
  async ({ step }) => {
    // Load the batches that still need polling
    const pendingBatches = await step.run("get-pending", () => getPendingBatches());
    for (const batch of pendingBatches) {
      // One step per batch, so a failed poll retries independently of the others
      await step.run(`poll-${batch.id}`, () =>
        pollBatchResults(ctx, batch.id, batch.anthropicBatchId, batch.feature),
      );
    }
  },
);
Cancelling a Batch
await ctx.ai.cancelBatch(batchId, anthropicBatchId);
This sends a cancel request to Anthropic and sets the ai_batch row's status to "canceling". The row moves to "canceled" on a subsequent poll, once Anthropic has confirmed the cancellation.
Database Schema: ai_batch
The ai_batch table tracks batch lifecycle separately from individual call records:
| Column | Type | Purpose |
|---|---|---|
| id | text (UUID) | Internal batch ID |
| organizationId | text | Tenant scope |
| userId | text | Who submitted the batch |
| feature | text | Cost attribution (shared across all requests) |
| anthropicBatchId | text | Anthropic's batch identifier |
| status | enum | pending, processing, ended, canceling, canceled, expired |
| totalRequests | integer | Number of requests in the batch |
| succeededCount | integer | Requests that completed successfully |
| erroredCount | integer | Requests that failed |
| expiredCount | integer | Requests that timed out |
| completedAt | timestamp | When the batch finished processing |
Individual results from a batch are recorded as normal ai_usage + ai_call_details rows, linked back to the batch via ai_usage.batchId.
Cost Savings
Batch processing runs at 50% of the standard API price. For example, if Sonnet 4.6 costs $3.00/M input tokens and $15.00/M output tokens for synchronous calls, batch calls cost $1.50/M input and $7.50/M output.
The cost estimate recorded in ai_usage.estimatedCost for batch results currently uses the standard pricing rates, because the ai_model_pricing table stores standard rates and not batch-specific ones. As a result, estimates for batch results overstate the actual charge by roughly a factor of two. The batch_input and batch_output pricing types exist in the schema for future use, once batch-specific pricing rows are seeded.
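The arithmetic behind the discount can be sketched as below. `TokenRates`, `BATCH_DISCOUNT`, and `estimateCostUsd` are hypothetical names for illustration and are not part of @repo/ai; the rates are the example figures quoted above, not values read from ai_model_pricing.

```typescript
// Prices in USD per million tokens.
interface TokenRates {
  inputPerMillion: number;
  outputPerMillion: number;
}

// Batch requests are billed at 50% of the standard rate.
const BATCH_DISCOUNT = 0.5;

// Hypothetical helper: computes the standard cost, then applies the batch discount if requested.
function estimateCostUsd(
  inputTokens: number,
  outputTokens: number,
  rates: TokenRates,
  isBatch: boolean,
): number {
  const standard =
    (inputTokens / 1_000_000) * rates.inputPerMillion +
    (outputTokens / 1_000_000) * rates.outputPerMillion;
  return isBatch ? standard * BATCH_DISCOUNT : standard;
}

// Example rates from the text: $3.00/M input, $15.00/M output.
const exampleRates: TokenRates = { inputPerMillion: 3.0, outputPerMillion: 15.0 };

// 2M input tokens and 500k output tokens: 6.00 + 7.50 = 13.50 at standard rates.
const standardCost = estimateCostUsd(2_000_000, 500_000, exampleRates, false); // 13.5
const batchCost = estimateCostUsd(2_000_000, 500_000, exampleRates, true); // 6.75
```

Calling the helper with `isBatch` set to false reproduces the kind of standard-rate figure that estimatedCost holds for batch results today.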
Error Handling
Errors from the batch API are mapped through the same mapAnthropicError() pipeline as synchronous calls, producing structured AIError instances with code, statusCode, retryable, and retryAfterMs fields. See the Error Handling section in the Reasoning Overview.
Individual request errors within a completed batch are returned as { type: "errored", customId, error } in the results array. These are not thrown -- the caller must handle them explicitly.
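Since per-request failures arrive as data and are never thrown, one way to handle them is to separate the results into successes and follow-up work. The sketch below assumes the same result shape as the polling example; `BatchResultItem`, `FailedRequest`, and `partitionResults` are hypothetical names.

```typescript
// Assumed result shape, mirroring the result types in the polling example.
type BatchResultItem =
  | { type: "succeeded"; customId: string; message: unknown }
  | { type: "errored"; customId: string; error: { type: string; message: string } }
  | { type: "expired"; customId: string };

interface FailedRequest {
  customId: string;
  reason: string;
}

interface PartitionedResults {
  succeeded: Map<string, unknown>; // customId -> message
  failed: FailedRequest[]; // errored and expired requests
}

// Hypothetical helper: collects failures for the caller to inspect; it never throws.
function partitionResults(results: readonly BatchResultItem[]): PartitionedResults {
  const succeeded = new Map<string, unknown>();
  const failed: FailedRequest[] = [];
  for (const item of results) {
    if (item.type === "succeeded") {
      succeeded.set(item.customId, item.message);
    } else if (item.type === "errored") {
      failed.push({
        customId: item.customId,
        reason: `${item.error.type}: ${item.error.message}`,
      });
    } else {
      failed.push({ customId: item.customId, reason: "expired before processing" });
    }
  }
  return { succeeded, failed };
}
```

The `failed` list could feed a resubmission step or an alert, depending on how the caller treats each reason.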