RabbitMQ--基础篇

news2025/5/11 14:45:32

RabbitMQ


简介:RabbitMQ 是一种开源的消息队列中间件,你可以把它想象成一个高效的“邮局”。它专门负责在不同应用程序之间传递消息,让系统各部分能松耦合地协作

优势:

  • 异步处理:比如用户注册后,主程序将发送验证邮件的任务扔进队列就立刻返回,邮件服务后续慢慢处理,避免用户等待。

  • 削峰填谷:突然的流量高峰(如秒杀活动)会被队列缓冲,避免服务器被压垮。

  • 智能路由:通过交换机(Exchange)的四种路由策略(直连/主题/广播/头匹配),实现精准投递,比如将VIP用户的订单定向到专属客服队列。

  • 故障恢复:支持消息持久化和确认机制,即使服务器宕机,消息也不会丢失。

同步VS异步(以实际开发为例子进行说明):

  • 同步业务功能的耦合度高,异步耦合度低,可以达到解耦的效果

  • 同步业务流程响应的时间长,异步响应的时间短

  • 同步模式会导致并发压力向后进行传递,异步可以削峰限流

  • 同步模式下系统结构弹性不足,异步模式下系统弹性强,可扩展性强

注意:在实际开发中并不是说异步模式就完全优与同步模式,在一定的场景下使用异步模式是优化系统的架构,但是在一些其它的业务场景下需要同步来保证流程的完整性。所以说异步还是同步要跟据具体业务进行选择。

底层实现:

  • AMQP(Advanced Message Queuing Protocol):AMQP 是 跨语言的通用消息协议适合异构系统间的复杂通信。

  • JMS(Java Message Service):JMS是 Java 专属的 API 标准适合统一 Java 生态的消息处理。

主流的MQ产品对比

对比项RabbitMQActiveMQRocketMQKafka
开发语言ErlangJavaJavaScala/Java
维护方Rabbit(公司)Apache(社区)阿里(公司)Apache(社区)
核心机制基于 AMQP 协议的生产者-消费者模型基于 JMS 的消息传递模型分布式消息队列(Topic + Tag 分类)分布式流处理平台(发布-订阅模型)
协议支持AMQP、STOMP、MQTT、HTTP 插件AMQP、STOMP、OpenWire、REST、MQTT自定义协议(支持 TCP/HTTP)自定义协议(社区封装 HTTP 支持)
客户端语言官方:Erlang、Java、Ruby;社区:多语言Java、C/C++、.NET、Python、PHP官方:Java;社区:C++(不成熟)官方:Java;社区:Python、Go、Rust 等
可用性镜像队列、仲裁队列(Quorum Queue)主从复制主从复制分区(Partition) + 副本(Replica)
单机吞吐量约 10 万/秒约 5 万/秒10 万+/秒(阿里双十一验证)百万级/秒
消息延迟微秒级毫秒级毫秒级毫秒以内
消息确认完整 ACK/NACK 机制支持 JMS ACK 模式基于数据库持久化的消息表基于副本同步和 ISR 机制
功能特性✅ 低延迟、高并发、管理界面丰富✅ 老牌稳定、支持 JMS 规范✅ 高吞吐、阿里生态集成、事务消息✅ 高吞吐、流处理、大数据场景专用
适用场景复杂路由、实时业务(如支付订单)传统企业级系统(Java 生态)电商高并发场景(如秒杀、订单)日志采集、实时分析、流式计算

原生RabbitMQAPI调用:

//=========================================发送消息的代码示例=================================
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
        // 创建连接工厂  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
  
        // 设置主机地址  
        connectionFactory.setHost("192.168.200.100");  
  
        // 设置连接端口号:默认为 5672
        connectionFactory.setPort(5672);
  
        // 虚拟主机名称:默认为 /
        connectionFactory.setVirtualHost("/");
  
        // 设置连接用户名;默认为guest  
        connectionFactory.setUsername("guest");
  
        // 设置连接密码;默认为guest  
        connectionFactory.setPassword("123456");
  
        // 创建连接  
        Connection connection = connectionFactory.newConnection();  
  
        // 创建频道  
        Channel channel = connection.createChannel();  
  
        // 声明(创建)队列  
        // queue      参数1:队列名称  
        // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  
        // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  
        // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  
        // arguments  参数5:队列其它参数  
        channel.queueDeclare("simple_queue", true, false, false, null);  
  
        // 要发送的信息  
        String message = "你好;小兔子!";  
  
        // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  
        // 参数2:路由key,简单模式可以传递队列名称  
        // 参数3:配置信息  
        // 参数4:消息内容  
        channel.basicPublish("", "simple_queue", null, message.getBytes());  
  
        System.out.println("已发送消息:" + message);  
  
        // 关闭资源  
        channel.close();  
        connection.close();  
  
    }  
  
}
//=========================================接收消息的代码示例=================================
public class Consumer {  
  
