Pi Agent 包源码逐行解读

本文对 packages/agent/src/ 下的 agent-loop.ts(~700 行)和 agent.ts(~540 行)做逐行级的代码解读,涵盖每个函数的设计意图、类型系统、控制流和错误处理策略。


目录

  1. 文件概览
  2. agent-loop.ts 逐行解读
  3. agent.ts 逐行解读
  4. 设计模式总结

文件概览

packages/agent/src/
├── agent-loop.ts    # 无状态循环引擎(~700 行,纯函数)
├── agent.ts         # 有状态封装层(~540 行,Agent 类)
├── types.ts         # 类型定义(~400 行)
├── proxy.ts         # HTTP 代理(~320 行)
├── index.ts         # 导出入口(~8 行)
├── agent-learn.ts   # 学习相关
├── agent-loop-learn.ts
└── README.md

核心分界线:

层面文件有无状态职责
引擎层agent-loop.ts无状态(纯函数)双层循环、LLM 调用、工具执行、事件发射
管理层agent.ts有状态(Agent 类)状态管理、并发控制、消息队列、事件分发

pi-ai 核心类型体系

agent-loop.tsagent.ts 大量依赖 @earendil-works/pi-ai(位于 packages/ai/)的核心类型。这些类型构成了 LLM 调用、流式事件、消息格式和工具系统的基石。


Message — 统一消息类型

1
type Message = UserMessage | AssistantMessage | ToolResultMessage;

三种消息角色构成了 LLM 对话的基本单元:

UserMessage

1
2
3
4
5
interface UserMessage {
    role: "user";
    content: string | (TextContent | ImageContent)[];
    timestamp: number;
}
字段说明
content字符串(自动转为 text content)或 content 数组(支持文本+图片多模态)
timestampUnix 毫秒时间戳,用于排序和上下文管理

AssistantMessage

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
interface AssistantMessage {
    role: "assistant";
    content: (TextContent | ThinkingContent | ToolCall)[];
    api: Api;
    provider: Provider;
    model: string;
    responseModel?: string;
    responseId?: string;
    diagnostics?: AssistantMessageDiagnostic[];
    usage: Usage;
    stopReason: StopReason;
    errorMessage?: string;
    timestamp: number;
}
字段说明
content可混合文本、思考过程和工具调用
api / provider / model标识响应的来源
responseModel实际处理请求的模型(如 OpenRouter 路由后可能与请求模型不同)
responseIdProvider 返回的响应标识符
usageToken 用量(含 cost 估算)
stopReason终止原因:"stop" / "length" / "toolUse" / "error" / "aborted"
errorMessage仅在 stopReason 为 error/aborted 时存在

stopReason 语义:

含义
"stop"LLM 正常结束
"length"达到 maxTokens 截断
"toolUse"LLM 要求调用工具
"error"LLM 调用失败或被 engine catch 到异常后转换
"aborted"外部调用 abortController.abort()

ToolResultMessage

1
2
3
4
5
6
7
8
9
interface ToolResultMessage<TDetails = any> {
    role: "toolResult";
    toolCallId: string;
    toolName: string;
    content: (TextContent | ImageContent)[];
    details?: TDetails;
    isError: boolean;
    timestamp: number;
}
字段说明
toolCallId与 AssistantMessage 中的 ToolCall.id 对应
toolName工具名称,用于路由和日志
isError工具执行是否出错

Content 类型

TextContent

1
interface TextContent { type: "text"; text: string; textSignature?: string; }

ThinkingContent

1
2
3
4
5
6
interface ThinkingContent {
    type: "thinking";
    thinking: string;
    thinkingSignature?: string;
    redacted?: boolean;  // 安全过滤导致内容被隐去
}

ImageContent

1
interface ImageContent { type: "image"; data: string; mimeType: string; }

ToolCall

1
2
3
4
5
6
7
interface ToolCall {
    type: "toolCall";
    id: string;
    name: string;
    arguments: Record<string, any>;
    thoughtSignature?: string;
}

Context — LLM 上下文

1
2
3
4
5
interface Context {
    systemPrompt?: string;
    messages: Message[];
    tools?: Tool[];
}
字段说明
systemPrompt可选,系统提示词
messages对话历史(Message[],即 UserMessage / AssistantMessage / ToolResultMessage)
tools可选,工具定义

Agent 上下文 vs LLM 上下文的区别:

AgentContext(Agent 层)                 Context(LLM 层)
systemPrompt: string                     systemPrompt?: string
messages: AgentMessage[]  --convertToLlm--> messages: Message[]
tools: AgentTool[]                        tools?: Tool[]

AgentMessage 是 Agent 层的扩展消息(支持自定义角色),而 Message 是 LLM 层的标准消息。


Tool — 工具定义

1
2
3
4
5
interface Tool<TParameters extends TSchema = TSchema> {
    name: string;
    description: string;
    parameters: TParameters;  // JSON Schema (TypeBox)
}

使用 TypeBox 的 TSchema 定义参数类型,支持 JSON Schema 校验。


Model — 模型定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
interface Model<TApi extends Api> {
    id: string;
    name: string;
    api: TApi;
    provider: Provider;
    baseUrl: string;
    reasoning: boolean;
    thinkingLevelMap?: ThinkingLevelMap;
    input: ("text" | "image")[];
    cost: { input: number; output: number; cacheRead: number; cacheWrite: number; };
    contextWindow: number;
    maxTokens: number;
    compat?: OpenAICompletionsCompat | OpenAIResponsesCompat | AnthropicMessagesCompat;
}
字段说明
api已知 API 类型(如 "openai-completions", "anthropic-messages" 等 30+ 种)
provider已知 Provider 名称(如 "openai", "anthropic", "bedrock" 等 30+ 种)
reasoning是否支持推理
cost每百万 token 的价格(用于 cost 估算)
compatAPI 兼容性覆盖

AssistantMessageEvent — 流式事件协议

共 13 种子事件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type AssistantMessageEvent =
    | { type: "start"; partial }
    | { type: "text_start"; contentIndex; partial }
    | { type: "text_delta"; contentIndex; delta; partial }
    | { type: "text_end"; contentIndex; content; partial }
    | { type: "thinking_start"; contentIndex; partial }
    | { type: "thinking_delta"; contentIndex; delta; partial }
    | { type: "thinking_end"; contentIndex; content; partial }
    | { type: "toolcall_start"; contentIndex; partial }
    | { type: "toolcall_delta"; contentIndex; delta; partial }
    | { type: "toolcall_end"; contentIndex; toolCall; partial }
    | { type: "done"; reason; message }
    | { type: "error"; reason; error }

状态机:

start -> text_start -> text_delta* -> text_end -> done
       -> thinking_start -> thinking_delta* -> thinking_end ->
       -> toolcall_start -> toolcall_delta* -> toolcall_end ->
       -> error (任意时刻)

