Kafka多生产者消费者自动配置

news2025/7/26 7:57:48

背景

项目中不同的业务可能会使用多个kafka,按默认的Kafka配置,最多是支持消费者和生产者使用不同的Kafka,如果两个生产者使用不同的Kafka则需要自定义配置,生成对应的bean。

解决方案

多生产者,多消费者,使用不同的前缀来区分,根据前缀来区分配置,加载配置,实例化对应前缀的KafkaProperties kafkaListenerContainerFactory KafkaTemplate ,每个bean的名称都是带前缀的,使用的时候,按照需要注入对应的bean。

YML配置

spring:
  kafka:
    product:
      bootstrap-servers: 55.1.40.231:9091,55.6.70.231:9091,55.5.70.231:9091
      properties:
        sasl:
          mechanism: PLAIN
          jaas:
            config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="xxxx";
        security:
          protocol: SASL_PLAINTEXT
      producer:
        retries: 0
        acks: -1
        batch-size: 16384
        linger-ms: 0
        buffer-memory: 33554432
      consumer:
        group-id: consumer-group-id
        enable-auto-commit: true
        auto-commit-interval-ms: 1000
        auto-offset-reset: latest
        session-timeout-ms: 120000 
        request-timeout-ms: 180000
    order:
      bootstrap-servers: 55.10.33.132:9091,55.10.33.132:9092,55.10.33.132:9093,55.10.33.132:9094,55.10.33.132:9095,55.10.33.132:9096,55.10.33.132:9097,55.10.33.132:9098,55.10.33.132:9099,55.10.33.132:9100
      properties:
        sasl:
          mechanism: PLAIN
          jaas:
            config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user_order" password="xxxxxxx";
        security:
          protocol: SASL_PLAINTEXT
      producer:
        retries: 3
        acks: -1
        batch-size: 16384
        linger-ms: 0
        buffer-memory: 33554432
      consumer:
        group-id: order-migration
        enable-auto-commit: true
        auto-commit-interval-ms: 1000
        auto-offset-reset: latest
        session-timeout-ms: 120000
        request-timeout-ms: 180000

自定义KafkaProperties

使用KafkaProperties接收配置,但是需要修改下前缀,但是KafkaProperties源码改不了,新写一个类继承KafkaProperties

@Component
@Primary
@ConfigurationProperties(prefix = "spring.kafka.order")
public class OrderKafkaProperties extends KafkaProperties{

}

如果没有Kafka默认配置,Kafka会自动实例化默认的KafkaProperties,如果有多个KafkaProperties实例,需要指定一个首选的bean,否则KafkaAnnotationDrivenConfiguration类中构造函数会报错。

所以在其中一个加上@Primary注解

KafkaTemplate和KafkaListenerContainerFactory配置

有了KafkaProperties就可以生成KafkaTemplateKafkaListenerContainerFactory实例

@Configuration
public class KafkaConfig {

    @Autowired
    private OrderKafkaProperties orderKafkaProperties;

    @Bean("orderKafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        return contractKafkaProperties.buildProducerProperties();
    }


    @Bean("orderKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        return contractKafkaProperties.buildConsumerProperties();
    }
}

这样就可以在其他地方直接使用了,生产者就直接@Autowired orderKafkaTemplate,如果是消费者,直接在@KafkaListenercontainerFactory属性指定orderKafkaListenerContainerFactory

如果有多个生产者消费者,就增加对应的配置即可。这样简化了配置的读取,除了加了前缀,其他的配置都是和Kafka默认配置一样的,复用Springboot的属性绑定,后续如果有其他配置,加上后能直接生效,无需修改代码。如果修改配置的结构需要代码中读取,然后手动设置,后期修改YML配置和代码都需要修改,比较麻烦。

方案演进

上述方案,如果需要新增一个Kafka的配置,需要新增一个前缀,然后新增对应配置代码,来生成KafkaPropertiesKafkaTemplateKafkaListenerContainerFactory实例,但是不同的前缀生成不同的实例代码都是重复的,而且所有的前缀、属性值都由YML配置可以得到,所以代码中生成带前缀的bean可以由代码自动生成,并注册到spring容器中。根据这个思路,写一个BeanFactoryAware的实现类。(Aware接口是框架提供给用户用户获取框架中一些对象的接口,比如BeanFactoryAware就是获取BeanFactory,框架会调用重写的setBeanFactory方法,将BeanFactory传给我们的实现类)

@Component
@Slf4j
public class EmallBeanFactoryAware implements BeanFactoryAware {

