大数据技术基础实验十五:Storm实验——实时WordCountTopology

news2025/8/6 0:09:42

大数据技术基础实验十五:Storm实验——实时WordCountTopology

文章目录

  • 大数据技术基础实验十五:Storm实验——实时WordCountTopology
    • 一、前言
    • 二、实验目的
    • 三、实验要求
    • 四、实验原理
      • 1、Topologies
      • 2、Spouts
      • 3、Bolts
    • 五、实验步骤
      • 1、导入依赖jar包
      • 2、编写代码
      • 3、打包上传并运行
    • 六、最后我想说

一、前言

本期实验我们将使用上期部署的Storm集群来进行一个任务实现——实时WordCountTopology。

二、实验目的

掌握如何用Java代码来实现Storm任务的拓扑,掌握一个拓扑中Spout和Bolt的关系及如何组织它们之间的关系,掌握如何将Storm任务提交到集群。

三、实验要求

编写一个Storm拓扑,一个Spout每个一秒钟随机生成一个单词并发射给Bolt,Bolt统计接收到的每个单词出现的频率并每隔一秒钟实时打印一次统计结果,最后将任务提交到集群运行,并通过日志查看任务运行结果。

四、实验原理

Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。

1、Topologies

一个topology是spouts和bolts组成的图,通过stream groupings将图中的spouts和bolts连接起来,如下图所示。

在这里插入图片描述

一个topology会一直运行直到你手动kill掉,Storm自动重新分配执行失败的任务, 并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。

运行一个topology很简单。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到Nimbus并且上传jar包。

Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务, 你可以提交由任何语言创建的topology。上面的方面是用JVM-based语言提交的最简单的方法。

2、Spouts

消息源spout是Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple, 但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。

消息源可以发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。

Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。

另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。

3、Bolts

所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。

Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。

Bolts的主要方法是execute, 它以一个tuple作为输入,bolts使用OutputCollector来发射tuple,bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 一般的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

五、实验步骤

1、导入依赖jar包

我们首先在Eclipse中创建一个StormTest项目,然后在项目中创建一个lib包,然后我们利用xftp登录到master虚拟机上,进入我们所需jar包的目录中:

cd /usr/cstor/storm/lib

在这里插入图片描述

将其全部的jar包导入到本地项目的lib包内。

然后再在Eclipse中对每个jar执行如下操作进行添加配置:

在这里插入图片描述

出现这样即可:

在这里插入图片描述

2、编写代码

然后我们在项目的src中首先创建一个cproc.word包。

在这里插入图片描述

然后我们再在包内创建三个java类并填入对应代码,用于实现一个完整的Topology。

Spout随机发送单词,代码实现:

package cproc.word;

import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class WordReaderSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
    }
    @Override
    public void nextTuple() {
    	 //这个方法会不断被调用,为了降低它对CPU的消耗,让它sleep一下
     Utils.sleep(1000);
     final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
     Random rand = new Random();
     String word = words[rand.nextInt(words.length)];
     collector.emit(new Values(word));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}  

Bolt单词计数,并每隔一秒打印一次,代码实现:

package cproc.word;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounterBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 5683648523524179434L;
    private HashMap<String, Integer> counters = new HashMap<String, Integer>();
    private volatile boolean edit = false;
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        //定义一个线程1秒钟打印一次统计的信息
        new Thread(new Runnable() {
          public void run() {
             while (true) {
               if (edit) {
                   for (Entry<String, Integer> entry : counters.entrySet())
                   {
                      System.out.println(entry.getKey() + " : " + entry.getValue());
                    }
                    edit = false;
                }
                try {
                    Thread.sleep(1000);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                  }
              }
            }
        }).start();
    }
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        edit = true;
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

构建Topology并提交到集群主函数,代码实现:

package cproc.word;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopo {
    public static void main(String[] args) throws Exception{
      //构建Topology
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("word-reader", new WordReaderSpout());
      builder.setBolt("word-counter", new WordCounterBolt())
      .shuffleGrouping("word-reader");
      Config conf = new Config();
      //集群方式提交
      StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
      builder.createTopology());
    }
}

3、打包上传并运行

我们将Storm代码打成wordCount-Storm.jar(打包的时候不要包含storm中的jar,不然会报错的,将无法运行,即:wordCount-Storm.jar中只包含上面三个类的代码)上传到主节点。

在这里插入图片描述

这里需要注意的是我们不勾选上图框选的选项,这样就不会打包项目中的jar包。

