Agently documentation for building AI applications with stable outputs, observable actions, and durable workflows.
Languages: English · 中文
Every morning, collect a curated list of items from a few feeds, group them by topic, score each for relevance, and produce a single structured digest.
Schedule (cron / external)
│
▼
TriggerFlow execution
├── pull_feeds (parallel via for_each)
├── normalize (clean and dedupe)
├── classify (model: assign topic + score)
├── filter_low_score
├── group_by_topic
└── render_digest (model: produce the human-friendly output)
from agently import Agently, TriggerFlow, TriggerFlowRuntimeData
agent = Agently.create_agent()
async def pull_feed(data):
feed_url = data.input
items = await fetch_feed(feed_url)
return [{"feed": feed_url, **item} for item in items]
async def normalize(data):
items = data.input
seen = set()
unique = []
for item in items:
key = (item.get("title"), item.get("link"))
if key not in seen:
seen.add(key)
unique.append(item)
await data.async_set_state("normalized", unique)
return unique
async def classify_one(data):
item = data.input
result = await agent.input(item["title"] + "\n\n" + item.get("summary", "")).output({
"topic": (str, "ai|infra|product|other", True),
"score": (float, "0.0–1.0 relevance", True),
}).async_start()
return {**item, **result}
async def filter_and_group(data):
items = [i for i in data.input if i["score"] >= 0.5]
grouped = {}
for item in items:
grouped.setdefault(item["topic"], []).append(item)
await data.async_set_state("grouped", grouped)
return grouped
async def render_digest(data):
grouped = data.input
digest = await agent.info({"grouped": grouped}, always=False).input(
"Render a markdown digest. Group by topic, top 5 per topic by score."
).async_start()
await data.async_set_state("digest", digest)
flow = TriggerFlow(name="daily-news")
(
flow.for_each(concurrency=4).to(pull_feed).end_for_each() # one per feed
.to(lambda data: [item for sub in data.input for item in sub]) # flatten
.to(normalize)
.for_each(concurrency=8).to(classify_one).end_for_each()
.to(filter_and_group)
.to(render_digest)
)
# external scheduler triggers this:
async def run_daily(feed_urls):
snapshot = await flow.async_start(feed_urls)
publish_digest(snapshot["digest"])
for_each(concurrency=4)) are first-class in TriggerFlow. Doing this in plain async needs careful scaffolding; in a flow it’s two operators.flow.async_start(...) not create_execution — this flow is self-closing, no human input, no external emit. The hidden execution sugar is fine. See Lifecycle.normalized, grouped, digest — all three are useful for debugging when something goes wrong. They land in the close snapshot and you can inspect them after the fact.info(grouped, always=False) in render_digest — the grouped data is large and only relevant to this call. always=False keeps it out of the agent’s persistent prompt.runtime_resources map and use data.require_resource("classifier_for_<topic>")..start() provides per request. If a feed fetch fails, the whole flow fails. Add a try/except around pull_feed if partial output is acceptable.save() if a single run is long enough to be interrupted.for_each and concurrencytopic / score schema