跳转至

Agently 4:把“智能体落地”变成可控的工程问题

在和大量企业团队共创的过程中,我们反复听到同一句话:“Demo 很快,生产很难。”
难的不是模型能不能回答,而是系统能不能在真实流量、真实数据、真实依赖下稳定运行——能被工程化、能被测试、能被运维、能被持续迭代。

Agently 4 的目标很明确:把 LLM 的不确定性收进工程边界,给团队一套“可控输出 + 可编排流程 + 可观测执行”的框架能力,让智能体从“灵感产品”变成“可靠系统”。

下面是我们在企业落地里最常见、也最容易让项目“卡住”的问题场景,以及 Agently 的应对方式与可预期收益。

我们如何设计 Agently:把不确定性收进边界

  • 先定契约,再生成:结构化输出把模型变成“填表器”,而不是“随笔作者”。见 输出格式控制
  • 先定流程,再自治:多步骤任务用 TriggerFlow 约束成可执行图,自治发生在可控的节点里。见 TriggerFlow 总览
  • 先留证据,再上线:工具调用、元数据、运行态事件都可追溯,便于排障与评估。见 ToolsResponse ResultRuntime Stream

场景 1:接口契约被“自然语言”打穿

我们见过太多这样的故事:团队把 LLM 接进业务服务,要求它输出 JSON;上线后却发现同样的字段,有时少 key、有时多解释、有时直接把 JSON 包在一段“礼貌寒暄”里。解析失败不是小问题——它会像多米诺骨牌一样传导到下游:队列积压、重试风暴、告警爆炸。

Agently 的做法:output() 定义结构化契约,用 ensure_keys 做关键字段的校验与重试,把“输出稳定性”从模型能力变成框架能力。

更关键的是,这套结构化输出并不要求你依赖某个模型服务端的“专用开关”(例如 response_format/json_schema 之类参数)来保证 JSON 一定合法:schema 对齐、流式解析、关键字段校验与重试都发生在框架处理过程中。只要底层能提供常规的对话/补全能力,Agently 就能把输出“收口”成可用的结构化数据——这让你在切换模型、切换推理服务时,依然能保住接口契约。对于 2024 年以来主流的 7B+ 指令模型(本地或云端)尤其友好:是真正意义上的“与模型依赖解耦”。

预期收益: - 下游解析稳定,接口契约可被工程团队验收 - 失败可控(可重试/可降级/可容错),减少线上事故 - 需求交付更快:产品要什么字段,工程直接把 schema 写出来

from agently import Agently

agent = Agently.create_agent()

release = (
    agent.input("写一段本周版本发布公告,面向企业客户")
    .output(
        {
            "title": (str, "标题"),
            "highlights": [(str, "3-5 个要点")],
            "compatibility": (str, "兼容性说明"),
            "risk_notes": (str, "风险与回滚提示"),
        }
    )
    .start(
        ensure_keys=["title", "highlights[*]", "risk_notes"],
        max_retries=1,
        raise_ensure_failure=False,
    )
)
print(release)

场景 2:工具越接越多,系统越跑越“玄学”

在企业里,智能体迟早要接工具:搜索、知识库、工单、CRM、审批、数据库、内部 API……现实往往是:第一个工具很快接上;第十个工具开始出现“调用错参/调用错时机/调用失败没人知道”的混乱,最后变成“只有某个同学电脑上能跑通”的系统。

Agently 的做法:统一工具注册与描述(内置工具 + 装饰器工具),并且把工具调用记录沉淀到 extra,让每一次“模型调用了什么、拿到了什么结果”可复盘、可审计。

我们特别强调一点:在 Agently 里,“要不要用工具、用哪个工具、参数怎么组装”是一段内置在框架中的规划过程,而不是强依赖某个厂商接口是否实现了 function calling / tool calling。换句话说,哪怕底层只是一个普通的 chat 接口(很多 2024 年以来的 7B+ 新模型都属于这种可用范畴),Agently 也能用结构化规划 + 运行时注入把工具链跑起来,并留下可追踪的证据链——工具调用能力同样做到与模型接口能力解耦