EventStream — 通用流式事件容器

1
2
3
4
5
6
7
class EventStream<T, R = T> implements AsyncIterable<T> {
    constructor(isComplete: (event: T) => boolean, extractResult: (event: T) => R) {}
    push(event: T): void
    end(result?: R): void
    [Symbol.asyncIterator](): AsyncIterator<T>
    result(): Promise<R>
}

三要素:

方法用途调用方
push(event)发射事件生产者
for await (const e of stream)消费事件消费者
result()获取最终结果调用方

内部机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
push(event: T): void {
    if (this.isComplete(event)) {
        this.done = true;
        this.resolveFinalResult(this.extractResult(event));
    }
    const waiter = this.waiting.shift();
    if (waiter) {
        waiter({ value: event, done: false });  // 直接交付等待者
    } else {
        this.queue.push(event);                   // 无人消费则入队
    }
}
  • 生产者-消费者模式:事件要么被 for-await 消费者立即取走,要么入队等待
  • isComplete 判定流终止,extractResult 提取最终结果

AssistantMessageEventStream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
    constructor() {
        super(
            (event) => event.type === "done" || event.type === "error",
            (event) => {
                if (event.type === "done") return event.message;
                if (event.type === "error") return event.error;
            },
        );
    }
}

LLM 返回的流。doneerror 事件触发流结束。


StreamFunction — 流式函数签名

1
2
3
4
5
type StreamFunction = (
    model: Model,
    context: Context,
    options?: StreamOptions,
) => AssistantMessageEventStream;

契约:

  • 必须返回 AssistantMessageEventStream
  • 错误应编码在流中(error 事件),而非抛异常
  • 错误终止必须产生 stopReason: "error" | "aborted" 的 AssistantMessage

StreamOptions / SimpleStreamOptions — 流式选项

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
interface StreamOptions {
    temperature?: number;
    maxTokens?: number;
    signal?: AbortSignal;
    apiKey?: string;
    transport?: Transport;       // "sse" | "websocket" | "websocket-cached" | "auto"
    cacheRetention?: CacheRetention;
    sessionId?: string;
    onPayload?: (payload, model) => unknown;
    onResponse?: (response, model) => void;
    timeoutMs?: number;
    maxRetries?: number;
    maxRetryDelayMs?: number;
}

interface SimpleStreamOptions extends StreamOptions {
    reasoning?: ThinkingLevel;
    thinkingBudgets?: ThinkingBudgets;
}

类型关系总览

graph TB subgraph "LLM 核心库" Message["Message"] AM["AssistantMessage
stopReason / usage"] Context["Context"] Tool["Tool"] Event["AssistantMessageEvent
13 种"] Stream["EventStream"] end subgraph "Agent 包" AgentMsg["AgentMessage"] AgentCtx["AgentContext"] AgentTool["AgentTool"] AgentEvent["AgentEvent"] end Message --> AgentMsg Context -.->|convertToLlm| AgentCtx Tool --> AgentTool ## agent-loop.ts 逐行解读 ### 类型与导入 ```typescript import { type AssistantMessage, type Context, EventStream, streamSimple, type ToolResultMessage, validateToolArguments, } from "@earendil-works/pi-ai"; import type { AgentContext, AgentEvent, AgentLoopConfig, AgentMessage, AgentTool, AgentToolCall, AgentToolResult, StreamFn, } from "./types.js";
导入来源用途
AssistantMessagepi-aiLLM 返回的 assistant 消息类型
Contextpi-aiLLM 调用所需的上下文(systemPrompt + messages + tools)
EventStreampi-ai流式事件流,支持 push/end 模式
streamSimplepi-ai默认的 LLM 流式调用函数
ToolResultMessagepi-ai工具执行返回的消息类型
validateToolArgumentspi-ai工具参数的 JSON Schema 校验
AgentContext./typesAgent 层的上下文(systemPrompt + AgentMessage[] + tools)
AgentEvent./typesAgent 生命周期事件的联合类型(13 种)
AgentLoopConfig./types引擎配置(含回调钩子)
AgentMessage./typesAgent 统一消息类型(可扩展)
AgentTool./typesAgent 工具定义
AgentToolCall./typesLLM 发出的工具调用请求
AgentToolResult./types工具执行结果
StreamFn./types流式函数类型签名

设计要点: agent-loop.ts 不直接依赖 Agent 类,它通过 AgentEventSinkemit 回调)与上层通信。这是依赖倒置的体现——引擎层定义接口,管理层实现接口。


AgentEventSink — 事件接收器类型

1
export type AgentEventSink = (event: AgentEvent) => Promise<void> | void;
  • 同步或异步均可
  • 引擎层 await 每次 emit 调用,保证事件顺序
  • 这是引擎层与管理层之间的唯一通信通道

agentLoop() / agentLoopContinue() — 公开入口

1
2
3
4
5
6
7
export function agentLoop(
    prompts: AgentMessage[],
    context: AgentContext,
    config: AgentLoopConfig,
    signal?: AbortSignal,
    streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]>

逐行解读:

1
const stream = createAgentStream();
  • 创建 EventStream 实例,内部实现了终止判断和结果提取
1
2
3
4
5
void runAgentLoop(
    prompts, context, config,
    async (event) => { stream.push(event); },
    signal, streamFn,
).then((messages) => { stream.end(messages); });
  • void 关键字:显式表达我们不 await 这个 Promise(由 EventStream 管理生命周期)
  • stream.push(event):每产生一个 AgentEvent,立即推入流
  • stream.end(messages):流结束时,AgentMessage[] 作为最终结果写入流
1
return stream;
  • 调用方通过 for await (const event of stream) 消费事件
  • 最终通过 stream.result() 获取 AgentMessage[]

agentLoopContinue() 的额外校验:

1
2
3
4
5
6
if (context.messages.length === 0) {
    throw new Error("Cannot continue: no messages in context");
}
if (context.messages[context.messages.length - 1].role === "assistant") {
    throw new Error("Cannot continue from message role: assistant");
}
  • 空上下文 → 无法继续
  • 最后一条是 assistant → LLM 的 context 最后一条必须是 user 或 toolResult,否则 LLM provider 会拒绝请求

runAgentLoop() / runAgentLoopContinue() — 异步底层

1
2
3
4
5
6
7
8
export async function runAgentLoop(
    prompts: AgentMessage[],
    context: AgentContext,
    config: AgentLoopConfig,
    emit: AgentEventSink,
    signal?: AbortSignal,
    streamFn?: StreamFn,
): Promise<AgentMessage[]>

逐行解读:

1
const newMessages: AgentMessage[] = [...prompts];
  • newMessages 是返回值,记录本轮新增的所有消息(prompt + assistant response + tool results)
