LlamaIndex 工作流简介以及基础工作流

news2025/6/7 13:52:52

什么是工作流?

工作流是一种由事件驱动、基于步骤的应用程序执行流程控制方式。

你的应用程序被划分为多个称为“步骤(Steps)”的部分,这些步骤由“事件(Events)”触发,并且它们自身也会发出事件,从而触发后续的步骤。通过组合使用步骤和事件,你可以创建出任意复杂的流程,以封装逻辑,使你的应用程序更易于维护和理解。一个步骤可以是一行代码,也可以是一个复杂的智能体(agent),它可以拥有任意的输入和输出,并通过事件进行传递。

示例

在这张示意图中,你可以看到一个中等复杂度的工作流。该工作流的设计目的是接收一个查询,然后选择性地对其进行优化,并使用三种不同的RAG(检索增强生成)策略来尝试回答这个查询。LLM(大语言模型)会从这三种策略中分别获取答案,并判断哪一个是“最佳”答案,然后将其返回。我们可以将这个流程拆解如下:

该工作流由一个 StartEvent(开始事件) 触发。

  • 一个名为 judge_query 的步骤会判断当前查询的质量是否达标。如果不符合要求,则会生成一个 BadQueryEvent(低质量查询事件)。

  • BadQueryEvent 将触发一个名为 improve_query(优化查询) 的步骤,尝试对查询进行改进,之后会触发一个 JudgeEvent(判断事件)。

  • JudgeEvent 将再次触发 judge_query 步骤,从而形成一个循环,直到查询被认为具有足够的质量为止。这种机制被称为“反思(Reflection)”,是 Workflows 让代理型(agentic)应用程序易于实现的关键部分。

如果查询被认为具有足够高的质量,则会同时生成三个事件:

  • NaiveRAGEvent

  • HighTopKEvent

  • RerankEvent

这三个事件将并行触发三个对应步骤,每个步骤运行一种不同的 RAG 策略。

每个查询步骤都会生成一个 ResponseEvent(响应事件)。当一个 ResponseEvent 被触发时,它会激活名为 judge_response(判断响应) 的步骤,并等待接收全部三种响应结果。

最后,judge_response 会从中挑选出“最佳”响应,并通过一个 StopEvent(结束事件) 将其返回给用户。

为什么要使用工作流?

随着生成式人工智能应用变得越来越复杂,管理和控制应用程序中的数据流动与执行流程也变得愈发困难。工作流(Workflows)提供了一种管理这种复杂性的方法,它通过将整个应用程序拆分为更小、更易于管理的模块来实现对流程的有效控制。

其他框架以及 LlamaIndex 本身此前曾尝试使用有向无环图(DAGs)来解决这一问题,但相比工作流(Workflows),这种方法存在一些局限性:

  • 类似循环和分支的逻辑需要被编码到图的边(edges)中,这使得整个结构难以阅读和理解。

  • 在 DAG 中节点之间传递数据时,可选值、默认值以及具体应传递哪些参数等问题带来了额外的复杂性。

  • 对于正在开发复杂、具有循环和分支结构的 AI 应用程序的开发者来说,DAG 的使用方式不够自然。

Workflows 所采用的基于事件的模式和原生 Python 的方式有效解决了上述问题。

对于简单的 RAG 流程和线性演示,我们并不要求你一定使用 Workflows。但随着你的应用程序复杂度不断增加,我们希望你能选择 Workflows 来应对这种复杂性。

基础工作流

安装SDK

工作流(Workflows)已内置于 LlamaIndex 核心中,因此要使用它们,你只需安装 LlamaIndex 即可。

pip install llama-index-core

在开发过程中,你可能会发现可视化你的工作流非常有帮助;你可以通过安装我们内置的可视化工具来实现这一点:

pip install llama-index-utils-workflow -i https://mirrors.aliyun.com/pypi/simple/

依赖的组件

工作流所需的最低依赖项包括:

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

单步骤工作流

一个工作流通常通过一个继承自 Workflow 的类来实现。该类可以定义任意数量的步骤,每个步骤都是一个使用 @step 装饰的方法。以下是可能的最简单的工作流示例:

