跳转至

并发

批量与 for_each 并发控制。

源码

import asyncio
from agently import TriggerFlow, TriggerFlowEventData


## TriggerFlow Concurrency: batch + for_each
def triggerflow_concurrency():
    # Idea: show concurrency limits for batch and for_each.
    # Flow: batch(echo, concurrency=2) -> print, then for_each(list, concurrency=2) -> print
    # Expect: outputs two results with interleaved timing.
    flow = TriggerFlow()

    async def echo(data: TriggerFlowEventData):
        await asyncio.sleep(0.1)
        return f"echo: {data.value}"

    # batch: run chunks in parallel with a concurrency limit
    flow.batch(
        ("a", echo),
        ("b", echo),
        ("c", echo),
        concurrency=2,
    ).end()

    result = flow.start("hello")
    print(result)

    # for_each: process list items with concurrency control
    flow_2 = TriggerFlow()
    (
        flow_2.to(lambda _: [1, 2, 3, 4])
        .for_each(concurrency=2)
        .to(echo)
        .end_for_each()
        .end()
    )
    result_2 = flow_2.start()
    print(result_2)


# triggerflow_concurrency()

讲解

  • 使用 concurrency 进行上限控制。

注释解读

  • Idea 表示案例思路
  • Flow 表示执行编排路径
  • Expect 表示预期输出或行为

你学会了什么

  • 理解 batch/for_each 并发控制
  • 知道 concurrency 上限的作用

练习任务

  • 把 concurrency 改成 1 观察耗时变化