Spark---转换算子、行动算子、持久化算子

一、转换算子和行动算子

1、Transformations转换算子

1)、概念

Transformations类算子是一类算子(函数)叫做转换算子,如map、flatMap、reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

2)、Transformation类算子

filter :过滤符合条件的记录数,true保留,false过滤掉

map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。

flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。

sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

reduceByKey:将相同的Key根据相应的逻辑进行处理。

sortByKey/sortBy:作用在K,V格式的RDD上,对Key进行升序或者降序排序。

2、Action行动算子

1)、概念:

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

2)、Action类算子

count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。

take(n):返回一个包含数据集前n个元素的集合。

first:first=take(1),返回数据集中的第一个元素。

foreach:循环遍历数据集中的每个元素,运行相应的逻辑。

collect:将计算结果回收到Driver端。

3)、demo:动态统计出现次数最多的单词个数,过滤掉。

  • 一千万条数据量的文件,过滤掉出现次数多的记录,并且其余记录按照出现次数降序排序。

假设有一个records.txt文件

hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark1
hello Spark
hello Spark
hello Spark2
hello Spark
hello Spark
hello Spark
hello Spark3
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark4
hello Spark
hello Spark
hello Spark5
hello Spark
hello Spark

代码处理:

package com.bjsxt.demo;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;
/**
 * 动态统计出现次数最多的单词个数,过滤掉。
 * @author root
 *
 */
public class Demo1 {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.setMaster("local").setAppName("demo1");
		JavaSparkContext jsc = new JavaSparkContext(conf);
		JavaRDD<String> lines = jsc.textFile("./records.txt");
		JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> call(String t) throws Exception {
				return Arrays.asList(t.split(" "));
			}
		});
		JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String,String, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		});
		
		JavaPairRDD<String, Integer> sample = mapToPair.sample(true, 0.5);
		
		final List<Tuple2<String, Integer>> take = sample.reduceByKey(new Function2<Integer,Integer,Integer>(){

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1+v2;
			}
			
		}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
					throws Exception {
				return new Tuple2<Integer, String>(t._2, t._1);
			}
		}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
					throws Exception {
				return new Tuple2<String, Integer>(t._2, t._1);
			}
		}).take(1);
		
		System.out.println("take--------"+take);
		
		JavaPairRDD<String, Integer> result = mapToPair.filter(new Function<Tuple2<String,Integer>, Boolean>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Boolean call(Tuple2<String, Integer> v1) throws Exception {
				return !v1._1.equals(take.get(0)._1);
			}
		}).reduceByKey(new Function2<Integer,Integer,Integer>(){

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1+v2;
			}
			
		}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
					throws Exception {
				return new Tuple2<Integer, String>(t._2, t._1);
			}
		}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
					throws Exception {
				return new Tuple2<String, Integer>(t._2, t._1);
			}
		});
		
		result.foreach(new VoidFunction<Tuple2<String,Integer>>() {
			
			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Tuple2<String, Integer> t) throws Exception {
				System.out.println(t);
			}
		});
		
		jsc.stop();
		
	}
}

3、Spark代码流程

1)、创建SparkConf对象

可以设置Application name。

可以设置运行模式。

可以设置Spark application的资源需求。

2)、创建SparkContext对象

3)、基于Spark的上下文创建一个RDD,对RDD进行处理。

4)、应用程序中要有Action类算子来触发Transformation类算子执行。

5)、关闭Spark上下文对象SparkContext。

二、Spark持久化算子

1、控制算子

1)、概念

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

2)、cache

默认将RDD的数据持久化到内存中。cache是懒执行。

注意:chche()=persist()=persist(StorageLevel.Memory_Only)

测试cache文件:

测试代码:

1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("CacheTest");
3.JavaSparkContext jsc = new JavaSparkContext(conf);
4.JavaRDD<String> lines = jsc.textFile("persistData.txt");
5.
6.lines = lines.cache();
7.long startTime = System.currentTimeMillis();
8.long count = lines.count();
9.long endTime = System.currentTimeMillis();
10.System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ 
11.(endTime-startTime));
12.
13.long countStartTime = System.currentTimeMillis();
14.long countrResult = lines.count();
15.long countEndTime = System.currentTimeMillis();
16.System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
17.countStartTime));
18.
19.jsc.stop();

persist:

可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2“表示有副本数。

持久化级别如下:

2、cache和persist的注意事项

1)、cache和persist都是懒执行,必须有一个action类算子触发执行。

2)、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

3)、cache和persist算子后不能立即紧跟action算子。

4)、cache和persist算子持久化的数据当applilcation执行完成之后会被清除。

错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

3、checkpoint

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
  • persist(StorageLevel.DISK_ONLY)与Checkpoint的区别?

