Trovella Wiki

Batch Processing

Async batch API lifecycle -- submitting requests, polling for results, cancellation, database records, and cost savings.

The Anthropic Message Batches API processes requests asynchronously at 50% of the standard cost. Trovella's @repo/ai package wraps this API with database-backed tracking through the ai_batch table and automatic per-result usage recording.

When to Use Batch

Batch processing is appropriate when:

  • The results are not needed in real time (no user waiting)
  • You have multiple independent requests that can be submitted together
  • Cost savings justify the processing delay (up to 24 hours, typically minutes)

Current and planned use cases:

  • Re-embedding content -- when the embedding model or chunking strategy changes, all existing content needs re-processing
  • Bulk extraction -- extracting structured data from a large set of documents
  • Batch analysis -- running the same analysis prompt across many research artifacts

Lifecycle

submitBatch()          Anthropic API          pollBatchResults()
    |                      |                       |
    |-- create batch -----→|                       |
    |←- batch ID ----------|                       |
    |                      |                       |
    |-- insert ai_batch -->|                       |
    |   (status: pending)  |                       |
    |                      |-- processing ---------|
    |                      |                       |
    |                      |←- pollBatch() --------|
    |                      |-- status: processing →|
    |                      |                       |
    |                      |←- pollBatch() --------|
    |                      |-- status: ended -----→|
    |                      |-- stream results ----→|
    |                      |                       |-- recordCompletion()
    |                      |                       |   per succeeded result
    |                      |                       |-- update ai_batch

Status Transitions

pending --> processing --> ended
                      --> canceled  (via cancelBatch)
                      --> expired   (Anthropic timeout)

The canceling intermediate state occurs between calling cancelBatch() and Anthropic confirming the cancellation.

Submitting a Batch

const { batchId, anthropicBatchId } = await ctx.ai.submitBatch({
  requests: documents.map((doc) => ({
    customId: doc.id,
    chatOptions: {
      messages: [{ role: "user", content: doc.text }],
      system: "Extract key entities from this document.",
      model: "claude-haiku-4-5",
    },
  })),
  feature: "batch-extraction",
});

What submitBatch() Does

  1. Calls buildRequestParams() for each request to translate ChatOptions into Anthropic API format
  2. Sends the batch to messages.batches.create()
  3. Inserts an ai_batch row with status: "pending" and the anthropicBatchId
  4. Returns both the internal batchId (UUID) and the Anthropic batch ID

The feature field applies to all requests in the batch. Individual requests use Omit<ChatOptions, "feature"> -- the feature is specified once at the batch level.

Polling for Results

Batch processing is asynchronous. You must poll for completion:

const result = await ctx.ai.pollBatch(batchId, anthropicBatchId, "batch-extraction");

if (result.status === "ended") {
  for (const item of result.results ?? []) {
    switch (item.type) {
      case "succeeded":
        // item.message is a full Anthropic Message
        processResult(item.customId, item.message);
        break;
      case "errored":
        // item.error has { type, message }
        handleError(item.customId, item.error);
        break;
      case "expired":
        // Request was not processed before timeout
        requeueItem(item.customId);
        break;
    }
  }
}

What pollBatchResults() Does

  1. Calls messages.batches.retrieve() to check the batch status
  2. Maps the Anthropic status to the internal status enum and updates ai_batch
  3. If the batch has ended:
    • Streams all results via messages.batches.results()
    • For each succeeded result, calls recordCompletion() to write ai_usage and ai_call_details rows (with batchId linking them to the batch)
    • Returns the full results array with typed discriminated unions
  4. If not ended, returns just the current status

Polling from Inngest

In practice, batch polling is driven by an Inngest job that runs on a schedule:

// Simplified example
export const pollBatches = inngest.createFunction(
  { id: "poll-ai-batches" },
  { cron: "*/5 * * * *" }, // Every 5 minutes
  async ({ step }) => {
    const pendingBatches = await step.run("get-pending", () => getPendingBatches());

    for (const batch of pendingBatches) {
      await step.run(`poll-${batch.id}`, () =>
        pollBatchResults(ctx, batch.id, batch.anthropicBatchId, batch.feature),
      );
    }
  },
);

Cancelling a Batch

await ctx.ai.cancelBatch(batchId, anthropicBatchId);

This sends a cancel request to Anthropic and updates the ai_batch row to "canceling". The batch transitions to "canceled" on the next poll.

Database Schema: ai_batch

The ai_batch table tracks batch lifecycle separately from individual call records:

ColumnTypePurpose
idtext (UUID)Internal batch ID
organizationIdtextTenant scope
userIdtextWho submitted the batch
featuretextCost attribution (shared across all requests)
anthropicBatchIdtextAnthropic's batch identifier
statusenumpending, processing, ended, canceling, canceled, expired
totalRequestsintegerNumber of requests in the batch
succeededCountintegerRequests that completed successfully
erroredCountintegerRequests that failed
expiredCountintegerRequests that timed out
completedAttimestampWhen the batch finished processing

Individual results from a batch are recorded as normal ai_usage + ai_call_details rows, linked back to the batch via ai_usage.batchId.

Cost Savings

Batch processing runs at 50% of the standard API price. For example, if Sonnet 4.6 costs $3.00/M input tokens and $15.00/M output tokens for synchronous calls, batch calls cost $1.50/M input and $7.50/M output.

The cost estimate recorded in ai_usage.estimatedCost for batch results uses the standard pricing rates (the ai_model_pricing table currently stores standard rates, not batch-specific rates). The batch_input and batch_output pricing types exist in the schema for future use when batch-specific pricing rows are seeded.

Error Handling

Errors from the batch API are mapped through the same mapAnthropicError() pipeline as synchronous calls, producing structured AIError instances with code, statusCode, retryable, and retryAfterMs fields. See the Error Handling section in the Reasoning Overview.

Individual request errors within a completed batch are returned as { type: "errored", customId, error } in the results array. These are not thrown -- the caller must handle them explicitly.

On this page