在 DDD 中优雅的发送 Kafka 消息

news2025/7/19 6:47:38

前言
1:host 映射
在这里插入图片描述
下载 SwitchHost 配置一个映射地址。点击 + 添加一个本地环境,之后配置你的 IP kafka 这样就能找这个地址了。IP 为你本地的IP,如果是云服务器就是公网IP地址
使用docker-compose.yml进行一键部署安装

version: '3.0'
# docker-compose -f docker-compose.yml up -d
services:
  zookeeper:
    image: zookeeper:3.9.0
    container_name: zookeeper
    restart: always
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
      ZOOKEEPER_CLIENT_PORT: 2181
      ALLOW_ANONYMOUS_LOGIN: yes
      TZ: Asia/Shanghai
    networks:
      - my-network

  kafka:
    image: bitnami/kafka:3.7.0
    container_name: kafka
    volumes:
      - /etc/localtime:/etc/localtime
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_MESSAGE_MAX_BYTES: "2000000"
      KAFKA_ENABLE_KRAFT: no
      JMX_PORT: 9999
      TZ: Asia/Shanghai
    depends_on:
      - zookeeper
    networks:
      - my-network

  kafka-eagle:
    image: echo21bash/kafka-eagle:3.0.2
    container_name: kafka-eagle
    environment:
      KAFKA_EAGLE_ZK_LIST: zookeeper:2181
    volumes:
      - ./kafka-eagle/system-config.properties:/opt/kafka-eagle/conf/system-config.properties
    ports:
      - "8048:8048"
    depends_on:
      - kafka
    networks:
      - my-network

networks:
  my-network:
    driver: bridge

脚本在代码中提供了完整的语句
消息流程
在这里插入图片描述
代码结构
在这里插入图片描述
1:domain 是领域层,提供一个个领域服务。如果一个工程有多个领域,则有不同的 a、b、c 领域包,每个包下有一套【event、model、repository、service】。
2:在领域层定义的 event 事件,里面涵盖了事件消息。而这个事件消息可以让 UserRepository 继承实现。最终完成消息发送。
3:最后是 trigger 触发器层,所有的 http、rpc、job、mq 都是一种触发行为。通过触发器的 listener 监听,来接收 mq 消息。
环境配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
      
...

# 配置主题
kafka:
  topic:
    group: xmg-group
    user: xmg-topic

配置发送事件

@Slf4j
@Component
public class EventPublisher {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
        try {
            String messageJson = JSON.toJSONString(eventMessage);
            kafkaTemplate.send(topic, messageJson);
            log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
        } catch (Exception e) {
            log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
            throw e;
        }
    }

}

事件消息定义

public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {

    @Value("${kafka.topic.user}")
    private String topic;

    @Override
    public EventMessage<UserMessage> buildEventMessage(UserMessage data) {
        return EventMessage.<UserMessage>builder()
                .id(RandomStringUtils.randomNumeric(11))
                .timestamp(new Date())
                .data(data)
                .build();
    }

    @Override
    public String topic() {
        return topic;
    }

    /**
     * 要推送的事件消息,聚合到当前类下。
     */
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserMessage {
        private String userId;
        private String userName;
        private String userType;
    }

}

事件消息发送

@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {

    @Resource
    private EventPublisher publisher;

    @Override
    public void doSaveUser(UserEntity userEntity) {
        // 推送消息
        publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder()
                .userId(userEntity.getUserId())
                .userName(userEntity.getUserName())
                .userType(userEntity.getUserTypeVO().getDesc())
                .build()));
    }
    
}

事件消息监听

@Slf4j
@Component
public class KafkaMessageListener {

    @KafkaListener(topics = "${kafka.topic.user}", groupId = "${kafka.topic.group}", concurrency = "1")
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            try {
                // 逻辑处理

                // 确认消息消费完成,如果抛异常消息会进入重试
                ack.acknowledge();
                log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);
            }
        }
    }

}

测试验证

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserServiceTest {

    @Resource
    private IUserService userService;

    @Test
    public void test_register() throws InterruptedException {
        while (true) {
            UserEntity userEntity = new UserEntity();
            userEntity.setUserId("10001");
            userEntity.setUserName("小明哥");
            userEntity.setUserTypeVO(UserTypeVO.T8);

            userService.register(userEntity);
            Thread.sleep(1500);
        }

    }

}