1)、checkpoint需要指定额外的目录存储数据,checkpoint数据是由外部的存储系统管理,不是Spark框架管理,当application完成之后,不会被清空。

2)、cache() 和persist() 持久化的数据是由Spark框架管理,当application完成之后,会被清空。

3)、checkpoint多用于保存状态。

  • checkpoint 的执行原理:

1)、当RDD的job执行完毕后,会从finalRDD从后往前回溯。

2)、当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。

3)、Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。

  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
  • 使用:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("checkpoint");
3.JavaSparkContext sc = new JavaSparkContext(conf);
4.sc.setCheckpointDir("./checkpoint");
5.JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
6.parallelize.checkpoint();
7.parallelize.count();
8.sc.stop();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/175450.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ROS2对比ROS1的一些变化与优势(全新安装ROS2以及编译错误处理)《1》

1、概述 我们在前面介绍的ROS&#xff0c;都是ROS1的版本&#xff0c;近期对机器狗进行学习的时候&#xff0c;发现版本是ROS2了&#xff0c;也发现平时习惯的一些命令都有了变化&#xff0c;改变还是挺大的&#xff0c;不过熟悉之后还是很习惯ROS2的写法。 ROS2不是在ROS1的基…

NV080D语音芯片:让智能快递柜取件更便利

随着互联网的普及和电子商务的迅速发展&#xff0c;网购消费已经成为了越来越多人的选择。这也催生了一个庞大的“网购一族”&#xff0c;他们购买的各种商品会通过快递公司送到家门口。然而&#xff0c;收取快递往往也伴随着一系列问题。比如&#xff0c;派送时间和收件人取件…

如何通过提升客户体验带来更大的增长、更好的客户留存率?

客户期望的转变 在一个日益数字化的世界里&#xff0c;有必要采取以客户为中心的思维方式。因为客户与企业互动的方式有很多是在数字空间发生的&#xff0c;客户的需求和模式已经转变。 这种情况已经酝酿了几年&#xff0c;但在2020年才打开闸门。随着疫情的爆发&#xff0c;企…

【文末送书】十大排序算法C++代码实现

欢迎关注博主 Mindtechnist 或加入【智能科技社区】一起学习和分享Linux、C、C、Python、Matlab&#xff0c;机器人运动控制、多机器人协作&#xff0c;智能优化算法&#xff0c;滤波估计、多传感器信息融合&#xff0c;机器学习&#xff0c;人工智能等相关领域的知识和技术。关…

解决 Python requests 库中 方法选择错误问题

在使用Python库requests进行网页请求时&#xff0c;可能会遇到一个问题&#xff0c;即在处理重定向时&#xff0c;requests的Session.resolve_redirects方法会复制原始请求对象&#xff0c;这可能导致后续请求的HTTP方法选择错误。 解决方案&#xff1a; 针对上述问题&#x…

PyCharm 配置sqlite3驱动下载问题

单击View -> Tool Windows -> Database&#xff0c;打开Database窗体&#xff0c;之后进行配置&#xff0c;下载驱动包失败&#xff01; 解决 &#xff08;1&#xff09;下载Sqlite3驱动 下载地址: Central Repository: org/xerial/sqlite-jdbc 选择的版本是3.34.0,下载…

【Rxjava详解】(一)观察者模式的拓展

文章目录 RxJava引入扩展的观察者模式RxJava的观察者模式基本实现 RxJava入门示例Action RxJava引入 在介绍RxJava之前先说一下Rx。全称是Reactive Extensions&#xff0c;直译过来就是响应式扩展 Rx基于观察者模式&#xff0c;它是一种编程模型&#xff0c;目标是提供一致的…

PDF Reader Pro 3.0.1.0(pdf阅读器)

PDF Reader Pro是一款功能强大的PDF阅读、注释、填写表单&签名、转换、OCR、合并拆分PDF页面、编辑PDF等软件。 它支持多种颜色的高亮、下划线&#xff0c;可以按需选择&#xff0c;没有空白处可以进行注释&#xff0c;这时候便签是你最佳的选择&#xff0c;不点开时自动隐…

探索锦食送如何通过API集成无代码开发技术提高电商平台和营销系统效率

探索锦食送无代码开发集成技术 随着电子商务和营销系统的快速发展&#xff0c;企业不断寻求更高效和灵活的管理方式。锦食送&#xff0c;作为高端餐饮外卖服务的领先者&#xff0c;通过无代码开发的API集成技术&#xff0c;实现了电商平台和营销系统的高效管理。这种创新的连接…

ROS2串口通讯serial库(适用于humble版本)

