RabbitMQ安装部署详情可见:RabbitMQ简介及在Linux中安装部署(yum)
一、导入pom.xml依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.29</version>
</dependency>
二、入门测试案例
发送消息类:Send.java
/**
 * Minimal producer: publishes one text message to the {@code hello} queue
 * via the default exchange (empty exchange name) and then exits.
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    /**
     * Connects to the broker, declares the queue, publishes a single message
     * and releases the channel and connection.
     *
     * @param args unused
     * @throws IOException      if declaring the queue, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");

        // try-with-resources closes the channel first, then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // With the default exchange the routing key is the queue name.
            String message = "你好,老6";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("发送了消息:" + message);
        }
    }
}
接收消息类:Recv.java
/**
 * Minimal consumer: subscribes to the {@code hello} queue with automatic
 * acknowledgement and prints every message body it receives.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    /**
     * Opens a connection and channel, declares the queue and registers a
     * printing consumer. Neither the channel nor the connection is closed
     * here, so deliveries keep arriving after this method returns.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.119.129");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();

        // Same declaration arguments as the sender uses for this queue.
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Callback invoked for each delivery: decode the body as UTF-8 and print it.
        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body, "UTF-8"));
            }
        };

        // Second argument true = automatic acknowledgement.
        ch.basicConsume(QUEUE_NAME, true, printer);
    }
}
运行结果:
三、多任务消息队列案例
发送多任务消息类:NewTask.java
/**
 * Work-queue producer: publishes ten tasks to the {@code task_queue} queue.
 * Even-numbered tasks carry a trailing {@code "..."}; odd-numbered tasks are
 * just the number.
 */
public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";

    /**
     * Connects to the broker, declares the task queue, publishes tasks 0..9
     * and releases the channel and connection.
     *
     * @param args unused
     * @throws IOException      if declaring the queue, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");

        // try-with-resources closes the channel first, then the connection,
        // even when a publish fails part-way through the loop.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Second argument true = durable queue.
            // NOTE(review): messages are published with null properties, so they are
            // not marked persistent; confirm whether that is intended for a durable queue.
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            for (int i = 0; i < 10; i++) {
                String message = (i % 2 == 0) ? i + "..." : String.valueOf(i);
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println("发送了消息:" + message);
            }
        }
    }
}
接收多任务消息类:Worker.java
/**
 * Work-queue consumer: takes tasks from the {@code task_queue} queue one at a
 * time, simulates work proportional to the number of dots in the task text,
 * and acknowledges each task manually once processing ends.
 */
public class Worker {
    private final static String TASK_QUEUE_NAME = "task_queue";

    /**
     * Opens a connection and channel, declares the task queue and registers a
     * manually-acknowledging consumer. Neither the channel nor the connection
     * is closed here, so deliveries keep arriving after this method returns.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        // final so the anonymous consumer below can use it for acknowledgements.
        final Channel channel = connection.createChannel();

        // Same declaration arguments as the producer uses for this queue.
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("开始接收消息");

        // Prefetch count of 1: at most one unacknowledged message at a time.
        channel.basicQos(1);

        // Second argument false = manual acknowledgement.
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息:" + message);
                try {
                    doWork(message);
                } finally {
                    // The ack sits in finally, so it is sent even if doWork throws.
                    System.out.println("消息处理完成");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }

    /**
     * Simulates a task by sleeping one second for every {@code '.'} in the text.
     * If the thread is interrupted while sleeping, the interrupt flag is
     * restored and the remaining simulated work is skipped.
     *
     * @param task the task text
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch != '.') {
                continue;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag so callers can observe the interruption;
                // further sleeps would fail immediately, so stop here.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
运行结果:
四、接收指定消息案例
发送消息类:EmitLogDirect.java
/**
 * Direct-exchange producer: publishes one log message for each of the
 * severities {@code info}, {@code warning} and {@code error} to the
 * {@code direct_logs} exchange, using the severity as the routing key.
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Connects to the broker, declares the direct exchange, publishes the
     * three messages and releases the channel and connection.
     *
     * @param args unused
     * @throws IOException      if declaring the exchange, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");

        // try-with-resources closes the channel first, then the connection,
        // even when a publish fails.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // Published in this fixed order; each severity doubles as the routing key.
            String[] severities = {"info", "warning", "error"};
            for (String severity : severities) {
                String message = severity + ":Hello World!";
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println("发送了消息," + "等级为" + severity + ",消息内容:" + message);
            }
        }
    }
}
接收所有消息类:ReceiveLogsDirect1.java
/**
 * Direct-exchange consumer that receives every severity: binds a single
 * server-named queue to {@code direct_logs} with the routing keys
 * {@code info}, {@code warning} and {@code error}, and prints each message.
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Opens a connection and channel, declares the exchange, binds a
     * server-named queue and registers a printing consumer. Neither the
     * channel nor the connection is closed here.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.119.129");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // No-argument queueDeclare: the broker generates the queue name.
        String tempQueue = ch.queueDeclare().getQueue();

        // One queue, three bindings: one routing key per severity.
        String[] routingKeys = {"info", "warning", "error"};
        for (String routingKey : routingKeys) {
            ch.queueBind(tempQueue, EXCHANGE_NAME, routingKey);
        }
        System.out.println("开始接收消息");

        // Second argument true = automatic acknowledgement.
        ch.basicConsume(tempQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body, "UTF-8"));
            }
        });
    }
}
仅仅接收error消息类:ReceiveLogsDirect2.java
/**
 * Direct-exchange consumer that receives only {@code error} messages: binds
 * a server-named queue to {@code direct_logs} with the single routing key
 * {@code error} and prints each message.
 */
public class ReceiveLogsDirect2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Opens a connection and channel, declares the exchange, binds a
     * server-named queue and registers a printing consumer. Neither the
     * channel nor the connection is closed here.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.119.129");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // No-argument queueDeclare: the broker generates the queue name.
        String tempQueue = ch.queueDeclare().getQueue();

        // Single binding: only messages routed with the "error" key reach this queue.
        ch.queueBind(tempQueue, EXCHANGE_NAME, "error");
        System.out.println("开始接收消息");

        // Second argument true = automatic acknowledgement.
        ch.basicConsume(tempQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body, "UTF-8"));
            }
        });
    }
}
运行结果: