添加moquette依赖
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.17</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
配置文件类MoquetteProperties
package com.mqtt.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
 * Settings for the embedded MQTT broker, injected from {@code mqtt.*} properties.
 * Each field falls back to the default written after the colon in its
 * {@code @Value} placeholder when the property is not set.
 * All values are held as strings.
 */
@Data
@Configuration
public class MoquetteProperties {
/** Listening port; defaults to {@code 1883}. */
@Value("${mqtt.port:1883}")
private String mqttPort;
/** Bind address; defaults to {@code 0.0.0.0}. */
@Value("${mqtt.host:0.0.0.0}")
private String mqttHost;
/** Anonymous-connection flag as text; defaults to {@code false}. */
@Value("${mqtt.allow_anonymous:false}")
private String allowAnonymous;
/** Username clients are checked against; defaults to {@code admin}. */
@Value("${mqtt.username:admin}")
private String username;
/**
 * Password clients are checked against.
 * NOTE(review): the fallback password is hard-coded in source; consider requiring
 * {@code mqtt.password} from external configuration instead.
 */
@Value("${mqtt.password:moque123432}")
private String password;
}
权限认证类CustomAuthenticator
package com.mqtt.interceptor;
import com.mqtt.config.MoquetteProperties;
import io.moquette.broker.security.IAuthenticator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Authenticator that accepts a connection only when the supplied credentials
 * match the single username/password pair held in {@link MoquetteProperties}.
 */
@Slf4j
@Component
public class CustomAuthenticator implements IAuthenticator {

    @Autowired
    private MoquetteProperties moquetteProperties;

    /**
     * Checks the credentials presented by a connecting client.
     *
     * @param clientId the client identifier (not used for the decision)
     * @param username the username from the CONNECT packet, may be {@code null}
     * @param password the raw password bytes from the CONNECT packet, may be {@code null}
     * @return {@code true} when both username and password match the configured values
     */
    @Override
    public boolean checkValid(String clientId, String username, byte[] password) {
        // A client may send a username without a password; reject it instead of
        // failing with a NullPointerException while decoding the password.
        if (username == null || password == null) {
            log.error("CustomAuthenticator checkValid: 用户名或密码错误");
            return false;
        }

        // Compare as UTF-8 bytes so the result does not depend on the platform charset.
        byte[] expectedUser = moquetteProperties.getUsername()
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] expectedPassword = moquetteProperties.getPassword()
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] actualUser = username.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // MessageDigest.isEqual is a constant-time comparison; both checks are always
        // evaluated (non-short-circuit '&') so timing does not reveal which one failed.
        boolean userMatches = java.security.MessageDigest.isEqual(expectedUser, actualUser);
        boolean passwordMatches = java.security.MessageDigest.isEqual(expectedPassword, password);
        if (userMatches & passwordMatches) {
            return true;
        }

        log.error("CustomAuthenticator checkValid: 用户名或密码错误");
        return false;
    }
}
消息拦截类MqttMessageInterceptor
package com.mqtt.interceptor;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * Intercept handler that logs the client, topic and UTF-8 decoded payload of
 * each published message, and logs session-loop errors.
 */
@Slf4j
@Component
public class MqttMessageInterceptor extends AbstractInterceptHandler {

    /** Returns this class's fully-qualified name as the handler identifier. */
    @Override
    public String getID() {
        return MqttMessageInterceptor.class.getName();
    }

    /**
     * Logs one published message. The payload buffer is released as part of reading it.
     *
     * @param msg the intercepted publish message
     */
    @Override
    public void onPublish(InterceptPublishMessage msg) {
        final String sender = msg.getClientID();
        final String topicName = msg.getTopicName();
        final String body = safeReadByteBuf(msg.getPayload());
        log.info("MqttMessageInterceptor Received message - Client: {}, Topic: {}, Payload: {}",
                sender, topicName, body);
    }

    /** Logs an error reported by the session loop. */
    @Override
    public void onSessionLoopError(Throwable error) {
        log.error("MqttMessageInterceptor onSessionLoopError", error);
    }

    /**
     * Decodes the readable bytes of a buffer as UTF-8 without moving its reader
     * index, then releases the buffer if it still holds a reference.
     *
     * @param byteBuf the buffer to read, may be {@code null}
     * @return the decoded text, or an empty string for a null or unreadable buffer
     */
    private String safeReadByteBuf(ByteBuf byteBuf) {
        if (byteBuf == null) {
            return "";
        }
        try {
            if (!byteBuf.isReadable()) {
                return "";
            }
            final int start = byteBuf.readerIndex();
            final int size = byteBuf.readableBytes();
            if (!byteBuf.hasArray()) {
                // No accessible backing array: copy the readable region out first.
                final byte[] copy = new byte[size];
                byteBuf.getBytes(start, copy);
                return new String(copy, StandardCharsets.UTF_8);
            }
            // Array-backed buffer: decode straight from the backing array.
            return new String(byteBuf.array(), byteBuf.arrayOffset() + start, size,
                    StandardCharsets.UTF_8);
        } finally {
            // Always give the buffer back, whichever path returned.
            if (byteBuf.refCnt() > 0) {
                byteBuf.release();
            }
        }
    }
}
MQTT Server类MoquetteBrokerConfig
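参考(注意:此链接是 Go 版 mochi-mqtt 的 hooks 示例,供下文 Go 实现对照;Java 版 Moquette 的源码见 https://github.com/moquette-io/moquette)
https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22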
package com.mqtt.config;
import com.mqtt.interceptor.CustomAuthenticator;
import com.mqtt.interceptor.MqttMessageInterceptor;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.interception.InterceptHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
/**
 * Spring configuration that creates and starts the embedded Moquette broker
 * using the values from {@link MoquetteProperties}.
 */
@Slf4j
@Configuration
public class MoquetteBrokerConfig {

    @Autowired
    private MoquetteProperties moquetteProperties;

    /**
     * Builds the broker configuration, starts the server with the given
     * interceptor and authenticator, and exposes it as a bean. The bean's
     * destroy method is {@code stopServer}.
     *
     * @param interceptor         handler registered for intercepted broker events
     * @param customAuthenticator authenticator passed to the broker
     * @return the started server
     * @throws IllegalStateException if the broker fails to start
     */
    @Bean(destroyMethod = "stopServer")
    public Server mqttBroker(MqttMessageInterceptor interceptor, CustomAuthenticator customAuthenticator) {
        String host = moquetteProperties.getMqttHost();
        String port = moquetteProperties.getMqttPort();

        // Port, bind address and anonymous-access flag all come from configuration.
        Properties properties = new Properties();
        properties.setProperty(IConfig.PORT_PROPERTY_NAME, port);
        properties.setProperty(IConfig.HOST_PROPERTY_NAME, host);
        properties.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, moquetteProperties.getAllowAnonymous());
        IConfig config = new MemoryConfig(properties);

        Server mqttServer = new Server();
        List<InterceptHandler> handlers = List.of(interceptor);
        try {
            mqttServer.startServer(config, handlers, null, customAuthenticator, null);
        } catch (IOException e) {
            log.error("启动内嵌式MQTT服务器失败", e);
            throw new IllegalStateException(
                    "Failed to start embedded MQTT broker on " + host + ":" + port, e);
        }
        // Report the address actually configured rather than a hard-coded port.
        log.info("内嵌式MQTT服务器已启动,监听地址: {}:{}", host, port);
        return mqttServer;
    }
}
go 内嵌mqtt
配置
package constants
// Connection settings for the embedded MQTT broker.
// NOTE(review): the credentials are hard-coded in source; consider loading
// them from configuration or the environment.
const (
// MqttUser is the username accepted by the broker.
MqttUser = "admin"
// MqttPassword is the password accepted by the broker.
MqttPassword = "bydmqtt123432"
// MqttPort is the TCP port, kept as a string.
MqttPort = "1883"
)
MQTT Server
package mqtt
import (
"bytes"
"fmt"
"iaas-server-manager/constants"
"github.com/google/uuid"
"github.com/labstack/gommon/log"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)
// CustomAuthHook implements custom authentication.
// It embeds auth.Hook and defines its own OnConnectAuthenticate.
type CustomAuthHook struct {
auth.Hook
}
// OnConnectAuthenticate reports whether the username and password carried by
// the connect packet match constants.MqttUser and constants.MqttPassword.
// A packet with an empty username or an empty password is rejected.
func (h *CustomAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	user, pass := pk.Connect.Username, pk.Connect.Password

	// Both credentials are required.
	if len(user) == 0 || len(pass) == 0 {
		return false
	}

	// The static comparison below is where a lookup against an external
	// system (for example a user database) could be plugged in instead.
	return string(user) == constants.MqttUser && string(pass) == constants.MqttPassword
}
// MqttCustomHook is a custom hook implementation that embeds mqtt.HookBase.
// Reference: https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22
type MqttCustomHook struct {
mqtt.HookBase
}
// OnConnect logs the ID of the connecting client and always returns nil.
func (h *MqttCustomHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
	// gommon's log.Info joins its arguments fmt.Sprint-style, so slog-style
	// key/value pairs would be run together; use a format string instead.
	log.Infof("client connected, client=%s", cl.ID)
	return nil
}
// OnDisconnect logs the client ID and the expire flag, plus the error when
// one is supplied.
func (h *MqttCustomHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
	// gommon's log.Info joins its arguments fmt.Sprint-style, so slog-style
	// key/value pairs would be run together; use a format string instead.
	if err != nil {
		log.Infof("client disconnected, client=%s, expire=%t, error=%v", cl.ID, expire, err)
		return
	}
	log.Infof("client disconnected, client=%s, expire=%t", cl.ID, expire)
}
// OnPacketRead is invoked when a packet is read. It logs PUBLISH, CONNECT and
// SUBSCRIBE packets and hands the packet back unchanged with a nil error so
// processing continues.
func (h *MqttCustomHook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	packetType := pk.FixedHeader.Type
	if packetType == packets.Publish {
		log.Printf("收到PUBLISH消息 clientId = %s, TopicName = %s, Payload = %s", cl.ID, pk.TopicName, string(pk.Payload))
	} else if packetType == packets.Connect {
		log.Printf("客户端连接clientId = %s", cl.ID)
	} else if packetType == packets.Subscribe {
		log.Printf("客户端订阅clientId = %s", cl.ID)
	}
	return pk, nil
}
// OnSubscribed logs the client ID, the reason codes returned for the
// subscription, and the packet's filters.
func (h *MqttCustomHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
	// One verb per argument: previously the literal "filters" was printed as the
	// qos value and reasonCodes showed up as a %!(EXTRA ...) formatting error.
	log.Printf("subscribed clientId=%s, qos=%v, filters=%v", cl.ID, reasonCodes, pk.Filters)
}
// OnUnsubscribed logs the ID of the client that unsubscribed.
func (h *MqttCustomHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
	clientID := cl.ID
	log.Printf("unsubscribed clientId=%s", clientID)
}
// OnPublish logs the publishing client's ID and the payload, then returns the
// packet unchanged with a nil error.
func (h *MqttCustomHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	body := string(pk.Payload)
	log.Printf("received from clientId=%s, payload=%s", cl.ID, body)
	return pk, nil
}
// OnPublished logs the client ID and the payload of the packet.
func (h *MqttCustomHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
	body := string(pk.Payload)
	log.Printf("published to clientId=%s, payload=%s", cl.ID, body)
}
// Provides reports whether b is one of the hook identifiers this hook
// handles: OnConnect, OnDisconnect, OnSubscribed, OnUnsubscribed,
// OnPublished, OnPublish or OnPacketRead.
func (h *MqttCustomHook) Provides(b byte) bool {
	handled := []byte{
		mqtt.OnConnect,
		mqtt.OnDisconnect,
		mqtt.OnSubscribed,
		mqtt.OnUnsubscribed,
		mqtt.OnPublished,
		mqtt.OnPublish,
		mqtt.OnPacketRead,
	}
	return bytes.IndexByte(handled, b) >= 0
}
// MqttServer is a field-less type whose StartServer method launches the
// embedded MQTT broker.
type MqttServer struct {
}
// StartServer creates the MQTT server, registers the authentication and
// logging hooks, attaches a TCP listener on constants.MqttPort and starts
// serving in a background goroutine. It returns once the goroutine has been
// launched; any setup or serve error terminates the process via log.Fatalf.
//
// Reference: https://github.com/mochi-mqtt/server/blob/main/README-CN.md
func (ms *MqttServer) StartServer() {
	// Create the new MQTT server.
	server := mqtt.New(&mqtt.Options{
		InlineClient: true,
	})

	// Register the credential check: CustomAuthHook only accepts the configured
	// username/password. log.Fatalf exits the process, so no panic is needed
	// after it.
	if err := server.AddHook(new(CustomAuthHook), nil); err != nil {
		log.Fatalf("CustomAuthHook AddHook fail: %v", err)
	}
	if err := server.AddHook(new(MqttCustomHook), nil); err != nil {
		log.Fatalf("MqttCustomHook AddHook fail: %v", err)
	}

	// Create a TCP listener on the configured port, with a unique listener ID.
	serverId := uuid.New()
	tcp := listeners.NewTCP(listeners.Config{
		ID:      fmt.Sprintf("mqtt_%s", serverId),
		Address: fmt.Sprintf(":%s", constants.MqttPort),
	})
	if err := server.AddListener(tcp); err != nil {
		log.Fatalf("AddListener fail: %v", err)
	}

	go func() {
		if err := server.Serve(); err != nil {
			log.Fatalf("Serve fail: %v", err)
		}
	}()
	log.Info("StartServer start finish, Listener On Port: ", constants.MqttPort)
}
启动
package main
// main starts the embedded MQTT broker and keeps the process alive.
func main() {
	// Start the MQTT service.
	mqttServer := mqtt.MqttServer{}
	mqttServer.StartServer()

	// StartServer returns right after launching the broker in a goroutine;
	// without blocking here main would return and the process (and broker)
	// would exit immediately.
	select {}
}
mqtt客户端
添加依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
订阅消息
package com.olive;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * Demo subscriber: connects to the broker with username/password, subscribes
 * to one topic and prints every message it receives.
 */
public class MqttReceiver2 {

    /**
     * Connects, installs the callback and subscribes. Connection errors are
     * printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "ReceiverClient2";
        String topic = "exclusive/test/topic";
        String username = "admin";
        String password = "moque123432";
        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setCleanSession(true);
            client.setCallback(new MqttCallback() {
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    String text = new String(message.getPayload(),
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("收到消息: " + text);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Nothing to do: this client only receives.
                }

                @Override
                public void connectionLost(Throwable cause) {
                    // Include the cause so the reason for the drop is visible.
                    System.out.println("连接丢失: " + cause);
                }
            });
            client.connect(options);
            System.out.println("接收方客户端已连接");
            client.subscribe(topic);
            System.out.println("已订阅主题: " + topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
发布消息
package com.olive;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * Demo publisher: connects to the broker with username/password, publishes
 * one QoS 1 message and disconnects.
 */
public class MqttSender {

    /**
     * Connects, publishes a single message and releases the client. Errors are
     * printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "SenderClient";
        String topic = "patrol/publish/result";
        String content = "Hello from SenderClient";
        String username = "admin";
        String password = "bydmqtt123432";
        MqttClient client = null;
        try {
            client = new MqttClient(broker, clientId);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setCleanSession(true);
            client.connect(options);
            System.out.println("发送方客户端已连接");
            // Encode explicitly as UTF-8 rather than with the platform default charset.
            MqttMessage message = new MqttMessage(
                    content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(1); // QoS level 1
            client.publish(topic, message);
            System.out.println("消息已发送到主题: " + topic);
            client.disconnect();
            System.out.println("发送方客户端已断开连接");
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            // Release the client even when connect or publish failed part-way.
            if (client != null) {
                try {
                    if (client.isConnected()) {
                        client.disconnect();
                    }
                    client.close();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
go语言订阅并发布消息
package main
import (
"fmt"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// messagePubHandler prints the payload and topic of each received message.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	body, source := msg.Payload(), msg.Topic()
	fmt.Printf("收到消息: %s from topic: %s\n", body, source)
}
// connectHandler runs when the client connects: it logs the connection and
// subscribes to the demo topic at QoS 0, logging the outcome.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	log.Println("已连接到 MQTT Broker")

	const subscribeTopic = "exclusive/test/topic"
	subscription := client.Subscribe(subscribeTopic, byte(0), messagePubHandler)
	subscription.Wait()
	if err := subscription.Error(); err != nil {
		log.Printf("订阅失败: %v\n", err)
		return
	}
	log.Printf("已订阅主题: %s\n", subscribeTopic)
}
// connectLostHandler prints the error that caused the connection to drop.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	// Terminate the line so the next output does not run on from this message.
	fmt.Printf("连接丢失: %v\n", err)
}
// main connects to the local broker with auto-reconnect enabled, publishes a
// single QoS 0 message and then idles forever so incoming messages can be
// handled. Subscribing happens in connectHandler.
func main() {
	const (
		brokerURL  = "tcp://localhost:1883"
		clientName = "go_mqtt_client"
	)

	// Build the client options: broker address, credentials and reconnect policy.
	opts := mqtt.NewClientOptions()
	opts.AddBroker(brokerURL)
	opts.SetUsername("admin")
	opts.SetPassword("bydmqtt123432")
	opts.SetClientID(clientName)
	opts.SetAutoReconnect(true)                    // reconnect automatically
	opts.SetResumeSubs(true)                       // resume subscriptions after reconnecting
	opts.SetMaxReconnectInterval(30 * time.Second) // upper bound between reconnect attempts

	// Callbacks.
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)

	// Connect to the broker and bail out on failure.
	connecting := client.Connect()
	connecting.Wait()
	if err := connecting.Error(); err != nil {
		fmt.Println("连接失败:", err)
		return
	}
	fmt.Println("成功连接到 MQTT Broker")

	// Publish one message.
	const publishTopic = "exclusive/test/topic/1"
	publishing := client.Publish(publishTopic, byte(0), false, "back Hello, MQTT from Go!")
	publishing.Wait()
	if err := publishing.Error(); err != nil {
		fmt.Println("发布失败:", err)
		return
	}

	// Keep the program running so messages can be received.
	for {
		time.Sleep(1 * time.Second)
	}
}