    public static void main(String[] args) throws Exception {  
  
        // 1.创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
  
        // 2. 设置参数  
        factory.setHost("192.168.200.100");  
        factory.setPort(5672);  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");
        factory.setPassword("123456");  
  
        // 3. 创建连接 Connection        
        Connection connection = factory.newConnection();  
  
        // 4. 创建Channel  
        Channel channel = connection.createChannel();  
  
        // 5. 创建队列  
        // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  
        // 参数1. queue:队列名称  
        // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  
        // 参数3. exclusive:是否独占。  
        // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  
        // 参数5. arguments:其它参数。  
        channel.queueDeclare("simple_queue",true,false,false,null);  
  
        // 接收消息  
        DefaultConsumer consumer = new DefaultConsumer(channel){  
  
            // 回调方法,当收到消息后,会自动执行该方法  
            // 参数1. consumerTag:标识  
            // 参数2. envelope:获取一些信息,交换机,路由key...  
            // 参数3. properties:配置信息  
            // 参数4. body:数据  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("consumerTag:"+consumerTag);  
                System.out.println("Exchange:"+envelope.getExchange());  
                System.out.println("RoutingKey:"+envelope.getRoutingKey());  
                System.out.println("properties:"+properties);  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        // 参数1. queue:队列名称  
        // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  
        // 参数3. callback:回调对象  
        // 消费者类似一个监听程序,主要是用来监听消息  
        channel.basicConsume("simple_queue",true,consumer);  
  
    }  
  
}

封装RabbitMQ工具类:

public class ConnectionUtil {  
  
    //跟据自己服务的具体需求进行相关ip+端口的配置(动态变化)
    public static final String HOST_ADDRESS = "192.168.200.100";  
  
    public static Connection getConnection() throws Exception {  
  
        // 定义连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
  
        // 设置服务地址  
        factory.setHost(HOST_ADDRESS);  
  
        // 端口  
        factory.setPort(5672);  
  
        //设置账号信息,用户名、密码、vhost  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");  
        factory.setPassword("123456");  
  
        // 通过工程获取连接  
        Connection connection = factory.newConnection();  
  
        return connection;  
    }  
  
  
  
    public static void main(String[] args) throws Exception {  
  
        Connection con = ConnectionUtil.getConnection();  
  
        // amqp://guest@192.168.200.100:5672/  
        System.out.println(con);  
  
        con.close();  
  
    }  
  
}

RabbitMQ体系结构:

  • 生产者(Producer):发送消息到 RabbitMQ 的应用程序。

  • 消费者(Consumer):从队列中接收并处理消息的应用程序。

  • 交换机(Exchange):接收生产者消息,根据类型和路由规则将消息分发到队列。

