深度

Claude API 流式输出完全指南:Streaming 实时响应原理与最佳实践

Claude API Streaming 流式输出完整教程:工作原理、SSE 事件类型解析、Python/Node.js 代码示例、流式工具调用处理、生产环境错误重试机制、前端 EventSource 集成与 Token 用量统计。

2026/3/154分钟 阅读ClaudeEagle

非流式调用需要等 Claude 生成完毕才返回,用户体验差。开启 Streaming 后,Claude 边生成边返回,用户看到逐字打印的效果,体验大幅提升。

工作原理

Streaming 基于 Server-Sent Events(SSE)。API 返回一个持续的事件流,每个事件包含部分内容,客户端按顺序拼接即可得到完整响应。

Python 流式基础用法

python
import anthropic
client = anthropic.Anthropic()

# 方式 1:使用 stream() 上下文管理器(推荐)
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story about a robot"}]
) as stream:
    for text in stream.text_stream:
        print(text, end='', flush=True)

# 获取最终消息对象(含 usage 统计)
    final_message = stream.get_final_message()
    print(f'\nTokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out')

处理所有事件类型

python
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        event_type = type(event).__name__
        
        if event_type == "RawContentBlockDeltaEvent":
            # 文本增量
            if hasattr(event.delta, 'text'):
                print(event.delta.text, end='', flush=True)
        elif event_type == "RawMessageStartEvent":
            # 消息开始,含模型信息
            print(f'Model: {event.message.model}')
        elif event_type == "RawMessageStopEvent":
            # 消息结束
            print('\n[Done]')

低级 API(手动处理 SSE)

python
# 更底层的控制,适合需要自定义处理逻辑
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Count to 10"}]
) as stream:
    accumulated_text = ''
    for text in stream.text_stream:
        accumulated_text += text
        print(text, end='', flush=True)
    
    # 流结束后获取完整信息
    final = stream.get_final_message()
    print(f'\nStop reason: {final.stop_reason}')

Node.js 流式用法

javascript
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();

async function streamResponse() {
  const stream = await client.messages.stream({
    model: 'claude-sonnet-4-5',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Write a haiku' }]
  });

  for await (const chunk of stream) {
    if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
      process.stdout.write(chunk.delta.text);
    }
  }

  const finalMessage = await stream.finalMessage();
  console.log('\nUsage:', finalMessage.usage);
}

streamResponse();

流式工具调用

python
tools = [{
    "name": "get_weather",
    "description": "Get current weather",
    "input_schema": {
        "type": "object",
        "properties": {"location": {"type": "string"}},
        "required": ["location"]
    }
}]

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Shanghai?"}]
) as stream:
    tool_input_json = ''
    current_tool = None
    
    for event in stream:
        ename = type(event).__name__
        if ename == "RawContentBlockStartEvent":
            if event.content_block.type == "tool_use":
                current_tool = event.content_block.name
                print(f"Calling tool: {current_tool}")
        elif ename == "RawContentBlockDeltaEvent":
            if hasattr(event.delta, "partial_json"):
                tool_input_json += event.delta.partial_json
        elif ename == "RawContentBlockStopEvent":
            if tool_input_json:
                import json
                tool_input = json.loads(tool_input_json)
                print(f"Tool input: {tool_input}")
                # 执行工具并继续对话

生产环境:错误处理与重试

python
import time

def stream_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-5",
                max_tokens=2048,
                messages=messages
            ) as stream:
                full_text = ''
                for text in stream.text_stream:
                    yield text
                    full_text += text
                return  # 成功则退出
        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # Overloaded
                wait = 2 ** attempt
                print(f'API overloaded, retrying in {wait}s...')
                time.sleep(wait)
            else:
                raise
        except anthropic.APIConnectionError:
            if attempt == max_retries - 1:
                raise
            time.sleep(1)

前端集成(FastAPI + EventSource)

python
# FastAPI 后端
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post('/chat/stream')
async def chat_stream(body: dict):
    def generate():
        with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": body["message"]}]
        ) as stream:
            for text in stream.text_stream:
                yield f'data: {json.dumps({"text": text})}\n\n'
            yield 'data: [DONE]\n\n'
    
    return StreamingResponse(generate(), media_type='text/event-stream')
javascript
// 前端
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: {'Content-Type': 'application/json'},
  body: JSON.stringify({ message: userInput })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const lines = decoder.decode(value).split('\n');
  for (const line of lines) {
    if (line.startsWith('data: ') && line !== 'data: [DONE]') {
      const data = JSON.parse(line.slice(6));
      outputElement.textContent += data.text;
    }
  }
}

来源:Streaming - Anthropic 官方文档

相关文章推荐

深度高级提示词工程完全指南 2026:CoT、Few-Shot 与 XML 结构化技巧面向 Claude API 开发者的高级提示词工程完整指南:Chain-of-Thought(思维链)的原理与触发方式、Few-Shot 示例选取策略、Zero-Shot CoT 触发词、XML 标签结构化输出控制(强制 JSON)、角色扮演提示的正确姿势、多步骤任务分解、Claude 专属优化技巧(正向指令 vs 禁止指令)以及提示词 A/B 测试框架。2026/3/21深度Claude API 错误码完全手册:所有错误类型、原因与解决方案Anthropic Claude API 错误码完整参考:authentication_error(401/403)、rate_limit_error(429)、invalid_request_error(400)、api_error(500)、overloaded_error(529)的详细说明,每种错误的常见触发原因、标准解决方案和代码示例(Python/Node.js),以及生产环境的错误处理最佳实践(区分可重试/不可重试错误)。2026/3/18深度Claude API 速率限制完全指南:限额说明、错误处理与优化策略Anthropic Claude API 速率限制完整说明:请求频率限制(RPM)、Token 用量限制(TPM/TPD)、不同使用层级的限额对比(免费层/Build/Scale/Enterprise)、429 错误的标准处理方式(指数退避重试)、提升限额的申请方法、Prompt Caching 和 Batch API 绕过限制的技巧,以及高并发场景的队列设计方案。2026/3/18深度Claude API 批量处理完全指南:Message Batches API 大规模数据处理实战Claude API Message Batches 完整教程:批量 API 原理、与普通 API 的区别(50% 成本折扣)、Python/Node.js 提交批次代码示例、进度追踪与结果获取、错误处理策略、并发批次管理,以及文档摘要/数据分类/批量翻译等典型大规模处理场景实战。2026/3/16深度Claude API 限流完全指南:Rate Limit 报错原因、重试策略与生产环境最佳实践Claude API Rate Limit 完整应对指南:限流类型(RPM/TPM/并发)、各套餐限额表、429 错误处理、指数退避重试实现、Prompt Caching 降低用量、请求队列设计、Tier 升级申请,以及高并发生产环境架构方案。2026/3/15深度Claude API 工具调用完全指南:Tool Use 函数调用从入门到实战Claude API Tool Use(工具调用/函数调用)完整教程:工具定义格式、单工具/多工具调用、工具结果传回、并行工具调用、流式工具调用、Python/Node.js 代码示例,以及构建 AI Agent 工具调用循环的最佳实践。2026/3/15