如何用Golang处理每分钟100万个请求

news2025/9/20 8:29:35

用Golang处理每分钟100万个请求

转载请注明来源:https://janrs.com/9yaq


面临的问题

在我设计一个分析系统中,我们公司的目标是能够处理来自数百万个端点的大量POST请求。web 网络处理程序将收到一个JSON文档,其中可能包含许多有效载荷的集合,需要写入Amazon S3,以便我们的地图还原系统随后对这些数据进行操作。

传统上,我们会研究创建一个工人层架构,利用诸如以下东西:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • 还有等等其他的技术手段…

并设置 2 个不同的集群,一个用于 Web 前端,另一个用于 worker 处理进程,这样我们就可以扩大我们可以处理的后台工作量。

但从一开始,我们的团队就知道我们应该在 Go 中这样做,因为在讨论阶段我们看到这可能是一个非常大的流量系统。 我使用 Go 已有大约 2 年左右的时间,我们公司在处理业务时开发了一些系统,但没有一个能承受如此大的负载。以下是优化的过程。

我们首先创建一些结构体来定义我们将通过 POST 调用接收的 Web 请求负载,以及一种将其上传到我们的 S3 存储桶的方法。代码如下:

type PayloadCollection struct {
	WindowsVersion  string    `json:"version"`
	Token           string    `json:"token"`
	Payloads        []Payload `json:"data"`
}

type Payload struct {
    // ...负载字段
}

func (p *Payload) UploadToS3() error {
	// storageFolder 方法确保在我们在键名中获得相同时间戳时不会发生名称冲突
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(payload)
	if encodeErr != nil {
		return encodeErr
	}

	// 我们发布到 S3 存储桶的所有内容都应标记为“私有”
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

使用 Go 协程

最初我们采用了一个非常简单的 POST 处理程序实现,只是试图将job 处理程序并行化到一个简单的 goroutine 中:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// 将body读入字符串进行json解码
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	
	// 分别检查每个有效负载和队列项目以发布到 S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3()   // <----- 这是不建议的做法。这里是最开始的做法。
	}

	w.WriteHeader(http.StatusOK)
}

对于中等负载,这可能适用于大多数公司的流量,但很快证明这在大规模情况下效果不佳。 我们期望有很多请求,但没有达到我们将第一个版本部署到生产环境时开始看到的数量级。 我们完全低估了流量。

上面的方法在几个不同的方面是不好的。 无法控制我们生成了多少个 go routines。 由于我们每分钟收到 100 万个 POST 请求,因此这段代码很快崩溃了。

进一步优化

我们需要找到一种不同的方式。 从一开始我们就开始讨论我们需要如何保持请求处理程序的生命周期非常短,并在后台进行生成处理。 当然,这是你在使用 Ruby on Rails 时必须做的,否则你将阻止所有可用的 worker web 处理器,无论你使用的是 puma、unicorn 还是 passenger(请不要进入 JRuby 讨论)。 然后我们需要利用常见的解决方案来做到这一点,例如 Resque、Sidekiq、SQS 等等,有很多方法可以实现这一点。

所以第二次迭代是创建一个缓冲通道,我们可以创建一些队列,然后把 job push到队列并将它们上传到 S3,并且由于我们可以控制job 队列中的最大数数量并且我们有足够的内存来处理队列中的 job。在这个方案中,我们认为只需要在通道队列中缓冲需要处理的 job 就可以了。

代码如下:

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // 分别检查每个有效负载和队列项目以发布到 S3
    for _, payload := range content.Payloads {
        Queue <- payload // <----- 这是建议的做法。
    }
    ...
}

然后为了实际出列作业并处理它们,我们使用了类似的东西:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- 这里虽然优化了,但还不是最好的。
        }
    }
}

在上面的代码中,我们用一个缓冲队列来交换有缺陷的并发性,而缓冲队列只是推迟了问题。 我们的同步处理器一次只将一个有效负载上传到 S3,并且由于传入请求的速率远远大于单个处理器上传到 S3 的能力,我们的 job 缓冲通道很快达到了极限并阻止了请求处理程序的能力,队列很快就阻塞满了。

