【Storm】【五】Storm集成Kafka

news2025/7/6 6:11:25

Storm集成Kafka

一、整合说明

Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下:

  • Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持;
  • Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。

这里我服务端安装的 Kafka 版本为 2.2.0(Released Mar 22, 2019) ,按照官方 0.10.x+ 的整合文档进行整合,不适用于 0.8.x 版本的 Kafka。

二、写入数据到Kafka

2.1 项目结构

在这里插入图片描述

2.2 项目主要依赖

<properties>
    <storm.version>1.2.2</storm.version>
    <kafka.version>2.2.0</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>

2.3 DataSourceSpout

/**
 * 产生词频样本的数据源
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // 模拟产生数据
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * 模拟数据
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

产生的模拟数据格式如下:

Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm

2.4 WritingToKafkaApp

/**
 * 写入数据到 Kafka 中
 */
public class WritingToKafkaApp {

    private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
    private static final String TOPIC_NAME = "storm-topic";

    public static void main(String[] args) {


        TopologyBuilder builder = new TopologyBuilder();

        // 定义 Kafka 生产者属性
        Properties props = new Properties();
        /*
         * 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。
         * 不过建议至少要提供两个 broker 的信息作为容错。
         */
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        /*
         * acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
         * acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
         * acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
         * acks=all : 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
         */
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector(TOPIC_NAME))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());

        builder.setSpout("sourceSpout", new DataSourceSpout(), 1);
        builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");


        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWritingToKafkaApp",
                    new Config(), builder.createTopology());
        }
    }
}

2.5 测试准备工作

进行测试前需要启动 Kakfa:

1. 启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

# zookeeper启动命令
bin/zkServer.sh start

# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

# bin/kafka-server-start.sh config/server.properties

2. 创建topic

# 创建用于测试主题
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic

# 查看所有主题
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 启动消费者

启动一个消费者用于观察写入情况,启动命令如下:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning

2.6 测试

可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包,打包命令如下:

# mvn clean package -D maven.test.skip=true

启动后,消费者监听情况如下:

在这里插入图片描述

三、从Kafka中读取数据

3.1 项目结构

在这里插入图片描述

3.2 ReadingFromKafkaApp

/**
 * 从 Kafka 中读取数据
 */
public class ReadingFromKafkaApp {

    private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
    private static final String TOPIC_NAME = "storm-topic";

    public static void main(String[] args) {

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);
        builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");

        // 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalReadingFromKafkaApp",
                    new Config(), builder.createTopology());
        }
    }

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers, topic)
                // 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                // 定义重试策略
                .setRetry(getRetryService())
                // 定时提交偏移量的时间间隔,默认是 15s
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    // 定义重试策略
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
    }
}

3.3 LogConsoleBolt

/**
 * 打印从 Kafka 中获取的数据
 */
public class LogConsoleBolt extends BaseRichBolt {


    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            System.out.println("received from kafka : "+ value);
            // 必须 ack,否则会重复消费 kafka 中的消息
            collector.ack(input);
        }catch (Exception e){
            e.printStackTrace();
            collector.fail(input);
        }

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

这里从 value 字段中获取 kafka 输出的值数据。

在开发中,我们可以通过继承 RecordTranslator 接口定义了 Kafka 中 Record 与输出流之间的映射关系,可以在构建 KafkaSpoutConfig 的时候通过构造器或者 setRecordTranslator() 方法传入,并最后传递给具体的 KafkaSpout

默认情况下使用内置的 DefaultRecordTranslator,其源码如下,FIELDS 中 定义了 tuple 中所有可用的字段:主题,分区,偏移量,消息键,值。

public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
    private static final long serialVersionUID = -5782462870112305750L;
    public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

    @Override
    public List<String> streams() {
        return DEFAULT_STREAM;
    }
}

3.4 启动测试

这里启动一个生产者用于发送测试数据,启动命令如下:

# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic

在这里插入图片描述

本地运行的项目接收到从 Kafka 发送过来的数据:

在这里插入图片描述


用例源码下载地址:storm-kafka-integration

参考资料

  1. Storm Kafka Integration (0.10.x+)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/387777.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

