内嵌式mqtt server

news2025/6/8 3:32:48

添加moquette依赖

 <dependency>
     <groupId>io.moquette</groupId>
     <artifactId>moquette-broker</artifactId>
     <version>0.17</version>
     <exclusions>
          <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-reload4j</artifactId>
          </exclusion>
      </exclusions>
</dependency>

配置文件类MoquetteProperties

package com.mqtt.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
public class MoquetteProperties {

    @Value("${mqtt.port:1883}")
    private String mqttPort;

    @Value("${mqtt.host:0.0.0.0}")
    private String mqttHost;

    @Value("${mqtt.allow_anonymous:false}")
    private String allowAnonymous;

    @Value("${mqtt.username:admin}")
    private String username;

    @Value("${mqtt.password:moque123432}")
    private String password;

}

权限认证类CustomAuthenticator

package com.mqtt.interceptor;

import com.mqtt.config.MoquetteProperties;
import io.moquette.broker.security.IAuthenticator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomAuthenticator implements IAuthenticator {

    @Autowired
    private MoquetteProperties moquetteProperties;

    @Override
    public boolean checkValid(String clientId, String username, byte[] password) {
        String passwordStr = new String(password);
        if (moquetteProperties.getUsername().equals(username)
                && moquetteProperties.getPassword().equals(passwordStr)) {
            return true;
        }
        log.error("CustomAuthenticator checkValid: 用户名或密码错误");
        return false;
    }
}

消息拦截类MqttMessageInterceptor

package com.mqtt.interceptor;

import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class MqttMessageInterceptor extends AbstractInterceptHandler {

    @Override
    public String getID() {
        return MqttMessageInterceptor.class.getName();
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {
        String clientId = msg.getClientID();
        String topic = msg.getTopicName();
        // 获取消息的有效载荷
        ByteBuf payload = msg.getPayload();
        String content = safeReadByteBuf(payload);
        log.info("MqttMessageInterceptor Received message - Client: {}, Topic: {}, Payload: {}",
                clientId, topic, content);
    }

    @Override
    public void onSessionLoopError(Throwable error) {
        log.error("MqttMessageInterceptor onSessionLoopError", error);
    }

    /**
     * 安全读取 ByteBuf 数据
     */
    private String safeReadByteBuf(ByteBuf byteBuf) {
        try {
            if (byteBuf == null || !byteBuf.isReadable()) {
                return "";
            }
            if (byteBuf.hasArray()) {
                // 堆内缓冲区
                byte[] array = byteBuf.array();
                int offset = byteBuf.arrayOffset() + byteBuf.readerIndex();
                int length = byteBuf.readableBytes();
                return new String(array, offset, length, StandardCharsets.UTF_8);
            } else {
                // 堆外缓冲区
                byte[] array = new byte[byteBuf.readableBytes()];
                byteBuf.getBytes(byteBuf.readerIndex(), array);
                return new String(array, StandardCharsets.UTF_8);
            }
        } finally {
            // 确保释放 ByteBuf 资源
            if (byteBuf != null && byteBuf.refCnt() > 0) {
                byteBuf.release();
            }
        }
    }
}

MQTT Server类MoquetteBrokerConfig

参考
https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22

package com.mqtt.config;

import com.mqtt.interceptor.CustomAuthenticator;
import com.mqtt.interceptor.MqttMessageInterceptor;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.interception.InterceptHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

@Slf4j
@Configuration
public class MoquetteBrokerConfig {

    @Autowired
    private MoquetteProperties moquetteProperties;

    @Bean(destroyMethod = "stopServer")
    public Server mqttBroker(MqttMessageInterceptor interceptor, CustomAuthenticator customAuthenticator) {
        // 创建 Moquette 的配置
        Properties properties = new Properties();
        // 设置监听端口为 1883
        properties.setProperty(IConfig.PORT_PROPERTY_NAME, moquetteProperties.getMqttPort());
        // 监听所有网络接口
        properties.setProperty(IConfig.HOST_PROPERTY_NAME, moquetteProperties.getMqttHost());
        // 允许匿名连接
        properties.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, moquetteProperties.getAllowAnonymous());
        IConfig config = new MemoryConfig(properties);
        // 初始化 Moquette 服务器
        Server mqttServer = new Server();
        List<InterceptHandler> handlers = List.of(interceptor);
        try {
            mqttServer.startServer(config, handlers, null, customAuthenticator, null);
        } catch (IOException e) {
            log.error("启动内嵌式MQTT服务器失败", e);
            throw new RuntimeException(e);
        }
        log.info("内嵌式MQTT服务器已启动,监听端口: 1883");
        return mqttServer;
    }
}

