大数据开发之Spark(spark streaming)

第 1 章:SparkStreaming概述

1.1 spark streaming是什么

spark streaming用于流式数据的处理。
spark streaming支持的数据源很多,例如:kafka、flume、hdfs等。
数据输入后可以用spark的高度抽象原语如:map、reduce、join、window等进行计算。
而结果也能保存在很多地方,如hdfs、数据库等。
在这里插入图片描述

1.2 spark streaming框架原理

dstream是什么?
sparkcore->rdd
sparksql->dataframe、dataset
spark streaming使用离散化流作为抽象表示,叫做dstream。
dsteam是随时间推移而受到的数据的序列。
在dsteam内部,每个时间区间受到的数据都作为rdd存在,而dstream是由这些rdd所组合成的序列。
简单来说,dstream就是对rdd在实时数据处理场景的一种封装。
在这里插入图片描述

1.2.2 架构图

整体架构图
在这里插入图片描述

spark streaming架构图
在这里插入图片描述

1.2.3 背压机制

spark 1.5以前版本,用户如果要限制receiver的数据接收速率,可以通过设置静态配置参数“spark.streaming.receiver.maxrate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,单也会引入其它问题。比如:producer数据生产高于maxrate,当前集群处理能力也高于maxrate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5版本开始spark streaming可以动态控制速率来适配集群数据处理能力。背压机制:根据jobscheduler反馈作业的执行信息来动态调整receiver数据接受率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启动背压机制,默认值false,即不启用。

1.3 spark steaming 特点

易用
在这里插入图片描述

容错
在这里插入图片描述

易整合到spark体系
在这里插入图片描述

第 2 章:dstream入门

2.1 wordcount案例入门

需求:使用Netcat工具向9999端口不断地发送数据,通过sparkstreaming读取端口数据并统计不同单词出现的次数。
在这里插入图片描述

1、添加依赖

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

2、编写代码

package com.atguigu.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming01_WordCount {

    def main(args: Array[String]): Unit = {

        //1.初始化Spark配置信息
        val sparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")

        //2.初始化SparkStreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        //3.通过监控端口创建DStream,读进来的数据为一行行
        val lineDStream = ssc.socketTextStream("hadoop102", 9999)

        //3.1 将每一行数据做切分,形成一个个单词
        val wordDStream = lineDStream.flatMap(_.split(" "))

        //3.2 将单词映射成元组(word,1)
        val wordToOneDStream = wordDStream.map((_, 1))

        //3.3 将相同的单词次数做统计
        val wordToSumDStream = wordToOneDStream.reduceByKey(_+_)

        //3.4 打印
        wordToSumDStream.print()

        //4 启动SparkStreamingContext
        ssc.start()
        // 将主线程阻塞,主线程不退出
        ssc.awaitTermination()
    }
}

3、更改日志打印级别
将log4j.properties文件添加到resources里面,就能更改打印日志的级别为error

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n

4、启动程序并通过netcat发送数据

