使用RabbitMQ实现微服务间的异步消息传递
- RabbitMQ简介
- 安装RabbitMQ
- 在Ubuntu上安装RabbitMQ
- 在CentOS上安装RabbitMQ
 
- 配置RabbitMQ
- 创建微服务
- 生产者服务
- 安装依赖
- 生产者代码
 
- 消费者服务
- 消费者代码
 
 
- 运行微服务
- 消息模式
- 直接模式
- 生产者代码
- 消费者代码
 
- 扇出模式
- 生产者代码
- 消费者代码
 
- 主题模式
- 生产者代码
- 消费者代码
 
 
- 高级特性
- 持久化
- 生产者代码
- 消费者代码
 
- 确认机制
- 消费者代码
 
 
- 监控和日志
- 监控
- 日志
 
- 故障排除
- 总结
 
 
在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。
RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。 RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
pip install pika
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
import pika
def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)
print('Waiting for messages...')
channel.start_consuming()
# 启动消费者服务
python consumer.py
# 启动生产者服务
python producer.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='direct_queue')
message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
import pika
def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='direct_queue')
channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)
print('Waiting for messages...')
channel.start_consuming()
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')
message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
import pika
def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='fanout_exchange', queue=queue_name)
channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)
print('Waiting for messages...')
channel.start_consuming()
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
import pika
def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)
channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)
print('Waiting for messages...')
channel.start_consuming()
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='durable_queue', durable=True)
message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
import pika
def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='durable_queue', durable=True)
channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)
print('Waiting for messages...')
channel.start_consuming()
import pika
def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='ack_queue')
channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)
print('Waiting for messages...')
channel.start_consuming()
sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server
 
 
 
 
 
使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。



















