提高nodejs中promise的性能

news2025/5/16 1:56:16

提高nodejs中promise的性能

我们先来看一个常见问题,假设我们有 N 条记录需要处理,或者例如,为每条记录发出 API 请求以获取数据。

通常情况下我们都是使用promise.all方法来实现这一需求:

// 记录
const data = [{}, {}, {}];

// 处理所有的数据(获取所有记录的数据)
const results = await Promise.all(data.map(processRecord)); 

所以这里发生的是我们正在“并行”执行异步代码,所以它在时间线上可能看起来像这样。

在这里插入图片描述

可能我们都知道promise.all方法就是当最后一个promise也解决完才会有结果。 正如上图所看到的,Promise.all()的整体运行时间与批次中最慢的一样长。所以我们的主线程基本上是“什么都不做”,正在等待最慢的请求完成。

为了解决这个问题,本文将讲解如何使用更好的优化这个方法。

promise pool

Promise Pool 的想法是充分利用 Node.js 的主线程的潜力。为了实现更好的利用率,我们需要密集地打包 API 调用(或任何其他异步任务),这样我们就不会等待最长时间的调用完成,而是在第一个调用完成后立即安排下一个调用。

在这里插入图片描述
通过这种方式,我们可以实现多种目标:

  • 通过设置Promise Pool的并发数来控制我们服务的吞吐量;
  • 通过设置Promise Pool的并发度来管理下游服务的负载;
  • 提高我们应用程序的性能;减少 CPU 的空闲时间。

实现

让我们从需求开始。我们希望能够为 Promise Pool 提供一些记录和并发“请求”的最大数量以及可以进行处理的回调。处理后,我们希望收到每个单独记录的处理结果数组。

类似下面的方式:

const results = await PromisePool.for([])
    .withConcurrency(50)
    .process(async (data) => {
       return result;
    });

我们需要跟踪当前正在处理的记录数、总共处理了多少条记录以及其他参数。

const { EventEmitter } = require("events");
class PromisePool {
    constructor(
        data,
        concurrency,
        processor,
    ) {
        this.data = data;
        this.results = [];
        this.concurrency = concurrency;
        this.inFlightTasks = 0;
        this.processedEntries = 0;
        this.processor = processor;
        this.eventsEmitter = new EventEmitter();
        this.executionPromise = null;
    }
}

让我们添加一个结构方法以及公共函数来配置并发并指定处理器回调。


    static for(data) {
        return new this(data, 10, () => {})
    }

    withConcurrency(concurrency) {
        this.concurrency = concurrency;

        return this;
    }

    process(processor) {
        this.processor = processor;

        return this._processRecords();
    }

我们已经建立了基础,现在我们需要实现 Promise Pool 的关键。这个想法很简单:

我们迭代数组中的每个记录并等待可用的坐位(基本上是我们拥有的并发坐位数量中的空闲坐位),如果我们有空闲坐位,我们将继续安排以下作业,除非所有坐位都被占用。如果所有座位都被占用,我们会等待至少一项工作完成。

 _waitAvailableSit() {
        if (this.inFlightTasks >= this.concurrency) {
            return new Promise((res) => {
                this.eventsEmitter.once(PromisePool.TASK_COMPLETED, res);
            });
        } else {
            return Promise.resolve();
        }
    }

    async _processRecord(data) {
        try {
            this.inFlightTasks++;

            const result = await this.processor(data, this);

            this.results.push(result);
        } catch (e) {
        } finally {
            this.inFlightTasks--;
            this.processedEntries++;
            this.eventsEmitter.emit(PromisePool.TASK_COMPLETED);
            if (this.inFlightTasks === 0 && this.processedEntries === this.data.length) {
                this.eventsEmitter.emit(PromisePool.DRAIN)
            }
        }
    }
    _processRecords() {
        if (this.executionPromise !== null) {
            return this.executionPromise;
        }

        this.executionPromise = new Promise(async (res, rej) => {
            try {
                for (const element of this.data) {
                    await this._waitAvailableSit();

                    this._processRecord(element);
                }

                this.eventsEmitter.once(PromisePool.DRAIN, () => res(this.results));
            } catch (e) {
                rej(e)
            }
        });

        return this.executionPromise;
    }

