Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
A chunk handler is a regular async function. You can call any agent, request, or response API inside it. The good patterns concentrate on three things: async (because the surrounding flow is async), structured output (because the next chunk expects a known shape), and streaming when the user actually benefits.
```python
from agently import Agently, TriggerFlow, TriggerFlowRuntimeData

agent = Agently.create_agent()


async def classify(data: TriggerFlowRuntimeData):
    result = await (
        agent
        .input(data.input)
        .output({
            "category": (str, "Category", True),
            "confidence": (float, "0.0 to 1.0"),
        })
        .async_start()
    )
    await data.async_set_state("classification", result)
    return result


flow = TriggerFlow(name="classify")
flow.to(classify)
```
The agent is created at module scope so it’s reused across executions. await ... async_start() returns the parsed dict. The dict goes into state for the close snapshot, and is also returned so the next chunk receives it as data.input.
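To make the hand-off concrete, here is a minimal sketch in plain Python. It is not Agently's implementation: FakeData and run_chain are hypothetical names that only imitate the behavior described above, where a chunk's return value becomes the next chunk's input and state is kept separately.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class FakeData:
    """Hypothetical stand-in for the runtime data object passed to a chunk."""
    input: Any
    state: dict = field(default_factory=dict)

    async def async_set_state(self, key: str, value: Any) -> None:
        self.state[key] = value


Chunk = Callable[[FakeData], Awaitable[Any]]


async def run_chain(chunks: list[Chunk], first_input: Any) -> tuple[Any, dict]:
    """Run chunks in order, feeding each return value to the next chunk."""
    state: dict = {}
    current = first_input
    for chunk in chunks:
        data = FakeData(input=current, state=state)
        current = await chunk(data)
    return current, state


async def classify(data: FakeData) -> dict:
    # A canned result stands in for the parsed dict from the model call.
    result = {"category": "billing", "confidence": 0.9}
    await data.async_set_state("classification", result)
    return result


async def route(data: FakeData) -> str:
    # data.input is whatever the previous chunk returned.
    return f"queue:{data.input['category']}"
```

Running run_chain([classify, route], "Where is my invoice?") in this sketch hands the classification dict to route as its input, while the same dict stays available in the shared state.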
The surrounding flow is async. Calling sync start() inside a chunk works, but it blocks the event loop while the model request is in flight, which stalls every other task sharing that loop. Use async_start(), async_get_data(), or get_async_generator(...) instead. See Async First.
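The cost of a blocking call can be seen with the standard library alone. The sketch below does not use Agently: the two fake model calls are hypothetical stand-ins, with time.sleep playing the role of a sync start() and asyncio.sleep the role of an awaited async_start(). A heartbeat task counts how often the event loop gets to run other work while the call is in flight.

```python
import asyncio
import time


def fake_sync_model_call() -> dict:
    time.sleep(0.2)  # blocks the whole event loop
    return {"category": "billing"}


async def fake_async_model_call() -> dict:
    await asyncio.sleep(0.2)  # yields to the loop while waiting
    return {"category": "billing"}


async def heartbeat(ticks: list[int], stop: asyncio.Event) -> None:
    """Append a tick every 10 ms for as long as the loop lets us run."""
    while not stop.is_set():
        ticks.append(1)
        await asyncio.sleep(0.01)


async def ticks_during_call(use_async: bool) -> int:
    """Return how many heartbeat ticks happen while the model call is in flight."""
    ticks: list[int] = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    if use_async:
        await fake_async_model_call()
    else:
        fake_sync_model_call()
    during = len(ticks) - before
    stop.set()
    await task
    return during
```

With the blocking call the heartbeat records no ticks at all during the request. With the awaited call it keeps ticking, which is the concurrency the rest of the flow depends on.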
When the UI consuming the runtime stream benefits from incremental updates, push completed structured fields out as they arrive:
```python
async def draft_with_streaming(data: TriggerFlowRuntimeData):
    response = (
        agent
        .input(data.input)
        .output({
            "title": (str, "Title", True),
            "body": (str, "Body", True),
        })
        .get_response()
    )

    async for item in response.get_async_generator(type="instant"):
        if item.is_complete:
            await data.async_put_into_stream({"path": item.path, "value": item.value})

    final = await response.async_get_data()
    await data.async_set_state("draft", final)
    return final
```
type="instant" yields per-leaf events as each field finishes parsing — the consumer of the runtime stream sees title complete before body is done. After the stream ends, async_get_data() returns the cached parsed dict (no second request).
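The filtering step can be tried without a model. In the sketch below, LeafEvent and fake_instant_stream are hypothetical: they assume an event carries a path, the value so far, and an is_complete flag, which is only what the handler above reads from each item. The real event objects may carry more.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


@dataclass
class LeafEvent:
    """Hypothetical event shape: field path, value so far, completion flag."""
    path: str
    value: Any
    is_complete: bool


async def fake_instant_stream() -> AsyncIterator[LeafEvent]:
    """Imitate a structured stream in which title finishes before body."""
    events = [
        LeafEvent("title", "Quarterly", False),
        LeafEvent("title", "Quarterly report", True),
        LeafEvent("body", "Revenue grew", False),
        LeafEvent("body", "Revenue grew 4%.", True),
    ]
    for event in events:
        await asyncio.sleep(0)  # yield control, as a network stream would
        yield event


async def forward_completed(stream: AsyncIterator[LeafEvent]) -> list[dict]:
    """Collect only completed fields, in arrival order."""
    forwarded = []
    async for item in stream:
        if item.is_complete:
            forwarded.append({"path": item.path, "value": item.value})
    return forwarded
```

Only the two completed fields are forwarded, and title arrives before body, so a consumer can render the title while the body is still being generated.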
Call get_response() once, then read the text, data, and meta from response.result without re-issuing the request. See Model Response:
```python
async def step(data):
    response = agent.input(data.input).output({...}).get_response()
    text = await response.result.async_get_text()
    obj = await response.result.async_get_data()
    meta = await response.result.async_get_meta()
    await data.async_set_state("text", text)
    await data.async_set_state("obj", obj)
    await data.async_set_state("meta", meta)
```
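One way to picture "one request, many reads" is a wrapper that starts the request on first use and lets every later reader await the same task. The sketch below is an illustration of that idea only. CachedResponse is a hypothetical class, not Agently's response object, and the canned payload is invented for the example.

```python
import asyncio
from typing import Optional


class CachedResponse:
    """Hypothetical wrapper: issues one request and serves every read from it."""

    def __init__(self) -> None:
        self.request_count = 0
        self._task: Optional[asyncio.Task] = None

    async def _request(self) -> dict:
        self.request_count += 1
        await asyncio.sleep(0.01)  # stands in for the model round trip
        return {
            "text": '{"answer": "42"}',
            "data": {"answer": "42"},
            "meta": {"model": "fake-model"},
        }

    async def _result(self) -> dict:
        # Start the request on first use; later readers await the same task.
        if self._task is None:
            self._task = asyncio.ensure_future(self._request())
        return await self._task

    async def async_get_text(self) -> str:
        return (await self._result())["text"]

    async def async_get_data(self) -> dict:
        return (await self._result())["data"]

    async def async_get_meta(self) -> dict:
        return (await self._result())["meta"]


async def read_everything() -> tuple[str, dict, dict, int]:
    response = CachedResponse()
    text = await response.async_get_text()
    data = await response.async_get_data()
    meta = await response.async_get_meta()
    return text, data, meta, response.request_count
```

After all three reads, request_count is still 1 in this sketch, which is the property the pattern above relies on.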
If the flow’s chunks need different model configuration per execution, inject the configured agent via runtime resources:
```python
execution = flow.create_execution(
    runtime_resources={"agent": Agently.create_agent().set_settings(...)},
)


async def step(data):
    agent = data.require_resource("agent")
    return await agent.input(data.input).async_start()
```
Don’t put the agent in state — agents hold network clients and aren’t snapshot-friendly. Use runtime_resources (see State and Resources).
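A rough way to see why live objects do not belong in state: the sketch below assumes, purely for illustration, that a snapshot has to survive JSON serialization. FakeAgent is hypothetical, and its threading.Lock stands in for the network client a real agent would hold. How Agently actually serializes state is not shown here.

```python
import json
import threading


class FakeAgent:
    """Hypothetical agent-like object that owns a live, non-serializable handle."""

    def __init__(self) -> None:
        self.lock = threading.Lock()  # stands in for a network client


def is_snapshot_friendly(value) -> bool:
    """Return True if the value survives JSON serialization (an assumed snapshot format)."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True
```

A plain result dict such as {"category": "billing"} passes this check, while {"agent": FakeAgent()} does not. That is the distinction between data that belongs in state and handles that belong in runtime resources.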
.validate(...) and ensure_keys work the same way inside a chunk as they do at the request layer. The retry budget is per-request, so a chunk that needs to retry the model call doesn’t affect the rest of the flow. See Output Control.
```python
async def step(data):
    return await (
        agent
        .input(data.input)
        .output({"answer": (str, "answer", True)})
        .validate(custom_business_check)
        .async_start(max_retries=5)
    )
```
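The idea of a per-request retry budget can be sketched as a plain loop. This is not Agently's retry logic: request_with_validation, make_flaky_sender, and has_answer are hypothetical helpers, and the sketch assumes max_retries counts the attempts allowed after the first one.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def request_with_validation(
    send: Callable[[], Awaitable[Any]],
    validate: Callable[[Any], bool],
    max_retries: int,
) -> tuple[Any, int]:
    """Call send() until validate() passes; return the result and attempts used."""
    attempts = 0
    while True:
        attempts += 1
        result = await send()
        if validate(result):
            return result, attempts
        if attempts > max_retries:
            raise ValueError(f"validation failed after {attempts} attempts")


def make_flaky_sender(bad_replies: int) -> Callable[[], Awaitable[dict]]:
    """Build a fake request that returns an empty answer a few times first."""
    remaining = {"bad": bad_replies}

    async def send() -> dict:
        if remaining["bad"] > 0:
            remaining["bad"] -= 1
            return {"answer": ""}
        return {"answer": "ok"}

    return send


def has_answer(result: dict) -> bool:
    """Hypothetical business check: the answer must be non-empty."""
    return bool(result.get("answer"))
```

Because the attempt counter lives inside a single call, one request exhausting its budget leaves the budget of any other request untouched.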
flow_data is shared across all executions of the flow, and using it emits a warning. Don’t use it to “remember the last model answer”. Use state for execution-local memory, or a real session if it’s a multi-turn conversation. See Session Memory.
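The difference between shared and execution-local storage shows up as soon as two executions write the same key. The sketch below uses hypothetical FakeFlow and FakeExecution classes in plain Python. It imitates only the sharing behavior described above, not the TriggerFlow API.

```python
class FakeFlow:
    """Hypothetical flow: one flow_data dict shared by every execution it creates."""

    def __init__(self) -> None:
        self.flow_data: dict = {}

    def create_execution(self) -> "FakeExecution":
        return FakeExecution(self)


class FakeExecution:
    """Hypothetical execution with its own state dict."""

    def __init__(self, flow: FakeFlow) -> None:
        self.flow = flow
        self.state: dict = {}

    def remember(self, answer: str) -> None:
        self.flow.flow_data["last_answer"] = answer  # shared: last writer wins
        self.state["last_answer"] = answer  # local to this execution


flow = FakeFlow()
first, second = flow.create_execution(), flow.create_execution()
first.remember("refund approved")
second.remember("refund denied")
```

Here first still finds its own answer in state, while the shared flow_data holds only whatever the most recent execution wrote.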
Multiple chunks can use multiple agents — different model providers, different prompt configurations, different tool sets:
```python
classifier = Agently.create_agent().set_settings("OpenAICompatible", {"model": "${ENV.CLASSIFIER_MODEL}"})
writer = Agently.create_agent().set_settings("OpenAICompatible", {"model": "${ENV.WRITER_MODEL}"})


async def classify(data):
    return await classifier.input(data.input).output({...}).async_start()


async def draft(data):
    return await writer.input(data.input).async_start()


flow.to(classify).to(draft)
```
This is how TriggerFlow plays the orchestration role: the flow keeps the wiring; each agent stays a small, focused unit.
get_response() and the result cache