[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

5、在idea控制台输出如下内容

-------------------------------------------
Time: 1602731772000 ms
-------------------------------------------
(hello,1)
(spark,1)

2.2 wordcount解析

dstream是spark streaming的基础抽象,代表持续性的而数据流和经过各种spark算子操作后的结果数据流。
在内部实现上,每一批次的数据封装成一个rdd,一系列连续的rdd组成了dstream。对这些rdd的转换是由spark引擎来计算。
说明:dstream中批次与批次之间计算相互独立。如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源。通常情况,批次设置时间要大于计算时间。
在这里插入图片描述

第 3 章:dstream创建

3.1 rdd队列

3.1.1 用法及说明

测试方式:
1、使用ssc.queuestream(queueofrdds)来创建dstream。
2、将每一个推送到这个队列中的rdd,都会作为dstream的一个批次处理。

3.1.2 案例实操

需求:循环创建几个rdd,将rdd放入队列。通过sparkstreaming创建dstream,计算wordcount。
在这里插入图片描述

1、编写代码

package com.atguigu.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkStreaming02_RDDStream {

    def main(args: Array[String]): Unit = {

        //1.初始化Spark配置信息
        val conf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")

        //2.初始化SparkStreamingContext
        val ssc = new StreamingContext(conf, Seconds(4))

        //3.创建RDD队列
        val rddQueue = new mutable.Queue[RDD[Int]]()

        //4.创建QueueInputDStream
        // oneAtATime = true 默认,一次读取队列里面的一个数据
        // oneAtATime = false, 按照设定的批次时间,读取队列里面数据
        val inputDStream = ssc.queueStream(rddQueue, oneAtATime = false)

        //5.处理队列中的RDD数据
        val sumDStream = inputDStream.reduce(_+_)

        //6.打印结果
        sumDStream.print()

        //7.启动任务
        ssc.start()

        //8.循环创建并向RDD队列中放入RDD
        for (i <- 1 to 5) {
            rddQueue += ssc.sparkContext.makeRDD(1 to 5)
            Thread.sleep(2000)
        }

        ssc.awaitTermination()
    }
}

2、结果展示(oneatatime=false)

-------------------------------------------
Time: 1603347444000 ms
-------------------------------------------
15

-------------------------------------------
Time: 1603347448000 ms
-------------------------------------------
30

-------------------------------------------
Time: 1603347452000 ms
-------------------------------------------
30

说明:如果一个批次中由多个rdd进入队列,最终计算前都会合并到一个rdd计算。
在这里插入图片描述

3.2 自定义数据源接收器

3.2.1 用法及说明

需要继承receiver,并实现onstart、onstop方法来自定义数据源采集。
在这里插入图片描述

3.2.2 案例

需求:自定义数据源,实现监控某个端口号,获取该端口号内容。
1、使用自定义的数据源采集数据

package com.atguigu.sparkstreaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming03_CustomerReceiver {

    def main(args: Array[String]): Unit = {

        //1.初始化Spark配置信息
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

        //2.初始化SparkStreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        //3.创建自定义receiver的Streaming
        val lineDStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))

        //4.将每一行数据做切分,形成一个个单词
        val wordDStream = lineDStream.flatMap(_.split(" "))

        //5.将单词映射成元组(word,1)
        val wordToOneDStream = wordDStream.map((_, 1))

        //6.将相同的单词次数做统计
        val wordToSumDStream = wordToOneDStream.reduceByKey(_ + _)

        //7.打印
        wordToSumDStream.print()

        //8.启动SparkStreamingContext
        ssc.start()
        ssc.awaitTermination()
    }
}

2、自定义数据源

/**
* @param host : 主机名称
 * @param port : 端口号
 *  Receiver[String] :返回值类型:String
 *  StorageLevel.MEMORY_ONLY: 返回值存储级别
 */
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    // receiver刚启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
    override def onStart(): Unit = {
			 //在onStart方法里面创建一个线程,专门用来接收数据
        new Thread("Socket Receiver") {
            override def run() {
                receive()
            }
        }.start()
    }

    // 读数据并将数据发送给Spark
    def receive(): Unit = {

        // 创建一个Socket
        var socket: Socket = new Socket(host, port)

        // 字节流读取数据不方便,转换成字符流buffer,方便整行读取
        val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))

        // 读取数据
        var input: String = reader.readLine()

        //当receiver没有关闭并且输入数据不为空,就循环发送数据给Spark
        while (!isStopped() && input != null) {
            store(input)
            input = reader.readLine()
        }

        // 如果循环结束,则关闭资源
        reader.close()
        socket.close()

        //重启接收任务
        restart("restart")
    }

    override def onStop(): Unit = {}
}

3、测试

