做 Claude API 集成时,Rate Limit 是最常遇到的挑战。本文讲清楚限流机制、如何处理 429 错误,以及生产环境的架构设计。
限流的三个维度
Anthropic API 有三种限制维度:
| 维度 | 说明 |
|---|---|
| RPM(Requests Per Minute) | 每分钟请求数 |
| TPM(Tokens Per Minute) | 每分钟输入+输出 Token 总数 |
| TPD(Tokens Per Day) | 每日 Token 总量(新账号限制) |
触发 429 的常见场景:
- 批量处理时同时发起太多请求(超 RPM)
- 处理超长文本,单分钟 Token 消耗太大(超 TPM)
- 新账号每日 Token 上限耗尽(超 TPD)
各套餐限额参考
| Tier | RPM | TPM | 升级条件 |
|---|---|---|---|
| Free | 5 | 25K | 充值 $5 |
| Tier 1 | 50 | 50K | 充值 $5 |
| Tier 2 | 1000 | 160K | 消费 $40 |
| Tier 3 | 2000 | 320K | 消费 $200 |
| Tier 4 | 4000 | 800K | 消费 $4000 |
具体以 console.anthropic.com 显示为准,数值会更新
429 错误的响应头信息
python
try:
response = client.messages.create(...)
except anthropic.RateLimitError as e:
# 响应头包含限流信息
print(e.response.headers.get('retry-after')) # 建议等待秒数
print(e.response.headers.get('x-ratelimit-limit-requests')) # RPM 限额
print(e.response.headers.get('x-ratelimit-remaining-requests')) # 剩余
print(e.response.headers.get('x-ratelimit-reset-requests')) # 重置时间指数退避重试(生产标准实现)
python
import anthropic, time, random
def call_with_retry(client, max_retries=5, **kwargs):
for attempt in range(max_retries):
try:
return client.messages.create(**kwargs)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
# 指数退避 + 随机抖动
retry_after = int(e.response.headers.get('retry-after', 0))
wait = max(retry_after, (2 ** attempt) + random.uniform(0, 1))
print(f'Rate limited. Waiting {wait:.1f}s (attempt {attempt+1})')
time.sleep(wait)
except anthropic.APIStatusError as e:
if e.status_code == 529: # API overloaded
time.sleep(2 ** attempt)
else:
raise
# 使用
response = call_with_retry(
client,
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
)并发请求队列
批量处理时控制并发数,避免超出 RPM:
python
import asyncio
import anthropic
async def process_batch(items, max_concurrent=5):
client = anthropic.AsyncAnthropic()
semaphore = asyncio.Semaphore(max_concurrent)
async def process_one(item):
async with semaphore:
response = await client.messages.create(
model="claude-haiku-3-5",
max_tokens=200,
messages=[{"role": "user", "content": item}]
)
return response.content[0].text
tasks = [process_one(item) for item in items]
return await asyncio.gather(*tasks, return_exceptions=True)
# 1000 条数据,同时最多 5 个请求
results = asyncio.run(process_batch(data_list, max_concurrent=5))Token 用量监控
python
class TokenTracker:
def __init__(self, tpm_limit=50000):
self.tpm_limit = tpm_limit
self.tokens_this_minute = 0
self.minute_start = time.time()
def track(self, usage):
now = time.time()
if now - self.minute_start > 60:
self.tokens_this_minute = 0
self.minute_start = now
self.tokens_this_minute += usage.input_tokens + usage.output_tokens
utilization = self.tokens_this_minute / self.tpm_limit
if utilization > 0.8:
print(f'Warning: {utilization:.0%} of TPM limit used')
if utilization > 0.95:
sleep_time = 60 - (now - self.minute_start)
print(f'Near limit, sleeping {sleep_time:.0f}s')
time.sleep(max(0, sleep_time))
tracker = TokenTracker(tpm_limit=50000)
response = client.messages.create(...)
tracker.track(response.usage)三大降低用量的策略
策略 1:Prompt Caching
重复内容(系统提示、长文档)启用缓存,读取只需 10% 的 Token 计费。
策略 2:按任务选模型
python
# 分类、摘要等简单任务用 Haiku(成本 1/10)
model = "claude-haiku-3-5" if task_type == "classify" else "claude-sonnet-4-5"策略 3:缩短 Prompt
每减少 100 个 Token,1000 次调用节省 $0.30(Sonnet)。审视 system prompt,删除冗余内容。
申请提升限额
Console -> Settings -> Limits -> Request Limit Increase。
说明:用途、预计月消费、具体需要的 RPM/TPM。通常 1-3 个工作日审批。
高并发架构设计
用户请求
|
v
请求队列(Redis/BullMQ)
|
v
Worker 池(控制并发数)
|
v
Claude API(带重试)
|
v
结果缓存(相同请求命中缓存)
关键参数:
- Worker 数 = RPM 限额 / 平均处理时间(秒)
- 队列超时 = 用户可接受的最大等待时间
- 缓存 TTL = 内容更新频率决定
来源:Rate Limits - Anthropic 官方文档