第九章:工程实践——值得学习的设计模式
“好的代码不仅正确,还能让读它的人学到东西。”
9.1 快速路径分发(Fast Path Dispatch)
设计目标
CLI 工具对启动速度极其敏感。用户敲 claude --version 不应该等待 50 万行代码加载完毕。快速路径分发的核心思想是:为轻量请求提供捷径,避免加载重量模块 。
实现机制
src/entrypoints/cli.tsx 的设计原则是:所有 import 都是动态的(await import()),没有任何静态导入重模块 。
flowchart TD
Entry["cli.tsx 入口 (零静态导入)"] --> CheckArgs{"检查 process.argv"}
CheckArgs -->|"--version"| FastPath1["直接输出 MACRO.VERSION 零模块加载,毫秒返回"]
CheckArgs -->|"--daemon-worker"| FastPath2["仅加载 workerRegistry.js"]
CheckArgs -->|"--dump-system-prompt"| FastPath3["仅加载 config + model + prompts"]
CheckArgs -->|"bridge/remote-control"| FastPath4["仅加载 auth + bridge"]
CheckArgs -->|"daemon"| FastPath5["仅加载 config + sinks + daemon"]
CheckArgs -->|"ps/logs/attach/kill"| FastPath6["仅加载 config + bg.js"]
CheckArgs -->|"无匹配"| FullLoad["加载完整 main.tsx (~135ms)"]
style FastPath1 fill:#e8f5e9
style FullLoad fill:#fff3e0
关键代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 if (args.length === 1 && (args[0 ] === '--version' || args[0 ] === '-v' )) { console .log(`${MACRO.VERSION} (Claude Code)` ) return } if (args[0 ] === '--daemon-worker' ) { const { startWorker } = await import ('../daemon/workerRegistry.js' ) await startWorker(args.slice(1 )) return } const { startCapturingEarlyInput } = await import ('../utils/earlyInput.js' )startCapturingEarlyInput() const { main: cliMain } = await import ('../main.js' )await cliMain()
设计原则
原则
说明
零静态导入
入口文件不 import 任何重模块,全部用 await import()
分层拦截
最快的路径(--version)最先检查,逐步递进
最小依赖
每个快速路径只加载该路径必需的模块
延迟加载
只有确认需要完整功能时才加载主模块
9.2 并行预取(Parallel Prefetch)
设计目标
即使需要加载完整功能,也可以通过将 I/O 等待与 CPU 计算重叠 来优化启动时间。并行预取的核心思想是:尽早发起 I/O,在其完成之前做其他有用的事 。
三阶段并行策略
sequenceDiagram
participant Main as main.tsx 模块求值
participant MDM as MDM 子进程
participant KC as Keychain 子进程
participant Import as 180+ import 语句
participant Init as init() 初始化
participant Prefetch as 延迟预取
Note over Main: Phase 1: 模块求值时启动子进程
Main->>MDM: startMdmRawRead()(plutil/reg query)
Main->>KC: startKeychainPrefetch()(security 命令)
Main->>Import: 开始执行 180+ import 语句(~135ms)
Note over MDM,Import: 子进程与 import 并行执行
MDM-->>MDM: 执行中(~65ms)
KC-->>KC: 执行中(~65ms)
Note over Main: Phase 2: preAction Hook 等待结果
Import-->>Main: import 完成
Main->>MDM: await ensureMdmSettingsLoaded()
MDM-->>Main: 已完成(几乎零等待)
Main->>KC: await ensureKeychainPrefetchCompleted()
KC-->>Main: 已完成(几乎零等待)
Main->>Init: await init()
Note over Main: Phase 3: REPL 就绪后延迟预取
Init-->>Main: 初始化完成
Main->>Prefetch: startDeferredPrefetches()
Prefetch->>Prefetch: initUser() / getUserContext()
Prefetch->>Prefetch: prefetchSystemContext()
Prefetch->>Prefetch: prefetchAwsCredentials()
Prefetch->>Prefetch: refreshModelCapabilities()
关键代码
Phase 1:模块求值时火速启动子进程
1 2 3 4 5 6 7 8 9 10 11 12 import { profileCheckpoint } from './utils/startupProfiler.js' profileCheckpoint('main_tsx_entry' ) import { startMdmRawRead } from './utils/settings/mdm/rawRead.js' startMdmRawRead() import { startKeychainPrefetch } from './utils/secureStorage/keychainPrefetch.js' startKeychainPrefetch()
Keychain 预取的并行实现 :
1 2 3 4 5 6 7 8 9 10 11 12 export function startKeychainPrefetch ( ): void { if (process.platform !== 'darwin' || prefetchPromise || isBareMode()) return const oauthSpawn = spawnSecurity(getMacOsKeychainStorageServiceName(CREDENTIALS_SERVICE_SUFFIX)) const legacySpawn = spawnSecurity(getMacOsKeychainStorageServiceName()) prefetchPromise = Promise .all([oauthSpawn, legacySpawn]).then(([oauth, legacy] ) => { if (!oauth.timedOut) primeKeychainCacheFromPrefetch(oauth.stdout) if (!legacy.timedOut) legacyApiKeyPrefetch = { stdout: legacy.stdout } }) }
Phase 2:在需要时等待(几乎零等待) :
1 2 3 4 5 program.hook('preAction' , async thisCommand => { await Promise .all([ensureMdmSettingsLoaded(), ensureKeychainPrefetchCompleted()]) await init() })
Phase 3:REPL 就绪后的延迟预取 :
1 2 3 4 5 6 7 8 9 10 export function startDeferredPrefetches ( ): void { void initUser() void getUserContext() void prefetchSystemContextIfSafe() void prefetchAwsCredentialsAndBedRockInfo() void countFilesRoundedRg(getCwd(), AbortSignal.timeout(3000 ), []) void refreshModelCapabilities() void settingsChangeDetector.initialize() }
时间线可视化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 时间轴 ────────────────────────────────────────────────────→ Phase 1 (模块求值): ├─ MDM 子进程 ─────── (~65ms) ─┐ ├─ Keychain 子进程 ── (~65ms) ─┤ ← 与 import 并行 └─ 180+ import 语句 ─ (~135ms) ┘ Phase 2 (Commander preAction): ├─ await MDM ─── (0ms,已完成) ├─ await KC ──── (0ms,已完成) └─ init() ────── (~50ms) Phase 3 (用户输入时): ├─ initUser() ──── 后台 ├─ prefetchAWS() ─ 后台 └─ ... 其他预取 ── 后台 ↓ 用户可以开始输入 (首次 API 调用时预取已完成)
9.3 异步生成器实现流式处理(AsyncGenerator Pipeline)
设计目标
Agent 循环需要一边执行一边向 UI 推送中间结果 ——LLM 的流式文本、工具调用进度、状态变化事件。异步生成器(async function*)完美解决了这个"生产者-消费者"问题。
生成器管道架构
flowchart LR
subgraph 最内层["queryLoop()"]
direction TB
A1["while(true)"] --> A2["yield stream_request_start"]
A2 --> A3["for await (model response) yield 每个 token"]
A3 --> A4["for await (tool results) yield 工具进度"]
A4 --> A5{"terminal?"}
A5 -->|No| A1
A5 -->|Yes| A6["return Terminal"]
end
subgraph 中间层["query()"]
B1["yield* queryLoop(...)"]
end
subgraph 外层["QueryEngine.submitMessage()"]
C1["for await (query(...)) transform → SDKMessage"]
end
subgraph UI层["REPL / SDK Consumer"]
D1["for await (engine.submitMessage(...)) 渲染到终端"]
end
最内层 -->|"yield events"| 中间层
中间层 -->|"yield events"| 外层
外层 -->|"yield SDKMessage"| UI层
关键代码
最内层:queryLoop() —— 状态机 + 流式产出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 async function * queryLoop ( params: QueryParams, consumedCommandUuids: string [], ): AsyncGenerator < StreamEvent | RequestStartEvent | Message | TombstoneMessage | ToolUseSummaryMessage , Terminal // 返回类型:循环终止原因 > { let state: LoopState = { type : 'initial' } while (true ) { yield { type : 'stream_request_start' } for await (const message of deps.callModel(...)) { yield message } for await (const update of toolUpdates) { yield update } const next = computeNextState(state, toolResults) if (next.type === 'terminal' ) { return next } state = next continue } }
中间层:query() —— 委托 + 后处理
1 2 3 4 5 6 7 8 9 10 11 12 export async function * query (params: QueryParams ): AsyncGenerator <...> { const consumedCommandUuids: string [] = [] const terminal = yield * queryLoop(params, consumedCommandUuids) for (const uuid of consumedCommandUuids) { notifyCommandLifecycle(uuid, 'completed' ) } return terminal }
外层:QueryEngine.submitMessage() —— 事件转换
1 2 3 4 5 6 7 async *submitMessage(prompt, options): AsyncGenerator<SDKMessage, void , unknown> { for await (const event of query({ messages, systemPrompt, ... })) { const sdkMessage = transformToSDKMessage(event) if (sdkMessage) yield sdkMessage } }
为什么用生成器而不用回调/EventEmitter?
对比维度
回调/EventEmitter
AsyncGenerator
控制流
调用方被动接收
消费方主动拉取(for await)
背压
需要手动实现
天然具备(消费方不拉取,生产方自动暂停)
可组合性
难以链式组合
yield* 委托,管道式组合
错误传播
需要特殊错误事件
标准 try/catch
状态机表达
外部变量 + switch
while + continue,流程自然
返回值
无法返回最终结果
return Terminal(终止原因)
可读性
事件分散,难追踪
线性执行流,一目了然
9.4 依赖注入提高可测试性(Dependency Injection)
设计目标
Agent 循环的核心 query() 函数依赖 LLM API 调用、上下文压缩等外部 I/O。测试时需要替换为 mock 实现,但传统的 jest.spyOn(module, 'function') 方式存在问题:
每个测试文件都需要 6-8 行 spy 设置样板代码
模块级 mock 容易产生意外副作用
签名变更时 mock 不会报类型错误
实现方案
src/query/deps.ts 实现了一个精简的依赖注入容器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 export type QueryDeps = { callModel: typeof queryModelWithStreaming microcompact: typeof microcompactMessages autocompact: typeof autoCompactIfNeeded uuid: () => string } export function productionDeps ( ): QueryDeps { return { callModel: queryModelWithStreaming, microcompact: microcompactMessages, autocompact: autoCompactIfNeeded, uuid: randomUUID, } }
消费方(query.ts) :
1 2 3 4 5 6 7 const deps = params.deps ?? productionDeps()for await (const message of deps.callModel(...)) { ... }await deps.autocompact(messages, model, ...)const id = deps.uuid()
测试使用 :
1 2 3 4 5 6 7 8 9 10 const result = yield * query({ ...baseParams, deps: { callModel: async function *( ) { yield fakeResponse }, microcompact: async (msgs) => msgs, autocompact: async () => ({ wasCompacted: false }), uuid: () => 'test-uuid-001' , }, })
类型安全保证
flowchart LR
Real["queryModelWithStreaming (真实实现)"] -->|"typeof"| Type["QueryDeps.callModel (类型定义)"]
Type -->|"约束"| Fake["testCallModel (测试 fake)"]
Real -.->|"签名变更"| Type
Type -.->|"编译错误"| Fake
style Type fill:#e3f2fd
typeof fn 模式的精妙之处:当真实函数签名变更时,所有不匹配的测试 fake 都会产生编译期错误 ,无需运行时才发现 mock 不匹配。
设计原则
原则
说明
范围最小化
只注入 4 个真正需要 mock 的 I/O 依赖,不过度抽象
类型驱动
typeof 自动同步签名,零维护成本
默认生产
params.deps ?? productionDeps(),调用方无感知
渐进扩展
注释明确说"后续 PR 可加入 runTools, logEvent 等"
设计目标
传统面向对象设计中,44 个工具会形成一个继承体系(BaseTool → ReadOnlyTool → FileReadTool)。Claude Code 选择了另一条路:工具不是类实例,是符合接口的数据对象 。
实现方案
flowchart TB
subgraph 传统OOP["传统 OOP 方式 ❌"]
Base["class BaseTool"] --> ReadOnly["class ReadOnlyTool"]
ReadOnly --> FileRead["class FileReadTool"]
ReadOnly --> Grep["class GrepTool"]
Base --> Writable["class WritableTool"]
Writable --> FileEdit["class FileEditTool"]
Writable --> Bash["class BashTool"]
end
subgraph DataPattern["Tool as Data 方式 ✓"]
Factory["buildTool(def)"] --> Tool1["GlobTool 对象"]
Factory --> Tool2["GrepTool 对象"]
Factory --> Tool3["BashTool 对象"]
Factory --> Tool4["FileReadTool 对象"]
Defaults["TOOL_DEFAULTS (共享默认行为)"] --> Factory
end
style 传统OOP fill:#ffebee
style DataPattern fill:#e8f5e9
buildTool() 工厂:合并默认值,而非继承
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 const TOOL_DEFAULTS = { isEnabled: () => true , isConcurrencySafe: () => false , isReadOnly: () => false , isDestructive: () => false , checkPermissions: (input ) => Promise .resolve({ behavior: 'allow' , updatedInput: input }), userFacingName: () => '' , } export function buildTool <D extends AnyToolDef >(def: D ): BuiltTool <D > { return { ...TOOL_DEFAULTS, userFacingName: () => def.name, ...def, } as BuiltTool<D> }
具体工具:一个对象字面量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 export const GlobTool = buildTool({ name: 'Glob' , searchHint: 'find files by name pattern or wildcard' , maxResultSizeChars: 100 _000, async description() { return DESCRIPTION }, get inputSchema() { return inputSchema() }, isConcurrencySafe() { return true }, isReadOnly() { return true }, async call(input, context) { }, mapToolResultToToolResultBlockParam(output, toolUseID) { ... }, } satisfies ToolDef<InputSchema, Output>)
工具注册:平坦数组 + 条件组合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 export function getAllBaseTools ( ): Tools { return [ AgentTool, BashTool, ...(hasEmbeddedSearchTools() ? [] : [GlobTool, GrepTool]), FileReadTool, FileEditTool, ] } export function assembleToolPool (permissionContext, mcpTools ): Tools { const builtInTools = getTools(permissionContext) const allowedMcpTools = filterToolsByDenyRules(mcpTools, permissionContext) return uniqBy([...builtInTools].sort(byName).concat(allowedMcpTools.sort(byName)), 'name' ) }
数据对象 vs 类实例的优势
维度
类实例
数据对象
注册
需要工厂/容器
直接放入数组
过滤
需要类型检查
tools.filter(t => t.isReadOnly())
排序
需要 Comparable 接口
tools.sort((a, b) => a.name.localeCompare(b.name))
序列化
需要序列化器
天然可 JSON 化(inputSchema → JSON Schema)
条件包含
条件 new + 注册
...(condition ? [Tool] : [])
组合
多重继承/Mixin
对象展开 { ...defaults, ...specific }
类型安全
extends/implements
satisfies ToolDef<Input, Output>
9.6 深度不可变(DeepImmutable)
设计目标
AppState、ToolPermissionContext 等核心数据结构在 50+ 个文件中被传递和读取。如果任何消费方意外修改了这些状态,将导致难以追踪的 bug。DeepImmutable<T> 在编译期 防止一切意外修改。
实现机制
1 2 3 4 5 6 7 8 type DeepImmutable<T> = T extends Function ? T : T extends Map<infer K, infer V> ? ReadonlyMap<K, DeepImmutable<V>> : T extends Set<infer V> ? ReadonlySet<DeepImmutable<V>> : T extends Array <infer V> ? readonly DeepImmutable<V>[] : T extends object ? { readonly [K in keyof T]: DeepImmutable<T[K]> } : T
应用场景
1. 权限上下文——全局共享,绝对不可变
1 2 3 4 5 6 7 8 9 export type ToolPermissionContext = DeepImmutable<{ mode: PermissionMode additionalWorkingDirectories: Map<string , AdditionalWorkingDirectory> alwaysAllowRules: ToolPermissionRulesBySource alwaysDenyRules: ToolPermissionRulesBySource alwaysAskRules: ToolPermissionRulesBySource isBypassPermissionsModeAvailable: boolean shouldAvoidPermissionPrompts?: boolean }>
2. AppState——交集类型逃逸阀
1 2 3 4 5 6 7 8 9 10 11 12 export type AppState = DeepImmutable<{ settings: SettingsJson verbose: boolean mainLoopModel: ModelSetting toolPermissionContext: ToolPermissionContext }> & { tasks: { [taskId: string ]: TaskState } agentNameRegistry: Map<string , AgentId> }
3. 组件 Props——快照语义
1 2 3 4 5 6 7 8 function BackgroundTasksDialog ({ task }: { task: DeepImmutable<LocalShellTaskState> } ) { ... }function ShellDetailDialog ({ shell }: { shell: DeepImmutable<LocalShellTaskState> } ) { ... }
状态修改的唯一路径
flowchart LR
subgraph 只读消费["只读消费(编译期强制)"]
C1["query.ts"]
C2["tools/*"]
C3["components/*"]
end
subgraph 状态所有者["状态所有者(唯一写入点)"]
Store["AppStateStore
setState(updater)"]
end
Store -->|"DeepImmutable
"| C1
Store -->|"DeepImmutable"| C2
Store -->|"DeepImmutable"| C3
C1 -.->|"❌ 编译错误 Cannot assign to readonly"| Store
style 只读消费 fill:#e8f5e9
style 状态所有者 fill:#e3f2fd
设计原则
原则
说明
编译期保护
不依赖运行时 Object.freeze(),零性能开销
交集逃逸
& { mutableFields } 为函数类型/不可冻结字段提供出口
快照语义
组件拿到的是快照引用,生命周期内数据不变
单一写入点
只有 setState(updater) 能产生新状态
9.7 构建时特性裁剪(Build-time Feature Gating)
设计目标
同一份代码可能需要产出不同功能集的产品(内部版 vs 公开版)。不需要的功能代码应该在编译阶段就被删除 ,不增加安装包体积,也不泄露内部实现细节。
两层门控架构
flowchart TD
subgraph BuildTime["构建时(Bun Bundler)"]
Feature["feature('KAIROS')"] -->|"内部构建: true"| Include["代码保留"]
Feature -->|"外部构建: false"| DCE["Dead Code Elimination 代码 + 字符串被删除"]
end
subgraph Runtime["运行时(GrowthBook/Statsig)"]
Include --> GB["getFeatureValue('tengu_xxx', false)"]
GB -->|"true"| Enable["功能启用"]
GB -->|"false"| Disable["功能禁用"]
end
DCE --> Never["永远不执行 (代码不存在于产物中)"]
style DCE fill:#ffebee
style Never fill:#ffcdd2
style Enable fill:#e8f5e9
关键代码模式
模式 1:正向三元表达式(推荐)
1 2 3 4 5 6 7 8 import { feature } from 'bun:bundle' export function isUltrathinkEnabled ( ): boolean { return feature('ULTRATHINK' ) ? getFeatureValue_CACHED_MAY_BE_STALE('tengu_turtle_carbon' , true ) : false }
注意 :负向模式 if (!feature(...)) return 无法让 bundler 消除后续代码中的字符串字面量。文档明确要求使用正向三元。
模式 2:条件模块导入(整个模块 DCE)
1 2 3 4 5 6 7 8 const reactiveCompact = feature('REACTIVE_COMPACT' ) ? (require ('./services/compact/reactiveCompact.js' ) as typeof import ('./services/compact/reactiveCompact.js' )) : null const contextCollapse = feature('CONTEXT_COLLAPSE' ) ? (require ('./services/contextCollapse/index.js' ) as typeof import ('./services/contextCollapse/index.js' )) : null
模式 3:条件数组元素(工具/配置 DCE)
1 2 3 4 5 6 7 const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([ 'repl_main_thread' , 'sdk' , 'compact' , ...(feature('BASH_CLASSIFIER' ) ? (['bash_classifier' ] as const ) : []), ])
模式 4:入口级特性开关
1 2 3 4 5 6 if (feature('ABLATION_BASELINE' ) && process.env.CLAUDE_CODE_ABLATION_BASELINE) { for (const k of ['CLAUDE_CODE_SIMPLE' , 'CLAUDE_CODE_DISABLE_THINKING' , ...]) { process.env[k] ??= '1' } }
已知特性标志(50+)
类别
标志名
说明
核心功能
BRIDGE_MODE, COORDINATOR_MODE, VOICE_MODE
IDE 桥接、协调者模式、语音输入
模型增强
ULTRATHINK, REACTIVE_COMPACT, CONTEXT_COLLAPSE
超长思考、响应式压缩、上下文折叠
内部工具
KAIROS, KAIROS_BRIEF, BASH_CLASSIFIER
内部专属工具和分类器
实验功能
ABLATION_BASELINE, QUICK_SEARCH, TERMINAL_PANEL
A/B 测试、快速搜索、终端面板
基础设施
UDS_INBOX, MONITOR_TOOL, CCR_MIRROR
Unix 域套接字、监控工具、镜像
数据
PROMPT_CACHE_BREAK_DETECTION, TRANSCRIPT_CLASSIFIER
缓存击穿检测、转录分类
9.8 多层记忆化(Tiered Memoization)
设计目标
CLI 工具中大量操作是重复性 I/O (查找 git 根目录、解析 JSON、读取配置)。但简单的 lodash.memoize 有严重问题:无界缓存会导致内存泄漏 (曾出现 300MB+ 内存占用)。
三种记忆化原语
flowchart TD
subgraph TTL["memoizeWithTTL(时间失效)"]
direction LR
TTL1["首次调用"] -->|"缓存 miss"| TTL2["计算 + 缓存"]
TTL2 -->|"TTL 内"| TTL3["直接返回"]
TTL3 -->|"TTL 过期"| TTL4["返回旧值 + 后台刷新"]
end
subgraph TTLAsync["memoizeWithTTLAsync(异步 + 去重)"]
direction LR
TTLA1["并发请求 A"] -->|"缓存 miss"| TTLA2["发起计算"]
TTLA3["并发请求 B"] -->|"检查 inFlight"| TTLA2
TTLA2 --> TTLA4["共享同一个 Promise"]
end
subgraph LRU["memoizeWithLRU(空间有界)"]
direction LR
LRU1["缓存满"] -->|"淘汰最旧"| LRU2["LRU 驱逐"]
LRU2 --> LRU3["插入新条目"]
end
style TTL fill:#e3f2fd
style TTLAsync fill:#e8f5e9
style LRU fill:#fff3e0
核心实现
memoizeWithTTL —— 写入时刷新(Stale-While-Revalidate)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 export function memoizeWithTTL <Args extends unknown [], Result >( f: (...args: Args) => Result, cacheLifetimeMs: number = 5 * 60 * 1000, ): MemoizedFunction <Args , Result > { const cache = new Map<string , CacheEntry<Result>>() return (...args: Args): Result => { const key = jsonStringify(args) const cached = cache.get(key) if (!cached) { const value = f(...args) cache.set(key, { value, timestamp: Date .now(), refreshing: false }) return value } if (Date .now() - cached.timestamp > cacheLifetimeMs && !cached.refreshing) { cached.refreshing = true Promise .resolve().then(() => { const newValue = f(...args) if (cache.get(key) === cached) { cache.set(key, { value: newValue, timestamp: Date .now(), refreshing: false }) } }).catch(e => { if (cache.get(key) === cached) cache.delete(key) }) return cached.value } return cached.value } }
memoizeWithTTLAsync —— 防雷群(Thundering Herd Prevention)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 export function memoizeWithTTLAsync <Args extends unknown [], Result >( f: (...args: Args) => Promise <Result>, cacheLifetimeMs: number = 5 * 60 * 1000, ): ... { const cache = new Map<string , CacheEntry<Result>>() const inFlight = new Map<string , Promise <Result>>() return async (...args): Promise <Result> => { const key = jsonStringify(args) const existing = inFlight.get(key) if (existing) return existing const promise = f(...args) inFlight.set(key, promise) try { const value = await promise cache.set(key, { value, timestamp: Date .now(), refreshing: false }) return value } finally { inFlight.delete(key) } } }
memoizeWithLRU —— 有界缓存(防内存泄漏)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 export function memoizeWithLRU <Args extends unknown [], Result >( f: (...args: Args) => Result, cacheFn: (...args: Args) => string , maxCacheSize: number = 100, ): LRUMemoizedFunction <Args , Result > { const cache = new LRUCache<string , Result>({ max: maxCacheSize }) const memoized = (...args: Args): Result => { const key = cacheFn(...args) const cached = cache.get(key) if (cached !== undefined ) return cached const value = f(...args) cache.set(key, value) return value } memoized.cache = { clear: () => cache.clear(), size: () => cache.size, delete : (key ) => cache.delete(key), get : (key ) => cache.get(key), has: (key ) => cache.has(key), } return memoized }
使用场景分布
原语
适用场景
典型使用
memoizeWithTTL
配置类数据(变化慢)
设置读取、能力检查
memoizeWithTTLAsync
网络凭证(过期刷新)
AWS 凭证、GCP Token、OAuth
memoizeWithLRU
路径/解析类(调用频繁、参数多样)
findGitRoot、JSON 解析、Shell 前缀
9.9 弹性错误恢复(Resilient Error Recovery)
设计目标
与 LLM API 的通信面临多种失败模式:限流(429)、过载(529)、认证过期、上下文溢出。系统需要优雅降级 而不是直接崩溃。
多策略恢复架构
flowchart TD
APICall["API 调用"] --> Error{"错误类型?"}
Error -->|"429 限流"| Retry["指数退避重试 base × 2^attempt + jitter"]
Error -->|"529 过载"| Check529{"前台请求?"}
Check529 -->|"是"| Retry529["重试(最多 3 次)"]
Check529 -->|"否(后台任务)"| Bail["立即放弃 (防止级联放大)"]
Retry529 -->|"超过 3 次"| Fallback["模型降级 Opus → Sonnet"]
Error -->|"401 认证"| AuthRefresh["清除缓存 重新认证"]
Error -->|"413 上下文溢出"| Compact["触发 reactive compact 压缩后重试"]
Error -->|"fast-mode 拒绝"| FastDeg["Fast 模式降级 切换标准速度"]
Error -->|"连接错误"| ConnRetry["退避重试 禁用 Keep-Alive"]
subgraph Unattended["无人值守模式"]
PersistRetry["无限重试 max backoff 5min 心跳 30s"]
end
Retry --> PersistRetry
style Bail fill:#ffebee
style Fallback fill:#fff3e0
style FastDeg fill:#fff3e0
关键实现
指数退避 + 抖动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 export function getRetryDelay ( attempt: number , retryAfterHeader?: string | null , maxDelayMs = 32000, ): number { if (retryAfterHeader) { const seconds = parseInt (retryAfterHeader, 10 ) if (!isNaN (seconds)) return seconds * 1000 } const baseDelay = Math .min(BASE_DELAY_MS * Math .pow(2 , attempt - 1 ), maxDelayMs) const jitter = Math .random() * 0.25 * baseDelay return baseDelay + jitter }
后台任务 529 抑制(防级联放大)
1 2 3 4 5 6 7 8 9 10 const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([ 'repl_main_thread' , 'sdk' , 'compact' , 'auto_mode' , ... ]) function shouldRetry529 (querySource: QuerySource | undefined ): boolean { return querySource === undefined || FOREGROUND_529_RETRY_SOURCES.has(querySource) }
无人值守心跳(防宿主超时)
1 2 3 4 5 6 7 8 9 let remaining = delayMswhile (remaining > 0 ) { if (options.signal?.aborted) throw new APIUserAbortError() yield createSystemAPIErrorMessage(error, remaining, reportedAttempt, maxRetries) const chunk = Math .min(remaining, HEARTBEAT_INTERVAL_MS) await sleep(chunk, options.signal, { abortError }) remaining -= chunk }
熔断器(Circuit Breaker)
1 2 3 4 5 6 7 const MAX_CONSECUTIVE_AUTOCOMPACT_FAILURES = 3 if (tracking?.consecutiveFailures >= MAX_CONSECUTIVE_AUTOCOMPACT_FAILURES) { return { wasCompacted: false } }
9.10 轻量事件系统(Lightweight Event Architecture)
设计目标
传统的 EventEmitter 对于大多数内部通知场景过于重量。Claude Code 实现了三层事件原语,按粒度选择:
三层抽象
flowchart TB
subgraph Layer1["第一层:createSignal (纯事件通知)"]
S1["subscribe(listener)"]
S2["emit(...args)"]
S3["clear()"]
S1 --- S2 --- S3
end
subgraph Layer2["第二层:createStore (可观察状态)"]
ST1["getState()"]
ST2["setState(updater)"]
ST3["subscribe(listener)"]
ST1 --- ST2 --- ST3
end
subgraph Layer3["第三层:EventEmitter (复杂 UI 事件)"]
E1["on(type, handler)"]
E2["emit(type, event)"]
E3["stopImmediatePropagation()"]
E1 --- E2 --- E3
end
Layer1 -->|"需要存储状态"| Layer2
Layer2 -->|"需要传播控制"| Layer3
style Layer1 fill:#e8f5e9
style Layer2 fill:#e3f2fd
style Layer3 fill:#fff3e0
createSignal —— 42 行实现的发布/订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 export function createSignal <Args extends unknown [] = []>( ): Signal <Args > { const listeners = new Set<(...args: Args ) => void >() return { subscribe(listener) { listeners.add(listener) return () => { listeners.delete(listener) } }, emit(...args) { for (const listener of listeners) listener(...args) }, clear() { listeners.clear() }, } }
使用场景(20+ 处) :
1 2 3 4 5 6 7 8 9 10 11 const settingsChanged = createSignal<[SettingSource]>()export const onSettingsChanged = settingsChanged.subscribeconst fastModeChanged = createSignal()export const onFastModeChanged = fastModeChanged.subscribeconst skillChanged = createSignal<[string ]>()export const onSkillChanged = skillChanged.subscribe
createStore —— 35 行实现的状态容器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 export function createStore <T >(initialState: T, onChange?: OnChange<T> ): Store <T > { let state = initialState const listeners = new Set<Listener>() return { getState: () => state, setState: (updater: (prev: T ) => T ) => { const prev = state const next = updater(prev) if (Object .is(next, prev)) return state = next onChange?.({ newState: next, oldState: prev }) for (const listener of listeners) listener() }, subscribe: (listener: Listener ) => { listeners.add(listener) return () => listeners.delete(listener) }, } }
选择指南
需求
选择
理由
“某事发生了”,无状态
createSignal
最轻量,无内存开销
“状态变了”,需要当前值
createStore
有 getState(),支持 React 集成
复杂 UI 事件(键盘、鼠标)
EventEmitter
支持 stopImmediatePropagation()
9.11 上下文压缩系统(Context Compression)
设计目标
LLM 有上下文窗口限制。当对话变长时,需要自动压缩 以保持在限制内,同时尽可能保留重要信息。
多层压缩策略
flowchart LR
subgraph 策略["压缩策略(成本递增)"]
direction TB
M1["Micro-compact 清除旧工具结果 (零 API 调用)"]
M2["Session Memory Compact 基于记忆裁剪消息 (零 API 调用)"]
M3["Full Compact LLM 摘要整个对话 (1次 API 调用)"]
M4["Reactive Compact 响应 413 错误 (紧急压缩)"]
end
TokenCount["Token 计数"] -->|"接近阈值"| M1
M1 -->|"仍然超限"| M2
M2 -->|"仍然超限"| M3
API413["API 返回 413"] --> M4
style M1 fill:#e8f5e9
style M2 fill:#e3f2fd
style M3 fill:#fff3e0
style M4 fill:#ffebee
Micro-compact(最轻量) :选择性清除旧工具结果
1 2 3 4 5 6 7 8 const COMPACTABLE_TOOLS = new Set<string >([ FILE_READ_TOOL_NAME, ...SHELL_TOOL_NAMES, GREP_TOOL_NAME, GLOB_TOOL_NAME, WEB_SEARCH_TOOL_NAME, WEB_FETCH_TOOL_NAME, FILE_EDIT_TOOL_NAME, FILE_WRITE_TOOL_NAME, ])
Auto-compact 阈值计算 :
1 2 3 4 5 6 7 8 9 export function getAutoCompactThreshold (model: string ): number { const effectiveContextWindow = getEffectiveContextWindowSize(model) return effectiveContextWindow - AUTOCOMPACT_BUFFER_TOKENS } export function getEffectiveContextWindowSize (model: string ): number { return getContextWindowForModel(model) - Math .min(getMaxOutputTokens(model), 20 _000) }
熔断保护 :
1 2 3 4 5 6 const MAX_CONSECUTIVE_AUTOCOMPACT_FAILURES = 3 export const AUTOCOMPACT_BUFFER_TOKENS = 13 _000 export const WARNING_THRESHOLD_BUFFER_TOKENS = 20 _000 export const ERROR_THRESHOLD_BUFFER_TOKENS = 20 _000
9.12 设计模式总结
mindmap
root((工程实践))
启动优化
快速路径分发
零静态导入
分层拦截
并行预取
子进程与import重叠
延迟预取
核心架构
AsyncGenerator管道
生产者-消费者解耦
背压天然支持
工具即数据
buildTool工厂
平坦数组组合
依赖注入
typeof签名同步
精简4依赖
类型安全
DeepImmutable
编译期不可变
交集逃逸阀
构建时裁剪
feature门控
两层策略
运行时健壮
多层记忆化
TTL写入刷新
LRU防泄漏
异步去重
弹性恢复
指数退避+抖动
模型降级
熔断器
上下文压缩
多策略递进
阈值自适应
事件通信
createSignal
createStore
EventEmitter
核心原则提炼
#
模式
核心思想
关键收益
1
快速路径分发
为轻量请求提供捷径
–version 零加载,毫秒响应
2
并行预取
I/O 等待与 CPU 计算重叠
启动时间减少 ~65ms
3
AsyncGenerator
生产者-消费者的优雅实现
流式 UI 更新 + 背压控制
4
依赖注入
接口抽象外部依赖
测试零 spy 样板 + 类型安全
5
工具即数据
数据对象取代类继承
注册/过滤/排序/序列化全简化
6
深度不可变
编译期防止状态突变
零运行时开销 + 100% 安全
7
构建时裁剪
两层门控(构建+运行时)
外部构建无内部代码泄露
8
多层记忆化
TTL/LRU/异步去重三种原语
防内存泄漏 + 防雷群
9
弹性恢复
退避/降级/熔断/抑制
250K API 调用/天 → 0 浪费
10
轻量事件
按粒度选择 Signal/Store/Emitter
42 行代码覆盖 80% 场景
11
上下文压缩
多策略递进(成本递增)
对话无限长而不溢出
9.13 核心文件速查表
文件
职责
模式
src/entrypoints/cli.tsx
入口分发:快速路径 + 动态导入
快速路径
src/main.tsx
启动:并行预取 + 延迟加载
并行预取
src/query.ts
Agent 循环:AsyncGenerator 状态机
异步生成器
src/query/deps.ts
依赖注入容器(4 个 I/O 依赖)
DI
src/Tool.ts
工具类型定义 + buildTool() 工厂
工具即数据
src/tools.ts
工具注册表:平坦数组 + 条件组合
工具即数据
src/state/AppStateStore.ts
DeepImmutable<AppState> 定义
深度不可变
src/state/store.ts
createStore 可观察状态容器
轻量事件
src/utils/signal.ts
createSignal 发布/订阅原语
轻量事件
src/utils/memoize.ts
三种记忆化原语实现
记忆化
src/services/api/withRetry.ts
重试/退避/降级/熔断
弹性恢复
src/services/compact/autoCompact.ts
自动压缩阈值 + 熔断器
上下文压缩
src/services/compact/microCompact.ts
轻量压缩(清除工具结果)
上下文压缩
src/utils/settings/mdm/rawRead.ts
MDM 子进程并行读取
并行预取
src/utils/secureStorage/keychainPrefetch.ts
Keychain 并行预取
并行预取
src/utils/thinking.ts
feature() 两层门控示例
构建时裁剪
src/ink/events/emitter.ts
自定义 EventEmitter(传播控制)
轻量事件