预期收益: - 工具集成规范化,降低长期维护成本 - 调用链可追踪,排障从“猜”变成“查证据” - 安全边界更清晰:能调用什么工具、能拿到什么数据,一目了然

from agently import Agently

agent = Agently.create_agent()

@agent.tool_func
def lookup_order(order_id: str) -> str:
    return f"order:{order_id}"

agent.use_tools(lookup_order)
response = agent.input("查一下订单 A1001 的状态,并解释原因").get_response()

extra = response.result.full_result_data.get("extra", {})
print("[tool_calls]", extra.get("tool_calls") or extra.get("tool_logs"))
print("[answer]", response.result.get_text())

场景 3:多步骤任务“能跑,但不敢放给用户”

很多团队都经历过类似阶段:让智能体自己规划、自己调用工具、自己总结,看起来很聪明;但一旦进入生产,问题就出现了——走偏、死循环、在错误的时机调用高风险工具、或者某一步失败后没有可控的补偿策略。

Agently 的做法:用 TriggerFlow 把多步骤任务变成可编排、可分支、可收敛、可控并发的执行图。你可以明确每个节点的输入输出、分支条件、退出条件,以及“什么时候需要人工介入/什么时候可以自动降级”。见 TriggerFlow 总览Emit + When

预期收益: - 行为更可预测,能被 QA 验证、能被灰度发布 - 多人协作更顺:业务逻辑和模型节点边界清晰 - 失败可控:错误路径有设计,而不是“看模型心情”

from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

async def planner(data: TriggerFlowEventData):
    await data.async_emit("Task.Read", {"id": "A1001"})
    await data.async_emit("Task.Write", {"draft": True})
    return "planned"

flow.to(planner).end()
flow.when("Task.Read").to(lambda d: {"read": d.value}).collect("tasks", "read")
flow.when("Task.Write").to(lambda d: {"write": d.value}).collect("tasks", "write").end()

flow.start("go", wait_for_result=False)

场景 4:低代码编排“越画越乱”,迁移成代码服务却举步维艰

我们在不少企业里看到这样的落地路径:先用 n8n / Dify / Coze 等可视化编排工具快速搭出“能跑的智能体流程”,对齐业务、验证价值;但很快就会进入第二阶段——流程越来越长、分支越来越多、状态越来越复杂,靠拖拽节点已经难以维护:
改一个节点,不敢确定会不会影响旁路;流程复用只能复制粘贴;版本对比和回滚困难;更不用说单元测试、Code Review、CI/CD 这些“工程化标配”。

TriggerFlow 的价值在于:它几乎用最符合直觉的方式,把低代码里那些“节点/连线/分支/汇聚/循环/并发”翻译成代码。工程师可以按“流程图的思维”写出可维护的服务:
to(...) 是节点,when(...) 是分支监听,collect(...) 是汇聚,for_each(...)/batch(...) 是并发与批处理,runtime_data 是运行态上下文。你可以把低代码流程按模块逐段迁移,并马上享受到版本控制、可测试性与持续维护能力。

更关键的是,TriggerFlow 可以和 Agently 的 Instant 模式配合:在模型单次请求的生成过程中,实时截取结构化输出的“部分节点”,再通过信号驱动触发下游动作。这类“边生成、边编排、边执行”的实时编排能力,是许多低代码平台很难自然表达与稳定实现的。

预期收益: - 从“画流程”升级到“写服务”:可测试、可评审、可持续迭代 - 更复杂的编排逻辑可控落地:实时触发、事件驱动、并发与状态管理 - 产品体验更强:一边生成、一边执行动作与 UI 更新

下面是一个“陪伴机器人”示例:机器人一边说话(流式输出),一边根据模型生成的结构化动作节点实时做动作。动作触发来自 Instant 模式的 actions[*] 节点完成事件,而不是等整段回复生成完才开始执行。