要的串口操作的API介绍在这里&#xff1a;serial: serial::Serial Class Reference (wjwwood.io) 但是我们不是直接利用上面这个东西&#xff0c;而是使用的是根据这个改写的一个针对ros2的一个serial库&#xff0c;这个serial库是根据上面这个库改写来的&#xff0c;ros2的库在…

​​​​​​​3分钟实现EG网关串口连接麦格米特PLC

EG网关串口连接麦格米特PLC 前言&#xff1a;麦格米特PLC广泛应于工业控制领域&#xff0c;是一款性能高、稳定性强的PLC设备。此文档将介绍如何使用EG系列网关通过串口连接麦格米特PLC&#xff0c;并添加到EMCP物联网云平台&#xff0c;实现电脑Web页面、手机APP和微信对麦格米…

健康饮酒进家庭,国台酒业与碧桂园服务集团达成战略合作

11月19日&#xff0c;碧桂园服务集团与国台酒业集团战略合作发布会暨“健康饮酒进家庭”项目启动仪式在广州举行。 广东省酒类行业协会创会会长朱思旭&#xff0c;广东省酒类行业协会会长彭洪&#xff0c;碧桂园服务集团总裁徐彬淮&#xff0c;碧桂园服务集团酒类业务总经理、广…

第二证券:万亿巨头!业绩大超预期,但预警在华销售将大幅下滑

当地时间11月21日&#xff0c;美股三大股指收跌。其间&#xff0c;道指跌0.18%&#xff0c;标普500指数跌0.20%&#xff0c;纳斯达克指数跌0.59%。 英伟达盘后发布了第三财季成果&#xff0c;第三财季&#xff0c;英伟达总营收同比增两倍、EPS盈利增近六倍&#xff0c;分别较分…

跑步耳机哪个牌子好?这五款跑步耳机闭眼入也不会错!

作为一个经常跑步运动的人&#xff0c;总感觉运动能够让人暂时远离城市的喧嚣&#xff0c;同时运动也是一种特别好的舒压方法。但跑步的时候如果没有音乐助燃&#xff0c;那是没有灵魂的&#xff0c;这也许就是现代年轻人的矫情吧&#xff0c;我在运动的时候经常会佩戴骨传导耳…

【报错记录】解决使用Kotlin写的SpringBoot项目使用Aspect切面无法生效的问题

前言 为了能在SpringBoot使用Kotlin&#xff0c;真的是各种坑都彩礼一遍&#xff0c;这次遇到的问题是Aspect无法对Kotlin代码生效。我这里的使用场景是使用切面切Controller中的方法&#xff0c;用来对接口进行一些初始化和收尾工作。 Aspect在Controller类还是Java代码的时…

排序算法--冒泡排序

实现逻辑 ① 比较相邻的元素。如果第一个比第二个大&#xff0c;就交换他们两个。 ②对每一对相邻元素作同样的工作&#xff0c;从开始第一对到结尾的最后一对。在这一点&#xff0c;最后的元素应该会是最大的数。 ③针对所有的元素重复以上的步骤&#xff0c;除了最后一个。 ④…

最护眼的灯是白炽灯吗?专业的护眼台灯推荐

以前科技发展落后&#xff0c;晚上需要照明时也只有白炽灯可以使用&#xff0c;这也是迫不得已的事情。白炽灯最大的优点就是成本便宜&#xff0c;而且显色比较接近自然光。不过缺点也有着不少&#xff0c;例如&#xff1a;光线分布不均匀、刺眼、能耗高、寿命短等等。 如今时…

Vatee万腾的数字化掌舵:Vatee科技解决方案的全面引领

随着数字化时代的到来&#xff0c;Vatee万腾凭借其卓越的科技实力和全面的解决方案&#xff0c;成功地在数字化探索的航程中掌舵引领。 首先&#xff0c;Vatee万腾以其强大的数字化科技实力成为行业的引领者。vatee万腾不仅在人工智能、大数据分析、云计算等前沿领域取得了显著…

8-cgi fastcgi wsgi uwsgi uWSGI 分别是什么?如何自定制上下文管理器、Python是值传递还是引用传递

1 cgi fastcgi wsgi uwsgi uWSGI 分别是什么&#xff1f; 2 如何自定制上下文管理器 3 Python是值传递还是引用传递 1 cgi fastcgi wsgi uwsgi uWSGI 分别是什么&#xff1f; # CGI:通用网关接口&#xff08;Common Gateway Interface/CGI&#xff09;,CGI描述了服务器&#xf…

CAS方式实现单点登录SSO

1. CAS介绍 CAS&#xff08;Central Authentication Service&#xff09;中心认证服务 下面这张图来自官网&#xff0c;清晰简单的介绍了CAS的继续交互过程 2. CAS具体实现 首先需要分别搭建CAS-server和CAS-client服务&#xff0c; 这两个服务分别在2台机器上&#xff0c;…