然后我们打开xftp软件,将我们打包好的jar上传到master虚拟机的/usr/cstor/storm/bin目录下并在主节点进入Storm安装目录的bin下面用以下命令提交任务:

cd /usr/cstor/storm/bin/
./storm jar wordCount-Storm.jar cproc.word.WordCountTopo wordCount

在这里插入图片描述

出现如下情况就代表我们已经运行成功了。

在这里插入图片描述

然后我们来通过查看Storm的日志文件来查看我们的运行结果,我们需要进入slave1的下的storm的logs目录中找到有关wordcount的文件:

cd /usr/cstor/storm/logs/
ls

在这里插入图片描述

然后我们使用如下命令查看运行结果:

cat wordCount-1-1668759595-worker-6703.log

在这里插入图片描述

然后我们可以在master上通过如下命令来结束我们的任务:

./storm kill wordCount

在这里插入图片描述

六、最后我想说

本期的Storm实验到这里就结束了,学校大数据平台上面的基础实验还剩下几个没有更新,后续我会慢慢全部更新的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/15824.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ffmpeg编译so

1.第一个坑&#xff1a;/bin/bash^M: bad interpreter: No such file or directory shell脚本报错/bin/bash^M: bad interpreter: No such file or directory&#xff0c;通过查阅资料得知&#xff0c;shell脚本格式必须是unix才行&#xff0c;但我这个脚本是在windows上编写完…

如何批量创建word文档并重命名?

如何批量创建word文档并重命名&#xff1f;大家请注意&#xff0c;我这里抛出的问题是批量创建word文档并重命名&#xff0c;重点在批量&#xff0c;并不是我们平时遇到的单纯创建一个或者几个word文档&#xff0c;而是批量创建几十上百个甚至几百上千个word文档。创建几个word…

英国博士后招聘|约克大学—核磁共振监测催化

英国约克大学博士后职位—核磁共振监测催化 约克大学&#xff08;University of York&#xff09;&#xff0c;建于1963年&#xff0c;是一所位于英国英格兰约克的研究型公立大学&#xff0c;英国罗素大学集团、世界大学联盟、N8大学联盟、白玫瑰大学联盟和江苏—英国高水平大学…

【开发心得】Java ftp开发注意事项

前言: 虽说已经2022年了&#xff0c;但是ftp上传方式还是有一定使用场景的&#xff0c;关于java的ftp上传下载实现&#xff0c;基本都指向了apache commont net 库。 代码实现可以参考:https://blog.csdn.net/tianshan2010/article/details/103690940 或者其他类似的文章&…

云服务--漏洞修复

1、Spring Security 身份认证绕过漏洞(CVE-2022-22978) Spring Security是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架。 Spring Security存在身份认证绕过漏洞 &#xff0c;当Spring Security中使用RegexRequestMatcher进行权限配置&#…

汽车云算力“竞速”,个性化进阶成新风向

配图来自Canva可画 随着产业互联网的持续推进&#xff0c;云服务逐渐深入各行各业&#xff0c;云服务厂商也专门推出各种面向特定行业的专属云&#xff0c;比如金融云、零售云、政务云等等。如今云服务厂商正把焦点深入到汽车领域&#xff0c;围绕“汽车云”展开新的角逐。 今…

LeetCode[662]二叉树的最大宽度

难度&#xff1a;中等 题目&#xff1a; 给你一棵二叉树的根节点 root &#xff0c;返回树的 最大宽度 。 描述&#xff1a; 树的 最大宽度 是所有层中最大的 宽度 。 每一层的 宽度 被定义为该层最左和最右的非空节点&#xff08;即&#xff0c;两个端点&#xff09;之间的长…

麦子-linux驱动策略与框架

一、linux内核同步和互斥 信号量&#xff08;进程与进程&#xff09; 当进程A执行共享资源先加锁执行down表示占用资源&#xff0c;进程B想执行就会被dowm&#xff0c;导致陷入睡眠 当进程A行完了就会up释放资源&#xff0c;我们其他线程执行这个共享资源就不会被dowm睡眠 自旋…

蓝桥杯入门即劝退(十)反转链表

----------持续更新蓝桥杯入门系列算法实例------------ 如果你也喜欢Java和算法&#xff0c;欢迎订阅专栏共同学习交流&#xff01; 你的点赞、关注、评论、是我创作的动力&#xff01; -------希望我的文章对你有所帮助-------- 前言&#xff1a;如果有一定链表基础&#…