import asyncio
from agently import Agently, TriggerFlow, TriggerFlowEventData

agent = Agently.create_agent()


class CompanionRobot:
    async def speak_delta(self, text_delta: str):
        # In real systems, this can be TTS streaming.
        await asyncio.sleep(0)

    async def do_action(self, action: dict):
        # Replace with real device SDK calls.
        print(f"\n[robot action] {action}\n", end="", flush=True)
        await asyncio.sleep(0.2)


robot = CompanionRobot()
flow = TriggerFlow()


async def plan_and_stream(data: TriggerFlowEventData):
    request = (
        agent.input({"user": data.value, "role": "companion"})
        .output(
            {
                "speech": (str, "What to say to the user, warm and supportive"),
                "actions": [
                    {
                        "type": (str, "Robot action type, e.g. 'wave'/'nod'/'dance'"),
                        "args": (dict, "Action parameters"),
                    }
                ],
            }
        )
    )

    async for instant in request.get_async_generator(type="instant"):
        # 1) Stream speech as it's generated (robot can speak while planning actions).
        if instant.path == "speech" and instant.delta:
            data.put_into_stream(instant.delta)
            await robot.speak_delta(instant.delta)

        # 2) As soon as an action item is completed, emit a signal for execution.
        if instant.wildcard_path == "actions[*]" and instant.is_complete:
            await data.async_emit("Robot.Action", instant.value)

    data.stop_stream()
    return "done"


async def exec_action(data: TriggerFlowEventData):
    await robot.do_action(data.value)
    return "ok"


flow.to(plan_and_stream).end()
flow.when("Robot.Action").to(exec_action).end()

# Runtime stream: you can forward this to UI, logs, or a realtime gateway.
for event in flow.get_runtime_stream("我今天有点低落,你能陪我聊聊吗?", timeout=None):
    print(event, end="", flush=True)

场景 5:真实流量一来,下游被打爆(限流、超时、重试风暴)

智能体“多步 + 多工具”的模式,天然会把一次用户请求放大成多次下游调用。一旦并发上来,最先出问题的往往不是模型,而是第三方 API、内部网关、数据库连接池。最可怕的是“雪崩”:超时 → 重试 → 更慢 → 更超时。

