用 OCaml 的 Effect System 实现一个完整的 Agent Harness
这篇文章的价值:上一篇讲清楚了 ReAct 在 MDP 上的形式化——状态、动作、转移、奖励。这篇把视角换到工程实现:如果 ReAct 的核心循环真的只有 5 行伪代码,为什么主流 agent 框架(LangGraph、AutoGPT、Claude Code)的代码体量都是几千到几万行?答案是——核心循环本身简单,复杂度全在"循环之外":HTTP 调用、重试、断路器、成本核算、追踪、回放、并行 sub-agent、Pause/Resume、沙箱、Governor 全局预算。Python 框架把这些和循环搅在一起,于是循环不再简单。OCaml 5 的 algebraic effects 给了第二种选项:把"做什么"和"怎么做"在类型系统层面切干净——agent 代码只 perform effect,每一种 effect 由谁实现、用什么实现,全部由外层 handler 决定。这一篇会带你从一个最小的 ReAct 循环开始,逐层加上 retry、circuit breaker、tape replay、cost tracking、parallel、checkpoint、ask_user 等所有真实 Agent harness 需要的特性,看看 effect system 怎么让这一切叠起来不打架。所有代码都来自wombat——一个我自己写的、用 OCaml 5 实现的 Agent 运行时。
一、为什么是 OCaml 的 Effect System
先回顾上一篇里那段 ReAct 伪代码:
while not done:
thought = LLM(context)
action = LLM(context)
observation = execute(action)
context.append(thought, action, observation)
return LLM("总结", context)
形式上五行。但一个能上线的 agent harness 至少要回答这些问题:
- LLM 调用从哪里来?换 provider 怎么办?需要重试 / 限流 / 缓存 / 流式 / token 计数怎么办?
- Tool 执行怎么挂上沙箱?怎么做超时?怎么做幂等?怎么做断路器和 dedup?
- 整个 run 的预算(token / cost / wall time / iterations)谁来管?超了怎么干净地中止?
- 如何把"需要问用户"的瞬间从一个递归调用栈里挂起,再在下一次 HTTP 请求中恢复?
- 测试时怎么替换整个底层?把 LLM 换成预录响应、tool 换成桩、time 换成确定时钟、文件系统换成内存映射?
- 线上跑过的真实 LLM/tool 调用要不要录像(tape),方便 bug 复现和 regression 测试?
- sub-agent / planner / verifier 这些嵌套调用要不要共享预算、共享追踪、共享缓存?
主流 Python 框架对这一切的处理方式很类似:在 agent 循环里加一个 cost_tracker,在每个 tool 上挂一个 retry 装饰器,把 context 对象塞进 thread-local,给整个 run 包一层 try/except BudgetExceeded。结果是循环不再简单——你想 "看一下 agent 循环到底干了什么",得在 7 个文件里跳。
OCaml 5 的 algebraic effects 提供了一种几乎是为这件事量身定做的工具。它让你做到这样的事情:
- Agent 循环里只perform 一个 effect:"我要调一次 LLM,参数是 X"——具体由 HTTP / mock / replay / cache 哪个来响应,循环根本不知道。
- Handler 之间通过
|>像积木一样拼接,每一层都是handler -> handler的纯函数变换。 - "需要问用户"是一个
Pauseeffect,handler 可以选择带着答案恢复 continuation(自动测试模式),也可以选择丢弃 continuation 直接从最外层返回 "Waiting" 状态(聊天模式)。 - "任务结束"是一个类型多态的
'a Effect.t——handler 永远不会 resume 它,因此它不需要返回类型。这是普通 exception 做不到的,因为 exception 不能携带类型信息。
下面把这些一个个拆开。
二、Effect System 30 秒速览
对于没用过 OCaml 5 effects 的读者:
(* 声明 effect。Effect.t 是 GADT,"+=" 是扩展可扩展变体 *)
type _ Effect.t +=
| Llm_complete : llm_call_args -> llm_response Effect.t
| Log : string -> unit Effect.t
(* perform:像抛异常一样发出 effect,但是可恢复 *)
let answer = Effect.perform (Llm_complete args) in
...
(* try_with + effc:安装 handler,决定每个 effect 怎么响应 *)
try_with thunk ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Llm_complete args ->
Some (fun (k : (a, _) continuation) ->
let resp = http_call args in
continue k resp) (* 把响应塞回 perform 点 *)
| Log msg ->
Some (fun (k : (_, _) continuation) ->
print_endline msg;
continue k ())
| _ -> None (* 不处理,向外冒泡 *)
}
三件关键的事情:
- Effect 是带类型返回值的。
Llm_complete : llm_call_args -> llm_response Effect.t的意思是"perform 这个 effect 会返回一个llm_response"。 - Continuation 是一等公民。
k代表"perform 之后剩下的整段计算"。你可以continue k v把v塞回 perform 点继续跑;也可以不 continue,直接 return 一个值上去,这就实现了"挂起"。 - Handler 可以嵌套。没被当前 handler 处理的 effect(
None那一支)会向外层 handler 冒泡。这是组合式 middleware 的基础。
OK,这就够用了。下面把 ReAct 写出来。
三、第一步:把 ReAct 循环写成纯 effect-perform 代码
下面是 wombat 的 Step.once——执行一次 ReAct 循环的核心,删掉了 logging 和 trace span:
let once ~(ctx : Context.t) () : result =
let args = Context.to_llm_args ctx in
let response = Effect.perform (Effects.Llm_complete args) in
match response.stop_reason with
| End_turn ->
let ctx = Context.push_assistant response.content ctx in
Terminal_text { answer = extract_text response.content; ctx }
| Tool_use_stop ->
let tool_uses = collect_tool_uses response.content in
let results = Effect.perform (Effects.Tool_calls (tools, tool_uses)) in
let ctx = Context.push_tool_results results ctx in
Continue ctx
| Max_tokens ->
Failed { reason = Llm_max_tokens; ctx }
...
注意几个东西:
- 这段代码不知道 LLM 在哪里。没有
http.post、没有anthropic_client、没有retry装饰器。它只是 perform 了一个 effect,期望某个 handler 给它一个llm_response。 - 这段代码不知道 tool 怎么跑。Tool 可能在子进程、可能在 Docker、可能在 mock 表里——loop 不关心。
- 这段代码是普通的同步代码。没有 async、没有 promise、没有 callback hell。Effect 在 perform 点"看起来"返回一个值,背后的 handler 可能做了网络 IO、可能做了等待、可能做了模拟,但 loop 看到的就是一个返回值。
这正是 effect system 给 agent 代码带来的第一个礼物:循环回到了它本来的样子。
外面套一个 run_loop 把它跑到 End_turn 或者 max_iterations:
let rec loop ctx iter =
if iter > max_iterations then Error (Max_iterations_reached, ctx)
else
match Step.once ~ctx () with
| Continue ctx -> loop ctx (iter + 1)
| Terminal_text r -> Ok (r.answer, r.ctx)
| Failed { reason; _} -> Error (reason, ctx)
in loop ctx0 1
整个 ReAct agent 的"逻辑部分",就这么多了。
四、第二步:同一段 agent 代码,五个世界
把循环写成纯 effect-perform 之后,最直接的好处是:同一段 agent 代码可以跑在任意 handler 组合下。
看一个最小的"mock handler",用于测试:
let mock ~llm_responses ~tool_results f =
let pending = ref llm_responses in
try_with f ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Effects.Llm_complete _ ->
Some (fun (k : (a, _) continuation) ->
let r :: rest = !pending in
pending := rest;
continue k r)
| Effects.Tool_calls (_, uses) ->
Some (fun (k : (a, _) continuation) ->
let results = List.map (fun (id, name, _) ->
(id, List.assoc name tool_results)) uses in
continue k results)
| _ -> None
}
现在我可以这样写一个单元测试:
let () =
Handlers.mock
~llm_responses:[
mock_response_with_tool_call "search_files" `Assoc [...] ;
mock_response_with_text "找到 3 个文件";
]
~tool_results:[ "search_files", Ok "a.ml\nb.ml\nc.ml" ]
(fun () ->
let result = Agent.execute ~spec ~input:(Fresh "找文件") in
assert (result = Done { answer = "找到 3 个文件"; ... }))
这是 byte-identical 的 agent 代码,跑在一个完全没有 HTTP、没有 subprocess 的世界里。对 Python 框架来说要做到这点意味着 mock requests.post、mock subprocess.run、mock time.time()、mock 文件系统——通常要拉进 unittest.mock 半个标准库。这里只需要写一个 80 行的 handler。
这种"同一段代码、不同 handler"的能力打开了很多扇门:
| Handler 组合 | 用途 |
|---|---|
| Anthropic LLM + 真实 tool | 生产 |
| Mock LLM + Mock tool | 单元测试 |
| Tape replay LLM + Tape replay tool | regression / bug 复现 |
| Real LLM + dry-run tool(只 log 不执行) | 评估 plan 而不真改东西 |
| Real LLM + sandbox tool | 限制只能写指定目录 |
| Anthropic + DeepSeek 按 model 名路由 | planner 用 Claude,executor 用 DeepSeek,省钱 |
| Cost-only handler(不真调) | 估算 token 成本不烧钱 |
这些组合下,agent loop / step / spec 代码一行都不用改。
五、第三步:Handler chain — middleware 的真正含义
Mock 是"完全替换"。但真正的 harness 复杂度在于:在生产环境里,你也需要在真实 HTTP 调用之外加上一堆横切关注点——重试、断路器、超时、限流、token 计数、追踪、日志、缓存。Python 框架一般用装饰器或者 hook 解决。OCaml effects 里,更优雅的做法是把每一层都写成 handler -> handler 的纯函数变换。
Wombat 里的 LLM handler 就是这么写的:
(* 类型签名:每个 middleware 都是 handler -> handler *)
type t = llm_call_args -> llm_response
(* 叶子层:真正的 HTTP 调用 *)
let anthropic ?api_key ?model () : t =
fun args -> Anthropic.complete_stream ?api_key ?model args
(* 一层 retry:transient 错误指数退避 *)
let with_retry ~policy ~on_retry (inner : t) : t =
fun args ->
let rec attempt n =
try inner args
with Llm_api_error err when Llm_error.is_retryable err
&& n < policy.max_attempts ->
Unix.sleepf (backoff n);
attempt (n + 1)
in attempt 0
(* 一层 cost tracking *)
let with_cost_tracking ~cost (inner : t) : t =
fun args ->
let resp = inner args in
cost.input_tokens <- cost.input_tokens + resp.usage.input_tokens;
cost.output_tokens <- cost.output_tokens + resp.usage.output_tokens;
resp
(* 一层 trace span *)
let with_tracing ~tracer (inner : t) : t =
fun args ->
Trace.with_span tracer ~kind:Llm_call (fun () -> inner args)
组装的时候就一行 |>:
let llm =
Llm_handler.anthropic ~api_key ()
|> Llm_handler.with_validation
|> Llm_handler.with_cost_tracking ~cost
|> Llm_handler.with_retry ~policy
|> Chaos.with_llm chaos_config (* 故障注入 *)
|> Llm_handler.with_tracing ~tracer
|> Llm_handler.with_governor_ticks (* 给 Governor 发 tick *)
|> Llm_handler.with_logging ~on_log
关键的设计要点:
- 顺序决定语义。
retry在tracing内层意味着每次重试是一个独立 span;放在外层则一个逻辑调用就一个 span。Wombat 选了后者——一个 LLM 调用的语义边界比"每次 HTTP 尝试"更有意义。 - Chaos middleware 故意放在 retry 外面,让注入的失败可以传播到上层的 workflow recovery,验证恢复逻辑而不是 retry 逻辑。
- 同一个
type t = args -> response类型签名让所有这些 middleware 可以任意组合,类型系统会在编译期把不兼容的组合挡掉。
Tool 的 handler chain 同构:
let tool =
Tool_handler.direct
|> Tool_handler.with_validation
|> Tool_handler.with_retry ~policy
|> Tool_handler.with_circuit_breaker ~failure_threshold:5 ~cooldown:60.0
|> Tool_handler.with_dedup_repeats ~threshold:3 (* 死循环检测 *)
|> Chaos.with_tool chaos_config
|> Tool_handler.with_tracing ~tracer
|> Tool_handler.with_logging ~on_log
这里的 with_dedup_repeats 正是上一篇里讲过的"同一个 (tool, input) 连续失败 N 次 → 死循环"——在这里它就是 20 行的一个 middleware,并不需要侵入 agent loop。
把整个 chain 转成 effect handler 只在最外层一次:
let install (chain : t) f =
try_with f ()
{ effc = fun (type a) (eff : a Effect.t) ->
match eff with
| Effects.Llm_complete args ->
Some (fun (k : (a, _) continuation) ->
continue k (chain args))
| _ -> None
}
一行 perform,N 层 middleware。这是 effect system 的核心红利。
六、第四步:Pause / Resume — 丢弃 continuation 的艺术
到目前为止演示的 effects 都是"perform 一下、continue 一下"的常规模式——和函数调用没有本质区别。Effect system 真正不可替代的地方在于:handler 可以选择不 continue。
Agent 场景里有一个非常典型的"必须 pause"需求:ask_user。模型决定问用户一个问题,agent 要把当前进度持久化下来,然后从最外层返回一个 "Waiting" 状态,等下一次 HTTP 请求带着用户的答复进来,再恢复。
用 callback 风格写这件事很恶心——你得把整个 agent 循环改造成一个状态机,每个可能 pause 的点都拆成 before / after 两半。用 effects 写这件事是一行的。
(* 声明 Pause effect。它的"返回值"是用户的回复(一个 message) *)
type _ Effect.t +=
| Pause : { tool_use_id : Id.Tool_use_id.t;
schema : Pause_schema.t;
ctx_so_far : Context.t; }
-> message Effect.t
当 agent 循环检测到模型调用了带 Pause capability 的工具时:
let resume_msg : message =
Effect.perform
(Effects.Pause { tool_use_id; schema; ctx_so_far })
in
(* 把 resume_msg 拼回上下文继续跑 *)
let conv = Conversation.push_user_with_results ... in
Continue (Context.with_conversation conv ctx)
现在handler 决定一切。如果是聊天会话,handler 这样写:
| Effects.Pause { tool_use_id; schema; ctx_so_far } ->
Some (fun (_k : (a, _) continuation) ->
(* 注意:_k 没有被 continue。直接 return 一个 Waiting 值。 *)
Waiting {
tool_use_id;
schema;
messages = final_messages ctx_so_far;
})
这里发生了一件神奇的事情:continuation 被丢弃了。perform Pause 的那行永远不会返回,整段 agent 循环的剩余部分作为闭包躺在 _k 里被丢掉,handler 直接从最外层返回 Waiting。Agent loop 的代码不知道这件事——从它的视角看就是"perform 然后我应该拿到一个 message",但实际上控制流根本没回到这一行。
而如果是自动答复模式(dry-run / 批处理 / 单测),handler 这样写:
| Effects.Pause { schema; _ } ->
Some (fun (k : (a, _) continuation) ->
let synthetic_answer = generate_synthetic_answer schema in
continue k synthetic_answer) (* 这次 continue 了 *)
同一段 agent 代码——一次跑出来是"挂起等用户",一次跑出来是"自动答复后继续"。区别只在 handler 是否 continue。在 Python 里写这种东西要么用 generator 的 send/throw 协议、要么写显式状态机,没有第三条路。OCaml 的 effects 让它就是 perform 一行。
Wombat 用一模一样的模式实现了"任务完成"——Terminal effect。注意它的类型:
| Terminal : { tool_name : string;
payload : Yojson.Safe.t;
ctx_so_far : Context.t; }
-> 'a Effect.t (* 多态返回类型! *)
返回类型是 'a——这意味着 perform Terminal 在类型上可以满足任何上下文的返回类型,因为 handler 保证永远不 resume 它。这是 effect-as-exception 模式的一个非常优雅的体现:用类型系统编码"这条路径永不返回"。Python 也能写 "raise",但 raise 没法携带 "I'm done, here's my answer" 这种结构化 payload,而 OCaml 的 Terminal effect 可以。
七、第五步:横切的 Governor — 全局预算与中止
到目前为止 LLM 和 Tool 的预算控制都是单次调用的——单次超时、单次重试。但真正的 harness 还需要全局预算:整个 run 不能超过 $5、不能跑超过 10 分钟、不能调超过 200 次 LLM、不能让 sub-agent 嵌套超过 3 层。
这种"横切"需求传统上是个痛点。Python 框架经常用 thread-local 或者 contextvars 把"当前预算"传到每个角落。OCaml effects 给的方案漂亮得多:再定义一个 effect。
(* Governor.Tick:每个 component 在做事情前后报告一下 *)
module Event = struct
type t =
| Llm_started of { messages : int; tools : int }
| Llm_finished of { input_tokens : int; output_tokens : int }
| Tool_started of { name : string; input_digest : string; ... }
| Tool_finished of { ok : bool; duration : float; ... }
| Subagent_entered
| Subagent_exited
end
type _ Effect.t += Tick : Event.t -> unit Effect.t
LLM handler 在调用前后 perform Tick:
let with_governor_ticks (inner : t) : t =
fun args ->
Effect.perform (Governor.Tick (Llm_started { messages; tools }));
let response = inner args in
Effect.perform (Governor.Tick (Llm_finished {
input_tokens = response.usage.input_tokens;
output_tokens = response.usage.output_tokens;
}));
response
然后在 runtime 最外层装一个 Governor handler。它在 effc 里观察每个 Tick,更新内部计数器,超限就抛 exception:
| Governor.Tick ev ->
Some (fun (k : (a, _) continuation) ->
update_counters cost limits ev;
match check_limits cost limits with
| `Ok -> continue k ()
| `Exceeded reason ->
(* 通过 discontinue 让 exception 沿 fiber 栈正常 unwind *)
discontinue k (Governor_aborted reason))
这里有个 OCaml 5 effects 的小坑:从 effc body 里直接 raise 的 exception 会跳出 try_with,绕过 fiber 内部的所有 try ... with。这会让 Workflow.with_checkpoint 的 git rollback 之类的"局部 try/with"失效。解法是用 discontinue k exn,告诉 effect runtime "沿 fiber 栈正常 unwind 这个异常"。这是 wombat 里折腾出来的一个真实经验——你写 effect-based runtime 的第一年大概率会踩到。
Governor 装在哪一层、和谁的关系是什么,决定了整个 harness 的语义边界:
┌─ Governor.install (max_steps / cost / wall_time)
│ ┌─ Llm_handler.install (chain: cost / retry / chaos / trace / ticks)
│ │ ┌─ Time_handler / File_handler / Log_handler
│ │ │ ┌─ Skill.with_state / Environment.with_current
│ │ │ │ ┌─ Tool_handler.install (chain)
│ │ │ │ │ ┌─ ReAct loop perform Effects.Llm_complete /
│ │ │ │ │ │ Effects.Tool_calls / Effects.Pause / Effects.Terminal
└──────────────────────────────────────────────────────────────────
注意一件容易踩坑的事:File_handler / Time_handler / Log_handler 必须装在 Tool_handler 外面。因为 tool handler 在 effc 里调用具体工具的代码时,工具的 perform File_read / perform Time_now 会从 effc 里向上冒泡——必须冒到比 Tool_handler 更外层的 handler 才能被接住。这是 wombat 早期反复调试的一个层级问题,本质源于"effc 内 perform 的 effect 从 effc 调用点向外查找"的语义。
八、第六步:Workflow combinators — 在 leaf 之上做编排
有了"一个能跑的 leaf agent"之后,复杂任务的下一层抽象是编排:先 planner 拆任务、再 executor 一个个跑、跑挂了让 recovery agent 决定怎么办、最后 summarizer 汇总。这一层完全不需要再碰 effect system——它就是普通的 Result monad:
type 'a t = unit -> ('a, agent_error) result
let leaf spec input : Agent.output t =
fun () -> Ok (Agent.execute ~spec ~input)
let with_retry ?(max_attempts = 3) body =
fun () ->
let rec attempt n =
match body () with
| Ok _ as r -> r
| Error _ as r when n >= max_attempts -> r
| Error _ -> attempt (n + 1)
in attempt 1
let recover body handler =
fun () ->
match body () with
| Ok _ as r -> r
| Error e -> handler e ()
let with_checkpoint ~cwd ~message inner =
fun () ->
let ckpt = Git_checkpoint.create ~cwd in
match inner () with
| Ok _ as r -> Git_checkpoint.commit ckpt ~message; r
| Error _ as r -> Git_checkpoint.rollback ckpt; r
| exception exn -> Git_checkpoint.rollback ckpt; raise exn
这一层有意识地不用 effects——因为它要的是显式的 "succeed / fail / retry" 数据流,而不是隐式的中断。Effect 系统适合"看起来是一个值、其实背后有 IO/重试/挂起"的场景;workflow 编排适合"看得见的成功失败分支"。把两层分开是个有意识的设计选择,而不是"用了 effects 就到处用"。
有了这些组合子,一个 plan-act 流水线就是几行 let*:
let plan_act_flow ~goal =
let* plan = Workflow.leaf planner_spec (Fresh goal)
|> Workflow.expect_terminal_tool ~name:"submit_plan" in
let* results = Workflow.foreach plan.tasks (fun task ->
Workflow.with_checkpoint ~cwd ~message:(fun () -> task.name) (
Workflow.with_retry ~max_attempts:2 (
Workflow.leaf executor_spec (Fresh task.instruction))))
in
Workflow.leaf summarizer_spec (Fresh (render results))
|> Workflow.expect_done
每个 leaf 都自动继承了上面那一整套 LLM / Tool middleware、Governor 预算、Pause / Terminal 语义。因为它们都通过 effect 抽象层往下走。
九、第七步:并行 sub-agent — Effect system 的边界
到这里你可能已经觉得 effect system 解决了所有问题。但有一个角落它解决不了:OCaml 5 的 effect handler 不会传播到 OS 线程。一个 Thread.create 出来的 worker 线程里 perform 任何 effect 都会得到 Effect.Unhandled。
这件事在 wombat 里被反复撞到了:
- 最早的 tool batch 派发想用
Parallel.map_threaded并行跑——结果只要 tool 自己 perform 了File_read/Time_now/Log,worker thread 里就崩。 - 最终改回串行派发。LLM 一般一次只发 2-5 个 tool call,串行的成本可以接受。
- 需要真正并行的场景(多个 sub-agent 并跑)改用 OCaml 5 Domain——每个 Domain 独立装自己的 handler stack。Wombat 用一个
set_branch_wrapper把"如何为一个 thunk 装 child runtime" 注册到 Domain-Local Storage,Workflow.parallel就用这个 wrapper 包每条并行支路。
let parallel (flows : 'a t list) : 'a list t =
fun () ->
let { wrap } = Domain.DLS.get branch_wrapper_slot in
(* 每条分支跑在自己的 Domain 里,自己 install 完整 handler 栈 *)
let thunks = List.map (fun f () -> wrap f) flows in
let results = Parallel_subagent.run thunks in
...
这是 effect system 设计里一条真实的边界。OCaml 5 的 effect 是fiber-local的(runtime 通过 fiber 栈实现 continuation),不是 thread-local 也不是 domain-local。任何用 effect 写的 runtime 早晚都得回答这个问题。Python 不存在这个问题,因为它的"重试装饰器"本来就靠 thread-local 跑——但它也没有真正的"组合式 middleware"。
十、第八步:Tape replay — 一个 handler 卷起一个时间机器
最后展示一个 effect system 真正出彩的应用:tape replay。
需求:希望每次 agent 跑的时候,把所有 LLM 响应和所有 tool 结果录像到一个 JSONL 文件里。下次再跑同样的输入时,直接从录像里重放,跳过真实 HTTP / 真实 subprocess——这对 bug 复现、regression 测试、demo 都极其有用。
实现核心就是两段 handler 包装:
(* Tape 包在 LLM handler 外面:先查 tape,没命中再 fallthrough 到真实 chain *)
let with_tape_llm session (inner : Llm_handler.t) : Llm_handler.t =
fun args ->
let key = digest_of_request args in
match Tape.lookup session key with
| Some recorded ->
Tape.log session "replay_hit" key;
recorded (* 跳过真实 HTTP *)
| None ->
let response = inner args in
Tape.record session key response; (* 录下来 *)
response
(* Tape 包在 Tool handler 外面:同构 *)
let with_tape_tool session (inner : Tool_handler.t) : Tool_handler.t =
fun args ->
let key = digest_of_tool_call args in
match Tape.lookup_tool session key with
| Some recorded -> recorded
| None ->
let result = inner args in
Tape.record_tool session key result;
result
组装:
let llm_chain =
Llm_handler.anthropic ()
|> Llm_handler.with_retry ~policy
|> Llm_handler.with_cost_tracking ~cost
|> Checkpoint.with_tape_llm session (* ← 一行 *)
|> Llm_handler.with_logging
就这一行。Agent 代码不知道有 tape。Cost tracking 还能照常工作(哪怕命中 tape,replay 出来的 response 还是带 usage 字段,会被 cost middleware 计入——这一点要看你想要"记账"算原始那次的还是 replay 那次的,wombat 选了算 replay 的,因为它代表"这次 run 的逻辑成本")。Trace 还能照常工作。Retry 还能照常工作。所有的横切关注点彼此正交。
这件事在传统 Python 框架里要么是写一整套 VCR-like 的 monkey-patch、要么是把整个 agent runtime 重写一遍走"录制模式"和"重放模式"。OCaml effects 让它就是一层 middleware。
十一、把整张图拼起来
一个完整的 wombat run 在 Runtime.install 里把所有 handler 层堆出来:
Governor.install (limits, cost)
└─ Llm_handler.install (
anthropic_or_deepseek
|> with_validation
|> with_cost_tracking
|> with_retry
|> Chaos.with_llm ← 故障注入
|> with_tracing
|> with_governor_ticks
|> with_request_dump
|> with_logging
|> [optional] with_tape_llm
)
└─ Time_handler.install
└─ File_handler.install (
direct |> [optional] with_sandbox ~root
)
└─ Log_handler.install
└─ Skill.with_state
└─ Environment.with_current
└─ Tool_handler.install (
direct
|> with_validation
|> with_retry
|> with_circuit_breaker
|> with_dedup_repeats
|> Chaos.with_tool
|> with_tracing
|> with_logging
|> with_event_emit ~on_tool_event
|> [optional] with_tape_tool
)
└─ Agent.execute
└─ Step.once 循环
├─ Effects.Llm_complete
├─ Effects.Tool_calls
├─ Effects.Pause
├─ Effects.Terminal
├─ Effects.File_*
├─ Effects.Time_now
└─ Effects.Log
从外到内一共大概十几层,但每一层都是 handler -> handler 的纯函数变换,每一层都能单独测试,每一层都能在任意位置插入或拿掉。Agent loop 不知道任何一层的存在。
这是 effect system 给 agent harness 设计带来的真正东西:一个真正可组合的 runtime——不是靠继承、不是靠插件框架、不是靠 hook 表,而是靠类型系统强制的、纯函数式的 middleware chain。
十二、不是银弹:Effect system 的代价
说点反面的事情。Effect-based runtime 不是没代价:
- OCaml 5 effects 还是个相对年轻的 feature。文档稀疏、生态库少、错误信息有时让人发懵("Effect.Unhandled" 不告诉你是哪个 effect 没接住)。如果你的团队还没用 OCaml,这是一个不小的入场成本。
- Effects 不跨 thread / 跨 Domain 传播。前面讲过——任何并行方案都得自己想清楚 handler 栈怎么 fork。
- 调试栈更深。当 perform 一个 effect 时,"调用栈"在物理上是真实的 fiber 栈,但在概念上中间穿插着 effc 回调。Backtrace 不像普通函数调用那么直白。
- 类型签名变长。一旦给一个函数标注它会 perform 的 effects(OCaml 现在还没有 effect 类型推断到签名级别——它只检查 Effect.t 是否被处理),手工管理"哪个函数会触发哪些 effect"是程序员的责任。OCaml 5.x 或 5.y 路线图里有 effect rows 的提案,但还没落地。
- 诱惑过度抽象。一旦你看到 effect 能干这么多事,会想把所有东西都抽成 effect——这会让本来朴素的
Result.t数据流变成隐式的控制流,反而难读。Wombat 的 workflow 层故意没用 effects,就是为了避免这种过度抽象。
但这些代价比起 effect system 在 agent harness 这个场景下带来的结构清晰度——是远远值得的。
十三、总结
把这一整套东西回到最开始的问题:
主流 Python agent 框架的代码膨胀,不是因为它们的作者写得差,而是因为它们的语言里没有把"做什么"和"怎么做"切干净的工具。结果是 LLM 调用要么用 mock 框架猴补、要么走 dependency injection、要么走 hook plugin——每条路都让 agent loop 多绑一根线。线越多,循环越难看。
OCaml 5 的 algebraic effects 提供的工具不复杂——就三件事:
- Effect 是带类型返回值的可恢复"异常"。
- Continuation 是一等公民——可以 continue,也可以丢。
- Handler 通过
|>像普通函数一样组合。
有了这三件事,agent harness 设计就可以做到一些 Python 框架很难做到的事:
- 同一段 agent 代码跑在 production / mock / replay / sandbox / cost-only 五种世界里,一行不改。
- 横切关注点(重试、断路器、追踪、成本、tape、chaos)每个都是
handler -> handler的纯函数 middleware,用|>拼出来,类型系统帮你接住所有组合错误。 - Pause / Ask user / Submit task 等"会改变控制流走向"的瞬间,通过选择不 continue 直接从最外层返回,agent loop 完全感知不到。
- Governor 全局预算这种横切关注点,通过 Tick effect 实现,不需要 thread-local,不需要把 context 塞进每个调用。
这一切的底座是 ReAct loop 本身——上一篇讲了它是 MDP 上的一个朴素策略,这一篇讲了它实现起来真正应该是什么样子:一段几十行的、不知道 HTTP 存在的、可以在任何 handler 栈下运行的代码。
Agent 框架接下来几年会经历一次类似 Web framework 在 2010 年代经历过的洗牌——从"全家桶"走向"组合式 runtime"。OCaml 的 effect system 不一定是最终赢家(Koka、Eff、Unison 都在这个方向上探索),但它提供了一个非常清晰的存在性证明:你可以把 agent runtime 写得很干净,而且不需要为此牺牲任何性能、任何可测试性、任何可观测性。
Wombat 还在持续演进,但目前已经能跑 plan-act、parallel sub-agent、git checkpoint rollback、tape replay、ask_user、sandbox 等所有真实 Agent harness 需要的能力。如果你也对"用语言特性而不是设计模式来组织 Agent runtime"感兴趣,欢迎一起聊。