BullMQ:AI系统缺失的队列层

news2026/5/2 7:09:08
你是否曾经花了大量时间只是在等待等待 API 调用完成。看着请求超时。盯着加载中的转圈动画。在某个时刻你会意识到问题不在于代码而在于架构。我们不能只是调用一个慢的东西然后期望一切顺利。这就是人们发明任务队列的原因。1、什么是任务队列这是一种朴素的方法——在完美世界中可行的方法用户上传文档 → 服务器调用 AI API → 服务器等待 → 服务器响应用户很简单对吧直到 AI API 需要 30 秒。或者你的 HTTP 网关在 25 秒时超时。或者第三方服务在第 50 次请求后对你限流。或者三个用户同时上传你同时冲击同一个端点。同步方法假设一切都是快速和可靠的。没有什么是快速和可靠的。任务队列通过解耦请求和实际工作来解决这个问题。与其立即执行工作你将一个便签放入队列上面写着嘿有人需要做这件事。一个独立的进程——一个Worker——拾取这个便签并在自己的时间以可控的速度完成实际工作。把它想象成邮局。你不需要在柜台等待邮递员亲自把包裹送到目的地。你交出包裹拿到一个追踪号然后回家。投递在后台进行。你可以稍后查看追踪号以确认是否到达。那个追踪号就是你的任务 ID。2、如果你来自 RabbitMQ 或 Kafka 的快速说明在这个入门教程中我们将使用 BullMQ。如果你之前用过 RabbitMQ、Apache Kafka甚至 Amazon Simple Queue ServiceBullMQ 可能会感觉太简单了。那是因为 BullMQ 并不试图成为通用的消息代理或事件流平台。它是一个任务队列一个用来表达请稍后做这项工作的工具。使用 RabbitMQ 时你可能会想到交换机、路由键、发布者、消费者以及多个服务订阅同一条消息。使用 Kafka 时你可能会想到主题、分区、偏移量和可重放的事件流。BullMQ 并不做这些事情。没有交换机。没有分区。没有扇出式发布/订阅。一个任务通常有一个生产者、一个队列和一个最终处理它的 Worker。这是有意为之的。对于本文所讨论的问题——后台任务例如文档分块生成嵌入向量发送电子邮件调整图片大小调用慢速 AI API——BullMQ 通常更容易理解也更快上手。特别是如果你的技术栈已经使用了 Node.js 和 Redis。如果 RabbitMQ 是一个邮件分拣中心Kafka 是整个货运铁路网络那么 BullMQ 就是熟食店柜台上的取号机取一个号码排队等候最终轮到你时有人会叫你。权衡之处在于 BullMQ 的范围更窄。如果你以后需要多个独立的服务消费同一个事件、高级路由规则或重放历史事件那通常就是你升级到 RabbitMQ 或 Kafka 的时候。3、 BullMQ的三个主要角色让我们先建立正确的思维模型。系统中有三个角色生产者Producer你的应用中创建任务的部分。通常是你的 API 处理程序或控制器。当用户触发一些可能很耗资源的操作时你不会立即执行工作而是将其入队。队列Queue等待区域。它是一个数据结构在 BullMQ 中由 Redis 支持用于存储任务并跟踪它们的状态。任务在这里等待直到有 Worker 准备好。Worker实际执行工作的进程。它监视队列逐个或批量拾取任务运行你的处理逻辑并将任务标记为完成或失败。生产者和 Worker 不需要知道彼此。它们只是都与队列通信。这就是全部的诀窍。4、前后对比队列实际改变了什么让我用一个真实的例子来说明——将文档导入 RAG 系统。这意味着从 PDF 中提取文本、将其分成块、评估质量、生成嵌入向量并将所有内容写入向量数据库。没有队列时这在你的 HTTP 请求处理程序中同步运行这是消息队列系统在我们的例子中任务队列能极大改善用户体验的典型案例。使用队列后HTTP 处理程序只做一件事入队。然后立即返回如下面的时序图所示差异不仅仅是外观上的。有了队列API 保持快速无论后台工作需要多长时间系统可以处理 100 个并发上传而不会有 100 个被阻塞的线程嵌入 API 的速率限制由 Worker 遵守而不是由 100 个竞争请求管理如果嵌入 API 宕机任务会在队列中排队并自动重试。不会丢失任何工作。5、代码示例下面是一些代码示例如果你想尝试使用 BullMQ 创建任务队列。5.1 设置 RedisBullMQ 将所有内容存储在 Redis 中任务负载、状态、重试计数所有东西。Redis 很快能在重启后存活而且 BullMQ 专门围绕其数据结构构建。在本地运行 Redis 最快的方式是 Dockerdocker run -d -p 6379:6379 redis:7就这样。Redis 现在运行在localhost:6379。本地开发无需额外配置。5.2 安装 BullMQ如果你已经在使用 NodeJSBullMQ 非常适合。我们需要安装这两个库bullmq队列库和ioredis它底层使用的 Redis 客户端。npm install bullmq ioredis5.3 你的第一个队列和生产者假设我们正在构建一个文档处理功能。当用户上传文件时我们想要启动一个后台任务而不是阻塞 HTTP 响应。首先设置一个共享的 Redis 连接并创建一个队列import { Queue } from bullmq; import IORedis from ioredis; // create a connection to Redis const connection new IORedis({ maxRetriesPerRequest: null }); // create a queue with the name document-processing const document_queue new Queue(document-processing, { connection });现在在我们的 API 处理程序中通常我们会同步执行工作的地方我们只需入队// create a new job and // enqueue it in the document-processing queue const job await document_queue.add( process-document, { documentId: doc_abc123, userId: user_xyz, fileUrl: https://storage.example.com/uploads/abc123.pdf, } ); console.log(Job enqueued with ID: ${job.id}); // return job.id to the client so they can poll for status laterqueue.add()接受一个任务名称和一个负载。负载是我们的 Worker 需要的任何数据。该方法返回一个带有 ID 的任务对象请保存好它。5.4 我们的第一个 Worker现在我们需要一些东西来实际拾取这些任务。下面是一个基本的 Workerimport { Worker } from bullmq; import IORedis from ioredis; // create a connection to Redis const connection new IORedis({ maxRetriesPerRequest: null }); // create the worker for // the document-processing queue we have created previously const worker new Worker( document-processing, // - the name of the queue async (job) { console.log(Processing job ${job.id}); console.log(Document ID: ${job.data.documentId}); // This is where your actual processing logic goes // e.g., download file, call AI API, save results. // Before using job queue, an API handler would just // immediately call this function. await process_document(job.data); return { status: done, processedAt: new Date().toISOString() }; }, { connection } ); // event listener 1 worker.on(completed, (job) { console.log(Job ${job.id} finished.); }); // event listener 2 worker.on(failed, (job, err) { console.error(Job ${job?.id} failed:, err.message); });Worker 通过名称连接到同一个队列。BullMQ 处理所有的轮询——你的处理函数只在有任务需要处理时运行。你从处理器中return的任何内容都会被保存为任务的结果数据。6、任务状态入队后会发生什么每个任务都会经历一个生命周期。了解这一点会让调试容易得多。还有waiting-children用于任务依赖和paused状态但这些是高级主题。对于上面的例子waiting → active → completed/failed是核心循环。6.1 重试和退避因为事情会失败外部 API 会失败。网络会出问题。你的 AI 提供商在凌晨 3 点返回 503 错误。我们希望任务能在不需要人工干预的情况下自动重试。我们可以在入队时配置重试。前面的示例代码可以这样写const job await document_queue.add( process-document, { documentId: doc_abc123, userId: user_xyz, fileUrl: https://storage.example.com/uploads/abc123.pdf, }, // retry configuration { attempts: 5, backoff: { type: exponential, delay: 2000, // start with 2 seconds }, } );指数退避意味着等待 2 秒然后 4 秒然后 8 秒然后 16 秒……每次重试大约等待上一次的两倍时间。这很重要因为如果一个服务正在挣扎中每秒都去冲击它只会让情况更糟。给它恢复的空间。使用attempts: 5BullMQ 在第一次失败后最多会再重试四次之后才会最终将任务标记为failed。7、限流这是我最初在 RAG 系统中使用队列的原因之一。假设我们正在调用一个允许每分钟 100 次请求的外部 API。在并发用户足够多的情况下我们会超出限制并开始到处收到429错误。BullMQ 有一个内置的限流器。我们在 Worker 上配置它// adjusting the previous worker by adding a limiter configuration const worker new Worker( document-processing, ... { connection, // adding limiter limiter: { max: 100, // max jobs processed duration: 60000 // per 60 seconds (in ms) }, } );现在 BullMQ 自动将处理速度限制为每分钟 100 个任务无论队列中有多少任务或有多少 Worker 实例在运行。任务只是停留在delayed状态直到有处理能力。没有429错误不需要手动限流逻辑。仅这一点就值得采用队列系统。8、 检查任务状态轮询还记得我们从queue.add()获得的那个任务 ID 吗以下是我们如何使用它。这就是我们的前端在显示处理中…状态时轮询的内容import { Queue } from bullmq; const document_queue new Queue(document-processing, { connection }); // check job status async function getJobStatus(job_id: string) { const job await document_queue.getJob(job_id); if (!job) { return { status: not_found }; } const state await job.getState(); const result job.returnvalue; // only populated when completed return { id: job.id, state, // waiting | active | completed | failed | delayed result, failedReason: job.failedReason, }; }我们的 API 端点暴露了这个功能我们的前端每隔几秒轮询一次我们在不保持开放连接的情况下显示进度。干净简洁。9、常见问题解答为什么选 BullMQ 而不是 RabbitMQ这个问题经常被问到。两者都是合法的队列系统但它们解决的是略有不同的问题。RabbitMQ 是一个完整的消息代理一个独立的服务有自己的协议AMQP、自己的概念交换机、绑定、路由键和自己的运维开销。它功能强大且久经考验特别适用于需要根据复杂规则在许多不同消费者之间路由消息的微服务。BullMQ 是一个运行在 Redis 之上的任务队列库而 Redis 我们可能已经有了。它专注于一个用例将任务入队、可靠地处理它们、跟踪它们的状态。简而言之BullMQ 最适合后台任务和任务队列。RabbitMQ 最适合事件流和微服务消息传递。何时切换到 RabbitMQ 或 Kafka在某些场景下你确实会想要迁移到 RabbitMQ 或 Kafka。切换到 RabbitMQ 的时机你需要扇出——一个事件需要同时触发多个独立的消费者例如文档已上传应该同时触发导入流水线、通知服务和分析事件你正在构建一个微服务网格其中许多服务需要通过消息以复杂的路由规则进行通信你需要协议级别的互操作性——你的一些服务不是 Node.js需要使用 AMQP 协议切换到 Kafka 的时机你需要一个事件日志——你想要重放过去的事件而不仅仅是处理一次你正在处理极高的吞吐量——想想每秒数百万事件社交媒体信息流、IoT 遥测、点击流你需要流处理——实时聚合、转换或连接事件流你的保留要求意味着你需要将事件保存数天或数周以便下游消费者赶上10、结束语我们涵盖了以下内容为什么同步 API 调用在现实条件下会崩溃生产者/队列/Worker的思维模型一个可以处理重试、限流和任务状态跟踪的 BullMQ 工作设置以及最后什么时候坚持使用 BullMQ什么时候该使用更重的工具。BullMQ 确实足以处理大多数用例的后台处理。从你的 API 处理程序中投入任务让 Worker 以可控的速度处理它们然后从客户端轮询状态。后续方向Webhook 替代轮询——不再每隔几秒轮询一次检查状态我们可以让 Worker 在任务完成时调用一个 webhook URL。更高效更少噪音。优先级队列——有些任务比其他任务更紧急。BullMQ 支持queue.add()上的priority字段。BullMQ 仪表盘——bull-board是一个用于监控队列、检查失败任务和手动重试的仪表盘 UI。在生产环境中非常有用。任务流——BullMQ 有父/子任务的概念父任务会等待所有子任务完成。适用于多步骤流水线。如果你是消息队列系统的新手队列模式可能需要一些思维上的开销来设置。但一旦它在那里添加新的后台任务类型就是小事一桩。而且你将为未来更复杂的消息队列系统做好准备。添加队列是那些能快速获得回报的架构决策之一。一如既往持续学习快乐编码原文链接BullMQAI系统缺失的队列层 - 汇智网

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2574223.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…