Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
语言:English · 中文
TriggerFlow execution 有三种状态。5 个入口 API 控制启动与结束方式。
open ──seal()──► sealed ──close()──► closed
│ │
└───(auto_close 在空闲超时后自动触发)───────┘
| 状态 | 接受什么 | 还在跑什么 |
|---|---|---|
open |
新外部事件(emit、continue_with) |
全部:chunk、runtime stream、已注册 task |
sealed |
不再接外部 | 已接受事件、内部 emit 链、已注册 task 继续 drain |
closed |
不接 | runtime stream 已关闭;close snapshot 已冻结 |
关键区别:seal() 停外部输入但让在途工作完成。close() 先 seal,再 drain,再冻结。
| API | 用途 | 返回 |
|---|---|---|
flow.start(...) / flow.async_start(...) |
隐式 execution 语法糖;create + start + wait + close | close snapshot |
flow.start_execution(...) / flow.async_start_execution(...) |
显式启动;你持有 execution handle | execution |
execution.start(...) / execution.async_start(...) |
启动已创建的 execution | auto_close=True 返回 close snapshot;auto_close=False 返回 execution |
execution.seal() / execution.async_seal() |
运行期 seal | — |
execution.close() / execution.async_close() |
收尾 | close snapshot |
flow.start(...) —— 隐式语法糖snapshot = await flow.async_start("input value")
内部做:create_execution(auto_close=True, auto_close_timeout=0.0)、启动、等到 close、返回 snapshot。
规则:
auto_close=False 非法 —— 立刻报错。wait_for_result= 的值被忽略并 warn。返回类型固定为 close snapshot。timeout= 当作 auto_close_timeout —— 最后一次活动后多久自动关闭。pause_for(...) 时不要用 flow.start() —— 外部没有 handle 来恢复。隐式 execution 走到 pause_for(...) 时 TriggerFlow 会 fail fast。改用 flow.start_execution(...) 或 flow.create_execution(...)。flow.start_execution(...) —— 显式启动execution = await flow.async_start_execution("input value")
# ... 用 handle 做事 ...
snapshot = await execution.async_close()
返回 execution,你决定何时 close。适合服务、SSE/WebSocket 流、人工介入、外部 emit()。
wait_for_result= 在这里也被忽略。
execution.start(...) —— 启动已构建的 executionexecution = flow.create_execution(auto_close=True)
snapshot = await execution.async_start("input") # 返回 close snapshot
execution = flow.create_execution(auto_close=False)
exec2 = await execution.async_start("input") # 返回 execution
# ... 做事,再 ...
snapshot = await execution.async_close()
auto_close |
async_start 返回 |
|---|---|
True(默认) |
close snapshot |
False |
execution 本身 |
Sync start() 仅支持 auto_close=True。需要手动 close 时用 await execution.async_start(...)。
execution.seal() —— 停新输入,让在途完成await execution.async_seal()
seal 后:
emit() / continue_with() 被拒。emit 链、已注册 task 继续。需要「停接新工作但让在途事完成」、稍后再 close(或让 auto_close 关)时用 seal。
execution.close() —— 收尾返回 snapshotsnapshot = await execution.async_close()
close 顺序:
close 上的 timeout= 是 drain timeout —— 在途 task 的最大等待时间,与 auto-close 计时无关。
auto_close=True(create_execution 的默认值)表示 execution 在空闲 auto_close_timeout 秒后自动 close —— 没 chunk 在跑、没事件待处理、没 pending pause。
| 来源 | 默认 auto_close_timeout |
|---|---|
flow.create_execution(...) |
10.0 秒 |
flow.start(...) / flow.async_start(...)(隐式糖) |
0.0 秒(一空闲就 close) |
pause_for(...) 暂停 auto-close 计时。continue_with(...) 后空闲计时重新开始。
close() / async_close() 默认拒绝关闭仍有 pending interrupt 的 execution。应先恢复这些 interrupt;如果关闭时就是要放弃等待,必须显式传 pending_interrupts="cancel"。
auto_close_timeout=None 关掉 auto-close —— execution 一直存活直到显式 close()。不要把 auto_close_timeout=None 与隐式糖一起用 —— flow.start() 会永远不返回。
| 场景 | 用 |
|---|---|
| 快速脚本,输入都已知 | flow.start(...) / flow.async_start(...) |
| 持续 emit / 消费 runtime stream 的服务 | flow.start_execution(...) |
需要 pause_for(...)(人工审批、异步 webhook) |
flow.create_execution(auto_close=False) + execution.async_start(...) + 手动 close() |
| 跨重启 save/load | create_execution(...) + execution.save() / load() |
# 这个 flow 暂停等用户输入 —— 不要用 flow.start()
flow = TriggerFlow(name="approval")
async def ask(data):
return await data.async_pause_for(type="approval", resume_to="next")
async def commit(data):
await data.async_set_state("approved", data.input)
flow.to(ask).to(commit)
execution = flow.create_execution(auto_close=False)
await execution.async_start(None)
# ... 等外部系统调 execution.async_continue_with(...) ...
snapshot = await execution.async_close()
如果写成 await flow.async_start(None),隐式 execution 没有可恢复 handle,走到 pause_for(...) 时会直接报错。
如果需要停止一个等待中的 execution 且不恢复它,必须显式表达取消等待:
snapshot = await execution.async_close(pending_interrupts="cancel")
| 参数 | 状态 |
|---|---|
wait_for_result=True / False |
值被忽略,发 warning;返回类型由 auto_close 决定 |
set_result() / get_result() / .end() |
deprecated;见 兼容 |
runtime_data(get_runtime_data / set_runtime_data 等) |
state 的 deprecated 别名;见 State 与 Resources |
pause_for 与 continue_withsave / load