Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
Languages: English · 中文
pause_for(...) lets a chunk stop at a durable interrupt barrier while it waits for an external event. The execution stays alive but idle. Auto-close is paused while a pause_for is outstanding. When the outside calls continue_with(...), TriggerFlow resumes the graph according to the interrupt’s resume target.
async def ask(data: TriggerFlowRuntimeData):
return await data.async_pause_for(
type="human_input",
payload={"question": f"Approve action for {data.input}?"},
resume_to="next",
)
What pause_for does:
execution.get_pending_interrupts().continue_with(interrupt_id, payload) is called, the graph resumes according to resume_to.| Argument | Meaning |
|---|---|
type= |
a string label (e.g. "human_input", "approval", "webhook"). The application uses this to decide how to surface the interrupt. |
payload= |
structured details for whatever’s responsible for resuming (UI to render a question, webhook recipient, etc.). |
resume_to= |
optional target for continue_with: "next", "self", or {"event": "EventName"}. |
resume_event= |
compatibility shortcut. If set without resume_to, continue_with and matching emit(...) route to that event. |
interrupt_id= |
optional. Specify the id yourself; otherwise the framework generates one. |
interrupt_id = next(iter(execution.get_pending_interrupts()))
await execution.async_continue_with(interrupt_id, {"approved": True})
With resume_to="next", the payload becomes the paused chunk’s output and the next .to(...) receives it.
With resume_to="self", the same chunk runs again. Use data.is_resume and data.resume.value:
async def gate(data: TriggerFlowRuntimeData):
if data.is_resume:
return {"decision": data.resume.value}
return await data.async_pause_for(
type="approval",
payload={"question": "Approve?"},
resume_to="self",
)
With resume_to={"event": "ApprovalGiven"}, TriggerFlow emits that event with the resume payload. resume_event="ApprovalGiven" keeps the older event-based behavior.
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData
async def main():
flow = TriggerFlow(name="approval")
async def ask(data: TriggerFlowRuntimeData):
return await data.async_pause_for(
type="approval",
payload={"question": f"Approve refund for ticket {data.input}?"},
resume_to="next",
)
async def commit(data: TriggerFlowRuntimeData):
await data.async_set_state("decision", data.input)
flow.to(ask).to(commit)
execution = flow.create_execution(auto_close=False)
await execution.async_start("T-001")
# In a real system the UI / webhook would call continue_with later.
# Here we resume from the same coroutine for demo purposes.
interrupt_id = next(iter(execution.get_pending_interrupts()))
await execution.async_continue_with(interrupt_id, {"approved": True})
snapshot = await execution.async_close()
print(snapshot["decision"]) # {'approved': True}
asyncio.run(main())
Note: this flow uses pause_for(...). It must be created with flow.create_execution(...) (or flow.start_execution(...)), not flow.start(...) — the hidden execution sugar has no handle the outside can use to call continue_with, so TriggerFlow raises as soon as pause_for(...) is reached.
For a document-review example where the model itself decides to interrupt the
workflow, see examples/step_by_step/11-triggerflow-19_document_review_pause_resume.py.
It uses pause_for(..., resume_to="self") inside the model-owned gate, so the
same gate re-enters with data.is_resume and data.resume after human review.
pause_for(...) integrates cleanly with save / load:
execution = flow.create_execution(auto_close=False)
await execution.async_start("topic")
# at this point pause_for has been hit; an interrupt is pending
saved = execution.save()
# persist saved somewhere
# later, in a different process / worker:
restored = flow.create_execution(
auto_close=False,
runtime_resources={...}, # re-inject whatever the chunk needs
)
restored.load(saved)
interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()
The interrupt is part of the saved state, so the new process knows what’s pending. See Persistence and Blueprint.
A single execution can have several outstanding interrupts (e.g., two parallel branches each waiting for human input). get_pending_interrupts() returns all of them; continue_with(id, payload) resolves one at a time.
If you want a specific id, supply interrupt_id="my-id" to pause_for(...) and use the same id in continue_with.
| Pattern | Use |
|---|---|
pause_for(..., resume_to="next") + continue_with |
the next graph step should receive the resume payload |
pause_for(..., resume_to="self") + continue_with |
the same chunk should run again with data.resume context |
emit + when(...) |
a separate handler should run when an event happens; the original chunk doesn’t need to wait |
Pause is the right choice for human-in-the-loop because the chunk’s logic depends on the human response. Emit/when is the right choice for fan-out side effects.
auto_close=True does not fire while any pause_for is outstanding. Once continue_with resolves the last pending interrupt and the execution becomes idle again, the auto-close timer restarts from zero.
If you want the execution to never auto-close while waiting indefinitely, use auto_close_timeout=None (and remember to call close() explicitly).
async_close() refuses to close while interrupts are still waiting. Resume them first, or explicitly cancel the waits:
snapshot = await execution.async_close(pending_interrupts="cancel")
runtime_resources after load()