Loop Flow

Build a loop by re-emitting the same event until a stop condition is met.

Source Code

from agently import TriggerFlow, TriggerFlowEventData


## TriggerFlow Loop: emit event to re-enter the flow
def loop_flow_demo():
    # Idea: create a loop by emitting the same event until a stop condition.
    # Flow: start_loop -> emit Loop -> loop_step -> emit Loop/LoopEnd
    # Expect: prints "done: 3".
    flow = TriggerFlow()

    async def start_loop(data: TriggerFlowEventData):
        await data.async_emit("Loop", 0)
        return None

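    async def loop_step(data: TriggerFlowEventData):
        count = data.value
        # Exit condition: stop re-emitting "Loop" once the counter reaches 3.
        if count >= 3:
            await data.async_emit("LoopEnd", count)
            return None
        # Otherwise re-enter this handler by emitting "Loop" with the next count.
        await data.async_emit("Loop", count + 1)
        return count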

    flow.to(start_loop)
    flow.when("Loop").to(loop_step)
    flow.when("LoopEnd").to(lambda d: f"done: {d.value}").end()

    result = flow.start("start")
    print(result)


if __name__ == "__main__":
    loop_flow_demo()

Walkthrough

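  • Give every loop an exit condition. Here loop_step checks count >= 3 and emits "LoopEnd" instead of "Loop"; without that check the handler would keep re-emitting "Loop" and never finish.
  • Use explicit, separately named events for clarity: "Loop" drives each iteration and "LoopEnd" marks the exit, so the control flow can be read directly from the flow.when(...) lines.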
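
To see the pattern from the two points above without the library, here is a minimal sketch using only the Python standard library. MiniFlow, its when/emit/run methods, the trace list, and run_loop_demo are hypothetical names invented for this illustration; they are not Agently's API and say nothing about how TriggerFlow dispatches events internally. The sketch only assumes that emitting an event queues it for a registered handler.

```python
import asyncio
from collections import deque


class MiniFlow:
    """Hypothetical stand-in for an event-driven flow (not Agently's implementation)."""

    def __init__(self):
        self.handlers = {}      # event name -> async handler
        self.pending = deque()  # queued (event, value) pairs
        self.trace = []         # events in the order they were dispatched

    def when(self, event, handler):
        self.handlers[event] = handler

    async def emit(self, event, value):
        # Emitting only enqueues the event; run() dispatches it later, so a
        # handler that re-emits its own event does not recurse.
        self.pending.append((event, value))

    async def run(self, event, value):
        await self.emit(event, value)
        result = None
        while self.pending:
            name, payload = self.pending.popleft()
            self.trace.append((name, payload))
            handler = self.handlers.get(name)
            if handler is not None:
                result = await handler(self, payload)
        # The value returned by the last handler that ran.
        return result


def run_loop_demo(limit=3):
    flow = MiniFlow()

    async def loop_step(flow, count):
        # Exit condition: switch to "LoopEnd" once the limit is reached.
        if count >= limit:
            await flow.emit("LoopEnd", count)
            return None
        # Re-enter this handler by emitting the same event again.
        await flow.emit("Loop", count + 1)
        return count

    async def loop_end(flow, count):
        return f"done: {count}"

    flow.when("Loop", loop_step)
    flow.when("LoopEnd", loop_end)
    result = asyncio.run(flow.run("Loop", 0))
    return result, flow.trace
```

Calling run_loop_demo() returns the final result together with the dispatch trace, which shows "Loop" being handled for counts 0 through 3 before "LoopEnd" fires once. Queuing emitted events instead of calling the handler directly is a design choice of this sketch: it keeps the call stack flat no matter how many iterations the loop runs.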

What you'll learn

  • How to build an event-driven loop by re-emitting an event from its own handler

Exercises

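  • Change the loop limit from 3 to 5 (the count >= 3 check in loop_step) and check that the printed result changes accordingly.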