Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
语言:English · 中文
过程有 3 个或更多离散阶段。下列任一为真:
都不沾,留在 request 层 —— 见 快速开始 与 输出控制。
应用
│
▼
TriggerFlow 定义(一个 flow 一个 Python 模块)
├── prepare ← 校验 / 归一化输入
├── classify ← 模型调用:按类型路由
├── (按分类分支)
│ ├── handle_A → … → finalize
│ ├── handle_B → … → finalize
│ └── handle_C → … → finalize
├── for_each items ← 任一 handler 返回 list 时 fan out
├── pause_for(...) ← 可选人工批准
└── finalize ← 写最终 state,推到 runtime stream
flow 之外:
• create_execution(auto_close=False, runtime_resources={...})
• async_start(...)
• 消费 runtime stream 给 live UI
• async_close() → 给 API 响应的 close snapshot
from agently import TriggerFlow, TriggerFlowRuntimeData
def build_flow():
flow = TriggerFlow(name="orchestration")
async def prepare(data: TriggerFlowRuntimeData):
# 校验 / 归一化输入
await data.async_set_state("input", data.input)
return data.input
async def classify(data: TriggerFlowRuntimeData):
agent = data.require_resource("agent")
return await agent.input(data.input).output({
"category": (str, "分类", True),
}).async_start()
async def handle_default(data: TriggerFlowRuntimeData):
# ...
await data.async_set_state("answer", "...")
(
flow.to(prepare)
.to(classify)
.match()
.case("A").to(handle_default)
.case("B").to(handle_default)
.case_else().to(handle_default)
.end_match()
)
return flow
async def run(input_value, agent):
flow = build_flow()
execution = flow.create_execution(
auto_close=False,
runtime_resources={"agent": agent},
)
await execution.async_start(input_value)
return await execution.async_close()
骨架里的几个选择:
auto_close=False —— 应用显式控制 close。任何可能消费 runtime stream item 或暂停等外部输入的场景都用这个。state(live 对象),不在 flow_data(共享且风险)。见 State 与 Resources。match() 走分类结果 —— 离散类别用 match;predicate 分支用 if_condition。data.input、写 state —— handler 应小且单一职责。把单 handler 替换为 for_each:
async def list_subtasks(data):
return data.input["subtasks"] # list
async def handle_one(data):
return await some_agent.input(data.input).async_start()
(
flow.to(list_subtasks)
.for_each(concurrency=4)
.to(handle_one)
.end_for_each()
.to(collect)
)
batch、for_each 与并发上限见 模式。
加 pause_for 步骤。execution 必须用 auto_close=False 创建;通过 continue_with 和显式 resume_to 目标恢复。
async def ask(data):
return await data.async_pause_for(
type="approval",
payload={"summary": data.input["summary"]},
resume_to="next",
)
在有意义的 checkpoint 保存 execution state(通常在 pause_for 未决时),把结果持久化到耐久存储,用 flow.create_execution(...).load(saved) 恢复。
saved = execution.save()
db.put(execution_id, saved)
# 后续,可能在另一进程:
restored = flow.create_execution(auto_close=False, runtime_resources={...})
restored.load(db.get(execution_id))
恢复侧重新注入 runtime resource。见 持久化与 Blueprint。
在 FastAPI / WebSocket handler 内消费 execution.get_async_runtime_stream(...)。chunk 通过 data.async_put_into_stream(...) 推 item。见 FastAPI 服务封装 与 事件与流。
.start() 已经经校验流水线重试。见 输出控制。state。用 runtime_resources,load() 时重新注入。auto_close 与 5 个入口 API