Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
语言:English · 中文
下面是日常 flow 的常见形态。
flow.to(step_a).to(step_b).to(step_c)
每个 handler 收到上一 handler 的返回值作为 data.input。
async def score(data):
return {"score": 82}
async def store_grade(data):
await data.async_set_state("grade", data.input)
(
flow.to(score)
.if_condition(lambda data: data.input["score"] >= 90)
.to(lambda _: "A")
.elif_condition(lambda data: data.input["score"] >= 80)
.to(lambda _: "B")
.else_condition()
.to(lambda _: "C")
.end_condition()
.to(store_grade)
)
end_condition() 是必需的 —— 关闭条件分支并把链交还给你。被选中的分支返回成为下一 chunk 的 data.input。
(
flow.to(lambda _: "medium")
.match()
.case("low").to(lambda _: "priority: low")
.case("medium").to(lambda _: "priority: medium")
.case("high").to(lambda _: "priority: high")
.case_else().to(lambda _: "priority: unknown")
.end_match()
.to(store_result)
)
match() 对前一 chunk 的 data.input 分发。少量离散值用它;要 predicate 用 if_condition。
async def echo(data):
return f"echo: {data.input}"
flow.batch(
("a", echo),
("b", echo),
("c", echo),
).to(store_batch)
所有分支并行跑同一份 data.input。下一 chunk 收到一个含所有分支输出的 list(或 dict,取决于配置)。
execution 级限并发:
execution = flow.create_execution(concurrency=2)
async def double(data):
return data.input * 2
(
flow.for_each(concurrency=2)
.to(double)
.end_for_each()
.to(store_items)
)
execution = flow.create_execution()
await execution.async_start([1, 2, 3, 4])
# store_items 收到 [2, 4, 6, 8]
for_each 会检查前一 chunk 的输出(或 start input):非字符串 Sequence 会被拆成多个 item;标量值会被当成单个 item 处理。每个 item 在 concurrency 上限内并行跑 body,输出按输入顺序收集成 list。
如果要“按数字 N 循环 N 次”,先在前一 chunk 显式返回一个序列:
async def make_range(data):
return list(range(data.input))
flow.to(make_range).for_each().to(double).end_for_each()
Python 的 for 仍然可以写在 handler 函数内部。图层上的重复 / fan-out 用 for_each;需要由 flow 内部信号持续推进的循环,用 emit + when 表达:
flow = TriggerFlow(name="loop")
async def start_loop(data):
await data.async_set_state("values", [], emit=False)
data.emit_nowait("Loop", 0)
async def loop_step(data):
values = data.get_state("values", []) or []
values.append(data.input)
await data.async_set_state("values", values, emit=False)
if data.input < 3:
data.emit_nowait("Loop", data.input + 1)
else:
await data.async_set_state("done", {"last": data.input, "count": len(values)})
flow.to(start_loop)
flow.when("Loop").to(loop_step)
机制:
when(...) 分支跑后要么再 emit(继续)要么停(退出)。async_set_state 传 emit=False 表示这次 state 更新不触发观察者,适合热循环里降低观测开销。
长循环给 execution 合理的 auto_close_timeout(或 auto_close=False + 手动 close()),避免迭代间短暂停顿被 auto-close 误关。
when(...) 分支与主链独立运行,可用于 fire-and-forget log、telemetry、带外通知:
flow.to(main_step)
@flow.when("MainStepDone").to
async def log_step(data):
await some_external_log(data.input)
main_step 跑 data.async_emit("MainStepDone", {...}),旁路从那里 fan out 不阻塞主返回。
单个 flow 经常混用模式。Sub-flow 页有一个 if_condition + for_each + 子流的完整例子,见 Sub-Flow。