Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
Languages: English · 中文
A TriggerFlow execution moves through three states. Five entry APIs let you control how it starts and ends.
open ──seal()──► sealed ──close()──► closed
│ │
└─── (auto_close fires after idle timeout) ──┘
| State | What it accepts | What still runs |
|---|---|---|
open |
new external events (emit, continue_with) |
everything: chunks, runtime stream, registered tasks |
sealed |
nothing new from outside | already-accepted events, internal emit chains, registered tasks continue to drain |
closed |
nothing | runtime stream is closed; close snapshot is frozen |
Key distinction: seal() stops external input but lets in-flight work finish. close() does seal first, then drains and freezes.
| API | Purpose | Returns |
|---|---|---|
flow.start(...) / flow.async_start(...) |
hidden-execution sugar; create + start + wait + close | close snapshot |
flow.start_execution(...) / flow.async_start_execution(...) |
explicit launch; you keep the execution handle | execution |
execution.start(...) / execution.async_start(...) |
start an execution you already created | close snapshot if auto_close=True; execution if auto_close=False |
execution.seal() / execution.async_seal() |
runtime seal | — |
execution.close() / execution.async_close() |
finalize | close snapshot |
flow.start(...) — hidden sugarsnapshot = await flow.async_start("input value")
What it does internally: create_execution(auto_close=True, auto_close_timeout=0.0), start, wait until close, return snapshot.
Rules:
auto_close=False is illegal here — raises immediately.wait_for_result= value is ignored with a warning. Return type is fixed to the close snapshot.timeout= is treated as auto_close_timeout — how long to wait after the last activity before auto-closing.pause_for(...), do not use flow.start() — there is no handle for the outside to resume against. TriggerFlow fails fast when hidden execution sugar reaches pause_for(...). Use flow.start_execution(...) or flow.create_execution(...).flow.start_execution(...) — explicit launchexecution = await flow.async_start_execution("input value")
# ... do something with the handle ...
snapshot = await execution.async_close()
Returns the execution. You decide when to close. Suited for services, SSE/WebSocket streams, human-in-the-loop, external emit() callers.
wait_for_result= is ignored here too.
execution.start(...) — start a pre-built executionexecution = flow.create_execution(auto_close=True)
snapshot = await execution.async_start("input") # returns close snapshot
execution = flow.create_execution(auto_close=False)
exec2 = await execution.async_start("input") # returns the execution
# ... do work, then ...
snapshot = await execution.async_close()
auto_close |
async_start returns |
|---|---|
True (default) |
close snapshot |
False |
the execution itself |
Sync start() only supports auto_close=True. If your execution must be manually closed, use await execution.async_start(...) instead.
execution.seal() — stop new input, let in-flight finishawait execution.async_seal()
After seal:
emit() / continue_with() calls are rejected.emit chains, and registered tasks keep running.Use seal when you want to stop accepting new work but still finish what’s in flight, and you’ll close later (or let auto_close close it).
execution.close() — finalize and return snapshotsnapshot = await execution.async_close()
What close does, in order:
timeout= on close is the drain timeout — the maximum wait for in-flight tasks before forcing the close. It is not the auto-close timer.
auto_close=True (the default for create_execution) means the execution will close itself after auto_close_timeout seconds of being idle — no chunks running, no events to process, no pending pause.
| Source | Default auto_close_timeout |
|---|---|
flow.create_execution(...) |
10.0 seconds |
flow.start(...) / flow.async_start(...) (hidden sugar) |
0.0 seconds (close as soon as idle) |
pause_for(...) pauses the auto-close timer. After continue_with(...), the idle timer starts fresh.
close() / async_close() reject pending interrupts by default. Resume them first, or explicitly cancel them with pending_interrupts="cancel" when shutdown should abandon the wait.
auto_close_timeout=None disables auto-close — the execution stays alive until you call close() explicitly. Don’t combine auto_close_timeout=None with hidden sugar — flow.start() would never return.
| Situation | Use |
|---|---|
| Quick script, all inputs known up front | flow.start(...) / flow.async_start(...) |
| Service that needs to keep emitting / consuming runtime stream | flow.start_execution(...) |
Need pause_for(...) (human approval, async webhook) |
flow.create_execution(auto_close=False) + execution.async_start(...) + manual close() |
| Need to save and resume across restarts | create_execution(...) + execution.save() / load() |
# This flow pauses for user input — DO NOT use flow.start()
flow = TriggerFlow(name="approval")
async def ask(data):
return await data.async_pause_for(type="approval", resume_to="next")
async def commit(data):
await data.async_set_state("approved", data.input)
flow.to(ask).to(commit)
execution = flow.create_execution(auto_close=False)
await execution.async_start(None)
# ... wait for an external system to call execution.async_continue_with(...) ...
snapshot = await execution.async_close()
If you’d written await flow.async_start(None) instead, TriggerFlow would raise when pause_for(...) is reached because the hidden execution has no resumable handle.
If you need to stop a waiting execution without resuming it, make that explicit:
snapshot = await execution.async_close(pending_interrupts="cancel")
| Parameter | Status |
|---|---|
wait_for_result=True / False |
value is ignored, warning emitted; return type is governed by auto_close |
set_result() / get_result() / .end() |
deprecated; see Compatibility |
runtime_data (get_runtime_data / set_runtime_data etc.) |
deprecated alias of state; see State and Resources |
pause_for and continue_withsave / load