Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
Languages: English · 中文
You receive a stream of items (tickets, emails, alerts, requests). For each, you need to:
The model is doing the classification (and possibly some of the handling). You want stable categories, predictable retries, and an audit trail of what was decided.
This is small enough that you can decide between two shapes:
from agently import Agently
agent = Agently.create_agent()
result = (
agent
.info({
"categories": ["billing", "technical", "spam", "other"],
"format": "Reply only with the schema below.",
}, always=True)
.input(ticket_text)
.output({
"category": (str, "One of billing/technical/spam/other", True),
"severity": (str, "low/med/high", True),
"summary": (str, "One-line summary", True),
})
.validate(ensure_known_category)
.start()
)
route_to_handler(result["category"], result)
info(always=True) keeps the category list visible to the model on every call without bloating per-request prompts. .validate(...) enforces that category is one of the allowed strings — see Output Control.
The Python route_to_handler(...) is plain code: a dict of category → function.
Do not make tokenization, word segmentation, keyword hits, substring rules, or regex the owner of semantic routing. The model owns the classification through the output schema, and deterministic code dispatches from the validated structured category. Deterministic preprocessing is still fine for non-semantic work such as exact ID lookup, deduplication, normalization, or hard policy gates. Small or local models can be enough for short category lists and simple rules; use a larger model when the labels, rules, ambiguity, risk, or returned structure are more complex.
When per-category handling has its own steps:
def build_flow():
flow = TriggerFlow(name="triage")
async def classify(data: TriggerFlowRuntimeData):
return await classifier.input(data.input).output({
"category": (str, "...", True),
"severity": (str, "...", True),
"summary": (str, "...", True),
}).async_start()
async def handle_billing(data):
# multi-step billing flow ...
await data.async_set_state("outcome", {"path": "billing", "ok": True})
async def handle_technical(data):
# multi-step technical flow ...
await data.async_set_state("outcome", {"path": "technical", "ok": True})
async def handle_spam(data):
await data.async_set_state("outcome", {"path": "spam", "ok": True})
async def handle_other(data):
await data.async_set_state("outcome", {"path": "other", "ok": True})
(
flow.to(classify)
.match_on(lambda d: d.input["category"]) # or use match() + cases on the category value
.case("billing").to(handle_billing)
.case("technical").to(handle_technical)
.case("spam").to(handle_spam)
.case_else().to(handle_other)
.end_match()
)
return flow
Each per-category handler can grow into its own sub-flow if it gets complicated — see Sub-Flow.
When tickets arrive in batches, fan out and process in parallel:
flow.for_each(concurrency=8).to(triage_one_ticket).end_for_each().to(persist_results)
Set concurrency to whatever your model rate limit and downstream APIs can sustain.
For high-stakes categories (refunds, account closures), pause the flow and wait for a human:
async def maybe_request_approval(data):
if data.input["category"] == "refund" and data.input["amount"] > 1000:
return await data.async_pause_for(
type="approval",
payload={"ticket_id": data.input["id"], "amount": data.input["amount"]},
resume_to="next",
)
return data.input
The execution must be created with auto_close=False (see Pause and Resume).
Push each decision to runtime stream so an external logger can record it:
async def classify(data):
result = await classifier.input(data.input).output({...}).async_start()
await data.async_put_into_stream({"event": "classified", "result": result})
return result
Consume from execution.get_async_runtime_stream(...) outside the flow.
category, severity, summary), then let your code route. Models are good at classifying; orchestration logic belongs in your code.flow_data. Use runtime_resources (or pin the agent at module level)..validate(...) for category enforcement(type, "...", True) for the classification fieldsmatch and case