    • 四大类型

类型路由规则典型场景
Direct精确匹配 Routing Key(如 order.pay一对一精准投递(如支付成功通知)
Topic通配符匹配(如 order.**.pay多服务订阅同一类消息(如日志分类)
Fanout广播到所有绑定队列(无视 Routing Key群发通知(如系统公告)
Headers通过消息头(Headers)键值对匹配复杂条件路由(需灵活匹配时)
  • 队列(Queue):定义交换机与队列之间的映射关系,指定路由规则

  • 信道(Channel):复用 TCP 连接的轻量级虚拟链路,减少资源消耗。

  • 虚拟主机(Virtual Host):逻辑隔离的“消息域”,不同 vhost 间资源(交换机、队列)互不干扰。

总结:RabbitMQ 通过 生产者-交换机-队列-消费者 模型实现异步通信,核心在于灵活的路由规则(交换机类型)和可靠性保障(持久化、确认机制)。

基础篇

工作模式

  • 简单模式:最简单的消息队列模型,包含一个生产者、一个队列和一个消费者。生产者直接将消息发送到队列,消费者从队列中接收消息。

  • 工作队列模式(Work Queues):使用默认的交换机,一个队列对应多个消费者,消息按轮询(Round-Robin)或公平分发(Fair Dispatch)分配给消费者,避免单个消费者过载。

  • 发布/订阅模式(Publish/Subscribe):使用 扇形交换机(Fanout Exchange)生产者将消息发送到交换机,交换机将消息广播到所有绑定的队列,每个消费者独立接收一份消息副本。

  • 路由模式(Routing):使用 直接交换机(Direct Exchange)生产者指定消息的 路由键(Routing Key),交换机根据路由键将消息精确匹配到绑定的队列

  • 主题模式(Topics):使用 主题交换机(Topic Exchange)路由键支持通配符匹配(* 匹配一个词,# 匹配多个词)。例如路由键 stock.usd.nyse 可被 *.nysestock.# 订阅。

  • 远程过程调用(RPC):通过消息队列实现远程调用。客户端发送请求消息时附带回调队列和唯一ID,服务端处理完成后将响应发送到回调队列,客户端通过ID匹配响应。

  • 发布者确认(Publisher Confirms):生产者发送消息后,RabbitMQ会异步返回确认(ACK)或未确认(NACK),确保消息成功到达交换机或队列。

工作队列模式(Work Queues)
  1. 并行处理能力

  • 多消费者竞争消费一个队列可绑定多个消费者,消息被并发处理消息只会被其中的一个消费者拿到。

  • 横向扩展:通过增加消费者数量,轻松应对高并发或大流量场景。

  1. 负载均衡机制

  • 轮询分发(Round-Robin)默认策略,均摊消息到所有消费者简单但可能因消费者性能差异导致负载不均。

  • 公平分发(Fair Dispatch):通过 prefetch_count 限制消费者同时处理的消息数,确保“能者多劳”,避免慢消费者堆积任务。

  1. 消息可靠性保障

  • ACK确认机制:消费者处理完成后需手动发送确认(ACK),若处理失败或消费者宕机,消息自动重新入队,确保任务不丢失。

  • 持久化支持:队列和消息均可设置为持久化(durable=true),防止RabbitMQ服务重启后数据丢失。

//================================生产端代码循环发送10次消息================================  
public class Producer {  
  
    public static final String QUEUE_NAME = "work_queue";  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        for (int i = 1; i <= 10; i++) {  
  
            String body = i+"hello rabbitmq~~~";  
  
            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());  
  
        }  
  
        channel.close();  
  
        connection.close();  
  
    }  
  
}
 
//================================消费端代码竞争消息================================
 public class Consumer1/2 {  
  
    static final String QUEUE_NAME = "work_queue";  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("Consumer1 body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

发布/订阅模式(Publish/Subscribe)
  1. 消息广播机制

  • 扇形交换机(Fanout Exchange)驱动生产者将消息发送到交换机,交换机会将消息无条件广播到所有与其绑定的队列,每个队列的消费者都能收到一份消息副本。

  • 一对多分发一条消息可被多个消费者同时接收,适用于需要广泛触达的场景(如系统通知、日志收集)。

//====================================生产者代码====================================
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
      // 1、获取连接  
        Connection connection = ConnectionUtil.getConnection();  
  
      // 2、创建频道  
        Channel channel = connection.createChannel();  
  
        // 参数1. exchange:交换机名称  
        // 参数2. type:交换机类型  
        //     DIRECT("direct"):定向  
        //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  
        //     TOPIC("topic"):通配符的方式  
        //     HEADERS("headers"):参数匹配  
        // 参数3. durable:是否持久化  
        // 参数4. autoDelete:自动删除  
        // 参数5. internal:内部使用。一般false  
        // 参数6. arguments:其它参数  
        String exchangeName = "test_fanout";  
  
        // 3、创建交换机  
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  
  
        // 4、创建队列  
        String queue1Name = "test_fanout_queue1";  
        String queue2Name = "test_fanout_queue2";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        // 5、绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //     如果交换机的类型为fanout,routingKey设置为""  
        channel.queueBind(queue1Name,exchangeName,"");  
        channel.queueBind(queue2Name,exchangeName,"");  
  
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";  
  
        // 6、发送消息  
        channel.basicPublish(exchangeName,"",null,body.getBytes());  
  
        // 7、释放资源  
        channel.close();  
        connection.close();  
  
    }  
  
}
 
//====================================消费者1代码===================================
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue1Name = "test_fanout_queue1";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue1Name,true,consumer);  
  
    }  
  
}
​
//====================================消费者2代码===================================
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue2Name = "test_fanout_queue2";  
  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue2Name,true,consumer);  
  
    }  
  
}

