Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

Patterns

The patterns below cover the day-to-day shapes most flows fall into.

Linear chain

flow.to(step_a).to(step_b).to(step_c)

Each handler receives the previous handler’s return value as data.input.
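The hand-off rule can be traced without the library. The sketch below is plain asyncio, not Agently code: `run_chain` is a hypothetical helper, and `SimpleNamespace` stands in for the `data` object with only an `input` attribute.

```python
import asyncio
from types import SimpleNamespace


async def run_chain(handlers, start_input):
    """Hypothetical helper: pass each handler's return value to the next as data.input."""
    value = start_input
    for handler in handlers:
        data = SimpleNamespace(input=value)  # stand-in for the real data object
        value = await handler(data)
    return value


async def step_a(data):
    return data.input + 1


async def step_b(data):
    return data.input * 10


async def step_c(data):
    return f"result: {data.input}"


chain_result = asyncio.run(run_chain([step_a, step_b, step_c], 1))
print(chain_result)  # result: 20
```

A handler that returns nothing would hand `None` to the next step in this sketch, so each step should return the value the following step needs.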

if / elif / else

async def score(data):
    return {"score": 82}

async def store_grade(data):
    await data.async_set_state("grade", data.input)

(
    flow.to(score)
    .if_condition(lambda data: data.input["score"] >= 90)
        .to(lambda _: "A")
    .elif_condition(lambda data: data.input["score"] >= 80)
        .to(lambda _: "B")
    .else_condition()
        .to(lambda _: "C")
    .end_condition()
    .to(store_grade)
)

end_condition() is required: it closes the conditional block and returns the chain so you can keep adding steps. The return value of whichever branch ran becomes the next chunk’s data.input.
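In plain Python, the branch selection above corresponds roughly to the function below. This is an illustrative equivalent, assuming conditions are tried top to bottom and the first true one wins; `pick_grade` is a hypothetical name, not part of the library.

```python
def pick_grade(score_output):
    """Plain-Python equivalent of the if / elif / else chain (illustrative only)."""
    if score_output["score"] >= 90:
        return "A"
    elif score_output["score"] >= 80:
        return "B"
    else:
        return "C"


# With the score handler's output, the value handed to the next step is "B".
grade = pick_grade({"score": 82})
print(grade)  # B
```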

match / case

(
    flow.to(lambda _: "medium")
    .match()
        .case("low").to(lambda _: "priority: low")
        .case("medium").to(lambda _: "priority: medium")
        .case("high").to(lambda _: "priority: high")
        .case_else().to(lambda _: "priority: unknown")
    .end_match()
    .to(store_result)  # store_result receives "priority: medium" as data.input
)

match() switches on data.input from the previous chunk. Use it when you have a small set of discrete values; for predicates, prefer if_condition.
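For comparison, the same routing written as a plain dictionary lookup. This is a sketch that assumes cases are matched by equality; `PRIORITY_LABELS` and `route_priority` are hypothetical names used only for illustration.

```python
# Each key plays the role of a case(...) value; the default plays case_else().
PRIORITY_LABELS = {
    "low": "priority: low",
    "medium": "priority: medium",
    "high": "priority: high",
}


def route_priority(value):
    """Return the label for a known value, or the fallback label otherwise."""
    return PRIORITY_LABELS.get(value, "priority: unknown")


print(route_priority("medium"))  # priority: medium
print(route_priority("urgent"))  # priority: unknown
```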

batch — parallel named branches

async def echo(data):
    return f"echo: {data.input}"

flow.batch(
    ("a", echo),
    ("b", echo),
    ("c", echo),
).to(store_batch)

All branches run in parallel against the same data.input. The next chunk receives a list (or dict, depending on configuration) of all branch outputs.
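The fan-out can be pictured with `asyncio.gather`. The sketch below is not Agently's implementation: `run_batch` is a hypothetical helper, and it assumes the dict-shaped result, keyed by branch name.

```python
import asyncio
from types import SimpleNamespace


async def echo(data):
    return f"echo: {data.input}"


async def run_batch(branches, shared_input):
    """Hypothetical helper: run every named branch on the same input concurrently."""
    names = [name for name, _ in branches]
    calls = [handler(SimpleNamespace(input=shared_input)) for _, handler in branches]
    outputs = await asyncio.gather(*calls)  # outputs keep the order of `calls`
    return dict(zip(names, outputs))


batch_result = asyncio.run(run_batch([("a", echo), ("b", echo), ("c", echo)], "hi"))
print(batch_result)
```

Because every branch sees the same input, the branches should not depend on each other's results.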

Throttle concurrency at the execution level:

execution = flow.create_execution(concurrency=2)

for_each — fan-out over a sequence input

async def double(data):
    return data.input * 2

(
    flow.for_each(concurrency=2)
        .to(double)
    .end_for_each()
    .to(store_items)
)

execution = flow.create_execution()
await execution.async_start([1, 2, 3, 4])
# store_items receives [2, 4, 6, 8]

for_each inspects the previous chunk’s output (or the start input): non-string Sequence values are expanded into items; scalar values are treated as one item. Each item runs through the body in parallel up to the concurrency cap, and results are collected in input order.
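The expansion rule and the concurrency cap can be sketched in plain asyncio. This is an illustration of the behaviour described above, not the library's code: `expand_items`, `for_each`, and `slow_double` are hypothetical names, and the cap is modelled with a semaphore.

```python
import asyncio
from collections.abc import Sequence
from types import SimpleNamespace


def expand_items(value):
    """Non-string sequences become items; any other value is a single item."""
    if isinstance(value, Sequence) and not isinstance(value, str):
        return list(value)
    return [value]


async def for_each(value, handler, concurrency=None):
    items = expand_items(value)
    # No cap given: allow every item to run at once.
    limit = asyncio.Semaphore(concurrency or max(len(items), 1))

    async def run_one(item):
        async with limit:
            return await handler(SimpleNamespace(input=item))

    # gather returns results in the order the items were passed in.
    return await asyncio.gather(*(run_one(item) for item in items))


stats = {"active": 0, "peak": 0}


async def slow_double(data):
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)  # simulate work so the runs overlap
    stats["active"] -= 1
    return data.input * 2


doubled = asyncio.run(for_each([1, 2, 3, 4], slow_double, concurrency=2))
single = asyncio.run(for_each(5, slow_double))
text = asyncio.run(for_each("ab", slow_double))
print(doubled, single, text, stats["peak"])
```

The string `"ab"` is treated as one item rather than two characters, which is why strings are excluded from the sequence check.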

To run the body N times, return a sequence of length N from the previous chunk:

async def make_range(data):
    return list(range(data.input))

flow.to(make_range).for_each().to(double).end_for_each()

Event-driven loops

Python for loops still belong inside handler functions. At the graph level, repeated fan-out is for_each; loops driven by flow-internal signals are expressed with emit + when:

flow = TriggerFlow(name="loop")

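async def start_loop(data):
    # Initialise the accumulator without notifying observers, then start the loop.
    await data.async_set_state("values", [], emit=False)
    data.emit_nowait("Loop", 0)

async def loop_step(data):
    # Record the current counter value.
    values = data.get_state("values", []) or []
    values.append(data.input)
    await data.async_set_state("values", values, emit=False)
    if data.input < 3:
        # Re-emit "Loop" to schedule the next iteration.
        data.emit_nowait("Loop", data.input + 1)
    else:
        # Stop emitting to end the loop, then publish the summary.
        await data.async_set_state("done", {"last": data.input, "count": len(values)})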

flow.to(start_loop)
flow.when("Loop").to(loop_step)

Mechanics:

Pass emit=False to async_set_state when you want to update state without triggering observers. This is useful inside hot loops, where it keeps observation overhead down.

For long loops, give the execution a sensible auto_close_timeout (or set auto_close=False and call close() manually) so the execution is not closed automatically during a brief pause between iterations.
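The control flow of the loop above can be traced with a small stand-in for emit + when. `MiniBus` is a hypothetical class written for this sketch and shares nothing with Agently's internals; it only shows why the loop ends once the handler stops emitting.

```python
import asyncio
from collections import defaultdict, deque


class MiniBus:
    """Hypothetical event bus: handlers registered per event, events queued for delivery."""

    def __init__(self):
        self.handlers = defaultdict(list)
        self.pending = deque()
        self.state = {}

    def when(self, event, handler):
        self.handlers[event].append(handler)

    def emit_nowait(self, event, payload):
        # Queue the event; run() delivers it later.
        self.pending.append((event, payload))

    async def run(self):
        # Deliver queued events until no handler emits a new one.
        while self.pending:
            event, payload = self.pending.popleft()
            for handler in self.handlers[event]:
                await handler(self, payload)


async def loop_step(bus, value):
    bus.state.setdefault("values", []).append(value)
    if value < 3:
        bus.emit_nowait("Loop", value + 1)  # schedule the next iteration
    else:
        bus.state["done"] = {"last": value, "count": len(bus.state["values"])}


bus = MiniBus()
bus.when("Loop", loop_step)
bus.emit_nowait("Loop", 0)
asyncio.run(bus.run())
print(bus.state)
```

The handler runs four times, for the values 0 through 3, and the queue drains as soon as the final iteration writes `done` instead of emitting again.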

Side branches that don’t block the main path

A when(...) branch and the main chain run independently. You can use this for fire-and-forget logging, telemetry, or out-of-band notifications:

flow.to(main_step)

@flow.when("MainStepDone").to
async def log_step(data):
    await some_external_log(data.input)

main_step calls data.async_emit("MainStepDone", {...}); the side branch starts from that event and does not delay main_step’s return value.
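The fire-and-forget idea can be shown in plain asyncio. The sketch below is an analogy rather than the library's mechanism: it schedules the side work with `asyncio.create_task`, and all names (`log_step`, `main_step`, `run`, `log_lines`) are local to this example.

```python
import asyncio

log_lines = []


async def log_step(payload):
    await asyncio.sleep(0.01)  # simulate a slow external log call
    log_lines.append(payload)


async def main_step(background):
    # Schedule the side branch and return without waiting for it.
    background.append(asyncio.create_task(log_step({"step": "main"})))
    return "main result"


async def run():
    background = []
    result = await main_step(background)
    logged_before = list(log_lines)  # the side branch has not finished yet
    await asyncio.gather(*background)  # keep the loop alive until it has
    return result, logged_before


result, logged_before = asyncio.run(run())
print(result, logged_before, log_lines)
```

The final `gather` matters: if the event loop shut down right after `main_step` returned, the log task would be cancelled before it finished. That is the same reason a flow with slow side branches needs a suitable close setting.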

Combining patterns

A single flow often mixes patterns. The sub-flow page has a worked example with if_condition + for_each + sub-flow composition; see Sub-Flow.

See also