1
2
3
4
const currentContext: AgentContext = {
    ...context,
    messages: [...context.messages, ...prompts],
};
  • 浅拷贝 context,并在末尾追加 prompt 消息
  • 注意 ...context 是浅拷贝,但 messages: [...context.messages, ...prompts] 是新数组,防止引擎层篡改外部引用
1
2
3
4
5
6
await emit({ type: "agent_start" });
await emit({ type: "turn_start" });
for (const prompt of prompts) {
    await emit({ type: "message_start", message: prompt });
    await emit({ type: "message_end", message: prompt });
}
  • 发射生命周期事件:agent_start → turn_start → 每个 prompt 的 message_start/end
  • prompt 消息的 message_start 和 message_end 连续发射(中间没有 update 事件,因为 prompt 是已确定的消息)
1
2
await runLoop(currentContext, newMessages, config, signal, emit, streamFn);
return newMessages;
  • 进入核心双层循环
  • 返回本轮新增的所有消息

runAgentLoopContinue()runAgentLoop() 的差异:

  • newMessages 初始化为 [](没有新 prompt)
  • currentContext 直接 { ...context }(不追加消息)
  • 不发射 prompt 的 message_start/end 事件

createAgentStream() — EventStream 工厂

1
2
3
4
5
6
function createAgentStream(): EventStream<AgentEvent, AgentMessage[]> {
    return new EventStream<AgentEvent, AgentMessage[]>(
        (event: AgentEvent) => event.type === "agent_end",
        (event: AgentEvent) => (event.type === "agent_end" ? event.messages : []),
    );
}
  • 第一个参数(终止判定):当 event.type === “agent_end” 时流结束
  • 第二个参数(结果提取):从 agent_end 事件中提取 messages 作为最终结果
  • 调用方通过 stream.result() 获取 AgentMessage[]

runLoop() — 核心双层循环

1
2
3
4
5
6
7
8
async function runLoop(
    currentContext: AgentContext,
    newMessages: AgentMessage[],
    config: AgentLoopConfig,
    signal: AbortSignal | undefined,
    emit: AgentEventSink,
    streamFn?: StreamFn,
): Promise<void>

这是整个 Agent 包最核心的函数,约 80 行。

初始化

1
2
let firstTurn = true;
let pendingMessages: AgentMessage[] = (await config.getSteeringMessages?.()) || [];
变量用途
firstTurn首个 turn 已在入口函数中发射了 turn_start,后续 turn 需要自行发射
pendingMessages两层循环的桥梁:内层消费,外层(follow-up)生产

外层循环

