使用 NV‑Ingest、Unstructured 和 Elasticsearch 处理非结构化数据

news2025/5/11 7:08:50

作者:来自 Elastic Ajay Krishnan Gopalan

了解如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 为 RAG 应用构建可扩展的非结构化文档数据管道。

Elasticsearch 原生集成了行业领先的生成式 AI 工具和提供商。查看我们的网络研讨会,了解如何超越 RAG 基础,或使用 Elastic 向量数据库构建可投入生产的应用。

为了为你的用例构建最佳搜索解决方案,现在就开始免费云试用,或在本地机器上试用 Elastic。


在这篇博客中,我们将讨论如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 实现一个可扩展的数据处理流水线。该流水线将来自数据源的非结构化数据转换为结构化、可搜索的内容,为下游的 AI 应用(如 RAG)做好准备。检索增强生成(RAG)是一种 AI 技术,它为大语言模型(LLMs)提供外部知识,以生成对用户查询的响应。这使得 LLM 的回答能够根据特定上下文进行定制,从而使答案更准确、更相关。

在开始之前,让我们先了解一下实现该流水线的关键组件,以及它们各自的作用。

流水线组件

NV-Ingest 是一组微服务,用于将非结构化文档转换为结构化内容和元数据。它可以大规模处理文档解析、视觉结构识别和 OCR 处理。

Unstructured 是一个 ETL+ 平台,用于协调整个非结构化数据处理流程:从从多个数据源中摄取非结构化数据、通过可配置的工作流引擎将原始非结构化文件转换为结构化数据、使用附加转换丰富数据,一直到将结果上传到向量存储、数据库和搜索引擎。它提供了可视化 UI、API 和可扩展的后端基础设施,在一个工作流中协调文档解析、数据丰富和嵌入处理。

Elasticsearch 是业界领先的搜索和分析引擎,现在具备原生的向量搜索能力。它既可以作为传统的文本数据库,也可以作为向量数据库,支持像 k-NN 相似度搜索这样的功能,实现大规模语义搜索。

现在我们已经介绍了核心组件,接下来让我们看看它们在典型工作流程中是如何协同工作的,然后再深入了解具体实现。

使用 NV-Ingest - Unstructured - Elasticsearch 实现 RAG

虽然这里我们只提供关键要点,你可以在此处查看完整的 notebook。

本博客分为三个部分:

  1. 设置源和目标连接器

  2. 使用 Unstructured API 设置工作流

  3. 基于处理后的数据进行 RAG

Unstructured 的工作流以 DAG(Directed Acyclic Graph - 有向无环图)的形式表示,节点称为连接器,用于控制数据的摄取来源以及处理结果的上传目标。这些节点在任何工作流中都是必需的。源连接器配置原始数据从数据源的摄取,目标连接器配置处理后数据上传到向量存储、搜索引擎或数据库。

在本博客中,我们将研究论文存储在 Amazon S3 中,并希望将处理后的数据传送到 Elasticsearch 用于下游用途。这意味着,在构建数据处理工作流之前,我们需要通过 Unstructured API 创建一个 Amazon S3 的源连接器和一个 Elasticsearch 的目标连接器。

步骤 1:设置 S3 源连接器

在创建源连接器时,你需要为其指定一个唯一名称,明确其类型(例如 S3 或 Google Drive),并提供配置,通常包括你要连接的数据源的位置(例如 S3 bucket 的 URI 或 Google Drive 文件夹)以及身份验证信息。

source_connector_response = unstructured_client.sources.create_source(
    request=CreateSourceRequest(
        create_source_connector=CreateSourceConnector(
            name="demo_source1",
            type=SourceConnectorType.S3,
            config=S3SourceConnectorConfigInput(
                key=os.environ['S3_AWS_KEY'],
                secret=os.environ['S3_AWS_SECRET'],
                remote_url=os.environ["S3_REMOTE_URL"],
                recursive=False #True/False
            )
        )
    )
)

pretty_print_model(source_connector_response.source_connector_information)

步骤 2:设置 Elasticsearch 目标连接器

