这期只是针对springBoot/Cloud 在使用SpringAmqp消息队列的时候遇到的坑。
前提
如果没有安装RabbitMQ是无法连接成功的!所以前提是你要安装好RabbitMQ。
docker 安装命令
# 拉取docker镜像
docker pull rabbitmq:management
# 创建容器
docker run -id --name=rabbitmq \
-v /home/rabbitmq:/var/lib/rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-e RABBITMQ_DEFAULT_USER=guest\
-e RABBITMQ_DEFAULT_PASS=guest rabbitmq:management
这里你也可以映射更多接口
- 4369, 25672 -- erlang 发现口 & 集群端口
- 5671, 5672 --client 端通信口(RabbitMq 的编程语言客户端连接端口)(AMOP 端口)
- 15672 -- 管理界面 ui 端口(RabbitMq 管理界面端口)
- 25672 -- server 间内部通信口(RabbitMq 集群的端口)
- 61613, 61614 -- STOMP 协议端口
- 1883, 8883 -- MQTT 协议端口
安装好访问的页面大概这样:
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置yml
spring:
#消息总线 也可以使用 kafka 参考 spring-cloud-bus 用法
rabbitmq:
host: 192.168.10.111
port: 5672
virtual-host: /
username: guest
password: guest
3、发送消息
@RequiredArgsConstructor
@RequestMapping("/test")
public class AmqpController {
private final RabbitTemplate rabbitTemplate;
/**
* 简单发送队列消息
**/
@GetMapping("/senMsg")
public void senMsg(){
rabbitTemplate.convertAndSend("队列名称","消息");
}
}
这里会有个坑,如果队列不存在,那这条消息就不会发送成功!可能是为了防止产生一堆无用的队列吧。
所以,我们需要在发送之前先校验下有没有队列,如果没有就给他创建一个
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.RequiredArgsConstructor;
import org.dromara.system.controller.test.config.RabbitmqConfig;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
@Component
@RequiredArgsConstructor
public class PanRabbitTemplate {
private final RabbitTemplate rabbitTemplate;
private final RabbitmqConfig rabbitmqConfig;
public void convertAndSend(String queueName,String msg) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
//如果queueName队列不存在,创建队列
if (Objects.isNull(rabbitAdmin.getQueueProperties(queueName))) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitmqConfig.getHost());
factory.setPort(rabbitmqConfig.getPort());
factory.setUsername(rabbitmqConfig.getUsername());
factory.setPassword(rabbitmqConfig.getPassword());
factory.setVirtualHost(rabbitmqConfig.getVirtualHost());
try {
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare(queueName, false, false, false, null);
// 关闭通联和连接
channel.close();
connection.close();
}catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
rabbitTemplate.convertAndSend(queueName, msg);
}
}
RabbitmqConfig 类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@Data
@ConfigurationProperties(prefix="spring.rabbitmq")
public class RabbitmqConfig {
private String host;
private Integer port;
private String username;
private String password;
private String virtualHost;
}
4、接收消息
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class SpringRabbitListener {
/**
* 消费者:没有队列就报错
* @param msg
*/
@RabbitListener(queues = "queueName1")
public void onMessage(String msg){
System.out.println("spring.queue: " + msg);
}
/**
* 消费者:没有队列就自动创建队列
* @param msg
*/
@RabbitListener(queuesToDeclare = { @Queue(value = "test.queue",durable = "true", autoDelete = "false") })
public void onMessage2(String msg) {
System.out.println("spring.queue: " + msg);
}
}
这里注意,如果只是简单的
@RabbitListener(queues = "queueName1")
,如果你的队列里面没有这个队列他会报错,所以还是建议用下面的方式!