
一 、kafka核心总控制器(Controller)
 
 在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。  
 
 
 
 作用:leader副本出现故障时,选举新的leder; 
 
             检测到某个分区的ISR发生变化时,通知所有borker更新元数据; 
 
             分区数量发生变化时,通知其它节点感应到新分区; 
 
 
 (*ISR,已与leader同步的副本的集合) 
 
 
 
 
 
 Controller的选举 
 
          集群启动时每个broker都会尝试在zookeeper上创建一个controller临时节点,zk会保证有且仅有一个创建成功;其它节点会一直监听这个临时节点,如果broker宕机,其它节点会再次创建临时节点,创建成功的成为controller; 
 
 
 controller相对于其它broker不同的职责: 
 
                  1、监听broker的变化: 
 为Zookeeper中的/brokers/ids/节中添加BrokerChangeListener节点,处理broker增减的变化; 
 
                  2、监听topic变化:为Zookeeper中的Brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;TopicDeleteinoListener,处理删除topic的动作; 
 
                  3、从zookeeper中读取当前所有topic、partition以及broker相关信息并进行相应的管理;对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化; 
 
                  4、更新集群的元数据信息,同步到其它普通的broker节点中; 
 
 
二、Partition副本选举Leader机制

初始化patitiion 会挑选编号最大的副本为leader;
          Controller感知到分区所在的broker挂了(通过监听zk中的节点),controller会从ISR(已同步的数据集)里挑第一个broker作为leader(就是同步最多数据的副本); 
 
unclean.leader.election.enable=false 代表已同步的副本没有全部挂掉,相反已同步的副本全部挂掉,则从未同步的副本中选出leader,这种情况下的副本会 丢失消息 ;
 
unclean.leader.election.enable=false 代表已同步的副本没有全部挂掉,相反已同步的副本全部挂掉,则从未同步的副本中选出leader,这种情况下的副本会 丢失消息 ;
 
         副本进入ISR的条件: 
 
 
                         1、副本节点不能产生分区,必须与zk和leader保持联通 
 
 
                         2、副本能复制leader的所有写操作,并且不能落后太多。(副本与leader副本数据更新时间由replica.lag.time.max.ms配置决定,超出这个时间未同步,移除ISR列表) 
 
 
三、消费者消费消息的offset记录
 
         每个消费者会定期将自己消费分区的offset提交给kafka内部的topic,提交的key是consumerGroupId+topic+分区号,value解释当前offset的值;kafka会定期清理topic的消息,最后保留最新的那条数据; 
 
 
 通过增加更多的分区,提高机器的并发量; 
 
 
四、消费者Rebalance机制
          rebalance就是说如果消费组里的消费者数量有变化,kafka会重新分配消费者与消费分区的关系;(只针对未指定消费分区的情况,指定了分区不会进行重新分配) 
 
          触发条件: 
 
                  1、消费组中的消费者数量发生变化 
 
                  2、增加了topic的分区 
 
                  3、消费组订阅了更多的topic 
 
          rebalance过程中无法消费消息,如果集群内节点较多,此过程会相当耗时; 
 
 
        
   Rebalance的工程: 
 
 
 
                  1、选择组协调器(GroupCoordinator):每个消费组都会选择一个broker作为自己的组协调器(coordinator),负责监控这个消费组里的所有消费者心跳,判断是否宕机;消费组中的每个消费者都会启动时向kafka集群中的某个节点发送findCoordinatorRequest请求来查找对应的组协调器; 
 
                  选择公式:hash(consumer group id)%_consumer_offsets主题分区数; 
 
                  2、加入消费组:成功找到组协调器后加入消费组,发送joinGroupRequest请求,组协调器会将第一个加入的消费者选为leader(消费组协调器),把consumer group情况发送给这个leader,这个leader负责指定分区方案; 
 
                  3、方案同步:消费组leader(消费组协调器)向groupCoordinator发送SyncGroupRequest,groupCoordinator将方案下发给所有消费者,各个消费者将与指定的分区leader建立连接进行消费 
 
 
 
 Rabalance分区分配策略:range、round-robin、stocky 
 
         假设一个主题十分分区,现在又三个消费者: 
 
                  
 rang策略:就是按照分区序号排序,假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,第一个消费者得到的分区为n+1(0~3),第二个消费者n(4~6),第三个消费者(7~9); 
 
                 
 round-robin轮训策略:第一个消费者(0,3,6,9),第二个消费者(1,4,7),第三个消费者(2,5,8) 
 
                
 stocky与rouond-robin初试分配类似,  在rebalance的时候需要保证两个原则: 
 
                          1、分区的分配要尽可能均匀 
 
                          2、分区的分配尽可能与上次分配保持相同; 
 
                          第一个目标优于第二个目标;比如第三个消费者挂了,原有的分配,第一个消费者(0,3,6,9),第二个消费者(1,4,7),第三个消费者(2,5,8);重新分配会将2分配给第一个消费者,5,8分给第二个消费者; 
 
 
