Streaming output lets users watch Claude "think out loud": text appears as it is generated, instead of only after the full reply is complete. This markedly improves the interactive experience, and the effect is most noticeable on long replies.
## Why use streaming?
| Aspect | Non-streaming request | Streaming request |
|---|---|---|
| User wait time | Nothing appears until generation completes (often 10s+) | First text within a few hundred milliseconds |
| Perceived responsiveness | Slow | Fast (perceived wait drops sharply) |
| Best suited for | Programmatic batch processing | Chat UIs, code generation |
| Implementation complexity | Simple | Slightly more involved (must handle an SSE event stream) |
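For reference, the non-streaming baseline that the table compares against is a single blocking call. A minimal sketch (the prompt and token limit here are placeholder values):

```python
import anthropic

client = anthropic.Anthropic()

# Non-streaming baseline: blocks until the entire reply is generated,
# then returns one complete Message object.
message = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)
print(message.content[0].text)
```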
## Streaming in Python
### Option 1: the `messages.stream()` helper (recommended)
```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Write a complete Python web scraper example"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Grab the final message (includes usage statistics)
    final = stream.get_final_message()

print(f"\nConsumed {final.usage.input_tokens} input + {final.usage.output_tokens} output tokens")
```

### Option 2: the raw event stream
```python
with client.messages.stream(...) as stream:  # same arguments as above
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_stop":
            print("\n[Generation finished]")
```

### Async version (async/await)
```python
import asyncio

import anthropic

async def stream_response(prompt: str):
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(stream_response("Explain what a coroutine is"))
```

## Streaming in Node.js / TypeScript
```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function streamChat(prompt: string) {
  // messages.stream() returns a MessageStream directly; no await needed here
  const stream = client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: 2048,
    messages: [{ role: "user", content: prompt }],
  });

  // Option 1: consume raw events with an async iterator
  for await (const chunk of stream) {
    if (
      chunk.type === "content_block_delta" &&
      chunk.delta.type === "text_delta"
    ) {
      process.stdout.write(chunk.delta.text);
    }
  }

  // Option 2: event listeners. Use these *instead of* the iterator above,
  // and register them before the stream is consumed:
  // stream.on("text", (text) => process.stdout.write(text));
  // stream.on("message", (msg) => console.log("\nUsage:", msg.usage));
  // await stream.done();
}
```

## SSE event types, in full
A streaming response is delivered as a sequence of SSE events:
```
event: message_start          ← the message begins; carries the message id and initial usage
data: {"type": "message_start", "message": {"id": "msg_xxx", ...}}

event: content_block_start    ← a content block begins (text or tool use)
data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}}

event: content_block_delta    ← incremental text (the core event, sent continuously)
data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello"}}

event: content_block_stop     ← the content block ends
data: {"type": "content_block_stop", "index": 0}

event: message_delta          ← end-of-message update; carries stop_reason and final usage
data: {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {...}}

event: message_stop           ← the stream ends
data: {"type": "message_stop"}
```
## Implementing a streaming API in a web framework
### FastAPI (Python)
```python
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
client = anthropic.AsyncAnthropic()  # async client so streaming doesn't block the event loop

@app.get("/chat/stream")
async def chat_stream(prompt: str):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # SSE framing: "data: <payload>\n\n" per event
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
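To smoke-test the endpoint, you can consume the SSE stream with a small script. A sketch, assuming the app is served locally on port 8000 (e.g. via `uvicorn main:app`):

```python
import json

import httpx

# Stream the SSE response from the local FastAPI endpoint
with httpx.stream("GET", "http://localhost:8000/chat/stream",
                  params={"prompt": "Hello"}, timeout=60) as resp:
    for line in resp.iter_lines():
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        print(json.loads(data)["text"], end="", flush=True)
```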
### Next.js App Router (Edge Runtime)

```typescript
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";

export const runtime = "edge";

export async function POST(req: Request) {
  const { messages } = await req.json();
  const client = new Anthropic();
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      const anthropicStream = client.messages.stream({
        model: "claude-sonnet-4-6",
        max_tokens: 2048,
        messages,
      });
      for await (const chunk of anthropicStream) {
        if (
          chunk.type === "content_block_delta" &&
          chunk.delta.type === "text_delta"
        ) {
          controller.enqueue(
            encoder.encode(`data: ${JSON.stringify({ text: chunk.delta.text })}\n\n`)
          );
        }
      }
      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
    },
  });
}
```

### Consuming SSE on the frontend
```typescript
// Using the EventSource API (GET requests only)
const evtSource = new EventSource("/api/chat/stream?prompt=Hello");
evtSource.onmessage = (event) => {
  if (event.data === "[DONE]") {
    evtSource.close();
    return;
  }
  const { text } = JSON.parse(event.data);
  setResponse((prev) => prev + text); // e.g. a React state setter
};

// Or fetch + ReadableStream (supports POST)
const resp = await fetch("/api/chat", {
  method: "POST",
  body: JSON.stringify({ messages }),
  headers: { "Content-Type": "application/json" },
});
const reader = resp.body!.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const chunk = decoder.decode(value, { stream: true });
  // Parse the SSE frames: each event arrives as "data: <payload>\n\n".
  // NB: a production parser should buffer partial lines across chunks.
  for (const line of chunk.split("\n")) {
    if (!line.startsWith("data: ")) continue;
    const data = line.slice("data: ".length);
    if (data === "[DONE]") break;
    const { text } = JSON.parse(data);
    setResponse((prev) => prev + text);
  }
}
```

Source: Anthropic official documentation - docs.anthropic.com/en/api/messages-streaming