第九章:工程实践——值得学习的设计模式

“好的代码不仅正确,还能让读它的人学到东西。”


9.1 快速路径分发(Fast Path Dispatch)

设计目标

CLI 工具对启动速度极其敏感。用户敲 claude --version 不应该等待 50 万行代码加载完毕。快速路径分发的核心思想是:为轻量请求提供捷径,避免加载重量模块

实现机制

src/entrypoints/cli.tsx 的设计原则是:所有 import 都是动态的(await import()),没有任何静态导入重模块

flowchart TD Entry["cli.tsx 入口
(零静态导入)"] --> CheckArgs{"检查 process.argv"} CheckArgs -->|"--version"| FastPath1["直接输出 MACRO.VERSION
零模块加载,毫秒返回"] CheckArgs -->|"--daemon-worker"| FastPath2["仅加载 workerRegistry.js"] CheckArgs -->|"--dump-system-prompt"| FastPath3["仅加载 config + model + prompts"] CheckArgs -->|"bridge/remote-control"| FastPath4["仅加载 auth + bridge"] CheckArgs -->|"daemon"| FastPath5["仅加载 config + sinks + daemon"] CheckArgs -->|"ps/logs/attach/kill"| FastPath6["仅加载 config + bg.js"] CheckArgs -->|"无匹配"| FullLoad["加载完整 main.tsx
(~135ms)"] style FastPath1 fill:#e8f5e9 style FullLoad fill:#fff3e0

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Bootstrap entrypoint - checks for special flags before loading the full CLI.
* All imports are dynamic to minimize module evaluation for fast paths.
* Fast-path for --version has zero imports beyond this file.
*/

// 最快路径:零动态导入,直接返回
if (args.length === 1 && (args[0] === '--version' || args[0] === '-v')) {
// MACRO.VERSION 在构建时内联
console.log(`${MACRO.VERSION} (Claude Code)`)
return
}

// 次级快速路径:仅加载最小模块集
if (args[0] === '--daemon-worker') {
const { startWorker } = await import('../daemon/workerRegistry.js')
await startWorker(args.slice(1))
return
}

// ... 其他快速路径 ...

// 全功能路径(仅在无匹配时才触发)
const { startCapturingEarlyInput } = await import('../utils/earlyInput.js')
startCapturingEarlyInput()
const { main: cliMain } = await import('../main.js')
await cliMain()

设计原则

原则 说明
零静态导入 入口文件不 import 任何重模块,全部用 await import()
分层拦截 最快的路径(--version)最先检查,逐步递进
最小依赖 每个快速路径只加载该路径必需的模块
延迟加载 只有确认需要完整功能时才加载主模块

9.2 并行预取(Parallel Prefetch)

设计目标

即使需要加载完整功能,也可以通过将 I/O 等待与 CPU 计算重叠来优化启动时间。并行预取的核心思想是:尽早发起 I/O,在其完成之前做其他有用的事

三阶段并行策略

sequenceDiagram participant Main as main.tsx 模块求值 participant MDM as MDM 子进程 participant KC as Keychain 子进程 participant Import as 180+ import 语句 participant Init as init() 初始化 participant Prefetch as 延迟预取 Note over Main: Phase 1: 模块求值时启动子进程 Main->>MDM: startMdmRawRead()(plutil/reg query) Main->>KC: startKeychainPrefetch()(security 命令) Main->>Import: 开始执行 180+ import 语句(~135ms) Note over MDM,Import: 子进程与 import 并行执行 MDM-->>MDM: 执行中(~65ms) KC-->>KC: 执行中(~65ms) Note over Main: Phase 2: preAction Hook 等待结果 Import-->>Main: import 完成 Main->>MDM: await ensureMdmSettingsLoaded() MDM-->>Main: 已完成(几乎零等待) Main->>KC: await ensureKeychainPrefetchCompleted() KC-->>Main: 已完成(几乎零等待) Main->>Init: await init() Note over Main: Phase 3: REPL 就绪后延迟预取 Init-->>Main: 初始化完成 Main->>Prefetch: startDeferredPrefetches() Prefetch->>Prefetch: initUser() / getUserContext() Prefetch->>Prefetch: prefetchSystemContext() Prefetch->>Prefetch: prefetchAwsCredentials() Prefetch->>Prefetch: refreshModelCapabilities()

关键代码

Phase 1:模块求值时火速启动子进程

1
2
3
4
5
6
7
8
9
10
11
12
// main.tsx 顶部 —— 在所有 import 之前!
import { profileCheckpoint } from './utils/startupProfiler.js'
profileCheckpoint('main_tsx_entry')

import { startMdmRawRead } from './utils/settings/mdm/rawRead.js'
startMdmRawRead() // 启动 plutil 子进程(macOS)或 reg query(Windows)

import { startKeychainPrefetch } from './utils/secureStorage/keychainPrefetch.js'
startKeychainPrefetch() // 启动两个 security find-generic-password 子进程

// 接下来是 180+ 个 import 语句(~135ms CPU 时间)
// 此时子进程已在后台并行执行...

Keychain 预取的并行实现

1
2
3
4
5
6
7
8
9
10
11
12
export function startKeychainPrefetch(): void {
if (process.platform !== 'darwin' || prefetchPromise || isBareMode()) return

// 同时启动两个 keychain 读取(OAuth + Legacy)
const oauthSpawn = spawnSecurity(getMacOsKeychainStorageServiceName(CREDENTIALS_SERVICE_SUFFIX))
const legacySpawn = spawnSecurity(getMacOsKeychainStorageServiceName())

prefetchPromise = Promise.all([oauthSpawn, legacySpawn]).then(([oauth, legacy]) => {
if (!oauth.timedOut) primeKeychainCacheFromPrefetch(oauth.stdout)
if (!legacy.timedOut) legacyApiKeyPrefetch = { stdout: legacy.stdout }
})
}

Phase 2:在需要时等待(几乎零等待)

1
2
3
4
5
program.hook('preAction', async thisCommand => {
// 子进程在 import 期间已完成,这里几乎不阻塞
await Promise.all([ensureMdmSettingsLoaded(), ensureKeychainPrefetchCompleted()])
await init()
})

Phase 3:REPL 就绪后的延迟预取

1
2
3
4
5
6
7
8
9
10
export function startDeferredPrefetches(): void {
// 这些预取在用户开始输入时后台执行
void initUser() // 用户信息
void getUserContext() // 用户上下文
void prefetchSystemContextIfSafe() // 系统上下文
void prefetchAwsCredentialsAndBedRockInfo() // AWS 凭证
void countFilesRoundedRg(getCwd(), AbortSignal.timeout(3000), []) // 文件计数
void refreshModelCapabilities() // 模型能力
void settingsChangeDetector.initialize() // 设置监听
}

时间线可视化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
时间轴 ────────────────────────────────────────────────────→

Phase 1 (模块求值):
├─ MDM 子进程 ─────── (~65ms) ─┐
├─ Keychain 子进程 ── (~65ms) ─┤ ← 与 import 并行
└─ 180+ import 语句 ─ (~135ms) ┘

Phase 2 (Commander preAction):
├─ await MDM ─── (0ms,已完成)
├─ await KC ──── (0ms,已完成)
└─ init() ────── (~50ms)

Phase 3 (用户输入时):
├─ initUser() ──── 后台
├─ prefetchAWS() ─ 后台
└─ ... 其他预取 ── 后台

用户可以开始输入
(首次 API 调用时预取已完成)

9.3 异步生成器实现流式处理(AsyncGenerator Pipeline)

设计目标

Agent 循环需要一边执行一边向 UI 推送中间结果——LLM 的流式文本、工具调用进度、状态变化事件。异步生成器(async function*)完美解决了这个"生产者-消费者"问题。

生成器管道架构

flowchart LR subgraph 最内层["queryLoop()"] direction TB A1["while(true)"] --> A2["yield stream_request_start"] A2 --> A3["for await (model response)
yield 每个 token"] A3 --> A4["for await (tool results)
yield 工具进度"] A4 --> A5{"terminal?"} A5 -->|No| A1 A5 -->|Yes| A6["return Terminal"] end subgraph 中间层["query()"] B1["yield* queryLoop(...)"] end subgraph 外层["QueryEngine.submitMessage()"] C1["for await (query(...))
transform → SDKMessage"] end subgraph UI层["REPL / SDK Consumer"] D1["for await (engine.submitMessage(...))
渲染到终端"] end 最内层 -->|"yield events"| 中间层 中间层 -->|"yield events"| 外层 外层 -->|"yield SDKMessage"| UI层

关键代码

最内层:queryLoop() —— 状态机 + 流式产出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async function* queryLoop(
params: QueryParams,
consumedCommandUuids: string[],
): AsyncGenerator<
StreamEvent | RequestStartEvent | Message | TombstoneMessage | ToolUseSummaryMessage,
Terminal // 返回类型:循环终止原因
> {
let state: LoopState = { type: 'initial' }

while (true) {
// 通知 UI:新的 API 请求开始
yield { type: 'stream_request_start' }

// 流式消费 LLM 响应,逐个 yield 给上层
for await (const message of deps.callModel(...)) {
yield message // 每个 token 都实时推送
}

// 流式消费工具执行结果
for await (const update of toolUpdates) {
yield update // 工具进度实时推送
}

// 状态转移:决定继续循环还是终止
const next = computeNextState(state, toolResults)
if (next.type === 'terminal') {
return next // 返回终止原因(completed / max_turns / aborted)
}
state = next
continue // 回到循环顶部
}
}

中间层:query() —— 委托 + 后处理

1
2
3
4
5
6
7
8
9
10
11
12
export async function* query(params: QueryParams): AsyncGenerator<...> {
const consumedCommandUuids: string[] = []

// yield* 将内层所有事件透传给外层
const terminal = yield* queryLoop(params, consumedCommandUuids)

// 循环结束后的清理工作
for (const uuid of consumedCommandUuids) {
notifyCommandLifecycle(uuid, 'completed')
}
return terminal
}

外层:QueryEngine.submitMessage() —— 事件转换

1
2
3
4
5
6
7
async *submitMessage(prompt, options): AsyncGenerator<SDKMessage, void, unknown> {
for await (const event of query({ messages, systemPrompt, ... })) {
// 将内部事件格式转换为 SDK 公共消息格式
const sdkMessage = transformToSDKMessage(event)
if (sdkMessage) yield sdkMessage
}
}

为什么用生成器而不用回调/EventEmitter?

对比维度 回调/EventEmitter AsyncGenerator
控制流 调用方被动接收 消费方主动拉取(for await
背压 需要手动实现 天然具备(消费方不拉取,生产方自动暂停)
可组合性 难以链式组合 yield* 委托,管道式组合
错误传播 需要特殊错误事件 标准 try/catch
状态机表达 外部变量 + switch while + continue,流程自然
返回值 无法返回最终结果 return Terminal(终止原因)
可读性 事件分散,难追踪 线性执行流,一目了然

9.4 依赖注入提高可测试性(Dependency Injection)

设计目标

Agent 循环的核心 query() 函数依赖 LLM API 调用、上下文压缩等外部 I/O。测试时需要替换为 mock 实现,但传统的 jest.spyOn(module, 'function') 方式存在问题:

  • 每个测试文件都需要 6-8 行 spy 设置样板代码
  • 模块级 mock 容易产生意外副作用
  • 签名变更时 mock 不会报类型错误

实现方案

src/query/deps.ts 实现了一个精简的依赖注入容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 使用 typeof 保持签名自动同步
export type QueryDeps = {
// -- model
callModel: typeof queryModelWithStreaming

// -- compaction
microcompact: typeof microcompactMessages
autocompact: typeof autoCompactIfNeeded

// -- platform
uuid: () => string
}

// 生产环境工厂
export function productionDeps(): QueryDeps {
return {
callModel: queryModelWithStreaming,
microcompact: microcompactMessages,
autocompact: autoCompactIfNeeded,
uuid: randomUUID,
}
}

消费方(query.ts)

1
2
3
4
5
6
7
// deps 是可选参数,默认使用生产实现
const deps = params.deps ?? productionDeps()

// 内部通过 deps 调用,而非直接 import
for await (const message of deps.callModel(...)) { ... }
await deps.autocompact(messages, model, ...)
const id = deps.uuid()

测试使用

1
2
3
4
5
6
7
8
9
10
// 测试中直接注入 fake,零 spy 样板
const result = yield* query({
...baseParams,
deps: {
callModel: async function*() { yield fakeResponse },
microcompact: async (msgs) => msgs, // 跳过压缩
autocompact: async () => ({ wasCompacted: false }),
uuid: () => 'test-uuid-001',
},
})

类型安全保证

flowchart LR Real["queryModelWithStreaming
(真实实现)"] -->|"typeof"| Type["QueryDeps.callModel
(类型定义)"] Type -->|"约束"| Fake["testCallModel
(测试 fake)"] Real -.->|"签名变更"| Type Type -.->|"编译错误"| Fake style Type fill:#e3f2fd

typeof fn 模式的精妙之处:当真实函数签名变更时,所有不匹配的测试 fake 都会产生编译期错误,无需运行时才发现 mock 不匹配。

设计原则

原则 说明
范围最小化 只注入 4 个真正需要 mock 的 I/O 依赖,不过度抽象
类型驱动 typeof 自动同步签名,零维护成本
默认生产 params.deps ?? productionDeps(),调用方无感知
渐进扩展 注释明确说"后续 PR 可加入 runTools, logEvent 等"

9.5 工具即数据(Tool as Data)

设计目标

传统面向对象设计中,44 个工具会形成一个继承体系(BaseTool → ReadOnlyTool → FileReadTool)。Claude Code 选择了另一条路:工具不是类实例,是符合接口的数据对象

实现方案

flowchart TB subgraph 传统OOP["传统 OOP 方式 ❌"] Base["class BaseTool"] --> ReadOnly["class ReadOnlyTool"] ReadOnly --> FileRead["class FileReadTool"] ReadOnly --> Grep["class GrepTool"] Base --> Writable["class WritableTool"] Writable --> FileEdit["class FileEditTool"] Writable --> Bash["class BashTool"] end subgraph DataPattern["Tool as Data 方式 ✓"] Factory["buildTool(def)"] --> Tool1["GlobTool 对象"] Factory --> Tool2["GrepTool 对象"] Factory --> Tool3["BashTool 对象"] Factory --> Tool4["FileReadTool 对象"] Defaults["TOOL_DEFAULTS
(共享默认行为)"] --> Factory end style 传统OOP fill:#ffebee style DataPattern fill:#e8f5e9

buildTool() 工厂:合并默认值,而非继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const TOOL_DEFAULTS = {
isEnabled: () => true,
isConcurrencySafe: () => false,
isReadOnly: () => false,
isDestructive: () => false,
checkPermissions: (input) => Promise.resolve({ behavior: 'allow', updatedInput: input }),
userFacingName: () => '',
}

export function buildTool<D extends AnyToolDef>(def: D): BuiltTool<D> {
return {
...TOOL_DEFAULTS,
userFacingName: () => def.name,
...def, // 用户定义覆盖默认值
} as BuiltTool<D>
}

具体工具:一个对象字面量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
export const GlobTool = buildTool({
name: 'Glob',
searchHint: 'find files by name pattern or wildcard',
maxResultSizeChars: 100_000,

async description() { return DESCRIPTION },
get inputSchema() { return inputSchema() },

// 覆盖默认值
isConcurrencySafe() { return true },
isReadOnly() { return true },

async call(input, context) {
// 具体执行逻辑
},

mapToolResultToToolResultBlockParam(output, toolUseID) { ... },
} satisfies ToolDef<InputSchema, Output>)

工具注册:平坦数组 + 条件组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
export function getAllBaseTools(): Tools {
return [
AgentTool,
BashTool,
...(hasEmbeddedSearchTools() ? [] : [GlobTool, GrepTool]),
FileReadTool,
FileEditTool,
// ... 条件包含更多工具
]
}

// 合并内置工具 + MCP 外部工具,去重
export function assembleToolPool(permissionContext, mcpTools): Tools {
const builtInTools = getTools(permissionContext)
const allowedMcpTools = filterToolsByDenyRules(mcpTools, permissionContext)
return uniqBy([...builtInTools].sort(byName).concat(allowedMcpTools.sort(byName)), 'name')
}

数据对象 vs 类实例的优势

维度 类实例 数据对象
注册 需要工厂/容器 直接放入数组
过滤 需要类型检查 tools.filter(t => t.isReadOnly())
排序 需要 Comparable 接口 tools.sort((a, b) => a.name.localeCompare(b.name))
序列化 需要序列化器 天然可 JSON 化(inputSchema → JSON Schema)
条件包含 条件 new + 注册 ...(condition ? [Tool] : [])
组合 多重继承/Mixin 对象展开 { ...defaults, ...specific }
类型安全 extends/implements satisfies ToolDef<Input, Output>

9.6 深度不可变(DeepImmutable)

设计目标

AppStateToolPermissionContext 等核心数据结构在 50+ 个文件中被传递和读取。如果任何消费方意外修改了这些状态,将导致难以追踪的 bug。DeepImmutable<T>编译期防止一切意外修改。

实现机制

1
2
3
4
5
6
7
8
// 递归地将所有属性标记为 readonly
type DeepImmutable<T> =
T extends Function ? T :
T extends Map<infer K, infer V> ? ReadonlyMap<K, DeepImmutable<V>> :
T extends Set<infer V> ? ReadonlySet<DeepImmutable<V>> :
T extends Array<infer V> ? readonly DeepImmutable<V>[] :
T extends object ? { readonly [K in keyof T]: DeepImmutable<T[K]> } :
T

应用场景

1. 权限上下文——全局共享,绝对不可变

1
2
3
4
5
6
7
8
9
export type ToolPermissionContext = DeepImmutable<{
mode: PermissionMode
additionalWorkingDirectories: Map<string, AdditionalWorkingDirectory>
alwaysAllowRules: ToolPermissionRulesBySource
alwaysDenyRules: ToolPermissionRulesBySource
alwaysAskRules: ToolPermissionRulesBySource
isBypassPermissionsModeAvailable: boolean
shouldAvoidPermissionPrompts?: boolean
}>

2. AppState——交集类型逃逸阀

1
2
3
4
5
6
7
8
9
10
11
12
// 主体使用 DeepImmutable(50+ 字段)
export type AppState = DeepImmutable<{
settings: SettingsJson
verbose: boolean
mainLoopModel: ModelSetting
toolPermissionContext: ToolPermissionContext
// ... ~50 more fields
}> & {
// 包含函数类型的字段排除在外(函数不能 readonly)
tasks: { [taskId: string]: TaskState }
agentNameRegistry: Map<string, AgentId>
}

3. 组件 Props——快照语义

1
2
3
4
5
6
7
8
// 组件接收只读快照,无法修改 store 中的原始数据
function BackgroundTasksDialog({ task }: {
task: DeepImmutable<LocalShellTaskState>
}) { ... }

function ShellDetailDialog({ shell }: {
shell: DeepImmutable<LocalShellTaskState>
}) { ... }

状态修改的唯一路径

flowchart LR subgraph 只读消费["只读消费(编译期强制)"] C1["query.ts"] C2["tools/*"] C3["components/*"] end subgraph 状态所有者["状态所有者(唯一写入点)"] Store["AppStateStore
setState(updater)"] end Store -->|"DeepImmutable"| C1 Store -->|"DeepImmutable"| C2 Store -->|"DeepImmutable"| C3 C1 -.->|"❌ 编译错误
Cannot assign to readonly"| Store style 只读消费 fill:#e8f5e9 style 状态所有者 fill:#e3f2fd

设计原则

原则 说明
编译期保护 不依赖运行时 Object.freeze(),零性能开销
交集逃逸 & { mutableFields } 为函数类型/不可冻结字段提供出口
快照语义 组件拿到的是快照引用,生命周期内数据不变
单一写入点 只有 setState(updater) 能产生新状态

9.7 构建时特性裁剪(Build-time Feature Gating)

设计目标

同一份代码可能需要产出不同功能集的产品(内部版 vs 公开版)。不需要的功能代码应该在编译阶段就被删除,不增加安装包体积,也不泄露内部实现细节。

两层门控架构

flowchart TD subgraph BuildTime["构建时(Bun Bundler)"] Feature["feature('KAIROS')"] -->|"内部构建: true"| Include["代码保留"] Feature -->|"外部构建: false"| DCE["Dead Code Elimination
代码 + 字符串被删除"] end subgraph Runtime["运行时(GrowthBook/Statsig)"] Include --> GB["getFeatureValue('tengu_xxx', false)"] GB -->|"true"| Enable["功能启用"] GB -->|"false"| Disable["功能禁用"] end DCE --> Never["永远不执行
(代码不存在于产物中)"] style DCE fill:#ffebee style Never fill:#ffcdd2 style Enable fill:#e8f5e9

关键代码模式

模式 1:正向三元表达式(推荐)

1
2
3
4
5
6
7
8
import { feature } from 'bun:bundle'

// 正向三元模式 —— 确保 DCE 能消除字符串字面量
export function isUltrathinkEnabled(): boolean {
return feature('ULTRATHINK')
? getFeatureValue_CACHED_MAY_BE_STALE('tengu_turtle_carbon', true)
: false // 外部构建:整个函数体被优化为 return false
}

注意:负向模式 if (!feature(...)) return 无法让 bundler 消除后续代码中的字符串字面量。文档明确要求使用正向三元。

模式 2:条件模块导入(整个模块 DCE)

1
2
3
4
5
6
7
8
// 外部构建中,这些 require() 及其目标模块被完全删除
const reactiveCompact = feature('REACTIVE_COMPACT')
? (require('./services/compact/reactiveCompact.js') as typeof import('./services/compact/reactiveCompact.js'))
: null

const contextCollapse = feature('CONTEXT_COLLAPSE')
? (require('./services/contextCollapse/index.js') as typeof import('./services/contextCollapse/index.js'))
: null

模式 3:条件数组元素(工具/配置 DCE)

1
2
3
4
5
6
7
const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([
'repl_main_thread',
'sdk',
'compact',
// bash_classifier 是内部专属,feature gate 确保字符串不泄露到外部构建
...(feature('BASH_CLASSIFIER') ? (['bash_classifier'] as const) : []),
])

模式 4:入口级特性开关

1
2
3
4
5
6
// cli.tsx:消融实验基线模式(仅内部构建存在)
if (feature('ABLATION_BASELINE') && process.env.CLAUDE_CODE_ABLATION_BASELINE) {
for (const k of ['CLAUDE_CODE_SIMPLE', 'CLAUDE_CODE_DISABLE_THINKING', ...]) {
process.env[k] ??= '1'
}
}

已知特性标志(50+)

类别 标志名 说明
核心功能 BRIDGE_MODE, COORDINATOR_MODE, VOICE_MODE IDE 桥接、协调者模式、语音输入
模型增强 ULTRATHINK, REACTIVE_COMPACT, CONTEXT_COLLAPSE 超长思考、响应式压缩、上下文折叠
内部工具 KAIROS, KAIROS_BRIEF, BASH_CLASSIFIER 内部专属工具和分类器
实验功能 ABLATION_BASELINE, QUICK_SEARCH, TERMINAL_PANEL A/B 测试、快速搜索、终端面板
基础设施 UDS_INBOX, MONITOR_TOOL, CCR_MIRROR Unix 域套接字、监控工具、镜像
数据 PROMPT_CACHE_BREAK_DETECTION, TRANSCRIPT_CLASSIFIER 缓存击穿检测、转录分类

9.8 多层记忆化(Tiered Memoization)

设计目标

CLI 工具中大量操作是重复性 I/O(查找 git 根目录、解析 JSON、读取配置)。但简单的 lodash.memoize 有严重问题:无界缓存会导致内存泄漏(曾出现 300MB+ 内存占用)。

三种记忆化原语

flowchart TD subgraph TTL["memoizeWithTTL(时间失效)"] direction LR TTL1["首次调用"] -->|"缓存 miss"| TTL2["计算 + 缓存"] TTL2 -->|"TTL 内"| TTL3["直接返回"] TTL3 -->|"TTL 过期"| TTL4["返回旧值 + 后台刷新"] end subgraph TTLAsync["memoizeWithTTLAsync(异步 + 去重)"] direction LR TTLA1["并发请求 A"] -->|"缓存 miss"| TTLA2["发起计算"] TTLA3["并发请求 B"] -->|"检查 inFlight"| TTLA2 TTLA2 --> TTLA4["共享同一个 Promise"] end subgraph LRU["memoizeWithLRU(空间有界)"] direction LR LRU1["缓存满"] -->|"淘汰最旧"| LRU2["LRU 驱逐"] LRU2 --> LRU3["插入新条目"] end style TTL fill:#e3f2fd style TTLAsync fill:#e8f5e9 style LRU fill:#fff3e0

核心实现

memoizeWithTTL —— 写入时刷新(Stale-While-Revalidate)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
export function memoizeWithTTL<Args extends unknown[], Result>(
f: (...args: Args) => Result,
cacheLifetimeMs: number = 5 * 60 * 1000,
): MemoizedFunction<Args, Result> {
const cache = new Map<string, CacheEntry<Result>>()

return (...args: Args): Result => {
const key = jsonStringify(args)
const cached = cache.get(key)

if (!cached) {
// 冷启动:阻塞计算
const value = f(...args)
cache.set(key, { value, timestamp: Date.now(), refreshing: false })
return value
}

if (Date.now() - cached.timestamp > cacheLifetimeMs && !cached.refreshing) {
cached.refreshing = true
// 后台刷新(不阻塞当前调用)
Promise.resolve().then(() => {
const newValue = f(...args)
if (cache.get(key) === cached) { // 防止覆盖并发更新
cache.set(key, { value: newValue, timestamp: Date.now(), refreshing: false })
}
}).catch(e => {
if (cache.get(key) === cached) cache.delete(key)
})
return cached.value // 立即返回旧值
}

return cached.value
}
}

memoizeWithTTLAsync —— 防雷群(Thundering Herd Prevention)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
export function memoizeWithTTLAsync<Args extends unknown[], Result>(
f: (...args: Args) => Promise<Result>,
cacheLifetimeMs: number = 5 * 60 * 1000,
): ... {
const cache = new Map<string, CacheEntry<Result>>()
const inFlight = new Map<string, Promise<Result>>() // 去重并发请求

return async (...args): Promise<Result> => {
const key = jsonStringify(args)

// 并发请求共享同一个 Promise(防止雷群)
const existing = inFlight.get(key)
if (existing) return existing

const promise = f(...args)
inFlight.set(key, promise)
try {
const value = await promise
cache.set(key, { value, timestamp: Date.now(), refreshing: false })
return value
} finally {
inFlight.delete(key)
}
}
}

memoizeWithLRU —— 有界缓存(防内存泄漏)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
export function memoizeWithLRU<Args extends unknown[], Result>(
f: (...args: Args) => Result,
cacheFn: (...args: Args) => string, // 自定义 key 函数
maxCacheSize: number = 100, // 最大条目数
): LRUMemoizedFunction<Args, Result> {
const cache = new LRUCache<string, Result>({ max: maxCacheSize })

const memoized = (...args: Args): Result => {
const key = cacheFn(...args)
const cached = cache.get(key)
if (cached !== undefined) return cached
const value = f(...args)
cache.set(key, value)
return value
}

memoized.cache = {
clear: () => cache.clear(),
size: () => cache.size,
delete: (key) => cache.delete(key),
get: (key) => cache.get(key),
has: (key) => cache.has(key),
}
return memoized
}

使用场景分布

原语 适用场景 典型使用
memoizeWithTTL 配置类数据(变化慢) 设置读取、能力检查
memoizeWithTTLAsync 网络凭证(过期刷新) AWS 凭证、GCP Token、OAuth
memoizeWithLRU 路径/解析类(调用频繁、参数多样) findGitRoot、JSON 解析、Shell 前缀

9.9 弹性错误恢复(Resilient Error Recovery)

设计目标

与 LLM API 的通信面临多种失败模式:限流(429)、过载(529)、认证过期、上下文溢出。系统需要优雅降级而不是直接崩溃。

多策略恢复架构

flowchart TD APICall["API 调用"] --> Error{"错误类型?"} Error -->|"429 限流"| Retry["指数退避重试
base × 2^attempt + jitter"] Error -->|"529 过载"| Check529{"前台请求?"} Check529 -->|"是"| Retry529["重试(最多 3 次)"] Check529 -->|"否(后台任务)"| Bail["立即放弃
(防止级联放大)"] Retry529 -->|"超过 3 次"| Fallback["模型降级
Opus → Sonnet"] Error -->|"401 认证"| AuthRefresh["清除缓存
重新认证"] Error -->|"413 上下文溢出"| Compact["触发 reactive compact
压缩后重试"] Error -->|"fast-mode 拒绝"| FastDeg["Fast 模式降级
切换标准速度"] Error -->|"连接错误"| ConnRetry["退避重试
禁用 Keep-Alive"] subgraph Unattended["无人值守模式"] PersistRetry["无限重试
max backoff 5min
心跳 30s"] end Retry --> PersistRetry style Bail fill:#ffebee style Fallback fill:#fff3e0 style FastDeg fill:#fff3e0

关键实现

指数退避 + 抖动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
export function getRetryDelay(
attempt: number,
retryAfterHeader?: string | null,
maxDelayMs = 32000,
): number {
// 优先使用服务端建议的重试时间
if (retryAfterHeader) {
const seconds = parseInt(retryAfterHeader, 10)
if (!isNaN(seconds)) return seconds * 1000
}
// 指数退避 + 25% 随机抖动(防止惊群)
const baseDelay = Math.min(BASE_DELAY_MS * Math.pow(2, attempt - 1), maxDelayMs)
const jitter = Math.random() * 0.25 * baseDelay
return baseDelay + jitter
}

后台任务 529 抑制(防级联放大)

1
2
3
4
5
6
7
8
9
10
// 只有前台请求才重试 529
// 后台任务(摘要、标题、建议)立即放弃:
// 容量级联期间,每次重试是 3-10× 网关放大
const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([
'repl_main_thread', 'sdk', 'compact', 'auto_mode', ...
])

function shouldRetry529(querySource: QuerySource | undefined): boolean {
return querySource === undefined || FOREGROUND_529_RETRY_SOURCES.has(querySource)
}

无人值守心跳(防宿主超时)

1
2
3
4
5
6
7
8
9
// 长等待分块为 30s 心跳,防止宿主环境判定会话空闲
let remaining = delayMs
while (remaining > 0) {
if (options.signal?.aborted) throw new APIUserAbortError()
yield createSystemAPIErrorMessage(error, remaining, reportedAttempt, maxRetries)
const chunk = Math.min(remaining, HEARTBEAT_INTERVAL_MS)
await sleep(chunk, options.signal, { abortError })
remaining -= chunk
}

熔断器(Circuit Breaker)

1
2
3
4
5
6
7
// autoCompact 连续失败 3 次后停止重试
// 背景:曾出现 1,279 个会话连续失败 50+ 次,每天浪费 ~250K API 调用
const MAX_CONSECUTIVE_AUTOCOMPACT_FAILURES = 3

if (tracking?.consecutiveFailures >= MAX_CONSECUTIVE_AUTOCOMPACT_FAILURES) {
return { wasCompacted: false } // 熔断,不再尝试
}

9.10 轻量事件系统(Lightweight Event Architecture)

设计目标

传统的 EventEmitter 对于大多数内部通知场景过于重量。Claude Code 实现了三层事件原语,按粒度选择:

三层抽象

flowchart TB subgraph Layer1["第一层:createSignal
(纯事件通知)"] S1["subscribe(listener)"] S2["emit(...args)"] S3["clear()"] S1 --- S2 --- S3 end subgraph Layer2["第二层:createStore
(可观察状态)"] ST1["getState()"] ST2["setState(updater)"] ST3["subscribe(listener)"] ST1 --- ST2 --- ST3 end subgraph Layer3["第三层:EventEmitter
(复杂 UI 事件)"] E1["on(type, handler)"] E2["emit(type, event)"] E3["stopImmediatePropagation()"] E1 --- E2 --- E3 end Layer1 -->|"需要存储状态"| Layer2 Layer2 -->|"需要传播控制"| Layer3 style Layer1 fill:#e8f5e9 style Layer2 fill:#e3f2fd style Layer3 fill:#fff3e0

createSignal —— 42 行实现的发布/订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
export function createSignal<Args extends unknown[] = []>(): Signal<Args> {
const listeners = new Set<(...args: Args) => void>()
return {
subscribe(listener) {
listeners.add(listener)
return () => { listeners.delete(listener) } // 返回取消订阅函数
},
emit(...args) {
for (const listener of listeners) listener(...args)
},
clear() {
listeners.clear()
},
}
}

使用场景(20+ 处)

1
2
3
4
5
6
7
8
9
10
11
// 设置变更通知
const settingsChanged = createSignal<[SettingSource]>()
export const onSettingsChanged = settingsChanged.subscribe

// Fast Mode 状态变化
const fastModeChanged = createSignal()
export const onFastModeChanged = fastModeChanged.subscribe

// 技能文件变更
const skillChanged = createSignal<[string]>()
export const onSkillChanged = skillChanged.subscribe

createStore —— 35 行实现的状态容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
export function createStore<T>(initialState: T, onChange?: OnChange<T>): Store<T> {
let state = initialState
const listeners = new Set<Listener>()

return {
getState: () => state,
setState: (updater: (prev: T) => T) => {
const prev = state
const next = updater(prev)
if (Object.is(next, prev)) return // 引用相等则跳过通知
state = next
onChange?.({ newState: next, oldState: prev })
for (const listener of listeners) listener()
},
subscribe: (listener: Listener) => {
listeners.add(listener)
return () => listeners.delete(listener)
},
}
}

选择指南

需求 选择 理由
“某事发生了”,无状态 createSignal 最轻量,无内存开销
“状态变了”,需要当前值 createStore getState(),支持 React 集成
复杂 UI 事件(键盘、鼠标) EventEmitter 支持 stopImmediatePropagation()

9.11 上下文压缩系统(Context Compression)

设计目标

LLM 有上下文窗口限制。当对话变长时,需要自动压缩以保持在限制内,同时尽可能保留重要信息。

多层压缩策略

flowchart LR subgraph 策略["压缩策略(成本递增)"] direction TB M1["Micro-compact
清除旧工具结果
(零 API 调用)"] M2["Session Memory Compact
基于记忆裁剪消息
(零 API 调用)"] M3["Full Compact
LLM 摘要整个对话
(1次 API 调用)"] M4["Reactive Compact
响应 413 错误
(紧急压缩)"] end TokenCount["Token 计数"] -->|"接近阈值"| M1 M1 -->|"仍然超限"| M2 M2 -->|"仍然超限"| M3 API413["API 返回 413"] --> M4 style M1 fill:#e8f5e9 style M2 fill:#e3f2fd style M3 fill:#fff3e0 style M4 fill:#ffebee

Micro-compact(最轻量):选择性清除旧工具结果

1
2
3
4
5
6
7
8
// 只清除这些工具的历史输出(它们的结果通常很大且时效性弱)
const COMPACTABLE_TOOLS = new Set<string>([
FILE_READ_TOOL_NAME,
...SHELL_TOOL_NAMES,
GREP_TOOL_NAME, GLOB_TOOL_NAME,
WEB_SEARCH_TOOL_NAME, WEB_FETCH_TOOL_NAME,
FILE_EDIT_TOOL_NAME, FILE_WRITE_TOOL_NAME,
])

Auto-compact 阈值计算

1
2
3
4
5
6
7
8
9
export function getAutoCompactThreshold(model: string): number {
const effectiveContextWindow = getEffectiveContextWindowSize(model)
return effectiveContextWindow - AUTOCOMPACT_BUFFER_TOKENS // 13,000 token 缓冲
}

// 有效窗口 = 上下文窗口 - 预留输出 token(20,000)
export function getEffectiveContextWindowSize(model: string): number {
return getContextWindowForModel(model) - Math.min(getMaxOutputTokens(model), 20_000)
}

熔断保护

1
2
3
4
5
6
// 防止无限重试压缩(曾浪费 250K API 调用/天)
const MAX_CONSECUTIVE_AUTOCOMPACT_FAILURES = 3

export const AUTOCOMPACT_BUFFER_TOKENS = 13_000 // 触发阈值缓冲
export const WARNING_THRESHOLD_BUFFER_TOKENS = 20_000 // 黄色警告
export const ERROR_THRESHOLD_BUFFER_TOKENS = 20_000 // 红色警告

9.12 设计模式总结

mindmap root((工程实践)) 启动优化 快速路径分发 零静态导入 分层拦截 并行预取 子进程与import重叠 延迟预取 核心架构 AsyncGenerator管道 生产者-消费者解耦 背压天然支持 工具即数据 buildTool工厂 平坦数组组合 依赖注入 typeof签名同步 精简4依赖 类型安全 DeepImmutable 编译期不可变 交集逃逸阀 构建时裁剪 feature门控 两层策略 运行时健壮 多层记忆化 TTL写入刷新 LRU防泄漏 异步去重 弹性恢复 指数退避+抖动 模型降级 熔断器 上下文压缩 多策略递进 阈值自适应 事件通信 createSignal createStore EventEmitter

核心原则提炼

# 模式 核心思想 关键收益
1 快速路径分发 为轻量请求提供捷径 –version 零加载,毫秒响应
2 并行预取 I/O 等待与 CPU 计算重叠 启动时间减少 ~65ms
3 AsyncGenerator 生产者-消费者的优雅实现 流式 UI 更新 + 背压控制
4 依赖注入 接口抽象外部依赖 测试零 spy 样板 + 类型安全
5 工具即数据 数据对象取代类继承 注册/过滤/排序/序列化全简化
6 深度不可变 编译期防止状态突变 零运行时开销 + 100% 安全
7 构建时裁剪 两层门控(构建+运行时) 外部构建无内部代码泄露
8 多层记忆化 TTL/LRU/异步去重三种原语 防内存泄漏 + 防雷群
9 弹性恢复 退避/降级/熔断/抑制 250K API 调用/天 → 0 浪费
10 轻量事件 按粒度选择 Signal/Store/Emitter 42 行代码覆盖 80% 场景
11 上下文压缩 多策略递进(成本递增) 对话无限长而不溢出

9.13 核心文件速查表

文件 职责 模式
src/entrypoints/cli.tsx 入口分发:快速路径 + 动态导入 快速路径
src/main.tsx 启动:并行预取 + 延迟加载 并行预取
src/query.ts Agent 循环:AsyncGenerator 状态机 异步生成器
src/query/deps.ts 依赖注入容器(4 个 I/O 依赖) DI
src/Tool.ts 工具类型定义 + buildTool() 工厂 工具即数据
src/tools.ts 工具注册表:平坦数组 + 条件组合 工具即数据
src/state/AppStateStore.ts DeepImmutable<AppState> 定义 深度不可变
src/state/store.ts createStore 可观察状态容器 轻量事件
src/utils/signal.ts createSignal 发布/订阅原语 轻量事件
src/utils/memoize.ts 三种记忆化原语实现 记忆化
src/services/api/withRetry.ts 重试/退避/降级/熔断 弹性恢复
src/services/compact/autoCompact.ts 自动压缩阈值 + 熔断器 上下文压缩
src/services/compact/microCompact.ts 轻量压缩(清除工具结果) 上下文压缩
src/utils/settings/mdm/rawRead.ts MDM 子进程并行读取 并行预取
src/utils/secureStorage/keychainPrefetch.ts Keychain 并行预取 并行预取
src/utils/thinking.ts feature() 两层门控示例 构建时裁剪
src/ink/events/emitter.ts 自定义 EventEmitter(传播控制) 轻量事件