go 内嵌mqtt

配置

package constants

const (
	MqttUser     = "admin"
	MqttPassword = "bydmqtt123432"
	MqttPort     = "1883"
)

MQTT Server

package mqtt

import (
	"bytes"
	"fmt"
	"iaas-server-manager/constants"

	"github.com/google/uuid"
	"github.com/labstack/gommon/log"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/hooks/auth"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

// CustomAuthHook 实现自定义认证
type CustomAuthHook struct {
	auth.Hook
}

func (h *CustomAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	username := pk.Connect.Username
	password := pk.Connect.Password
	if len(username) == 0 || len(password) == 0 {
		return false
	}
	// 自定义认证逻辑
	if string(username) == constants.MqttUser && string(password) == constants.MqttPassword {
		return true
	}
	// 或者检查数据库等外部系统
	// user := db.GetUser(pk.Username)
	// return user != nil && user.CheckPassword(pk.Password)
	return false
}

// 自定义 Hook 实现
// https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22
type MqttCustomHook struct {
	mqtt.HookBase
}

func (h *MqttCustomHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
	log.Info("client connected", "client", cl.ID)
	return nil
}

func (h *MqttCustomHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
	if err != nil {
		log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)
	} else {
		log.Info("client disconnected", "client", cl.ID, "expire", expire)
	}
}

// OnPacketRead 在读取数据包时触发
func (h *MqttCustomHook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	switch pk.FixedHeader.Type {
	case packets.Publish:
		log.Printf("收到PUBLISH消息 clientId = %s, TopicName = %s, Payload = %s", cl.ID, pk.TopicName, string(pk.Payload))
	case packets.Connect:
		log.Printf("客户端连接clientId = %s", cl.ID)
	case packets.Subscribe:
		log.Printf("客户端订阅clientId = %s", cl.ID)
	}
	return pk, nil // 继续处理数据包
}

func (h *MqttCustomHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
	log.Printf("subscribed clientId=%s, qos=%v", cl.ID, "filters", reasonCodes)
}

func (h *MqttCustomHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
	log.Printf("unsubscribed clientId=%s", cl.ID)
}

func (h *MqttCustomHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	log.Printf("received from clientId=%s, payload=%s", cl.ID, string(pk.Payload))
	return pk, nil
}

func (h *MqttCustomHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
	log.Printf("published to clientId=%s, payload=%s", cl.ID, string(pk.Payload))
}

func (h *MqttCustomHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnect,
		mqtt.OnDisconnect,
		mqtt.OnSubscribed,
		mqtt.OnUnsubscribed,
		mqtt.OnPublished,
		mqtt.OnPublish,
		mqtt.OnPacketRead,
	}, []byte{b})
}

type MqttServer struct {
}