import asyncio

from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step

class MyWorkflow(Workflow):

    @step
    async def my_step(self, ev: StartEvent)-> StopEvent:
        # do something here
        return StopEvent(result="hello world")


async def run_workflow():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run()
    print(result)

if __name__ == '__main__':
    asyncio.run(run_workflow())

这将只是向控制台打印 “Hello, World!”。

在这段代码中,我们:

  1. 定义了一个继承自 Workflow 的类 MyWorkflow

  2. 使用 @step 装饰器定义了一个名为 my_step 的步骤方法

  3. 该步骤接收一个参数 ev,它是 StartEvent 的一个实例

  4. 步骤返回一个 StopEvent,并附带结果 "Hello, world!"

  5. 我们创建了一个 MyWorkflow 实例,设置超时时间为 10 秒,并关闭了详细输出

  6. 最后运行该工作流并打印其结果

步骤的类型注解

类型注解(例如 ev: StartEvent)以及返回类型注解(如 -> StopEvent)对于工作流的运作方式至关重要。这些类型决定了哪些事件类型会触发对应的步骤。诸如可视化工具(见下文)之类的工具也依赖这些类型注解来判断会生成哪些类型的事件,从而确定控制流接下来的走向。

类型注解是在编译时进行验证的,因此如果你发布了某个事件,但没有任何步骤会消费该事件类型,你将会收到一条错误信息。

启动事件与停止事件

StartEvent 和 StopEvent 是用于启动和终止工作流的特殊事件。

  • 任何接受 StartEvent 的步骤都会由 run 命令触发执行。

  • 触发一个 StopEvent 将会结束整个工作流的执行,并返回最终结果,即使还有其他步骤尚未执行。

在普通 Python 中运行工作流

工作流默认是异步的,因此你需要使用 await 来获取 run 命令的结果。这在 Notebook 环境中可以正常运行;而在普通的 Python 脚本中,你需要导入 asyncio 并将你的代码封装在一个 async 函数中,如下所示:

async def run_workflow():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run()
    print(result)

if __name__ == '__main__':
    asyncio.run(run_workflow())

可视化工作流

工作流的一大特色是其内置的可视化工具,我们已经在前面安装好了。现在让我们来可视化一下刚刚创建的简单工作流:

from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.utils.workflow import draw_all_possible_flows

class MyWorkflow(Workflow):

    @step
    async def my_step(self, start: StartEvent)-> StopEvent:
        # do something here
        return StopEvent(result="hello world")

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")

这将在当前目录下生成一个名为 basic_workflow.html 的文件。用浏览器打开该文件,即可查看工作流的交互式可视化表示。它看起来会类似于这样:

当然,只有一个步骤的工作流并没有太大用处!下面我们来定义一个包含多个步骤的工作流。

自定义事件

多个步骤可以通过定义自定义事件来实现,这些事件可以由某个步骤发出,并触发其他步骤。下面我们来定义一个简单的三步工作流。

我们像之前一样引入所需的模块,并新增一个用于 Event 的导入:

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
)
from llama_index.utils.workflow import draw_all_possible_flows

现在我们定义两个自定义事件:FirstEvent 和 SecondEvent。这些类可以具有任意的名称和属性,但必须继承自 Event:

class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str

定义工作流

现在我们来定义工作流本身。我们通过为每个步骤指定输入类型和输出类型来实现这一点。

  • step_one 接收一个 StartEvent,并返回一个 FirstEvent

  • step_two 接收一个 FirstEvent,并返回一个 SecondEvent

  • step_three 接收一个 SecondEvent,并返回一个 StopEvent

class MyWorkflow(Workflow):

    @step
    async def step_one(self, ev: StartEvent)-> FirstEvent:
        print(ev.first_input)
        return FirstEvent(first_output="完成第一步...")

    @step
    async def step_two(self, ev: FirstEvent)-> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="完成第二步...")

    @step
    async def step_three(self, ev: SecondEvent)-> StopEvent:
        print(ev.second_output)
        return StopEvent(result="工作流完成!")

# 运行工作流
async def run_workflow():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run(first_input="开始工作流...")
    print(result)