路由模式(Routing)
  1. 基于路由键的精确分发

  • 直接交换机(Direct Exchange)驱动生产者发送消息时需指定路由键(Routing Key),交换机会将消息精确匹配到绑定相同路由键的队列。

  • 条件性路由:仅当队列绑定的路由键与消息的路由键完全一致时,消息才会被投递,实现按条件分发。

  1. 灵活的消息过滤

  • 多队列绑定不同路由键可为同一交换机绑定多个队列,每个队列声明不同的路由键(例如 errorinfowarning,实现消息分类处理。

  • 生产者可控性:生产者通过指定路由键决定消息的目标队列,无需消费者干预。

//================================生产者代码========================================  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
      Connection connection = ConnectionUtil.getConnection();  
  
      Channel channel = connection.createChannel();  
  
      String exchangeName = "test_direct";  
  
      // 创建交换机  
      channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  
  
      // 创建队列  
      String queue1Name = "test_direct_queue1";  
      String queue2Name = "test_direct_queue2";  
  
      // 声明(创建)队列  
      channel.queueDeclare(queue1Name,true,false,false,null);  
      channel.queueDeclare(queue2Name,true,false,false,null);  
  
      // 队列绑定交换机  
      // 队列1绑定error  
      channel.queueBind(queue1Name,exchangeName,"error");  
  
      // 队列2绑定info error warning  
      channel.queueBind(queue2Name,exchangeName,"info");  
      channel.queueBind(queue2Name,exchangeName,"error");  
      channel.queueBind(queue2Name,exchangeName,"warning");  
  
        String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";  
  
        // 发送消息  
        channel.basicPublish(exchangeName,"warning",null,message.getBytes());  
        System.out.println(message);  
  
      // 释放资源  
        channel.close();  
        connection.close();  
  
    }  
  
}
 
//===============================消费者1代码========================================
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue1Name = "test_direct_queue1";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("Consumer1 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue1Name,true,consumer);  
  
    }  
  
}
​
//===============================消费者2代码========================================
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue2Name = "test_direct_queue2";  
  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("Consumer2 将日志信息存储到数据库.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue2Name,true,consumer);  
  
    }  
  
}

主题模式(Topics)
  1. 基于通配符的灵活路由

  • 主题交换机(Topic Exchange)驱动:生产者发送消息时指定带层级的路由键(Routing Key,如 order.europe.paid),消费者通过绑定键(Binding Key)使用通配符* 匹配一个词,# 匹配零或多个词)订阅消息。

    • 示例:绑定键 *.europe.* 可匹配 order.europe.paidshipment.europe.delayed

    • 绑定键 stock.# 可匹配 stock.usd.nysestock.eur.london.close

  • 多维度匹配支持复杂的分层路由逻辑,适用于需要按多条件分类的场景。

  1. 高度动态的消息过滤

  • 灵活订阅:消费者可动态定义绑定键的通配规则,按需订阅特定模式的消息,无需修改生产者逻辑。

  • 精准与模糊匹配结合既能精确匹配固定路由键,也能通过通配符覆盖一类消息。

//================================生产者代码========================================
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String exchangeName = "test_topic";  
  
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);  
  
        String queue1Name = "test_topic_queue1";  
        String queue2Name = "test_topic_queue2";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        // 绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //      如果交换机的类型为fanout ,routingKey设置为""  
        // routing key 常用格式:系统的名称.日志的级别。  
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  
        channel.queueBind(queue1Name,exchangeName,"#.error");  
        channel.queueBind(queue1Name,exchangeName,"order.*");  
        channel.queueBind(queue2Name,exchangeName,"*.*");  
  
        // 分别发送消息到队列:order.info、goods.info、goods.error  
        String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";  
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());  
  
        body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";  
        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());  
  
        body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";  
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());  
  
        channel.close();  
        connection.close();  
  
    }  
  
}
 
//================================消费者1代码=======================================
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String QUEUE_NAME = "test_topic_queue1";  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}
​
//================================消费者2代码=======================================
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String QUEUE_NAME = "test_topic_queue2";  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2373199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Quorum协议原理与应用详解

