Loop Flow¶
通过事件回流实现循环。
源码¶
from agently import TriggerFlow, TriggerFlowEventData
## TriggerFlow Loop: emit event to re-enter the flow
def loop_flow_demo():
# Idea: create a loop by emitting the same event until a stop condition.
# Flow: start_loop -> emit Loop -> loop_step -> emit Loop/LoopEnd
# Expect: prints "done: 3".
flow = TriggerFlow()
async def start_loop(data: TriggerFlowEventData):
await data.async_emit("Loop", 0)
return None
async def loop_step(data: TriggerFlowEventData):
count = data.value
if count >= 3:
await data.async_emit("LoopEnd", count)
return None
await data.async_emit("Loop", count + 1)
return count
flow.to(start_loop)
flow.when("Loop").to(loop_step)
flow.when("LoopEnd").to(lambda d: f"done: {d.value}").end()
result = flow.start("start")
print(result)
# loop_flow_demo()
讲解¶
- 终止条件显式控制。
注释解读¶
- Idea 表示案例思路
- Flow 表示执行编排路径
- Expect 表示预期输出或行为
你学会了什么¶
- 掌握事件回流的循环方式
练习任务¶
- 修改终止条件为 5 次