Skip to main content

为什么需要流式

想象 AI 需要 30 秒才能生成完整回答——如果等 30 秒后才一次性显示,用户体验是灾难性的。 流式响应让用户实时看到 AI 的思考过程
  • 文字逐字出现,用户能提前判断方向是否正确
  • 工具调用的参数在生成过程中就能预览
  • 长时间任务不会让用户觉得”卡死了”

BetaRawMessageStreamEvent 核心事件类型

流式 API 返回的是一系列 BetaRawMessageStreamEvent,每种事件类型对应流式响应的不同阶段(src/services/api/claude.ts):
message_start           ← 消息开始,包含 model、usage 初始值
  ├── content_block_start   ← 内容块开始(text / tool_use / thinking)
  │   ├── content_block_delta  ← 增量数据(text_delta / input_json_delta / thinking_delta)
  │   ├── content_block_delta  ← ... 持续到达
  │   └── content_block_stop   ← 内容块结束,yield AssistantMessage
  ├── content_block_start   ← 下一个内容块...
  │   └── ...
  └── message_delta       ← stop_reason + 最终 usage
message_stop            ← 消息结束

事件处理状态机

src/services/api/claude.ts:1980-2298 实现了一个基于 switch(part.type) 的状态机:
事件类型处理逻辑状态变更
message_start初始化 partialMessage,记录 TTFT(首字节延迟)usage 初始化
content_block_startpart.index 创建对应类型的内容块contentBlocks[index] 初始化
content_block_delta按子类型增量追加数据text / thinking / input 累加
content_block_stop构建完整 AssistantMessage 并 yield消息推入 newMessages
message_delta更新 stop_reason 和最终 usage写回最后一条消息
message_stop无操作(流结束标记)

内容块类型及其增量数据

content_block_start 中的 content_block.type 决定了如何处理后续 delta:
内容块类型Delta 类型累加逻辑
texttext_deltatext += delta.text
thinkingthinking_delta + signature_deltathinking += delta.thinkingsignature = delta.signature
tool_useinput_json_deltainput += delta.partial_json(JSON 字符串增量拼接)
server_tool_useinput_json_delta同 tool_use
connector_textconnector_text_delta特殊连接器文本(feature flag 控制)
关键设计:content_block_start 时所有文本字段初始化为空字符串,只通过 content_block_delta 累加。这是因为 SDK 有时在 start 和 delta 中重复发送相同文本。

文本 chunk 和 tool_use block 的交织

一次 AI 响应可能包含多个内容块,交替出现:
content_block_start (text, index=0)     "我来帮你修复这个 bug。"
content_block_delta  (text_delta)       "首先..."
content_block_stop  (index=0)
content_block_start (tool_use, index=1) { name: "Read", input: "..." }
content_block_delta  (input_json_delta) '{"file_p' → 'ath":' → '"src/foo.ts"}'
content_block_stop  (index=1)
content_block_start (text, index=2)     "我已经看到了问题所在..."
content_block_stop  (index=2)
每个 content_block_stop 触发一次 yield,将完整的 AssistantMessage 推送给消费者。这意味着一个 AI 响应会产生多条 AssistantMessage——文本消息和工具调用消息交替产出。 stop_reason 要等到 message_delta 才确定(可能是 end_turntool_usemax_tokens 等),所以最后一条消息的 stop_reason回写的:
// claude.ts:2246 — 直接属性修改,不用对象替换
// 因为 transcript 写队列持有 message.message 的引用
const lastMsg = newMessages.at(-1)
if (lastMsg) {
  lastMsg.message.usage = usage
  lastMsg.message.stop_reason = stopReason
}

流式中的错误处理

网络断开

流式连接依赖 SSE(Server-Sent Events)。当连接中断时:
  1. Stream idle watchdog:定时检测事件间隔,超过阈值(stall)触发告警和重试
  2. Stream abort:如果 watchdog 检测到长时间无事件,抛出错误进入重试流程
  3. 非流式降级:作为最后手段,回退到非流式请求(一次性获取完整响应)
// claude.ts:2338-2355 — 检测空流
// 1. 完全没有事件 → 代理返回了非 SSE 响应
// 2. 有 message_start 但没有 content_block_stop → 流被截断

API 限流

当 API 返回限流错误时,系统使用 withRetry 包装器进行指数退避重试。重试逻辑考虑了:
  • 错误类型(429 限流 vs 500 服务器错误)
  • 重试次数上限
  • 退避间隔

Token 超限

两种 token 超限场景有不同的处理:
场景stop_reason处理方式
输出超限max_tokens生成错误消息,建议设置 CLAUDE_CODE_MAX_OUTPUT_TOKENS
上下文窗口超限model_context_window_exceeded触发 compaction 压缩对话历史后重试
// claude.ts:2267-2293
if (stopReason === 'max_tokens') {
  yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
if (stopReason === 'model_context_window_exceeded') {
  // 复用 max_output_tokens 的恢复路径
  yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}

流式停滞检测

系统持续监控事件到达间隔,检测”停滞”(stall):
// claude.ts:1940-1966
const STALL_THRESHOLD_MS = 10_000  // 10 秒无事件视为停滞
if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
  stallCount++
  totalStallTime += timeSinceLastEvent
  logEvent('tengu_streaming_stall', { stall_duration_ms, stall_count, ... })
}
多个 stall 累积后,watchdog 可能决定中断流并触发重试。

工具执行的流式反馈

BashTool 的命令执行也是流式的——通过 onProgress 回调逐行推送输出:
BashTool.call() → runShellCommand() → AsyncGenerator
  ├── 每秒轮询输出文件 → onProgress(lastLines, allLines, ...)
  ├── yield { type: 'progress', output, fullOutput, elapsedTimeSeconds }
  └── return { code, stdout, interrupted, ... }
UI 层通过 useToolCallProgress hook 实时展示命令输出,而不是等命令完全结束。长时间运行的命令还支持自动后台化(shouldAutoBackground)。

多 Provider 适配

Provider流式协议特殊处理
Anthropic Direct原生 SSE延迟最低,TTFT 最快
AWS BedrockAWS SDK 流式接口需要额外的 beta header 和认证
Google VertexgRPC → 事件流通过 getMergedBetas() 适配
AzureAnthropic 兼容 API自定义 base URL
所有 Provider 通过统一的 Stream<BetaRawMessageStreamEvent> 抽象层屏蔽差异。上层代码(QueryEngine、REPL)不需要关心底层用的是哪个 Provider。

Provider 选择

src/utils/model/providers.ts 中的 getAPIProvider() 根据配置决定使用哪个 Provider:
// 根据 api_provider 配置选择:
// "anthropic" → 直连
// "bedrock"   → AWS SDK
// "vertex"    → Google SDK
// 第三方 base URL → 自动检测
每个 Provider 需要适配的细节包括:认证方式、beta header、请求参数格式、错误码映射——但这些差异在 claude.tsqueryStream() 函数中被统一处理。