Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

TriggerFlow Orchestration Playbook

Languages: English · 中文

When to use this playbook

You have a process with three or more discrete stages. At least one of these is true:

If none of those apply, stay in the request layer — see Quickstart and Output Control.

Application
   │
   ▼
TriggerFlow definition  (one Python module per flow)
   ├── prepare         ← validate / normalize input
   ├── classify        ← model call: route by type
   ├── (branch on classification)
   │     ├── handle_A → … → finalize
   │     ├── handle_B → … → finalize
   │     └── handle_C → … → finalize
   ├── for_each items  ← if a list comes back from any handler, fan out
   ├── pause_for(...)  ← optional human approval
   └── finalize        ← write final state, push to runtime stream

Outside the flow:
   • create_execution(auto_close=False, runtime_resources={...})
   • async_start(...)
   • consume runtime stream for live UI
   • async_close() → close snapshot for the API response

Skeleton

from agently import TriggerFlow, TriggerFlowRuntimeData


def build_flow():
    flow = TriggerFlow(name="orchestration")

    async def prepare(data: TriggerFlowRuntimeData):
        # validate / normalize input
        await data.async_set_state("input", data.input)
        return data.input

    async def classify(data: TriggerFlowRuntimeData):
        agent = data.require_resource("agent")
        return await agent.input(data.input).output({
            "category": (str, "Category", True),
        }).async_start()

    async def handle_default(data: TriggerFlowRuntimeData):
        # ...
        await data.async_set_state("answer", "...")

    (
        flow.to(prepare)
            .to(classify)
            .match()
                .case("A").to(handle_default)
                .case("B").to(handle_default)
                .case_else().to(handle_default)
            .end_match()
    )

    return flow


async def run(input_value, agent):
    flow = build_flow()
    execution = flow.create_execution(
        auto_close=False,
        runtime_resources={"agent": agent},
    )
    await execution.async_start(input_value)
    return await execution.async_close()

A few choices baked into this skeleton:

Variations

Need to fan out

Replace a single handler with a for_each:

async def list_subtasks(data):
    return data.input["subtasks"]   # a list

async def handle_one(data):
    return await some_agent.input(data.input).async_start()

(
    flow.to(list_subtasks)
        .for_each(concurrency=4)
            .to(handle_one)
        .end_for_each()
        .to(collect)
)

See Patterns for batch, for_each, and concurrency caps.

Need human approval

Add a pause_for step. The execution must be created with auto_close=False; resume with continue_with and an explicit resume_to target.

async def ask(data):
    return await data.async_pause_for(
        type="approval",
        payload={"summary": data.input["summary"]},
        resume_to="next",
    )

See Pause and Resume.

Need to survive a restart

Save the execution state at meaningful checkpoints (typically when a pause_for is outstanding), persist the result somewhere durable, and restore by flow.create_execution(...).load(saved).

saved = execution.save()
db.put(execution_id, saved)

# later, possibly in a different process:
restored = flow.create_execution(auto_close=False, runtime_resources={...})
restored.load(db.get(execution_id))

Re-inject runtime resources on the restore side. See Persistence and Blueprint.

Need a streaming UI

Consume execution.get_async_runtime_stream(...) from a FastAPI or WebSocket handler. Have your chunks push items via data.async_put_into_stream(...). See FastAPI Service Exposure and Events and Streams.

What to skip