[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

3.3 kafka数据源(面试、开发重点)

3.3.1 版本选型

receiverapi:需要一个专门的executor来接收数据,然后发送给其它的executor做计算。存在的问题:接收数据的executor和计算的executor速度会有所不同,特别在接收数据的executor速度大于计算的executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。
directapi:是由计算的executor来主动消费kafka的数据,速度由自身控制。
在这里插入图片描述

注意:目前spark3.0.0以上版本只有direct模式。
http://spark.apache.org/docs/2.4.7/streaming-kafka-integration.html
在这里插入图片描述
http://spark.apache.org/docs/3.0.0/streaming-kafka-0-10-integration.html
总结:不同版本的offset存储位置。
0-8 receiverapi offset默认存储在:zookeeper中。
0-8 directapi offset默认存储在:checkpoint。手动维护:mysql等有事务的存储系统。
0-10 directapi offset默认存储在:_consumer_offsets系统主题。手动维护:mysql等有事务的存储系统。

3.3.2 kafka 0-10 direct模式

1、需求:通过sparkstreaming从kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
在这里插入图片描述

2、导入依赖

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
     <version>3.0.0</version>
</dependency>

3、编写代码

package com.atguigu.sparkstreaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming04_DirectAuto {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")

        //2.创建StreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        //3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化
        val kafkaPara: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
        )

        //4.读取Kafka数据创建DStream
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent, //优先位置
            ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara)// 消费策略:(订阅多个主题,配置参数)
        )

        //5.将每条消息(KV)的V取出
        val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

        //6.计算WordCount
        valueDStream.flatMap(_.split(" "))
            .map((_, 1))
            .reduceByKey(_ + _)
            .print()

        //7.开启任务
        ssc.start()
        ssc.awaitTermination()
    }
}

4、测试
1)分别启动zookeeper和kafka集群

[atguigu@hadoop102 ~]$ zk.sh start
[atguigu@hadoop102 ~]$ kf.sh start

2)创建一个kafka的topic主题testtopic,两个分区

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka  --create --replication-factor 1 --partitions 2 --topic testTopic

3)查看topic列表

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka -list

4)查看topic详情

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \
--describe --topic testTopic

5)创建kafka生产者

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic testTopic

Hello spark
Hello spark

6)创建kafka消费组

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic testTopic

5、查看_consumer_offsets主题中存储的offset

[atguigu@hadoop102 kafka]$ bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group atguiguGroup

GROUP        TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  
atguiguGroup    testTopic       0              13                 13          

在生产者中生产数据,再次观察offset变换。

第 4 章:dstream转换

dstream上的操作与rdd的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如:updatastatebykey()、transform()以及各种windows相关的原语。

4.1 无状态转化操作

就是把rdd转化操作应用到dstream每个批次上,每个批次相互独立,自己算自己的。

4.1.1 常规无状态转化操做

dstream的部分无状态转化操作列在了下表中,都是dstream自己的api。
注意:针对键值对的dstream转化操作,要添加import streamingcontext._才能咋scala中使用,比如reducebykey()。
在这里插入图片描述

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个dstream在内部都是由许多rdd批次组成,且无状态转化操作是分别应用到每个rdd(一个批次的数据)上的。

4.1.2 transform

需求:通过transform可以将dstream每一批次的数据直接转换为rdd的算子操作。
在这里插入图片描述

1、代码编写

package com.atguigu.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming05_Transform {

    def main(args: Array[String]): Unit = {

        //1 创建SparkConf
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

        //2 创建StreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        //3 创建DStream
        val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)

        // 在Driver端执行,全局一次
        println("111111111:" + Thread.currentThread().getName)

        //4 转换为RDD操作
        val wordToSumDStream: DStream[(String, Int)] = lineDStream.transform(

            rdd => {
                // 在Driver端执行(ctrl+n JobGenerator),一个批次一次
                println("222222:" + Thread.currentThread().getName)

                val words: RDD[String] = rdd.flatMap(_.split(" "))
	
                val wordToOne: RDD[(String, Int)] = words.map(x=>{

                    // 在Executor端执行,和单词个数相同
                    println("333333:" + Thread.currentThread().getName)

                    (x, 1)
                })

                val result: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

                result
            }
        )

        //5 打印
        wordToSumDStream.print

        //6 启动
        ssc.start()
        ssc.awaitTermination()
    }
}