    @Autowired
    private Environment environment;

    private static final String SPRING_KAFKA_PREFIX = "spring.kafka";

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        if (beanFactory instanceof DefaultListableBeanFactory) {
            DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;

            Binder binder = Binder.get(environment);
            //将YML中属性值映射到MAP中,后面根据配置前缀生成bean并注册到容器中,TODO 绑定可能有异常,加try catch稳一点
            BindResult<Map> bindResultWithPrefix = binder.bind(SPRING_KAFKA_PREFIX, Bindable.of(Map.class));
            if (!bindResultWithPrefix.isBound()) {
                return;
            }

            Map map = bindResultWithPrefix.get();
            Set set = map.keySet();
            Set<String> kafkaPropertyFiledNames = getKafkaPropertyFiledNames();

            //如果配置多个primary, 只设置第一个,TODO项目启动过程中,这个变量是否有并发问题
            boolean hasSetPrimary = false;
            //实例化每个带前缀的KafkaProperties、KafkaTemplate、
            for (Object object : set) {
                String prefix = object.toString();

                if (kafkaPropertyFiledNames.contains(prefix)) {
                    //不带前缀的正常配置忽略
                    continue;
                }

                String configPrefix = SPRING_KAFKA_PREFIX + "." + prefix;

                BindResult<KafkaProperties> kafkaPropertiesBindResult;
                try {
                    kafkaPropertiesBindResult = binder.bind(configPrefix, Bindable.of(KafkaProperties.class));
                    if (!kafkaPropertiesBindResult.isBound()) {
                        continue;
                    }
                } catch (Exception e) {
                    //一些配置不是在KafkaProperties属性,但是也不是前缀配置,在这一步会绑定失败,比如spring.kafka.topics配置,
                    //一些配置的名称是带-,KafkaProperties属性是驼峰,绑定是会出异常的,异常忽略
                    log.error("auto register kafka properties error, prefix is: {}", configPrefix);
                    continue;
                }

                //注册生产者(TODO 没配置生产者是否会报错)
                KafkaProperties kafkaProperties = kafkaPropertiesBindResult.get();
                String propertiesBeanName = prefix + "KafkaProperties";
                boolean isBeanExist = defaultListableBeanFactory.containsBean(propertiesBeanName);
                if (!isBeanExist) {
                    String primaryConfig = configPrefix + ".primary";
                    //没有默认的kafka配置,需要设置下primary
                    BindResult<Boolean> primaryBindResult = binder.bind(primaryConfig, Bindable.of(Boolean.class));
                    if (primaryBindResult.isBound() && primaryBindResult.get() && !hasSetPrimary) {
                        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaProperties.class);
                        defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
                        defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                        defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);
                        hasSetPrimary = true;
                    } else {
                        defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                    }
                }

				//注册生产者KafkaTemplate
                String templateBeanName = prefix + "KafkaTemplate";
                if (!defaultListableBeanFactory.containsBean(templateBeanName)) {
                    KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(
                            new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
                    defaultListableBeanFactory.registerSingleton(templateBeanName, kafkaTemplate);
                }

                String beanName = prefix + "KafkaListenerContainerFactory";
                if (!defaultListableBeanFactory.containsBean(beanName)) {
                    //注册消费者listener(TODO 没配置消费者是否会报错)
                    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
                    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
                    factory.setConcurrency(10);
                    factory.getContainerProperties().setPollTimeout(3000);
                    defaultListableBeanFactory.registerSingleton(beanName, factory);
                }
            }
        }
    }

    private static Set<String> getKafkaPropertyFiledNames () {
        Set<String> names = new HashSet<>();

        Field[] declaredFields = KafkaProperties.class.getDeclaredFields();
        if (declaredFields.length == 0) {
            return names;
        }

        for (Field declaredField : declaredFields) {
            names.add(declaredField.getName());
        }

        return names;
    }
}

遇到的问题

手动注册的bean代码中@Autowire无法注入

手动注册的无法@Autowire,直接加@Lazy注解,先忽略bean注册的先后顺序

多个KafkaProperties实例,无法确定使用哪一个

因为使用前缀的配置方式,bean名称也是带前缀的,没有默认的Kafka配置,框架会自动生成对应的bean,KafkaAnnotationDrivenConfiguration中的KafkaProperties 属性是根据类型注入的,如果配置有多个前缀,注入的时候无法确定使用哪一个,所以增加一个primary配置,自动生成的时候设置下。

既有带前缀,又有不带前缀使用默认配置的