English Learning - L2-3 英音地道语音语调 小元音 [ʌ] [ɒ] [ʊ] [ɪ] [ə] [e] 2023.02.27 周一

English Learning - L2-3 英音地道语音语调 小元音 [ʌ] [ɒ] [ʊ] [ɪ] [ə] [e] 2023.02.27 周一课前活动练习方法大小元音总结小元音准备工作[ʌ] 中元音发音技巧对应单词的发音对应句子的发音常见的字母组合[ɒ] 后元音发音技巧对应单词的发音对应句子的发音常见的字母组合…

Spring——什么是事务?传播行为?事务隔离级别有哪些?

思维导图一、什么是事务&#xff1f;多条DML要么同时成功&#xff0c;要么同时失败Transaction&#xff08;tx&#xff09;二、事务的四个过程&#xff1a;开启事务&#xff08;start transaction&#xff09;执行核心业务代码提交事务&#xff08;如果核心业务处理过程中没有出…

真香,Grafana开源Loki日志系统取代ELK?

一、Loki是什么&#xff1f; Loki是由Grafana Labs开源的一个水平可扩展、高可用性&#xff0c;多租户的日志聚合系统的日志聚合系统。它的设计初衷是为了解决在大规模分布式系统中&#xff0c;处理海量日志的问题。Loki采用了分布式的架构&#xff0c;并且与Prometheus、Graf…

【前端】一个更底层库-React基础知识点

目录Reat是什么&#xff1f;为什么要使用React类库比较容易学习&#xff0c;API非常少。组件内聚&#xff0c;容易组合原生组件和自定义组件融合渲染状态/属性驱动全局更新commonjs生态圈/工具栏完善React基础知识JSX概述JSX嵌入变量Event事件组合组合CHILDREN总结大家好&#…

02-问题思考维度:抓住核心用户、场景化分析、需求收集与辨别、用户故事

文章目录2.1 抓住核心用户2.1.1 为什么要抓住核心用户2.1.2 核心用户的特征根据不同维度&#xff0c;描述核心用户2.1.3 如何抓住核心用户2.2 场景化分析2.2.1 场景五要素2.2.2 场景化分析方法2.2.3 场景化分析方法的应用2.3 需求收集与辨别2.3.1 需求的定义及层次2.3.2 需求收…

汇编相关问题

汇编语言期末复习题DX&#xff1a;单项选择题 DU&#xff1a;多项选择题 TK&#xff1a;填空题 MC&#xff1a;名词解释 v JD&#xff1a;简答题 CXFX&#xff1a;程序分析题 CXTK&#xff1a;程序填空题 BC&#xff1a;编程题第1章&#xff1a;基础知识1、在汇编语言程序的开发…

Vue2.0开发之——购物车案例-axios请求列表数据(45)

一 概述 项目导入axios HTTP 库axios请求数据列表将请求到的数据转存到data中 二 项目导入axios HTTP 库 2.1 axios介绍 Axios是一个基于promise 的 HTTP 库&#xff0c;可以用在浏览器和 node.js中 2.2 axios项目地址 https://www.npmjs.com/package/axios 2.3 axios安装…

excel的Countif函数使用详细教程

excel的Countif函数使用详细教程本教程通过七个示例讲解Countif函数使用教程&#xff0c;其中条件如何设置模糊值&#xff0c;统计固定长度的文本&#xff0c;统计大于某个值&#xff0c;统计等于某个值的&#xff0c;统计日期等。Countif函数作用&#xff1a;对指定单元格区域…

pycharm关联github、新建以及更新仓

此处已经默认你安装了git以及pycharm,这篇文章将会教给大家如何利用pycharm管理自己的github. 目录 pycharm关联github设置 Github创建新的仓 仓库的更新 pycharm:2022。不同版本界面略有不同。 pycharm关联github设置 设置PyCharm&#xff0c;打开File --> Settings -…

SAP 如何批量扩充(复制)科目到其他子公司(T-CODE: FS15)?