2、测试

[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

4.2 由状态转换操作

4.2.1 updatestatebykey

updatestatebykey()用于键值对形式的dstream,可以记录历史批次状态。例如可以实现累加wordcount。
updatestatebykey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的dstream。
注意:使用Updatestatebykey需要对检查点目录进行配置,会使用检查点来保存状态。
checkpoint小文件过多。
checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次。
1、需求:更新版的wordcount
2、编写代码

package com.atguigu.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkStreaming06_updateStateByKey {

    // 定义更新状态方法,参数seq为当前批次单词次数,state为以往批次单词次数
    val updateFunc = (seq: Seq[Int], state: Option[Int]) => {
        // 当前批次数据累加
        val currentCount = seq.sum
        // 历史批次数据累加结果
        val previousCount = state.getOrElse(0)
        // 总的数据累加
        Some(currentCount + previousCount)
    }

    def createSCC(): StreamingContext = {

        //1 创建SparkConf
        val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

        //2 创建StreamingContext
        val ssc = new StreamingContext(conf, Seconds(3))

        ssc.checkpoint("./ck")

        //3 获取一行数据
        val lines = ssc.socketTextStream("hadoop102", 9999)

        //4 切割
        val words = lines.flatMap(_.split(" "))

        //5 统计单词
        val wordToOne = words.map(word => (word, 1))

        //6 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
        val stateDstream = wordToOne.updateStateByKey[Int](updateFunc)

        stateDstream.print()

        ssc
    }

    def main(args: Array[String]): Unit = {

        val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck",()=>createSCC())

        //7 开启任务
        ssc.start()
        ssc.awaitTermination()
    }
}

3、启动程序并向9999端口发送数据

[atguigu@hadoop102 ~]$ nc -lk 9999
hello atguigu

hello atguigu

4、结果展示

-------------------------------------------
Time: 1603441344000 ms
-------------------------------------------
(hello,1)
(atguigu,1)

-------------------------------------------
Time: 1603441347000 ms
-------------------------------------------
(hello,2)
(atguigu,2)

5、原理说明
在这里插入图片描述

4.2.2 winodwoperations

window operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前streaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
窗口时长:计算内容的时间范围;
滑动步长:隔多久触发一次计算。
注意:这两者都必须为采集批次大小的整数倍。
如下图所示wordcount案例:窗口大小为批次的2倍,滑动步等于批次大小。
窗口操作数据流解析
在这里插入图片描述

4.2.3 window

1、基本语法:window,基于对源dstream窗口的批次进行计算返回一个新的dstream。
2、需求:统计wordcount,3秒一个批次,窗口12秒,滑步6秒。
3、代码编写

package com.atguigu.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming07_window {

    def main(args: Array[String]): Unit = {

        // 1 初始化SparkStreamingContext
        val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
        val ssc = new StreamingContext(conf, Seconds(3))

        // 2 通过监控端口创建DStream,读进来的数据为一行行
        val lines = ssc.socketTextStream("hadoop102", 9999)

        // 3 切割=》变换
        val wordToOneDStream = lines.flatMap(_.split(" "))
            .map((_, 1))

        // 4 获取窗口返回数据
        val wordToOneByWindow: DStream[(String, Int)] = wordToOneDStream.window(Seconds(12), Seconds(6))

        // 5 聚合窗口数据并打印
        val wordToCountDStream: DStream[(String, Int)] = wordToOneByWindow.reduceByKey(_+_)
        wordToCountDStream.print()

        // 6 启动=》阻塞
        ssc.start()
        ssc.awaitTermination()
    }
}

4、测试

[atguigu@hadoop102 ~]$ nc -lk 9999
hello

5、如果有多批数据进入窗口,最终也会通过window操作变成统一的rdd处理
在这里插入图片描述

4.2.4 reducebykeyandwindow

1、基本语法
reducebykeyandwindow(func,windowlength,slideinterval,[numtasks]):当在一个(k,v)对的dstream上调用此函数,会返回一个新的(k,v)对的dstream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
2、需求:统计wordcount,3秒一个批次,窗口12秒,滑步6秒。
3、代码编写

package com.atguigu.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming08_reduceByKeyAndWindow {

    def main(args: Array[String]): Unit = {

        // 1 初始化SparkStreamingContext
        val conf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(3))

        // 保存数据到检查点
        ssc.checkpoint("./ck")

        // 2 通过监控端口创建DStream,读进来的数据为一行行
        val lines = ssc.socketTextStream("hadoop102", 9999)

        // 3 切割=》变换
        val wordToOne = lines.flatMap(_.split(" "))
                         .map((_, 1))

        // 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒
        val wordCounts = wordToOne.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))

        // 5 打印
        wordCounts.print()

        // 6 启动=》阻塞
        ssc.start()
        ssc.awaitTermination()
    }
}