接下来,我们来设置 Elasticsearch 目标连接器。你使用的 Elasticsearch 索引必须具有与 Unstructured 为你生成的文档架构兼容的架构 —— 你可以在文档中找到所有详细信息。

destination_connector_response = unstructured_client.destinations.create_destination(
    request=CreateDestinationRequest(
        create_destination_connector=CreateDestinationConnector(
            name="demo_dest-3",
            type=DestinationConnectorType.ELASTICSEARCH,
            config=ElasticsearchConnectorConfigInput(
                hosts=[os.environ['es_host']],
                es_api_key=os.environ['es_api_key'],
                index_name="demo-index"
            )
        )
    )
)

步骤 3:使用 Unstructured 创建工作流

一旦你拥有了源连接器和目标连接器,就可以创建一个新的数据处理工作流。我们将通过以下节点构建工作流 DAG:

  • NV-Ingest 用于文档分区

  • Unstructured 的 Image Summarizer、Table Summarizer 和 Named Entity Recognition 节点用于内容丰富

  • Chunker 和 Embedder 节点用于使内容准备好进行相似性搜索

from unstructured_client.models.shared import (
    WorkflowNode,
    WorkflowNodeType,
    WorkflowType,
    Schedule
)

# Partition the content by using NV-Ingest
parition_node = WorkflowNode(
            name="Ingest",
            subtype="nvingest",
            type="partition",
            settings={"nvingest_host":  userdata.get('NV-Ingest-host-address')},
        )


# Summarize each detected image.
image_summarizer_node = WorkflowNode(
    name="Image summarizer",
    subtype="openai_image_description",
    type=WorkflowNodeType.PROMPTER,
    settings={}
)

# Summarize each detected table.
table_summarizer_node = WorkflowNode(
    name="Table summarizer",
    subtype="anthropic_table_description",
    type=WorkflowNodeType.PROMPTER,
    settings={}
)

# Label each recognized named entity.
named_entity_recognizer_node = WorkflowNode(
    name="Named entity recognizer",
    subtype="openai_ner",
    type=WorkflowNodeType.PROMPTER,
    settings={
        "prompt_interface_overrides": None
    }
)

# Chunk the partitioned content.
chunk_node = WorkflowNode(
    name="Chunker",
    subtype="chunk_by_title",
    type=WorkflowNodeType.CHUNK,
    settings={
        "unstructured_api_url": None,
        "unstructured_api_key": None,
        "multipage_sections": False,
        "combine_text_under_n_chars": 0,
        "include_orig_elements": True,
        "max_characters": 1537,
        "overlap": 160,
        "overlap_all": False,
        "contextual_chunking_strategy": None
    }
)

# Generate vector embeddings.
embed_node = WorkflowNode(
    name="Embedder",
    subtype="azure_openai",
    type=WorkflowNodeType.EMBED,
    settings={
        "model_name": "text-embedding-3-large"
    }
)


response = unstructured_client.workflows.create_workflow(
    request={
        "create_workflow": {
            "name": f"s3-to-es-NV-Ingest-custom-workflow",
            "source_id": source_connector_response.source_connector_information.id,
            "destination_id": "a72838a4-bb72-4e93-972d-22dc0403ae9e",
            "workflow_type": WorkflowType.CUSTOM,
            "workflow_nodes": [
                parition_node,
                image_summarizer_node,
                table_summarizer_node,
                named_entity_recognizer_node,
                chunk_node,
                embed_node
            ],
        }
    }
)

workflow_id = response.workflow_information.id
pretty_print_model(response.workflow_information)

job = unstructured_client.workflows.run_workflow(
    request={
        "workflow_id": workflow_id,
    }
)

pretty_print_model(job.job_information)

一旦这个工作流的任务完成,数据将被上传到 Elasticsearch,我们就可以继续构建一个基础的 RAG 应用程序。

步骤 4:RAG 设置

让我们继续设置一个简单的检索器,它将连接到数据,接收用户查询,使用与原始数据嵌入相同的模型对其进行嵌入,并计算余弦相似度以检索前 3 个文档。