好了 至此 在 DDD 中优雅的发送 Kafka 消息 学习结束了 友友们 点点关注不迷路 老铁们!!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2261413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字电视标准与分类

数字电视相关内容是一个极其成熟且久远的领域&#xff0c;并不像其它的技术方面那么前沿。但是学习技术的另外一个方面也不就是可以维持咱们的好奇心以及认识生活中多个事务后面的技术本质。 近年来&#xff0c;电视领域发生了一系列的变化&#xff0c;电视数字化的进程明显加快…

AI视频配音技术创新应用与商业机遇

随着人工智能技术的飞速发展&#xff0c;AI视频配音技术已经成为内容创作者和营销人员的新宠。这项技术不仅能够提升视频内容的吸引力&#xff0c;还能为特定行业带来创新的解决方案。本文将探讨AI视频配音技术的应用场景&#xff0c;并讨论如何合法合规地利用这一技术。 AI视频…

Airbus结构数字样机理念及实践(转)

关注作者 1、数字样机的背景 早期的设计文档通过二维工程图来描述&#xff0c;对工程师来说&#xff0c;绘制工程图足够表达设计思想&#xff0c;工程图成为了标准的“工程师语言”。但是外围的用户通常通过透视图来表达设计意图&#xff0c;不得不产生了大量针对不同教育背景…

UNIAPP框架uView初步集成与开发设计

uView UI&#xff0c;是uni-app生态最优秀的UI框架&#xff0c;全面的组件和便捷的工具会让您信手拈来&#xff0c;如鱼得水。本文章分享UNIAPP集成使用uView页面动态开发设计。 一、使用HBuilder X 直接导入插件&#xff0c;下载后重启 uView - DCloud 插件市场 二、配置样…

【图像处理lec7】图像恢复、去噪

目录 一、图像退化与恢复概述 二、图像退化中的噪声模型 1、使用 imnoise 函数添加噪声 &#xff08;1&#xff09;imnoise 函数的概述 &#xff08;2&#xff09;函数语法 &#xff08;3&#xff09;支持的噪声类型与具体语法 &#xff08;4&#xff09;噪声类型的详细…

dpdk中udp包的接受与发送

预备知识 dpdk中一些关键宏定义与结构体定义 以太网帧相关 宏RTE_ETHER_ADDR_LEN mac地址长度&#xff0c;6字节48位 宏RTE_ETHER_TYPE_IPV4 代表ipv4 struct rte_ether_hdr 以太网帧头结构体&#xff0c;包含了三个成员变量&#xff0c;目的地址&#xff0c;源地址&#…

C语言进阶(2) ---- 指针的进阶

前言&#xff1a;指针的主题&#xff0c;我们在初阶的《指针》章节已经接触过了&#xff0c;我们知道了指针的概念&#xff1a; 1.指针就是个变量&#xff0c;用来存放地址&#xff0c;地址唯一标识一块内存空间。 2.指针的大小是固定的4/8个字节(32位平台/64位平台)。 3.指针是…

项目整体结构优化

文章目录 1.依赖配置方式1.作为专门管理依赖的模块2.作为父模块3.作为子模块4.注意事项1.关于relativePath的配置2.关于打包的配置3.遇到maven报错的解决方案1.首先刷新maven2.从子模块开始clean-install3.最终:在最顶级的模块clean-package 2.概览3.新建一个子模块sun-depende…

计算机视觉-边缘检测

图片分类 一张图片中可能有多个需要识别的物体&#xff0c;会用方框标注他们的位置和类别 例&#xff1a; 给出一张照片&#xff0c;计算机需要从中识别出这是一只猫 一张图片的计算量是较大的&#xff0c;这张图片的尺寸虽然是6464&#xff0c;因为每张图片有3个颜色通道&am…

多模块应用、发布使用第三方库(持续更新中)

