Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

工单分流 Playbook

语言:English · 中文

何时用本 playbook

收到一串 item(工单、邮件、告警、请求)。每个需要:

  1. 分类到一个小集合的类别。
  2. 按分类选下游 handler。
  3. 跑 handler(调 API、调模型、升级到人)。
  4. 记录结果。

模型负责分类(可能也负责部分处理)。你想要稳定的类别、可预期的重试、决策的审计轨。

推荐结构

足够小,可以在两种形态间选:

单次请求形态

from agently import Agently

agent = Agently.create_agent()

result = (
    agent
    .info({
        "categories": ["billing", "technical", "spam", "other"],
        "format": "仅按下面的 schema 回答。",
    }, always=True)
    .input(ticket_text)
    .output({
        "category": (str, "billing/technical/spam/other 之一", True),
        "severity": (str, "low/med/high", True),
        "summary": (str, "一行摘要", True),
    })
    .validate(ensure_known_category)
    .start()
)

route_to_handler(result["category"], result)

info(always=True) 让分类列表每次都对模型可见,不撑大每次请求。.validate(...) 强制 category 是允许字符串之一 —— 见 输出控制

route_to_handler(...) 是普通 Python:一个 category → 函数的 dict。

不要让分词、关键词命中、子串规则或正则成为语义路由的 owner。语义分类应由模型通过 output schema 产出结构化结果,确定性代码消费校验后的 category 并执行分派。确定性预处理仍可用于精确 ID 查询、去重、规范化或硬性策略闸门这类非语义工作。类别少、规则简单时可以用较小模型,条件允许也可以用本地模型;类别多、规则交织、输入歧义高、风险高或返回结构复杂时,使用更大参数的模型。

TriggerFlow 形态

每类处理本身有自己的步骤时:

def build_flow():
    flow = TriggerFlow(name="triage")

    async def classify(data: TriggerFlowRuntimeData):
        return await classifier.input(data.input).output({
            "category": (str, "...", True),
            "severity": (str, "...", True),
            "summary": (str, "...", True),
        }).async_start()

    async def handle_billing(data):
        # 多步 billing flow ...
        await data.async_set_state("outcome", {"path": "billing", "ok": True})

    async def handle_technical(data):
        await data.async_set_state("outcome", {"path": "technical", "ok": True})

    async def handle_spam(data):
        await data.async_set_state("outcome", {"path": "spam", "ok": True})

    async def handle_other(data):
        await data.async_set_state("outcome", {"path": "other", "ok": True})

    (
        flow.to(classify)
        .match_on(lambda d: d.input["category"])
            .case("billing").to(handle_billing)
            .case("technical").to(handle_technical)
            .case("spam").to(handle_spam)
            .case_else().to(handle_other)
        .end_match()
    )

    return flow

每个分类处理可成长为自己的子流 —— 见 Sub-Flow

变体

高量 —— batch 并行

工单批量到达时 fan out 并行处理:

flow.for_each(concurrency=8).to(triage_one_ticket).end_for_each().to(persist_results)

concurrency 设到模型限速与下游 API 能扛的程度。

部分类别需要人工批准

高风险类别(退款、关账户)暂停 flow 等人工:

async def maybe_request_approval(data):
    if data.input["category"] == "refund" and data.input["amount"] > 1000:
        return await data.async_pause_for(
            type="approval",
            payload={"ticket_id": data.input["id"], "amount": data.input["amount"]},
            resume_to="next",
        )
    return data.input

execution 必须以 auto_close=False 创建(见 Pause 与 Resume)。

审计轨

把每个决策推到 runtime stream 让外部 logger 记录:

async def classify(data):
    result = await classifier.input(data.input).output({...}).async_start()
    await data.async_put_into_stream({"event": "classified", "result": result})
    return result

flow 之外消费 execution.get_async_runtime_stream(...)

不要做什么

交叉链接