// https://github.com/mochi-mqtt/server/blob/main/README-CN.md
// 启动MqttServer
func (ms *MqttServer) StartServer() {
	// Create the new MQTT Server.
	server := mqtt.New(&mqtt.Options{
		InlineClient: true,
	})
	// Allow all connections.
	errAddHook := server.AddHook(new(CustomAuthHook), nil)
	if errAddHook != nil {
		log.Fatal("CustomAuthHook AddHook fail", errAddHook)
		panic(errAddHook)
	}

	errAddHook = server.AddHook(new(MqttCustomHook), nil)
	if errAddHook != nil {
		log.Fatal("MqttCustomHook AddHook fail", errAddHook)
		panic(errAddHook)
	}

	serverId := uuid.New()
	// Create a TCP listener on a standard port.
	tcp := listeners.NewTCP(listeners.Config{ID: fmt.Sprintf("mqtt_%s", serverId), Address: fmt.Sprintf(":%s", constants.MqttPort)})
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal("AddListener fail", err)
		panic(err)
	}

	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal("AddListener Serve", err)
			panic(err)
		}
	}()
	log.Info("StartServer start finish, Listener On Port: ", constants.MqttPort)
}

启动

package main

func main() {
	//启动mqtt服务
	mqttServer := mqtt.MqttServer{}
	mqttServer.StartServer()
}

mqtt客户端

添加依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

订阅消息

