Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
Languages: English · 中文
You have a process with three or more discrete stages. At least one of these is true:
If none of those apply, stay in the request layer — see Quickstart and Output Control.
Application
│
▼
TriggerFlow definition (one Python module per flow)
├── prepare ← validate / normalize input
├── classify ← model call: route by type
├── (branch on classification)
│ ├── handle_A → … → finalize
│ ├── handle_B → … → finalize
│ └── handle_C → … → finalize
├── for_each items ← if a list comes back from any handler, fan out
├── pause_for(...) ← optional human approval
└── finalize ← write final state, push to runtime stream
Outside the flow:
• create_execution(auto_close=False, runtime_resources={...})
• async_start(...)
• consume runtime stream for live UI
• async_close() → close snapshot for the API response
from agently import TriggerFlow, TriggerFlowRuntimeData
def build_flow():
flow = TriggerFlow(name="orchestration")
async def prepare(data: TriggerFlowRuntimeData):
# validate / normalize input
await data.async_set_state("input", data.input)
return data.input
async def classify(data: TriggerFlowRuntimeData):
agent = data.require_resource("agent")
return await agent.input(data.input).output({
"category": (str, "Category", True),
}).async_start()
async def handle_default(data: TriggerFlowRuntimeData):
# ...
await data.async_set_state("answer", "...")
(
flow.to(prepare)
.to(classify)
.match()
.case("A").to(handle_default)
.case("B").to(handle_default)
.case_else().to(handle_default)
.end_match()
)
return flow
async def run(input_value, agent):
flow = build_flow()
execution = flow.create_execution(
auto_close=False,
runtime_resources={"agent": agent},
)
await execution.async_start(input_value)
return await execution.async_close()
A few choices baked into this skeleton:
auto_close=False — the application controls close explicitly. Use this whenever you might want to consume runtime stream items or pause for external input.state (it’s a live object) and isn’t in flow_data (which is shared and risky). See State and Resources.match() over classification result — discrete categories use match; predicate branches use if_condition.data.input and writes to state — handlers should be small and have one job each.Replace a single handler with a for_each:
async def list_subtasks(data):
return data.input["subtasks"] # a list
async def handle_one(data):
return await some_agent.input(data.input).async_start()
(
flow.to(list_subtasks)
.for_each(concurrency=4)
.to(handle_one)
.end_for_each()
.to(collect)
)
See Patterns for batch, for_each, and concurrency caps.
Add a pause_for step. The execution must be created with auto_close=False; resume with continue_with and an explicit resume_to target.
async def ask(data):
return await data.async_pause_for(
type="approval",
payload={"summary": data.input["summary"]},
resume_to="next",
)
See Pause and Resume.
Save the execution state at meaningful checkpoints (typically when a pause_for is outstanding), persist the result somewhere durable, and restore by flow.create_execution(...).load(saved).
saved = execution.save()
db.put(execution_id, saved)
# later, possibly in a different process:
restored = flow.create_execution(auto_close=False, runtime_resources={...})
restored.load(db.get(execution_id))
Re-inject runtime resources on the restore side. See Persistence and Blueprint.
Consume execution.get_async_runtime_stream(...) from a FastAPI or WebSocket handler. Have your chunks push items via data.async_put_into_stream(...). See FastAPI Service Exposure and Events and Streams.
.start() already retries via the validation pipeline. See Output Control.state. Use runtime_resources and re-inject on load().auto_close and the five entry APIs