那么,一个可以处理并发“请求”的promise pool就写好了。

性能测试

我们将把处理器模拟为需要 150 到 1150 毫秒执行的函数。

const processor = (i) => {
    return new Promise((res, rej) => {
        console.log(`[${Date.now()}] 开始处理 item. i=${i}`);
        setTimeout(() => {
            console.log(`[${Date.now()}] 开始处理 item. i=${i}`);
            res(i);
        }, 150 + Math.random() * 1000);
    });
}

然后我们将同时处理 1000 条记录的数组,分成 50 条记录的块。首先,我们使用 Promise.all 方法对其进行测试。

async function main() {
    const timeStart = Date.now();

    const data = Array.from({ length: 1000 }).map((d,i) => i);

    while (data.length > 0) {
        const arr = data.splice(0, 50);

        await Promise.all(arr.map(processor));
    }
    
    const timeEnd = Date.now();
    console.log(`Promise.all=${timeEnd-timeStart}`);
}

然后是使用刚刚写的PromisePool

async function main() {
    const timeStart = Date.now();

    const data = Array.from({ length: 1000 }).map((d,i) => i);
    const promisePool = PromisePool
        .for(data)
        .withConcurrency(50);

    await promisePool.process(processor);

    const timeEnd = Date.now();
    console.log(`PromisePool=${timeEnd-timeStart}`);
}

在这里插入图片描述

经过测试,使用 Promise.all 处理所有记录平均需要 22.9 秒,而 PromisePool 需要 12.9 秒,快了大约 40%!(大家可以代码复制下来去跑一下)

Promise Pool是同时处理多个记录的绝佳模式,它不仅可以帮助我们提高应用程序的性能,还可以让我们控制下游服务的速率限制,如果大家感兴趣可以自行扩展一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/924489.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux网络编程:线程池并发服务器 _UDP客户端和服务器_本地和网络套接字

文章目录: 一:线程池模块分析 threadpool.c 二:UDP通信 1.TCP通信和UDP通信各自的优缺点 2.UDP实现的C/S模型 server.c client.c 三:套接字 1.本地套接字 2.本地套 和 网络套对比 server.c client.c 一:线…

如何使用CSS实现一个3D旋转效果?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 3D效果实现⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域…

有线耳机插入电脑没声音

有线耳机插入电脑没声音 首先确保耳机和电脑都没问题,那就有可能是声音输出设备设置错误 右击任务栏的声音图标-打开声音设置-选择输出设备。

长时间序列的25米全球sar卫星镶嵌数据

数据简介 1992年JAXA(Japan Aerospace Exploration Agency,日本宇宙航空研究开发机构)发射了一颗JERS-1卫星,该卫星携带有18*24m分辨率的SAR传感器。随后,JAXA又在2006年和2014年分别发射了带有SAR传感器的alos卫星和…

拓世科技集团 | “书剑人生”李步云学术思想研讨会暨李步云先生九十华诞志庆

2023年,中国改革开放迎来了45周年,改革春风浩荡,席卷神州大地,45年间,中国特色社会主义伟大事业大步迈入崭新境界,一路上结出了饶为丰硕的果实。中华民族在这45年间的砥砺前行,不仅使中国的经济…

Clion 找不到头文件,无法DEBUG,无法进入断点,断点灰了

文章目录 1. 找不到远程toolchain的头文件2.gdb未安装或版本不对无法debug3.远程debug断点灰了 1. 找不到远程toolchain的头文件 ①检查cmake头文件引入指令include_directories(SYSTEM "/usr/lib/jvm/jdk-17-oracle-x64/include") ②菜单栏Tools->Resyinc with r…

运放的分类、运放的参数

一、运放的分类 运放按功能分为通用运放与专用运放(高速运放、精密运放、低IB运放等)。 1.1通用运放 除廉价外,没有任何最优指标的运放。 例:uA741,LM324,TL06X,TL07X、TL08X等 国外知名运放…

科技赋能,教育革新——大步迈向体育强国梦

在 "全民健身"、"体育强国建设"战略的推进下,体育考试成绩被纳入重要升学考试且分值不断提高,体育科目的地位逐步上升到前所未有的高度,在此趋势下,体育教学正演变出更多元化、个性化的需求。然而现实中却面临…