4、测试

[atguigu@hadoop102 ~]$ nc -lk 9999
hello atguigu

4.2.5 reducebykeyandwindow(反向reduce)

1、基本语法
reducebykeyandwindow(func,invfunc,windowlength,slideinterval,[numtasks]):这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并“反向reduce“离开窗口的旧数据来实现这个操作。一个例子是随着滑动窗口对keys的”加“”减“计数。通过前边介绍可以想到,这个函数只使用于”可逆的reduce函数“,也就是这些reduce函数有相应的”反reduce“函数(以参数invfunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。
2、需求:统计wordcount,3秒一个批次,窗口12秒,滑步6秒。
3、代码编写

package com.atguigu.sparkstreaming

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming09_reduceByKeyAndWindow_reduce {

    def main(args: Array[String]): Unit = {

        // 1 初始化SparkStreamingContext
        val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
        val ssc = new StreamingContext(conf, Seconds(3))

        // 保存数据到检查点
        ssc.checkpoint("./ck")

        // 2 通过监控端口创建DStream,读进来的数据为一行行
        val lines = ssc.socketTextStream("hadoop102", 9999)

        // 3 切割 =》变换
        val wordToOne = lines.flatMap(_.split(" "))
            .map((_, 1))

        // 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒
        /*
        val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(
            (a: Int, b: Int) => (a + b),
            (x: Int, y: Int) => (x - y),
            Seconds(12),
            Seconds(6)
        )*/

        // 处理单词统计次数为0的问题
        val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(
            (a: Int, b: Int) => (a + b),
            (x: Int, y: Int) => (x - y),
            Seconds(12),
            Seconds(6),
            new HashPartitioner(2),
            (x:(String, Int)) => x._2 > 0
        )

        // 5 打印
        wordToSumDStream.print()

        // 6 启动=》阻塞
        ssc.start()
        ssc.awaitTermination()
    }
}

4.2.6 window的其它操作

1、countbywindow(windowlength,slideinterval):返回一个滑动窗口计数流中的元素个数
2、reducebywindow(func,windowlength,slideinterval):通过使用自定义函数整合滑动区间流元素来创建一个新的离散化数据流

第 5 章:dstream输出

dstream通常将数据输出到,外部数据库或屏幕上。
dstream与rdd中的惰性求值类似,如果一个dstream及其派生出的dstream都没有被执行输出操作,那么这些dstream就都不会被求值。如果streamingcontext中没有设定输出操作,整个context就都不会启动。
1、输出操作api如下:
1)saveastextfiles([prefix,[suffix]):以text文件形式存储这个dstream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-time_in_ms[.suffix]“
2)saveasobjectfiles(prefix,[suffix]):以java对象序列化的方式将dstream中的数据保存为sequencefiles。每一批次的存储文件名基于参数中的为”prefix-time_in_ms[.suffix]“。
3)saveashadoopfiles(prefix,[suffix]):将stream中的数据保存为hadoop files。每一批次的存储文件名基于参数中的为”prefix-time_in_ms[.suffix]“。
注意:以上操作都是每一批次写出一次,会产生大量小文件,在生产环境,很少使用。
4)print():在允许流程序的驱动节点上打印dstream中的每一批次数据的最开始10个元素。这用于开发和调试。
5)foreachrdd(func):这是最通用的输出操作,即将函数func用于产生dstream的每一个rdd。其中参数传入的函数func应该实现将每一个rdd中数据推送到外部系统,如将rdd存入文件或者写入数据库。
在企业开发中通常采用foreachrdd(),它用来对dstream中的rdd进行任意计算。这和transform()有些类似,都可以让我们访问任意rdd。在foreachrdd()中,可以重用我们在spark中实现的所有行动操作(action 算子)。比如,常见的用例之一是把数据写到如mysql的外部数据库中。
2、foreachrdd代码实操