自动配置代码中有一段是根据yml中配置的key,判断是否是KafkaProperties类中的字段,如果是就忽略,让框架自动按默认配置,有些字段yml中是带-,如bootstrap-serversKafkaProperties中是驼峰命名bootstrapServers,绑定的时候会抛异常,影响应用启动,这种异常可以忽略,直接用try catch捕获。

设置Bean为Primary

第二个问题中,多个相同类型的Bean如何设置其中一个bean为Primary,手动注册bean,如果有实例对象,可以直接使用BeanFactoryregisterSingleton(beanName, object),如果没有实例对象,可以直接使用类名,通过BeanFactoryregisterBeanDefinition(beanName, beanDefinition)来注册,如果要设置bean为Primary,必须通过BeanDefinition来设置,但是通过框架的绑定是直接生成实例对象的,如果通过registerSingleton来注册,通过beanName获取BeanDefinition是会抛异常的,因为没有BeanDefinition,所以需要将对象实例和BeanDefinition关联起来,就是上面这段代码

//注册BeanDefinition
defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
//注册对象实例,使用相同的bean名称
defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
//再获取BeanDefinition就能获取到,而且这个bean就是上面注册的实例对象
defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/35417.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PowerDesigner 设置

PowerDesigner 设置前言推荐PowerDesigner 设置简单设置sql反向生成物理模型物理模型创建索引最后前言 以下内容源自自己 仅供学习交流使用 推荐 第11章 数据库的设计规范【2.索引及调优篇】【MySQL高级】 powerdesign 通过sql反向生成ER模型 PowerDesiner 15 在物理模型中…

Python测试框架之unittest和pytest 的区别

一、Unittest Unittest是Python标准库中自带的单元测试框架&#xff0c;Unittest有时候也被称为PyUnit&#xff0c;就像JUnit是Java语言的标准单元测试框架一样&#xff0c;Unittest则是Python语言的标准单元测试框架。 Unittest支持自动化测试&#xff0c;测试用例的初始化、…

FFN -> GLU -> GAU

1 GLU GLU的起源是2016年由Yann N. Dauphin在 论文:Language Modeling with Gated Convolutional Networks 在语言模型的建模方法上相比于循环神经网络更具有竞争力&#xff0c;提出了一种简单的线性门控单元来堆叠卷积层从而使得文本中的token可以并行化处理来获得上下文的语义…

HTTP响应详解

目录 一.状态码 小结&#xff08;记住&#xff09; 二.认识响应正文&#xff08;body&#xff09; 三.如何构造http请求 一.状态码 是一个数字&#xff0c;这个数字描述了当前这次请求的状态&#xff08;成功&#xff0c;失败&#xff0c;失败的原因&#xff09; http的状态…

Typora基础篇

Markdown基础 标题 #标题名字&#xff08;#号的个数代表标题的级数&#xff09; 文件-偏好设置-Markdown里面如果勾选了严格模式&#xff0c;那#与标题名称之间就需要加上一个空格一级标题用1个# 二级标题用2个# 三级标题用3个# 四级标题用4个# 五级标题用5个# ####### …

262-视口,布局视口,视觉视口,移动端适配,less语法,比哪里,DPR,RRI,less的弊端,运算,嵌套,混合,继承,混入,运算,

262-CSS中的单位 ◼ 前面编写的CSS中,我们经常会使用px来表示一个长度(大小),比如font-size设置为18px,width设置为100px。 ◼ px是一个长度(length)单位,事实上CSS中还有非常多的长度单位。 ◼ 整体可以分成两类:  绝对长度单位(Absolute length units);  相…

【uniapp】安装与使用uView组件库:

文章目录一、官网文档二、安装【1】进入控制台【2】初始化package.json【3】安装uView-ui【4】配置main.js【5】配置uni.scss【6】配置easycom组件模式【7】app.vue引入基础样式三、效果一、官网文档 https://www.uviewui.com/ 二、安装 【1】进入控制台 【2】初始化package…

基于sklearn的集成学习实战

集成学习投票法与bagging 投票法 sklearn提供了VotingRegressor和VotingClassifier两个投票方法。使用模型需要提供一个模型的列表&#xff0c;列表中每个模型采用tuple的结构表示&#xff0c;第一个元素代表名称&#xff0c;第二个元素代表模型&#xff0c;需要保证每个模型…

《计算机体系结构量化研究方法第六版》1.3 计算机体系结构的定义

