Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

TriggerFlow Overview

Languages: English · 中文

TriggerFlow is Agently’s orchestration layer. It owns:

It sits above the action runtime — your flow can call agents, tools, MCP servers, or anything else inside its handlers. It sits below your application code — the application decides which flow to run and what to pass in.

When to use it

You have Use
One model call (with retries / validation) a request, not a flow
Linear pipeline of 2–3 steps with no fan-out sometimes a flow is overkill; consider plain async
Branches based on intermediate results TriggerFlow if_condition or match
Concurrency across N inputs TriggerFlow for_each / batch
Long-running with human approval TriggerFlow pause_for
Needs to survive process restart TriggerFlow save / load
Live event stream to UI / SSE TriggerFlow runtime stream

If none of the right column applies, stay in the request layer.

Dynamic Task is a separate first-class Agently surface for model-generated or app-generated DAG data. Use Agently.create_dynamic_task(...) when the plan itself must be planned, validated, pruned, and executed. Dynamic Task uses TriggerFlow as its execution substrate, but it is not a TriggerFlow sub-API. See Dynamic Task.

Mental model

┌──────────────────────────────────────┐
│ application code                     │
│   create execution → start → close   │
└────────────────┬─────────────────────┘
                 │
   ┌─────────────▼──────────────┐
   │  TriggerFlow execution     │
   │  open → sealed → closed    │
   │   • state (snapshot)       │
   │   • runtime_resources      │  ◄── live objects you inject
   │   • runtime stream         │  ◄── items chunks emit
   │   • pending interrupts     │
   └─────────────┬──────────────┘
                 │
   chunks (async functions you write) call agents, tools,
   external APIs, then update state and/or emit events

A TriggerFlow object is the definition — the chain of handlers and branches. An execution is one run of that definition. You can have many concurrent executions of the same flow.

Hello flow

import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData


async def hello():
    flow = TriggerFlow(name="hello")

    async def greet(data: TriggerFlowRuntimeData):
        await data.async_set_state("greeting", f"Hello, {data.input}!")

    flow.to(greet)

    execution = flow.create_execution()
    await execution.async_start("World")
    snapshot = await execution.async_close()
    print(snapshot["greeting"])  # Hello, World!


asyncio.run(hello())

What happened:

  1. TriggerFlow(name=...) defines a flow.
  2. flow.to(greet) chains a handler. The handler receives data: TriggerFlowRuntimeData with data.input (= the value passed to start()).
  3. flow.create_execution() makes one runnable execution.
  4. async_start("World") starts it; async_close() waits for everything to drain and returns the close snapshot — a dict of all state set by handlers.

Hidden execution sugar

When you don’t need to control the execution explicitly, flow.start(...) / flow.async_start(...) create a temporary execution, run it to close, and return the snapshot:

snapshot = await flow.async_start("World")
print(snapshot["greeting"])

Use this for scripts. Don’t use it when the flow pauses for human input or expects external events — see Lifecycle.

What chunks can do

Inside a chunk handler, data exposes:

API Purpose
data.input the value flowing in (start input, or previous chunk’s return)
data.async_set_state(key, value) / get_state(key) execution-local serializable state
data.async_emit(event, payload) trigger when(event) branches
data.async_put_into_stream(item) push to runtime stream
data.async_pause_for(type=..., resume_to=...) pause for external graph resumption
data.require_resource(name) fetch a live object you injected
return value becomes the next chunk’s data.input

The full vocabulary lives in the rest of this section.