package com.atguigu.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming10_output {

    def main(args: Array[String]): Unit = {

        // 1 初始化SparkStreamingContext
        val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
        val ssc = new StreamingContext(conf, Seconds(3))

        // 2 通过监控端口创建DStream,读进来的数据为一行行
        val lineDStream = ssc.socketTextStream("hadoop102", 9999)

        // 3 切割=》变换
        val wordToOneDStream = lineDStream.flatMap(_.split(" "))
            .map((_, 1))

        // 4 输出
        wordToOneDStream.foreachRDD(
            rdd=>{
                // 在Driver端执行(ctrl+n JobScheduler),一个批次一次
                // 在JobScheduler 中查找(ctrl + f)streaming-job-executor
                println("222222:" + Thread.currentThread().getName)

                rdd.foreachPartition(
                    //5.1 测试代码
                    iter=>iter.foreach(println)

                    //5.2 企业代码
                    //5.2.1 获取连接
                    //5.2.2 操作数据,使用连接写库
                    //5.2.3 关闭连接
                )
            }
        )

        // 5 启动=》阻塞
        ssc.start()
        ssc.awaitTermination()
    }
}

3、注意
1)连接不能写在driver层面(序列化)
2)如果写在foreach则每个rdd中的每一条数据都创建,得不偿失
3)增加foreachpartition,在分区创建(获取)

第 6 章:优雅关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。
关闭方式:使用外部文件系统来控制内部程序关闭。
1、主程序

package com.atguigu.sparkstreaming

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object SparkStreaming11_stop {

    def main(args: Array[String]): Unit = {

        //1.初始化Spark配置信息
        val sparkconf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")

			 // 设置优雅的关闭
        sparkconf.set("spark.streaming.stopGracefullyOnShutdown", "true")

        //2.初始化SparkStreamingContext
        val ssc: StreamingContext = new StreamingContext(sparkconf, Seconds(3))

        // 接收数据
        val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
        // 执行业务逻辑
        lineDStream.flatMap(_.split(" "))
                .map((_,1))
                .print()

        // 开启监控程序
        new Thread(new MonitorStop(ssc)).start()

        //4 启动SparkStreamingContext
        ssc.start()

        // 将主线程阻塞,主线程不退出
        ssc.awaitTermination()
    }
}

// 监控程序
class MonitorStop(ssc: StreamingContext) extends Runnable{

    override def run(): Unit = {
        // 获取HDFS文件系统
        val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration(),"atguigu")

        while (true){
            Thread.sleep(5000)
            // 获取/stopSpark路径是否存在
            val result: Boolean = fs.exists(new Path("hdfs://hadoop102:8020/stopSpark"))

            if (result){

                val state: StreamingContextState = ssc.getState()
                // 获取当前任务是否正在运行
                if (state == StreamingContextState.ACTIVE){
                    // 优雅关闭
                    ssc.stop(stopSparkContext = true, stopGracefully = true)
                    System.exit(0)
                }
            }
        }
    }
}

2、测试
1)发送数据

[atguigu@hadoop102 ~]$ nc -lk 9999
hello

2)启动hadoop集群

