Pi Agent 包架构

packages/agent/src/ 实现了 Pi 框架中的 Agent 运行时,用于构建工具增强型 LLM 对话代理(Tool-Augmented LLM Agent)。

整个包由五个模块组成:

文件职责行数
types.ts类型定义层~400
agent-loop.ts无状态循环引擎~700
agent.ts有状态封装层~540
proxy.ts代理流式函数~320
index.ts导出入口8

架构总览

graph TD App[外部应用] subgraph "Agent 包" Index[index.ts
导出入口] subgraph "类型系统" Types[types.ts
AgentMessage / AgentEvent
AgentLoopConfig / AgentTool] end subgraph "管理层 agent.ts" Agent[Agent 类
状态管理 / 队列 / 并发 / 事件] Queue[PendingMessageQueue
steering / follow-up 队列] State[MutableAgentState
状态归约] end subgraph "引擎层 agent-loop.ts" Engine[LoopEngine
runAgentLoop / runLoop] LLMCall[streamAssistantResponse
LLM 调用] ToolExec[executeToolCalls
工具执行] end subgraph "HTTP 代理 proxy.ts" Proxy[streamProxy
代理流式函数] end end subgraph "LLM 核心库 @earendil-works/pi-ai" AI[streamSimple / Model
Message / Tool] end App --> Agent Agent --> Engine Agent --> Queue Agent --> State Engine --> LLMCall Engine --> ToolExec LLMCall --> AI Proxy -- 替代 streamFn --> Agent Agent -.-> Types Engine -.-> Types

分层设计

无状态引擎 vs 有状态封装

Agent 包的核心设计思想是将引擎逻辑状态管理分离:

graph LR subgraph "agent.ts(管理层)" A1[持有 state / listeners / queues] A2[并发控制 ActiveRun] A3[事件归约 processEvents] A4[API: prompt / steer / abort] end subgraph "agent-loop.ts(引擎层)" E1[纯函数] E2[双层循环 runLoop] E3[LLM 调用 + 工具执行] E4[通过 emit 输出事件] end A1 -.->|createContextSnapshot| E1 A4 -->|runPromptMessages| E2 E4 -->|emit event| A3
维度agent.tsagent-loop.ts
有无状态有状态(类实例)无状态(纯函数)
并发控制ActiveRun + AbortController通过 signal? 支持取消
消息注入PendingMessageQueue 队列getSteeringMessages 回调
事件消费processEvents 归约状态 + 通知 listeneremit 发出,不关心谁收
工具钩子持有 beforeToolCall / afterToolCall在 prepare / finalize 中调用
复用性需实例化,单次运行可任意并发调用

类型系统(types.ts)

类型系统定义了五个核心类别:

1. 消息体系

