Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
语言:English · 中文
TriggerFlow 是 Agently 的编排层,负责:
if/elif/else、match/case)batch、for_each)when(...))它位于 action runtime 之上 —— 你的 flow 可以在 chunk 内调 agent、tool、MCP 等。它位于应用代码之下 —— 应用决定跑哪个 flow、传什么。
| 你有 | 用 |
|---|---|
| 单次模型调用(带重试 / 校验) | 一个 request,不是 flow |
| 2-3 步线性管道、无 fan-out | 有时 flow 是 overkill;考虑纯 async |
| 基于中间结果的分支 | TriggerFlow if_condition 或 match |
| N 个输入的并发 | TriggerFlow for_each / batch |
| 长跑且有人工审批 | TriggerFlow pause_for |
| 需要跨进程重启存活 | TriggerFlow save / load |
| 给 UI / SSE 推 live 事件流 | TriggerFlow runtime stream |
右栏都不沾就留在 request 层。
Dynamic Task 是 Agently 独立的一等能力面,面向模型生成或应用提交的 DAG
数据。当计划本身需要规划、校验、裁剪和执行时,使用
Agently.create_dynamic_task(...)。Dynamic Task 内部以 TriggerFlow 作为执行
基座,但它不是 TriggerFlow 子 API。见 Dynamic Task。
┌──────────────────────────────────────┐
│ 应用代码 │
│ create execution → start → close │
└────────────────┬─────────────────────┘
│
┌─────────────▼──────────────┐
│ TriggerFlow execution │
│ open → sealed → closed │
│ • state(snapshot) │
│ • runtime_resources │ ◄── 注入的 live 对象
│ • runtime stream │ ◄── chunk 推出的 item
│ • pending interrupts │
└─────────────┬──────────────┘
│
chunk(你写的 async 函数)调 agent、tool、外部 API,
再更新 state 和 / 或 emit 事件
TriggerFlow 对象是定义 —— handler 链与分支。execution 是该定义的一次运行。同一个 flow 可以有多个并发 execution。
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData
async def hello():
flow = TriggerFlow(name="hello")
async def greet(data: TriggerFlowRuntimeData):
await data.async_set_state("greeting", f"Hello, {data.input}!")
flow.to(greet)
execution = flow.create_execution()
await execution.async_start("World")
snapshot = await execution.async_close()
print(snapshot["greeting"]) # Hello, World!
asyncio.run(hello())
发生了什么:
TriggerFlow(name=...) 定义一个 flow。flow.to(greet) 链入 handler。handler 收到 data: TriggerFlowRuntimeData,里面 data.input 是 start() 传入的值。flow.create_execution() 创建可运行 execution。async_start("World") 启动;async_close() 等所有事 drain 完,返回 close snapshot —— 一个 dict,含所有 handler 设置的 state。不需要显式控制 execution 时,flow.start(...) / flow.async_start(...) 创建临时 execution、跑到 close、返回 snapshot:
snapshot = await flow.async_start("World")
print(snapshot["greeting"])
脚本里用这个。flow 会暂停等待人工输入或依赖外部事件时不要用 —— 见 Lifecycle。
handler 内 data 暴露:
| API | 用途 |
|---|---|
data.input |
流入的值(start 的 input,或上一 chunk 的返回) |
data.async_set_state(key, value) / get_state(key) |
execution-local 可序列化 state |
data.async_emit(event, payload) |
触发 when(event) 分支 |
data.async_put_into_stream(item) |
推到 runtime stream |
data.async_pause_for(type=..., resume_to=...) |
暂停等待外部图恢复 |
data.require_resource(name) |
取你注入的 live 对象 |
return value |
成为下一 chunk 的 data.input |
完整词汇表见本节其余页。
emit、when、runtime stream.end() / set_result() / wait_for_result= / 旧 runtime_data 迁移