一、Quorum 协议核心原理 基本定义 Quorum 是一种基于 读写投票机制 的分布式一致性协议&#xff0c;通过权衡一致性&#xff08;C&#xff09;与可用性&#xff08;A&#xff09;实现数据冗余和最终一致性。其核心规则为&#xff1a; W&#xff08;写成功副本数&#xff09; …

vue搭建+element引入

vue搭建element 在使用Vue.js开发项目时&#xff0c;经常会选择使用Element UI作为UI框架&#xff0c;因为它提供了丰富的组件和良好的设计&#xff0c;可以大大提高开发效率。以下是如何在Vue项目中集成Element UI的步骤&#xff1a; 1. 创建Vue项目 如果你还没有创建Vue项…

食物数据分析系统vue+flask

食物数据分析系统 项目概述 食物数据分析系统是一个集食物营养成分查询、对比分析和数据可视化于一体的Web应用。系统采用前后端分离架构&#xff0c;为用户提供食物营养信息检索、食物对比和营养分析等功能&#xff0c;帮助用户了解食物的营养成分&#xff0c;做出更健康的饮…

SPDK NVMe of RDMA 部署

使用SPDK NVMe of RDMA 实现多NVMe设备共享 一、编译、安装spdk 1.1、下载 1.1.1 下载spdk源码 首先&#xff0c;我们需要从GitHub上克隆SPDK的源码仓库。打开终端&#xff0c;输入以下命令&#xff1a; git clone -b v22.01 https://github.com/spdk/spdk.git cd spdk1.1.2…

【Redis】缓存和分布式锁

&#x1f525;个人主页&#xff1a; 中草药 &#x1f525;专栏&#xff1a;【中间件】企业级中间件剖析 一、缓存&#xff08;Cache&#xff09; 概述 Redis最主要的应用场景便是作为缓存。缓存&#xff08;Cache&#xff09;是一种用于存储数据副本的技术或组件&#xff0c;…

OpenLayers 精确经过三个点的曲线绘制

OpenLayers 精确经过三个点的曲线绘制 根据您的需求&#xff0c;我将提供一个使用 OpenLayers 绘制精确经过三个指定点的曲线解决方案。对于三个点的情况&#xff0c;我们可以使用 二次贝塞尔曲线 或 三次样条插值&#xff0c;确保曲线精确通过所有控制点。 实现方案 下面是…

大模型微调指南之 LLaMA-Factory 篇:一键启动LLaMA系列模型高效微调

文章目录 一、简介二、如何安装2.1 安装2.2 校验 三、开始使用3.1 可视化界面3.2 使用命令行3.2.1 模型微调训练3.2.2 模型合并3.2.3 模型推理3.2.4 模型评估 四、高级功能4.1 分布训练4.2 DeepSpeed4.2.1 单机多卡4.2.2 多机多卡 五、日志分析 一、简介 LLaMA-Factory 是一个…

GLPK(GNU线性规划工具包)介绍

GLPK全称为GNU Linear Programming Kit(GNU线性规划工具包)&#xff0c;可从 https://sourceforge.net/projects/winglpk/ 下载源码及二进制库&#xff0c;最新版本为4.65。也可从 https://ftp.gnu.org/gnu/glpk/ 下载&#xff0c;仅包含源码&#xff0c;最新版本为5.0。 GLPK是…

PCB设计实践(十三)PCB设计中差分线间距与线宽设置的深度解析

一、差分信号的基本原理与物理背景 差分信号技术通过两条等幅反相的传输线实现信号传输&#xff0c;其核心优势体现在电磁场耦合的对称性上。根据麦克斯韦方程组的对称解原理&#xff0c;两条线产生的电磁场在远场区域相互抵消&#xff0c;形成以下特性&#xff1a; 1. 共模噪…

2025python学习笔记

一.Python语言基础入门 第一章 01.初识Python Python的起源&#xff1a; 1989年&#xff0c;为了打发圣诞节假期&#xff0c;Gudio van Rossum吉多范罗苏姆&#xff08;龟叔&#xff09;决心开发一个新的解释程序&#xff08;Python维形&#xff09;1991年&#xff0c;第一个…

【并发编程】基于 Redis 手写分布式锁

