基于 Flask 和 RabbitMQ 构建高效消息队列系统:从数据生成到消费

news2025/5/16 2:49:03

简介

在构建 Web 应用时,处理和传输大量数据是不可避免的。对于需要高效、可扩展的消息处理和异步任务执行的场景,使用 RabbitMQ(一种流行的消息队列中间件)与 Flask(一个轻量级的 Python Web 框架)结合,能够大大提升应用的性能和可靠性。本文将带你通过一个基于 Flask 和 RabbitMQ 的实际应用案例,深入了解如何构建一个高效的消息队列系统,完成从生成假数据到消费数据的全过程。

背景

我们要开发一个移动可视化平台监控系统,并且这些信息需要被实时分析或存储。面对这样的需求,直接将所有逻辑放在单个应用中可能会导致性能瓶颈。因此,我们考虑采用微服务架构,通过分离数据生成与处理逻辑来提高系统的可扩展性和响应速度。

环境介绍

为了实现这个项目,我们需要以下环境:

  • Python:一个强大的编程语言,适合快速开发。
  • Flask:一个轻量级的Web应用框架。
  • Pika:Python的RabbitMQ客户端库。
  • Faker:一个生成伪数据的Python库,用于生成测试数据
  • RabbitMQ:消息队列服务,用来存储 生产者产生的数据

技术选型

  • Flask:轻量级Web框架,非常适合快速开发小型到中等规模的应用。
  • RabbitMQ:一个广泛使用的开源消息代理软件(也称为消息中间件),用于实现应用程序之间的通信。

系统架构概览

  1. 生产者:负责生成模拟用户数据并将其发送至RabbitMQ。
  2. 消费者:从RabbitMQ接收数据后执行特定任务,如数据分析或存储。
  3. Flask应用:提供REST API接口给外部调用,同时启动消费者线程监听RabbitMQ中的消息。

搭建RabbitMQ服务

我们使用docker来搭建服务,如果win可以直接跑程序,相关流程请自行查询

临时使用(停止会自动删除服务)

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 docker.cloudimages.asia/rabbitmq:4.0-management

长久使用

docker run -it -d --name rabbitmq -p 5672:5672 -p 15672:15672 docker.cloudimages.asia/rabbitmq:4.0-management
[root@prometheus-server ~]# docker ps | grep 9b3a9355fa4a
9b3a9355fa4a   docker.cloudimages.asia/rabbitmq:4.0-management   "docker-entrypoint.s…"   21 seconds ago   Up 19 seconds             4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp           rabbitmq

访问页面地址:http://192.168.82.105:15672/ 使用 RabbitMQ 的管理界面。
访问账号和密码: guest | guest

在这里插入图片描述
队列页面
在这里插入图片描述

生产者:将数据发送到 RabbitMQ 队列

生产者的任务是生成一些假数据,并将这些数据发送到 RabbitMQ 队列中。我们使用 Faker 库生成数据,并通过 RabbitMQ 的 basic_publish 方法发送消息。

确保你的环境中安装了Python。然后,使用pip安装Flask、Pika和Faker:

pip install flask pika faker

生产者代码

生产者部分主要负责生成随机数据并通过RabbitMQ发送出去。这里我们使用Faker库来生成看起来真实的数据。

# -*- coding: utf-8 -*-
# @Time    : 2024/11/24 10:20
# @Author  : 南宫乘风
# @Email   : 1794748404@qq.com
# @File    : test.py
# @Software: PyCharm
from faker import Faker
import pika
import json
import time

# 初始化 Faker 实例
fake = Faker()

# 配置 RabbitMQ 连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.82.105',heartbeat=60))
channel = connection.channel()

# 声明一个队列
queue_name = 'ownit_queue'
channel.queue_declare(queue=queue_name)

# 生成并发送假数据
def generate_fake_data():
    return {
        "name": fake.name(),
        "address": fake.address(),
        "email": fake.email(),
        "phone": fake.phone_number(),
        "company": fake.company(),
        "date": fake.date_this_year().isoformat(),
        "text": fake.text(max_nb_chars=200),
    }

try:
    for _ in range(10000):  # 生成 1000 条假数据
        fake_data = generate_fake_data()
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(fake_data)  # 将数据序列化为 JSON 格式
        )
        # time.sleep(0.1)
        print(f"Sent: {fake_data}")
finally:
    connection.close()

执行插入10000条数据
在这里插入图片描述
在这里插入图片描述

数据持久化

重启docker 服务。让mq重启

docker restart 9b3a9355fa4a

因为重启,队列没有持久化,导致数据丢失
在这里插入图片描述

为了确保消息不会丢失,可以配置 RabbitMQ 的队列为持久化队列,即使 RabbitMQ 宕机或重启,队列中的消息也能被恢复。

channel.queue_declare(queue='ownit_queue', durable=True)



 properties=pika.BasicProperties(delivery_mode=2)

在这里插入图片描述
咦,还是没有数据?为什么?
因为使用Dokcer启动没有持久数据,重启会丢失数据,就算我们mq做持久化也不起作用。
在这里插入图片描述

docker stop  9b3a9355fa4a && docker rm  9b3a9355fa4a

持久化命令

docker run -it -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -v rabbitmq_data:/var/lib/rabbitmq/mnesia \
  docker.cloudimages.asia/rabbitmq:4.0-management

解释:

  • -it -d:以交互模式启动并在后台运行容器。
  • --name rabbitmq:给容器指定一个名字 rabbitmq
  • -p 5672:5672:映射 RabbitMQ 的默认 AMQP 协议端口(5672)到宿主机。
  • -p 15672:15672:映射 RabbitMQ 的管理界面端口(15672)到宿主机。
  • -v rabbitmq_data:/var/lib/rabbitmq/mnesia:将宿主机的 Docker 卷 rabbitmq_data 持久化到容器内的 /var/lib/rabbitmq/mnesia 目录,这是 RabbitMQ 默认存储队列和消息数据的地方。
    在这里插入图片描述
    使用宿主机目录
docker run -it -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -v /root/rabbitmq:/var/lib/rabbitmq \
  docker.cloudimages.asia/rabbitmq:4.0-management

[root@prometheus-server ~]# docker restart 9e200cf168c3
9e200cf168c3

在这里插入图片描述

关键点解析

  • Faker:用于生成虚拟数据。每次调用 generate_fake_data 函数时都会生成不同的姓名、地址、邮箱等信息。
  • RabbitMQ 连接:我们使用 pika 库与 RabbitMQ 进行连接,并声明了一个队列 ownit_queue,用于存储消息。
  • 数据发布:使用 channel.basic_publish() 方法将消息发布到指定的队列中,消息体使用 json.dumps() 序列化为 JSON 格式。

持久化完整代码

# -*- coding: utf-8 -*-
# @Time    : 2024/11/24 10:20
# @Author  : 南宫乘风
# @Email   : 1794748404@qq.com
# @File    : test.py
# @Software: PyCharm
from faker import Faker
import pika
import json
import time

# 初始化 Faker 实例
fake = Faker()

# 配置 RabbitMQ 连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.82.105',heartbeat=60))
channel = connection.channel()

# 声明一个队列
queue_name = 'ownit_queue'
channel.queue_declare(queue=queue_name, durable=True)

# 生成并发送假数据
def generate_fake_data():
    return {
        "name": fake.name(),
        "address": fake.address(),
        "email": fake.email(),
        "phone": fake.phone_number(),
        "company": fake.company(),
        "date": fake.date_this_year().isoformat(),
        "text": fake.text(max_nb_chars=200),
    }

try:
    for _ in range(1000):  # 生成 1000 条假数据
        fake_data = generate_fake_data()
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(fake_data),  # 将数据序列化为 JSON 格式
            properties=pika.BasicProperties(delivery_mode=2)
        )
        # time.sleep(0.1)
        print(f"Sent: {fake_data}")
finally:
    connection.close()

消费者:从 RabbitMQ 中获取数据

消费者部分由Flask应用托管,它不仅提供了API接口,还启动了一个后台线程持续监听RabbitMQ上的消息。
消费者的任务是从队列中读取消息,并进行处理。在这个例子中,我们将模拟一个简单的消息消费过程,打印接收到的数据。

import json
import threading
import time

import pika
from flask import Flask, request

app = Flask(__name__)


@app.route('/', methods=['GET'])
def send_order():
    return 'Hello, World! MQ'



# 消费者函数
def consume():
    # 创建与 RabbitMQ 服务器的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.82.105'))
    # 从连接中创建一个通道
    channel = connection.channel()

    # 声明一个名为 order_queue 的队列,如果队列不存在则创建它
    channel.queue_declare(queue='ownit_queue',durable=True)

    # 定义一个回调函数,用于处理接收到的消息
    def callback(ch, method, properties, body):
        # 打印接收到的消息体
        print(f"数据接受: {body}")
        time.sleep(1)

    # 配置通道以消费来自 order_queue 的消息,指定回调函数处理消息,并设置自动确认消息
    channel.basic_consume(queue='ownit_queue', on_message_callback=callback, auto_ack=True)
    # 打印消息表示程序正在等待接收消息,并提示用户按 CTRL+C 退出
    print('Waiting for messages. To exit press CTRL+C')
    # 开始一个循环以持续接收消息
    channel.start_consuming()


# 启动消费者线程
def run_consumer():
    thread = threading.Thread(target=consume, daemon=True)  # 设置守护线程
    thread.start()
    print("Consumer thread started.")


if __name__ == '__main__':
    run_consumer()
    app.run(debug=True)



在这里插入图片描述

在这里插入图片描述

关键点解析

  • consume() 函数:通过 pika 连接 RabbitMQ,声明 ownit_queue 队列,并通过回调函数 callback 处理接收到的消息。
  • 线程化消费:为了使 Flask 应用能够正常处理 Web 请求,同时也能处理消息队列中的消息,我们将消息消费部分放在一个单独的线程中运行。
  • auto_ack=True:自动确认消息,表示一旦消费者接收到消息后会自动从队列中删除该消息。

持续发送数据

每秒接收1条数据
在这里插入图片描述
没消费的数据一直在MQ中
在这里插入图片描述

总结

结合 FlaskRabbitMQ 构建一个高效的消息队列系统,从假数据的生成到数据的消费处理,整个过程都得到了详细展示。在实现过程中,我们主要涵盖了以下内容:

  1. RabbitMQ 的安装与配置:了解了如何通过 RabbitMQ 管理消息队列,以及如何与 Python 进行交互。
  2. 生产者的实现:使用 Faker 库生成假数据,并将其发布到 RabbitMQ 队列中。通过 pika 库与 RabbitMQ 进行连接,确保数据能够被成功发送到队列中。
  3. 消费者的实现:通过 Flask 启动一个独立的消费者线程,从 RabbitMQ 队列中获取数据并进行处理。我们还讨论了如何在 Flask 应用中嵌入多线程操作,保证 Web 应用的响应性和消息处理的高效性。
  4. 消息确认和持久化:我们讨论了消息的持久化和确认机制,这对于生产环境中的高可用性和数据安全性至关重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2249003.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux:文件管理(一)——文件描述符fd

目录 一、文件基础认识 二、C语言操作文件的接口 1.> 和 >> 2.理解“当前路径” 三、相关系统调用 1.open 2.文件描述符 3.一切皆文件 4.再次理解重定向 一、文件基础认识 文件 内容 属性。换句话说,如果在电脑上新建了一个空白文档&#xff0…

机器学习模型——线性回归

文章目录 前言1.基础概念2.代价函数3.单变量线性回归3.1加载数据3.2初始化超参数3.3梯度下降算法3.3.1初次梯度下降3.3.2 多次梯度下降3.3.3结果可视化 前言 随着互联网数据不断累积,硬件不断升级迭代,在这个信息爆炸的时代,机器学习已被应用…

如何安全高效地打开和管理动态链接库(DLL)?系统提示dll丢失问题的多种有效修复指南

动态链接库(DLL)文件是Windows操作系统中非常重要的一部分,它们包含了程序运行所需的代码和数据。当系统提示DLL文件丢失时,可能会导致应用程序无法正常运行。以下是一些安全高效地打开和管理DLL文件以及修复DLL丢失问题的方法&am…

数据结构(初阶7)---七大排序法(堆排序,快速排序,归并排序,希尔排序,冒泡排序,选择排序,插入排序)(详解)

排序 1.插入排序2.希尔排序3.冒泡排序4.选择排序(双头排序优化版)5.堆排序6.快速排序1). 双指针法2).前后指针法3).非递归法 7.归并排序1).递归版本(递归的回退就是归并)2).非递归版本(迭代版本) 计算机执行的最多的操作之一就有排序,排序是一项极其重要的技能 接下…

【JavaEE初阶 — 网络原理】初识网络原理

目录 1. 网络发展史 1.1 独立模式 1.2 网络互连 1.2.1 网络互联的背景 1.2.2 网络互联的定义 1.3 局域网LAN 1.4 广域网WAN 2. 网络通信基础 2.1 IP地址 2.2 端口号 2.3 认识协议 2.4 五元组 2.5 协议分层 2.5.1 分…

【C++习题】15.滑动窗口_串联所有单词的子串

文章目录 题目链接&#xff1a;题目描述&#xff1a;解法C 算法代码&#xff1a;图解 题目链接&#xff1a; 30. 串联所有单词的子串 题目描述&#xff1a; 解法 滑动窗口哈希表 这题和第14题不同的是&#xff1a; 哈希表不同&#xff1a;hash<string,int>left与right指…

【学术讲座】视觉计算中的深度学习方法 AIGC图像视频生成模型的推理加速

视觉计算中的深度学习方法 发展历程 backbone 强化学习、LLM等&#xff1a;有监督 && 无监督的结合 目标检测 图像分割 网络结构搜索 搜索方法 1&#xff1a;强化学习 2&#xff1a;强化学习 3&#xff1a;梯度算法 结构选择的作用 1&#xff1a;开放环境感知网络…

【VLANPWN】一款针对VLAN的安全研究和渗透测试工具

关于VLANPWN VLANPWN是一款针对VLAN的安全研究和渗透测试工具&#xff0c;该工具可以帮助广大研究人员通过对VLAN执行渗透测试&#xff0c;来研究和分析目标VLAN的安全状况。该工具专为红队研究人员和安全学习爱好者设计&#xff0c;旨在训练网络工程师提升网络的安全性能&…

机器学习之数据预处理理论——基于表格数据分析

一、机器学习中数据预处理的作用与目的 对于机器学习而言&#xff0c;数据预处理是指在数据挖掘、数据分析、模型构建训练等过程中&#xff0c;对原始数据进行一系列的处理&#xff0c;以提高数据质量、减少噪声、提取有用信息等。数据预处理的主要目的是将原始数据转换为有用的…

如何写出好证明(支持思想的深入数学写作)

不断的修改和精炼是写作过程中的重要环节&#xff0c;数学写作最终目的是提供对问题的深刻洞察而非仅仅陈述细节。 根据harvey mudd college Francis Su教授的《GUIDELINES FOR GOOD MATHEMATICAL WRITING》讲稿&#xff0c;总结出撰写好的数学证明需要注意以下几个要点&#x…

中英双语介绍DeepSpeed 的 ZeRO 优化

DeepSpeed 的 ZeRO 优化&#xff1a;通俗易懂的原理与实践指南 引言 在深度学习的大规模模型训练中&#xff0c;显存瓶颈是常见的挑战。DeepSpeed 提供了革命性的 ZeRO (Zero Redundancy Optimizer) 优化技术&#xff0c;为大模型训练节省显存、提高效率提供了强有力的工具。…

如何将 GitHub 私有仓库(private)转换为公共仓库(public)

文章目录 如何将 GitHub 私有仓库转换为公共仓库步骤 1: 登录 GitHub步骤 2: 导航到目标仓库步骤 3: 访问仓库设置步骤 4: 更改仓库可见性步骤 5: 确认更改步骤 6: 验证更改注意事项 如何将 GitHub 私有仓库转换为公共仓库 在软件开发领域&#xff0c;GitHub 是一个广受欢迎的…

【webrtc】 mediasoup中m77的IntervalBudget及其在AlrDetector的应用

IntervalBudget 用于带宽控制和流量整形 mediasoup中m77 代码的IntervalBudget ,版本比较老IntervalBudget 在特定时间间隔内的比特预算管理,从而实现带宽控制和流量整形。 一。 pacedsender 执行周期: 下一次执行的时间的动态可变的 int64_t PacedSender::TimeUntilNextPr…

Z2400023基于Java+Servlet+jsp+mysql的酒店管理系统的设计与实现 源码 调试 文档

酒店管理系统的设计与实现 1.摘要2.主要功能3. 项目技术栈运行环境 4.系统界面截图5.源码获取 1.摘要 本文介绍了一个基于Java的酒店管理系统&#xff0c;该系统采用Servlet、JSP、JDBC以及c3p0等技术构建&#xff0c;为酒店提供了一个全面的管理平台。该系统不仅适合酒店进行…

《操作系统 - 清华大学》5 -5:缺页异常

文章目录 1. 缺页异常的处理流程2.在何处保存未被映射的页&#xff1f;3. 虚拟内存性能 1. 缺页异常的处理流程 缺页中断的处理过程: CPU读内存单元&#xff0c;在TLB中根据其虚拟地址匹配物理地址&#xff0c;未命中&#xff0c;读页表; 由于页表项的存在位为0&#xff0c;CP…

C++:多态的原理

目录 一、多态的原理 1.虚函数表 2.多态的原理 二、单继承和多继承的虚函数表 1、单继承中的虚函数表 2、多继承中的虚函数表 一、多态的原理 1.虚函数表 首先我们创建一个使用了多态的类&#xff0c;创建一个对象来看其内部的内容&#xff1a; #include<iostre…

Ubuntu 硬盘分区并挂载

一、什么是挂载 1.挂载的定义 在 Ubuntu&#xff08;或其他 Linux 系统&#xff09;中&#xff0c;挂载&#xff08;Mount&#xff09; 是将一个存储设备或分区连接到系统的文件系统层次结构中的过程。挂载后&#xff0c;你可以通过某个目录&#xff08;挂载点&#xff09;访问…

python-docx -- 读取word页眉、页脚

文章目录 sections介绍访问section添加section页眉、页脚综合案例:sections介绍 word支持section的概念,即一个文档的划分部分,不同的部分均包含相同的页面布局设置,如相同的边距、页面方向等;在每个section中可以定义页眉、页脚来应用于该section下的所有页面;大部分wor…

开源加密库mbedtls及其Windows编译库

目录 1 项目简介 2 功能特性 3 性能优势 4 平台兼容性 5 应用场景 6 特点 7 Windows编译 8 编译静态库及其测试示例下载 1 项目简介 Mbed TLS是一个由ARM Maintained的开源项目&#xff0c;它提供了一个轻量级的加密库&#xff0c;适用于嵌入式系统和物联网设备。这个项…

《生成式 AI》课程 第7講:大型語言模型修練史 — 第二階段: 名師指點,發揮潛力 (兼談對 ChatGPT 做逆向工程與 LLaMA 時代的開始)

资料来自李宏毅老师《生成式 AI》课程&#xff0c;如有侵权请通知下线 Introduction to Generative AI 2024 Springhttps://speech.ee.ntu.edu.tw/~hylee/genai/2024-spring.php 摘要 这一系列的作业是为 2024 年春季的《生成式 AI》课程设计的&#xff0c;共包含十个作业。…