1
2
while (true) {
    let hasMoreToolCalls = true;
  • hasMoreToolCalls 每轮外层循环重置为 true(进入内层至少一次,检查 steering 消息)

内层循环条件

1
while (hasMoreToolCalls || pendingMessages.length > 0) {

内层持续的条件:

  1. hasMoreToolCalls === true:当前轮次有 tool call,且未全部 terminate
  2. pendingMessages.length > 0:steering 队列中有待处理消息

Turn 管理

1
2
3
4
5
if (!firstTurn) {
    await emit({ type: "turn_start" });
} else {
    firstTurn = false;
}
  • 首个 turn 不发射(已在入口函数中发射)
  • 后续 turn(steering 注入后、follow-up 注入后)各自发射 turn_start

处理 pending 消息

1
2
3
4
5
6
7
8
9
if (pendingMessages.length > 0) {
    for (const message of pendingMessages) {
        await emit({ type: "message_start", message });
        await emit({ type: "message_end", message });
        currentContext.messages.push(message);
        newMessages.push(message);
    }
    pendingMessages = [];
}
  • 把 steering 队列中的消息以 message_start → message_end 的事件序列注入
  • 同时追加到 currentContext(LLM 调用用)和 newMessages(返回值用)
  • 清空 pendingMessages

LLM 调用

1
2
const message = await streamAssistantResponse(currentContext, config, signal, emit, streamFn);
newMessages.push(message);
  • 返回 AssistantMessage(LLM 的完整回复)
  • 推入 newMessages

错误/中止检测

1
2
3
4
5
if (message.stopReason === "error" || message.stopReason === "aborted") {
    await emit({ type: "turn_end", message, toolResults: [] });
    await emit({ type: "agent_end", messages: newMessages });
    return;
}
  • stopReason 为 error 或 aborted → 立即终止整个 agent
  • 先发射 turn_end(空 toolResults),再发射 agent_end

工具调用检测与执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const toolCalls = message.content.filter((c) => c.type === "toolCall");
const toolResults: ToolResultMessage[] = [];
hasMoreToolCalls = false;

if (toolCalls.length > 0) {
    const executedToolBatch = await executeToolCalls(currentContext, message, config, signal, emit);
    toolResults.push(...executedToolBatch.messages);
    hasMoreToolCalls = !executedToolBatch.terminate;
    for (const result of toolResults) {
        currentContext.messages.push(result);
        newMessages.push(result);
    }
}
  • hasMoreToolCalls = !executedToolBatch.terminate:当所有工具都返回 terminate: true 时,内层循环终止
  • tool result 同时推入 currentContext(LLM 下一轮上下文)和 newMessages

Turn 结束与停止判定

1
2
3
4
5
6
await emit({ type: "turn_end", message, toolResults });

if (await config.shouldStopAfterTurn?.({ message, toolResults, context: currentContext, newMessages })) {
    await emit({ type: "agent_end", messages: newMessages });
    return;
}
  • shouldStopAfterTurn 是可选回调,调用方可以自定义停止条件(如:基于消息数量、工具调用次数等)
  • 若返回 true → agent_end

获取下一轮 steering 消息

1
pendingMessages = (await config.getSteeringMessages?.()) || [];
  • 内层循环末尾拉取 steering 消息,如果有 → 继续内层

外层 follow-up 机制

1
2
3
4
5
6
const followUpMessages = (await config.getFollowUpMessages?.()) || [];
if (followUpMessages.length > 0) {
    pendingMessages = followUpMessages;
    continue;
}
break;
  • 内层结束(无更多 tool call 和 steering 消息)
  • 检查 follow-up 队列,有消息 → 设为 pending → continue 外层循环
  • 无消息 → break 退出
1
await emit({ type: "agent_end", messages: newMessages });
  • 最终发射 agent_end

streamAssistantResponse() — LLM 流式调用

1
2
3
4
5
6
7
async function streamAssistantResponse(
    context: AgentContext,
    config: AgentLoopConfig,
    signal: AbortSignal | undefined,
    emit: AgentEventSink,
    streamFn?: StreamFn,
): Promise<AssistantMessage>

第一步:上下文变换(可选)

1
2
3
4
let messages = context.messages;
if (config.transformContext) {
    messages = await config.transformContext(messages, signal);
}
  • transformContext 允许对整个消息列表做预处理
  • 典型用途:过滤、重排序、注入系统消息、合并上下文窗口

第二步:消息转换

1
const llmMessages = await config.convertToLlm(messages);
  • AgentMessage[] → Message[]:这是 Agent 世界与 LLM 世界的边界
  • 默认实现:只保留 userassistanttoolResult 角色,过滤掉自定义消息

第三步:构造 LLM Context

1
2
3
4
5
const llmContext: Context = {
    systemPrompt: context.systemPrompt,
    messages: llmMessages,
    tools: context.tools,
};

第四步:选择流函数

1
const streamFunction = streamFn || streamSimple;

第五步:解析 API Key

1
2
const resolvedApiKey =
    (config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) || config.apiKey;
  • 支持动态获取 API key(如短期 OAuth token)
  • 先尝试 getApiKey 回调,fallback 到 config.apiKey
  • 短路求值:若 config.getApiKey 未定义,跳过调用

第六步:发起 LLM 调用

1
2
3
4
5
const response = await streamFunction(config.model, llmContext, {
    ...config,
    apiKey: resolvedApiKey,
    signal,
});
  • AgentLoopConfig 的所有属性透传给 LLM 核心库(model、reasoning、sessionId、onPayload、transport、thinkingBudgets 等)
  • apiKey 覆盖传入

第七步:事件循环

1
2
let partialMessage: AssistantMessage | null = null;
let addedPartial = false;
变量用途
partialMessage当前正在构建中的消息引用
addedPartial标记是否已推入占位消息到 context.messages
1
2
for await (const event of response) {
    switch (event.type) {
case “start”
1
2
3
4
5
6
case "start":
    partialMessage = event.partial;
    context.messages.push(partialMessage);
    addedPartial = true;
    await emit({ type: "message_start", message: { ...partialMessage } });
    break;
  • 将 partial 消息推入 context.messages 末尾作为占位
  • 浅拷贝后发射 message_start,防止 listener 篡改引用
case text/thinking/toolcall delta
1
2
3
4
5
6
7
8
9
case "text_delta":
case "toolcall_delta":
    // ... 等 11 种子事件类型
    if (partialMessage) {
        partialMessage = event.partial;
        context.messages[context.messages.length - 1] = partialMessage;
        await emit({ type: "message_update", assistantMessageEvent: event, message: { ...partialMessage } });
    }
    break;
  • partialMessage = event.partial:变量指向新的引用(事件对象中的更新版本)
  • context.messages[last] = partialMessage原地替换占位消息
  • 关键设计:使用引用更新,避免每次 delta 都深拷贝完整消息
case “done” / “error”
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
case "done":
case "error": {
    const finalMessage = await response.result();
    if (addedPartial) {
        context.messages[context.messages.length - 1] = finalMessage;
    } else {
        context.messages.push(finalMessage);
    }
    if (!addedPartial) {
        await emit({ type: "message_start", message: { ...finalMessage } });
    }
    await emit({ type: "message_end", message: finalMessage });
    return finalMessage;
}
  • 分支 A:addedPartial === true → 替换占位
  • 分支 B:addedPartial === false → 没有 start 事件(LLM 直接返回完整结果),推入并补发 message_start

防线:for-await 正常结束(无 done/error 事件)

1
2
3
4
5
6
7
8
9
const finalMessage = await response.result();
if (addedPartial) {
    context.messages[context.messages.length - 1] = finalMessage;
} else {
    context.messages.push(finalMessage);
    await emit({ type: "message_start", message: { ...finalMessage } });
}
await emit({ type: "message_end", message: finalMessage });
return finalMessage;
  • 与 done/error 分支逻辑相同
  • 此分支作为 for-await 循环正常结束的兜底

executeToolCalls() — 工具调度

1
2
3
async function executeToolCalls(
    currentContext, assistantMessage, config, signal, emit
): Promise<ExecutedToolCallBatch>

调度逻辑(仅 7 行):

1
const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
  • 从 assistant 消息中提取所有 tool call
1
2
3
const hasSequentialToolCall = toolCalls.some(
    (tc) => currentContext.tools?.find((t) => t.name === tc.name)?.executionMode === "sequential",
);
  • 检查是否有工具声明了串行模式
1
2
3
4
if (config.toolExecution === "sequential" || hasSequentialToolCall) {
    return executeToolCallsSequential(...);
}
return executeToolCallsParallel(...);
  • 全局配置或单个工具声明 → 串行;否则并行

executeToolCallsSequential() — 串行执行

1
2
3
4
5
async function executeToolCallsSequential(...): Promise<ExecutedToolCallBatch> {
    const finalizedCalls: FinalizedToolCallOutcome[] = [];
    const messages: ToolResultMessage[] = [];

    for (const toolCall of toolCalls) {

每个 tool call 的完整生命周期:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 发射 tool_execution_start
await emit({ type: "tool_execution_start", toolCallId, toolName, args });

// 2. 前置处理
const preparation = await prepareToolCall(...);

// 3. immediate → 直接记录错误
if (preparation.kind === "immediate") {
    finalized = { toolCall, result: preparation.result, isError: preparation.isError };
} else {
    // 4. 执行
    const executed = await executePreparedToolCall(preparation, signal, emit);
    // 5. 后处理
    finalized = await finalizeExecutedToolCall(...);
}

// 6. 发射 tool_execution_end
await emitToolExecutionEnd(finalized, emit);

// 7. 构建 ToolResultMessage 并发射
const toolResultMessage = createToolResultMessage(finalized);
await emitToolResultMessage(toolResultMessage, emit);

串行的关键: for...of 确保逐个处理,一个完成才到下一个。


executeToolCallsParallel() — 并行执行

两阶段设计:

Phase 1:串行 prepare

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
for (const toolCall of toolCalls) {
    await emit({ type: "tool_execution_start", ... });

    const preparation = await prepareToolCall(...);
    if (preparation.kind === "immediate") {
        // 直接记入结果(不是 thunk)
        finalizedCalls.push(finalized);
        continue;
    }

    // 包装为 lazy thunk
    finalizedCalls.push(async () => {
        const executed = await executePreparedToolCall(preparation, signal, emit);
        const finalized = await finalizeExecutedToolCall(...);
        await emitToolExecutionEnd(finalized, emit);
        return finalized;
    });
}
kind处理方式
"immediate"(工具未找到 / 校验失败 / beforeToolCall 阻拦)直接记入 finalizedCalls 数组
"prepared"包装为 () => Promise<FinalizedToolCallOutcome> 的 thunk

FinalizedToolCallEntry 类型:

1
type FinalizedToolCallEntry = FinalizedToolCallOutcome | (() => Promise<FinalizedToolCallOutcome>);

即:要么是直接结果(immediate),要么是延迟计算的 thunk(prepared)。

Phase 2:并发 execute

1
2
3
const orderedFinalizedCalls = await Promise.all(
    finalizedCalls.map((entry) => (typeof entry === "function" ? entry() : Promise.resolve(entry))),
);
  • Promise.all() 并发执行所有 thunk
  • typeof entry === "function" 区分 direct result 和 thunk
  • orderedFinalizedCalls 保持原始 tool call 顺序(因为 finalizedCalls 的顺序与 Phase 1 遍历顺序一致)
1
2
3
4
5
6
const messages: ToolResultMessage[] = [];
for (const finalized of orderedFinalizedCalls) {
    const toolResultMessage = createToolResultMessage(finalized);
    await emitToolResultMessage(toolResultMessage, emit);
    messages.push(toolResultMessage);
}
  • 按原始顺序构建消息
1
2
3
4
return {
    messages,
    terminate: shouldTerminateToolBatch(orderedFinalizedCalls),
};

prepareToolCall() — 前置管道

1
async function prepareToolCall(...): Promise<PreparedToolCall | ImmediateToolCallOutcome>

4 步检查管道:

步骤 1:查找工具定义

1
2
3
4
5
6
7
8
const tool = currentContext.tools?.find((t) => t.name === toolCall.name);
if (!tool) {
    return {
        kind: "immediate",
        result: createErrorToolResult(`Tool ${toolCall.name} not found`),
        isError: true,
    };
}
  • 不在 tools 数组中?→ 立即返回错误

步骤 2:参数兼容

1
const preparedToolCall = prepareToolCallArguments(tool, toolCall);
  • 若工具定义了 prepareArguments,允许对 LLM 生成的参数做运行时转换
  • 典型用途:字符串 ID → 对象引用、格式转换

步骤 3:Schema 校验

1
const validatedArgs = validateToolArguments(tool, preparedToolCall);
  • @earendil-works/pi-aivalidateToolArguments 根据工具的 JSON Schema 校验
  • 校验失败会抛异常,被外层的 try/catch 捕获

步骤 4:beforeToolCall 钩子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
if (config.beforeToolCall) {
    const beforeResult = await config.beforeToolCall({ assistantMessage, toolCall, args: validatedArgs, context }, signal);
    if (beforeResult?.block) {
        return {
            kind: "immediate",
            result: createErrorToolResult(beforeResult.reason || "Tool execution was blocked"),
            isError: true,
        };
    }
}
  • 钩子可阻止工具执行(返回 { block: true }
  • 应用场景:权限检查、限流、内容审核

整个管道的异常兜底

1
2
3
4
5
6
7
} catch (error) {
    return {
        kind: "immediate",
        result: createErrorToolResult(error instanceof Error ? error.message : String(error)),
        isError: true,
    };
}
  • 任何步骤抛异常 → catch → 返回 error tool result
  • 这是第一层容错:工具调用前置阶段的异常不会扩散到主循环

executePreparedToolCall() — 工具执行

1
2
3
4
5
async function executePreparedToolCall(
    prepared: PreparedToolCall,
    signal: AbortSignal | undefined,
    emit: AgentEventSink,
): Promise<ExecutedToolCallOutcome>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const updateEvents: Promise<void>[] = [];

try {
    const result = await prepared.tool.execute(
        prepared.toolCall.id,
        prepared.args as never,
        signal,
        (partialResult) => {
            updateEvents.push(
                Promise.resolve(
                    emit({ type: "tool_execution_update", ... })
                ),
            );
        },
    );
    await Promise.all(updateEvents);
    return { result, isError: false };
} catch (error) {
    await Promise.all(updateEvents);
    return {
        result: createErrorToolResult(error instanceof Error ? error.message : String(error)),
        isError: true,
    };
}

设计要点:

特性说明
signal传入 AbortSignal,工具应监听取消信号
onProgress工具可多次调用来发射 tool_execution_update 事件
updateEvents收集所有 update 发射的 Promise,执行完后统一 await
异常处catch 后生成 error tool result,不崩溃

finalizeExecutedToolCall() — 后处理

1
async function finalizeExecutedToolCall(...): Promise<FinalizedToolCallOutcome>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
let result = executed.result;
let isError = executed.isError;

if (config.afterToolCall) {
    try {
        const afterResult = await config.afterToolCall({ assistantMessage, toolCall, args, result, isError, context }, signal);
        if (afterResult) {
            result = {
                content: afterResult.content ?? result.content,
                details: afterResult.details ?? result.details,
                terminate: afterResult.terminate ?? result.terminate,
            };
            isError = afterResult.isError ?? isError;
        }
    } catch (error) {
        result = createErrorToolResult(error instanceof Error ? error.message : String(error));
        isError = true;
    }
}

return { toolCall: prepared.toolCall, result, isError };

afterToolCall 钩子可覆盖的字段:

字段覆盖方式用途
contentafterResult.content ?? result.content修改返回内容(脱敏、格式化)
detailsafterResult.details ?? result.details追加额外信息
terminateafterResult.terminate ?? result.terminate强制终止或解除终止
isErrorafterResult.isError ?? isError标记为错误

第二层容错: 钩子自身抛异常 → catch 后覆盖为 error tool result(防止钩子崩溃波及主流程)。


辅助函数

shouldTerminateToolBatch()

1
2
3
function shouldTerminateToolBatch(finalizedCalls: FinalizedToolCallOutcome[]): boolean {
    return finalizedCalls.length > 0 && finalizedCalls.every((finalized) => finalized.result.terminate === true);
}
  • 仅当所有工具都要求终止时才返回 true
  • 空数组 → false(没有 tool call 时不应终止)
  • 设计选择:防止单个工具意外终止整个 agent

prepareToolCallArguments()

1
2
3
4
5
6
function prepareToolCallArguments(tool: AgentTool<any>, toolCall: AgentToolCall): AgentToolCall {
    if (!tool.prepareArguments) return toolCall;
    const preparedArguments = tool.prepareArguments(toolCall.arguments);
    if (preparedArguments === toolCall.arguments) return toolCall;
    return { ...toolCall, arguments: preparedArguments as Record<string, any> };
}
  • 若工具定义了 prepareArguments,对参数做兼容转换
  • === 引用比较:若返回值与输入相同,跳过属性复制(性能优化)

createErrorToolResult()

1
2
3
function createErrorToolResult(message: string): AgentToolResult<any> {
    return { content: [{ type: "text", text: message }], details: {} };
}
  • 统一错误格式:纯文本 content + 空 details

emitToolExecutionEnd()

1
2
3
async function emitToolExecutionEnd(finalized: FinalizedToolCallOutcome, emit: AgentEventSink): Promise<void> {
    await emit({ type: "tool_execution_end", toolCallId, toolName, result, isError });
}

createToolResultMessage()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
function createToolResultMessage(finalized: FinalizedToolCallOutcome): ToolResultMessage {
    return {
        role: "toolResult",
        toolCallId: finalized.toolCall.id,
        toolName: finalized.toolCall.name,
        content: finalized.result.content,
        details: finalized.result.details,
        isError: finalized.isError,
        timestamp: Date.now(),
    };
}

emitToolResultMessage()

1
2
3
4
async function emitToolResultMessage(toolResultMessage: ToolResultMessage, emit: AgentEventSink): Promise<void> {
    await emit({ type: "message_start", message: toolResultMessage });
    await emit({ type: "message_end", message: toolResultMessage });
}
  • tool result 以完整的 message_start/end 事件序列发射

agent.ts 逐行解读

类型与导入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import {
    type ImageContent, type Message, type Model,
    type SimpleStreamOptions, streamSimple,
    type TextContent, type ThinkingBudgets, type Transport,
} from "@earendil-works/pi-ai";
import { runAgentLoop, runAgentLoopContinue } from "./agent-loop.js";
import type {
    AfterToolCallContext, AfterToolCallResult,
    AgentContext, AgentEvent, AgentLoopConfig, AgentMessage, AgentState, AgentTool,
    BeforeToolCallContext, BeforeToolCallResult, StreamFn, ToolExecutionMode,
} from "./types.js";

关键导入: runAgentLooprunAgentLoopContinue 是 agent-loop.ts 的两个核心函数,agent.ts 将它们封装为有状态的 API。


defaultConvertToLlm() — 默认消息转换

1
2
3
4
5
function defaultConvertToLlm(messages: AgentMessage[]): Message[] {
    return messages.filter(
        (message) => message.role === "user" || message.role === "assistant" || message.role === "toolResult",
    );
}
  • 只保留 LLM 能识别的三种角色
  • 过滤掉自定义消息(artifact、notification 等)

常量

1
2
const EMPTY_USAGE = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0, cost: { ... } };
const DEFAULT_MODEL = { id: "unknown", name: "unknown", api: "unknown", ... };
  • EMPTY_USAGE:用于失败消息的空用量(避免 undefined)
  • DEFAULT_MODEL:fallback 模型(用户未指定 initialState.model 时使用)

MutableAgentStatecreateMutableAgentState()

1
2
3
4
5
6
type MutableAgentState = Omit<AgentState, "isStreaming" | "streamingMessage" | "pendingToolCalls" | "errorMessage"> & {
    isStreaming: boolean;
    streamingMessage?: AgentMessage;
    pendingToolCalls: Set<string>;
    errorMessage?: string;
};

AgentState vs MutableAgentState

字段AgentState(对外只读)MutableAgentState(内部可写)
isStreamingreadonly booleanboolean
streamingMessagereadonly可写
pendingToolCallsReadonlySet<string>Set<string>
errorMessagereadonly可写

Getter/Setter 技巧:

1
2
3
4
5
6
7
return {
    get tools() { return tools; },
    set tools(nextTools: AgentTool<any>[]) { tools = nextTools.slice(); },
    get messages() { return messages; },
    set messages(nextMessages: AgentMessage[]) { messages = nextMessages.slice(); },
    // ...
};
  • getter 返回内部数组引用
  • setter 自动 slice() 拷贝,防止外部通过 .push() 等操作篡改内部状态
  • pendingToolCallsprocessEvents 中每次更新都新建 Set

PendingMessageQueue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class PendingMessageQueue {
    private messages: AgentMessage[] = [];
    constructor(public mode: QueueMode) {}

    enqueue(message: AgentMessage): void {
        this.messages.push(message);
    }

    hasItems(): boolean { return this.messages.length > 0; }

    drain(): AgentMessage[] {
        if (this.mode === "all") {
            const drained = this.messages.slice();
            this.messages = [];
            return drained;     // 返回全部并清空
        }
        const first = this.messages[0];
        this.messages = this.messages.slice(1);
        return [first];          // 只返回第一条
    }

    clear(): void { this.messages = []; }
}

QueueMode 控制 drain 行为:

模式drain场景
"one-at-a-time"返回第一条,队列保留其余用户快速打字,逐条处理
"all"返回全部并清空批量注入,合并处理

设计要点:

  • clear() 方法用于 reset()
  • hasItems()hasQueuedMessages() 调用
  • drain() 不阻塞——队列为空时返回 []

ActiveRun 类型

1
2
3
4
5
type ActiveRun = {
    promise: Promise<void>;
    resolve: () => void;
    abortController: AbortController;
};
字段用途
promisewaitForIdle() 返回此 Promise
resolvefinishRun() 中调用,兑现 promise
abortControllerabort() 时调用 .abort()

Agent 类 — 构造器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
export class Agent {
    private _state: MutableAgentState;
    private readonly listeners = new Set<(event: AgentEvent, signal: AbortSignal) => Promise<void> | void>();
    private readonly steeringQueue: PendingMessageQueue;
    private readonly followUpQueue: PendingMessageQueue;

    // 公共可配置属性
    public convertToLlm: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
    public transformContext?: ...
    public streamFn: StreamFn;
    public getApiKey?: ...
    public onPayload?: ...
    public onResponse?: ...
    public beforeToolCall?: ...
    public afterToolCall?: ...
    private activeRun?: ActiveRun;
    public sessionId?: string;
    public thinkingBudgets?: ThinkingBudgets;
    public transport: Transport;
    public maxRetryDelayMs?: number;
    public toolExecution: ToolExecutionMode;

构造器初始化:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
constructor(options: AgentOptions = {}) {
    this._state = createMutableAgentState(options.initialState);
    this.convertToLlm = options.convertToLlm ?? defaultConvertToLlm;
    this.transformContext = options.transformContext;
    this.streamFn = options.streamFn ?? streamSimple;
    this.getApiKey = options.getApiKey;
    this.onPayload = options.onPayload;
    this.onResponse = options.onResponse;
    this.beforeToolCall = options.beforeToolCall;
    this.afterToolCall = options.afterToolCall;
    this.steeringQueue = new PendingMessageQueue(options.steeringMode ?? "one-at-a-time");
    this.followUpQueue = new PendingMessageQueue(options.followUpMode ?? "one-at-a-time");
    this.sessionId = options.sessionId;
    this.thinkingBudgets = options.thinkingBudgets;
    this.transport = options.transport ?? "auto";
    this.maxRetryDelayMs = options.maxRetryDelayMs;
    this.toolExecution = options.toolExecution ?? "parallel";
}

默认值策略:

选项默认值说明
convertToLlmdefaultConvertToLlm过滤非标准角色
streamFnstreamSimple直接调用 LLM
steeringMode"one-at-a-time"逐条处理 steering 消息
followUpMode"one-at-a-time"逐条处理 follow-up
transport"auto"自动选择传输方式
toolExecution"parallel"默认并行执行工具

subscribe() — 事件订阅

1
2
3
4
subscribe(listener: (event: AgentEvent, signal: AbortSignal) => Promise<void> | void): () => void {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
}
  • 返回解注册函数
  • 内部 processEvents 中按订阅顺序 await 每个 listener
  • signal 参数允许 listener 响应取消

state — 状态访问器

1
2
3
get state(): AgentState {
    return this._state;
}
  • 返回 MutableAgentState,但 TypeScript 类型为 AgentState(只读视图)
  • 外部不能直接修改 isStreamingstreamingMessage 等只读字段

steer() / followUp() — 消息队列

1
2
steer(message: AgentMessage): void { this.steeringQueue.enqueue(message); }
followUp(message: AgentMessage): void { this.followUpQueue.enqueue(message); }

注入时机差异:

方法入队被消费的时机
steer()steeringQueuerunLoop 内层每次迭代前(getSteeringMessages
followUp()followUpQueuerunLoop 内层结束后、agent 停止前(getFollowUpMessages

应用场景:

  • steer() → 用户打字"等一下、我换个问题",agent 在当前上下文注入新消息,立即处理
  • followUp() → 用户等 agent 完成后说"还有一件事",agent 做新一轮处理

prompt() — 发起对话

1
2
async prompt(message: AgentMessage | AgentMessage[]): Promise<void>;
async prompt(input: string, images?: ImageContent[]): Promise<void>;

重载实现:

1
2
3
4
5
6
7
async prompt(input: string | AgentMessage | AgentMessage[], images?: ImageContent[]): Promise<void> {
    if (this.activeRun) {
        throw new Error("Agent is already processing a prompt. Use steer() or followUp() to queue messages.");
    }
    const messages = this.normalizePromptInput(input, images);
    await this.runPromptMessages(messages);
}
  • 并发保护: activeRun 存在时抛异常,不允许并发 prompt
  • normalizePromptInput 统一输入格式

normalizePromptInput()

1
2
3
4
5
6
7
private normalizePromptInput(input, images?): AgentMessage[] {
    if (Array.isArray(input)) return input;
    if (typeof input !== "string") return [input];
    const content: Array<TextContent | ImageContent> = [{ type: "text", text: input }];
    if (images && images.length > 0) content.push(...images);
    return [{ role: "user", content, timestamp: Date.now() }];
}
输入输出
"Hello"[{ role: "user", content: [{ type: "text", text: "Hello" }], timestamp }]
"Hello" + images同上,content 追加 image 数组
AgentMessage[message]
AgentMessage[]透传

continue() — 继续对话

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
async continue(): Promise<void> {
    if (this.activeRun) {
        throw new Error("Agent is already processing. Wait for completion before continuing.");
    }

    const lastMessage = this._state.messages[this._state.messages.length - 1];
    if (!lastMessage) throw new Error("No messages to continue from");

    if (lastMessage.role === "assistant") {
        // 最后是 assistant → 尝试从队列取消息
        const queuedSteering = this.steeringQueue.drain();
        if (queuedSteering.length > 0) {
            await this.runPromptMessages(queuedSteering, { skipInitialSteeringPoll: true });
            return;
        }
        const queuedFollowUps = this.followUpQueue.drain();
        if (queuedFollowUps.length > 0) {
            await this.runPromptMessages(queuedFollowUps);
            return;
        }
        throw new Error("Cannot continue from message role: assistant");
    }

    await this.runContinuation();
}

分支逻辑:

continue()
  → activeRun 存在?→ throw
  → 无消息?→ throw
  → 最后是 assistant?
    → steeringQueue 有?→ drain → runPromptMessages(skipInitialSteeringPoll=true)
    → followUpQueue 有?→ drain → runPromptMessages
    → 都没有?→ throw
  → 最后是 user/toolResult?
    → runContinuation() → runAgentLoopContinue()

设计意图: 避免"最后是 assistant"时直接调 LLM 导致重复响应。只有当我们有新的 user/toolResult 消息时才应该触发新一轮 LLM 调用。


abort() / waitForIdle() / reset() — 生命周期控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
abort(): void {
    this.activeRun?.abortController.abort();
}

waitForIdle(): Promise<void> {
    return this.activeRun?.promise ?? Promise.resolve();
}

reset(): void {
    this._state.messages = [];
    this._state.isStreaming = false;
    this._state.streamingMessage = undefined;
    this._state.pendingToolCalls = new Set<string>();
    this._state.errorMessage = undefined;
    this.clearFollowUpQueue();
    this.clearSteeringQueue();
}
方法行为
abort()触发 AbortController → LLM 和工具执行收到取消信号
waitForIdle()返回 Promise,在 finishRun() 中 resolve
reset()清空 transcript 和运行时状态

私有方法:runPromptMessages() / runContinuation()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private async runPromptMessages(
    messages: AgentMessage[],
    options: { skipInitialSteeringPoll?: boolean } = {},
): Promise<void> {
    await this.runWithLifecycle(async (signal) => {
        await runAgentLoop(
            messages,
            this.createContextSnapshot(),
            this.createLoopConfig(options),
            (event) => this.processEvents(event),
            signal,
            this.streamFn,
        );
    });
}

private async runContinuation(): Promise<void> {
    await this.runWithLifecycle(async (signal) => {
        await runAgentLoopContinue(
            this.createContextSnapshot(),
            this.createLoopConfig(),
            (event) => this.processEvents(event),
            signal,
            this.streamFn,
        );
    });
}
方法调用引擎函数调用者
runPromptMessages()runAgentLoop()prompt()continue()(有队列消息时)
runContinuation()runAgentLoopContinue()continue()(最后一条非 assistant)

options.skipInitialSteeringPollcontinue() 从 steeringQueue drain 消息后调用 runPromptMessages 时,跳过首次 steering 轮询,避免刚 drain 的消息又被 agent-loop 取走。


createContextSnapshot() — 上下文快照

1
2
3
4
5
6
7
private createContextSnapshot(): AgentContext {
    return {
        systemPrompt: this._state.systemPrompt,
        messages: this._state.messages.slice(),
        tools: this._state.tools.slice(),
    };
}
  • slice() 浅拷贝,防止引擎层篡改 _state 的引用
  • 每次 run 创建新快照,保证多个 run 之间的隔离

createLoopConfig() — 配置适配

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private createLoopConfig(options: { skipInitialSteeringPoll?: boolean } = {}): AgentLoopConfig {
    let skipInitialSteeringPoll = options.skipInitialSteeringPoll === true;
    return {
        model: this._state.model,
        reasoning: this._state.thinkingLevel === "off" ? undefined : this._state.thinkingLevel,
        sessionId: this.sessionId,
        onPayload: this.onPayload,
        onResponse: this.onResponse,
        transport: this.transport,
        thinkingBudgets: this.thinkingBudgets,
        maxRetryDelayMs: this.maxRetryDelayMs,
        toolExecution: this.toolExecution,
        beforeToolCall: this.beforeToolCall,
        afterToolCall: this.afterToolCall,
        convertToLlm: this.convertToLlm,
        transformContext: this.transformContext,
        getApiKey: this.getApiKey,
        getSteeringMessages: async () => {
            if (skipInitialSteeringPoll) {
                skipInitialSteeringPoll = false;
                return [];
            }
            return this.steeringQueue.drain();
        },
        getFollowUpMessages: async () => this.followUpQueue.drain(),
    };
}

关键转换:

Agent 属性AgentLoopConfig 字段
steeringQueue包装为 getSteeringMessages 回调
followUpQueue包装为 getFollowUpMessages 回调
thinkingLevel转换为 reasoning"off" → undefined)
beforeToolCall透传
afterToolCall透传
convertToLlm透传

runWithLifecycle() — 并发控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private async runWithLifecycle(executor: (signal: AbortSignal) => Promise<void>): Promise<void> {
    if (this.activeRun) {
        throw new Error("Agent is already processing.");
    }

    const abortController = new AbortController();
    let resolvePromise = () => {};
    const promise = new Promise<void>((resolve) => { resolvePromise = resolve; });
    this.activeRun = { promise, resolve: resolvePromise, abortController };

    this._state.isStreaming = true;
    this._state.streamingMessage = undefined;
    this._state.errorMessage = undefined;

    try {
        await executor(abortController.signal);
    } catch (error) {
        await this.handleRunFailure(error, abortController.signal.aborted);
    } finally {
        this.finishRun();
    }
}

完整生命周期:

runWithLifecycle(executor)
  │
  ├─ activeRun 存在?→ throw(并发保护)
  │
  ├─ 创建 AbortController + Promise
  ├─ activeRun = { promise, resolve, abortController }
  ├─ isStreaming = true
  │
  ├─ try { executor(signal) }
  │   ├─ 成功 → finishRun()
  │   └─ 失败 → handleRunFailure(error, aborted) → finishRun()
  │
  └─ finally { finishRun() }  ← 确保始终执行

handleRunFailure() — 失败处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
private async handleRunFailure(error: unknown, aborted: boolean): Promise<void> {
    const failureMessage = {
        role: "assistant",
        content: [{ type: "text", text: "" }],
        api: this._state.model.api,
        provider: this._state.model.provider,
        model: this._state.model.id,
        usage: EMPTY_USAGE,
        stopReason: aborted ? "aborted" : "error",
        errorMessage: error instanceof Error ? error.message : String(error),
        timestamp: Date.now(),
    } satisfies AgentMessage;

    this._state.messages.push(failureMessage);
    this._state.errorMessage = failureMessage.errorMessage;
    await this.processEvents({ type: "agent_end", messages: [failureMessage] });
}

将错误编码为正常消息:

场景stopReason说明
外部调用 abort()"aborted"用户主动取消
LLM 调用抛错"error"网络故障、API 错误
工具执行抛错"error"工具内部异常
transformContext 抛错"error"预处理失败
convertToLlm 抛错"error"消息转换失败

哲学:永不崩溃,所有失败都编码为事件。


finishRun() — 运行终结

1
2
3
4
5
6
7
private finishRun(): void {
    this._state.isStreaming = false;
    this._state.streamingMessage = undefined;
    this._state.pendingToolCalls = new Set<string>();
    this.activeRun?.resolve();
    this.activeRun = undefined;
}

5 步清理:

语句效果
isStreaming = false外部可感知 run 已结束
streamingMessage = undefined清除流式占位
pendingToolCalls = new Set()清除工具跟踪
activeRun.resolve()waitForIdle() Promise 兑现
activeRun = undefined允许下一次 prompt() / continue()

processEvents() — 事件归约

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private async processEvents(event: AgentEvent): Promise<void> {
    switch (event.type) {
        case "message_start":
            this._state.streamingMessage = event.message;
            break;

        case "message_update":
            this._state.streamingMessage = event.message;
            break;

        case "message_end":
            this._state.streamingMessage = undefined;
            this._state.messages.push(event.message);
            break;

        case "tool_execution_start": {
            const pendingToolCalls = new Set(this._state.pendingToolCalls);
            pendingToolCalls.add(event.toolCallId);
            this._state.pendingToolCalls = pendingToolCalls;
            break;
        }

        case "tool_execution_end": {
            const pendingToolCalls = new Set(this._state.pendingToolCalls);
            pendingToolCalls.delete(event.toolCallId);
            this._state.pendingToolCalls = pendingToolCalls;
            break;
        }

        case "turn_end":
            if (event.message.role === "assistant" && event.message.errorMessage) {
                this._state.errorMessage = event.message.errorMessage;
            }
            break;

        case "agent_end":
            this._state.streamingMessage = undefined;
            break;
    }

    const signal = this.activeRun?.abortController.signal;
    if (!signal) throw new Error("Agent listener invoked outside active run");

    for (const listener of this.listeners) {
        await listener(event, signal);
    }
}

状态转换表:

事件类型streamingMessagemessagespendingToolCallserrorMessage
message_start设置为 event.message
message_update更新为最新
message_endundefinedpush event.message
tool_execution_start添加 toolCallId
tool_execution_end删除 toolCallId
turn_end若有 error 则更新
agent_endundefined

不可变性策略: pendingToolCalls 每次更新都新建 Set,对外暴露 ReadonlySet

1
2
3
4
5
6
// 过程:每次更新都新建 Set
const pendingToolCalls = new Set(this._state.pendingToolCalls);
pendingToolCalls.add(event.toolCallId);
this._state.pendingToolCalls = pendingToolCalls;

// 效果:外部持有的旧引用不会被篡改

设计模式总结

1. 分层架构(Layered Architecture)

应用层
  ↓ prompt() / subscribe()
Agent 类(有状态管理层)
  ↓ runAgentLoop() / emit()
无状态引擎层(agent-loop.ts)
  ↓ streamSimple()
LLM 核心库(pi-ai)

2. 依赖倒置(Dependency Inversion)

  • 引擎层定义 AgentEventSink 接口
  • 管理层实现该接口(processEvents
  • 引擎层不依赖管理层

3. 事件溯源(Event Sourcing)

  • processEvents 类似 Redux reducer
  • 每次事件驱动状态变更,状态变更可追溯
  • AgentEvent 是唯一的日志来源

4. 两阶段执行(Parallel Tool Execution)

  • Phase 1:串行 prepare(含 beforeToolCall 钩子)
  • Phase 2:并发 execute(Promise.all)
  • 兼顾钩子顺序可预测性和执行性能

5. 错误边界(Error Boundaries)

层级错误处理
LLM 调用层stopReason: "error"
工具 prepare 层catch → immediate error tool result
工具 execute 层catch → error tool result
afterToolCall 钩子catch → error tool result
Agent 管理层handleRunFailure → 失败消息

6. 不可变状态(Immutable State)

  • pendingToolCalls 每次更新新建 Set
  • messages 通过 setter 自动 slice
  • contextnewMessages 通过引用传递(引擎层无需拷贝,因为引擎层是纯函数)