【图像处理】基于形状提取和模式匹配组合的面部特征点提取方法(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…

7天酒店斩获五洲钻石奖“年度投资价值酒店连锁品牌” 打造酒店投资极致性价比

近日&#xff0c;“2022文旅国际峰会暨第十五届世界酒店论坛”在海南隆重召开&#xff0c;300多位来自中外的业界精英和跨界企业家代表等受邀参加&#xff0c;齐聚行业盛会。同期&#xff0c;“第十四届五洲钻石奖”也正式揭晓&#xff0c;7天酒店在众多品牌中脱颖而出&#xf…

【原创】使用Golang的电商搜索技术架构实现

作者&#xff1a;黑夜路人 时间&#xff1a;2022年11月 一、背景&#xff1a; 现在搜索技术已经是非常主流的应用技术&#xff0c;各种优秀的索引开源软件已经很普遍了&#xff0c;比如 Lucene/Solr/Elasticsearch 等等主流搜索索引开源软件&#xff0c;让我们搭建一个优秀的…

项目管理如何有效进行?看这篇就够了

先放上一个项目管理流程图&#xff0c;纯手打&#xff0c;图有点长。 ​ 上面的流程图其实就已经清楚概括了项目管理的流程框架&#xff0c;按照这个逻辑去推进&#xff0c;一般来说就可以达到标准化的要求了。 将以上流程细化成具体的项目管理场景&#xff0c;大致可以分为7大…

【Java面试八股文宝典之基础篇】备战2023 查缺补漏 你越早准备 越早成功!!!——Day08

大家好&#xff0c;我是陶然同学&#xff0c;软件工程大三明年实习。认识我的朋友们知道&#xff0c;我是科班出身&#xff0c;学的还行&#xff0c;但是对面试掌握不够&#xff0c;所以我将用这100多天更新Java面试题&#x1f643;&#x1f643;。 不敢苟同&#xff0c;相信大…

6种MySQL数据库平滑扩容方案剖析

1. 扩容方案剖析 1.1 扩容问题 在项目初期&#xff0c;我们部署了三个数据库A、B、C&#xff0c;此时数据库的规模可以满足我们的业务需求。为了将数据做到平均分配&#xff0c;我们在Service服务层使用uid%3进行取模分片&#xff0c;从而将数据平均分配到三个数据库中。 如…

网络丢包,网络延迟?这款神器帮你搞定所有!

常用的 ping&#xff0c;tracert&#xff0c;nslookup 一般用来判断主机的网络连通性&#xff0c;其实 Linux 下有一个更好用的网络联通性判断工具&#xff0c;它可以结合ping nslookup traceroute 来判断网络的相关特性&#xff0c;这个命令就是 mtr。 mtr 全称 my tracerout…

2022年“移动云杯”算力网络应用创新大赛圆满落幕,百万大奖揭晓!

11 月 17-18 日&#xff0c;2022 年移动云开发者技术论坛暨“移动云杯”算力网络应用创新大赛总决赛在苏州举行。活动现场公布了 2022 年“移动云杯”算力网络应用创新大赛总决赛获奖名单。同时重磅发布了移动云 openAPI 2.0、首届移动云量子计算大赛。 三大赛道齐发力&#xf…

2022ICPC 网络赛第二场 E An Interesting Sequence

You should generate a sequence of positive integers of length n. Of course,this sequence needs to meet some requirements. ∀ i∈[1,n] ai​>1 ∀ i∈[2,n] gcd(ai−1​,ai​)1 a1​k(k>1) gcd(x,y) means the greatest common divisor of x and y. You n…

启明智显分享|4.3寸智能串口屏应用于充电桩

据数据显示&#xff0c;全球新能源汽车销量正大幅度增长&#xff0c;全球汽车电动化渗透率也由0.8%增长到7.74%&#xff0c;这不仅意味着汽车产业电动化浪潮的来临&#xff0c;也证实了我国新能源汽车行业正处于高速发展状态。随着电动汽车销售量与保有量的迅速增长&#xff0c…

linux线程互斥

文章目录多线程执行的问题Linux线程互斥要解决以上问题&#xff0c;需要做到三点&#xff1a;多线程执行的问题 先看一段代码 int tickets10000;void* buytickets(void* args) {char* name (char*)args;while(1){if(tickets>0){cout<<name<<"] tickets:…