SAP 科目主数据管理常用的事务代码&#xff1a; 科目的创建分科目表层&#xff08;FSP0&#xff09;&#xff1b; 公司代码层&#xff08;FSS0&#xff09;&#xff1b; 集中创建&#xff08;FS00&#xff09;。 现假设我们成立一家新的子公司&#xff0c;需要把母公司的科目批…

博客系统程序(页面设计)

咱们学习javaEE的目的就是完成一个网站.在当前学习的基础上,已经可以完成我们的博客系统的页面的设计了!!!首先我们要进行统筹规划:首先我们的博客页面将会有4个页面:1.博客列表页2.博客详情页显示一个博客的具体内容:3.登录页就是用户输入用户名和页面的地方4.博客编辑页发布新…

1、JAVA 开发环境搭建 - JDK 的安装配置

文章目录一、下载 JDK81、官网地址&#xff1a;[**https://www.oracle.com**](https://www.oracle.com)二、安装 JDK1、鼠标右键安装包&#xff0c;以管理员身份运行(无脑下一步即可)2、细节说明三、配置环境变量1、为啥要配置环境变量呢&#xff1f;2、原因分析3、配置环境变量…

SpringMVC的完整执行流程

Java知识点总结&#xff1a;想看的可以从这里进入 目录2.5、SpringMVC流程2.5、SpringMVC流程 Spring MVC 框架主要由 DispatcherServlet、处理器映射、控制器、视图解析器、视图等组成&#xff0c;它和其他的MVC框架一样&#xff0c;以请求为驱动&#xff0c;围绕Servlet分派…

顺序表(一篇带你掌握顺序表)

目录 一、顺序表是什么 1.1 概念 1.2 分类 1.3 结构 二、顺序表的基本操作 2.1 前绪准备 2.2 初始化 2.3 扩容 2.5 尾插 2.6 打印 2.7 尾删 2.8 头插 2.9 头删 2.10 在pos位置插入 2.11 删除pos位置的数据 2.12 查找 三、完整代码 3.1 Test.c文件 3.2 SeqList.h…

C++之const_cast的用法

C的四种类型转换之const_cast 这里写目录标题前言const_cast1.new type为左值引用2. new type为函数类型的右值引用3.new type为对象类型的右值引用4.指向函数的指针和指向成员函数的指针不受约束5.通过非const访问路径修改const对象导致未定义行为6.const_cast只能用来修改指针…

Android onLayout布局流程解析

组件布局流程结论 1.&#xff09;layout流程始于ViewRootImpl的performLayout()方法&#xff0c;该方法会调用根View&#xff08;DecorView&#xff09;的layout()方法进行布局&#xff0c;因为DecorView是ViewGroup(FrameLayout),所以layout流程来到了ViewGroup&#xff08;其…

【C语言指针练习题】你真的学会指针了吗?

✨✨✨✨如果文章对你有帮助记得点赞收藏关注哦&#xff01;&#xff01;✨✨✨✨ 文章目录✨✨✨✨如果文章对你有帮助记得点赞收藏关注哦&#xff01;&#xff01;✨✨✨✨一维数组练习题&#xff1a;字符数组练习题&#xff1a;字符指针练习题&#xff1a;二维数组练习题&am…

LeetCode-63. 不同路径 II

题目来源 63. 不同路径 II 递归 class Solution {public int uniquePathsWithObstacles(int[][] obstacleGrid) {int row obstacleGrid.length-1;int col obstacleGrid[0].length-1;return process(row,col,0,0,obstacleGrid);}private int process(int row ,int col,int i…

重装系统一半电脑蓝屏如何解决

小编相信大家在使用电脑或者给电脑重装系统的时候都遇到过电脑蓝屏等等故障问题。最近有用户就遇到了这样一个问题&#xff0c;问小编重装系统一半电脑蓝屏怎么办&#xff0c;那么今天小编就告诉大家重装系统一半电脑蓝屏的解决方法。 工具/原料&#xff1a; 系统版本&#x…

Tecent libco C++协程库初探

安装 https://github.com/Tencent/libco 上把release版本的下下来&#xff1a; mkdir build && cd build && cmake .. && make拿到动态和静态库啦&#xff0c;然后cp到/usr/local/lib就完成安装啦。 项目有很多example&#xff0c;直接进根目录make就…