SparkStreaming--scala

文章目录

  • 第1关:QueueStream
    • 代码
  • 第2关:File Streams
    • 代码


第1关:QueueStream

任务描述
本关任务:编写一个清洗QueueStream数据的SparkStreaming程序。

相关知识
为了完成本关任务,你需要掌握:1.如何使用SparkStreaming,2.如何使用 SparkStreaming读取QueueStream。

SparkStreaming 的开发步骤
初始化SparkConf并设置相关参数
val conf = new SparkConf().setMaster(master).setAppName(appName)

说明:

appName 是应用程序在集群 UI 上显示的名称。

master 是Spark,Mesos或YARN集群的URL,或在本地模式下运行使用 local[*]

初始化JavaStreamingContext并设置处理批次的时间
val ssc = new StreamingContext(conf, Seconds(1))

设置数据源
例如:

val inputStream = ssc.queueStream(rddQueue)

批次数据处理(使用相关算子完成相应的操作)
算子 含义
map(func) 通过将源DStream的每个元素传递给函数func来返回一个新的DStream
flatMap(func) 与map类似,但每个输入项可以映射到0个或更多输出项。
filter(func) 通过仅选择func返回true的源DStream的记录来返回新的DStream
repartition(numPartitions) 通过创建更多或更少的分区来更改此DStream中的并行度级别
union(otherStream) 返回一个新的DStream,它包含源DStream和otherDStream中元素的并集
count() 通过计算源DStream的每个RDD中的元素数量,返回单元素RDD的新DStream
reduce(func) 通过使用函数func(它接受两个参数并返回一个)聚合源DStream的每个RDD中的元素,返回单元素RDD的新DStream 。该函数应该是关联的和可交换的,以便可以并行计算
countByValue() 当在类型为K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在源DStream的每个RDD中的频率
reduceByKey(func,[ numTasks ]) 当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数聚合每个键的值。注意:默认情况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性确定spark.default.parallelism进行分组。您可以传递可选numTasks参数来设置不同数量的任务
join(otherStream, [numTasks]) 当在(K,V)和(K,W)对的两个DStream上调用时,返回(K,(V,W))对的新DStream与每个键的所有元素对
cogroup(otherStream, [numTasks]) 当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream
transform(func) 通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回新的DStream。这可以用于在DStream上执行任意RDD操作
updateStateByKey(func) 返回一个新的“状态”DStream,其中通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态
foreachRDD(func) 最通用的输出运算符,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD。
启动流计算
ssc.start();

等待处理停止
ssc.awaitTermination();

QueueStream
QueueStream(队列流):推入队列的每个RDD将被视为DStream中的一批数据,并像流一样处理。

编程要求
在右侧编辑器补充代码,完成以下需求:

将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss )

提取数据中的起始URL(切割符为空格)

拼接结果数据,格式如下:

Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl: https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200
将最终结果写入Mysql数据库
测试说明
平台将对你编写的代码进行评测:

预期输出:

1 Ip:100.143.124.29,visitTime:2017-10-27 14:58:05,startUrl:www/1,targetUrl:https://www.baidu.com/s?wd=反叛的鲁鲁修,statusCode:404
2 Ip:30.132.167.100,visitTime:2018-12-02 11:29:39,startUrl:www/4,targetUrl:-,statusCode:302
3 Ip:30.156.187.132,visitTime:2016-05-17 17:18:56,startUrl:www/2,targetUrl:-,statusCode:200
4 Ip:29.100.10.30,visitTime:2016-10-12 01:25:47,startUrl:www/3,targetUrl:http://cn.bing.com/search?q=游戏人生,statusCode:302
5 Ip:132.187.167.143,visitTime:2017-01-08 23:21:09,startUrl:pianhua/130,targetUrl:-,statusCode:200
6 Ip:143.187.100.10,visitTime:2016-09-21 19:27:39,startUrl:www/1,targetUrl:-,statusCode:302
7 Ip:10.100.124.30,visitTime:2018-09-16 02:49:35,startUrl:www/4,targetUrl:http://cn.bing.com/search?q=来自新世界,statusCode:200
8 Ip:29.10.143.187,visitTime:2017-09-29 15:49:09,startUrl:www/1,targetUrl:-,statusCode:404
9 Ip:29.187.132.100,visitTime:2018-11-27 05:43:17,startUrl:www/1,targetUrl:-,statusCode:200
10 Ip:187.167.124.132,visitTime:2016-01-28 13:34:33,startUrl:www/6,targetUrl:-,statusCode:200
开始你的任务吧,祝你成功!

代码

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
 