Agently 的做法:在流程层提供并发上限控制(batch / for_each(concurrency=...),让团队能把“系统承载”变成可配置的工程参数,而不是靠拍脑袋。见 并发

预期收益: - 保护下游依赖,降低雪崩概率 - 更容易做容量评估与压测:并发上限就是你的“阀门” - 成本更可控:避免无意义的并行与重复调用

import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

async def call_tool(data: TriggerFlowEventData):
    await asyncio.sleep(0.1)
    return f"ok:{data.value}"

(
    flow.to(lambda _: ["a", "b", "c", "d"])
    .for_each(concurrency=2)
    .to(call_tool)
    .end_for_each()
    .end()
)
print(flow.start())

场景 6:用户觉得“慢”,更觉得“黑箱”

LLM 的延迟不仅是客观时间,更是“感知时间”。很多系统端到端要 2-5 秒,但用户真正不满的是:2 秒里什么都没发生。更糟糕的是,遇到多步骤任务时,用户不知道系统卡在“检索”、还是卡在“工具”、还是卡在“生成”。

Agently 的做法:流式输出是默认能力delta 让你尽早把 token 推到 UI;instant / specific 让结构化字段、推理事件、工具事件都能被消费;TriggerFlow 还能用运行态 stream 输出进度事件。见 流式输出Runtime Stream

预期收益: - 更低的感知延迟(TTFB 更小),体验更接近“实时” - 多步骤任务可视化:让用户知道“系统正在做什么” - 更容易做产品化:把结构化流变成卡片、进度条、步骤面板

场景 7:答案必须“对齐知识库”,并且可溯源

很多工程师第一次做企业级 RAG,会被一个看似简单但很“折磨”的问题卡住:怎么证明答案真的来自知识库?
现实里,业务方/合规/一线客服经常会追问两句话: - “你这个结论依据是哪份文档的哪一段?” - “如果知识库里没有,能不能明确说不知道,而不是编?”

这件事如果只靠把检索结果拼进 prompt,很容易变成“玄学调参”:有时模型引用、有时不引用;有时引用了,但引用不到位;有时甚至把自己想出来的内容当“知识库结论”。工程师摸不着头脑,业务也不敢信任。

在 Agently 里,这件事会变得很工程化、也很简单:检索结果是结构化的(带 id/document/metadata),我们把它显式注入,并要求模型输出“答案 + 引用清单(按 id 指向证据)”,再用 ensure_keys 把引用作为硬性契约。见 知识库输出格式控制

预期收益: - 答案可审计:每次回复都带来源 id 与引用片段,方便复盘与合规 - 答案更对齐:通过“只允许基于 retrieval_results”的约束,降低跑偏与幻觉 - 治理更清晰:你能区分“检索没命中”和“生成没遵循”,优化方向明确

from agently import Agently
from agently.integrations.chromadb import ChromaCollection

# 1) Build / reuse a KB (demo)
embedding = Agently.create_agent()
collection = ChromaCollection(collection_name="demo", embedding_agent=embedding)
collection.add(
    [
        {
            "id": "kb-q3-001",
            "document": "Q3 业务目标:降低流失率;重点动作:提升新手引导与召回。",
            "metadata": {"dept": "sales", "doc": "OKR-2025Q3"},
        }
    ]
)

# 2) Retrieve evidence (each item has id/document/metadata/distance)
query = "今年 Q3 的重点是什么?"
retrieval_results = collection.query(query, top_n=5)

# 3) Answer with citations that point back to retrieval_results[*].id
agent = Agently.create_agent()
result = (
    agent.input(query)
    .info({"retrieval_results": retrieval_results})
    .output(
        {
            "answer": (str, "最终回答(只允许基于检索证据)"),
            "citations": [
                {
                    "source_id": (str, "Must in {retrieval_results.[].id}"),
                    "quote": (str, "直接引用 {retrieval_results} 中的原文片段"),
                    "why": (str, "这条引用支持了 answer 的哪部分"),
                }
            ],
            "not_found": (bool, "检索证据不足时为 true,并在 answer 中说明缺口"),
        }
    )
    .instruct(
        "你必须只基于 {retrieval_results} 回答;"
        "每个关键结论都要在 citations 中给出 source_id+quote;"
        "如果证据不足,not_found=true,并在 answer 中明确说明“知识库未覆盖/需要补充什么信息”。"
    )
    .start(
        ensure_keys=["answer", "citations[*].source_id", "citations[*].quote", "not_found"],
        max_retries=1,
        raise_ensure_failure=False,
    )
)
print(result)

场景 8:出了问题却无法复现,排障像“盲人摸象”

传统系统出了错,你能看日志、看 trace、看 metrics;而很多 LLM 系统出了错,只剩一句“模型答错了”。没有输入输出快照、没有工具调用轨迹、没有时序信息,排障就只能靠猜。

Agently 的做法:Response/Result 提供多视图结果与元数据,工具调用记录可取证,TriggerFlow 的运行态 stream 可输出关键事件。你可以用同一份证据复盘一次请求如何走到结果。见 Response Result

预期收益: - 排障成本下降,定位更快 - 更容易做评估闭环:把失败样本变成可回放数据 - 更容易做合规与审计:关键动作有迹可循

结语:我们希望交付的是“能上线、能运维、能迭代”的智能体

Agently 不替你决定用哪个模型、不替你搭 GPU 集群,但我们会把智能体落地里最容易失控的部分,变成你可以写进代码、写进规范、写进 SLA 的工程能力。
如果你正在从 Demo 走向生产,欢迎从这些页面开始:输出格式控制ToolsTriggerFlow 总览知识库