Agently Docs

Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.

View the Project on GitHub AgentEra/Agently

日常资讯收集器

语言:English · 中文

问题

每天早上从若干 feed 拉取条目,清洗去重后由模型为每条分配主题并打相关性分,过滤掉低分条目,再按主题分组,最后产出一份结构化 digest。

形态

计划任务(cron / 外部)
   │
   ▼
TriggerFlow execution
   ├── pull_feeds         (for_each 并行)
   ├── normalize          (清洗去重)
   ├── classify           (模型:分配主题 + 分数)
   ├── filter_low_score
   ├── group_by_topic
   └── render_digest      (模型:产出便于阅读的输出)

走读

from agently import Agently, TriggerFlow, TriggerFlowRuntimeData

# Shared agent instance; classify_one and render_digest both send their model
# requests through it.
agent = Agently.create_agent()


async def pull_feed(data):
    """Fetch one feed and tag every returned item with its source URL.

    ``data.input`` is the feed URL. Each item yielded by ``fetch_feed`` is
    copied into a new dict whose first key is ``"feed"``; if an item already
    carries a ``"feed"`` key, the item's own value wins.

    Returns:
        A list with one tagged dict per fetched item.
    """
    source_url = data.input
    tagged = []
    for entry in await fetch_feed(source_url):
        # Item keys are unpacked last, so they override the "feed" default.
        tagged.append({"feed": source_url, **entry})
    return tagged


async def normalize(data):
    """Drop duplicate items, keeping the first occurrence of each.

    Two items count as duplicates when their ``(title, link)`` pairs are
    equal; a missing field is treated as ``None``. The surviving items keep
    their original relative order.

    The deduplicated list is stored under the ``"normalized"`` state key and
    also returned.
    """
    first_seen = {}
    for entry in data.input:
        fingerprint = (entry.get("title"), entry.get("link"))
        # setdefault only inserts when the fingerprint is new, so the first
        # item wins and dict insertion order preserves the input order.
        first_seen.setdefault(fingerprint, entry)

    deduped = list(first_seen.values())
    await data.async_set_state("normalized", deduped)
    return deduped


async def classify_one(data):
    """Ask the agent for a topic and a relevance score for a single item.

    The prompt is the item's title, a blank line, then its summary (empty
    string when the item has no ``"summary"`` key). The requested output has
    two fields, ``"topic"`` and ``"score"``.

    Returns:
        A new dict combining the item with the agent's result; on key
        collisions the agent's values take precedence.
    """
    article = data.input
    prompt_text = article["title"] + "\n\n" + article.get("summary", "")
    output_schema = {
        "topic": (str, "ai|infra|product|other", True),
        "score": (float, "0.0–1.0 相关性", True),
    }
    # NOTE(review): assumes async_start() resolves to a mapping that matches
    # output_schema — confirm against the Agently output documentation.
    verdict = await agent.input(prompt_text).output(output_schema).async_start()
    return {**article, **verdict}


async def filter_and_group(data):
    """Keep items scoring at least 0.5 and bucket them by topic.

    Items are filtered first, then grouped; within each topic the items keep
    their input order, and topics appear in order of first occurrence.

    The resulting ``{topic: [items]}`` dict is stored under the ``"grouped"``
    state key and also returned.
    """
    relevant = []
    for candidate in data.input:
        if candidate["score"] >= 0.5:
            relevant.append(candidate)

    by_topic = {}
    for candidate in relevant:
        topic = candidate["topic"]
        if topic not in by_topic:
            by_topic[topic] = []
        by_topic[topic].append(candidate)

    await data.async_set_state("grouped", by_topic)
    return by_topic


async def render_digest(data):
    """Have the agent render the grouped items and store the result.

    ``data.input`` (the grouped items) is attached to the request as info
    under the ``"grouped"`` key, followed by a fixed rendering instruction.
    The agent's reply is stored under the ``"digest"`` state key.

    Returns:
        None; the digest is only available through the ``"digest"`` state.
    """
    instruction = "渲染一份 markdown digest。按主题分组,每主题前 5 按分数。"
    # NOTE(review): always=False presumably scopes the info to this request
    # only — confirm against the Agently agent.info documentation.
    request = agent.info({"grouped": data.input}, always=False).input(instruction)
    rendered = await request.async_start()
    await data.async_set_state("digest", rendered)


# Wire the handlers above into a single TriggerFlow named "daily-news".
# Step order: pull feeds -> flatten -> normalize -> classify -> filter/group
# -> render.
flow = TriggerFlow(name="daily-news")
(
    flow.for_each(concurrency=4).to(pull_feed).end_for_each()  # one pull_feed call per feed
    .to(lambda data: [item for sub in data.input for item in sub])  # flatten the per-feed lists into one list
    .to(normalize)
    .for_each(concurrency=8).to(classify_one).end_for_each()  # one classify_one call per item
    .to(filter_and_group)
    .to(render_digest)
)

# Entry point invoked by the external scheduler:
async def run_daily(feed_urls):
    """Run the flow over ``feed_urls`` and publish the resulting digest.

    NOTE(review): relies on ``flow.async_start`` resolving to a mapping that
    exposes the ``"digest"`` state key — confirm against the TriggerFlow docs.
    """
    final_state = await flow.async_start(feed_urls)
    digest = final_state["digest"]
    publish_digest(digest)

为什么这么选

取舍

交叉链接