目录 一、基于 Redis 演示超卖现象 1.1 Redis 超卖现象 1.2 超卖现象解决方案 二、Redis 的乐观锁机制 2.1 原生客户端演示 2.2 业务代码实现 三、单机部署 Redis 实现分布式锁 3.1 分布式锁的演变和升级 3.2 setnx 实现分布式锁 3.2.1 递归调用实现分布式锁 3.2.2 循…

Jsp技术入门指南【十二】自定义标签

Jsp技术入门指南【十二】自定义标签 前言一、什么是标签二、标签的类型有哪些&#xff1f;1. 空标签2. 带有属性的标签3. 带主体的标签 三、自定义标签的部件3.1 自定义标签的四步骤3.2 标签处理程序3.3 自定义标签的开发及使用步骤第一步&#xff1a;创建标签助手类第二步&…

Java—— 泛型详解

泛型概述 泛型是JDK5中引入的特性&#xff0c;可以在编译阶段约束操作的数据类型&#xff0c;并进行检查。 泛型的格式&#xff1a;<数据类型> 注意&#xff1a;泛型只能支持引用数据类型。 泛型的好处 没有泛型的时候&#xff0c;可以往集合中添加任意类型的数据&#x…

GPT-4o, GPT 4.5, GPT 4.1, O3, O4-mini等模型的区别与联系

大模型时代浪潮汹涌,作为其中的领军者,OpenAI 其推出的系列模型以强大的能力深刻影响着整个行业,并常常成为业界其他公司对标和比较的基准。因此,深入了解 OpenAI 的大模型,不仅是为了使用它们,更是为了理解当前大模型的能力边界和发展趋势,这对于我们评估和选择其他各类…

Kubernetes生产实战(十二):无工具容器网络连接数暴增指南

当线上容器突然出现TCP连接数暴涨&#xff0c;而容器内又没有安装任何调试工具时&#xff0c;如何快速定位问题&#xff1f;本文将分享一套经过大型互联网公司验证的排查方案&#xff0c;涵盖从快速应急到根因分析的全流程。 一、快速锁定问题容器 查看pod 连接数方式&#x…

MySQL的Order by与Group by优化详解!

目录 前言核心思想&#xff1a;让索引帮你“排好序”或“分好组”Part 1: ORDER BY 优化详解1.1 什么是 Filesort&#xff1f;为什么它慢&#xff1f;1.2 如何避免 Filesort&#xff1f;—— 利用索引的有序性1.3 EXPLAIN 示例 (ORDER BY) Part 2: GROUP BY 优化详解2.1 什么是…

软件测试——用例篇(3)

目录 一、设计测试用例的具体方法 1.1等价类 1.1.1等价类概念介绍 1.1.2等价类分类 1.2边界值 1.2.1边界值分析法 1.2.2边界值分类 1.3正交法 1.3.1正交表 1.3.2正交法设计测试用例步骤 1.4判定表法 1.4.1判定表 1.4.2判定表方法设计测试用例 1.5 场景法 1.6错误…

在 Win11 下安装 Wireshark 的详细步骤

目录 一、了解 Wireshark1. 作用和功能2. 使用步骤 二、下载安装包三、运行安装包四、使用 Wireshark1. 抓包2. 窗口介绍3. 过滤器&#xff08;显示 / 捕获过滤器&#xff09;4. 保存过滤后的报文1&#xff09;显示过滤器表达式&#xff08;了解&#xff09;2&#xff09;过滤表…

AI编程: 使用Trae1小时做成的音视频工具,提取音频并识别文本

背景 在上个月&#xff0c;有网页咨询我怎么才能获取视频中的音频并识别成文本&#xff0c;我当时给他的回答是去问一下AI&#xff0c;让AI来给你答案。 他觉得我在敷衍他&#xff0c;大骂了我一顿&#xff0c;大家觉得我的回答对吗&#xff1f; 小编心里委屈&#xff0c;我…

RTC实时时钟DS1337S/PT7C4337WEX国产替代FRTC1337S

NYFEA徕飞公司的FRTC1337S串行实时时钟是一种低功耗时钟/日历&#xff0c;被设计成可以无缝替代市场上流行的DS1337S和PT7C4337WEX(SOP8)两种型号, 具有两个可编程的时钟闹钟和一个可编程方波输出。 地址和数据通过2线双向总线串行传输。时钟/日历提供秒、分钟、小时、天、日期…