Pi Agent 包源码逐行解读#
本文对 packages/agent/src/ 下的 agent-loop.ts(~700 行)和 agent.ts(~540 行)做逐行级的代码解读,涵盖每个函数的设计意图、类型系统、控制流和错误处理策略。
- 文件概览
- agent-loop.ts 逐行解读
- agent.ts 逐行解读
- 设计模式总结
文件概览#
packages/agent/src/
├── agent-loop.ts # 无状态循环引擎(~700 行,纯函数)
├── agent.ts # 有状态封装层(~540 行,Agent 类)
├── types.ts # 类型定义(~400 行)
├── proxy.ts # HTTP 代理(~320 行)
├── index.ts # 导出入口(~8 行)
├── agent-learn.ts # 学习相关
├── agent-loop-learn.ts
└── README.md
核心分界线:
| 层面 | 文件 | 有无状态 | 职责 |
|---|
| 引擎层 | agent-loop.ts | 无状态(纯函数) | 双层循环、LLM 调用、工具执行、事件发射 |
| 管理层 | agent.ts | 有状态(Agent 类) | 状态管理、并发控制、消息队列、事件分发 |
pi-ai 核心类型体系#
agent-loop.ts 和 agent.ts 大量依赖 @earendil-works/pi-ai(位于 packages/ai/)的核心类型。这些类型构成了 LLM 调用、流式事件、消息格式和工具系统的基石。
Message — 统一消息类型#
1
| type Message = UserMessage | AssistantMessage | ToolResultMessage;
|
三种消息角色构成了 LLM 对话的基本单元:
UserMessage#
1
2
3
4
5
| interface UserMessage {
role: "user";
content: string | (TextContent | ImageContent)[];
timestamp: number;
}
|
| 字段 | 说明 |
|---|
content | 字符串(自动转为 text content)或 content 数组(支持文本+图片多模态) |
timestamp | Unix 毫秒时间戳,用于排序和上下文管理 |
AssistantMessage#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| interface AssistantMessage {
role: "assistant";
content: (TextContent | ThinkingContent | ToolCall)[];
api: Api;
provider: Provider;
model: string;
responseModel?: string;
responseId?: string;
diagnostics?: AssistantMessageDiagnostic[];
usage: Usage;
stopReason: StopReason;
errorMessage?: string;
timestamp: number;
}
|
| 字段 | 说明 |
|---|
content | 可混合文本、思考过程和工具调用 |
api / provider / model | 标识响应的来源 |
responseModel | 实际处理请求的模型(如 OpenRouter 路由后可能与请求模型不同) |
responseId | Provider 返回的响应标识符 |
usage | Token 用量(含 cost 估算) |
stopReason | 终止原因:"stop" / "length" / "toolUse" / "error" / "aborted" |
errorMessage | 仅在 stopReason 为 error/aborted 时存在 |
stopReason 语义:
| 值 | 含义 |
|---|
"stop" | LLM 正常结束 |
"length" | 达到 maxTokens 截断 |
"toolUse" | LLM 要求调用工具 |
"error" | LLM 调用失败或被 engine catch 到异常后转换 |
"aborted" | 外部调用 abortController.abort() |
1
2
3
4
5
6
7
8
9
| interface ToolResultMessage<TDetails = any> {
role: "toolResult";
toolCallId: string;
toolName: string;
content: (TextContent | ImageContent)[];
details?: TDetails;
isError: boolean;
timestamp: number;
}
|
| 字段 | 说明 |
|---|
toolCallId | 与 AssistantMessage 中的 ToolCall.id 对应 |
toolName | 工具名称,用于路由和日志 |
isError | 工具执行是否出错 |
Content 类型#
TextContent#
1
| interface TextContent { type: "text"; text: string; textSignature?: string; }
|
ThinkingContent#
1
2
3
4
5
6
| interface ThinkingContent {
type: "thinking";
thinking: string;
thinkingSignature?: string;
redacted?: boolean; // 安全过滤导致内容被隐去
}
|
ImageContent#
1
| interface ImageContent { type: "image"; data: string; mimeType: string; }
|
1
2
3
4
5
6
7
| interface ToolCall {
type: "toolCall";
id: string;
name: string;
arguments: Record<string, any>;
thoughtSignature?: string;
}
|
Context — LLM 上下文#
1
2
3
4
5
| interface Context {
systemPrompt?: string;
messages: Message[];
tools?: Tool[];
}
|
| 字段 | 说明 |
|---|
systemPrompt | 可选,系统提示词 |
messages | 对话历史(Message[],即 UserMessage / AssistantMessage / ToolResultMessage) |
tools | 可选,工具定义 |
Agent 上下文 vs LLM 上下文的区别:
AgentContext(Agent 层) Context(LLM 层)
systemPrompt: string systemPrompt?: string
messages: AgentMessage[] --convertToLlm--> messages: Message[]
tools: AgentTool[] tools?: Tool[]
AgentMessage 是 Agent 层的扩展消息(支持自定义角色),而 Message 是 LLM 层的标准消息。
1
2
3
4
5
| interface Tool<TParameters extends TSchema = TSchema> {
name: string;
description: string;
parameters: TParameters; // JSON Schema (TypeBox)
}
|
使用 TypeBox 的 TSchema 定义参数类型,支持 JSON Schema 校验。
Model — 模型定义#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| interface Model<TApi extends Api> {
id: string;
name: string;
api: TApi;
provider: Provider;
baseUrl: string;
reasoning: boolean;
thinkingLevelMap?: ThinkingLevelMap;
input: ("text" | "image")[];
cost: { input: number; output: number; cacheRead: number; cacheWrite: number; };
contextWindow: number;
maxTokens: number;
compat?: OpenAICompletionsCompat | OpenAIResponsesCompat | AnthropicMessagesCompat;
}
|
| 字段 | 说明 |
|---|
api | 已知 API 类型(如 "openai-completions", "anthropic-messages" 等 30+ 种) |
provider | 已知 Provider 名称(如 "openai", "anthropic", "bedrock" 等 30+ 种) |
reasoning | 是否支持推理 |
cost | 每百万 token 的价格(用于 cost 估算) |
compat | API 兼容性覆盖 |
AssistantMessageEvent — 流式事件协议#
共 13 种子事件:
1
2
3
4
5
6
7
8
9
10
11
12
13
| type AssistantMessageEvent =
| { type: "start"; partial }
| { type: "text_start"; contentIndex; partial }
| { type: "text_delta"; contentIndex; delta; partial }
| { type: "text_end"; contentIndex; content; partial }
| { type: "thinking_start"; contentIndex; partial }
| { type: "thinking_delta"; contentIndex; delta; partial }
| { type: "thinking_end"; contentIndex; content; partial }
| { type: "toolcall_start"; contentIndex; partial }
| { type: "toolcall_delta"; contentIndex; delta; partial }
| { type: "toolcall_end"; contentIndex; toolCall; partial }
| { type: "done"; reason; message }
| { type: "error"; reason; error }
|
状态机:
start -> text_start -> text_delta* -> text_end -> done
-> thinking_start -> thinking_delta* -> thinking_end ->
-> toolcall_start -> toolcall_delta* -> toolcall_end ->
-> error (任意时刻)
EventStream — 通用流式事件容器#
1
2
3
4
5
6
7
| class EventStream<T, R = T> implements AsyncIterable<T> {
constructor(isComplete: (event: T) => boolean, extractResult: (event: T) => R) {}
push(event: T): void
end(result?: R): void
[Symbol.asyncIterator](): AsyncIterator<T>
result(): Promise<R>
}
|
三要素:
| 方法 | 用途 | 调用方 |
|---|
push(event) | 发射事件 | 生产者 |
for await (const e of stream) | 消费事件 | 消费者 |
result() | 获取最终结果 | 调用方 |
内部机制:
1
2
3
4
5
6
7
8
9
10
11
12
| push(event: T): void {
if (this.isComplete(event)) {
this.done = true;
this.resolveFinalResult(this.extractResult(event));
}
const waiter = this.waiting.shift();
if (waiter) {
waiter({ value: event, done: false }); // 直接交付等待者
} else {
this.queue.push(event); // 无人消费则入队
}
}
|
- 生产者-消费者模式:事件要么被 for-await 消费者立即取走,要么入队等待
isComplete 判定流终止,extractResult 提取最终结果
AssistantMessageEventStream#
1
2
3
4
5
6
7
8
9
10
11
| class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
constructor() {
super(
(event) => event.type === "done" || event.type === "error",
(event) => {
if (event.type === "done") return event.message;
if (event.type === "error") return event.error;
},
);
}
}
|
LLM 返回的流。done 或 error 事件触发流结束。
StreamFunction — 流式函数签名#
1
2
3
4
5
| type StreamFunction = (
model: Model,
context: Context,
options?: StreamOptions,
) => AssistantMessageEventStream;
|
契约:
- 必须返回
AssistantMessageEventStream - 错误应编码在流中(error 事件),而非抛异常
- 错误终止必须产生
stopReason: "error" | "aborted" 的 AssistantMessage
StreamOptions / SimpleStreamOptions — 流式选项#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| interface StreamOptions {
temperature?: number;
maxTokens?: number;
signal?: AbortSignal;
apiKey?: string;
transport?: Transport; // "sse" | "websocket" | "websocket-cached" | "auto"
cacheRetention?: CacheRetention;
sessionId?: string;
onPayload?: (payload, model) => unknown;
onResponse?: (response, model) => void;
timeoutMs?: number;
maxRetries?: number;
maxRetryDelayMs?: number;
}
interface SimpleStreamOptions extends StreamOptions {
reasoning?: ThinkingLevel;
thinkingBudgets?: ThinkingBudgets;
}
|
类型关系总览#
graph TB
subgraph "LLM 核心库"
Message["Message"]
AM["AssistantMessage
stopReason / usage"]
Context["Context"]
Tool["Tool"]
Event["AssistantMessageEvent
13 种"]
Stream["EventStream"]
end
subgraph "Agent 包"
AgentMsg["AgentMessage"]
AgentCtx["AgentContext"]
AgentTool["AgentTool"]
AgentEvent["AgentEvent"]
end
Message --> AgentMsg
Context -.->|convertToLlm| AgentCtx
Tool --> AgentTool
## agent-loop.ts 逐行解读
### 类型与导入
```typescript
import {
type AssistantMessage,
type Context,
EventStream,
streamSimple,
type ToolResultMessage,
validateToolArguments,
} from "@earendil-works/pi-ai";
import type {
AgentContext,
AgentEvent,
AgentLoopConfig,
AgentMessage,
AgentTool,
AgentToolCall,
AgentToolResult,
StreamFn,
} from "./types.js";
| 导入 | 来源 | 用途 |
|---|
AssistantMessage | pi-ai | LLM 返回的 assistant 消息类型 |
Context | pi-ai | LLM 调用所需的上下文(systemPrompt + messages + tools) |
EventStream | pi-ai | 流式事件流,支持 push/end 模式 |
streamSimple | pi-ai | 默认的 LLM 流式调用函数 |
ToolResultMessage | pi-ai | 工具执行返回的消息类型 |
validateToolArguments | pi-ai | 工具参数的 JSON Schema 校验 |
AgentContext | ./types | Agent 层的上下文(systemPrompt + AgentMessage[] + tools) |
AgentEvent | ./types | Agent 生命周期事件的联合类型(13 种) |
AgentLoopConfig | ./types | 引擎配置(含回调钩子) |
AgentMessage | ./types | Agent 统一消息类型(可扩展) |
AgentTool | ./types | Agent 工具定义 |
AgentToolCall | ./types | LLM 发出的工具调用请求 |
AgentToolResult | ./types | 工具执行结果 |
StreamFn | ./types | 流式函数类型签名 |
设计要点: agent-loop.ts 不直接依赖 Agent 类,它通过 AgentEventSink(emit 回调)与上层通信。这是依赖倒置的体现——引擎层定义接口,管理层实现接口。
AgentEventSink — 事件接收器类型#
1
| export type AgentEventSink = (event: AgentEvent) => Promise<void> | void;
|
- 同步或异步均可
- 引擎层
await 每次 emit 调用,保证事件顺序 - 这是引擎层与管理层之间的唯一通信通道
agentLoop() / agentLoopContinue() — 公开入口#
1
2
3
4
5
6
7
| export function agentLoop(
prompts: AgentMessage[],
context: AgentContext,
config: AgentLoopConfig,
signal?: AbortSignal,
streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]>
|
逐行解读:
1
| const stream = createAgentStream();
|
- 创建
EventStream 实例,内部实现了终止判断和结果提取
1
2
3
4
5
| void runAgentLoop(
prompts, context, config,
async (event) => { stream.push(event); },
signal, streamFn,
).then((messages) => { stream.end(messages); });
|
void 关键字:显式表达我们不 await 这个 Promise(由 EventStream 管理生命周期)stream.push(event):每产生一个 AgentEvent,立即推入流stream.end(messages):流结束时,AgentMessage[] 作为最终结果写入流
- 调用方通过
for await (const event of stream) 消费事件 - 最终通过
stream.result() 获取 AgentMessage[]
agentLoopContinue() 的额外校验:
1
2
3
4
5
6
| if (context.messages.length === 0) {
throw new Error("Cannot continue: no messages in context");
}
if (context.messages[context.messages.length - 1].role === "assistant") {
throw new Error("Cannot continue from message role: assistant");
}
|
- 空上下文 → 无法继续
- 最后一条是 assistant → LLM 的 context 最后一条必须是 user 或 toolResult,否则 LLM provider 会拒绝请求
runAgentLoop() / runAgentLoopContinue() — 异步底层#
1
2
3
4
5
6
7
8
| export async function runAgentLoop(
prompts: AgentMessage[],
context: AgentContext,
config: AgentLoopConfig,
emit: AgentEventSink,
signal?: AbortSignal,
streamFn?: StreamFn,
): Promise<AgentMessage[]>
|
逐行解读:
1
| const newMessages: AgentMessage[] = [...prompts];
|
newMessages 是返回值,记录本轮新增的所有消息(prompt + assistant response + tool results)
1
2
3
4
| const currentContext: AgentContext = {
...context,
messages: [...context.messages, ...prompts],
};
|
- 浅拷贝 context,并在末尾追加 prompt 消息
- 注意
...context 是浅拷贝,但 messages: [...context.messages, ...prompts] 是新数组,防止引擎层篡改外部引用
1
2
3
4
5
6
| await emit({ type: "agent_start" });
await emit({ type: "turn_start" });
for (const prompt of prompts) {
await emit({ type: "message_start", message: prompt });
await emit({ type: "message_end", message: prompt });
}
|
- 发射生命周期事件:agent_start → turn_start → 每个 prompt 的 message_start/end
- prompt 消息的 message_start 和 message_end 连续发射(中间没有 update 事件,因为 prompt 是已确定的消息)
1
2
| await runLoop(currentContext, newMessages, config, signal, emit, streamFn);
return newMessages;
|
runAgentLoopContinue() 与 runAgentLoop() 的差异:
newMessages 初始化为 [](没有新 prompt)currentContext 直接 { ...context }(不追加消息)- 不发射 prompt 的 message_start/end 事件
createAgentStream() — EventStream 工厂#
1
2
3
4
5
6
| function createAgentStream(): EventStream<AgentEvent, AgentMessage[]> {
return new EventStream<AgentEvent, AgentMessage[]>(
(event: AgentEvent) => event.type === "agent_end",
(event: AgentEvent) => (event.type === "agent_end" ? event.messages : []),
);
}
|
- 第一个参数(终止判定):当 event.type === “agent_end” 时流结束
- 第二个参数(结果提取):从 agent_end 事件中提取 messages 作为最终结果
- 调用方通过
stream.result() 获取 AgentMessage[]
runLoop() — 核心双层循环#
1
2
3
4
5
6
7
8
| async function runLoop(
currentContext: AgentContext,
newMessages: AgentMessage[],
config: AgentLoopConfig,
signal: AbortSignal | undefined,
emit: AgentEventSink,
streamFn?: StreamFn,
): Promise<void>
|
这是整个 Agent 包最核心的函数,约 80 行。
初始化#
1
2
| let firstTurn = true;
let pendingMessages: AgentMessage[] = (await config.getSteeringMessages?.()) || [];
|
| 变量 | 用途 |
|---|
firstTurn | 首个 turn 已在入口函数中发射了 turn_start,后续 turn 需要自行发射 |
pendingMessages | 两层循环的桥梁:内层消费,外层(follow-up)生产 |
外层循环#
1
2
| while (true) {
let hasMoreToolCalls = true;
|
hasMoreToolCalls 每轮外层循环重置为 true(进入内层至少一次,检查 steering 消息)
内层循环条件#
1
| while (hasMoreToolCalls || pendingMessages.length > 0) {
|
内层持续的条件:
hasMoreToolCalls === true:当前轮次有 tool call,且未全部 terminatependingMessages.length > 0:steering 队列中有待处理消息
Turn 管理#
1
2
3
4
5
| if (!firstTurn) {
await emit({ type: "turn_start" });
} else {
firstTurn = false;
}
|
- 首个 turn 不发射(已在入口函数中发射)
- 后续 turn(steering 注入后、follow-up 注入后)各自发射 turn_start
处理 pending 消息#
1
2
3
4
5
6
7
8
9
| if (pendingMessages.length > 0) {
for (const message of pendingMessages) {
await emit({ type: "message_start", message });
await emit({ type: "message_end", message });
currentContext.messages.push(message);
newMessages.push(message);
}
pendingMessages = [];
}
|
- 把 steering 队列中的消息以 message_start → message_end 的事件序列注入
- 同时追加到 currentContext(LLM 调用用)和 newMessages(返回值用)
- 清空 pendingMessages
LLM 调用#
1
2
| const message = await streamAssistantResponse(currentContext, config, signal, emit, streamFn);
newMessages.push(message);
|
- 返回
AssistantMessage(LLM 的完整回复) - 推入 newMessages
错误/中止检测#
1
2
3
4
5
| if (message.stopReason === "error" || message.stopReason === "aborted") {
await emit({ type: "turn_end", message, toolResults: [] });
await emit({ type: "agent_end", messages: newMessages });
return;
}
|
- stopReason 为 error 或 aborted → 立即终止整个 agent
- 先发射 turn_end(空 toolResults),再发射 agent_end
工具调用检测与执行#
1
2
3
4
5
6
7
8
9
10
11
12
13
| const toolCalls = message.content.filter((c) => c.type === "toolCall");
const toolResults: ToolResultMessage[] = [];
hasMoreToolCalls = false;
if (toolCalls.length > 0) {
const executedToolBatch = await executeToolCalls(currentContext, message, config, signal, emit);
toolResults.push(...executedToolBatch.messages);
hasMoreToolCalls = !executedToolBatch.terminate;
for (const result of toolResults) {
currentContext.messages.push(result);
newMessages.push(result);
}
}
|
hasMoreToolCalls = !executedToolBatch.terminate:当所有工具都返回 terminate: true 时,内层循环终止- tool result 同时推入 currentContext(LLM 下一轮上下文)和 newMessages
Turn 结束与停止判定#
1
2
3
4
5
6
| await emit({ type: "turn_end", message, toolResults });
if (await config.shouldStopAfterTurn?.({ message, toolResults, context: currentContext, newMessages })) {
await emit({ type: "agent_end", messages: newMessages });
return;
}
|
shouldStopAfterTurn 是可选回调,调用方可以自定义停止条件(如:基于消息数量、工具调用次数等)- 若返回 true → agent_end
获取下一轮 steering 消息#
1
| pendingMessages = (await config.getSteeringMessages?.()) || [];
|
- 内层循环末尾拉取 steering 消息,如果有 → 继续内层
外层 follow-up 机制#
1
2
3
4
5
6
| const followUpMessages = (await config.getFollowUpMessages?.()) || [];
if (followUpMessages.length > 0) {
pendingMessages = followUpMessages;
continue;
}
break;
|
- 内层结束(无更多 tool call 和 steering 消息)
- 检查 follow-up 队列,有消息 → 设为 pending → continue 外层循环
- 无消息 → break 退出
1
| await emit({ type: "agent_end", messages: newMessages });
|
streamAssistantResponse() — LLM 流式调用#
1
2
3
4
5
6
7
| async function streamAssistantResponse(
context: AgentContext,
config: AgentLoopConfig,
signal: AbortSignal | undefined,
emit: AgentEventSink,
streamFn?: StreamFn,
): Promise<AssistantMessage>
|
第一步:上下文变换(可选)#
1
2
3
4
| let messages = context.messages;
if (config.transformContext) {
messages = await config.transformContext(messages, signal);
}
|
transformContext 允许对整个消息列表做预处理- 典型用途:过滤、重排序、注入系统消息、合并上下文窗口
第二步:消息转换#
1
| const llmMessages = await config.convertToLlm(messages);
|
- AgentMessage[] → Message[]:这是 Agent 世界与 LLM 世界的边界
- 默认实现:只保留
user、assistant、toolResult 角色,过滤掉自定义消息
第三步:构造 LLM Context#
1
2
3
4
5
| const llmContext: Context = {
systemPrompt: context.systemPrompt,
messages: llmMessages,
tools: context.tools,
};
|
第四步:选择流函数#
1
| const streamFunction = streamFn || streamSimple;
|
第五步:解析 API Key#
1
2
| const resolvedApiKey =
(config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) || config.apiKey;
|
- 支持动态获取 API key(如短期 OAuth token)
- 先尝试
getApiKey 回调,fallback 到 config.apiKey - 短路求值:若
config.getApiKey 未定义,跳过调用
第六步:发起 LLM 调用#
1
2
3
4
5
| const response = await streamFunction(config.model, llmContext, {
...config,
apiKey: resolvedApiKey,
signal,
});
|
- 将
AgentLoopConfig 的所有属性透传给 LLM 核心库(model、reasoning、sessionId、onPayload、transport、thinkingBudgets 等) apiKey 覆盖传入
第七步:事件循环#
1
2
| let partialMessage: AssistantMessage | null = null;
let addedPartial = false;
|
| 变量 | 用途 |
|---|
partialMessage | 当前正在构建中的消息引用 |
addedPartial | 标记是否已推入占位消息到 context.messages |
1
2
| for await (const event of response) {
switch (event.type) {
|
case “start”#
1
2
3
4
5
6
| case "start":
partialMessage = event.partial;
context.messages.push(partialMessage);
addedPartial = true;
await emit({ type: "message_start", message: { ...partialMessage } });
break;
|
- 将 partial 消息推入 context.messages 末尾作为占位
- 浅拷贝后发射 message_start,防止 listener 篡改引用
case text/thinking/toolcall delta#
1
2
3
4
5
6
7
8
9
| case "text_delta":
case "toolcall_delta":
// ... 等 11 种子事件类型
if (partialMessage) {
partialMessage = event.partial;
context.messages[context.messages.length - 1] = partialMessage;
await emit({ type: "message_update", assistantMessageEvent: event, message: { ...partialMessage } });
}
break;
|
partialMessage = event.partial:变量指向新的引用(事件对象中的更新版本)context.messages[last] = partialMessage:原地替换占位消息- 关键设计:使用引用更新,避免每次 delta 都深拷贝完整消息
case “done” / “error”#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| case "done":
case "error": {
const finalMessage = await response.result();
if (addedPartial) {
context.messages[context.messages.length - 1] = finalMessage;
} else {
context.messages.push(finalMessage);
}
if (!addedPartial) {
await emit({ type: "message_start", message: { ...finalMessage } });
}
await emit({ type: "message_end", message: finalMessage });
return finalMessage;
}
|
- 分支 A:
addedPartial === true → 替换占位 - 分支 B:
addedPartial === false → 没有 start 事件(LLM 直接返回完整结果),推入并补发 message_start
防线:for-await 正常结束(无 done/error 事件)#
1
2
3
4
5
6
7
8
9
| const finalMessage = await response.result();
if (addedPartial) {
context.messages[context.messages.length - 1] = finalMessage;
} else {
context.messages.push(finalMessage);
await emit({ type: "message_start", message: { ...finalMessage } });
}
await emit({ type: "message_end", message: finalMessage });
return finalMessage;
|
- 与 done/error 分支逻辑相同
- 此分支作为 for-await 循环正常结束的兜底
1
2
3
| async function executeToolCalls(
currentContext, assistantMessage, config, signal, emit
): Promise<ExecutedToolCallBatch>
|
调度逻辑(仅 7 行):
1
| const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
|
- 从 assistant 消息中提取所有 tool call
1
2
3
| const hasSequentialToolCall = toolCalls.some(
(tc) => currentContext.tools?.find((t) => t.name === tc.name)?.executionMode === "sequential",
);
|
1
2
3
4
| if (config.toolExecution === "sequential" || hasSequentialToolCall) {
return executeToolCallsSequential(...);
}
return executeToolCallsParallel(...);
|
1
2
3
4
5
| async function executeToolCallsSequential(...): Promise<ExecutedToolCallBatch> {
const finalizedCalls: FinalizedToolCallOutcome[] = [];
const messages: ToolResultMessage[] = [];
for (const toolCall of toolCalls) {
|
每个 tool call 的完整生命周期:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| // 1. 发射 tool_execution_start
await emit({ type: "tool_execution_start", toolCallId, toolName, args });
// 2. 前置处理
const preparation = await prepareToolCall(...);
// 3. immediate → 直接记录错误
if (preparation.kind === "immediate") {
finalized = { toolCall, result: preparation.result, isError: preparation.isError };
} else {
// 4. 执行
const executed = await executePreparedToolCall(preparation, signal, emit);
// 5. 后处理
finalized = await finalizeExecutedToolCall(...);
}
// 6. 发射 tool_execution_end
await emitToolExecutionEnd(finalized, emit);
// 7. 构建 ToolResultMessage 并发射
const toolResultMessage = createToolResultMessage(finalized);
await emitToolResultMessage(toolResultMessage, emit);
|
串行的关键: for...of 确保逐个处理,一个完成才到下一个。
两阶段设计:
Phase 1:串行 prepare#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| for (const toolCall of toolCalls) {
await emit({ type: "tool_execution_start", ... });
const preparation = await prepareToolCall(...);
if (preparation.kind === "immediate") {
// 直接记入结果(不是 thunk)
finalizedCalls.push(finalized);
continue;
}
// 包装为 lazy thunk
finalizedCalls.push(async () => {
const executed = await executePreparedToolCall(preparation, signal, emit);
const finalized = await finalizeExecutedToolCall(...);
await emitToolExecutionEnd(finalized, emit);
return finalized;
});
}
|
kind | 处理方式 |
|---|
"immediate"(工具未找到 / 校验失败 / beforeToolCall 阻拦) | 直接记入 finalizedCalls 数组 |
"prepared" | 包装为 () => Promise<FinalizedToolCallOutcome> 的 thunk |
FinalizedToolCallEntry 类型:
1
| type FinalizedToolCallEntry = FinalizedToolCallOutcome | (() => Promise<FinalizedToolCallOutcome>);
|
即:要么是直接结果(immediate),要么是延迟计算的 thunk(prepared)。
Phase 2:并发 execute#
1
2
3
| const orderedFinalizedCalls = await Promise.all(
finalizedCalls.map((entry) => (typeof entry === "function" ? entry() : Promise.resolve(entry))),
);
|
Promise.all() 并发执行所有 thunktypeof entry === "function" 区分 direct result 和 thunkorderedFinalizedCalls 保持原始 tool call 顺序(因为 finalizedCalls 的顺序与 Phase 1 遍历顺序一致)
1
2
3
4
5
6
| const messages: ToolResultMessage[] = [];
for (const finalized of orderedFinalizedCalls) {
const toolResultMessage = createToolResultMessage(finalized);
await emitToolResultMessage(toolResultMessage, emit);
messages.push(toolResultMessage);
}
|
1
2
3
4
| return {
messages,
terminate: shouldTerminateToolBatch(orderedFinalizedCalls),
};
|
1
| async function prepareToolCall(...): Promise<PreparedToolCall | ImmediateToolCallOutcome>
|
4 步检查管道:
步骤 1:查找工具定义#
1
2
3
4
5
6
7
8
| const tool = currentContext.tools?.find((t) => t.name === toolCall.name);
if (!tool) {
return {
kind: "immediate",
result: createErrorToolResult(`Tool ${toolCall.name} not found`),
isError: true,
};
}
|
步骤 2:参数兼容#
1
| const preparedToolCall = prepareToolCallArguments(tool, toolCall);
|
- 若工具定义了
prepareArguments,允许对 LLM 生成的参数做运行时转换 - 典型用途:字符串 ID → 对象引用、格式转换
步骤 3:Schema 校验#
1
| const validatedArgs = validateToolArguments(tool, preparedToolCall);
|
- 由
@earendil-works/pi-ai 的 validateToolArguments 根据工具的 JSON Schema 校验 - 校验失败会抛异常,被外层的
try/catch 捕获
1
2
3
4
5
6
7
8
9
10
| if (config.beforeToolCall) {
const beforeResult = await config.beforeToolCall({ assistantMessage, toolCall, args: validatedArgs, context }, signal);
if (beforeResult?.block) {
return {
kind: "immediate",
result: createErrorToolResult(beforeResult.reason || "Tool execution was blocked"),
isError: true,
};
}
}
|
- 钩子可阻止工具执行(返回
{ block: true }) - 应用场景:权限检查、限流、内容审核
整个管道的异常兜底#
1
2
3
4
5
6
7
| } catch (error) {
return {
kind: "immediate",
result: createErrorToolResult(error instanceof Error ? error.message : String(error)),
isError: true,
};
}
|
- 任何步骤抛异常 → catch → 返回 error tool result
- 这是第一层容错:工具调用前置阶段的异常不会扩散到主循环
1
2
3
4
5
| async function executePreparedToolCall(
prepared: PreparedToolCall,
signal: AbortSignal | undefined,
emit: AgentEventSink,
): Promise<ExecutedToolCallOutcome>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| const updateEvents: Promise<void>[] = [];
try {
const result = await prepared.tool.execute(
prepared.toolCall.id,
prepared.args as never,
signal,
(partialResult) => {
updateEvents.push(
Promise.resolve(
emit({ type: "tool_execution_update", ... })
),
);
},
);
await Promise.all(updateEvents);
return { result, isError: false };
} catch (error) {
await Promise.all(updateEvents);
return {
result: createErrorToolResult(error instanceof Error ? error.message : String(error)),
isError: true,
};
}
|
设计要点:
| 特性 | 说明 |
|---|
signal | 传入 AbortSignal,工具应监听取消信号 |
onProgress | 工具可多次调用来发射 tool_execution_update 事件 |
updateEvents | 收集所有 update 发射的 Promise,执行完后统一 await |
| 异常处 | catch 后生成 error tool result,不崩溃 |
1
| async function finalizeExecutedToolCall(...): Promise<FinalizedToolCallOutcome>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| let result = executed.result;
let isError = executed.isError;
if (config.afterToolCall) {
try {
const afterResult = await config.afterToolCall({ assistantMessage, toolCall, args, result, isError, context }, signal);
if (afterResult) {
result = {
content: afterResult.content ?? result.content,
details: afterResult.details ?? result.details,
terminate: afterResult.terminate ?? result.terminate,
};
isError = afterResult.isError ?? isError;
}
} catch (error) {
result = createErrorToolResult(error instanceof Error ? error.message : String(error));
isError = true;
}
}
return { toolCall: prepared.toolCall, result, isError };
|
afterToolCall 钩子可覆盖的字段:
| 字段 | 覆盖方式 | 用途 |
|---|
content | afterResult.content ?? result.content | 修改返回内容(脱敏、格式化) |
details | afterResult.details ?? result.details | 追加额外信息 |
terminate | afterResult.terminate ?? result.terminate | 强制终止或解除终止 |
isError | afterResult.isError ?? isError | 标记为错误 |
第二层容错: 钩子自身抛异常 → catch 后覆盖为 error tool result(防止钩子崩溃波及主流程)。
辅助函数#
1
2
3
| function shouldTerminateToolBatch(finalizedCalls: FinalizedToolCallOutcome[]): boolean {
return finalizedCalls.length > 0 && finalizedCalls.every((finalized) => finalized.result.terminate === true);
}
|
- 仅当所有工具都要求终止时才返回 true
- 空数组 → false(没有 tool call 时不应终止)
- 设计选择:防止单个工具意外终止整个 agent
1
2
3
4
5
6
| function prepareToolCallArguments(tool: AgentTool<any>, toolCall: AgentToolCall): AgentToolCall {
if (!tool.prepareArguments) return toolCall;
const preparedArguments = tool.prepareArguments(toolCall.arguments);
if (preparedArguments === toolCall.arguments) return toolCall;
return { ...toolCall, arguments: preparedArguments as Record<string, any> };
}
|
- 若工具定义了
prepareArguments,对参数做兼容转换 === 引用比较:若返回值与输入相同,跳过属性复制(性能优化)
1
2
3
| function createErrorToolResult(message: string): AgentToolResult<any> {
return { content: [{ type: "text", text: message }], details: {} };
}
|
- 统一错误格式:纯文本 content + 空 details
1
2
3
| async function emitToolExecutionEnd(finalized: FinalizedToolCallOutcome, emit: AgentEventSink): Promise<void> {
await emit({ type: "tool_execution_end", toolCallId, toolName, result, isError });
}
|
1
2
3
4
5
6
7
8
9
10
11
| function createToolResultMessage(finalized: FinalizedToolCallOutcome): ToolResultMessage {
return {
role: "toolResult",
toolCallId: finalized.toolCall.id,
toolName: finalized.toolCall.name,
content: finalized.result.content,
details: finalized.result.details,
isError: finalized.isError,
timestamp: Date.now(),
};
}
|
1
2
3
4
| async function emitToolResultMessage(toolResultMessage: ToolResultMessage, emit: AgentEventSink): Promise<void> {
await emit({ type: "message_start", message: toolResultMessage });
await emit({ type: "message_end", message: toolResultMessage });
}
|
- tool result 以完整的 message_start/end 事件序列发射
agent.ts 逐行解读#
类型与导入#
1
2
3
4
5
6
7
8
9
10
11
| import {
type ImageContent, type Message, type Model,
type SimpleStreamOptions, streamSimple,
type TextContent, type ThinkingBudgets, type Transport,
} from "@earendil-works/pi-ai";
import { runAgentLoop, runAgentLoopContinue } from "./agent-loop.js";
import type {
AfterToolCallContext, AfterToolCallResult,
AgentContext, AgentEvent, AgentLoopConfig, AgentMessage, AgentState, AgentTool,
BeforeToolCallContext, BeforeToolCallResult, StreamFn, ToolExecutionMode,
} from "./types.js";
|
关键导入: runAgentLoop 和 runAgentLoopContinue 是 agent-loop.ts 的两个核心函数,agent.ts 将它们封装为有状态的 API。
defaultConvertToLlm() — 默认消息转换#
1
2
3
4
5
| function defaultConvertToLlm(messages: AgentMessage[]): Message[] {
return messages.filter(
(message) => message.role === "user" || message.role === "assistant" || message.role === "toolResult",
);
}
|
- 只保留 LLM 能识别的三种角色
- 过滤掉自定义消息(artifact、notification 等)
1
2
| const EMPTY_USAGE = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0, cost: { ... } };
const DEFAULT_MODEL = { id: "unknown", name: "unknown", api: "unknown", ... };
|
EMPTY_USAGE:用于失败消息的空用量(避免 undefined)DEFAULT_MODEL:fallback 模型(用户未指定 initialState.model 时使用)
MutableAgentState 与 createMutableAgentState()#
1
2
3
4
5
6
| type MutableAgentState = Omit<AgentState, "isStreaming" | "streamingMessage" | "pendingToolCalls" | "errorMessage"> & {
isStreaming: boolean;
streamingMessage?: AgentMessage;
pendingToolCalls: Set<string>;
errorMessage?: string;
};
|
AgentState vs MutableAgentState:
| 字段 | AgentState(对外只读) | MutableAgentState(内部可写) |
|---|
isStreaming | readonly boolean | boolean |
streamingMessage | readonly | 可写 |
pendingToolCalls | ReadonlySet<string> | Set<string> |
errorMessage | readonly | 可写 |
Getter/Setter 技巧:
1
2
3
4
5
6
7
| return {
get tools() { return tools; },
set tools(nextTools: AgentTool<any>[]) { tools = nextTools.slice(); },
get messages() { return messages; },
set messages(nextMessages: AgentMessage[]) { messages = nextMessages.slice(); },
// ...
};
|
getter 返回内部数组引用setter 自动 slice() 拷贝,防止外部通过 .push() 等操作篡改内部状态pendingToolCalls 在 processEvents 中每次更新都新建 Set
PendingMessageQueue#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| class PendingMessageQueue {
private messages: AgentMessage[] = [];
constructor(public mode: QueueMode) {}
enqueue(message: AgentMessage): void {
this.messages.push(message);
}
hasItems(): boolean { return this.messages.length > 0; }
drain(): AgentMessage[] {
if (this.mode === "all") {
const drained = this.messages.slice();
this.messages = [];
return drained; // 返回全部并清空
}
const first = this.messages[0];
this.messages = this.messages.slice(1);
return [first]; // 只返回第一条
}
clear(): void { this.messages = []; }
}
|
QueueMode 控制 drain 行为:
| 模式 | drain | 场景 |
|---|
"one-at-a-time" | 返回第一条,队列保留其余 | 用户快速打字,逐条处理 |
"all" | 返回全部并清空 | 批量注入,合并处理 |
设计要点:
clear() 方法用于 reset()hasItems() 被 hasQueuedMessages() 调用drain() 不阻塞——队列为空时返回 []
ActiveRun 类型#
1
2
3
4
5
| type ActiveRun = {
promise: Promise<void>;
resolve: () => void;
abortController: AbortController;
};
|
| 字段 | 用途 |
|---|
promise | waitForIdle() 返回此 Promise |
resolve | finishRun() 中调用,兑现 promise |
abortController | abort() 时调用 .abort() |
Agent 类 — 构造器#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| export class Agent {
private _state: MutableAgentState;
private readonly listeners = new Set<(event: AgentEvent, signal: AbortSignal) => Promise<void> | void>();
private readonly steeringQueue: PendingMessageQueue;
private readonly followUpQueue: PendingMessageQueue;
// 公共可配置属性
public convertToLlm: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
public transformContext?: ...
public streamFn: StreamFn;
public getApiKey?: ...
public onPayload?: ...
public onResponse?: ...
public beforeToolCall?: ...
public afterToolCall?: ...
private activeRun?: ActiveRun;
public sessionId?: string;
public thinkingBudgets?: ThinkingBudgets;
public transport: Transport;
public maxRetryDelayMs?: number;
public toolExecution: ToolExecutionMode;
|
构造器初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| constructor(options: AgentOptions = {}) {
this._state = createMutableAgentState(options.initialState);
this.convertToLlm = options.convertToLlm ?? defaultConvertToLlm;
this.transformContext = options.transformContext;
this.streamFn = options.streamFn ?? streamSimple;
this.getApiKey = options.getApiKey;
this.onPayload = options.onPayload;
this.onResponse = options.onResponse;
this.beforeToolCall = options.beforeToolCall;
this.afterToolCall = options.afterToolCall;
this.steeringQueue = new PendingMessageQueue(options.steeringMode ?? "one-at-a-time");
this.followUpQueue = new PendingMessageQueue(options.followUpMode ?? "one-at-a-time");
this.sessionId = options.sessionId;
this.thinkingBudgets = options.thinkingBudgets;
this.transport = options.transport ?? "auto";
this.maxRetryDelayMs = options.maxRetryDelayMs;
this.toolExecution = options.toolExecution ?? "parallel";
}
|
默认值策略:
| 选项 | 默认值 | 说明 |
|---|
convertToLlm | defaultConvertToLlm | 过滤非标准角色 |
streamFn | streamSimple | 直接调用 LLM |
steeringMode | "one-at-a-time" | 逐条处理 steering 消息 |
followUpMode | "one-at-a-time" | 逐条处理 follow-up |
transport | "auto" | 自动选择传输方式 |
toolExecution | "parallel" | 默认并行执行工具 |
subscribe() — 事件订阅#
1
2
3
4
| subscribe(listener: (event: AgentEvent, signal: AbortSignal) => Promise<void> | void): () => void {
this.listeners.add(listener);
return () => this.listeners.delete(listener);
}
|
- 返回解注册函数
- 内部
processEvents 中按订阅顺序 await 每个 listener signal 参数允许 listener 响应取消
state — 状态访问器#
1
2
3
| get state(): AgentState {
return this._state;
}
|
- 返回
MutableAgentState,但 TypeScript 类型为 AgentState(只读视图) - 外部不能直接修改
isStreaming、streamingMessage 等只读字段
steer() / followUp() — 消息队列#
1
2
| steer(message: AgentMessage): void { this.steeringQueue.enqueue(message); }
followUp(message: AgentMessage): void { this.followUpQueue.enqueue(message); }
|
注入时机差异:
| 方法 | 入队 | 被消费的时机 |
|---|
steer() | steeringQueue | runLoop 内层每次迭代前(getSteeringMessages) |
followUp() | followUpQueue | runLoop 内层结束后、agent 停止前(getFollowUpMessages) |
应用场景:
steer() → 用户打字"等一下、我换个问题",agent 在当前上下文注入新消息,立即处理followUp() → 用户等 agent 完成后说"还有一件事",agent 做新一轮处理
prompt() — 发起对话#
1
2
| async prompt(message: AgentMessage | AgentMessage[]): Promise<void>;
async prompt(input: string, images?: ImageContent[]): Promise<void>;
|
重载实现:
1
2
3
4
5
6
7
| async prompt(input: string | AgentMessage | AgentMessage[], images?: ImageContent[]): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing a prompt. Use steer() or followUp() to queue messages.");
}
const messages = this.normalizePromptInput(input, images);
await this.runPromptMessages(messages);
}
|
- 并发保护: activeRun 存在时抛异常,不允许并发 prompt
- normalizePromptInput 统一输入格式
1
2
3
4
5
6
7
| private normalizePromptInput(input, images?): AgentMessage[] {
if (Array.isArray(input)) return input;
if (typeof input !== "string") return [input];
const content: Array<TextContent | ImageContent> = [{ type: "text", text: input }];
if (images && images.length > 0) content.push(...images);
return [{ role: "user", content, timestamp: Date.now() }];
}
|
| 输入 | 输出 |
|---|
"Hello" | [{ role: "user", content: [{ type: "text", text: "Hello" }], timestamp }] |
"Hello" + images | 同上,content 追加 image 数组 |
AgentMessage | [message] |
AgentMessage[] | 透传 |
continue() — 继续对话#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| async continue(): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing. Wait for completion before continuing.");
}
const lastMessage = this._state.messages[this._state.messages.length - 1];
if (!lastMessage) throw new Error("No messages to continue from");
if (lastMessage.role === "assistant") {
// 最后是 assistant → 尝试从队列取消息
const queuedSteering = this.steeringQueue.drain();
if (queuedSteering.length > 0) {
await this.runPromptMessages(queuedSteering, { skipInitialSteeringPoll: true });
return;
}
const queuedFollowUps = this.followUpQueue.drain();
if (queuedFollowUps.length > 0) {
await this.runPromptMessages(queuedFollowUps);
return;
}
throw new Error("Cannot continue from message role: assistant");
}
await this.runContinuation();
}
|
分支逻辑:
continue()
→ activeRun 存在?→ throw
→ 无消息?→ throw
→ 最后是 assistant?
→ steeringQueue 有?→ drain → runPromptMessages(skipInitialSteeringPoll=true)
→ followUpQueue 有?→ drain → runPromptMessages
→ 都没有?→ throw
→ 最后是 user/toolResult?
→ runContinuation() → runAgentLoopContinue()
设计意图: 避免"最后是 assistant"时直接调 LLM 导致重复响应。只有当我们有新的 user/toolResult 消息时才应该触发新一轮 LLM 调用。
abort() / waitForIdle() / reset() — 生命周期控制#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| abort(): void {
this.activeRun?.abortController.abort();
}
waitForIdle(): Promise<void> {
return this.activeRun?.promise ?? Promise.resolve();
}
reset(): void {
this._state.messages = [];
this._state.isStreaming = false;
this._state.streamingMessage = undefined;
this._state.pendingToolCalls = new Set<string>();
this._state.errorMessage = undefined;
this.clearFollowUpQueue();
this.clearSteeringQueue();
}
|
| 方法 | 行为 |
|---|
abort() | 触发 AbortController → LLM 和工具执行收到取消信号 |
waitForIdle() | 返回 Promise,在 finishRun() 中 resolve |
reset() | 清空 transcript 和运行时状态 |
私有方法:runPromptMessages() / runContinuation()#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| private async runPromptMessages(
messages: AgentMessage[],
options: { skipInitialSteeringPoll?: boolean } = {},
): Promise<void> {
await this.runWithLifecycle(async (signal) => {
await runAgentLoop(
messages,
this.createContextSnapshot(),
this.createLoopConfig(options),
(event) => this.processEvents(event),
signal,
this.streamFn,
);
});
}
private async runContinuation(): Promise<void> {
await this.runWithLifecycle(async (signal) => {
await runAgentLoopContinue(
this.createContextSnapshot(),
this.createLoopConfig(),
(event) => this.processEvents(event),
signal,
this.streamFn,
);
});
}
|
| 方法 | 调用引擎函数 | 调用者 |
|---|
runPromptMessages() | runAgentLoop() | prompt()、continue()(有队列消息时) |
runContinuation() | runAgentLoopContinue() | continue()(最后一条非 assistant) |
options.skipInitialSteeringPoll: 当 continue() 从 steeringQueue drain 消息后调用 runPromptMessages 时,跳过首次 steering 轮询,避免刚 drain 的消息又被 agent-loop 取走。
createContextSnapshot() — 上下文快照#
1
2
3
4
5
6
7
| private createContextSnapshot(): AgentContext {
return {
systemPrompt: this._state.systemPrompt,
messages: this._state.messages.slice(),
tools: this._state.tools.slice(),
};
}
|
slice() 浅拷贝,防止引擎层篡改 _state 的引用- 每次 run 创建新快照,保证多个 run 之间的隔离
createLoopConfig() — 配置适配#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| private createLoopConfig(options: { skipInitialSteeringPoll?: boolean } = {}): AgentLoopConfig {
let skipInitialSteeringPoll = options.skipInitialSteeringPoll === true;
return {
model: this._state.model,
reasoning: this._state.thinkingLevel === "off" ? undefined : this._state.thinkingLevel,
sessionId: this.sessionId,
onPayload: this.onPayload,
onResponse: this.onResponse,
transport: this.transport,
thinkingBudgets: this.thinkingBudgets,
maxRetryDelayMs: this.maxRetryDelayMs,
toolExecution: this.toolExecution,
beforeToolCall: this.beforeToolCall,
afterToolCall: this.afterToolCall,
convertToLlm: this.convertToLlm,
transformContext: this.transformContext,
getApiKey: this.getApiKey,
getSteeringMessages: async () => {
if (skipInitialSteeringPoll) {
skipInitialSteeringPoll = false;
return [];
}
return this.steeringQueue.drain();
},
getFollowUpMessages: async () => this.followUpQueue.drain(),
};
}
|
关键转换:
| Agent 属性 | AgentLoopConfig 字段 |
|---|
steeringQueue | 包装为 getSteeringMessages 回调 |
followUpQueue | 包装为 getFollowUpMessages 回调 |
thinkingLevel | 转换为 reasoning("off" → undefined) |
beforeToolCall | 透传 |
afterToolCall | 透传 |
convertToLlm | 透传 |
runWithLifecycle() — 并发控制#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| private async runWithLifecycle(executor: (signal: AbortSignal) => Promise<void>): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing.");
}
const abortController = new AbortController();
let resolvePromise = () => {};
const promise = new Promise<void>((resolve) => { resolvePromise = resolve; });
this.activeRun = { promise, resolve: resolvePromise, abortController };
this._state.isStreaming = true;
this._state.streamingMessage = undefined;
this._state.errorMessage = undefined;
try {
await executor(abortController.signal);
} catch (error) {
await this.handleRunFailure(error, abortController.signal.aborted);
} finally {
this.finishRun();
}
}
|
完整生命周期:
runWithLifecycle(executor)
│
├─ activeRun 存在?→ throw(并发保护)
│
├─ 创建 AbortController + Promise
├─ activeRun = { promise, resolve, abortController }
├─ isStreaming = true
│
├─ try { executor(signal) }
│ ├─ 成功 → finishRun()
│ └─ 失败 → handleRunFailure(error, aborted) → finishRun()
│
└─ finally { finishRun() } ← 确保始终执行
handleRunFailure() — 失败处理#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| private async handleRunFailure(error: unknown, aborted: boolean): Promise<void> {
const failureMessage = {
role: "assistant",
content: [{ type: "text", text: "" }],
api: this._state.model.api,
provider: this._state.model.provider,
model: this._state.model.id,
usage: EMPTY_USAGE,
stopReason: aborted ? "aborted" : "error",
errorMessage: error instanceof Error ? error.message : String(error),
timestamp: Date.now(),
} satisfies AgentMessage;
this._state.messages.push(failureMessage);
this._state.errorMessage = failureMessage.errorMessage;
await this.processEvents({ type: "agent_end", messages: [failureMessage] });
}
|
将错误编码为正常消息:
| 场景 | stopReason | 说明 |
|---|
外部调用 abort() | "aborted" | 用户主动取消 |
| LLM 调用抛错 | "error" | 网络故障、API 错误 |
| 工具执行抛错 | "error" | 工具内部异常 |
transformContext 抛错 | "error" | 预处理失败 |
convertToLlm 抛错 | "error" | 消息转换失败 |
哲学:永不崩溃,所有失败都编码为事件。
finishRun() — 运行终结#
1
2
3
4
5
6
7
| private finishRun(): void {
this._state.isStreaming = false;
this._state.streamingMessage = undefined;
this._state.pendingToolCalls = new Set<string>();
this.activeRun?.resolve();
this.activeRun = undefined;
}
|
5 步清理:
| 语句 | 效果 |
|---|
isStreaming = false | 外部可感知 run 已结束 |
streamingMessage = undefined | 清除流式占位 |
pendingToolCalls = new Set() | 清除工具跟踪 |
activeRun.resolve() | waitForIdle() Promise 兑现 |
activeRun = undefined | 允许下一次 prompt() / continue() |
processEvents() — 事件归约#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| private async processEvents(event: AgentEvent): Promise<void> {
switch (event.type) {
case "message_start":
this._state.streamingMessage = event.message;
break;
case "message_update":
this._state.streamingMessage = event.message;
break;
case "message_end":
this._state.streamingMessage = undefined;
this._state.messages.push(event.message);
break;
case "tool_execution_start": {
const pendingToolCalls = new Set(this._state.pendingToolCalls);
pendingToolCalls.add(event.toolCallId);
this._state.pendingToolCalls = pendingToolCalls;
break;
}
case "tool_execution_end": {
const pendingToolCalls = new Set(this._state.pendingToolCalls);
pendingToolCalls.delete(event.toolCallId);
this._state.pendingToolCalls = pendingToolCalls;
break;
}
case "turn_end":
if (event.message.role === "assistant" && event.message.errorMessage) {
this._state.errorMessage = event.message.errorMessage;
}
break;
case "agent_end":
this._state.streamingMessage = undefined;
break;
}
const signal = this.activeRun?.abortController.signal;
if (!signal) throw new Error("Agent listener invoked outside active run");
for (const listener of this.listeners) {
await listener(event, signal);
}
}
|
状态转换表:
| 事件类型 | streamingMessage | messages | pendingToolCalls | errorMessage |
|---|
message_start | 设置为 event.message | — | — | — |
message_update | 更新为最新 | — | — | — |
message_end | undefined | push event.message | — | — |
tool_execution_start | — | — | 添加 toolCallId | — |
tool_execution_end | — | — | 删除 toolCallId | — |
turn_end | — | — | — | 若有 error 则更新 |
agent_end | undefined | — | — | — |
不可变性策略: pendingToolCalls 每次更新都新建 Set,对外暴露 ReadonlySet。
1
2
3
4
5
6
| // 过程:每次更新都新建 Set
const pendingToolCalls = new Set(this._state.pendingToolCalls);
pendingToolCalls.add(event.toolCallId);
this._state.pendingToolCalls = pendingToolCalls;
// 效果:外部持有的旧引用不会被篡改
|
设计模式总结#
1. 分层架构(Layered Architecture)#
应用层
↓ prompt() / subscribe()
Agent 类(有状态管理层)
↓ runAgentLoop() / emit()
无状态引擎层(agent-loop.ts)
↓ streamSimple()
LLM 核心库(pi-ai)
2. 依赖倒置(Dependency Inversion)#
- 引擎层定义
AgentEventSink 接口 - 管理层实现该接口(
processEvents) - 引擎层不依赖管理层
3. 事件溯源(Event Sourcing)#
processEvents 类似 Redux reducer- 每次事件驱动状态变更,状态变更可追溯
AgentEvent 是唯一的日志来源
- Phase 1:串行 prepare(含 beforeToolCall 钩子)
- Phase 2:并发 execute(Promise.all)
- 兼顾钩子顺序可预测性和执行性能
5. 错误边界(Error Boundaries)#
| 层级 | 错误处理 |
|---|
| LLM 调用层 | stopReason: "error" |
| 工具 prepare 层 | catch → immediate error tool result |
| 工具 execute 层 | catch → error tool result |
| afterToolCall 钩子 | catch → error tool result |
| Agent 管理层 | handleRunFailure → 失败消息 |
6. 不可变状态(Immutable State)#
pendingToolCalls 每次更新新建 Setmessages 通过 setter 自动 slicecontext 和 newMessages 通过引用传递(引擎层无需拷贝,因为引擎层是纯函数)