if __name__ == '__main__':
    asyncio.run(run_workflow())

运行结果:

开始工作流...
完成第一步...
完成第二步...
工作流完成!

我们还可以使用可视化工具来查看该工作流中的所有可能流程:

from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="multi_step_workflow.html")

完整代码

import asyncio

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
)
from llama_index.utils.workflow import draw_all_possible_flows

# 第一个事件
class FirstEvent(Event):
    first_output: str
    
# 第二个事件
class SecondEvent(Event):
    second_output: str

class MyWorkflow(Workflow):

    @step
    async def step_one(self, ev: StartEvent)-> FirstEvent:
        """第一步"""
        print(ev.first_input)
        return FirstEvent(first_output="完成第一步...")

    @step
    async def step_two(self, ev: FirstEvent)-> SecondEvent:
        """第二步"""
        print(ev.first_output)
        return SecondEvent(second_output="完成第二步...")

    @step
    async def step_three(self, ev: SecondEvent)-> StopEvent:
        """第三步"""
        print(ev.second_output)
        return StopEvent(result="工作流完成!")

# 运行工作流
async def run_workflow():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run(first_input="开始工作流...")
    print(result)

if __name__ == '__main__':
    asyncio.run(run_workflow())

# 工作流可视化工具
draw_all_possible_flows(MyWorkflow, filename="multi_step_workflow.html")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2402968.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何利用Elastic Stack(ELK)进行安全日志分析

在以下文章中,我将解释如何使用Elastic Stack(ELK)进行安全日志分析,以提高安全性和监控网络活动。ELK是一个功能强大的开源日志管理和分析平台,由Elasticsearch、Logstash和Kibana组成,适用于各种用例&…

创客匠人:以 AI 利器赋能创始人 IP 打造,加速知识变现新路径

在知识付费与个人 IP 崛起的时代,创客匠人作为行业领先的技术服务商,正通过 AI 工具重构创始人 IP 打造与知识变现的生态。其推出的三大 AI 利器 ——AI 销售信、免训数字人、AI 智能客服,精准解决 IP 运营中的核心痛点。 以 AI 销售信为例&…

Opencv中的copyto函数

一.OpenCV中copyto函数详解 copyto()是 OpenCV 中用于图像复制和融合的核心函数,支持灵活的数据复制和掩模(Mask)操作,其功能和使用方法如下: 1. 核心功能 基础复制:将源图像&…

基于深度强化学习的Scrapy-Redis分布式爬虫动态调度策略研究

在大数据时代,网络数据的采集与分析变得至关重要,分布式爬虫作为高效获取海量数据的工具,被广泛应用于各类场景。然而,传统的爬虫调度策略在面对复杂多变的网络环境和动态的抓取需求时,往往存在效率低下、资源浪费等问…

在 Ubuntu 24.04 LTS 上安装 Jenkins 并配置全局工具(Git、JDK、Maven)

在 Ubuntu 24.04 LTS 上安装 Jenkins 并配置全局工具(Git、JDK、Maven) Jenkins 是一款开源的持续集成和持续交付(CI/CD)工具,在 DevOps 实践中有着广泛的应用。本文将手把手带你在 Ubuntu 24.04 LTS 系统中完成 Jenkins 的安装,并配置所需的全局工具:Git、JDK 和 Maven…

防爆型断链保护器的应用场景有哪些?

​ ​防爆型断链保护器是一种用于防止链条断裂导致设备损坏或安全事故的装置,尤其适用于存在爆炸风险的工业环境。以下是其主要应用场景: ​ ​1.石油化工行业 在石油化工厂、炼油厂等场所,防爆型断链保护器可用于保护输送设备&#xf…

OPenCV CUDA模块光流------高效地执行光流估计的类BroxOpticalFlow

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::cuda::BroxOpticalFlow 是 OpenCV CUDA 模块中实现Brox光流算法的类。该类用于在 GPU 上高效地计算两帧图像之间的稠密光流(Dens…

K8S认证|CKS题库+答案| 3. 默认网络策略

目录 3. 默认网络策略 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、官网找模板 3)、按照题目要求创建networkpolicy 4)、应用networkpolicy 5)、检查策略 3. 默认网络策略…