object QueueStream {
    def main(args: Array[String]) {
        val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
        val conf = new SparkConf().setMaster("local[2]").setAppName("queueStream")
        
        /********** Begin **********/
 
        //1.初始化StreamingContext,设置时间间隔为1s
        val ssc = new StreamingContext(conf, Seconds(1))
 
        //2.对接队列流
        val inputStream = ssc.queueStream(rddQueue)
 
        /**
        *
        * 数据格式如下:
        *      100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的鲁鲁修,404
        * 数据从左往右分别代表:用户IP、访问时间戳、起始URL及相关信息(访问方式,起始URL,http版本)、目标URL、状态码
        *
        *
        * 原始数据的切割符为逗号,(英文逗号)
        *
        * 需求:
        *      1.将时间戳转换成规定时间(格式为:yyyy-MM-dd HH:mm:ss )
        *      2.提取数据中的起始URL(切割符为空格)
        *      3.拼接结果数据,格式如下:
        * Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200
        *      4.将最终结果写入 mysql 数据库, 调用DBUtils.add(line)即可, line:String
        */
 
        //3.获取队列流中的数据,进行清洗、转换(按照上面的需求)
    val data = inputStream.map(data=>{
      val dataliat = data.split(',')
      val ip = dataliat(0)
      val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val lt = dataliat(1).toLong
      val date = new Date(lt)
      val visitTime = simpleDateFormat.format(date)
      val startUrl = dataliat(2).split(' ')(1)
      val targetUrl= dataliat(3)
      val statusCode = dataliat(4)
      val result = "Ip:" + ip + ",visitTime:" + visitTime + ",startUrl:" + startUrl + ",targetUrl:" + targetUrl + ",statusCode:" + statusCode
      result
    })
 
        //4.将最终结果写入 mysql 数据库, 调用DBUtils.add(line)即可, line:String
    data.foreachRDD(rdd => {
      rdd.foreachPartition(it => {
        it.foreach(line => {
          DBUtils.add(line)
        })
      })
    })
 
 
        //5.启动SparkStreaming
    ssc.start()
 
        /********** End **********/
        DBUtils.addQueue(ssc, rddQueue)
    }
}

在这里插入图片描述

第2关:File Streams

任务描述
本关任务:编写一个清洗File Streams数据的SparkStreaming程序。

相关知识
为了完成本关任务,你需要掌握:1. 文件流,2. SparkStreaming的编程流程。

文件流
文件流(File Streams):从与HDFS API兼容的任何文件系统上的文件读取数据

通过文件流创建Dstream:

val lines=streamingContext.fileStreamKeyClass,ValueClass, InputFormatClass

对于简单的文本文件,有一种更简单的方法:

val lines=streamingContext.textFileStream(dataDirectory)

Spark Streaming将监视目录dataDirectory并处理在该目录中创建的任何文件(不支持嵌套目录中写入的文件)。

注意:

文件必须具有相同的数据格式。

文件移动到该目录后,不能在添加新数据,即使添加也不会读取新数据。

只会监听目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。

说明:文件流不需要运行receiver,因此不需要分配core

SparkStreaming编程流程
设置SparkConf相关参数
val conf = new SparkConf().setMaster(master).setAppName(appName)

初始化StreamingContext
val ssc = new StreamingContext(conf, Seconds(1))
Seconds(1)表示每一秒处理一个批次;

设置数据源创建Dstream
val lines = ssc.textFileStream(dataDirectory)

通过将转换和输出操作应用于DStream来定义流式计算
比如flatmap,map,foreachRDD,updateStateByKey等等;

启动流计算
ssc.start();

等待处理停止
ssc.awaitTermination();

编程要求
在右侧编辑器中补全代码,要求如下:

/root/step11_fils下有两个文件,文件内容分别为:
hadoop hadoop hadoop hadoop hadoop hadoop hadoop hadoop spark spark
hello hello hello hello hello hello hello hello study study
要求清洗数据并实时统计单词个数,并将最终结果导入MySQL
step表结构:

列名 数据类型 长度 非空
word varchar 255 √
count int 255 √
测试说明
平台会对你编写的代码进行测试:

预期输出:

hadoop 8
spark 2
hello 8
study 2

代码

package com.sanyiqi
 
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
 