1.3.1 指令集体系结构&#xff1a;计算机体系结构的近距离审视 1、这里通过指令集体系结构 ISA来指代程序员可以看到的实际指令集。ISA相当于软件和硬件之间的界限。 &#xff08;1&#xff09;ISA分类 几乎所有的ISA都被划分到了通用寄存器体系结构中&#xff0c;在这种体系…

能源消耗监测管理系统:实现企业用能定额、降低成本节能管理

现如今&#xff0c;很多企业还停留在安装各种节能产品或者是传统节能技术来达到节能目的&#xff0c;但是这些方法不能从根本上实现节能目标&#xff0c;从而导致企业效益。 在国家推动下&#xff0c;能源管理系统加入了互联网的技术以及数字化技术来协同管理&#xff0c;软硬…

二叉树链式结构的实现及简单操作(画图超详细解释)

二叉树链式结构的实现及简单操作前置说明前序遍历中序遍历后序遍历层序遍历如何判断一颗二叉树是完全二叉树通过前序遍历的数组构建二叉树销毁二叉树总结前置说明 由于我们要对二叉树进行操作&#xff0c;我们就得现有一个二叉树&#xff0c;而二叉树的构建又比较复杂&#xf…

李宏毅机器学习作业10——Adversarial Attack

目录 目标和方法 评价方法 导包 Global Settings Data transform Dataset 代理网络 评估模型在非攻击性图像上的表现 Attack Algorithm FGSM I-FGSM MI-FGSM Diverse Input (DIM) 攻击函数 Attack Ensemble Attack 集成模型函数 构建集成模型 进行攻击 FG…

【Node.js】第八章 express编写接口

目录 1. 编写接口 1.1 编写GET接口 2.2 编写POST接口 2. 接口跨域问题 2.1 跨域问题 2.2 使用cors中间件解决跨域问题 2.3 CORS ​2.4 JSONP接口 1. 编写接口 1.1 编写GET接口 2.2 编写POST接口 2. 接口跨域问题 2.1 跨域问题 2.2 使用cors中间件解决跨域问题 cor…

家里Win7电脑如何连接公司Win10电脑?快解析+远程桌面

什么是远程桌面&#xff1f;通俗地讲&#xff0c;就是可以在任何地点登陆位于其他地点的电脑&#xff0c;可以看到远程登陆电脑的一切东西&#xff0c;可以进行添加、改变、删除文件等任何操作&#xff0c;就像自己在那台电脑前操作一样。远程桌面有丰富的应用场景&#xff0c;…

如何在TIA博途中在线更新PLC的CPU固件版本?

如何在TIA博途中在线更新PLC的CPU固件版本? S7-1200PLC最新的V4.6.0版本的固件出来了,本次就以V4.6版本的固件为例,演示如何在博途中对PLC的固件版本进行更新。 (为防止更新过程中出现意外,强烈建议对PLC的程序进行备份!备份!备份!) 如下图所示,打开某个项目,选中PL…

nm命令使用详解,让你加快学习速度

nm 命令详解 符号是每个ELF文件的一个重要部分&#xff0c;因为它保存了程序实现或使用的所有(全局)变量和函数。符号表中保存了查找程序符号、为符号赋值、重定位符号所需要的全部信息。Linux中 nm用来列出目标文件的符号表;如果nm指令没有指出目标文件&#xff0c;则nm假定目…

模拟电路设计(34)---脉宽调制型开关电路

在开关稳压电源中&#xff0c;直流变换器中的功率晶体管工作在开关状态。目前开关电源的工作频率在几百kHz&#xff0c;有些甚至已经到了MHz量级。如下图所示是DC-DC开关变换器的原理框图&#xff1a; ​DC-DC开关变换器的原理框图 开关电源的实现方式有很多种&#xff0c;如最…

[附源码]Python计算机毕业设计二手图书回收销售网站

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

Spring:AOP事务管理(14)

Sprin事务Spring事务简介相关概念介绍转账案例-需求分析转账案例-环境搭建事务管理Spring事务角色Spring事务属性事务配置转账业务追加日志案例事务传播行为Spring事务简介 相关概念介绍 事务作用&#xff1a;在数据层保障一系列的数据库操作同成功同失败。Spring事务作用&am…

电子作业票系统:消除安全管理漏洞,科技赋能企业业务洞察

电子作业票系统采用定位、物联网、人脸识别、大数据技术对现场作业进行严格管控&#xff1b;通过风险大数据风险辨识模型&#xff0c;实现作业风险辨识&#xff0c;对动火、高处、受限空间、临时用电、吊装、断路、管线打开、挖掘作业等特殊作业票证智能化管理。 在危化企业实际…