package com.olive;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttReceiver2 {

	public static void main(String[] args) {
		String broker = "tcp://localhost:1883";
		String clientId = "ReceiverClient2";
		String topic = "exclusive/test/topic";
		String username = "admin";
		String password = "moque123432";

		try {
			MqttClient client = new MqttClient(broker, clientId);
			MqttConnectOptions options = new MqttConnectOptions();
			options.setUserName(username);
			options.setPassword(password.toCharArray());
			options.setCleanSession(true);

			client.setCallback(new MqttCallback() {

				@Override
				public void messageArrived(String topic, MqttMessage message) throws Exception {
					System.out.println("收到消息: " + new String(message.getPayload()));
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken token) {
					// 不需要处理
				}

				@Override
				public void connectionLost(Throwable cause) {
					System.out.println("连接丢失");

				}
			});

			client.connect(options);
			System.out.println("接收方客户端已连接");

			client.subscribe(topic);
			System.out.println("已订阅主题: " + topic);

		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
}

发布消息

package com.olive;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttSender {

    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "SenderClient";
        String topic = "patrol/publish/result";
        String content = "Hello from SenderClient";
        String username = "admin";
        String password = "bydmqtt123432";

        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setCleanSession(true);

            client.connect(options);
            System.out.println("发送方客户端已连接");

            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(1); // 设置 QoS 级别为 1

            client.publish(topic, message);
            System.out.println("消息已发送到主题: " + topic);

            client.disconnect();
            System.out.println("发送方客户端已断开连接");

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

go语言发布消息

package main

import (
	"fmt"
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("收到消息: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	log.Println("已连接到 MQTT Broker")

	// 订阅主题
	topic := "exclusive/test/topic"
	qos := byte(0)
	token := client.Subscribe(topic, qos, messagePubHandler)
	token.Wait()
	if token.Error() != nil {
		log.Printf("订阅失败: %v\n", token.Error())
	} else {
		log.Printf("已订阅主题: %s\n", topic)
	}
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("连接丢失: %v", err)
}

func main() {
	// 设置 MQTT Broker 的地址
	broker := "tcp://localhost:1883"
	clientID := "go_mqtt_client"

	// 创建客户端选项
	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetUsername("admin")
	opts.SetPassword("bydmqtt123432")
	opts.SetClientID(clientID)
	opts.SetAutoReconnect(true)                    // 启用自动重连
	opts.SetResumeSubs(true)                       // 重连后恢复订阅
	opts.SetMaxReconnectInterval(30 * time.Second) // 最大重连间隔

	// 设置回调函数
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	// 创建客户端
	client := mqtt.NewClient(opts)

	// 连接到 Broker
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Println("连接失败:", token.Error())
		return
	}
	fmt.Println("成功连接到 MQTT Broker")

	// // 订阅主题
	// topic := "exclusive/test/topic"
	// qos := byte(0)
	// token := client.Subscribe(topic, qos, messagePubHandler)
	// token.Wait()
	// if token.Error() != nil {
	// 	fmt.Println("订阅失败:", token.Error())
	// 	return
	// }
	// fmt.Println("成功订阅主题:", topic)

	// 发布消息
	topic := "exclusive/test/topic/1"
	payload := "back Hello, MQTT from Go!"
	qos := byte(0)
	retain := false
	token := client.Publish(topic, qos, retain, payload)
	token.Wait()
	if token.Error() != nil {
		fmt.Println("发布失败:", token.Error())
		return
	}

	// 保持程序运行以接收消息
	for {
		time.Sleep(1 * time.Second)
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2403676.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉树的遍历总结

144.二叉树的前序遍历(opens new window)145.二叉树的后序遍历(opens new window)94.二叉树的中序遍历 二叉数的先中后序统一遍历法 public static void preOrder(BiTree root){BiTree p root;LinkedList<BiTree> stack new LinkedList<>();while(p ! null ||…

win32相关(远程线程和远程线程注入)

远程线程和远程线程注入 CreateRemoteThread函数 作用&#xff1a;创建在另一个进程的虚拟地址空间中运行的线程 HANDLE CreateRemoteThread([in] HANDLE hProcess, // 需要在哪个进程中创建线程[in] LPSECURITY_ATTRIBUTES lpThreadAttributes, // 安全…

[Spring]-AOP

AOP场景 AOP: Aspect Oriented Programming (面向切面编程) OOP: Object Oriented Programming (面向对象编程) 场景设计 设计: 编写一个计算器接口和实现类&#xff0c;提供加减乘除四则运算 需求: 在加减乘除运算的时候需要记录操作日志(运算前参数、运算后结果)实现方案:…

agent 开发

什么是 agent&#xff1f; Agent智能体&#xff08;又称AI Agent&#xff09;是一种具备自主感知、决策与行动能力的智能系统&#xff0c;其核心在于模仿人类的认知过程来处理复杂任务。以下是其关键特性和发展现状的综合分析&#xff1a; 一、核心定义与特征 #‌## 自主决策…

Golang——5、函数详解、time包及日期函数

函数详解、time包及日期函数 1、函数1.1、函数定义1.2、函数参数1.3、函数返回值1.4、函数类型与变量1.5、函数作参数和返回值1.6、匿名函数、函数递归和闭包1.7、defer语句1.8、panic和recover 2、time包以及日期函数2.1、time.Now()获取当前时间2.2、Format方法格式化输出日期…

深度学习环境配置指南:基于Anaconda与PyCharm的全流程操作

一、环境搭建前的准备 1. 查看基础环境位置 conda env list 操作说明&#xff1a;通过该命令确认Anaconda默认环境&#xff08;base&#xff09;所在磁盘路径&#xff08;如D盘&#xff09;&#xff0c;后续操作需跳转至该磁盘根目录。 二、创建与激活独立虚拟环境 1. 创…

打卡day46

知识点回顾&#xff1a; 不同CNN层的特征图&#xff1a;不同通道的特征图什么是注意力&#xff1a;注意力家族&#xff0c;类似于动物园&#xff0c;都是不同的模块&#xff0c;好不好试了才知道。通道注意力&#xff1a;模型的定义和插入的位置通道注意力后的特征图和热力图 内…

在SpringBoot中使用AWS SDK实现邮箱验证码服务

1.依赖导入&#xff08;maven&#xff09; <dependency><groupId>software.amazon.awssdk</groupId><artifactId>ses</artifactId><version>2.31.46</version></dependency> 2.申请两个key 发件人邮箱需要验证&#xff1a; …

深入理解二叉搜索树:原理到实践

1.二叉搜索树的概念 ⼆叉搜索树⼜称⼆叉排序树&#xff0c;它或者是⼀棵空树&#xff0c;或者是具有以下性质的⼆叉树 若它的左树不为空&#xff0c;则左子树上所有节点的值都小于或等于根节点的值。若它的右树不为空&#xff0c;则右子树上所有节点的值都大于或等于根节点的…

测试W5500的第11步_使用ARP解析IP地址对应的MAC地址

本文介绍了基于W5500芯片的ARP协议实现方法&#xff0c;详细阐述了ARP请求与回复的工作机制。ARP协议通过广播请求和单播回复实现IP地址与MAC地址的映射&#xff0c;确保局域网设备间的可靠通信。文章提供了完整的STM32F10x开发环境下的代码实现&#xff0c;包括网络初始化、SP…

终极数据结构详解:从理论到实践

终极数据结构详解&#xff1a;从理论到实践 我将从 底层原理、时间复杂度、空间优化、实际应用 和 代码实现 五个维度&#xff0c;彻底解析数据结构。内容涵盖&#xff1a; 线性结构&#xff08;数组、链表、栈、队列&#xff09;非线性结构&#xff08;树、图&#xff09;高…

【k8s】k8s集群搭建

k8s集群搭建 一、环境准备1.1 集群类型1.2 安装方式1.3 主机规划1.4 环境配置1.4.1 说明1.4.2 初始化1.4.3 关闭防火墙和禁止防火墙开机启动1.4.4 设置主机名1.4.5 主机名解析1.4.6 时间同步1.4.7 关闭selinux1.4.8 关闭swap分区1.4.9 将桥接的IPv4流量传递到iptables的链1.4.1…

60天python训练计划----day45

DAY 45 Tensorboard使用介绍 知识点回顾&#xff1a; tensorboard的发展历史和原理tensorboard的常见操作tensorboard在cifar上的实战&#xff1a;MLP和CNN模型 之前的内容中&#xff0c;我们在神经网络训练中&#xff0c;为了帮助自己理解&#xff0c;借用了很多的组件&#x…

C# Wkhtmltopdf HTML转PDF碰到的问题

最近碰到一个Html转PDF的需求&#xff0c;看了一下基本上都是需要依赖Wkhtmltopdf&#xff0c;需要在Windows或者linux安装这个可以后使用。找了一下选择了HtmlToPDFCore&#xff0c;这个库是对Wkhtmltopdf.NetCore简单二次封装&#xff0c;这个库的好处就是通过NuGet安装HtmlT…

Vue3 (数组push数据报错) 解决Cannot read property ‘push‘ of null报错问题

解决Cannot read property ‘push‘ of null报错问题 错误写法 定义变量 <script setup>const workList ref([{name:,value:}])</script>正确定义变量 <script setup>const workList ref([]) </script>解决咯~

html文字红色粗体,闪烁渐变动画效果,中英文切换版本

1. 代码 <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>红色粗体闪烁文字表格 - 中英文切换</t…

基于Django开发的运动商城系统项目

运动商城系统项目描述 运动商城系统是一个基于现代Web技术构建的电子商务平台&#xff0c;专注于运动类商品的在线销售与管理。该系统采用前后端分离架构&#xff0c;前端使用Vue.js实现动态交互界面&#xff0c;后端基于Django框架提供RESTful API支持&#xff0c;数据库采用…

Python60日基础学习打卡Day45

之前的神经网络训练中&#xff0c;为了帮助理解借用了很多的组件&#xff0c;比如训练进度条、可视化的loss下降曲线、权重分布图&#xff0c;运行结束后还可以查看单张图的推理效果。 如果现在有一个交互工具可以很简单的通过按钮完成这些辅助功能那就好了&#xff0c;他就是…

【Visual Studio 2022】卸载安装,ASP.NET

Visual Studio 2022 彻底卸载教程 手动清理残留文件夹 删除C:\Program Files\Microsoft Visual Studio 是旧版本 Visual Studio 的残留安装目录 文件夹名对应的 Visual Studio 版本Microsoft Visual Studio 9.0Visual Studio 2008Microsoft Visual Studio 10.0Visual Studio…

thinkphp-queue队列随笔

安装 # 创建项目 composer create-project topthink/think 5.0.*# 安装队列扩展 composer require topthink/think-queue 配置 // application/extra/queue.php<?php return [connector > Redis, // Redis 驱动expire > 0, // 任务的过期时间…