跳转至

Loop Flow

通过事件回流实现循环。

源码

from agently import TriggerFlow, TriggerFlowEventData


## TriggerFlow Loop: emit event to re-enter the flow
def loop_flow_demo():
    # Idea: create a loop by emitting the same event until a stop condition.
    # Flow: start_loop -> emit Loop -> loop_step -> emit Loop/LoopEnd
    # Expect: prints "done: 3".
    flow = TriggerFlow()

    async def start_loop(data: TriggerFlowEventData):
        await data.async_emit("Loop", 0)
        return None

    async def loop_step(data: TriggerFlowEventData):
        count = data.value
        if count >= 3:
            await data.async_emit("LoopEnd", count)
            return None
        await data.async_emit("Loop", count + 1)
        return count

    flow.to(start_loop)
    flow.when("Loop").to(loop_step)
    flow.when("LoopEnd").to(lambda d: f"done: {d.value}").end()

    result = flow.start("start")
    print(result)


# loop_flow_demo()

讲解

  • 终止条件显式控制。

注释解读

  • Idea 表示案例思路
  • Flow 表示执行编排路径
  • Expect 表示预期输出或行为

你学会了什么

  • 掌握事件回流的循环方式

练习任务

  • 修改终止条件为 5 次