一.SparkStreaming
163.SparkStreaming概述


 
 Spark Streaming is an extension of the core Spark API that  
 
 
 enables scalable, high-throughput, fault-tolerant stream  
 
 
 processing of live data streams.  
 
 
 Spark Streaming 
 是核心 
 Spark API 
 的扩展,支持实时数据流的  
 
 
 可扩展、高吞吐量、容错流处理。  
 
 
 Spark Streaming 
 用于流式数据的处理。 
 Spark Streaming 
 支持  
 
 
 的数据输入源很多,例如: 
 Kafka 
 、 
 Flume 
 、 
 HDFS 
 、 
 Kinesis 
 和 
 TCP  
 
 
 套接字等等。数据输入后可以用 
 Spark 
 的高级函数(如 
 map 
 、  
 
 
 reduce 
 、 
 join 
 和 
 window 
 等进行运算。而结果也能保存在很多地方,  
 
 
 如 
 HDFS 
 ,数据库和实时仪表板等。还可以可以在数据流上应用  
 
 
 Spark 
 的机器学习和图形处理算法。  
 
 
 Spark Streaming 
 接收实时输入数据流,并将数据分为多个批  
 
 
 次,然后由 
 Spark 
 引擎进行处理,以批量生成最终结果流。在内部,  
 
 
 它的工作原理如下: 
 

164.SparkStreaming_架构

 
 背压机制 
 ( 
 了解 
 ) 
 Spark 1.5 
 以前版本,用户如果要限制 
 Receiver 
 的数据接收速  
 
 
 率,可以通过设置静态配制参数  
 
 
 “spark.streaming.receiver.maxRate” 
 的值来实现,此举虽然可以通  
 
 
 过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会  
 
 
 引入其它问题。比如: 
 producer 
 数据生产高于 
 maxRate 
 ,当前集群  
 
 
 处理能力也高于 
 maxRate 
 ,这就会造成资源利用率下降等问题。  
 
 
 为了更好的协调数据接收速率与资源处理能力, 
 1.5 
 版本开始  
 
 
 Spark Streaming 
 可以动态控制数据接收速率来适配集群数据处理  
 
 
 能力。背压机制(即 
 Spark Streaming Backpressure 
 ) 
 :  
 根据  
 
 
 JobScheduler 
 反馈作业的执行信息来动态调整 
 Receiver 
 数据接收  
 
 
 率。  
 
 
 通过属性 
 “spark.streaming.backpressure.enabled” 
 来控制是  
 
 
 否启用 
 backpressure 
 机制,默认值 
 false 
 ,即不启用。 
 
165.SparkStreaming_创建项目

 
 <dependency>  
 
 
 
 <groupId> 
 org.apache.spark 
 </groupId>  
 
 
 
 <artifactId> 
 spark-core_2.12 
 </artifactId>  
 
 
 
 <version> 
 3.2.1 
 </version>  
 
 
 </dependency>  
 
 
 <dependency>  
 
 
 <groupId> 
 org.apache.spark 
 </groupId>  
 
 
 <artifactId> 
 spark 
 
 
 streaming_2.12 
 </artifactId>  
 
 
 <version> 
 3.2.1 
 </version>  
 
 
 </dependency> 
 
 
166.SparkStreaming_WORDCOUNT

 
 package  
 com 
 . 
 itbaizhan 
 . 
 streaming  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 SparkConf  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 . 
 dstream 
 .  
 
 
 { 
 DStream 
 ,  
 ReceiverInputDStream 
 }  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 .{ 
 Seconds 
 ,  
 
 
 StreamingContext 
 }  
 
 
 object  
 StreamingWordCount 
  {  
 
 
 
 def  
 main 
 ( 
 args 
 :  
 Array 
 [ 
 String 
 ]):  
 Unit  
 = 
  {  
 
 
 //1. 
 初始化 
 SparkConf 
 类的对象  
 
 
 val  
 conf 
 :  
 SparkConf  
 =  
 new  
 SparkConf 
 ()  
 
 
     . 
 setMaster 
 ( 
 "local[*]" 
 )  
 
 
     . 
 setAppName 
 ( 
 "StreamingWordCount" 
 )  
 
 
 //2. 
 创建 
 StreamingContext 
 对象  
 
 
 val  
 ssc  
 =  
 new  
 StreamingContext 
 ( 
 conf 
 ,  
 
 
 Seconds 
 ( 
 5 
 ))  
 
 
 //3. 
 通过监控 
 node1 
 的 
 9999 
 端口创建 
 DStream 
 对象  
 
 
 val  
 lines 
 :  
 ReceiverInputDStream 
 [ 
 String 
 ]  
 
 
 =  
 
 
 1  
 
 
 2  
 
 
 3  
 
 
 4  
 
 
 5  
 
 
 6  
 
 
 7  
 
 
 8  
 
 
 9  
 
 
 10  
 
 
 11  
 
 
 12  
 
 
 13  
 
 
 14  
 
 
 15  
 
 
 16  
 
 
 7 
 测试  
 
 
 1  
 
 
 在 
 node1 
 上  
 
 
 2  
 
 
 在 
 IDEA 
 中运行程序  
 
 
 3  
 
 
 在 
 node1 
 上  
 
 
 4  
 
 
 查看 
 IDEA 
 控制台  
 
 
 ssc 
 . 
 socketTextStream 
 ( 
 "node1" 
 ,  
 9999 
 )  
 
 
 //4. 
 将每一行数据做切分,形成一个个单词  
 
 
 val  
 wordsDS 
 :  
 DStream 
 [ 
 String 
 ]  
 =  
 
 
 lines 
 . 
 flatMap 
 ( 
 _ 
 . 
 split 
 ( 
 " " 
 ))  
 
 
 //5.word=>(word,1)  
 
 
 val  
 wordOne 
 :  
 DStream 
 [( 
 String 
 ,  
 Int 
 )]  
 =  
 
 
 wordsDS 
 . 
 map 
 (( 
 _ 
 ,  
 1 
 ))  
 
 
 //6. 
 将相同的 
 key 
 的 
 value 
 做聚合加  
 
 
 val  
 wordCount 
 :  
 DStream 
 [( 
 String 
 ,  
 Int 
 )]  
 =  
 
 
 wordOne 
 . 
 reduceByKey 
 ( 
 _  
 +  
 _ 
 )  
 
 
 //7. 
 打印输出  
 
 
 wordCount 
 . 
 print 
 ()  
 
 
 //8. 
 启动  
 
 
 ssc 
 . 
 start 
 ()  
 
 
 //9. 
 等待执行停止  
 
 
 ssc 
 . 
 awaitTermination 
 ()  
 
 
 }  
 
 
 } 
 
167.SparkStreaming_数据抽象


168.SparkStreaming_RDD队列创建DSTREAM




169.SparkStreaming_自定义数据源一
 
 需求:自定义数据源,实现监控指定的端口号,获取该端口号  
 
 
 内容。  
 
 
 需要继承 
 Receiver 
 ,并实现 
 onStart 
 、 
 onStop 
 方法来自定义数据源采集。  
 
 
 package  
 com 
 . 
 itbaizhan 
 . 
 streaming  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 storage 
 . 
 StorageLevel  
 
 
 import  
 
 
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 . 
 receiver 
 . 
 Receiver  
 
 
 import  
 java 
 . 
 io 
 .{ 
 BufferedReader 
 ,  
 
 
 InputStreamReader 
 }  
 
 
 import  
 java 
 . 
 net 
 . 
 Socket  
 
 
 import  
 java 
 . 
 nio 
 . 
 charset 
 . 
 StandardCharsets  
 
 
 1  
 
 
 2  
 
 
 3  
 
 
 4  
 
 
 5  
 
 
 6  
 
 
 7  
 
 
 8  
 
 
 9  
 
 
 13 
 class  
 ReceiverCustomer 
 ( 
 host 
 :  
 String 
 ,  
 port 
 :  
 
 
 Int 
 )  
 extends  
 Receiver 
 [ 
 String 
 ]  
 
 
 ( 
 StorageLevel 
 . 
 MEMORY_ONLY 
 ) {  
 
 
 
 // 
 最初启动的时候,调用该方法  
 
 
 
 // 
 作用:读数据并将数据发送给 
 Spark  
 
 
 
 override def  
 onStart 
 ():  
 Unit  
 = 
  {  
 
 
 new  
 Thread 
 ( 
 "Socket Receiver" 
 ) {  
 
 
 override def  
 run 
 () {  
 
 
 receive 
 ()  
 
 
     }  
 
 
   }. 
 start 
 ()  
 
 
 }  
 
 
 
 override def  
 onStop 
 ():  
 Unit  
 = 
  {}  
 
 
 
 // 
 读数据并将数据发送给 
 Spark  
 
 
 
 def  
 receive 
 ():  
 Unit  
 = 
  {  
 
 
 // 
 创建一个 
 Socket  
 
 
 var  
 socket 
 :  
 Socket  
 =  
 new  
 Socket 
 ( 
 host 
 ,  
 
 
 port 
 )  
 
 
 // 
 定义一个变量,用来接收端口传过来的数据  
 
 
 var  
 input 
 :  
 String  
 =  
 null  
 
 
 // 
 创建一个 
 BufferedReader 
 用于读取端口传来的数  
 
 
 据  
 
 
 val  
 reader  
 =  
 new  
 BufferedReader 
 ( 
 new  
 
 
 InputStreamReader 
 ( 
 socket 
 . 
 getInputStream 
 ,  
 
 
 StandardCharsets 
 . 
 UTF_8 
 ))  
 
 
 // 
 读取数据  
 
 
 input  
 =  
 reader 
 . 
 readLine 
 ()  
 
 
 // 
 当 
 receiver 
 没有关闭并且输入数据不为空,则循环  
 
 
 发送数据给 
 Spark  
 
 
 while 
  ( 
 ! 
 isStopped 
 ()  
 &&  
 input  
 !=  
 null 
 ) {  
 
 
 store 
 ( 
 input 
 )  
 
 
 input  
 =  
 reader 
 . 
 readLine 
 ()  
 
 
 10  
 
 
 11  
 
 
 12  
 
 
 13  
 
 
 14  
 
 
 15  
 
 
 16  
 
 
 17  
 
 
 18  
 
 
 19  
 
 
 20  
 
 
 21  
 
 
 22  
 
 
 23  
 
 
 24  
 
 
 25  
 
 
 26  
 
 
 27  
 
 
 28  
 
 
 29  
 
 
 30  
 
 
 31  
 
 
 32  
 
 
 33  
 
 
 34  
 
 
 14 
 使用自定义的数据源采集数据  
 
 
   }  
 
 
 // 
 跳出循环则关闭资源  
 
 
 reader 
 . 
 close 
 ()  
 
 
 socket 
 . 
 close 
 ()  
 
 
 // 
 重启任务  
 
 
 restart 
 ( 
 "restart" 
 )  
 
 
 }  
 
 
 } 
 
 
170.SparkStreaming_自定义数据源二
 
 package  
 com 
 . 
 itbaizhan 
 . 
 streaming  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 SparkConf  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 .{ 
 Seconds 
 ,  
 
 
 StreamingContext 
 }  
 
 
 object  
 CustomerSource 
  {  
 
 
 
 def  
 main 
 ( 
 args 
 :  
 Array 
 [ 
 String 
 ]):  
 Unit  
 = 
  {  
 
 
 //1. 
 初始化 
 Spark 
 配置信息  
 
 
 val  
 sparkConf  
 =  
 new  
 SparkConf 
 ()  
 
 
     . 
 setMaster 
 ( 
 "local[*]" 
 )  
 
 
     . 
 setAppName 
 ( 
 "CustomerSource" 
 )  
 
 
 //2. 
 初始化  
 
 
 val  
 ssc  
 =  
 new  
 
 
 StreamingContext 
 ( 
 sparkConf 
 ,  
 Seconds 
 ( 
 5 
 ))  
 
 
 //3. 
 创建自定义 
 receiver 
 的 
 Streaming  
 
 
 val  
 lines  
 =  
 ssc 
 . 
 receiverStream 
 ( 
 new  
 
 
 ReceiverCustomer 
 ( 
 "node1" 
 ,  
 9999 
 ))  
 
 
 lines 
 . 
 print 
 ()  
 
 
 //4. 
 启动  
 
 
 ssc 
 . 
 start 
 ()  
 
 
 ssc 
 . 
 awaitTermination 
 ()  
 
 
 1  
 
 
 2  
 
 
 3  
 
 
 4  
 
 
 5  
 
 
 6  
 
 
 7  
 
 
 8  
 
 
 9  
 
 
 10  
 
 
 11  
 
 
 12  
 
 
 13  
 
 
 14  
 
 
 15  
 
 
 16  
 
 
 17  
 
 
 18  
 
 
 19  
 
 
 15 
 测试  
 
 
 1  
 
 
 在 
 node1 
 上  
 
 
 2  
 
 
 在 
 IDEA 
 中运行程序  
 
 
 3  
 
 
 在 
 node1 
 上  
 
 
 4  
 
 
 查看 
 IDEA 
 控制台  
 
 
 实时效果反馈  
 
 
 1.  
 关于 
 SparkStreaming 
 接收器自定义数据源的描述,错误的  
 
 
 是:  
 
 
 A  
 
 
 需要继承 
 Receiver 
 ,并实现 
 onStart 
 、 
 onStop 
 方法来自定义  
 
 
 数据源采集。  
 
 
 B  
 
 
 Xxx extends Receiver[String](StorageLevel.MEMORY_ONLY)  
 
 
 接收到数据仅保存在  
 
 
 内存中。  
 
 
 C  
 
 
 onStart()  
 
 
 最初启动的时候,调用该方法;作用是读数据并将数  
 
 
 据发给 
 Spark 
 。  
 
 
 D  
 
 
 onStop()  
 
 
 不能空实现。  
 
 
 答案:  
 
 
 1=>D  
 可以空实现  
 
 
 SparkStreaming_DStream 
 无状态转换  
 
 
 }  
 
 
 }  
 
 
 20  
 
 
 21  
 
 
 [root@node1 ~] 
 # nc -lk 9999  
 
 
 1  
 
 
 [root@node1 ~] 
 # nc -lk 9999  
 
 
 aa  
 
 
 bb  
 
 
 cc 
 
 
171.SparkStreaming_DSTREAM无状态转换

172.SparkStreaming_DSTREAM无状态转换transform


173.SparkStreaming_DSTREAM有状态转换


174.SparkStreaming_窗口操作reducebykeyandwidow概述

 
 //reduceFunc– 
 结合和交换 
 reduce 
 函数  
 
 
 //windowDuration– 
 窗口长度;必须是此数据流批处理间  
 
 
 隔的倍数  
 
 
 //slideDuration– 
 窗口的滑动间隔 
 , 
 即新数据流生成 
 RDD  
 
 
 的间隔  
 
 
 def  
 reduceByKeyAndWindow 
 (  
 
 
 reduceFunc 
 : ( 
 V 
 ,  
 V 
 )  
 =>  
 V 
 ,  
 
 
 windowDuration 
 :  
 Duration 
 ,  
 
 
 slideDuration 
 :  
 Duration  
 
 
 ):  
 DStream 
 [( 
 K 
 ,  
 V 
 )]  
 =  
 ssc 
 . 
 withScope 
  {  
 
 
 //partitioner– 
 用于控制新数据流中每个 
 RDD 
 分区的  
 
 
 分区器  
 
 
 
 reduceByKeyAndWindow 
 ( 
 reduceFunc 
 ,  
 
 
 windowDuration 
 ,  
 slideDuration 
 ,  
 
 
 defaultPartitioner 
 ())  
 
 
 } 
 
175.SparkStreaming_窗口操作reducebykeyandwidow实战



176.SparkStreaming_窗口操作reducebykeyandwidow优化


177.SparkStreaming_窗口操作WINDOW



178.SparkStreaming_输出



179.SparkStreaming_优雅关闭一
 
 流式任务需要 
 7*24 
 小时执行,但是有时涉及到升级代码需要主  
 
 
 动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所  
 
 
 以配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内  
 
 
 部程序关闭。  
 
 
 package  
 com 
 . 
 itbaizhan 
 . 
 streaming  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 SparkConf  
 
 
 import  
 
 
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 . 
 dstream 
 . 
 ReceiverI  
 
 
 nputDStream  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 .{ 
 Seconds 
 ,  
 
 
 StreamingContext 
 }  
 
 
 object  
 StreamingStopDemo 
  {  
 
 
 
 def  
 createSSC 
 ():  
 StreamingContext  
 = 
  {  
 
 
 val  
 sparkConf 
 :  
 SparkConf  
 =  
 new  
 
 
 SparkConf 
 (). 
 setMaster 
 ( 
 "local[*]" 
 ). 
 setAppName  
 
 
 ( 
 "StreamingStop" 
 )  
 
 
 // 
 设置优雅的关闭  
 
 
 
 sparkConf 
 . 
 set 
 ( 
 "spark.streaming.stopGraceful  
 
 
 lyOnShutdown" 
 ,  
 "true" 
 )  
 
 
 1  
 
 
 2  
 
 
 3  
 
 
 4  
 
 
 5  
 
 
 6  
 
 
 7  
 
 
 8  
 
 
 9  
 
 
 34     
 
 
 val  
 ssc  
 =  
 new  
 
 
 StreamingContext 
 ( 
 sparkConf 
 ,  
 Seconds 
 ( 
 5 
 ))  
 
 
 ssc 
 . 
 checkpoint 
 ( 
 "./ckp" 
 )  
 
 
 ssc  
 
 
 }  
 
 
 
 def  
 main 
 ( 
 args 
 :  
 Array 
 [ 
 String 
 ]):  
 Unit  
 = 
  {  
 
 
 val  
 ssc 
 :  
 StreamingContext  
 =  
 
 
 StreamingContext 
 . 
 getActiveOrCreate 
 ( 
 "./ckp" 
 ,  
 
 
 ()  
 =>  
 createSSC 
 ())  
 
 
 new  
 Thread 
 ( 
 new  
 
 
 StreamingStop 
 ( 
 ssc 
 )). 
 start 
 ()  
 
 
 val  
 line 
 :  
 ReceiverInputDStream 
 [ 
 String 
 ]  
 =  
 
 
 ssc 
 . 
 socketTextStream 
 ( 
 "node1" 
 ,  
 9999 
 )  
 
 
 line 
 . 
 print 
 ()  
 
 
 ssc 
 . 
 start 
 ()  
 
 
 ssc 
 . 
 awaitTermination 
 ()  
 
 
 }  
 
 
 }  
 
 
 10  
 
 
 11  
 
 
 12  
 
 
 13  
 
 
 14  
 
 
 15  
 
 
 16  
 
 
 17  
 
 
 18  
 
 
 19  
 
 
 20  
 
 
 21  
 
 
 22  
 
 
 package  
 com 
 . 
 itbaizhan 
 . 
 streaming  
 
 
 import  
 org 
 . 
 apache 
 . 
 hadoop 
 . 
 conf 
 . 
 Configuration  
 
 
 import  
 org 
 . 
 apache 
 . 
 hadoop 
 . 
 fs 
 .{ 
 FileSystem 
 ,  
 
 
 Path 
 }  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 .  
 
 
 { 
 StreamingContext 
 ,  
 StreamingContextState 
 }  
 
 
 import  
 java 
 . 
 net 
 . 
 URI  
 
 
 class  
 StreamingStop 
 ( 
 ssc 
 :  
 StreamingContext 
 )  
 
 
 extends  
 Runnable 
  {  
 
 
 
 override def  
 run 
 ():  
 Unit  
 = 
  {  
 
 
 val  
 fs 
 :  
 FileSystem  
 =  
 FileSystem 
 . 
 get 
 ( 
 new  
 
 
 URI 
 ( 
 "hdfs://node2:9820" 
 ),  
 
 
 new  
 Configuration 
 (),  
 "root" 
 )  
 
 
 1  
 
 
 2  
 
 
 3  
 
 
 4  
 
 
 5  
 
 
 6  
 
 
 7  
 
 
 8  
 
 
 9  
 
 
 35 
 测试  
 
 
 1  
 
 
 启动 
 hadoop 
 集群  
 
 
 2  
 
 
 在 
 node1 
 上: 
 nc -lk 9999  
 
 
 3  
 
 
 运行程序  
 
 
 4  
 
 
 在 
 node2 
 :  
 
 
 5  
 
 
 在 
 node1 
 上:  
 
 
 while 
  ( 
 true 
 ) {  
 
 
 try  
 
 
 Thread 
 . 
 sleep 
 ( 
 5000 
 )  
 
 
 catch 
  {  
 
 
 case  
 e 
 :  
 InterruptedException  
 =>  
 
 
 e 
 . 
 printStackTrace 
 ()  
 
 
     }  
 
 
 val  
 state 
 :  
 StreamingContextState  
 =  
 
 
 ssc 
 . 
 getState  
 
 
 if 
  ( 
 state  
 ==  
 
 
 StreamingContextState 
 . 
 ACTIVE 
 ) {  
 
 
 val  
 bool 
 :  
 Boolean  
 =  
 fs 
 . 
 exists 
 ( 
 new  
 
 
 Path 
 ( 
 "hdfs://node2:9820/stopSpark" 
 ))  
 
 
 if 
  ( 
 bool 
 ) {  
 
 
 ssc 
 . 
 stop 
 ( 
 stopSparkContext  
 =  
 true 
 ,  
 
 
 stopGracefully  
 =  
 true 
 )  
 
 
 System 
 . 
 exit 
 ( 
 0 
 )  
 
 
       }  
 
 
     }  
 
 
   }  
 
 
 }  
 
 
 } 
 
 
180.SparkStreaming_优雅关闭二


181.SparkStreaming_优雅关闭测试

182.SparkStreaming_整合KAFKA模式

183.SparkStreaming_整合kafka开发一
 
 导入依赖:  
 
 
 代码编写:  
 
 
 <dependency>  
 
 
 <groupId> 
 org.apache.spark 
 </groupId>  
 
 
 <artifactId> 
 spark-streaming-kafka-0-  
 
 
 10_2.12 
 </artifactId>  
 
 
 <version> 
 3.2.1 
 </version>  
 
 
 </dependency>  
 
 
 <dependency>  
 
 
 
 <groupId> 
 com.fasterxml.jackson.core 
 </groupI  
 
 
 d>  
 
 
 <artifactId> 
 jackson-core 
 </artifactId>  
 
 
 <version> 
 2.12.7 
 </version>  
 
 
 </dependency>  
 
 
 1  
 
 
 2  
 
 
 3  
 
 
 4  
 
 
 5  
 
 
 6  
 
 
 7  
 
 
 8  
 
 
 9  
 
 
 10  
 
 
 package  
 com 
 . 
 itbaizhan 
 . 
 streaming  
 
 
 import  
 org 
 . 
 apache 
 . 
 kafka 
 . 
 clients 
 . 
 consumer 
 .  
 
 
 { 
 ConsumerConfig 
 ,  
 ConsumerRecord 
 }  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 SparkConf  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 . 
 dstream 
 .  
 
 
 { 
 DStream 
 ,  
 InputDStream 
 }  
 
 
 1  
 
 
 2  
 
 
 3  
 
 
 4  
 
 
 40 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 . 
 kafka010 
 .  
 
 
 { 
 ConsumerStrategies 
 ,  
 KafkaUtils 
 ,  
 
 
 LocationStrategies 
 }  
 
 
 import  
 org 
 . 
 apache 
 . 
 spark 
 . 
 streaming 
 .{ 
 Seconds 
 ,  
 
 
 StreamingContext 
 }  
 
 
 object  
 DirectAPIDemo 
  {  
 
 
 
 def  
 main 
 ( 
 args 
 :  
 Array 
 [ 
 String 
 ]):  
 Unit  
 = 
  {  
 
 
 //1. 
 创建 
 SparkConf  
 
 
 val  
 sparkConf 
 :  
 SparkConf  
 =  
 new  
 
 
 SparkConf 
 ()  
 
 
     . 
 setMaster 
 ( 
 "local[*]" 
 )  
 
 
     . 
 setAppName 
 ( 
 "DirectAPIDemo" 
 )  
 
 
 //2. 
 创建 
 StreamingContext  
 
 
 val  
 ssc  
 =  
 new  
 
 
 StreamingContext 
 ( 
 sparkConf 
 ,  
 Seconds 
 ( 
 3 
 ))  
 
 
 //3. 
 定义 
 Kafka 
 参数  
 
 
 val  
 kafkaPara 
 :  
 Map 
 [ 
 String 
 ,  
 Object 
 ]  
 =  
 
 
 Map 
 [ 
 String 
 ,  
 Object 
 ](  
 
 
 ConsumerConfig 
 . 
 BOOTSTRAP_SERVERS_CONFIG  
 ->  
 
 
 "node2:9092,node3:9092,node4:9092" 
 ,  
 
 
 ConsumerConfig 
 . 
 GROUP_ID_CONFIG  
 ->  
 
 
 "itbaizhan" 
 ,  
 
 
 "key.deserializer"  
 ->  
 
 
 "org.apache.kafka.common.serialization.Strin  
 
 
 gDeserializer" 
 ,  
 
 
 "value.deserializer"  
 ->  
 
 
 "org.apache.kafka.common.serialization.Strin  
 
 
 gDeserializer"  
 
 
   )  
 
 
 //4. 
 读取 
 Kafka 
 数据创建 
 DStream  
 
 
 5  
 
 
 6  
 
 
 7  
 
 
 8  
 
 
 9  
 
 
 10  
 
 
 11  
 
 
 12  
 
 
 13  
 
 
 14  
 
 
 15  
 
 
 16  
 
 
 17  
 
 
 18  
 
 
 19  
 
 
 20  
 
 
 21  
 
 
 22  
 
 
 41 
 SparkStreaming_ 
 整合 
 Kafka 
 测试  
 
 
 val  
 kafkaDStream 
 :  
 
 
 InputDStream 
 [ 
 ConsumerRecord 
 [ 
 String 
 ,  
 String 
 ]]  
 
 
 =  
 
 
 KafkaUtils 
 . 
 createDirectStream 
 [ 
 String 
 ,  
 
 
 String 
 ]( 
 ssc 
 ,  
 
 
 // 
 由框架自动选择位置匹配  
 
 
 LocationStrategies 
 . 
 PreferConsistent 
 ,  
 
 
 // 
 消费者策略 
 主题: 
 topicKafka,kafka 
 参  
 
 
 数: 
 kafkaPara  
 
 
 ConsumerStrategies 
 . 
 Subscribe 
 [ 
 String 
 ,  
 
 
 String 
 ]( 
 Set 
 ( 
 "topicKafka" 
 ),  
 kafkaPara 
 ))  
 
 
 //5. 
 将每条消息的 
 KV 
 取出  
 
 
 //val valueDStream: DStream[String] =  
 
 
 kafkaDStream.map(record => record.value())  
 
 
 val  
 valueDStream 
 :  
 DStream 
 [ 
 String 
 ]  
 =  
 
 
 kafkaDStream 
 . 
 map 
 ( 
 _ 
 . 
 value 
 ())  
 
 
 //6. 
 计算 
 WordCount  
 
 
 valueDStream 
 . 
 flatMap 
 ( 
 _ 
 . 
 split 
 ( 
 " " 
 ))  
 
 
     . 
 map 
 (( 
 _ 
 ,  
 1 
 ))  
 
 
     . 
 reduceByKey 
 ( 
 _  
 +  
 _ 
 )  
 
 
     . 
 print 
 ()  
 
 
 //7. 
 开启任务  
 
 
 ssc 
 . 
 start 
 ()  
 
 
 ssc 
 . 
 awaitTermination 
 ()  
 
 
 }  
 
 
 } 
 
 
184.SparkStreaming_整合kafka开发二

185.SparkStreaming_整合kafka测试






















