Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
语言:English · 中文
收到一串 item(工单、邮件、告警、请求)。每个需要:
模型负责分类(可能也负责部分处理)。你想要稳定的类别、可预期的重试、决策的审计轨。
足够小,可以在两种形态间选:
from agently import Agently
agent = Agently.create_agent()
result = (
agent
.info({
"categories": ["billing", "technical", "spam", "other"],
"format": "仅按下面的 schema 回答。",
}, always=True)
.input(ticket_text)
.output({
"category": (str, "billing/technical/spam/other 之一", True),
"severity": (str, "low/med/high", True),
"summary": (str, "一行摘要", True),
})
.validate(ensure_known_category)
.start()
)
route_to_handler(result["category"], result)
info(always=True) 让分类列表每次都对模型可见,不撑大每次请求。.validate(...) 强制 category 是允许字符串之一 —— 见 输出控制。
route_to_handler(...) 是普通 Python:一个 category → 函数的 dict。
不要让分词、关键词命中、子串规则或正则成为语义路由的 owner。语义分类应由模型通过 output schema 产出结构化结果,确定性代码消费校验后的 category 并执行分派。确定性预处理仍可用于精确 ID 查询、去重、规范化或硬性策略闸门这类非语义工作。类别少、规则简单时可以用较小模型,条件允许也可以用本地模型;类别多、规则交织、输入歧义高、风险高或返回结构复杂时,使用更大参数的模型。
每类处理本身有自己的步骤时:
def build_flow():
flow = TriggerFlow(name="triage")
async def classify(data: TriggerFlowRuntimeData):
return await classifier.input(data.input).output({
"category": (str, "...", True),
"severity": (str, "...", True),
"summary": (str, "...", True),
}).async_start()
async def handle_billing(data):
# 多步 billing flow ...
await data.async_set_state("outcome", {"path": "billing", "ok": True})
async def handle_technical(data):
await data.async_set_state("outcome", {"path": "technical", "ok": True})
async def handle_spam(data):
await data.async_set_state("outcome", {"path": "spam", "ok": True})
async def handle_other(data):
await data.async_set_state("outcome", {"path": "other", "ok": True})
(
flow.to(classify)
.match_on(lambda d: d.input["category"])
.case("billing").to(handle_billing)
.case("technical").to(handle_technical)
.case("spam").to(handle_spam)
.case_else().to(handle_other)
.end_match()
)
return flow
每个分类处理可成长为自己的子流 —— 见 Sub-Flow。
工单批量到达时 fan out 并行处理:
flow.for_each(concurrency=8).to(triage_one_ticket).end_for_each().to(persist_results)
concurrency 设到模型限速与下游 API 能扛的程度。
高风险类别(退款、关账户)暂停 flow 等人工:
async def maybe_request_approval(data):
if data.input["category"] == "refund" and data.input["amount"] > 1000:
return await data.async_pause_for(
type="approval",
payload={"ticket_id": data.input["id"], "amount": data.input["amount"]},
resume_to="next",
)
return data.input
execution 必须以 auto_close=False 创建(见 Pause 与 Resume)。
把每个决策推到 runtime stream 让外部 logger 记录:
async def classify(data):
result = await classifier.input(data.input).output({...}).async_start()
await data.async_put_into_stream({"event": "classified", "result": result})
return result
flow 之外消费 execution.get_async_runtime_stream(...)。
category、severity、summary),让你的代码路由。模型擅长分类;编排逻辑属于代码。flow_data。用 runtime_resources(或在模块级 pin 住 agent)。.validate(...) 强制类别(type, "...", True)match 与 case