object SparkStreaming {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("edu").setMaster("local")
        /********** Begin **********/
        //1.初始化StreamingContext,设置时间间隔为1s
    val ssc = new StreamingContext(conf, Seconds(1))
        //2.设置文件流,监控目录/root/step11_fils
    val lines = ssc.textFileStream("/root/step11_fils")
        /* *数据格式如下:hadoop hadoop spark spark
           *切割符为空格
           *需求:
           *累加各个批次单词出现的次数
           *将结果导入Mysql
           *判断MySQL表中是否存在即将要插入的单词,不存在就直接插入,存在则把先前出现的次数与本次出现的次数相加后插入
           *库名用educoder,表名用step,单词字段名用word,出现次数字段用count
         */
        //3.对数据进行清洗转换
   val wordcount = lines.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
 
 
//4.将结果导入MySQL
    wordcount.foreachRDD(rdd => {
      rdd.foreachPartition(f = eachPartition => {
        val connection: Connection = createConnection()
        eachPartition.foreach(f = record => {
          val querySql = "SELECT t.count FROM step t WHERE t.word = '" + record._1 + "'"
          val queryResultSet: ResultSet = connection.createStatement().executeQuery(querySql)
          val hasNext = queryResultSet.next()
          print("MySQL had word:" + record._1 + " already  :  " + hasNext)
          if (!hasNext)
          {
            val insertSql = "insert into step(word,count) values('" + record._1 + "'," + record._2 + ")"
            connection.createStatement().execute(insertSql)
          } else {
            val newWordCount = queryResultSet.getInt("count") + record._2
            val updateSql = "UPDATE step SET count = " + newWordCount + " where word = '" + record._1 + "'"
            connection.createStatement().execute(updateSql)
          }
        })
        connection.close()
      })
    })
        //5.启动SparkStreaming
    ssc.start()
 
        /********** End **********/
        Thread.sleep(15000)
        ssc.awaitTermination()
        ssc.stop()
	}
    
    /**
      *获取mysql连接
      *@return
      */
    def createConnection(): Connection ={
    	Class.forName("com.mysql.jdbc.Driver")
    	DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder","root","123123")
    }
}

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/799402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【07】分布式事务解决方案

1、事务简介 事务(Transaction)是访问并可能更新数据库中各种数据项的一个程序执行单元(unit)。在关系数据库中,一个事务由一组SQL语句组成。事务应该具有ACID四个特性:原子性、一致性、隔离性、持久性。任何事务机制在实现时,都应该考虑事务…

mac生成.dmg压缩镜像文件

mac生成.dmg压缩镜像文件 背景准备内容步骤1,找一个文件夹2,制作application替身1,终端方式2,黄金右手方式 3,.app文件放入文件夹4,制作.dmg压缩镜像文件5,安装.dmg 总结 背景 为绕开App Store…

纯css实现语音播报动画效果