[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /stopSpark

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/348499.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Unity配置表xlsx/xls打包后读取错误问题

前言 代码如下&#xff1a; //文本解析private void ParseText(){//打开文本 读FileStream stream File.Open(Application.streamingAssetsPath excelname, FileMode.Open, FileAccess.Read, FileShare.Read);//读取文件流IExcelDataReader excelRead ExcelReaderFactory…

Flume1.9基础学习

文章目录 一、Flume 入门概述1、概述2、Flume 基础架构2.1 Agent2.2 Source2.3 Sink2.4 Channel2.5 Event 3、Flume 安装部署3.1 安装地址3.2 安装部署 二、Flume 入门案例1、监控端口数据官方案例1.1 概述1.2 实现步骤 2、实时监控单个追加文件2.1 概述2.2 实现步骤 3、实时监…

ElasticSearch搜索引擎入门到精通

ES 是基于 Lucene 的全文检索引擎,它会对数据进行分词后保存索引,擅长管理大量的数据,相对于 MySQL 来说不擅长经常更新数据及关联查询。这篇文章就是为了进一步了解一下它,到底是如何做到这么高效的查询的。 在学习其他数据库的时候我们知道索引是一个数据库系统极其重要…

OpenAI正式推出GPT商店 ChatGPT团队订阅服务一并推出

2024年1月11日消息&#xff0c;据外媒报道&#xff0c;如上周在给开发者的邮件中所宣布的一样&#xff0c;因ChatGPT而名声大噪的人工智能公司OpenAI&#xff0c;在本周正式推出了GPT商店&#xff0c;供用户分享和发现个性化的ChatGPT&#xff0c;同时他们也推出了面向各种不同…

【Java 数据结构】ArrayList与顺序表

ArrayList 1.线性表2.顺序表2.1 接口的实现 3. ArrayList简介4. ArrayList使用4.1 ArrayList的构造4.2 ArrayList常见操作4.3 ArrayList的遍历4.4 ArrayList的扩容机制 1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一…

Java / Spring Boot + POI 给 Word 添加水印

1、前言(瞎扯) 事情是这样子的&#xff1a;我好哥们所在的公司实在是太忙了&#xff0c;多线程都开起来了还是忙不完&#xff0c;只能开了子线程叫我帮他整一个给 Word 加水印的&#xff0c;于是我就网上找呗~ 看到那个 Aspose 好像是收费的&#xff0c;然后就把目光转向了 POI…

基于物联网设计的水稻田智能灌溉系统(STM32+华为云IOT)

一、项目介绍 随着科技的不断发展和人们生活水平的提高&#xff0c;农业生产也逐渐向智能化、高效化的方向发展。水稻作为我国主要的粮食作物之一&#xff0c;其生长过程中的灌溉管理尤为重要。传统的灌溉方式往往依赖于人工观察和控制&#xff0c;不仅效率低下&#xff0c;而…

城市开发区视频系统建设方案:打造视频基座、加强图像数据治理

一、背景需求 随着城市建设的步伐日益加快&#xff0c;开发区已经成为了我国工业化、城镇化和对外开放的重要载体。自贸区、开发区和产业园的管理工作自然也变得至关重要。在城市经开区的展览展示馆、进出口商品展示交易中心等地&#xff0c;数千路监控摄像头遍布各角落&#…

Spring-Kafka 3.0 消费者消费失败处理方案

一、背景 我们作为Kafka在使用Kafka是&#xff0c;必然考虑消息消费失败的重试次数&#xff0c;重试后仍然失败如何处理&#xff0c;要么阻塞&#xff0c;要么丢弃&#xff0c;或者保存 二、设置消费失败重试次数 1 默认重试次数在哪里看 Kafka3.0 版本默认失败重试次数为1…

27.移除元素(力扣LeetCode)

文章目录 27.移除元素&#xff08;力扣LeetCode&#xff09;题目描述方法一&#xff1a;vector成员函数&#xff1a;erase方法二&#xff1a;暴力解法方法三&#xff1a;双指针法 27.移除元素&#xff08;力扣LeetCode&#xff09; 题目描述 给你一个数组 nums 和一个值 val&…

【开源】基于JAVA语言的班级考勤管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统基础支持模块2.2 班级学生教师支持模块2.3 考勤签到管理2.4 学生请假管理 三、系统设计3.1 功能设计3.1.1 系统基础支持模块3.1.2 班级学生教师档案模块3.1.3 考勤签到管理模块3.1.4 学生请假管理模块 3.2 数据库设…

python Django入门

1.创建Django项目 方式一:进入到指定要存放项目的目录&#xff0c;执行*django-admin startproject “projectname”* 来创建一个名方式二:使用Pycharm专业版创建Django项目 创建项目后&#xff0c;默认的目录结构: manage.py:是Django用于管理本项目的命令行工具&#xff0c…

前出深入-机器学习

文章目录 一、K近邻算法1.1 先画一个散列图1.2 使用K最近算法建模拟合数据1.3 进行预测1.4 K最近邻算法处理多元分类问题1.5 K最近邻算法用于回归分析1.6 K最近邻算法项目实战-酒的分类1.6.1 对数据进行分析1.6.2 生成训练数据集和测试数据集1.6.3 使用K最近邻算法对数据进行建…

如何在Shopee平台上进行选品 :充分利用渠道获取灵感和数据支持

在Shopee平台上进行选品是一个关键的决策过程&#xff0c;它直接影响到卖家的销售业绩和店铺的发展。为了帮助卖家更好地进行选品&#xff0c;Shopee提供了多种渠道来获取灵感和数据支持。下面将介绍一些主要的选品渠道以及如何利用它们来进行选品。 先给大家推荐一款shopee知…

【DDD】学习笔记-深入分析软件的复杂度

软件复杂度的成因 Eric Evans 的经典著作《领域驱动设计》的副标题为“软件核心复杂性应对之道”&#xff0c;这说明了 Eric 对领域驱动设计的定位就是应对软件开发的复杂度。Eric 甚至认为&#xff1a;“领域驱动设计只有应用在大型项目上才能产生最大的收益”。他通过 Smart…

C#,数据检索算法之线性检索(Linear Search)的源代码

数据检索算法是指从数据集合&#xff08;数组、表、哈希表等&#xff09;中检索指定的数据项。 数据检索算法是所有算法的基础算法之一。 线性&#xff1f;听起来就“高大上”&#xff0c;其实&#xff0c;只不过就是挨个比较呗。 本文发布&#xff08;听起来很正式 &#x…

TarGAN:多模态医学图像转换GAN

TarGAN 核心思想网络结构 核心思想 论文&#xff1a;https://arxiv.org/abs/2105.08993 代码&#xff1a;https://github.com/2165998/TarGAN 解决的问题&#xff1a;传统多模态医学图像转换通常&#xff0c;在生成高质量图像方面存在问题&#xff0c;特别是在关键目标区域或…

C语言中计算结构体的大小

一. 使用sizeof 计算结构体的大小 通常情况下&#xff0c;我们习惯于使用 sizeof 运算符来计算结构体的大小。 例如&#xff0c;下面是一个结构体的定义&#xff1a; struct Student {int id;char name[20];int age;float score; }; 其中&#xff0c;Student是该结构体的类…

IP被封怎么办?如何绕过IP禁令?

相信很多人遇到过IP禁令&#xff1a;比如你在访问社交媒体、搜索引擎或电子商务网站时会被限制访问&#xff0c;又或者你的的账号莫名被封&#xff0c;这些由于网络上的种种限制我们经常会遭遇IP被封的情况&#xff0c;导致无法使用继续进行网络行动。在本文中&#xff0c;我们…

Django笔记(七):JWT认证

首 前后端分离的项目更多使用JWT认证——Json Web Token。本文记录djangorestframework-simplejwt的使用方式。文档 安装 pip install djangorestframework-simplejwt 配置settings.py: INSTALLED_APPS [rest_framework_simplejwt, ]REST_FRAMEWORK {DEFAULT_AUTHENTICA…