我们只是在避免这个问题,并开始倒计时,直到我们的系统最终死亡。 在我们部署这个有缺陷的版本后,我们的延迟率在几分钟内以恒定的速度持续增加。以下是延迟率增长图:

更好的解决方案

我们决定在使用 Go 通道时使用一种通用模式,以创建一个 2 层通道系统,一个用于 Job 队列,另一个用于控制同时在 Job 队列上操作的 Worker 的数量。

这个想法是将上传到 S3 的数据并行化到某种程度上可持续的速度,这种速度既不会削弱机器也不会开始从 S3 生成连接错误。 所以我们选择创建 Job/Worker 模式。 对于那些熟悉 Java、C# 等的人来说,可以将其视为 Golang 使用通道实现 Worker 线程池的方式。

代码如下:

var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job 表示要运行的作业
type Job struct {
	Payload Payload
}

// 我们可以在 Job 队列上发送工作请求的缓冲通道。
var JobQueue chan Job

// Worker 代表执行作业的 Worker。
type Worker struct {
	WorkerPool  chan chan Job
	JobChannel  chan Job
	quit    	chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool)}
}

// Start 方法为 Worker 启动循环监听。监听退出信号以防我们需要停止它。
func (w Worker) Start() {
	go func() {
		for {
			// 将当前 woker 注册到工作队列中。
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// 接收 work 请求。
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// 接收一个退出的信号。
				return
			}
		}
	}()
}

// 将退出信号传递给 Worker 进程以停止处理清理。
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

我们已经修改了我们的 Web 请求处理程序,以创建一个带有有效负载的 Job 结构实例,并将其发送到 JobQueue 通道以供 Worker 提取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

    // 将body读入字符串进行json解码
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

    // 分别检查每个有效负载和队列项目以发布到 S3
    for _, payload := range content.Payloads {

        // 创建一个有效负载的job
        work := Job{Payload: payload}

		// 将 work push 到队列。
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在我们的 Web 服务器初始化期间,我们创建一个 Dispatcher 调度器并调用 Run() 来创建 Woker 工作池并开始侦听将出现在 Job 队列中的 Job。

dispatcher := NewDispatcher(MaxWorker) 
dispatcher.Run()

下面是我们的调度程序实现的代码:

type Dispatcher struct {
	// 通过调度器注册一个 Worker 通道池
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
	// 启动指定数量的 Worker
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.pool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// 接收一个 job 请求
			go func(job Job) {
				// 尝试获取可用的 worker job 通道
				// 这将阻塞 worker 直到空闲
				jobChannel := <-d.WorkerPool

				// 调度一个 job 到 worker job 通道
				jobChannel <- job
			}(job)
		}
	}
}

请注意,我们提供了要实例化并添加到我们的 Worker 池中的最大worker 数量。 由于我们在这个项目中使用了 Amazon Elasticbeanstalk 和 dockerized Go 环境,因此我们从环境变量中读取这些值。 这样我们就可以控制 Job 队列的数量和最大大小,因此我们可以快速调整这些值而无需重新部署集群。

var ( 
  MaxWorker = os.Getenv("MAX_WORKERS")
  MaxQueue  = os.Getenv("MAX_QUEUE")
)

在我们部署它之后,我们立即看到我们所有的延迟率都下降到极低的延迟,并且我们处理请求的能力急剧上升。以下是流量截图:

在我们的弹性负载均衡器完全预热几分钟后,我们看到我们的 ElasticBeanstalk 应用程序每分钟处理近 100 万个请求。 我们通常在早上有几个小时的流量会飙升至每分钟超过一百万。

一旦我们部署了新代码,服务器数量就从 100 台服务器大幅下降到大约 20 台服务器。以下是服务器数量变化截图:

在正确配置集群和自动缩放设置后,我们能够将其进一步降低到仅 4x EC2 c4.Large 实例,并且如果 CPU 使用率超过 90% 持续 5 天,Elastic Auto-Scaling 将生成一个新实例 分钟值。以下是截图:

总结

可以看出利用 Elasticbeanstalk 自动缩放的强大功能以及 Golang 提供的开箱即用的高效和简单的并发方法,就可以构建出一个高性能的处理程序。


转载请注明来源:https://janrs.com/9yaq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/411563.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot基础学习之(十六):用户认证和授权

本次项目使用静态资源代码免费下载 https://download.csdn.net/download/m0_52479012/87679062?spm1001.2014.3001.5501 在日常的登录网页中&#xff0c;是怎么分辨那些用户是具有那种权限呢&#xff1f; 本次博客实现的功能是&#xff1a;哪些网页是谁有可以访问的&#xff…

进程状态概念详解!(7千字长文详解)

进程状态概念详解 文章目录进程状态概念详解进程状态为什么会有这些状态运行总结阻塞就绪/新建挂起阻塞和挂起的区别&#xff01;linux下的进程状态运行——R睡眠——S暂停——T关于号深度睡眠——D追踪暂停 ——t死亡——X僵尸——Z僵尸进程的危害总结孤儿进程——S状态进程优…

一位腾讯在职7年测试工程师的心声...

作为一个在腾讯工作7年的测试工程师&#xff0c;今天就来聊聊腾讯工作压力到底从何而来。 压力的开始&#xff1a;时间回到7年前&#xff0c;我人生中的第一份实习工作&#xff0c;是在腾讯公司做一个自动化测试工程师。当时的我可谓意气风发&#xff0c;想要大干一场&#xf…

SpringBoot 介绍

1.简介 SpringBoot最开始基于Spring4.0设计&#xff0c;是由Pivotal公司提供的框架。 SpringBoot发展史&#xff1a; 2003年Rod Johnson成立Interface公司&#xff0c;产品是SpringFramework2004年&#xff0c;Spring框架开源&#xff0c;公司改名为Spring Source2008年&…

[MAUI 项目实战] 手势控制音乐播放器(三): 动画

文章目录吸附动画确定位置平移动画回弹动画使用自定义缓动函数多重动画点击动画项目地址上一章节我们创建了手势容器控件PanContainer&#xff0c;它对拖拽物进行包装并响应了平移手势和点击手势。拖拽物现在虽然可以响应手势操作&#xff0c;但视觉效果较生硬&#xff0c;一个…

【ros2】ubuntu18.04同时安装ros1和ros2

序言 ubuntu18.04&#xff08;已安装ros melodic&#xff09;中安装ros2 dashing版本&#xff0c;以支持ros2工程的编译使用 1. 安装ros melodic 参考我之前的文章&#xff1a;docker容器中安装melodic-ros-core过程总结 2. 安装ros2 dashing &#xff08;1&#xff09;设置…

《花雕学AI》12:从ChatGPT的出现看人类与人工智能的互补关系与未来发展

马云说道&#xff0c;ChatGPT这一类技术已经对教育带来挑战&#xff0c;但是ChatGPT这一类技术只是AI时代的开始。 谷歌CEO桑德尔皮猜曾说&#xff1a;“人工智能是我们人类正在从事的最为深刻的研究方向之一&#xff0c;甚至要比火与电还更加深刻。” 360周鸿祎认为&#xf…

用WPF设计一个简易的休息提醒闹钟

目录一.视频演示地址二.代码展示三.源代码&#xff1a;最近利用工作之余&#xff0c;写了一个WPF程序玩玩&#xff0c;用来提醒自己在长时间学习后要休息一会儿哈哈&#xff0c;功能很简单&#xff0c;没啥难点一.视频演示地址 可以设定间隔提醒时长和休息时长&#xff0c;点击…

【C++】STL之stack、queue的使用和模拟实现+优先级队列(附仿函数)+容器适配器详解