from langchain_elasticsearch import ElasticsearchStore
from langchain.embeddings import OpenAIEmbeddings
import os

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
    openai_api_key=os.environ['OPENAI_API_KEY']

)

vector_store = ElasticsearchStore(
    es_url=os.environ['es_host'],
    index_name="demo-index",
    embedding=embeddings,
    es_api_key=os.environ['es_api_key'],
    query_field="text",
    vector_query_field="embeddings",
    distance_strategy="COSINE"
)

retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}  # Number of results to return
)

然后,让我们设置一个工作流来接收用户查询,从 Elasticsearch 中获取相似文档,并使用这些文档作为上下文来回答用户的问题。

from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def generate_answer(question: str, documents: str):

    prompt = """
    You are an assistant that can answer user questions given provided context.
    Your answer should be thorough and technical.
    If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'
    """

    augmented_prompt = (
        f"{prompt}"
        f"User question: {question}\n\n"
        f"{documents}"
    )
    response = client.chat.completions.create(
        messages=[
            {'role': 'system', 'content': 'You answer users questions.'},
            {'role': 'user', 'content': augmented_prompt},
        ],
        model="gpt-4o-2024-11-20",
        temperature=0,
    )

    return response.choices[0].message.content


def format_docs(docs):
    seen_texts = set()
    useful_content = [doc.page_content for doc in docs]

    return  "\nRetrieved documents:\n" + "".join(
        [
            f"\n\n===== Document {str(i)} =====\n" + doc
            for i, doc in enumerate(useful_content)
        ]
    )
def rag(query):
  docs = retriever.invoke(query)
  documents = format_docs(docs)
  answer = generate_answer(query, documents)
  return documents, answer

将所有内容组合在一起,我们得到:

query = "How did the response lengths change with training?"

docs, answer = rag(query)

print(answer)

和一个响应:

Based on the provided context, the response lengths during training for the DeepSeek-R1-Zero model showed a clear trend of increasing as the number of training steps progressed. This is evident from the graphs described in Document 0 and Document 1, which both depict the "average length per response" on the y-axis and training steps on the x-axis.

### Key Observations:
1. **Increasing Trend**: The average response length consistently increased as training steps advanced. This suggests that the model naturally learned to allocate more "thinking time" (i.e., generate longer responses) as it improved its reasoning capabilities during the reinforcement learning (RL) process.

2. **Variability**: Both graphs include a shaded area around the average response length, indicating some variability in response lengths during training. However, the overall trend remained upward.

3. **Quantitative Range**: The y-axis for response length ranged from 0 to 12,000 tokens, and the graphs show a steady increase in the average response length over the course of training, though specific numerical values at different steps are not provided in the descriptions.

### Implications:
The increase in response length aligns with the model's goal of solving reasoning tasks more effectively. Longer responses likely reflect the model's ability to provide more detailed and comprehensive reasoning, which is critical for tasks requiring complex problem-solving.

In summary, the response lengths increased during training, indicating that the model adapted to allocate more resources (in terms of response length) to improve its reasoning performance.

Elasticsearch 提供了多种增强搜索的策略,包括混合搜索,这是近似语义搜索和基于关键字的搜索的结合。

这种方法可以提高作为上下文使用的 RAG 架构中的 top 文档的相关性。要启用此功能,您需要按照以下方式修改 vector_store 初始化:

from langchain_elasticsearch import DenseVectorStrategy

vector_store = ElasticsearchStore(
    es_url=os.environ['es_host'],
    index_name="demo-index",
    embedding=embeddings,
    es_api_key=os.environ['es_api_key'],
    query_field="text",
    vector_query_field="embeddings",
    strategy=DenseVectorStrategy(hybrid=True) // <-- here the change
)

结论

良好的 RAG 从准备充分的数据开始,而 Unstructured 简化了这一关键的第一步。通过 NV-Ingest 启用文档分区、对非结构化数据进行元数据丰富并高效地将其摄取到 Elasticsearch,它确保了您的 RAG 管道建立在坚实的基础上,为所有下游任务释放其全部潜力。