Linux编程:1、文件编程

一、Linux 文件编程与 C 语言文件编程的区别 特性C 语言 I/O 库函数Linux 文件编程(系统调用)实现层面用户空间(glibc 库)内核空间(系统调用)跨平台性跨平台(Windows/Linux)仅限 Li…

Kyosan K5BMC ELECTRONIC INTERLOCKING MANUAL 电子联锁

Kyosan K5BMC ELECTRONIC INTERLOCKING MANUAL 电子联锁

【Spark征服之路-2.3-Spark运行架构】

运行架构 Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。 如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master,负责管理整个集群中的作业任务调度。图形中的 Executor …

Cilium动手实验室: 精通之旅---4.Cilium Gateway API - Lab

Cilium动手实验室: 精通之旅---4.Cilium Gateway API - Lab 1. 环境准备2. API 网关--HTTP2.1 部署应用2.2 部署网关2.3 HTTP路径匹配2.4 HTTP头匹配 3. API网关--HTTPS3.1 创建TLS证书和私钥3.2 部署HTTPS网关3.3 HTTPS请求测试 4. API网关--TLS 路由4.1 部署应用4.2 部署网关…

Java转Go日记(六十):gin其他常用知识

1. 日志文件 package mainimport ("io""os""github.com/gin-gonic/gin" )func main() {gin.DisableConsoleColor()// Logging to a file.f, _ : os.Create("gin.log")gin.DefaultWriter io.MultiWriter(f)// 如果需要同时将日志写入…

89.实现添加收藏的功能的后端实现

实现完查看收藏列表之后,实现的是添加收藏的功能 我的设想是:在对话界面中,如果用户认为AI的回答非常好,可以通过点击该回答对应的气泡中的图标,对该内容进行添加 所以后端实现为: service类中添加&…

v1.0.1版本更新·2025年5月22日发布-优雅草星云物联网AI智控系统

v1.0.1版本更新2025年5月22日发布-优雅草星云物联网AI智控系统 开源地址 星云智控官网: 优雅草星云物联网AI智控软件-移动端vue: 优雅草星云物联网AI智控软件-移动端vue 星云智控PC端开源: 优雅草星云物联网AI智控软件-PC端vue: 优雅草星云物联网AI…

如何创造出一种不同于程序语言的人与机器自然交互语言?

人机交互自然语言通过模拟人类日常交流方式,使机器能够理解并响应人类的自然表达,从而打破编程语言的复杂性壁垒,极大地提升人机协同的效率和自然性,让机器更好地融入人类的工作与生活场景。创造一种通用的人与机器自然交互语言是…

宝塔think PHP8 安装使用FFmpeg 视频上传

宝塔think PHP8 安装使用FFmpeg 一、 安装think PHP8二、安装 FFmpeg1,登录到宝塔面板。2,进入“软件商店”。3,搜索“FFmpeg”。4,选择版本点击安装。5,检查 FFmpeg 是否安装成功6, 在 ThinkPHP 8 中使用 …

26.【新型数据架构】-零ETL架构

26.【新型数据架构】-零ETL架构:减少数据移动,原系统直接分析;典型实现(AWS Zero-ETL) 一、零ETL的本质:从“数据搬运工”到“数据翻译官” 传统ETL(Extract-Transform-Load)需要将数据从源系统抽取、清洗、转换后加载到目标系统,这一过程往往耗时费力,且面临数据延…

静态相机中的 CCD和CMOS的区别

文章目录 CCD处理方式CMOS处理方式两者区别 首先根据 成像原理,CCD和CMOS的作用是一致的,都是为了将光子转化为数字图像,只是 转换的方式出现差异。 CCD处理方式 获取光子: 在电荷耦合器件(CCD)传感器中…

bug:undefined is not iterable (cannot read property Symbol(Symbol.iterator))

1.如图 2.分析 关键报错提示: undefined is not iterable (cannot read property Symbol(Symbol.iterator)) 直译: undefined是不可迭代的(不能读取属性Symbol(Symbol.iterator)) 理解: 有一个值、不存在&#x…