之前的一段时间&#xff0c;我们共同学习了STL中一些容器&#xff0c;如string、vector和list等等。本章我们将步入新阶段的学习——容器适配器。本章将详解stack、queue的使用和模拟实现优先级队列&#xff08;附仿函数&#xff09;容器适配器等。 目录 &#xff08;一&…

WMI系列--关于WMI

本系列预计有三节,分别记录关于WMI的一些基础知识&#xff0c;WMI的永久订阅事件,WMI常见的攻防对抗手段 WMI简介 WMI 的全称是 Windows Management Instrumentation&#xff0c;即 Windows 管理规范&#xff0c;在 Windows 操作系统中&#xff0c;随着 WMI 技术的引入并在之…

Document Imaging SDK 11.6 for .NET Crack

Document Imaging SDK for .NET View, Convert, Annotate, Process,Edit, Scan, OCR, Print 基本上被认为是一种导出 PDF 解决方案&#xff0c;能够为用户和开发人员提供完整且创新的 PDF 文档处理属性。它具有提供简单集成的能力&#xff0c;可用于增强用户 .NET 的文档成像程…

c语言—指针进阶

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 给大家跳段街舞感谢支持&#xff01;ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ…

HC小区管理系统-海康摄像头监控配置

HC小区管理系统-海康摄像头监控配置 【HC小区管理系统-海康摄像头监控配置】 HC小区管理系统-海康摄像头监控配置_哔哩哔哩_bilibili 监控配置说明&#xff1a; 一、安装HC物业系统 HC小区管理系统安装本地代码发布 二、安装物联网系统 三、安装srs 流媒体服务器 四、启动s…

MobTech MobLink|裂变拓新,助力运营

一、打破移动应用孤岛 在移动互联网时代&#xff0c;应用的数量和质量都在不断上升&#xff0c;用户的需求和体验也越来越高。然而&#xff0c;应用之间的跳转和互通却存在很多障碍和不便&#xff0c;导致用户的流失和挫败感。例如&#xff1a; 用户在浏览器或社交平台上看到一…

看完这个你就牛了,自动化测试框架设计

一、引言 随着IT技术的快速发展&#xff0c;软件开发变得越来越快速和复杂化。在这种背景下&#xff0c;传统的手工测试方式已经无法满足测试需求&#xff0c;而自动化测试随之而生。 自动化测试可以提高测试效率和测试质量&#xff0c;减少重复性的测试工作&#xff0c;从而…

前端大概要知道的 AST

认识 AST 定义&#xff1a;在计算机科学中&#xff0c;抽象语法树是源代码语法结构的一种抽象表示。它以树状的形式表现编程语言的语法结构&#xff0c;树上的每个节点都表示源代码中的一种结构。之所以说语法是“抽象”的&#xff0c;是因为这里的语法并不会表示出真实语法中…

手机测试—adb

一、Android Debug Bridge 1.1 Android系统主要的目录 1.2 ADB工具介绍 ADB的全称为Android Debug Bridge,就是起到调试桥的作用,是Android SDK里面一个多用途调试工具,通过它可以和Android设备或模拟器通信,借助adb工具,我们可以管理设备或手机模拟器的状态。还可以进行很多…

【负荷预测】基于VMD-SSA-LSTM光伏功率预测【可以换数据变为其他负荷等预测】(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

文件小注意

目录 0 前言 1 标识 O_CREAT O_APPEND 2 ftruncate与truncate 3 O_DIRECT与O_DSYNC、O_SYNC 4 open与fopen 5 关于mmap 0 前言 文件操作在软件开发中是很常见的一件事。虽然与它相关的工作看起来不怎么起眼&#xff0c;无非就是通过通过open、read、write、close几个调用…

Unity——网格变形(制作一个压力球)

主要参考链接&#xff1a;Mesh Deformation, a Unity C# Tutorial&#xff08;本文为其翻译版&#xff09; unity项目下载链接&#xff1a;https://download.csdn.net/download/weixin_43042683/87679832 在物体上投射射线并画出调试线。将力转换为顶点的速度。用弹簧和阻尼保…