原文:Unstructured data processing with NV‑Ingest, Unstructured, and Elasticsearch - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2372941.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

20250508在WIN10下使用移远的4G模块EC200A-CN直接上网

1、在WIN10/11下安装驱动程序&#xff1a;Quectel_Windows_USB_DriverA_Customer_V1.1.13.zip 2、使用移远的专用串口工具&#xff1a;QCOM_V1.8.2.7z QCOM_V1.8.2_win64.exe 3、配置串口UART42/COM42【移远会自动生成连续三个串口&#xff0c;最小的那一个】 AT命令&#xf…

C++(6):逻辑运算符

目录 1. 代码示例 示例 1&#xff1a;基础用法 示例 2&#xff1a;条件判断 2. 短路求值&#xff08;Short-Circuit Evaluation&#xff09; 代码示例 3. 实际应用场景 场景 1&#xff1a;输入合法性验证 场景 2&#xff1a;游戏状态判断 4. 注意事项 逻辑运算符用于组…

NXP iMX8MP ARM 平台多屏幕克隆显示测试

By Toradex秦海 1). 简介 NXP i.MX8MP ARM SoC 支持 3 路 Display Controller 分别提供 DSI/HDMI/LVDS 显示输出&#xff0c;在 Yocto Linux BSP 下采用 Wayland Backend 基于 DRM subsystem 显示驱动&#xff0c;前端默认基于 Weston Compositor。因此在默认情况下连接多个屏…

【数据结构】——栈

一、栈的概念和结构 栈其实就是一种特殊的顺序表&#xff0c;其只允许在一端进出&#xff0c;就是栈的数据的插入和删除只能在一端进行&#xff0c;进行数据的插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底。栈中的元素遵循先进后出LIFO&#xff08;Last InFirst O…

Navicat中保存的数据库密码找回 Java 8

导出数据库连接打开导出的connections.ncx文件找到加密的password放入java程序中解密即可 package com.asia.card.cloud.enterprise.api;import javax.crypto.Cipher; import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.SecretKeySpec; import java.nio.cha…

vs code管理员权限启动问题

vs code非管理员启动可以正常启动用管理员启动vs code&#xff0c;会提示 解决办法 找到argv.json文件在argv.json文件中添加 "disable-chromium-sandbox": true重启vs code即可

Spring Cloud与Service Mesh集成:Istio服务网格实践

文章目录 引言一、Spring Cloud与Service Mesh概述二、Istio服务网格架构三、Spring Cloud与Istio集成的基础设施准备四、服务发现与负载均衡五、流量管理与弹性模式六、安全通信与认证授权七、可观测性集成八、配置管理集成总结 引言 微服务架构已成为现代分布式系统的主流设…

React+Taro选择日期组件封装

话不多说&#xff0c;直接上效果 1.页面渲染时间模块 {this.renderCalendarPopup()}2.引入时间组件弹层&#xff0c;state中加入showPopup(控制什么时候展示时间选择弹层)&#xff0c;time(选择后的时间值) private renderCalendarPopup () > {const { showPopup, time…

C++进阶--AVL树的实现续

文章目录 C进阶--AVL树的实现双旋AVL树的查找AVL树的检验结语 很高兴和搭大家见面&#xff0c;给生活加点impetus&#xff0c;开启今天的比编程之路&#xff01;&#xff01; 今天我们来完善AVL树的操作&#xff0c;为后续红黑树奠定基础&#xff01;&#xff01; 作者&#x…

AutoGen+Deepseek+chainlit的简单使用

AutoGen 的应用场景 AutoGen 作为一个强大的多智能体协作框架&#xff0c;可用于多种复杂任务&#xff1a; 自动化工作流&#xff1a;构建由多个智能体组成的流水线&#xff0c;例如数据收集、分析、报告生成复杂问题分解&#xff1a;将难题拆解为子任务&#xff0c;分配给不…

采用SqlSugarClient创建数据库实例引发的异步调用问题