先来看看效果图 黑色以下代码 background: url(data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAGAAAAAYCAYAAAAF6fiUAAAAAXNSR0IArs4c6QAAAARzQklUCAgICHwIZIgAAAO8SURBVGiB1ZlPaBxVHMe/v7fbkmUZErRs9/1moUtYBBfxklZR8aYg5KAgvcT25MFDvUhPTaGnHoKIlOpJ0VsPUvAgKNjSS0EPAVGKya…

防火墙综合实验之NAT和智能选路

目录 前言: 一、实验题目 二、实验操作 需求一 需求二 需求三 需求四、需求五 需求六 需求七 ​编辑 需求八 需求九 需求十 需求十一 三、需求测试 前言: 本篇文章是延续上一篇文章,简单来说就是防火墙实验的完善和延续&#…

Zabbix6.0监控Freeswitch状态

一、前提环境说明 1、最终实现Freeswitch监控指标信息: 2、环境需求: (1)需要使用Zabbix6.0及以上 (2)需要使用zabbix_agent2 二、实现步骤 1、zabbix_agent2添加监控键值 cat /etc/zabbix/conf.d/fr…

唯众物联网综合实训台 物联网实验室建设方案

物联网综合实训装置 物联网工程应用综合实训台是我公司针对职业院校物联网行业综合技能型人才培养,综合运用传感器技术、RFID技术、接口控制技术、无线传感网技术、Android应用开发等,配合实训台上的433M无线通信设备、ZigBee节点、射频设备、控制设备、…

CoT-SC论文速读

1.论文速读 本文提出了一个重要的Decoder策略为:“Self-Consistency”,并将其用在CoT的Prompt工作中。 该策略作用:让LLM在处理复杂问题时,让他尝试多个推理路径,每一个推理路径都是一次CoT(Chain of Thought&#x…

解决回溯算法之切割问题(leetcode--分割回文串)

文章目录 1.问题描述2.做题思路(关键是画出对于的二叉树图)3.代码实现 1.问题描述 2.做题思路(关键是画出对于的二叉树图) 1.思考从起始串的分割方案, 有a ,aa, aab三种方式 2.————————————剩余ab,b,空(接下来对ab,b同样的方式进行分割) 3.…

【Linux】centos7安装PHP7.4报错:libzip版本过低

问题描述 configure: error: Package requirements (libzip > 0.11 libzip ! 1.3.1 libzip ! 1.7.0) were not met: checking for libzip > 0.11 libzip ! 1.3.1 libzip ! 1.7.0... no configure: error: Package requirements (libzip > 0.11 libzip ! 1.3.1 libzi…

星辰计划02-独特视角的spring动态代理

承接上一文 动态代理 ,这里探究spring 动态代理 会话1:spring动态代理 quick start 👧哥哥,哥哥,spring 怎么去搞动态代理的呢👨 来来来,听我细细来说 quick start通过Spring的 ProxyFactory…

【高中数学/幂函数】比较a=2^0.3,b=3^0.2,c=7^0.1的大小

【问题】 比较a2^0.3,b3^0.2,c7^0.1的大小 【解答】 a2^0.32^3/10(2^3)^1/108^1/10 b3^0.23^2/10(3^2)^1/109^1/10 c7^0.17^1/10 由于yx^1/10在x正半轴是增函数,底数大的得数就大。 因为9>8>7,所以b>a>c 【图像】 在图像上绘出曲线yx^1/10&…

C++初阶:类和对象(二)

✨✨所属专栏:C✨✨ ✨✨作者主页:嶔某✨✨ 类的默认成员函数 默认成员函数就是用户没有显式实现,编译器会⾃动⽣成的成员函数称为默认成员函数。⼀个类,我们不写的情况下编译器会默认⽣成以下6个默认成员函数,需要注…

报文对比工具

如果有报文对比需求,可以通过以下步骤实现: ①通过在线 XML排序、压缩、格式化 网站 排序后格式化数据 http://www.bejson.com/otherformat/xmlsort/ 访问速度快,操作直观, 1.原始xml数据 2.排序 3.格式化 ②N比对数据是否一…

哥德巴赫猜想c++

方法一 #include<bits/stdc.h> using namespace std; //定义函数&#xff0c;判断素数 bool sushu(int n){bool rtrue;//先假设是素数&#xff0c;即真//循环因子范围&#xff0c;找到一个因子就不是素数for(int i2;i<sqrt(n);i){//判断2~n的根号是否素数if(n%i0){//…

翁恺-C语言程序设计-05-3. 求a的连续和

05-3. 求a的连续和 输入两个整数a和n&#xff0c;a的范围是[0,9]&#xff0c;n的范围是[1,8]&#xff0c;求数列之和S aaaaaa…aaa…a&#xff08;n个a&#xff09;。如a为2、n为8时输出的是222222…22222222的和。 输入格式&#xff1a; 输入在一行中给出两个整数&#xf…

Hdfs3.x新特性详解

作者&#xff1a;九月 HDFS Disk Balancer(磁盘均衡器) HDFS Disk Balancer与HDFS Balancer的区别 两者都是实现负载均衡功能。 HDFS Balancer是之前Hadoop2.x中本身存在的&#xff0c;主要是多个DataNode节点之间的数据的平衡。 HDFS Disk Balancer是Hadoop3中新出现的&…

融云:换头像=换人设?社交应用中隐秘而重要的「用户信息管理」

当代年轻人失眠三大原因&#xff0c;最近新上的《喜人奇妙夜》帮你找到了—— 基金绿了、吵架输了、前任头像换了。 当你半夜翻看前任的社交账号&#xff0c;一场盛大的失眠就开始了&#xff0c;就算古希腊掌柜睡眠的神躺你旁边也不好使。即便 Ta 没有更新内容&#xff0c;昵…

Linux RTL8111/RTL8168 不能联网 / 最新版驱动下载安装

注&#xff1a; 机翻&#xff0c;未校对。 如何让 Realtek RTL8111/RTL8168 在 Linux 下工作 这篇文章于 2016 年 8 月在我原来的博客上发布。尽管如今 Linux 下的 RTL8111/RTL8168 网络接口的情况变得越来越稳定&#xff0c;但它们仍然会导致数据包丢失或网络连接不稳定等问题…

C1W1.Assignment: Logistic Regression

理论课&#xff1a;C1W1.Sentiment Analysis with Logistic Regression 文章目录 前期准备导入包导入数据处理推文文本 Part 1: Logistic regressionPart 1.1: Sigmoid实现 sigmoid 函数Logistic regression: regression and a sigmoid Part 1.2 Cost function and GradientUp…

自动驾驶-端到端分割任务

上采样 bed of nails interpolation transposed convolutions 1. 上采样 (Upsampling) 上采样是一种技术&#xff0c;用于增加数据集中的样本数量或是提高信号的分辨率。在图像处理中&#xff0c;上采样通常指的是增加图像的像素数量&#xff0c;从而使图像变得更大。这可…