Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

news2025/6/12 6:18:42

目录

关键点

技术实现1

技术实现2


摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车辆状态信息。文章包含完整代码实现和ZeroMQ应用技巧。

关键点

# 深拷贝避免数据污染 避免直接修改原始配置文件
led_info = json.loads(json.dumps(original_data))

# 同步更新两个数据源
led_info['data']['driveMode'] = drive_control
vehicle_status['data']['driveMode'] = drive_control

技术实现1

#!/usr/bin/env python
import sys
import time
import zmq
import json
import random
from zmq import Poller

def main() -> None:  #> None 表示此函数不返回任何值
    global drive_control   #声明 drive_control 为全局变量,这样可以在函数内部和外部访问它
    drive_control = 1
    if len(sys.argv) != 2:
        print('Usage: publisher <bind-to>')
        sys.exit(1)
#tcp://192.168.1.136:5582  发布
    bind_to = sys.argv[1]

    # 加载初始数据
    with open('data.json', 'r', encoding='utf-8') as f:
        data = json.load(f)['filter_auto_drive_info']
        #使用 json.load() 方法读取 JSON 数据,将其中的 filter_auto_drive_info 部分加载到 data 变量中

    # 分离LED和车辆状态数据 从 data 中提取 id 为 'led_info' 的项
    led_info = next(item for item in data if item['id'] == 'led_info')
    vehicle_status = next(item for item in data if item['id'] == 'vehicle_status_upload')

    # ZeroMQ上下文
    ctx = zmq.Context()

    # 发布套接字
    pub_socket = ctx.socket(zmq.PUB)
    pub_socket.connect(bind_to)

    # 订阅套接字
    sub_socket = ctx.socket(zmq.SUB)
    sub_socket.connect("tcp://192.168.1.144:5581")  # 根据实际情况调整 订阅
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'loudspeaker_end')
    #设置订阅过滤器,只订阅以 'loudspeaker_end' 为主题的消息
    #创建一个 Poller 对象,用于检测多个套接字的可用性
    #将订阅套接字注册到 poller,表示要监听 sub_socket 是否有消息可读
    poller = Poller()
    poller.register(sub_socket, zmq.POLLIN)

    start_time = time.time()
    received_end = False

    try:
        while True:
            current_time = time.time()
            elapsed = current_time - start_time

            # 确定当前状态
            if not received_end:
                if elapsed < 5:   #5s内  自动驾驶,亮绿灯
                    status = 1
                elif elapsed < 10:  # 全灭 人工驾驶
                    status = 2
                    drive_control = 0
                elif elapsed < 15:  # 远程驾驶,亮黄灯
                    status = 0
                    drive_control = 2
                else:
                    drive_control = 4  # 主动安全,亮红灯
            else:
                status = random.randint(1, 7)

            # 更新LED状态
            led_info['data']['loudspeakerStatus'] = status
            led_info['data']['driveMode'] = drive_control
            # 发送LED信息并打印
            #使用 json.dumps() 将 led_info 转换为 JSON 字符串
            led_data = json.dumps(led_info)
            send_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_time))
            print(f"Sending LED data at {send_time}: {led_data}")
            pub_socket.send_multipart([
                b'filter_auto_drive_info',
                led_data.encode('utf-8')
            ])
            time.sleep(1)  #0.1s,也即每100ms发送1次

            # 发送车辆状态并打印
            #将 vehicle_status 转换为 JSON 字符串并发送,每 100 毫秒发送一次
            vehicle_data = json.dumps(vehicle_status)
            print(f"Sending Vehicle data at {send_time}: {vehicle_data}")
            pub_socket.send_multipart([
                b'filter_auto_drive_info',
                vehicle_data.encode('utf-8')
            ])
            time.sleep(1)

            # 检查订阅消息 使用 poller.poll(0) 检查是否有新消息
            socks = dict(poller.poll(0))
            if sub_socket in socks:
                #如果有消息,读取并处理它,检查是否是 'loudspeaker_end' 主题的消息。如果是,则打印终止信号并设置 received_end = True
                while True:
                    try:
                        msg = sub_socket.recv_multipart(zmq.NOBLOCK)
                        message = json.loads(msg[1].decode('utf-8'))

                        # 打印订阅到的消息
                        print(f"Received message on topic '{msg[0].decode()}': {json.dumps(message, indent=2)}")

                        # 处理loudspeaker_end的消息
                        if message.get('id') == 'loudspeaker_end':
                            speaker_id = message.get('data', {}).get('speakerId')
                            print(f"Received termination signal, speakerId: {speaker_id} (This indicates that the sound has finished playing).")
                            received_end = True
                    except zmq.Again:
                        break
    #捕获 KeyboardInterrupt(例如用户按 Ctrl+C),打印消息并退出程序
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        pub_socket.close()
        sub_socket.close()
        ctx.term()

if __name__ == "__main__":
    main()

"""
try 块用来包裹可能抛出异常的代码。
except 可以用来捕获并处理特定的异常。
finally 块中的代码无论如何都会被执行,通常用于清理操作。
finally 语句确保了无论 try 代码块是否有异常抛出,都会执行其中的代码,因此它非常适合资源管理(如文件、数据库连接的关闭等)。
"""

技术实现2

#!/usr/bin/env python
import sys
import time
import zmq
import json
import random
from zmq import Poller

def main() -> None:
    global drive_control
    drive_control = 1
    if len(sys.argv) != 2:
        print('Usage: publisher <bind-to>')
        sys.exit(1)

    bind_to = sys.argv[1]

    # 加载初始数据
    with open('data.json', 'r', encoding='utf-8') as f:
        full_data = json.load(f)
        # 提取原始数据结构
        data = full_data['filter_auto_drive_info']
        # 创建深拷贝避免修改原始数据
        led_info = json.loads(json.dumps(next(item for item in data if item['id'] == 'led_info')))
        vehicle_status = json.loads(json.dumps(next(item for item in data if item['id'] == 'vehicle_status_upload')))

    # ZeroMQ上下文
    ctx = zmq.Context()

    # 发布套接字
    pub_socket = ctx.socket(zmq.PUB)
    pub_socket.connect(bind_to)

    # 订阅套接字
    sub_socket = ctx.socket(zmq.SUB)
    sub_socket.connect("tcp://192.168.1.144:5581")
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'loudspeaker_end')

    poller = Poller()
    poller.register(sub_socket, zmq.POLLIN)

    start_time = time.time()
    received_end = False
    mode_change_time = time.time()  # 记录模式切换时间

    try:
        while True:
            current_time = time.time()
            elapsed = current_time - start_time
            mode_elapsed = current_time - mode_change_time

            # 状态机逻辑
            if not received_end:
                if mode_elapsed < 5:   # 自动驾驶(绿灯)
                    status = 1
                    drive_control = 1
                elif mode_elapsed < 10:  # 人工驾驶(全灭)
                    status = 2
                    drive_control = 0
                elif mode_elapsed < 15:  # 远程驾驶(黄灯)
                    status = 0
                    drive_control = 2
                else:  # 主动安全(红灯)
                    drive_control = 4
                    status = 3
                    # 重置计时器进入循环
                    mode_change_time = time.time()
            else:
                # 接收到结束信号后保持主动安全模式
                drive_control = 4
                status = random.randint(1, 7)  # 随机状态演示

            # 更新数据结构
            led_info['data']['driveMode'] = drive_control
            led_info['data']['loudspeakerStatus'] = status
            #vehicle_status['data']['driveMode'] = drive_control

            # 发送LED信息
            led_data = json.dumps(led_info)
            send_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_time))
            print(f"[{send_time}] LED状态 | 模式: {drive_control} | 状态: {status}")
            pub_socket.send_multipart([
                b'filter_auto_drive_info',
                led_data.encode('utf-8')
            ])

            # 发送车辆状态
            vehicle_data = json.dumps(vehicle_status)
            print(f"[{send_time}] 车辆状态 | 模式: {drive_control}")
            pub_socket.send_multipart([
                b'filter_auto_drive_info',
                vehicle_data.encode('utf-8')
            ])

            time.sleep(1)  # 1000ms发送间隔

            # 检查订阅消息
            socks = dict(poller.poll(0))
            if sub_socket in socks:
                while True:
                    try:
                        msg = sub_socket.recv_multipart(zmq.NOBLOCK)
                        message = json.loads(msg[1].decode('utf-8'))

                        if message.get('id') == 'loudspeaker_end':
                            speaker_id = message.get('data', {}).get('speakerId')
                            print(f"\n[!] 收到终止信号 | 设备ID: {speaker_id}")
                            print("[系统] 切换到主动安全模式(4)")
                            received_end = True
                            drive_control = 4  # 强制切换到主动安全模式
                    except zmq.Again:
                        break
    except KeyboardInterrupt:
        print("\n[系统] 程序关闭")
    finally:
        pub_socket.close()
        sub_socket.close()
        ctx.term()

if __name__ == "__main__":
    main()

其中,所需的data.json文件如下

{
"filter_auto_drive_info": [
{
"id": "led_info",
"data":{
"driveMode":0,
"workStatus": 4,
"vehicleStatus": 3,
"arrowStatus": 0,
"vehicleSpeed": 100,
"loudspeakerStatus": 1,
"faultStatus": 0
}
},
{
"id": "vehicle_status_upload",
"data":{
"ch4Concentration": 444,
"coConcentration": 123,
"powerOnStatus": 1,
"soc": 99,
"mileageAccrual": 234
}
}]
}

通过运行python ./test.py tcp://192.168.1.137:5582 即可运行 (zmq发布端连接)

结语: 本系统展示了如何利用ZeroMQ的发布-订阅模式构建实时车辆监控系统。通过状态机设计和深度数据同步,实现了驾驶模式的智能切换。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2407173.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Razor编程中@Html的方法使用大全

文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …

Windows安装Miniconda

一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客

iview框架主题色的应用

1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题&#xff0c;无需引入&#xff0c;直接可…

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…

三分算法与DeepSeek辅助证明是单峰函数

前置 单峰函数有唯一的最大值&#xff0c;最大值左侧的数值严格单调递增&#xff0c;最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值&#xff0c;最小值左侧的数值严格单调递减&#xff0c;最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…

【网络安全】开源系统getshell漏洞挖掘

审计过程&#xff1a; 在入口文件admin/index.php中&#xff1a; 用户可以通过m,c,a等参数控制加载的文件和方法&#xff0c;在app/system/entrance.php中存在重点代码&#xff1a; 当M_TYPE system并且M_MODULE include时&#xff0c;会设置常量PATH_OWN_FILE为PATH_APP.M_T…

uniapp 开发ios, xcode 提交app store connect 和 testflight内测

uniapp 中配置 配置manifest 文档&#xff1a;manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号&#xff1a;4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…

GO协程(Goroutine)问题总结

在使用Go语言来编写代码时&#xff0c;遇到的一些问题总结一下 [参考文档]&#xff1a;https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现&#xff1a; 今天在看到这个教程的时候&#xff0c;在自己的电…

C++ 设计模式 《小明的奶茶加料风波》

&#x1f468;‍&#x1f393; 模式名称&#xff1a;装饰器模式&#xff08;Decorator Pattern&#xff09; &#x1f466; 小明最近上线了校园奶茶配送功能&#xff0c;业务火爆&#xff0c;大家都在加料&#xff1a; 有的同学要加波霸 &#x1f7e4;&#xff0c;有的要加椰果…

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…

计算机基础知识解析:从应用到架构的全面拆解

目录 前言 1、 计算机的应用领域&#xff1a;无处不在的数字助手 2、 计算机的进化史&#xff1a;从算盘到量子计算 3、计算机的分类&#xff1a;不止 “台式机和笔记本” 4、计算机的组件&#xff1a;硬件与软件的协同 4.1 硬件&#xff1a;五大核心部件 4.2 软件&#…

C# 表达式和运算符(求值顺序)

求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如&#xff0c;已知表达式3*52&#xff0c;依照子表达式的求值顺序&#xff0c;有两种可能的结果&#xff0c;如图9-3所示。 如果乘法先执行&#xff0c;结果是17。如果5…

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…

逻辑回归暴力训练预测金融欺诈

简述 「使用逻辑回归暴力预测金融欺诈&#xff0c;并不断增加特征维度持续测试」的做法&#xff0c;体现了一种逐步建模与迭代验证的实验思路&#xff0c;在金融欺诈检测中非常有价值&#xff0c;本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…

FFmpeg:Windows系统小白安装及其使用

一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】&#xff0c;注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录&#xff08;即exe所在文件夹&#xff09;加入系统变量…