五、消息推送机制
          1、写入方式producer push消息到broker,消息会被添加到patition最后,顺序写入磁盘(顺序写入效率比随机高)保证吞吐量; 
 
          2、消息路由机制: 
 
                          a、指定patition,直接使用 
 
                          b、未指定patition指定key,通过对key的hash选出patition 
 
                          c、=都为指定,轮训 
 
 
          3、写入流程 
 
 
 
 
  1、producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader  
 
 
  
  2、producer 将消息发送给该 leader  
 
 
  
  3、leader 将消息写入本地 log 
  4. followers 从 leader pull 消息,写入本地 log 后 向leader 发送 ACK  
 
 
  
  5、leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK 
  
   
 
 
 
   六、HW与LEO详解(broker宕机后消息的保障) 
 
 
 
           HW俗称高水位,HighWatermark的缩写,取一个partition中对应的最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在位置。每个副本都有HW,leader和follower各自负责更新自己的HW。leader写入消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步更新后,consumer才能消费,这样即使broker挂了,新选举出来的消息仍然可以充新的leader中获取;(broker内部拉去消息,没有HW的限制) 
 
 
  
 
 
   kafka 的复制并非是完全同步复制,也并非是异步复制。同步复制要求所有的副本全部复制完成才会commit,这种复制性能较低;异步复制又不能保证消息不丢失;kafka的复制要结合提交的acks参数讨论; 
 
 
  
 六、日志分段存储
   kafka一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(segment)存储的,每段消息都存储在不一样的log文件里,方便快速删除,每个分段最大不能超过1g;方便加载到内存中; 
 
 
  
  
   部分消息的 
   offset 
   索引文件, 
   kafka 
   每次往分区发 
   4K 
   ( 
   可配置 
   ) 
   消息就会记录一条当前消息的 
   offset 
   到 
   index 
   文件,如果要定位消息的offset 
   会先在这个文件里快速定位,再去 
   log 
   文件里找具体消息  
  
 
   
   00000000000000000000. 
   index  
  
 
   
   消息存储文件,主要存 
   offset 
   和消息体  
  
 
   
   00000000000000000000. 
   log  
  
 
   
   消息的发送时间索引文件, 
   kafka 
   每次往分区发 
   4K 
   ( 
   可配置 
   ) 
   消息就会记录一条当前消息的发送时间戳与对应的 
   offset 
   到 
   timeindex 
   文件,如果需要按照时间来定位消息的 
   offset 
   ,会先在这个文件里查找  
  
 
   
   00000000000000000000. 
   timeindex  
  
 
   
   文件如下: 
  
 
   
   00000000000005367851. 
   index  
  
 
   
   00000000000005367851. 
   log  
  
 
   
   00000000000005367851. 
   timeindex  
  
 
   
   00000000000009936472. 
   index  
  
 
   
   00000000000009936472. 
   log  
  
 
   
   00000000000009936472. 
   timeindex 
  
 
   
   
   kafka在zookeeper节点数据 
  
 
   
 
  


