目录: 1、多模块概述&#xff08;HAP、HSP、HAR&#xff09; HAR与HSP两种共享包的主要区别体现在&#xff1a; 2、三类模块&#xff1a; 3、创建项目&#xff1a;项目名&#xff1a;meituan &#xff08;1&#xff09;创建Ability类型的Module&#xff0c;编译后为HAP文件…

安卓 文件管理相关功能记录

文件管理细分为图片、视频、音乐、文件四类 目录 权限 静态声明权限 动态检查和声明权限方法 如何开始上述动态申请的流程 提示 图片 获取图片文件的对象列表 展示 删除 视频 获取视频文件的对象列表 获取视频file列表 按日期装载视频文件列表 展示 播放 删除…

CHIMA网络安全攻防大赛经验分享

比赛模式 第一轮&#xff1a;20分钟基础知识赛&#xff08;50道题&#xff09; 安全运维&#xff0c;法律法规&#xff0c;linux操作系统等 第二轮&#xff1a;50分钟CTF夺旗&#xff08;5道题&#xff09; 题目涵盖 密码学 运用多种工具&#xff0c;如ASCII对照&#xff0c…

QT 国际化(翻译)

QT国际化&#xff08;Internationalization&#xff0c;简称I18N&#xff09;是指将一个软件应用程序的界面、文本、日期、数字等元素转化为不同的语言和文化习惯的过程。这使得软件能够在不同的国家和地区使用&#xff0c;并且可以根据用户的语言和地区提供本地化的使用体验。…

[Java] 使用 VSCode 来开发 Java

目录 前言Java 环境怎么看自己是否已经配置完成&#xff1f;安装 JDK安装 Maven 环境修改 Maven 依赖源 完善 VS Code配置插件配置 Maven配置 Maven Settings配置 Maven 可执行文件地址 前言 由于使用 VSCode 编码已经成为习惯&#xff0c;并且它确实相对其他的 IDE 较为轻量化…

如何高效获取Twitter数据:Apify平台上的推特数据采集解决方案

引言 在数据分析和市场研究领域&#xff0c;Twitter&#xff08;现在的X&#xff09;数据一直是重要的信息来源。但是&#xff0c;自从Twitter更改API定价策略后&#xff0c;获取数据的成本大幅提升。本文将介绍一个经济实惠的替代方案。 为什么需要Twitter数据&#xff1f; …

vue3+ant design vue实现日期选择器不展示清除按钮

1、代码&#xff1a;只需设置:allowClear"false"即可 <a-date-pickerv-model:value"value1":disabledDate"disabledDate"change"queryRate":allowClear"false" />const disabledDate (current: Dayjs) > {// 获取…

S2CRNet 图像测评笔记 图像融合

空间分离曲线渲染网络用于高效高分辨率图像协调 开源地址&#xff1a; https://github.com/stefanLeong/S2CRNet 效果图&#xff1a; 左边是输入&#xff0c;最右边是效果&#xff1a;效果不是很理想&#xff0c;色差问题还在 本地代码&#xff1a; S2CRNet-demos-main

【计算机网络】Layer4-Transport layer

目录 传输层协议How demultiplexing works in transport layer&#xff08;传输层如何进行分用&#xff09;分用&#xff08;Demultiplexing&#xff09;的定义&#xff1a;TCP/UDP段格式&#xff1a; UDPUDP的特点&#xff1a;UDP Format端口号Trivial File Transfer Protocol…

【Excel】单元格分列

目录 分列&#xff08;新手友好&#xff09; 1. 选中需要分列的单元格后&#xff0c;选择 【数据】选项卡下的【分列】功能。 2. 按照分列向导提示选择适合的分列方式。 3. 分好就是这个样子 智能分列&#xff08;进阶&#xff09; 高级分列 Tips&#xff1a; 新手推荐基…

易语言鼠标轨迹算法(游戏防检测算法)

一.简介 鼠标轨迹算法是一种模拟人类鼠标操作的程序&#xff0c;它能够模拟出自然而真实的鼠标移动路径。 鼠标轨迹算法的底层实现采用C/C语言&#xff0c;原因在于C/C提供了高性能的执行能力和直接访问操作系统底层资源的能力。 鼠标轨迹算法具有以下优势&#xff1a; 模拟…