graph TD Message["@pi-ai Message
user / assistant / toolResult"] subgraph "AgentMessage(统一消息)" AM[AgentMessage] CUSTOM["CustomAgentMessages[key]
(通过 declaration merging 扩展)"] end Message --> AM CUSTOM --> AM

AgentMessage 通过 TypeScript 的 declaration merging 实现扩展:

1
2
3
4
5
6
7
8
// 应用层扩展自定义消息类型
declare module "@mariozechner/agent" {
  interface CustomAgentMessages {
    artifact: ArtifactMessage;
    notification: NotificationMessage;
  }
}
// AgentMessage 自动包含 Message | ArtifactMessage | NotificationMessage

2. 事件体系(13 种)

graph LR subgraph "Agent 生命周期" AS[agent_start] AE[agent_end] end subgraph "Turn 生命周期" TS[turn_start] TE[turn_end] end subgraph "消息生命周期" MS[message_start] MU[message_update] ME[message_end] end subgraph "工具执行生命周期" TES[tool_execution_start] TEU[tool_execution_update] TEE[tool_execution_end] end AS --> TS --> MS --> MU --> ME --> TES --> TEU --> TEE --> TE --> AE

3. 配置体系(AgentLoopConfig)

继承自 SimpleStreamOptions,增加:

  • 消息转换convertToLlm(AgentMessage[] → Message[])、transformContext(上下文变换)
  • 队列回调getSteeringMessagesgetFollowUpMessages
  • 工具钩子beforeToolCall(可阻拦执行)、afterToolCall(可覆盖结果)
  • 停止判定shouldStopAfterTurn
  • 动态认证getApiKey(用于短期 OAuth token)

4. 工具体系(AgentTool)

1
2
3
4
5
6
interface AgentTool<TParameters, TDetails> extends Tool<TParameters> {
  label: string;                          // UI 显示名称
  prepareArguments?: (args: unknown) => Static<TParameters>; // 参数兼容
  execute: (...) => Promise<AgentToolResult<TDetails>>;      // 执行
  executionMode?: "sequential" | "parallel";  // 单工具执行模式覆盖
}

5. 状态体系(AgentState)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
interface AgentState {
  systemPrompt: string;           // 系统提示词
  model: Model<any>;              // 当前模型
  thinkingLevel: ThinkingLevel;   // 推理级别
  tools: AgentTool<any>[];        // 工具列表(setter 自动 slice 拷贝)
  messages: AgentMessage[];       // 对话记录(setter 自动 slice 拷贝)
  readonly isStreaming: boolean;  // 是否正在运行
  readonly streamingMessage?: AgentMessage;  // 当前流式消息
  readonly pendingToolCalls: ReadonlySet<string>;  // 执行中的工具
  readonly errorMessage?: string; // 最近错误
}

无状态循环引擎(agent-loop.ts)

导出接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 新 prompt 入口 → EventStream(流式封装)
function agentLoop(prompts, context, config, signal?, streamFn?):
  EventStream<AgentEvent, AgentMessage[]>

// 继续入口 → EventStream
function agentLoopContinue(context, config, signal?, streamFn?):
  EventStream<AgentEvent, AgentMessage[]>

// 底层异步版(被 agent.ts 调用)
function runAgentLoop(prompts, context, config, emit, signal?, streamFn?):
  Promise<AgentMessage[]>

function runAgentLoopContinue(context, config, emit, signal?, streamFn?):
  Promise<AgentMessage[]>

核心:双层循环(runLoop)

这是整个 Agent 包最核心的函数

flowchart TD Start(["runLoop 开始"]) --> OuterCond{"外层循环
while true"} OuterCond -->|true| InnerCond{"内层循环
while hasMoreToolCalls
|| pendingMessages"} InnerCond -->|true| Steering["1. 注入 steering 消息
getSteeringMessages()"] Steering --> LLM["2. streamAssistantResponse()
调 LLM"] LLM --> StopCheck{"stopReason
error / aborted?"} StopCheck -->|是| EmitAgentEnd["emit agent_end
return"] StopCheck -->|否| ToolCheck{"3. 有 tool calls?"} ToolCheck -->|有| ToolExec["4. executeToolCalls()
串行或并行执行工具"] ToolExec --> Results["推入 tool result 到上下文"] Results --> TurnEnd["emit turn_end"] ToolCheck -->|无| TurnEnd TurnEnd --> StopHook{"5. shouldStopAfterTurn?"} StopHook -->|true| EmitAgentEnd StopHook -->|false| SteeringPoll["6. 取 steering 消息"] SteeringPoll --> InnerCond InnerCond -->|false| FollowUp{"外层:
getFollowUpMessages()
有消息?"} FollowUp -->|有| SetPending["pendingMessages = followUp"] SetPending --> OuterCond FollowUp -->|无| EmitAgentEnd2["emit agent_end
return"]

内层循环条件

内层持续的条件:有工具调用待处理有 steering 消息待注入

  • hasMoreToolCalls:当前 assistant 消息有 tool call,且并非全部工具都要求终止
  • pendingMessages:steering 队列中有消息

外层循环条件

内层结束后检查 follow-up 队列,有则设为 pending 继续外层。

LLM 调用(streamAssistantResponse)

flowchart LR subgraph "消息转换链" AM["AgentMessage[]"] TC["transformContext()
(可选)"] CL["convertToLlm()"] M["Message[]"] end subgraph "LLM 调用" LLM["streamSimple / streamFn"] Stream["流式事件流"] end subgraph "响应处理" Events["start / text_delta /
toolcall_delta / done / error"] Partial["构建 partial message
逐帧更新 context.messages"] end

函数逐项解读

以下逐一解读 agent-loop.ts 中各函数的设计细节。


agentLoop() / agentLoopContinue() — 公开入口(EventStream 封装)

1
2
function agentLoop(prompts, context, config, signal?, streamFn?): EventStream<AgentEvent, AgentMessage[]>
function agentLoopContinue(context, config, signal?, streamFn?): EventStream<AgentEvent, AgentMessage[]>
方面说明
职责为外部调用者提供流式封装,避免直接处理 emit 回调
实现内部创建 EventStream,启动 runAgentLoop / runAgentLoopContinue,通过 stream.push(event) 输出事件,最终 stream.end(messages)
终止条件createAgentStream() 指定 agent_end 事件作为流结束标志,此时 messages 作为最终结果
agentLoopContinue 校验空上下文 / 最后一条为 assistant 角色 → 直接抛异常

runAgentLoop() / runAgentLoopContinue() — 异步底层入口

方面runAgentLooprunAgentLoopContinue
启动方式新 prompt,复制上下文并在末尾追加 prompt 消息复用当前上下文,不追加新消息
初始化事件agent_startturn_start → 逐一 message_start/end 发射 promptagent_startturn_start,无 prompt 事件
用途agent.prompt() 调用agent.continue() 调用

两者最终都汇聚到 runLoop()


runLoop() — 核心双层循环

已在前面用流程图详细展示,这里补充源码级要点:

关键设计点:

要点说明
变量位于闭包中currentContextnewMessages 是引用类型,runLoop 内部修改直接反映到调用方
pendingMessages 作为内外层桥梁内层每次迭代末尾通过 getSteeringMessages() 拉取;外层通过 getFollowUpMessages() 拉取后赋值给 pendingMessagescontinue 外层
firstTurn 标志首个 turn 不发射 turn_start(已在入口函数中发射);后续 turn 需发射
终止路径① stopReason 为 error/aborted → 立即终止;② shouldStopAfterTurn 返回 true → 终止;③ 外层无 follow-up → 正常退出

streamAssistantResponse() — LLM 调用与流式处理

处理流水线:

阶段步骤说明
1. 上下文变换config.transformContext?可选,对整个 AgentMessage[] 做预处理
2. 消息转换config.convertToLlm(messages)Agent 世界 → LLM 世界的边界
3. 构造 LLM context{ systemPrompt, messages, tools }拼装成 Context 类型
4. 选择流函数streamFn ?? streamSimple允许外部注入自定义流函数(proxy、mock 等)
5. 解析 API keyconfig.getApiKey?支持短期 OAuth token 动态获取
6. 发起 LLM 调用streamFunction(model, context, options)返回 EventStream
7. 事件循环for await (const event of response)逐帧处理流式事件

事件循环核心逻辑:

start   -> 推入 partialMessage 到 context.messages 末尾(占位),emit message_start
delta   -> 原地替换 context.messages 最后一条,emit message_update
done/error -> 取最终消息替换占位,emit message_end

设计要点:

  • partialMessage 是引用,每次 delta 事件原地更新同一对象,避免频繁深拷贝
  • context.messages 最后一条被用作"占位":start 推入,update 替换,end 确定
  • 若 start 事件从未到达(如空响应),则直接 push 最终消息

executeToolCalls() — 工具执行调度

调度逻辑:

  1. assistantMessage.content 中过滤出所有 type === "toolCall"
  2. 检查是否有工具标记了 executionMode: "sequential"
  3. config.toolExecution === "sequential" 或存在串行工具 -> executeToolCallsSequential
  4. 否则 -> executeToolCallsParallel

executeToolCallsSequential() — 串行执行

对每个 tool call 逐个:
  emit tool_execution_start
  -> prepareToolCall()
    -> 若 immediate:直接记 error result
    -> 若 prepared:executePreparedToolCall() -> finalizeExecutedToolCall()
  -> emit tool_execution_end
  -> createToolResultMessage + emit message_start/end
-> 全部完毕后计算 shouldTerminateToolBatch
  • 每一步都严格串行:prepare -> execute -> finalize -> 下一个

executeToolCallsParallel() — 并行执行

两阶段设计:

阶段行为
Phase 1(串行 prepare)逐个遍历 tool calls,执行 prepareToolCall()。immediate 结果直接记入;prepared 结果包装为 lazy thunk
Phase 2(并发 execute)Promise.all() 并发执行所有 thunk(execute + finalize)
结果排序按 assistant 原始 tool call 顺序排序(非完成顺序)
终止判定全部完成后统一计算 shouldTerminateToolBatch

prepareToolCall() — 工具调用前置管道

toolCall -> 1. 查找工具定义(context.tools.find())
           -> 不存在 -> return { kind: 'immediate', error }
           -> 找到 -> 2. prepareArguments() 参数兼容转换
                  -> 3. validateToolArguments() Schema 校验
                  -> 4. beforeToolCall 钩子
                    -> block -> return { kind: 'immediate', error }
                    -> 允许 -> return { kind: 'prepared', tool, args }
步骤错误处理
工具不存在error: “Tool xxx not found”
prepareArguments() 抛错catch -> immediate error
validateToolArguments() 抛错catch -> immediate error
beforeToolCall 返回 blockimmediate error,原因由钩子指定

executePreparedToolCall() — 工具执行

  • 调用工具自带的 execute(),传入 signal 支持取消
  • 第四个参数是 onProgress 回调,工具可在执行中多次调用来发射 tool_execution_update
  • 执行抛错 -> catch 后生成 error tool result(永不崩溃)

finalizeExecutedToolCall() — 工具后处理

  • afterToolCall 钩子可以覆盖工具执行的任何输出字段(content / details / terminate / isError)
  • 钩子本身抛错 -> catch 后覆盖为 error tool result(防止钩子崩溃波及主流程)

辅助函数一览

函数职责
createAgentStream()创建 EventStream,以 agent_end 为终止信号,提取 messages 为最终结果
createErrorToolResult(msg)生成标准错误格式的 AgentToolResult
shouldTerminateToolBatch(calls)所有 finalized call 的 result.terminate === true 时才返回 true
prepareToolCallArguments(tool, tc)若工具定义 prepareArguments,对参数做兼容转换
createToolResultMessage(finalized)从 finalized outcome 构建 ToolResultMessage
emitToolExecutionEnd(finalized, emit)发射 tool_execution_end 事件
emitToolResultMessage(msg, emit)将 tool result 作为消息发射 message_start/end
AM --> TC --> CL --> M --> LLM --> Stream --> Events --> Partial

### 工具执行(executeToolCalls)

```mermaid
flowchart TD
    TC[tool calls 列表] --> Mode{"配置 + 工具标记<br/>sequential 还是 parallel?"}

    Mode -->|sequential| Seq["executeToolCallsSequential()<br/>逐个执行"]
    Mode -->|parallel| Par["executeToolCallsParallel()<br/>prepare 串行 → execute 并发"]

    subgraph "每个工具的完整生命周期"
        Prep["prepareToolCall()"]
        Exec["executePreparedToolCall()"]
        Final["finalizeExecutedToolCall()"]
    end

    Prep --> Exec --> Final

    subgraph "prepareToolCall 细节"
        Find["1. 查找工具定义"]
        PrepArgs["2. prepareArguments()<br/>参数兼容"]
        Validate["3. validateToolArguments()<br/>Schema 校验"]
        Before["4. beforeToolCall 钩子<br/>可 block 执行"]
    end

    Find --> PrepArgs --> Validate --> Before

串行 vs 并行策略

方面串行(sequential)并行(parallel)
preflight逐工具 prepare → execute → finalize全部 prepare(串行),然后全部 execute(并发)
结果顺序按执行顺序按 assistant 原始 tool call 顺序
终止判定每个工具独立判定批量判定(全部 terminate 才终止)
适用场景工具间有依赖 / executionMode: "sequential"工具间无依赖,追求速度

终止语义(terminate

  • 仅当批次内所有工具的 result.terminate === true 时,循环才会终止
  • 单一工具返回 terminate: true 不会立即终止
  • 这是设计选择:避免一个工具意外终止整个 agent

有状态封装层(agent.ts)

Agent 类结构

classDiagram class Agent { -MutableAgentState _state -Set~Listener~ listeners -PendingMessageQueue steeringQueue -PendingMessageQueue followUpQueue -ActiveRun activeRun +StreamFn streamFn +convertToLlm() +beforeToolCall() +afterToolCall() +subscribe(listener) +prompt(input) +continue() +steer(msg) +followUp(msg) +abort() +reset() +waitForIdle() -runPromptMessages() -runContinuation() -createLoopConfig() -runWithLifecycle() -processEvents(event) -handleRunFailure(error) -finishRun() } class PendingMessageQueue { -AgentMessage[] messages +mode: QueueMode +enqueue(msg) +drain() AgentMessage[] +clear() } class ActiveRun { +Promise~void~ promise +resolve() +AbortController abortController } Agent *-- PendingMessageQueue Agent *-- ActiveRun

消息队列设计

flowchart LR subgraph "steer() 注入" S1["用户输入中间消息"] S2["外部事件推送"] end subgraph "followUp() 注入" F1["用户最终确认"] F2["后台任务结果"] end subgraph "PendingMessageQueue" Q1["steeringQueue
mode: one-at-a-time / all"] Q2["followUpQueue
mode: one-at-a-time / all"] end subgraph "agent-loop 回调" GS["getSteeringMessages()
→ 内层每次迭代"] GF["getFollowUpMessages()
→ 内层结束后"] end S1 --> Q1 S2 --> Q1 F1 --> Q2 F2 --> Q2 Q1 --> GS Q2 --> GF

drain() 模式:

模式drain() 行为场景
"one-at-a-time"(默认)返回队列第一条,从队列移除用户快速打字,每条消息独立处理
"all"返回全部并清空批量注入,多条消息合并处理

事件归约(processEvents)

类似 Redux reducer,接收 agent-loop 的 emit 事件,更新 _state

stateDiagram-v2 [*] --> 空闲: reset / finishRun 空闲 --> 流式: message_start 流式 --> 流式: message_update 流式 --> 有消息: message_end 流式 --> 有消息: agent_end 有消息 --> 执行工具: tool_execution_start 执行工具 --> 执行工具: tool_execution_update 执行工具 --> 有消息: tool_execution_end 有消息 --> 空闲: turn_end + shouldStop 有消息 --> 流式: 继续下一轮
事件状态变更
message_startstreamingMessage = event.message
message_updatestreamingMessage = updated
message_endstreamingMessage = undefined,推入 messages[]
tool_execution_start新建 Set,添加 toolCallId
tool_execution_end新建 Set,删除 toolCallId
turn_end如有 errorMessage,更新 _state.errorMessage
agent_endstreamingMessage = undefined

函数逐项解读

以下逐一解读 agent.ts 中各成员的设计细节。


subscribe() — 事件订阅系统

特性说明
存储Set<(event, signal) => void>,保证去重
执行顺序按订阅顺序依次 await(同步执行 + 异步推进)
并发信号回调接收当前 run 的 AbortSignal,可用于响应取消
解注册返回 () => void 取消函数
生命周期agent_end 是最后一个事件,但 agent 不会 idle 直到所有 listener 处理完 agent_end

内部调用方式(在 processEvents 中):

1
2
3
for (const listener of this.listeners) {
  await listener(event, signal);
}

prompt() — 发起对话

1
2
3
// 三种重载
async prompt(message: AgentMessage | AgentMessage[]): Promise<void>
async prompt(input: string, images?: ImageContent[]): Promise<void>

流程:

prompt(input, images?)
  -> normalizePromptInput() 统一成 AgentMessage[]
  -> 检查 activeRun?存在 -> throw Error
  -> runPromptMessages(messages)
    -> runWithLifecycle()
      -> createContextSnapshot()
      -> createLoopConfig()
      -> runAgentLoop(..., emit -> processEvents)

normalizePromptInput() 规则:

输入输出
AgentMessage[]透传
AgentMessage[message]
string[{ role: "user", content: [{ type: "text", text: input }], timestamp }]
string + images同上,content 追加 images

continue() — 继续对话

逻辑分支:

continue()
  -> activeRun 存在? -> throw Error
  -> 最后一条消息角色?
    -> 无消息 -> throw Error
    -> assistant -> steeringQueue 有消息?
      -> 有 -> drain() -> runPromptMessages(steering, skipInitialSteeringPoll=true)
      -> 无 -> followUpQueue 有消息?
        -> 有 -> drain() -> runPromptMessages(followUps)
        -> 无 -> throw Error "Cannot continue from assistant"
    -> user/toolResult -> runContinuation() -> runAgentLoopContinue()

steer() / followUp() — 消息队列

方法注入时机对应回调用途
steer()内层每次迭代前getSteeringMessages()用户想"插话"、外部事件推送
followUp()内层结束后、agent 停止前getFollowUpMessages()用户确认、后台任务结果

PendingMessageQueuedrain() 模式通过 QueueMode 控制:

模式drain() 行为场景
"one-at-a-time"(默认)返回队列第一条,从队列移除用户快速打字,每条消息独立处理
"all"返回全部并清空批量注入,多条消息合并处理

abort() / waitForIdle() / reset() — 生命周期控制

方法行为
abort()activeRun.abortController.abort() -> LLM 调用和工具执行收到取消信号
waitForIdle()返回 activeRun.promise,在 finishRun() 中被 resolve
reset()清空 messages / isStreaming / pendingToolCalls / errorMessage,并清空两个队列

runWithLifecycle() — 并发控制核心

runWithLifecycle(executor)
  -> activeRun 存在? -> throw Error
  -> 创建 AbortController + Promise
  -> activeRun = { promise, resolve, abortController }
  -> isStreaming = true
  -> try { executor(signal) }
    -> 成功 -> finally { finishRun() }
    -> 失败 -> handleRunFailure(error, aborted) -> finally { finishRun() }
状态变更时机
isStreaming = truerun 开始时
activeRun = { promise, resolve, abortController }run 开始时
isStreaming = falsefinishRun()
activeRun = undefinedfinishRun()

handleRunFailure() — 失败处理

将错误编码为一条 stopReason: "error" / "aborted"AgentMessage 并推入对话历史:

场景stopReason
外部调用 abort()"aborted"
LLM 调用抛错、工具执行抛错等"error"
1
2
3
4
5
6
7
const failureMessage = {
  role: "assistant", content: [{ type: "text", text: "" }],
  stopReason: aborted ? "aborted" : "error",
  errorMessage: error instanceof Error ? error.message : String(error),
};
this._state.messages.push(failureMessage);
await this.processEvents({ type: "agent_end", messages: [failureMessage] });

所有失败都被编码为正常的 AgentMessage 推入对话历史,应用层可通过 subscribe() 监听到 agent_end 事件。


finishRun() — 运行终结

1
2
3
4
5
6
7
private finishRun(): void {
  this._state.isStreaming = false;
  this._state.streamingMessage = undefined;
  this._state.pendingToolCalls = new Set<string>();
  this.activeRun?.resolve();
  this.activeRun = undefined;
}

清理 isStreamingstreamingMessagependingToolCalls,兑现 waitForIdle() 的 Promise,清除 activeRun 以允许下一次调用。


processEvents() — 事件归约(状态机)

类似 Redux reducer,接收 AgentEvent 并更新 _state

事件状态变更
message_startstreamingMessage = event.message
message_updatestreamingMessage = updated
message_endstreamingMessage = undefined,推入 messages[]
tool_execution_start新建 Set,添加 toolCallId
tool_execution_end新建 Set,删除 toolCallId
turn_end如有 errorMessage,更新 _state.errorMessage
agent_endstreamingMessage = undefined

然后按订阅顺序依次 await 所有 listener。

不可变性保证: pendingToolCalls 每次更新都新建 Set,外部通过 AgentState.pendingToolCalls 只读到 ReadonlySet,防止篡改。


createContextSnapshot() / createLoopConfig() — 适配层

函数作用
createContextSnapshot()_state 转换为 AgentContext(无状态引擎入参),messagestools 做浅拷贝
createLoopConfig()将 Agent 的配置和钩子映射为 AgentLoopConfig,同时将 steeringQueue / followUpQueue 包装为 getSteeringMessages / getFollowUpMessages 回调

然后按订阅顺序依次 await 所有 listener。

并发控制(ActiveRun)

sequenceDiagram participant App as 应用 participant Agent as Agent participant Engine as agent-loop App->>Agent: prompt("Hello") Agent->>Agent: 检查 activeRun Note over Agent: activeRun 不存在 → 继续 Agent->>Agent: 创建 AbortController Agent->>Agent: isStreaming = true Agent->>Agent: activeRun = { promise, resolve, abortController } Agent->>Engine: runAgentLoop(...) alt 正常完成 Engine-->>Agent: 返回 Agent->>Agent: finishRun() Note over Agent: isStreaming = false
resolve promise Agent-->>App: waitForIdle 完成 else 异常 Engine-->>Agent: throw error Agent->>Agent: handleRunFailure(error) Note over Agent: 生成 error 消息
emit agent_end Agent->>Agent: finishRun() else 外部取消 App->>Agent: abort() Agent->>Agent: abortController.abort() Engine-->>Agent: 检测 signal.aborted Agent->>Agent: handleRunFailure(error, aborted=true) end

完整调用链

sequenceDiagram participant App as 应用 participant A as agent.ts participant AL as agent-loop.ts participant LLM as LLM Provider App->>A: agent.prompt("Hello") A->>A: normalizePromptInput() A->>A: runWithLifecycle() Note over A: 设 isStreaming = true A->>A: createContextSnapshot() A->>A: createLoopConfig() A->>AL: runAgentLoop(prompts, ctx, config, emit) AL->>AL: agent_start / turn_start AL->>AL: message_start/end for prompt loop 内层循环 AL->>AL: getSteeringMessages() AL->>AL: streamAssistantResponse() AL->>AL: transformContext() AL->>AL: convertToLlm() AL->>LLM: streamSimple(model, context) LLM-->>AL: text_delta / toolcall_delta AL-->>A: emit(message_start/update/end) alt 有 tool calls AL->>AL: executeToolCalls() AL-->>A: emit(tool_execution_start) AL->>AL: prepareToolCall() AL->>AL: beforeToolCall 钩子 AL->>AL: executePreparedToolCall() AL->>AL: finalizeExecutedToolCall() AL->>AL: afterToolCall 钩子 AL-->>A: emit(tool_execution_end) end AL-->>A: emit(turn_end) AL->>AL: shouldStopAfterTurn? end loop 外层循环 AL->>AL: getFollowUpMessages() end AL-->>A: emit(agent_end) A->>A: processEvents() 更新 state A->>A: 通知 listeners A->>A: finishRun() Note over A: isStreaming = false

HTTP 代理(proxy.ts)

streamProxy 是一个替代 streamSimple 的流式函数,用于需要经过代理服务器的场景。

sequenceDiagram participant Agent as Agent participant Proxy as streamProxy participant Server as 代理服务器 participant LLM as LLM Provider Agent->>Proxy: streamProxy(model, context, options) Proxy->>Proxy: 创建 partial message Proxy->>Server: POST /api/stream
Authorization: Bearer {token}
Body: { model, context, options } Server->>LLM: 转发请求 loop SSE 流式响应 LLM-->>Server: text_delta / toolcall_delta Server-->>Proxy: data: { type: "text_delta", delta: "..." } Note over Proxy: 重建 partial message Proxy-->>Agent: push(text_delta, partial) end Server-->>Proxy: data: { type: "done", reason: "stop", usage } Proxy-->>Agent: push(done, finalMessage) Proxy-->>Agent: end()

关键设计:

  • 服务器端去掉 partial 字段以减少带宽
  • 客户端根据事件序列重建完整的 AssistantMessage
  • 支持 text / thinking / toolcall 所有三种内容类型
  • 异常时生成 stopReason: "error" / "aborted" 的消息,不抛异常

容错设计

Agent 包的设计哲学是永不崩溃——所有可能的失败都被捕获并编码为正常事件流:

失败场景处理方式
LLM 调用失败stopReason: "error", errorMessage: 描述
工具不存在生成 error tool result: "Tool xxx not found"
参数校验失败catch 后生成 error tool result
beforeToolCall 阻拦生成 error tool result: "Tool execution was blocked"
afterToolCall 抛错catch 后覆盖为 error tool result
transformContext 抛错通过 runWithLifecycle 的 catch → handleRunFailure
convertToLlm 抛错同上
外部取消stopReason: "aborted"
工具执行抛错catch → error tool result

所有错误最终都以 AgentEvent 的形式通过 emit 输出,应用层可以通过 subscribe() 统一处理。


使用示例

基础用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import { Agent } from "@mariozechner/agent";

const agent = new Agent({
  initialState: {
    systemPrompt: "你是一个有用的助手。",
    model: myModel,
  },
});

// 订阅事件(更新 UI)
agent.subscribe((event) => {
  switch (event.type) {
    case "message_update":
      updateUI(event.message);
      break;
    case "turn_end":
      updateToolPanel(event.toolResults);
      break;
  }
});

// 发起对话
await agent.prompt("Hello!");

// 注入中间消息(不打断当前 LLM 调用)
agent.steer({ role: "user", content: [{ type: "text", text: "等一下..." }] });

自定义消息类型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 扩展自定义消息
declare module "@mariozechner/agent" {
  interface CustomAgentMessages {
    artifact: { role: "artifact"; content: ArtifactContent[]; timestamp: number };
  }
}

// 自定义转换函数
const agent = new Agent({
  convertToLlm: (messages) =>
    messages.flatMap((m) => {
      if (m.role === "artifact") return []; // 过滤掉 artifact 消息
      return [m];
    }),
});

使用代理

1
2
3
4
5
6
7
8
const agent = new Agent({
  streamFn: (model, context, options) =>
    streamProxy(model, context, {
      ...options,
      authToken: await getAuthToken(),
      proxyUrl: "https://genai.example.com",
    }),
});

设计决策

为什么分离引擎层和管理层?

  1. 可测试性agent-loop.ts 是纯函数,无需 mock 状态即可单元测试
  2. 复用性:无状态引擎可以被不同的管理层复用
  3. 关注点分离:引擎只管循环逻辑,管理只管状态和并发

为什么用双层循环(steering + follow-up)?

  • steering:用户想在 agent 工作过程中"插话",需要立即处理
  • follow-up:用户想等 agent 完成后再补充,不打断当前工作流
  • 双层结构让这两种场景各得其所

为什么工具终止需要全部一致?

  • 防止一个工具意外终止整个 agent
  • 只有所有工具都明确要求终止时才生效
  • 调用方可以通过 afterToolCall 钩子调整 terminate 标志

为什么用 Set + 替换而非直接 mutable?

  • AgentState.pendingToolCalls 对外是 ReadonlySet,保证状态不可变性
  • 内部每次更新都新建 Set,防止外部持有引用后篡改