For tasks that don't need real-time responses (data processing, content generation, bulk analysis), the Messages Batches API cuts costs by 50% and doesn't count against your regular request rate limits.
## Batch API vs. Regular API
| Comparison | Regular API | Batch API |
|---|---|---|
| Response mode | Synchronous (wait for the result) | Asynchronous (completes within 24 hours) |
| Cost | Standard pricing | 50% discount |
| Rate limits | Counts against RPM/TPM | Separate quota, doesn't count |
| Best for | Real-time interaction | Offline batch processing |
| Max batch size | - | 10,000 requests / 32 MB |
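The 50% figure compounds quickly at scale. A back-of-envelope sketch of the savings (the per-million-token prices below are illustrative placeholders, not Anthropic's actual rates):

```python
# Batch API bills at 50% of standard token prices.
STANDARD_PRICE_PER_MTOK = {"input": 1.00, "output": 5.00}  # USD, example values only
BATCH_DISCOUNT = 0.5

def estimate_cost(input_tokens: int, output_tokens: int, batch: bool = False) -> float:
    """Estimate cost in USD for a given token volume, with or without the batch discount."""
    cost = (input_tokens / 1_000_000 * STANDARD_PRICE_PER_MTOK["input"]
            + output_tokens / 1_000_000 * STANDARD_PRICE_PER_MTOK["output"])
    return cost * BATCH_DISCOUNT if batch else cost

# 10,000 requests, each ~1,500 input + 200 output tokens
regular = estimate_cost(15_000_000, 2_000_000)
batched = estimate_cost(15_000_000, 2_000_000, batch=True)
print(f"regular: ${regular:.2f}, batch: ${batched:.2f}")  # -> regular: $25.00, batch: $12.50
```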
## Python Quick Start
### Create a batch
```python
import anthropic

client = anthropic.Anthropic()

# Submit 3 requests at once (up to 10,000 in practice)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "article-001",
            "params": {
                "model": "claude-haiku-4-5",
                "max_tokens": 256,
                "messages": [{
                    "role": "user",
                    "content": "Summarize in one sentence: Artificial intelligence is a kind of...",
                }],
            },
        },
        {
            "custom_id": "article-002",
            "params": {
                "model": "claude-haiku-4-5",
                "max_tokens": 256,
                "messages": [{
                    "role": "user",
                    "content": "Summarize in one sentence: Machine learning is a kind of...",
                }],
            },
        },
    ]
)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")  # in_progress
```

### Poll batch status
```python
import time

def wait_for_batch(batch_id: str, poll_interval: int = 30):
    # Poll until the batch ends (don't poll too aggressively; at least 30s apart)
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        status = batch.processing_status
        print(f"Status: {status} | "
              f"succeeded: {batch.request_counts.succeeded} | "
              f"errored: {batch.request_counts.errored} | "
              f"processing: {batch.request_counts.processing}")
        if status == "ended":
            break
        time.sleep(poll_interval)
    return batch

batch = wait_for_batch(batch.id)
print("Batch finished!")
```

### Download and process results
```python
results = {}
for result in client.messages.batches.results(batch.id):
    custom_id = result.custom_id
    if result.result.type == "succeeded":
        text = result.result.message.content[0].text
        results[custom_id] = {"status": "ok", "text": text}
    else:
        error = result.result.error
        results[custom_id] = {"status": "error", "error": str(error)}

# Inspect the results
for cid, r in results.items():
    if r["status"] == "ok":
        print(f"{cid}: {r['text'][:50]}...")
    else:
        print(f"{cid}: failed - {r['error']}")
```

## Node.js Example
```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

// Create the batch (assumes `articles` is an array of { content: string });
// note the SDK uses snake_case parameter names, mirroring the HTTP API
const batch = await client.messages.batches.create({
  requests: articles.map((article, i) => ({
    custom_id: `article-${i}`,
    params: {
      model: "claude-haiku-4-5",
      max_tokens: 200,
      messages: [{
        role: "user",
        content: `Summarize the following article in one sentence:\n${article.content}`,
      }],
    },
  })),
});

// Wait for completion
let batchStatus = batch;
while (batchStatus.processing_status !== "ended") {
  await new Promise((r) => setTimeout(r, 30_000));
  batchStatus = await client.messages.batches.retrieve(batch.id);
}

// Process the results
for await (const result of await client.messages.batches.results(batch.id)) {
  if (result.result.type === "succeeded") {
    const block = result.result.message.content[0];
    if (block.type === "text") {
      console.log(result.custom_id, block.text);
    }
  }
}
```

## Complete Walkthrough: Batch-Summarizing 1,000 Articles
```python
import anthropic
import json
import time
from pathlib import Path

client = anthropic.Anthropic()

# 1. Load the article data
articles = json.loads(Path("articles.json").read_text())
print(f"{len(articles)} articles to process")

# 2. Submit in chunks (up to 10,000 requests per batch)
BATCH_SIZE = 1000
all_batches = []
for i in range(0, len(articles), BATCH_SIZE):
    chunk = articles[i:i + BATCH_SIZE]
    batch = client.messages.batches.create(
        requests=[
            {
                "custom_id": f"article-{a['id']}",
                "params": {
                    "model": "claude-haiku-4-5",
                    "max_tokens": 200,
                    "messages": [{
                        "role": "user",
                        "content": "Summarize the core content of this article "
                                   f"in two sentences:\n{a['content'][:2000]}",
                    }],
                },
            }
            for a in chunk
        ]
    )
    all_batches.append(batch.id)
    print(f"Batch {len(all_batches)} submitted, ID: {batch.id}")

# 3. Wait for every batch to finish
results = {}
for batch_id in all_batches:
    while True:
        b = client.messages.batches.retrieve(batch_id)
        if b.processing_status == "ended":
            break
        time.sleep(60)  # poll once a minute

    # 4. Collect the results
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            article_id = result.custom_id.replace("article-", "")
            results[article_id] = result.result.message.content[0].text

# 5. Save the results
Path("summaries.json").write_text(json.dumps(results, ensure_ascii=False, indent=2))
print(f"Done! Processed {len(results)} articles at roughly 50% cost savings")
```

## Where the Batch API Fits (and Where It Doesn't)
### Good fits
- Bulk product-description generation (e-commerce)
- Large-scale document translation or summarization
- Training-data generation and cleanup
- Automated daily report generation
- Content moderation and classification
### Poor fits
- Real-time user interaction (chatbots)
- APIs that must return results immediately
- Multi-turn conversations (each request depends on the previous result)
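One operational detail worth planning for: individual requests inside a batch can end as `errored` while the rest succeed, and a common pattern is to resubmit only the failures as a new, smaller batch. A minimal sketch of the bookkeeping — `build_retry_requests` is a hypothetical helper, and the request shape matches the examples above:

```python
def build_retry_requests(original_requests, failed_ids):
    """Pick out the original request payloads whose custom_id errored,
    so they can be resubmitted as a new, smaller batch."""
    failed = set(failed_ids)
    return [req for req in original_requests if req["custom_id"] in failed]

# Example: two of three requests errored
original = [
    {"custom_id": "article-001", "params": {"model": "claude-haiku-4-5"}},
    {"custom_id": "article-002", "params": {"model": "claude-haiku-4-5"}},
    {"custom_id": "article-003", "params": {"model": "claude-haiku-4-5"}},
]
retry = build_retry_requests(original, ["article-001", "article-003"])
print([r["custom_id"] for r in retry])  # -> ['article-001', 'article-003']
```

The retry list can be passed straight back to `client.messages.batches.create(requests=...)`, keeping the same `custom_id` values so results stay traceable across attempts.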
Source: Anthropic official documentation - docs.anthropic.com/en/docs/build-with-claude/message-batches