非流式调用需要等 Claude 生成完毕才返回,用户体验差。开启 Streaming 后,Claude 边生成边返回,用户看到逐字打印的效果,体验大幅提升。
工作原理
Streaming 基于 Server-Sent Events(SSE)。API 返回一个持续的事件流,每个事件包含部分内容,客户端按顺序拼接即可得到完整响应。
Python 流式基础用法
python
import anthropic

client = anthropic.Anthropic()

# Option 1: the stream() context manager (recommended).
# text_stream yields text deltas as they arrive; the context manager
# guarantees the underlying HTTP connection is cleaned up on exit.
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story about a robot"}]
) as stream:
    for text in stream.text_stream:
        print(text, end='', flush=True)
    # Final message object (includes usage statistics); only complete
    # once the stream has been fully consumed.
    final_message = stream.get_final_message()
print(f'\nTokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out')

处理所有事件类型
python
# Option 2: handle every raw SSE event type explicitly.
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        event_type = type(event).__name__
        if event_type == "RawContentBlockDeltaEvent":
            # Incremental content: text deltas carry a .text attribute.
            if hasattr(event.delta, 'text'):
                print(event.delta.text, end='', flush=True)
        elif event_type == "RawMessageStartEvent":
            # Message start: carries model info.
            print(f'Model: {event.message.model}')
        elif event_type == "RawMessageStopEvent":
            # Message finished.
            pass
            print('\n[Done]')

低级 API(手动处理 SSE)
python
# Lower-level control: accumulate the streamed text manually, suitable
# when you need custom processing logic.
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Count to 10"}]
) as stream:
    accumulated_text = ''
    for text in stream.text_stream:
        accumulated_text += text
        print(text, end='', flush=True)
    # After the stream is exhausted, the final message carries metadata
    # such as stop_reason and usage.
    final = stream.get_final_message()
print(f'\nStop reason: {final.stop_reason}')

Node.js 流式用法
javascript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// Stream a completion, writing each text delta to stdout as it arrives.
async function streamResponse() {
  const stream = await client.messages.stream({
    model: 'claude-sonnet-4-5',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Write a haiku' }]
  });

  for await (const event of stream) {
    if (event.type !== 'content_block_delta') continue;
    if (event.delta.type !== 'text_delta') continue;
    process.stdout.write(event.delta.text);
  }

  // Resolves once the stream completes; includes token usage.
  const finalMessage = await stream.finalMessage();
  console.log('\nUsage:', finalMessage.usage);
}
streamResponse();

流式工具调用
python
import json

# Tool schema passed to the API.
tools = [{
    "name": "get_weather",
    "description": "Get current weather",
    "input_schema": {
        "type": "object",
        "properties": {"location": {"type": "string"}},
        "required": ["location"]
    }
}]

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Shanghai?"}]
) as stream:
    tool_input_json = ''
    current_tool = None
    for event in stream:
        ename = type(event).__name__
        if ename == "RawContentBlockStartEvent":
            if event.content_block.type == "tool_use":
                current_tool = event.content_block.name
                # Reset the accumulator so a second tool call in the same
                # message does not concatenate onto the previous call's JSON.
                tool_input_json = ''
                print(f"Calling tool: {current_tool}")
        elif ename == "RawContentBlockDeltaEvent":
            # Tool arguments stream in as partial JSON fragments.
            if hasattr(event.delta, "partial_json"):
                tool_input_json += event.delta.partial_json
        elif ename == "RawContentBlockStopEvent":
            if tool_input_json:
                tool_input = json.loads(tool_input_json)
                print(f"Tool input: {tool_input}")
                # 执行工具并继续对话

生产环境:错误处理与重试
python
import time

def stream_with_retry(messages, max_retries=3):
    """Yield streamed text from Claude, retrying transient failures.

    Retries are only attempted before any text has been yielded: once
    output has reached the consumer, a retry would duplicate everything
    already delivered, so the error is re-raised instead.
    """
    for attempt in range(max_retries):
        yielded_any = False  # has the caller already received text?
        try:
            with client.messages.stream(
                model="claude-sonnet-4-5",
                max_tokens=2048,
                messages=messages
            ) as stream:
                for text in stream.text_stream:
                    yielded_any = True
                    yield text
            return  # success: exit the retry loop
        except anthropic.APIStatusError as e:
            # 529 = Overloaded: back off exponentially and retry.
            if e.status_code == 529 and not yielded_any:
                wait = 2 ** attempt
                print(f'API overloaded, retrying in {wait}s...')
                time.sleep(wait)
            else:
                raise
        except anthropic.APIConnectionError:
            if yielded_any or attempt == max_retries - 1:
                raise
            time.sleep(1)

前端集成(FastAPI + EventSource)
python
import json

# FastAPI backend: proxy the Claude stream to the browser as SSE.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post('/chat/stream')
async def chat_stream(body: dict):
    def generate():
        # json.dumps escapes newlines, so each SSE 'data:' frame stays
        # on a single line as the protocol requires.
        with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": body["message"]}]
        ) as stream:
            for text in stream.text_stream:
                yield f'data: {json.dumps({"text": text})}\n\n'
        yield 'data: [DONE]\n\n'
    return StreamingResponse(generate(), media_type='text/event-stream')

javascript
// 前端
const response = await fetch('/chat/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ message: userInput })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const lines = decoder.decode(value).split('\n');
for (const line of lines) {
if (line.startsWith('data: ') && line !== 'data: [DONE]') {
const data = JSON.parse(line.slice(6));
outputElement.textContent += data.text;
}
}
}

来源:Streaming - Anthropic 官方文档