Dify知识检索模块API深度封装:从源码解析到独立服务部署
1. 为什么要把Dify的知识检索模块单独拎出来如果你用过Dify肯定知道它的知识库功能有多香。上传文档、智能问答、工作流集成一套组合拳下来确实能解决很多问题。但不知道你有没有遇到过这样的场景你手里有个老旧的业务系统或者一个轻量级的自动化脚本就想简单地调用一下Dify里已经训练好的知识库来回答问题难道非得把整个Dify平台都集成进去吗我刚开始也这么想觉得太“重”了。Dify官方提供的API更多是面向平台本身的管理和操作比如创建应用、管理知识库文档。但如果你想直接、纯粹地调用某个知识库的检索能力就像调用一个普通的搜索接口那样你会发现官方并没有直接提供这样一个“开箱即用”的端点。这就好比你有了一台功能强大的咖啡机Dify平台但你只想接一根水管出来直接喝浓缩咖啡知识检索结果而不是每次都得操作一遍复杂的咖啡机面板。这就是我们这次要干的事情把Dify知识检索模块的核心能力从它庞大的身躯里“剥离”出来封装成一个独立的、高内聚的RESTful API服务。你可以把这个服务部署在任何地方你的任何系统、脚本只需要发送一个HTTP POST请求就能获得结构化的知识检索结果。这样做的好处显而易见解耦。你的业务系统不再需要关心Dify的整个架构只需要对接这个轻量的API复用一次封装多处调用性能可控你可以针对这个检索服务单独进行扩缩容和优化。听起来是不是有点像“微服务”的思路没错我们就是要给Dify做一个“微服务化”的手术不过这个手术是局部的、非侵入式的完全基于现有的源码进行二次开发和封装。不用担心会破坏原有Dify的稳定性我们只是把它的一颗“心脏”Knowledge Retrieval Node小心翼翼地取出来为它建造一个独立的小房子。2. 庖丁解牛深入Dify知识检索模块源码要封装先得读懂它。Dify的知识检索能力主要藏身在两个地方Workflow和Chatflow的“知识检索”节点里。我们这次聚焦在Workflow的节点上因为它的逻辑更清晰更适合剥离。2.1 核心文件定位打开你的Dify源码目录以1.3.1版本为例关键文件在这里dify-main/api/core/workflow/nodes/knowledge_retrieval/这个目录下你会看到几个核心文件knowledge_retrieval_node.py这是知识检索节点的执行类所有检索逻辑的“大脑”都在这里。我们封装API的核心目标就是能直接调用这个类里的某个方法。entities.py这里定义了节点配置的数据结构比如KnowledgeRetrievalNodeData。我们构造请求时需要按照这个结构来组装数据。其他文件可能涉及向量模型、分块策略等我们暂时不需要深入。我们的主攻目标就是knowledge_retrieval_node.py。打开它你会发现一个类KnowledgeRetrievalNode里面有一个非常重要的方法_fetch_dataset_retriever。看名字就知道这是真正执行数据集检索的底层方法。它接收查询语句和节点配置然后返回检索到的文档片段列表。但是直接调用这个方法行不通。因为它是一个类实例方法依赖这个节点实例的许多上下文状态比如tenant_id租户ID、app_id应用ID、运行时变量池等等。Dify在工作流引擎中调用它时这些状态都是齐备的。我们要做的就是在不启动整个工作流引擎的情况下模拟出这些必要的状态然后安全地调用这个核心方法。2.2 理解依赖链条为什么不能直接调用我们来看看_fetch_dataset_retriever方法大概需要什么简化版逻辑节点配置来自前端的设置比如用哪个知识库、检索模式、选用什么模型、返回几条结果等。这些信息封装在KnowledgeRetrievalNodeData对象里。查询语句用户的问题比如“什么是机器学习”。这个查询语句通常来自工作流中上一个节点的输出存储在“变量池”里。上下文信息tenant_id,app_id,user_id。这是Dify实现多租户和应用隔离的基础检索时必须知道是哪个租户下的哪个应用在查询。运行时环境节点实例本身的一些属性以及访问向量数据库、大模型所需的客户端等。所以我们的封装思路就清晰了我们需要构造一个“最小化”的KnowledgeRetrievalNode实例把上述第1、2、3点的信息“喂”给它并模拟出第4点所需的基本运行时环境然后触发_fetch_dataset_retriever方法。这听起来有点复杂但别怕我们不需要重新发明轮子。Dify的源码已经提供了绝大部分工具我们只是扮演一个“导演”把必要的“演员”和“道具”凑齐让戏能演下去。3. 动手封装构建独立的API服务理论分析完了我们开始动手。我们的目标是创建两个新的文件并修改一个路由文件最终新增一个API端点。3.1 第一步创建服务层Service服务层负责最核心的业务逻辑。我们在api/services/workflow/目录下如果没有就创建新建一个文件叫dataset_retriever.py。这个文件的任务就是实现我们上面说的“模拟调用”。# api/services/workflow/dataset_retriever.py import logging from typing import List, Dict, Any from core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node import KnowledgeRetrievalNode from core.workflow.nodes.knowledge_retrieval.entities import KnowledgeRetrievalNodeData from core.variables import StringSegment logger logging.getLogger(__name__) def fetch_dataset_retriever(payload: Dict[str, Any]) - List[Dict[str, Any]]: 核心服务函数根据payload执行知识检索。 payload结构需要包含node_data, inputs, 以及必要的上下文ID。 try: # 1. 从payload中提取并构造节点数据 raw_node_data payload[node_data].copy() # 节点ID是必须的但节点数据对象本身不需要它 node_id raw_node_data.pop(id) # 将字典转换为强类型的节点配置对象 node_data KnowledgeRetrievalNodeData(**raw_node_data) # 2. 从inputs中提取查询字符串 # query_variable_selector 指定了查询语句在inputs中的键名通常是数组取第一个 query_selector node_data.query_variable_selector[0] query_str payload[inputs].get(query_selector) if not isinstance(query_str, str): logger.error(fQuery must be a string, got {type(query_str)}) raise ValueError(Query must be a string) # 3. 创建一个“空壳”的知识检索节点实例 # 我们不通过__init__初始化因为那需要完整的上下文我们手动设置必要属性 node KnowledgeRetrievalNode.__new__(KnowledgeRetrievalNode) node.id node_id node.node_data node_data # 4. 模拟运行时状态主要是变量池 # 节点方法会尝试从 runtime_state 的变量池中获取查询变量 # 我们创建一个简单的虚拟变量池来提供这个值 class DummyVariablePool: def __init__(self, query_value: str): self.query_value query_value def get(self, _selector): # 无论请求哪个选择器我们都返回封装好的查询字符串 return StringSegment(valueself.query_value) class DummyRuntimeState: variable_pool DummyVariablePool(query_str) # 将模拟的运行时状态挂载到节点实例上 # 节点内部可能会通过不同属性名访问运行时状态这里都设置上以防万一 setattr(node, graph_runtime_state, DummyRuntimeState()) setattr(node, fgraph_{node.id}_runtime_state, DummyRuntimeState()) # 5. 补充其他必要的上下文属性 node.tenant_id payload.get(tenant_id) node.user_id payload.get(user_id) node.app_id payload.get(app_id) # user_from 表示请求来源如 web 或 api node.user_from type(UserFrom, (), {value: payload.get(user_from, api)})() # 6. 调用核心检索方法 # 这里调用的就是Dify原生的检索逻辑 retrieval_results node._fetch_dataset_retriever(node_datanode_data, queryquery_str) # 7. 将结果转换为可JSON序列化的格式 # 原始结果可能是包含复杂对象的列表我们尝试将其转为字典 serializable_results: List[Dict[str, Any]] [] for item in retrieval_results: if hasattr(item, __dict__): serializable_results.append(item.__dict__) elif isinstance(item, dict): serializable_results.append(item) else: # 如果是其他类型如Pydantic模型尝试用dict()转换 try: serializable_results.append(dict(item)) except Exception as e: logger.warning(fCould not serialize result item {item}: {e}) serializable_results.append({content: str(item), metadata: {}}) return serializable_results except KeyError as e: logger.error(fMissing required key in payload: {e}) raise except Exception as e: logger.error(fError during knowledge retrieval: {e}, exc_infoTrue) raise我来解释一下这段代码的“精妙”之处。我们没有去修改Dify的任何一行源码而是利用了Python的动态特性。KnowledgeRetrievalNode.__new__是创建一个类实例而不调用其初始化方法__init__这让我们绕过了复杂的依赖初始化。然后我们像“拼装乐高”一样手动给这个实例装上它执行_fetch_dataset_retriever时所必需的最少零件ID、配置、运行时状态和上下文ID。最后我们调用这个“原装”的方法得到的结果和Dify工作流内部产生的完全一致。这就完美实现了“剥离”而不“破坏”。3.2 第二步创建API控制器Controller服务层准备好了我们需要一个HTTP接口来暴露它。在api/controllers/console/目录下我们新建一个子目录knowledge然后在里面创建retriever.py。# api/controllers/console/knowledge/retriever.py from flask_restful import Resource, request from services.workflow.dataset_retriever import fetch_dataset_retriever class KnowledgeRetrieverApi(Resource): 知识检索API资源类。 提供一个POST端点接收JSON payload返回检索结果。 def post(self): 处理知识检索请求。 期望的JSON结构 { node_data: {...}, // 知识检索节点的配置 inputs: {question: 你的问题}, // 输入变量 tenant_id: ..., app_id: ..., user_id: ..., user_from: api } # 强制解析JSON即使Content-Type设置不正确 payload request.get_json(forceTrue, silentFalse) if not payload: return {status: error, message: Invalid or empty JSON payload}, 400 # 调用服务层函数 try: result_data fetch_dataset_retriever(payload) return { status: success, data: result_data }, 200 except ValueError as e: return {status: error, message: fInvalid request data: {str(e)}}, 400 except Exception as e: # 记录详细错误到日志返回通用错误信息 # 实际生产环境应区分内部错误和客户端错误 return {status: error, message: Internal server error during retrieval}, 500这个控制器非常简单干净它就是一座桥把HTTP请求翻译成对服务层函数的调用再把结果包装成JSON响应。这里我增加了基本的错误处理比如检查JSON是否有效并区分客户端错误400和服务器错误500这让API更健壮。3.3 第三步注册API路由光有控制器不行还得告诉Flask这个路由在哪里。我们需要修改Dify API的路由注册文件。找到api/controllers/console/__init__.py文件。# api/controllers/console/__init__.py (部分代码) from flask_restful import Api from .knowledge.retriever import KnowledgeRetrieverApi # 导入我们新建的API类 api Api() # ... 其他已有的路由注册 ... # 注册我们新的知识检索API api.add_resource( KnowledgeRetrieverApi, /workflow/knowledge-retriever/fetch-dataset, # API端点路径 endpointworkflow_knowledge_retriever_fetch # 端点名称 )这样当你的Dify API服务启动后就会多出一个POST /console/api/workflow/knowledge-retriever/fetch-dataset的接口。注意路径中的/console/api是Dify API的默认前缀。4. 独立部署打造专属的检索微服务代码写好了但我们现在还是和Dify主项目绑在一起的。我们的目标是独立服务可以单独部署、单独伸缩。这就需要用到Docker了。4.1 定制专属Docker镜像我们不想维护一整个Dify的代码库只想关注我们修改和新增的这部分。所以思路是以官方Dify API镜像为基础只把我们改动过的文件“覆盖”进去。这就像给官方镜像打一个只包含我们补丁的“补丁包”。在Dify项目根目录下创建一个docker文件夹如果已有就在里面操作。然后新建一个Dockerfile# docker/Dockerfile # 使用与本地开发一致的Dify API官方镜像作为基础 FROM langgenius/dify-api:1.3.1 # 设置工作目录 WORKDIR /app # 将我们修改和新增的API代码复制到容器中 # 注意路径这里假设docker-compose.yaml在docker目录Dockerfile也在docker目录 # 我们需要复制上一级目录dify-main下的api目录 COPY ../api /app/api # 特别地确保核心的knowledge_retrieval_node.py被正确覆盖 # 虽然上面的COPY命令应该已经包含了但这里显式复制一次更保险 COPY ../api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py /app/api/core/workflow/nodes/knowledge_retrieval/ # 暴露端口通常官方镜像已暴露此处显式声明以示意图 EXPOSE 5001 # 使用官方镜像的默认启动命令 # CMD 通常已在基础镜像中定义无需重复这个Dockerfile的关键在于COPY ../api /app/api。它把我们本地的整个api目录复制到了容器的/app/api覆盖了官方镜像里的同名目录。这样我们新增的services/workflow/dataset_retriever.py、controllers/console/knowledge/目录以及修改的__init__.py就都进去了。而Dify的其他部分比如数据库连接、缓存、前端等都原封不动地使用官方镜像的内容。4.2 编写Docker Compose配置单有一个镜像还不够Dify依赖PostgreSQL、Redis等服务。我们沿用Dify官方的docker-compose.yaml思路但只启动我们需要的服务并且让API服务使用我们自定义的镜像。你可以基于官方的docker-compose.yaml进行简化。这里给出最关键的api服务部分修改# docker/docker-compose.yml (关键部分) version: 3.8 services: # 原有的 db, redis 服务定义保持原样确保数据持久化 ... db: image: postgres:15-alpine ... redis: image: redis:7-alpine ... # 关键自定义的API服务 api: # 不再使用官方镜像而是使用我们上面构建的镜像 # build: # context: .. # 构建上下文指向项目根目录 # dockerfile: docker/Dockerfile # 或者如果你已经构建好并推送到仓库可以直接用image标签 image: my-dify-api-custom:1.3.1 # 假设这是你构建并命名的镜像 restart: always environment: # 继承共享的环境变量配置这些通常定义在.env或compose文件顶部 : *shared-api-env # 启动模式为API服务器 MODE: api # 其他必要的环境变量如数据库连接、Redis连接、密钥等 # 这些应与原始Dify部署保持一致 DB_HOST: db DB_PORT: 5432 DB_USER: postgres DB_PASSWORD: dify123456 # 请使用强密码 REDIS_HOST: redis ... depends_on: db: condition: service_healthy redis: condition: service_started volumes: # 挂载存储卷用于持久化上传的文件等 - ./storage:/app/api/storage ports: - 5001:5001 # 将API端口映射到宿主机 networks: - dify-network # 定义网络和共享环境变量块参考官方compose文件 networks: dify-network: driver: bridge x-shared-api-env: shared-api-env DB_HOST: db DB_PORT: 5432 ...这里我注释掉了build部分直接使用了image。在实际操作中你需要先构建镜像在项目根目录执行docker build -f docker/Dockerfile -t my-dify-api-custom:1.3.1 .。然后修改docker-compose.yml中的image值为你构建的镜像名。这个配置的精髓在于我们只启动了api这一个自定义服务以及它依赖的db和redis。Dify的Web前端web服务、工作流引擎worker服务等都被我们舍弃了。因为我们这个微服务只提供知识检索API不需要那些组件。这极大地减少了资源占用和部署复杂度。4.3 部署与验证准备环境确保你的服务器有Docker和Docker Compose。将我们修改后的整个api源码目录、docker/Dockerfile和docker/docker-compose.yml上传到服务器。构建镜像在服务器上进入项目目录运行docker-compose -f docker/docker-compose.yml build来构建自定义API镜像。启动服务运行docker-compose -f docker/docker-compose.yml up -d在后台启动服务。验证服务服务启动后你可以通过curl或 Postman 测试新API是否工作。但在这之前你需要先知道几个必要的参数怎么获取。5. 实战如何调用你的知识检索APIAPI部署好了怎么用呢你需要构造一个符合规范的JSON请求体。这里我用一个Python脚本示例比原文的PowerShell更通用。5.1 获取必要的参数调用API需要四个关键ID它们都存在于Dify的数据库中tenant_id租户ID。Dify支持多租户每个租户数据隔离。执行SQLSELECT id, name FROM tenants;user_id用户ID。执行SQLSELECT id, email, name FROM accounts LIMIT 5;app_id应用ID。这个最简单打开Dify前端进入你的应用浏览器地址栏的URL里就有appId参数复制过来即可。dataset_ids知识库ID。同样在Dify前端进入你的知识库URL里找dataset参数。可以传多个用英文逗号隔开。如何连接数据库如果你用的是Docker Compose部署的Dify可以这样# 进入数据库容器 docker-compose exec db bash # 连接PostgreSQL psql -U postgres -d dify # 然后执行上面的SQL查询5.2 构造请求与调用示例假设你已经拿到了上述ID下面是一个完整的Python调用示例import requests import json # API端点地址根据你的部署修改 API_URL http://你的服务器IP:5001/console/api/workflow/knowledge-retriever/fetch-dataset # 请求头 headers { Content-Type: application/json, } # 请求体 - 这是最核心的部分需要仔细构造 payload { node_data: { id: knowledge-retrieval-node-001, # 任意唯一字符串用于日志追踪 title: 知识检索API节点, type: knowledge-retrieval, query_variable_selector: [question], # 指定查询变量名与下面的inputs键对应 dataset_ids: [你的知识库ID1, 你的知识库ID2], # 替换为真实的dataset_ids retrieval_mode: single, single_retrieval_config: { model: { provider: openai, # 根据你的Dify配置选择如 openai, azure_openai, tongyi等 name: gpt-3.5-turbo, # 模型名称 mode: chat }, top_k: 5, # 返回最相关的几条结果 score_threshold: 0.1 # 相关性分数阈值低于此值的结果不返回 } }, inputs: { question: 请解释一下什么是机器学习 # 你的查询问题键名与query_variable_selector对应 }, tenant_id: 你的租户ID, # 替换 app_id: 你的应用ID, # 替换 user_id: 你的用户ID, # 替换 user_from: api # 请求来源标识 } try: response requests.post(API_URL, headersheaders, datajson.dumps(payload)) response.raise_for_status() # 检查HTTP错误 result response.json() if result.get(status) success: print(检索成功) for idx, doc in enumerate(result.get(data, [])): print(f\n--- 结果 {idx1} ---) # 结果结构通常包含 content, metadata, score 等字段 print(f内容摘要{doc.get(content, )[:200]}...) print(f相关性分数{doc.get(score, N/A)}) print(f元数据{doc.get(metadata, {})}) else: print(fAPI返回错误{result.get(message)}) except requests.exceptions.RequestException as e: print(f网络请求失败{e}) except json.JSONDecodeError as e: print(f响应解析失败{e})运行这个脚本如果一切配置正确你就会收到一个JSON响应里面包含了从你的知识库中检索到的、与“机器学习”相关的文档片段并且按照相关性排序。至此你就拥有了一个完全独立于Dify前端、可以随时被任何系统调用的知识检索服务。我在实际封装和部署过程中发现最关键的是确保payload的结构与Dify内部KnowledgeRetrievalNodeData模型完全匹配特别是model的provider和name必须是你Dify环境中实际配置并可用的大模型。否则检索过程可能会因为模型调用失败而报错。另外第一次部署时建议先在一个测试用的Dify环境和知识库上操作避免影响生产数据。这个封装方案就像给你的核心业务逻辑加了一个轻量级的“外壳”让它能以最标准、最灵活的方式对外提供服务对于构建企业内部的智能中台或者整合多个老旧系统来说是一个非常实用的技巧。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2410264.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!