一、几个概念
1. 消费者组
消费者组:一个消费者组包含多个消费者。同一个消费组的消费者,分别消费不同的partition,便于加快消费。
kafka约定在一个消费者组中,对于同一个topic,每个consumer会分配不同partition,即topic下的一个patition只能被同一个消费者组的一个消费者消费,所以当消费组中的消费者个数大于partition个数时,会存在消费者闲置的情况,因为分不到partition.
2. 点对点(P2P,point to point)和发布订阅模型(Publish/Sub)
消息中间件模型有两种经典模型:点对点(P2P,point to point)和发布订阅模型(Publish/Sub)
点对点模式是基于队列,消息生产者将消息发送到队列,消息消费者从队列拉取消息
发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题订阅消息
kafka通过消费者和消费者组的契合可以实现点对点(P2P,point to point)和发布订阅模型(Publish/Sub):
如消费者都属于同一个消费组,那么partition的消息发给这些消费者时,一条消息只发给一个消费者,不会多发,相当于点对点模式
消费者隶属于不同消费组,那么同一个partition可能发给不同消费者组的多个消费者手中,相当于发布订阅模式;
3. 消费者再均衡:
同一个消费组内的消费者消费不同分区,当消费者组内再增加消费者时,原来的消费者对应的partition会被回收掉,然后重新分配给最新的所有消费者,这就是再均衡。
        consumer.subscribe(Collections.singletonList("topic-module"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.err.println("回收partitions:"+partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.err.println("再分配partitions:"+partitions);
            }
        }); 
二、消费者多线程模型
kafka的consumer是非线程安全的,如果多个线程操作一个consumer实例,会报异常;因为KafkaConsumer中定义了一个acquire方法用来检测是否只有一个线程在操作,当检测到有其他线程时,会抛出ConcurrentModifactionException;
KafkaConsumer在执行所有动作时都会先执行acquire方法检测是否线程安全。
不过要实现消费者的多线程模型,也是有办法的
1. 多线程模型1

一个partition对应一个consumer,一个consumer也只运行在一个线程中。
首先新建一个 consumer线程类KafkaConsumerMultiThread1.java
public class KafkaConsumerMultiThread1 implements Runnable {
    private KafkaConsumer<String, String> consumer;
    private volatile boolean isRunning = true;
    private String threadName;
    private static AtomicInteger num = new AtomicInteger(0);
    public KafkaConsumerMultiThread1(Properties properties, String topic) {
        this.consumer = new KafkaConsumer<String, String>(properties);
        this.consumer.subscribe(Collections.singletonList(topic));
        this.threadName = "consumer-thread-" + num.getAndIncrement();
        System.err.println(this.threadName + " started ");
    }
    @Override
    public void run() {
        try{
            while (isRunning){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for(TopicPartition topicPartition : records.partitions()){
                    List<ConsumerRecord<String, String>> consumerRecords = records.records(topicPartition);
                    int size = consumerRecords.size();
                    for(int i=0; i<size; i++){
                        ConsumerRecord<String, String> record = consumerRecords.get(i);
                        String value = record.value();
                        long messageOffset = record.offset();
                        System.err.println("当前消费者:"+ threadName
                                + ",消息内容:" + value
                                + ", 消息的偏移量: " + messageOffset
                                + "当前线程:" + Thread.currentThread().getName());
                    }
                }
            }
        }finally {
            if(consumer != null){
                consumer.close();
            }
        }
    }
    public boolean isRunning() {
        return isRunning;
    }
    public void setRunning(boolean running) {
        isRunning = running;
    }
}
 
然后再创建一个用于生成consumer线程的线程池:
public class MultiThreadTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-module");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // 改成手动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        int coreSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(coreSize);
        for(int i=0; i<coreSize; i++){
            executorService.execute(new KafkaConsumerMultiThread1(properties, "topic-module"));
        }
    }
} 
之后在生产者端发送消息,可以观察到有五个线程共同处理发来的消息:
2. 消费者多线程模型2

采用Master-worker模型(master负责分发,worker负责处理)。master初始化阶段,创建多个worker实例,然后master负责拉取消息,并负责分发给各个worker执行。各个worker执行完成后,将执行结果回传给master,由master统一进行commit。
这里的master为consumer,在 consumer master 中,会有多个任务队列,用来接收生产者端的消息,consumer会创建多个worker实例,并且将任务队列中的消息交给每个队列对应的worker对象处理,,worker处理完成的结果放到任务结果集中,然后master单线程最后做 consumer.commitSync()或者consumer.commitAsync()提交操作。这里的consumer作为master是单线程的,worker是多线程的。
这样避免了多个线程操作consumer,避免出现异常ConcurrentModifacationException



















