Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

Dynamic Task

Dynamic Task 是 Agently 的一等动态任务面,用简洁的应用层 API 执行模型或 应用生成的 DAG。内部会校验 TaskDAG、解析任务 handler,并把图编译成普通 TriggerFlow execution 作为实现基座。

task = Agently.create_dynamic_task(target="review policy")
result = await task.async_start()

调用方已经有计划时,直接传入 TaskDAG,跳过模型规划:

async def local_handler(context):
    return {
        "task_id": context.task.id,
        "deps": dict(context.dependency_results),
    }

task = Agently.create_dynamic_task(
    target="review policy",
    plan={
        "graph_id": "review",
        "task_schema_version": "task_dag/v1",
        "tasks": [
            {"id": "extract", "kind": "local", "binding": "local_handler"},
            {
                "id": "final",
                "kind": "local",
                "binding": "local_handler",
                "depends_on": ["extract"],
            },
        ],
        "semantic_outputs": {"final": "final"},
    },
    handlers={"local_handler": local_handler},
)
snapshot = await task.async_start(timeout=10)

提交式 DAG 的 inputs 可以用占位符引用运行时数据。整个字符串就是占位符时会保留原始值类型;占位符嵌在普通字符串里时会渲染成字符串。Slot 名大小写不敏感,但文档推荐大写:

plan = {
    "graph_id": "review",
    "task_schema_version": "task_dag/v1",
    "tasks": [
        {"id": "lookup", "kind": "local", "binding": "local_handler"},
        {
            "id": "final",
            "kind": "local",
            "binding": "local_handler",
            "depends_on": ["lookup"],
            "inputs": {
                "account": "${INIT.account}",
                "ticket": "${DEPS.lookup.ticket}",
                "summary": "Ticket ${STATE.task_results.lookup.ticket.id} for ${INIT.account}",
            },
        },
    ],
}

${INIT} 指向提交 DAG 时传入的 graph input / 初始 execution input。 ${DEPS...} 指向已完成的上游依赖结果。${STATE...} 读取 execution state,例如 ${STATE.task_results.lookup}${TRIGGER...} 指向原始 TriggerFlow trigger payload(data.value),主要用于高级调试或 executor 层集成。运行时路径缺失会在任务执行时 fail closed,而不是把未解析字符串继续传给 handler。

当提交式 DAG 通过 agent.use_dynamic_task(...).create_execution() 运行时, ${INIT...} 会优先读取显式传入的 use_dynamic_task(graph_input=...)。如果没有 传这个参数,则读取 create_execution() 时冻结的 execution prompt snapshot 的 input slot。只有两者都不存在时,Agent route 才回退到 {"target": task_target}

如果 create_dynamic_task(..., output_schema=..., ensure_keys=...) 为 semantic output 的 model 节点提供了前台结构契约,这个宿主契约优先于 planner 在节点上选择的 不兼容格式。多字段结构化契约遇到 planner 写出的 inputs.output_format="flat_markdown" 时,会被纠正回 auto,让输出解析器选择兼容的结构化格式。

提交式计划也可以保存成 YAML 或 JSON 配置。先把配置加载成 TaskDAG,再走同一个 facade:

from agently.core import TaskDAG

graph = TaskDAG.from_yaml("examples/dynamic_task/config_policy_review.yaml")
task = Agently.create_dynamic_task(
    target="review policy",
    plan=graph,
    handlers={"local_handler": local_handler},
)
snapshot = await task.async_run(graph_input={"doc": "policy"}, timeout=10)

TaskDAG.from_json(...) 接受文件路径或 JSON/JSON5 原始内容。from_yaml(...)from_json(...) 都支持 task_dag_key_path="plans.review",用于从较大的配置文件里选择 某一个 DAG。使用 graph.get_yaml(path)graph.get_json(path) 可以导出归一化后的图。

Agent 实例也提供同名 facade:

task = agent.create_dynamic_task(target="review policy")

Agent prompt 方法是配置阶段。agent.create_dynamic_task() 会像 agent.start() / agent.create_execution() 一样消费当前 prompt snapshot:

task = (
    agent
    .info({"customer": "Acme"})
    .instruct("Focus on renewal risk and account-team actions.")
    .input({"account": "Acme", "ticket": "T-42"})
    .output({
        "summary": (str, "risk summary", True),
        "risks": ([str], "risk bullets", True),
    }, format="json")
    .create_dynamic_task()
)

这份 prompt snapshot 会通过现有 Prompt generator 渲染成 Dynamic Task 的 target。output slot 会成为 facade 级 output_schemaoutput_format 会成为 默认 model-task format。set_agent_prompt(...) / always=True 写入的长期 prompt 会被继承;set_request_prompt(...) / quick prompt 写入的本轮 request prompt 会被冻结 到新 task,然后从 pending request 中清理。显式传入的 create_dynamic_task(target=..., output_schema=..., output_format=...) 参数优先于 prompt 推导值。

模型任务应复用 Agently request 的输出流水线,不要在 handler 或 example 里自行解析 模型文本。output_schema 会作用到 semantic output 模型节点;如果某个模型节点 需要独立契约,可以在该节点的 inputs.output_schema 覆盖。每个模型任务也可以设置 inputs.output_format

task = Agently.create_dynamic_task(
    target="write an incident briefing",
    output_schema={
        "brief": (str, "customer-facing briefing", True),
        "next_update": (str, "next update timing", True),
    },
)
snapshot = await task.async_start(timeout=120)
_, output = next(iter(snapshot["semantic_outputs"].items()))
brief = output["result"]["brief"]

提交式 DAG 可以把任务级策略放在模型节点自身:

{
    "id": "render_html",
    "kind": "model",
    "inputs": {
        "output_schema": {"html": (str, "render-ready HTML", True)},
        "output_format": "flat_markdown",
    },
}

提交式 DAG placeholder 使用和 Prompt reference 一致的大写命名风格,但它属于 TriggerFlow runtime 命名空间,不是 Prompt slot reference。${INIT.foo} 指向 初始输入,${DEPS.task.path} 指向已完成的上游依赖结果, ${STATE.task_results.task.path} 指向 execution state,${TRIGGER.result} 指向原始 TriggerFlow trigger payload。在 DAG task inputs 里,整个字符串就是 placeholder 时会保留原始运行时值类型;placeholder 嵌在普通字符串里时会转成字符串拼接。

架构

Dynamic Task 拆成四段:

bindings 不再是 public facade。自定义本地函数使用 handlers。任务确实需要资源时再显式传入 plannermodelactionsskillsactionsskills 不会在调用方未传入时暴露给 planner。

Resolver 语义

自定义 handler 应使用清晰且以 _handler 结尾的名字:

task = Agently.create_dynamic_task(
    target="review policy",
    plan=task_dag,
    handlers={"risk_check_handler": risk_check_handler},
)

DAG 中这样引用:

{"id": "check_risk", "kind": "local", "binding": "risk_check_handler"}

未知的可选 handler 如果不影响 required semantic output、必要下游节点、审批或副作用策略,Validator 可以安全剪枝。被剪枝节点必须写入 diagnostics;未知的必需 handler 会在执行前 fail closed。

低层控制

框架集成需要分阶段控制时,再使用低层类:

from agently.builtins.plugins import AgentlyTaskDAGPlanner
from agently.core import DynamicTaskResolver, TaskDAGExecutor, TaskDAGValidator

resolver = DynamicTaskResolver({"risk_check_handler": risk_check_handler})
validator = TaskDAGValidator(resolver)
planner = AgentlyTaskDAGPlanner(validator=validator)

graph = await planner.async_plan(planner_agent, {"target": "review policy"})
validation = validator.validate(graph, strict_schema_version=True)
snapshot = await TaskDAGExecutor(resolver, validator=validator).async_run(graph)

Executor 不依赖 Agent。模型和 Action 访问由 facade 或 resolver adapter 承接; TriggerFlow 是 Dynamic Task 之下的执行基座,不是 owner API。

示例

examples/dynamic_task/ 按层次提供示例: