目录
关键点
技术实现1
技术实现2
摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车辆状态信息。文章包含完整代码实现和ZeroMQ应用技巧。
关键点
# 深拷贝避免数据污染 避免直接修改原始配置文件
led_info = json.loads(json.dumps(original_data))
# 同步更新两个数据源
led_info['data']['driveMode'] = drive_control
vehicle_status['data']['driveMode'] = drive_control
技术实现1
#!/usr/bin/env python
import sys
import time
import zmq
import json
import random
from zmq import Poller
def main() -> None: #> None 表示此函数不返回任何值
global drive_control #声明 drive_control 为全局变量,这样可以在函数内部和外部访问它
drive_control = 1
if len(sys.argv) != 2:
print('Usage: publisher <bind-to>')
sys.exit(1)
#tcp://192.168.1.136:5582 发布
bind_to = sys.argv[1]
# 加载初始数据
with open('data.json', 'r', encoding='utf-8') as f:
data = json.load(f)['filter_auto_drive_info']
#使用 json.load() 方法读取 JSON 数据,将其中的 filter_auto_drive_info 部分加载到 data 变量中
# 分离LED和车辆状态数据 从 data 中提取 id 为 'led_info' 的项
led_info = next(item for item in data if item['id'] == 'led_info')
vehicle_status = next(item for item in data if item['id'] == 'vehicle_status_upload')
# ZeroMQ上下文
ctx = zmq.Context()
# 发布套接字
pub_socket = ctx.socket(zmq.PUB)
pub_socket.connect(bind_to)
# 订阅套接字
sub_socket = ctx.socket(zmq.SUB)
sub_socket.connect("tcp://192.168.1.144:5581") # 根据实际情况调整 订阅
sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'loudspeaker_end')
#设置订阅过滤器,只订阅以 'loudspeaker_end' 为主题的消息
#创建一个 Poller 对象,用于检测多个套接字的可用性
#将订阅套接字注册到 poller,表示要监听 sub_socket 是否有消息可读
poller = Poller()
poller.register(sub_socket, zmq.POLLIN)
start_time = time.time()
received_end = False
try:
while True:
current_time = time.time()
elapsed = current_time - start_time
# 确定当前状态
if not received_end:
if elapsed < 5: #5s内 自动驾驶,亮绿灯
status = 1
elif elapsed < 10: # 全灭 人工驾驶
status = 2
drive_control = 0
elif elapsed < 15: # 远程驾驶,亮黄灯
status = 0
drive_control = 2
else:
drive_control = 4 # 主动安全,亮红灯
else:
status = random.randint(1, 7)
# 更新LED状态
led_info['data']['loudspeakerStatus'] = status
led_info['data']['driveMode'] = drive_control
# 发送LED信息并打印
#使用 json.dumps() 将 led_info 转换为 JSON 字符串
led_data = json.dumps(led_info)
send_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_time))
print(f"Sending LED data at {send_time}: {led_data}")
pub_socket.send_multipart([
b'filter_auto_drive_info',
led_data.encode('utf-8')
])
time.sleep(1) #0.1s,也即每100ms发送1次
# 发送车辆状态并打印
#将 vehicle_status 转换为 JSON 字符串并发送,每 100 毫秒发送一次
vehicle_data = json.dumps(vehicle_status)
print(f"Sending Vehicle data at {send_time}: {vehicle_data}")
pub_socket.send_multipart([
b'filter_auto_drive_info',
vehicle_data.encode('utf-8')
])
time.sleep(1)
# 检查订阅消息 使用 poller.poll(0) 检查是否有新消息
socks = dict(poller.poll(0))
if sub_socket in socks:
#如果有消息,读取并处理它,检查是否是 'loudspeaker_end' 主题的消息。如果是,则打印终止信号并设置 received_end = True
while True:
try:
msg = sub_socket.recv_multipart(zmq.NOBLOCK)
message = json.loads(msg[1].decode('utf-8'))
# 打印订阅到的消息
print(f"Received message on topic '{msg[0].decode()}': {json.dumps(message, indent=2)}")
# 处理loudspeaker_end的消息
if message.get('id') == 'loudspeaker_end':
speaker_id = message.get('data', {}).get('speakerId')
print(f"Received termination signal, speakerId: {speaker_id} (This indicates that the sound has finished playing).")
received_end = True
except zmq.Again:
break
#捕获 KeyboardInterrupt(例如用户按 Ctrl+C),打印消息并退出程序
except KeyboardInterrupt:
print("\nShutting down...")
finally:
pub_socket.close()
sub_socket.close()
ctx.term()
if __name__ == "__main__":
main()
"""
try 块用来包裹可能抛出异常的代码。
except 可以用来捕获并处理特定的异常。
finally 块中的代码无论如何都会被执行,通常用于清理操作。
finally 语句确保了无论 try 代码块是否有异常抛出,都会执行其中的代码,因此它非常适合资源管理(如文件、数据库连接的关闭等)。
"""
技术实现2
#!/usr/bin/env python
import sys
import time
import zmq
import json
import random
from zmq import Poller
def main() -> None:
global drive_control
drive_control = 1
if len(sys.argv) != 2:
print('Usage: publisher <bind-to>')
sys.exit(1)
bind_to = sys.argv[1]
# 加载初始数据
with open('data.json', 'r', encoding='utf-8') as f:
full_data = json.load(f)
# 提取原始数据结构
data = full_data['filter_auto_drive_info']
# 创建深拷贝避免修改原始数据
led_info = json.loads(json.dumps(next(item for item in data if item['id'] == 'led_info')))
vehicle_status = json.loads(json.dumps(next(item for item in data if item['id'] == 'vehicle_status_upload')))
# ZeroMQ上下文
ctx = zmq.Context()
# 发布套接字
pub_socket = ctx.socket(zmq.PUB)
pub_socket.connect(bind_to)
# 订阅套接字
sub_socket = ctx.socket(zmq.SUB)
sub_socket.connect("tcp://192.168.1.144:5581")
sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'loudspeaker_end')
poller = Poller()
poller.register(sub_socket, zmq.POLLIN)
start_time = time.time()
received_end = False
mode_change_time = time.time() # 记录模式切换时间
try:
while True:
current_time = time.time()
elapsed = current_time - start_time
mode_elapsed = current_time - mode_change_time
# 状态机逻辑
if not received_end:
if mode_elapsed < 5: # 自动驾驶(绿灯)
status = 1
drive_control = 1
elif mode_elapsed < 10: # 人工驾驶(全灭)
status = 2
drive_control = 0
elif mode_elapsed < 15: # 远程驾驶(黄灯)
status = 0
drive_control = 2
else: # 主动安全(红灯)
drive_control = 4
status = 3
# 重置计时器进入循环
mode_change_time = time.time()
else:
# 接收到结束信号后保持主动安全模式
drive_control = 4
status = random.randint(1, 7) # 随机状态演示
# 更新数据结构
led_info['data']['driveMode'] = drive_control
led_info['data']['loudspeakerStatus'] = status
#vehicle_status['data']['driveMode'] = drive_control
# 发送LED信息
led_data = json.dumps(led_info)
send_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_time))
print(f"[{send_time}] LED状态 | 模式: {drive_control} | 状态: {status}")
pub_socket.send_multipart([
b'filter_auto_drive_info',
led_data.encode('utf-8')
])
# 发送车辆状态
vehicle_data = json.dumps(vehicle_status)
print(f"[{send_time}] 车辆状态 | 模式: {drive_control}")
pub_socket.send_multipart([
b'filter_auto_drive_info',
vehicle_data.encode('utf-8')
])
time.sleep(1) # 1000ms发送间隔
# 检查订阅消息
socks = dict(poller.poll(0))
if sub_socket in socks:
while True:
try:
msg = sub_socket.recv_multipart(zmq.NOBLOCK)
message = json.loads(msg[1].decode('utf-8'))
if message.get('id') == 'loudspeaker_end':
speaker_id = message.get('data', {}).get('speakerId')
print(f"\n[!] 收到终止信号 | 设备ID: {speaker_id}")
print("[系统] 切换到主动安全模式(4)")
received_end = True
drive_control = 4 # 强制切换到主动安全模式
except zmq.Again:
break
except KeyboardInterrupt:
print("\n[系统] 程序关闭")
finally:
pub_socket.close()
sub_socket.close()
ctx.term()
if __name__ == "__main__":
main()
其中,所需的data.json文件如下
{
"filter_auto_drive_info": [
{
"id": "led_info",
"data":{
"driveMode":0,
"workStatus": 4,
"vehicleStatus": 3,
"arrowStatus": 0,
"vehicleSpeed": 100,
"loudspeakerStatus": 1,
"faultStatus": 0
}
},
{
"id": "vehicle_status_upload",
"data":{
"ch4Concentration": 444,
"coConcentration": 123,
"powerOnStatus": 1,
"soc": 99,
"mileageAccrual": 234
}
}]
}
通过运行python ./test.py tcp://192.168.1.137:5582 即可运行 (zmq发布端连接)
结语: 本系统展示了如何利用ZeroMQ的发布-订阅模式构建实时车辆监控系统。通过状态机设计和深度数据同步,实现了驾驶模式的智能切换。