基于SqlSugar编写的多个WebApi接口&#xff0c;项目初始化时采用单例模式注册SqlSugarClient实例对象&#xff0c;前端页面采用layui布局&#xff0c;并在一个按钮事件中通过Ajax连续调用多个WebApi接口获取数据。实际运行时点击按钮会随机报下面几种错误&#xff1a; Execute…

第7次课 栈A

课堂学习 栈&#xff08;stack&#xff09; 是一种遵循先入后出逻辑的线性数据结构。 我们可以将栈类比为桌面上的一摞盘子&#xff0c;如果想取出底部的盘子&#xff0c;则需要先将上面的盘子依次移走。我们将盘子替换为各种类型的元素&#xff08;如整数、字符、对象等&…

软考-软件设计师中级备考 13、刷题 数据结构

倒计时17天时间不多了&#xff0c;数据库、UML、等知识点有基础直接略过&#xff0c;法律全靠考前的一两天刷题&#xff0c;英语直接放弃。 一、数据结构&#xff1a;链表、栈、队列、数组、哈希表、树、图 1、关于链表操作&#xff0c;说法正确的是&#xff1a; A)新增一个头…

centos的根目录占了大量空间怎么办

问题 当根目录磁盘不够时&#xff0c;就必须删除无用的文件了 上面的&#xff0c;如果删除/usr 或/var是可以释放出系统盘的 定位占空间大的文件 经过命令&#xff0c;一层层查哪些是占磁盘的。 du -sh /* | sort -rh | head -n 10 最终排查&#xff0c;是有个系统日志占了20…

电子电器架构 --- 新能源高压上下电那点事一文通

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 钝感力的“钝”,不是木讷、迟钝,而是直面困境的韧劲和耐力,是面对外界噪音的通透淡然。 生活中有两种人,一种人格外在意别人的眼光;另一种人无论…

每日算法-250510

每日算法学习记录 - 250510 1. LeetCode 2086. 喂食仓鼠的最小食物桶数 题目描述: 解题思路 这是一个典型的贪心问题。我们的目标是用最少的食物桶喂饱所有仓鼠。 解题过程 核心思想是&#xff1a;当遇到一只仓鼠时&#xff0c;如何放置食物桶才能最有效地利用这个桶。 …

渗透测试行业术语1

渗透测试行业术语1 1. 肉鸡 所谓“肉鸡”是一种很形象的比喻&#xff0c;比喻那些可以随意被我们控制的电脑&#xff0c;对方可以是 WINDOWS 系统&#xff0c;也可以是 UNIX/LINUX 系统可以是普通的个人电脑&#xff0c;也可以是大型的服务器我们可以象操作自己的电脑那样来操…

【大模型】使用 LLaMA-Factory 进行大模型微调:从入门到精通

使用 LLaMA-Factory 进行模型微调&#xff1a;从入门到精通 一、环境搭建&#xff1a;奠定微调基础&#xff08;一&#xff09;安装依赖工具&#xff08;二&#xff09;创建 conda 环境&#xff08;三&#xff09;克隆仓库并安装依赖 二、数据准备&#xff1a;微调的基石&#…

使用Python 打造多格式文件预览工具 — 图、PDF、Word、Excel 一站式查看

在日常办公或文件管理场景中&#xff0c;我们经常面临这样的问题&#xff1a;在一个文件夹中短时间内产生了大量不同类型的文件&#xff08;如图片、PDF、Word、Excel&#xff09;&#xff0c;我们需要快速浏览和筛选这些文件的内容&#xff0c;却不希望一个个打开它们。有没有…

[docker基础四]容器虚拟化基础之 LXC

目录 一 认识LXC 二 LXC容器操作实战 1&#xff09;实战目的 2&#xff09;基础知识 lxc-checkconfig lxc-create lxc-start lxc-ls lxc-info lxc-attach lxc-stop lxc-destory 3&#xff09;安装LXC(我的是Ubuntu) 4&#xff09;操作实战 1. 检查 lxc 是否运行…