Pi Agent 包架构
packages/agent/src/ 实现了 Pi 框架中的 Agent 运行时,用于构建工具增强型 LLM 对话代理(Tool-Augmented LLM Agent)。
整个包由五个模块组成:
| 文件 | 职责 | 行数 |
|---|---|---|
types.ts | 类型定义层 | ~400 |
agent-loop.ts | 无状态循环引擎 | ~700 |
agent.ts | 有状态封装层 | ~540 |
proxy.ts | 代理流式函数 | ~320 |
index.ts | 导出入口 | 8 |
架构总览
导出入口] subgraph "类型系统" Types[types.ts
AgentMessage / AgentEvent
AgentLoopConfig / AgentTool] end subgraph "管理层 agent.ts" Agent[Agent 类
状态管理 / 队列 / 并发 / 事件] Queue[PendingMessageQueue
steering / follow-up 队列] State[MutableAgentState
状态归约] end subgraph "引擎层 agent-loop.ts" Engine[LoopEngine
runAgentLoop / runLoop] LLMCall[streamAssistantResponse
LLM 调用] ToolExec[executeToolCalls
工具执行] end subgraph "HTTP 代理 proxy.ts" Proxy[streamProxy
代理流式函数] end end subgraph "LLM 核心库 @earendil-works/pi-ai" AI[streamSimple / Model
Message / Tool] end App --> Agent Agent --> Engine Agent --> Queue Agent --> State Engine --> LLMCall Engine --> ToolExec LLMCall --> AI Proxy -- 替代 streamFn --> Agent Agent -.-> Types Engine -.-> Types
分层设计
无状态引擎 vs 有状态封装
Agent 包的核心设计思想是将引擎逻辑与状态管理分离:
| 维度 | agent.ts | agent-loop.ts |
|---|---|---|
| 有无状态 | 有状态(类实例) | 无状态(纯函数) |
| 并发控制 | ActiveRun + AbortController | 通过 signal? 支持取消 |
| 消息注入 | PendingMessageQueue 队列 | getSteeringMessages 回调 |
| 事件消费 | processEvents 归约状态 + 通知 listener | emit 发出,不关心谁收 |
| 工具钩子 | 持有 beforeToolCall / afterToolCall | 在 prepare / finalize 中调用 |
| 复用性 | 需实例化,单次运行 | 可任意并发调用 |
类型系统(types.ts)
类型系统定义了五个核心类别:
1. 消息体系
user / assistant / toolResult"] subgraph "AgentMessage(统一消息)" AM[AgentMessage] CUSTOM["CustomAgentMessages[key]
(通过 declaration merging 扩展)"] end Message --> AM CUSTOM --> AM
AgentMessage 通过 TypeScript 的 declaration merging 实现扩展:
| |
2. 事件体系(13 种)
3. 配置体系(AgentLoopConfig)
继承自 SimpleStreamOptions,增加:
- 消息转换:
convertToLlm(AgentMessage[] → Message[])、transformContext(上下文变换) - 队列回调:
getSteeringMessages、getFollowUpMessages - 工具钩子:
beforeToolCall(可阻拦执行)、afterToolCall(可覆盖结果) - 停止判定:
shouldStopAfterTurn - 动态认证:
getApiKey(用于短期 OAuth token)
4. 工具体系(AgentTool)
| |
5. 状态体系(AgentState)
| |
无状态循环引擎(agent-loop.ts)
导出接口
| |
核心:双层循环(runLoop)
这是整个 Agent 包最核心的函数。
while true"} OuterCond -->|true| InnerCond{"内层循环
while hasMoreToolCalls
|| pendingMessages"} InnerCond -->|true| Steering["1. 注入 steering 消息
getSteeringMessages()"] Steering --> LLM["2. streamAssistantResponse()
调 LLM"] LLM --> StopCheck{"stopReason
error / aborted?"} StopCheck -->|是| EmitAgentEnd["emit agent_end
return"] StopCheck -->|否| ToolCheck{"3. 有 tool calls?"} ToolCheck -->|有| ToolExec["4. executeToolCalls()
串行或并行执行工具"] ToolExec --> Results["推入 tool result 到上下文"] Results --> TurnEnd["emit turn_end"] ToolCheck -->|无| TurnEnd TurnEnd --> StopHook{"5. shouldStopAfterTurn?"} StopHook -->|true| EmitAgentEnd StopHook -->|false| SteeringPoll["6. 取 steering 消息"] SteeringPoll --> InnerCond InnerCond -->|false| FollowUp{"外层:
getFollowUpMessages()
有消息?"} FollowUp -->|有| SetPending["pendingMessages = followUp"] SetPending --> OuterCond FollowUp -->|无| EmitAgentEnd2["emit agent_end
return"]
内层循环条件
内层持续的条件:有工具调用待处理 或 有 steering 消息待注入。
hasMoreToolCalls:当前 assistant 消息有 tool call,且并非全部工具都要求终止pendingMessages:steering 队列中有消息
外层循环条件
内层结束后检查 follow-up 队列,有则设为 pending 继续外层。
LLM 调用(streamAssistantResponse)
(可选)"] CL["convertToLlm()"] M["Message[]"] end subgraph "LLM 调用" LLM["streamSimple / streamFn"] Stream["流式事件流"] end subgraph "响应处理" Events["start / text_delta /
toolcall_delta / done / error"] Partial["构建 partial message
逐帧更新 context.messages"] end
函数逐项解读
以下逐一解读 agent-loop.ts 中各函数的设计细节。
agentLoop() / agentLoopContinue() — 公开入口(EventStream 封装)
| |
| 方面 | 说明 |
|---|---|
| 职责 | 为外部调用者提供流式封装,避免直接处理 emit 回调 |
| 实现 | 内部创建 EventStream,启动 runAgentLoop / runAgentLoopContinue,通过 stream.push(event) 输出事件,最终 stream.end(messages) |
| 终止条件 | createAgentStream() 指定 agent_end 事件作为流结束标志,此时 messages 作为最终结果 |
agentLoopContinue 校验 | 空上下文 / 最后一条为 assistant 角色 → 直接抛异常 |
runAgentLoop() / runAgentLoopContinue() — 异步底层入口
| 方面 | runAgentLoop | runAgentLoopContinue |
|---|---|---|
| 启动方式 | 新 prompt,复制上下文并在末尾追加 prompt 消息 | 复用当前上下文,不追加新消息 |
| 初始化事件 | agent_start → turn_start → 逐一 message_start/end 发射 prompt | agent_start → turn_start,无 prompt 事件 |
| 用途 | agent.prompt() 调用 | agent.continue() 调用 |
两者最终都汇聚到 runLoop()。
runLoop() — 核心双层循环
已在前面用流程图详细展示,这里补充源码级要点:
关键设计点:
| 要点 | 说明 |
|---|---|
| 变量位于闭包中 | currentContext 和 newMessages 是引用类型,runLoop 内部修改直接反映到调用方 |
pendingMessages 作为内外层桥梁 | 内层每次迭代末尾通过 getSteeringMessages() 拉取;外层通过 getFollowUpMessages() 拉取后赋值给 pendingMessages 并 continue 外层 |
firstTurn 标志 | 首个 turn 不发射 turn_start(已在入口函数中发射);后续 turn 需发射 |
| 终止路径 | ① stopReason 为 error/aborted → 立即终止;② shouldStopAfterTurn 返回 true → 终止;③ 外层无 follow-up → 正常退出 |
streamAssistantResponse() — LLM 调用与流式处理
处理流水线:
| 阶段 | 步骤 | 说明 |
|---|---|---|
| 1. 上下文变换 | config.transformContext? | 可选,对整个 AgentMessage[] 做预处理 |
| 2. 消息转换 | config.convertToLlm(messages) | Agent 世界 → LLM 世界的边界 |
| 3. 构造 LLM context | { systemPrompt, messages, tools } | 拼装成 Context 类型 |
| 4. 选择流函数 | streamFn ?? streamSimple | 允许外部注入自定义流函数(proxy、mock 等) |
| 5. 解析 API key | config.getApiKey? | 支持短期 OAuth token 动态获取 |
| 6. 发起 LLM 调用 | streamFunction(model, context, options) | 返回 EventStream |
| 7. 事件循环 | for await (const event of response) | 逐帧处理流式事件 |
事件循环核心逻辑:
start -> 推入 partialMessage 到 context.messages 末尾(占位),emit message_start
delta -> 原地替换 context.messages 最后一条,emit message_update
done/error -> 取最终消息替换占位,emit message_end
设计要点:
partialMessage是引用,每次 delta 事件原地更新同一对象,避免频繁深拷贝context.messages最后一条被用作"占位":start 推入,update 替换,end 确定- 若 start 事件从未到达(如空响应),则直接 push 最终消息
executeToolCalls() — 工具执行调度
调度逻辑:
- 从
assistantMessage.content中过滤出所有type === "toolCall" - 检查是否有工具标记了
executionMode: "sequential" config.toolExecution === "sequential"或存在串行工具 ->executeToolCallsSequential- 否则 ->
executeToolCallsParallel
executeToolCallsSequential() — 串行执行
对每个 tool call 逐个:
emit tool_execution_start
-> prepareToolCall()
-> 若 immediate:直接记 error result
-> 若 prepared:executePreparedToolCall() -> finalizeExecutedToolCall()
-> emit tool_execution_end
-> createToolResultMessage + emit message_start/end
-> 全部完毕后计算 shouldTerminateToolBatch
- 每一步都严格串行:prepare -> execute -> finalize -> 下一个
executeToolCallsParallel() — 并行执行
两阶段设计:
| 阶段 | 行为 |
|---|---|
| Phase 1(串行 prepare) | 逐个遍历 tool calls,执行 prepareToolCall()。immediate 结果直接记入;prepared 结果包装为 lazy thunk |
| Phase 2(并发 execute) | Promise.all() 并发执行所有 thunk(execute + finalize) |
| 结果排序 | 按 assistant 原始 tool call 顺序排序(非完成顺序) |
| 终止判定 | 全部完成后统一计算 shouldTerminateToolBatch |
prepareToolCall() — 工具调用前置管道
toolCall -> 1. 查找工具定义(context.tools.find())
-> 不存在 -> return { kind: 'immediate', error }
-> 找到 -> 2. prepareArguments() 参数兼容转换
-> 3. validateToolArguments() Schema 校验
-> 4. beforeToolCall 钩子
-> block -> return { kind: 'immediate', error }
-> 允许 -> return { kind: 'prepared', tool, args }
| 步骤 | 错误处理 |
|---|---|
| 工具不存在 | error: “Tool xxx not found” |
prepareArguments() 抛错 | catch -> immediate error |
validateToolArguments() 抛错 | catch -> immediate error |
beforeToolCall 返回 block | immediate error,原因由钩子指定 |
executePreparedToolCall() — 工具执行
- 调用工具自带的
execute(),传入signal支持取消 - 第四个参数是
onProgress回调,工具可在执行中多次调用来发射tool_execution_update - 执行抛错 -> catch 后生成 error tool result(永不崩溃)
finalizeExecutedToolCall() — 工具后处理
afterToolCall钩子可以覆盖工具执行的任何输出字段(content / details / terminate / isError)- 钩子本身抛错 -> catch 后覆盖为 error tool result(防止钩子崩溃波及主流程)
辅助函数一览
| 函数 | 职责 |
|---|---|
createAgentStream() | 创建 EventStream,以 agent_end 为终止信号,提取 messages 为最终结果 |
createErrorToolResult(msg) | 生成标准错误格式的 AgentToolResult |
shouldTerminateToolBatch(calls) | 所有 finalized call 的 result.terminate === true 时才返回 true |
prepareToolCallArguments(tool, tc) | 若工具定义 prepareArguments,对参数做兼容转换 |
createToolResultMessage(finalized) | 从 finalized outcome 构建 ToolResultMessage |
emitToolExecutionEnd(finalized, emit) | 发射 tool_execution_end 事件 |
emitToolResultMessage(msg, emit) | 将 tool result 作为消息发射 message_start/end |
AM --> TC --> CL --> M --> LLM --> Stream --> Events --> Partial
### 工具执行(executeToolCalls)
```mermaid
flowchart TD
TC[tool calls 列表] --> Mode{"配置 + 工具标记<br/>sequential 还是 parallel?"}
Mode -->|sequential| Seq["executeToolCallsSequential()<br/>逐个执行"]
Mode -->|parallel| Par["executeToolCallsParallel()<br/>prepare 串行 → execute 并发"]
subgraph "每个工具的完整生命周期"
Prep["prepareToolCall()"]
Exec["executePreparedToolCall()"]
Final["finalizeExecutedToolCall()"]
end
Prep --> Exec --> Final
subgraph "prepareToolCall 细节"
Find["1. 查找工具定义"]
PrepArgs["2. prepareArguments()<br/>参数兼容"]
Validate["3. validateToolArguments()<br/>Schema 校验"]
Before["4. beforeToolCall 钩子<br/>可 block 执行"]
end
Find --> PrepArgs --> Validate --> Before
串行 vs 并行策略
| 方面 | 串行(sequential) | 并行(parallel) |
|---|---|---|
| preflight | 逐工具 prepare → execute → finalize | 全部 prepare(串行),然后全部 execute(并发) |
| 结果顺序 | 按执行顺序 | 按 assistant 原始 tool call 顺序 |
| 终止判定 | 每个工具独立判定 | 批量判定(全部 terminate 才终止) |
| 适用场景 | 工具间有依赖 / executionMode: "sequential" | 工具间无依赖,追求速度 |
终止语义(terminate)
- 仅当批次内所有工具的
result.terminate === true时,循环才会终止 - 单一工具返回
terminate: true不会立即终止 - 这是设计选择:避免一个工具意外终止整个 agent
有状态封装层(agent.ts)
Agent 类结构
消息队列设计
mode: one-at-a-time / all"] Q2["followUpQueue
mode: one-at-a-time / all"] end subgraph "agent-loop 回调" GS["getSteeringMessages()
→ 内层每次迭代"] GF["getFollowUpMessages()
→ 内层结束后"] end S1 --> Q1 S2 --> Q1 F1 --> Q2 F2 --> Q2 Q1 --> GS Q2 --> GF
drain() 模式:
| 模式 | drain() 行为 | 场景 |
|---|---|---|
"one-at-a-time"(默认) | 返回队列第一条,从队列移除 | 用户快速打字,每条消息独立处理 |
"all" | 返回全部并清空 | 批量注入,多条消息合并处理 |
事件归约(processEvents)
类似 Redux reducer,接收 agent-loop 的 emit 事件,更新 _state:
| 事件 | 状态变更 |
|---|---|
message_start | streamingMessage = event.message |
message_update | streamingMessage = updated |
message_end | streamingMessage = undefined,推入 messages[] |
tool_execution_start | 新建 Set,添加 toolCallId |
tool_execution_end | 新建 Set,删除 toolCallId |
turn_end | 如有 errorMessage,更新 _state.errorMessage |
agent_end | streamingMessage = undefined |
函数逐项解读
以下逐一解读 agent.ts 中各成员的设计细节。
subscribe() — 事件订阅系统
| 特性 | 说明 |
|---|---|
| 存储 | Set<(event, signal) => void>,保证去重 |
| 执行顺序 | 按订阅顺序依次 await(同步执行 + 异步推进) |
| 并发信号 | 回调接收当前 run 的 AbortSignal,可用于响应取消 |
| 解注册 | 返回 () => void 取消函数 |
| 生命周期 | agent_end 是最后一个事件,但 agent 不会 idle 直到所有 listener 处理完 agent_end |
内部调用方式(在 processEvents 中):
| |
prompt() — 发起对话
| |
流程:
prompt(input, images?)
-> normalizePromptInput() 统一成 AgentMessage[]
-> 检查 activeRun?存在 -> throw Error
-> runPromptMessages(messages)
-> runWithLifecycle()
-> createContextSnapshot()
-> createLoopConfig()
-> runAgentLoop(..., emit -> processEvents)
normalizePromptInput() 规则:
| 输入 | 输出 |
|---|---|
AgentMessage[] | 透传 |
AgentMessage | [message] |
string | [{ role: "user", content: [{ type: "text", text: input }], timestamp }] |
string + images | 同上,content 追加 images |
continue() — 继续对话
逻辑分支:
continue()
-> activeRun 存在? -> throw Error
-> 最后一条消息角色?
-> 无消息 -> throw Error
-> assistant -> steeringQueue 有消息?
-> 有 -> drain() -> runPromptMessages(steering, skipInitialSteeringPoll=true)
-> 无 -> followUpQueue 有消息?
-> 有 -> drain() -> runPromptMessages(followUps)
-> 无 -> throw Error "Cannot continue from assistant"
-> user/toolResult -> runContinuation() -> runAgentLoopContinue()
steer() / followUp() — 消息队列
| 方法 | 注入时机 | 对应回调 | 用途 |
|---|---|---|---|
steer() | 内层每次迭代前 | getSteeringMessages() | 用户想"插话"、外部事件推送 |
followUp() | 内层结束后、agent 停止前 | getFollowUpMessages() | 用户确认、后台任务结果 |
PendingMessageQueue 的 drain() 模式通过 QueueMode 控制:
| 模式 | drain() 行为 | 场景 |
|---|---|---|
"one-at-a-time"(默认) | 返回队列第一条,从队列移除 | 用户快速打字,每条消息独立处理 |
"all" | 返回全部并清空 | 批量注入,多条消息合并处理 |
abort() / waitForIdle() / reset() — 生命周期控制
| 方法 | 行为 |
|---|---|
abort() | activeRun.abortController.abort() -> LLM 调用和工具执行收到取消信号 |
waitForIdle() | 返回 activeRun.promise,在 finishRun() 中被 resolve |
reset() | 清空 messages / isStreaming / pendingToolCalls / errorMessage,并清空两个队列 |
runWithLifecycle() — 并发控制核心
runWithLifecycle(executor)
-> activeRun 存在? -> throw Error
-> 创建 AbortController + Promise
-> activeRun = { promise, resolve, abortController }
-> isStreaming = true
-> try { executor(signal) }
-> 成功 -> finally { finishRun() }
-> 失败 -> handleRunFailure(error, aborted) -> finally { finishRun() }
| 状态变更 | 时机 |
|---|---|
isStreaming = true | run 开始时 |
activeRun = { promise, resolve, abortController } | run 开始时 |
isStreaming = false | finishRun() 中 |
activeRun = undefined | finishRun() 中 |
handleRunFailure() — 失败处理
将错误编码为一条 stopReason: "error" / "aborted" 的 AgentMessage 并推入对话历史:
| 场景 | stopReason |
|---|---|
外部调用 abort() | "aborted" |
| LLM 调用抛错、工具执行抛错等 | "error" |
| |
所有失败都被编码为正常的 AgentMessage 推入对话历史,应用层可通过 subscribe() 监听到 agent_end 事件。
finishRun() — 运行终结
| |
清理 isStreaming、streamingMessage、pendingToolCalls,兑现 waitForIdle() 的 Promise,清除 activeRun 以允许下一次调用。
processEvents() — 事件归约(状态机)
类似 Redux reducer,接收 AgentEvent 并更新 _state:
| 事件 | 状态变更 |
|---|---|
message_start | streamingMessage = event.message |
message_update | streamingMessage = updated |
message_end | streamingMessage = undefined,推入 messages[] |
tool_execution_start | 新建 Set,添加 toolCallId |
tool_execution_end | 新建 Set,删除 toolCallId |
turn_end | 如有 errorMessage,更新 _state.errorMessage |
agent_end | streamingMessage = undefined |
然后按订阅顺序依次 await 所有 listener。
不可变性保证: pendingToolCalls 每次更新都新建 Set,外部通过 AgentState.pendingToolCalls 只读到 ReadonlySet,防止篡改。
createContextSnapshot() / createLoopConfig() — 适配层
| 函数 | 作用 |
|---|---|
createContextSnapshot() | 将 _state 转换为 AgentContext(无状态引擎入参),messages 和 tools 做浅拷贝 |
createLoopConfig() | 将 Agent 的配置和钩子映射为 AgentLoopConfig,同时将 steeringQueue / followUpQueue 包装为 getSteeringMessages / getFollowUpMessages 回调 |
然后按订阅顺序依次 await 所有 listener。
并发控制(ActiveRun)
resolve promise Agent-->>App: waitForIdle 完成 else 异常 Engine-->>Agent: throw error Agent->>Agent: handleRunFailure(error) Note over Agent: 生成 error 消息
emit agent_end Agent->>Agent: finishRun() else 外部取消 App->>Agent: abort() Agent->>Agent: abortController.abort() Engine-->>Agent: 检测 signal.aborted Agent->>Agent: handleRunFailure(error, aborted=true) end
完整调用链
HTTP 代理(proxy.ts)
streamProxy 是一个替代 streamSimple 的流式函数,用于需要经过代理服务器的场景。
Authorization: Bearer {token}
Body: { model, context, options } Server->>LLM: 转发请求 loop SSE 流式响应 LLM-->>Server: text_delta / toolcall_delta Server-->>Proxy: data: { type: "text_delta", delta: "..." } Note over Proxy: 重建 partial message Proxy-->>Agent: push(text_delta, partial) end Server-->>Proxy: data: { type: "done", reason: "stop", usage } Proxy-->>Agent: push(done, finalMessage) Proxy-->>Agent: end()
关键设计:
- 服务器端去掉
partial字段以减少带宽 - 客户端根据事件序列重建完整的
AssistantMessage - 支持 text / thinking / toolcall 所有三种内容类型
- 异常时生成
stopReason: "error" / "aborted"的消息,不抛异常
容错设计
Agent 包的设计哲学是永不崩溃——所有可能的失败都被捕获并编码为正常事件流:
| 失败场景 | 处理方式 |
|---|---|
| LLM 调用失败 | stopReason: "error", errorMessage: 描述 |
| 工具不存在 | 生成 error tool result: "Tool xxx not found" |
| 参数校验失败 | catch 后生成 error tool result |
beforeToolCall 阻拦 | 生成 error tool result: "Tool execution was blocked" |
afterToolCall 抛错 | catch 后覆盖为 error tool result |
transformContext 抛错 | 通过 runWithLifecycle 的 catch → handleRunFailure |
convertToLlm 抛错 | 同上 |
| 外部取消 | stopReason: "aborted" |
| 工具执行抛错 | catch → error tool result |
所有错误最终都以 AgentEvent 的形式通过 emit 输出,应用层可以通过 subscribe() 统一处理。
使用示例
基础用法
| |
自定义消息类型
| |
使用代理
| |
设计决策
为什么分离引擎层和管理层?
- 可测试性:
agent-loop.ts是纯函数,无需 mock 状态即可单元测试 - 复用性:无状态引擎可以被不同的管理层复用
- 关注点分离:引擎只管循环逻辑,管理只管状态和并发
为什么用双层循环(steering + follow-up)?
- steering:用户想在 agent 工作过程中"插话",需要立即处理
- follow-up:用户想等 agent 完成后再补充,不打断当前工作流
- 双层结构让这两种场景各得其所
为什么工具终止需要全部一致?
- 防止一个工具意外终止整个 agent
- 只有所有工具都明确要求终止时才生效
- 调用方可以通过
afterToolCall钩子调整terminate标志
为什么用 Set + 替换而非直接 mutable?
AgentState.pendingToolCalls对外是ReadonlySet,保证状态不可变性- 内部每次更新都新建 Set,防止外部持有引用后篡改