ubuntu20.04 编译安装运行emqx

文章目录 安装依赖编译运行登录dashboard压力测试 安装依赖 Erlang/OTP OTP 24 或 25 版本 apt-get install libncurses5-dev sudo apt-get install erlang如果安装的erlang版本小于24的话,可以使用如下方法自行编译erlang 1.源码获取 wget https://github.com/erla…

ARM开发,stm32mp157a-A7核IIC实验(采集温湿度传感器值)

1.实验目标:采集温湿度传感器值; 2.分析框图(模拟IIC控制器); 3.代码; ---iic.h封装时序协议头文件--- #ifndef __IIC_H__ #define __IIC_H__ #include "stm32mp1xx_gpio.h" #include "st…

bash: conda: command not found

问题描述: 在Pycharm上用SSH远程连接到服务器,打开Terminal准备查看用 conda 创建的虚拟环境时,却发现调用 conda 指令时出现以下报错: -bash: conda: command not found如果使用Xshell 利用端口号直接连接该 docker 容器&#…

学无止境·运维高阶⑦Docker进阶一(构建个人网盘)

Docker进阶一 1、使用mysql:5.6和 owncloud 镜像,构建一个个人网盘。1.1 拉取镜像1.2 创建容器1.3登录查看 1、使用mysql:5.6和 owncloud 镜像,构建一个个人网盘。 1.1 拉取镜像 [rootnode3 ~]# docker pull mysql:5.6 [rootnode3 ~]# docker pull own…

基于jeecg-boot的flowable流程跳转功能实现

更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码: https://gitee.com/nbacheng/nbcio-boot 前端代码:https://gitee.com/nbacheng/nbcio-vue.git 在线演示(包括H5) : http://122.227.135.243:9888 今天我…

云计算在线实训系统建设方案

一、 人工智能与云计算系统概述 人工智能(Artificial Intelligence,简称AI)是一种模拟人类智能的科学和工程,通过使用计算机系统来模拟、扩展和增强人类的智能能力。人工智能涉及多个领域,包括机器学习、深度学习、自然…

零基础如何使用IDEA启动前后端分离中的前端项目(Vue)?

一、在IDEA中配置vue插件 点击File-->Settings-->Plugins-->搜索vue.js插件进行安装,下面的图中我已经安装好了 二、搭建node.js环境 安装node.js 可以去官网下载:安装过程就很简单,直接下一步就行 测试是否安装成功:要…

HCIP-OpenStack组件之neutron

neutron(ovs、ovn) OVS OVS(Open vSwitch)是虚拟交换机,遵循SDN(Software Defined Network,软件定义网络)架构来管理的。 OVS介绍参考:https://mp.weixin.qq.com/s?__bizMzAwMDQyOTcwOA&mid2247485088&idx1…

基于PyTorch深度学习遥感影像地物分类与目标检测、分割及遥感影像问题深度学习优化

我国高分辨率对地观测系统重大专项已全面启动,高空间、高光谱、高时间分辨率和宽地面覆盖于一体的全球天空地一体化立体对地观测网逐步形成,将成为保障国家安全的基础性和战略性资源。未来10年全球每天获取的观测数据将超过10PB,遥感大数据时…

【面试题】MVC、MVP与MVVM模式是什么?

MVC模式 MVC是应用最广泛的软件架构之一,一般MVC分为: Model( 模型 )、Controller( 控制器 )、View( 视图 )。 这主要是基于分层的目的,让彼此的职责分开。View 一般…

R语言快速生成三线表(1)

R语言的优势在于批量处理&#xff0c;常使用到循环和函数&#xff0c;三线表是科研文章中必备的内容。利用函数实现自动判断数据类型和计算。使用R包&#xff08;table1&#xff09;。 # 创建连续性变量 continuous_var1 <- c(1.2, 2.5, 3.7, 4.8, 5.9) continuous_var2 &l…

Kali 软件管理

kali 更新 1. 查看发行版本 ┌──(root㉿kali)-[~] └─# lsb_release -a No LSB modules are available. Distributor ID: Kali Description: Kali GNU/Linux Rolling Release: 2023.2 Codename: kali-rolling2. 查看内核版本 ┌──(root㉿kali)-[~] └─…