PyQt5 基于paho-mqtt库 实现MQTT通信
- paho-mqtt
 - 安装paho-mqtt库
 - 综合示例
 - 错误处理
 
paho-mqtt
paho-mqtt官网文档
安装paho-mqtt库
pip install paho-mqtt
综合示例
- 封装MQTT类
 - 订阅消息
 - 发布消息
 - 信号方式接收处理MQTT消息
 
import paho.mqtt.client as mqtt
import sys
import json
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
# MQTT connection settings -- replace these with the values for your setup.
# Broker address and TCP port.
mqtt_host, mqtt_port = "192.168.0.11", 1883
# Client identifier and login credentials ("xxxx" values are placeholders).
mqtt_client_id = "sn_864423065869616"
mqtt_username, mqtt_password = "xxxx", "xxxx"
# Topic to subscribe to and topic to publish on.
mqtt_sub_topic, mqtt_pub_topic = "topic/sub", "topic/pub"
# 封装一个MQTT客户端
class MqttClient(QObject):
    """Qt-friendly wrapper around a paho-mqtt client (callback API version 2).

    Received messages are re-emitted through ``message_signal`` so that a
    slot connected by the GUI can update widgets.
    """

    # Emitted by on_message with (topic, decoded payload text).
    message_signal = pyqtSignal(str, str)

    def __init__(self, broker, port, client_id, protocol=mqtt.MQTTv311):
        """Create the underlying paho client.

        Args:
            broker: Host name or IP address of the MQTT broker.
            port: TCP port of the broker.
            client_id: Client identifier sent to the broker.
            protocol: MQTT protocol version constant passed to paho.
        """
        super(MqttClient, self).__init__()
        self.broker = broker
        self.port = port
        self.client_id = client_id
        # Forward `protocol`; previously the argument was accepted but ignored.
        self.client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2, client_id, protocol=protocol
        )

    def connect(self, username=None, password=None, keepalive=60):
        """Set the credentials and open the connection to the broker."""
        self.client.username_pw_set(username, password)
        self.client.connect(self.broker, self.port, keepalive)

    def subscribe(self, topic, qos=0):
        """Subscribe to ``topic`` with the given QoS level."""
        self.client.subscribe(topic, qos)

    def publish(self, topic, paylaod, qos=0, retain=False):
        """Publish ``paylaod`` on ``topic``.

        The parameter spelling ``paylaod`` is kept as-is because callers
        pass it by keyword.
        """
        self.client.publish(topic, paylaod, qos, retain)

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """Report the outcome of a connection attempt."""
        if reason_code == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {reason_code}")

    def on_disconnect(self, client, userdata, flags, reason_code, properties):
        """Report why the connection to the broker ended."""
        if reason_code == 0:
            # Clean disconnect.
            print("Disconnect to MQTT Broker!")
        elif reason_code > 0:
            # Disconnect reported with an error code.
            print(f"Failed to disconnect, return code {reason_code}")

    def on_message(self, client, userdata, message):
        """Decode an incoming message and forward it through ``message_signal``."""
        # Decode once; undecodable bytes are replaced instead of raising
        # inside the paho callback.
        text = message.payload.decode("utf-8", errors="replace")
        print("Received message: ", text)
        # Emit the signal so the connected slot can update the UI.
        self.message_signal.emit(message.topic, text)

    def on_subscribe(self, client, userdata, mid, reason_codes, properties):
        """Inspect the per-topic results of a SUBSCRIBE request."""
        for sub_result in reason_codes:
            if sub_result == 1:
                # Subscription granted with QoS 1.
                print("on_subscribe process QoS == 1")
            # Any reason code >= 128 is a failure.
            if sub_result >= 128:
                print("on_subscribe error processing。")

    def on_unsubscribe(self, client, userdata, mid, reason_codes, properties):
        """Inspect the per-topic results of an UNSUBSCRIBE request.

        With callback API version 2, ``reason_codes`` is always a list
        (empty for MQTT v3).
        """
        for unsub_result in reason_codes:
            # Any reason code >= 128 is a failure.
            if unsub_result >= 128:
                print("on_unsubscribe error processing.")

    def on_publish(self, client, userdata, mid, reason_codes, properties):
        """Log the reason code reported for a published message."""
        print(f"Public reason_codes {reason_codes}")

    def on_log(self, client, userdata, level, buf):
        """Print paho's log output."""
        print(buf)

    def start(self):
        """Attach all callbacks and start paho's background network loop."""
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_subscribe = self.on_subscribe
        self.client.on_unsubscribe = self.on_unsubscribe
        self.client.on_publish = self.on_publish
        self.client.on_message = self.on_message
        self.client.on_log = self.on_log
        self.client.loop_start()

    def stop(self):
        """Stop the background network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()
class MainWindow(QMainWindow):
    """Main window: shows the last received MQTT message and a publish button."""

    def __init__(self, parent=None):
        """Build the UI, then connect to the broker and start receiving."""
        super(MainWindow, self).__init__(parent)
        self.initUI()
        self.client = MqttClient(broker=mqtt_host, port=mqtt_port, client_id=mqtt_client_id)
        # Hook up the slot before any network activity starts.
        self.client.message_signal.connect(self.update_ui)
        try:
            self.client.connect(username=mqtt_username, password=mqtt_password, keepalive=60)
        except OSError as exc:
            # Broker unreachable / refused / name lookup failed: report it in
            # the window instead of crashing before the window is shown.
            print("MQTT connect failed:", exc)
            self.label_show.setText("MQTT connect failed: {}".format(exc))
            return
        self.client.subscribe(topic=mqtt_sub_topic, qos=0)
        self.client.start()

    def initUI(self):
        """Create the label and publish button and lay them out vertically."""
        self.setWindowTitle("MQTT测试工具")
        self.resize(800, 480)
        self.center()   # centre the window on the screen

        self.label_show = QLabel(self)
        self.label_show.setText("...")
        self.label_show.setStyleSheet("color:blue; font-size:20px;")

        self.btn_mqttpub = QPushButton(self)
        self.btn_mqttpub.setText("发布消息")
        # The lambda drops the `checked` argument that `clicked` emits.
        self.btn_mqttpub.clicked.connect(lambda: self.publish_message())

        root = QVBoxLayout()
        root.addWidget(self.label_show)
        root.addWidget(self.btn_mqttpub)

        mwidget = QWidget()
        mwidget.setLayout(root)
        self.setCentralWidget(mwidget)

    def center(self):
        """Move the window so it is centred on the screen."""
        screen = QDesktopWidget().screenGeometry()
        size = self.geometry()
        left = int((screen.width() - size.width()) / 2)
        top = int((screen.height() - size.height()) / 2)
        self.move(left, top)

    def update_ui(self, topic, message):
        """Slot for ``MqttClient.message_signal``: show topic and payload."""
        print('接收到的消息更新到UI显示')
        print(topic)
        print(message)
        self.label_show.setText(topic + " " + message)

    def publish_message(self):
        """Publish a fixed test message on the configured publish topic."""
        print("发布消息")
        self.client.publish(topic=mqtt_pub_topic, paylaod="hello world", qos=0, retain=False)

    def closeEvent(self, event):
        """Stop the MQTT client before the window closes."""
        print('窗口关闭前执行的操作')
        self.client.stop()  # stop MQTT
        # Let the base class finish handling the close event.
        super().closeEvent(event)
if __name__ == "__main__":
    # Create the Qt application, show the main window, and exit with the
    # status returned by the event loop.
    qt_app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
 

错误处理
- 报错信息:
 
Unsupported callback API version: version 2.0 added a callback_api_version, see docs/migrations.rst for details
不支持的回调 API 版本:2.0 版本添加了一个callback_api_version,详情请参阅docs/migrations.rst
参考官方文档
- 原因:回调参数不一致。paho-mqtt 2.0 版本更改了传递给用户回调的参数;回调 API 版本1已弃用,但在 2.x 版本中仍受支持。
- 解决方法:
- 方法1:
使用旧版本的回调。需告诉 paho-mqtt 您选择此版本即可,修改如下代码:
from paho.mqtt import client as mqtt
# OLD code
client = mqtt.Client(client_id)
# NEW code
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id)
但是这种方法每次运行的时候,会出现以下警告:
DeprecationWarning: Callback API version 1 is deprecated, update to latest version
- 方法2:需要修改两处
1)在创建client对象时,新增参数:mqtt.CallbackAPIVersion.VERSION2
2)在on_connect()函数中,新增参数:properties
from paho.mqtt import client as mqtt
# OLD code
client = mqtt.Client(client_id)
# NEW code
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)

def on_connect(client, userdata, flags, rc, properties):
    if rc == 0:
        print("Connected to MQTT Broker!")
    else:
        print("Failed to connect,return code {}".format(rc))

client.on_connect = on_connect
- 方法3:降低paho-mqtt版本号到1.x版本
    
$ pip install paho-mqtt==1.6.1
 
 - 方法1:
 



















