【spark-spring boot】学习笔记

目录

  • 说明
  • RDD学习
    • RDD介绍
    • RDD案例
      • 基于集合创建RDD
      • RDD存入外部文件中
    • 转换算子 操作
      • map 操作
        • 说明
        • 案例
      • flatMap操作
        • 说明
        • 案例
      • filter 操作
        • 说明
        • 案例
      • groupBy 操作
        • 说明
        • 案例
      • distinct 操作
        • 说明
        • 案例
      • sortBy 操作
        • 说明
        • 案例
      • mapToPair 操作
        • 说明
        • 案例
      • mapValues操作
        • 说明
        • 案例
      • groupByKey操作
        • 说明
        • 案例
      • reduceByKey操作
        • 说明
        • 案例
      • sortByKey操作
        • 说明
        • 案例
    • 行动算子 操作
      • collect 操作
        • 说明
        • 案例
      • count 操作
        • 说明
        • 案例
      • first操作
        • 说明
        • 案例
      • take操作
        • 说明
        • 案例
      • countByKey操作
        • 说明
        • 案例
      • saveAsTextFile 操作
        • 说明
        • 案例
      • foreach操作
        • 说明
        • 案例

说明

本文依赖于上一篇文章:【spark学习】 spring boot 整合 spark 的相关内容,请先阅读上一篇文章,将spark需要的环境先配置好,并测试通过之后再进行此文章的阅读和操作。

RDD学习

RDD介绍

想象一下你有一个大大的数据表,里面包含了很多很多的信息。如果你想对这些数据进行操作,比如筛选出符合条件的数据、或者对数据做一些计算,RDD 就是 Spark 用来存储和操作这些数据的一种方式。它有两个基本操作:转换操作(就像是加工数据)和 行动操作(获取结果)

RDD案例

基于集合创建RDD

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
        parallelize.collect().forEach(System.out::println); // 打印
    }
}

输出结果:
在这里插入图片描述

RDD存入外部文件中

将RDD存入外部文件,用于观察文件结果和分区结果。txt文件数等于默认分区数。从结果中看出默认分区为4个。

注意:存放结果的文件夹路径必须没有被创建。

以下​案例中,将基于集合生成的RDD保存到saveRddDemo文件夹中​。

package www.zhangxiaosan.top.timer;


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");
        
        // parallelize(): 创建RDD,RDD中创建默认分区数
        // parallelize( 元素, 分区数): 创建RDD,RDD中指定分区数
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
        
        parallelize.collect().forEach(System.out::println); // 打印

        // 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。
        String fileSavePath="I:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo";
        parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 
    }
}

​运行结果:
在文件夹中存放的是运行程序生成的文件。如下图。
_SUCCESS文件​:成功标记
part-XXX 文件:保存的数据文件 ​,结果中有4个文件,说明默认分区为4个。
在这里插入图片描述

转换算子 操作

转换算子(Transformation)是指那些返回一个新的 RDD 的操作。转换算子不会立即执行计算,而是构建一个执行计划,只有当行动算子(Action)触发计算时,转换算子的操作才会实际执行。转换算子可以用来处理和转换数据,比如映射、过滤、聚合等。

map 操作

说明

map() 方法传入一个函数作为参数。map() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:map( 函数( RDD元素 ) )。 map() 方法会返回一个新的RDD。

案例

将RDD中的每个元素都拼接上字符串 “ say hi ”

package www.zhangxiaosan.top.timer;


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("张三", "李四", "王五");
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
		
		// lambda表达式传入匿名函数,进行元素拼接字符。
        // item 表示RDD中的元素
        parallelize = parallelize.map(item -> item + " say hi"); 
        
        parallelize.collect().forEach(System.out::println); // 打印
    }
}

输出结果:
在这里插入图片描述

flatMap操作

说明

flatMap() 方法传入一个函数作为参数 。flatMap() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:flatMap( 函数( RDD元素 ) )。 flatMap() 方法会返回一个新的RDD。
flatMap() 方法 会将 RDD的元素扁平化处理成一个集合。

例如:
假设你有一个箱子,箱子里面放着几个小盒子,每个小盒子里又有一些玩具。flatMap 就是一个工具,它能帮你把每个小盒子里的玩具拿出来,直接放进一个大盒子里,最终把所有玩具放在一个地方。
每个小盒子里的玩具可以不止一个,甚至可能没有玩具(比如有的盒子是空的)。flatMap 会把每个盒子里的玩具都拿出来,放到一个大盒子里,最终得到一个扁平的大盒子,里面是所有玩具。

案例

将集合:[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ]
扁平化处理成:[ 1, 2, 3, 4, 5, hello, spark, 张三, 李四]

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void flatMapDemo() {
    	// 声明集合:[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ]
        List<List<String>> arrs = Arrays.asList(
                Arrays.asList("1", "2", "3", "4", "5"),
                Arrays.asList("hello", "spark"),
                Arrays.asList("张三", "李四")
        );
        
        //输出声明的集合呢容
        System.out.println("原始集合打印:");
        arrs.forEach(item -> System.out.print(" " + item));

		// 分隔符
        System.out.println();
        System.out.println("-----------");

        System.out.println("flatMap操作后:");
        // 创建集合的RDD
        JavaRDD<List<String>> parallelize = javaSparkContext.parallelize(arrs);

		// flatMap操作
        List<String> collect = parallelize.flatMap(i -> i.iterator()).collect(); 

		// 打印 flatMap操作 后的集合
        collect.forEach(item -> System.out.print(" " + item));
        System.out.println();
    }
}

filter 操作

说明

filter() 方法传入一个函数作为参数,函数返回值只能为Boolean值。filter() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:filter( 函数( RDD元素 ) )。 filter() 方法会返回一个新的RDD。filter() 将RDD元素中满足条件的元素保留,即函数返回为true的元素;RDD中不满足条件的元素,即函数返回为false的元素过滤。

案例

过滤单数,保留双数


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void filterDemo() {
    	// 声明集合
        List<Integer> arrs = Arrays.asList(-1, 0, 1, 2, 3, 4, 5);

        System.out.println("原始集合打印:");
        arrs.forEach(item -> System.out.print(" " + item));
		// 输出过滤前的数据
        System.out.println();
        System.out.println("-----------");

        System.out.println("filter操作后:");
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

		// 过滤掉 单数 (取模不等于0的数字为单数,否则为偶数)。item % 2 == 0 的数字为双数,返回有true保留
        List<Integer> collect = parallelize.filter(item -> item % 2 == 0).collect();
		
		// 输出过滤后的数据
        collect.forEach(item -> System.out.print(" " + item));
        System.out.println();
    }
}

运行结果:
在这里插入图片描述

groupBy 操作

说明

将数据按某些条件进行分组。每个组由键值对组成。
groupBy() 常和以下聚合函数一起使用,来对分组数据进行统计分析。
常用的聚合操作包括:

  1. count(): 统计每个组的元素数量
  2. sum(): 计算每个组的元素总和
  3. avg(): 计算每个组的平均值
  4. max():计算每个组的最大值
  5. min(): 计算每个组的最小值
  6. agg(): 自定义聚合操作,可以结合多个聚合函数
案例

给写生成绩按照分数来分组。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void groupByDemo() {
    	// 定义学生成绩数据
        List<Tuple2<String, Integer>> students = Arrays.asList(
                new Tuple2<>("张三", 85),
                new Tuple2<>("李四", 90),
                new Tuple2<>("王五", 85),
                new Tuple2<>("赵六", 95),
                new Tuple2<>("孙七", 90)
        );
        
        // 创建RDD
        JavaRDD<Tuple2<String, Integer>> parallelize = javaSparkContext.parallelize(students);

        // 使用 groupBy 进行分组,按成绩分组
        JavaPairRDD<Object, Iterable<Tuple2<String, Integer>>> integerIterableJavaPairRDD = parallelize.groupBy(tuple -> tuple._2());// 使用 groupBy 按成绩分组。_2表示元组的第二个元素,即分数

        // 打印结果
        integerIterableJavaPairRDD.sortByKey().foreach(item -> System.out.println("成绩:" + item._1() + ",学生:" + item._2()));

    }
}

结果如下:
在这里插入图片描述

distinct 操作

说明

对RDD的元素进行分布式去重。返回新的RDD,新的RDD内的元素不重复出现。

案例

将集合中重复的元素去除。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    
    @PostConstruct
    public void distinctDemo() {
        // 声明集合
        List<Integer> arrs = Arrays.asList(1,2,3,4,5,6,1,4,6);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // distinct()方法,用于去除重复元素。
        JavaRDD<Integer> distinct = parallelize.distinct();

        // 打印结果
        distinct.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

sortBy 操作

说明

对RDD的元素进行排序,返回新的RDD。
sortBy() 可传入3个参数:
参数1:函数,每个RDD元素都会传入此函数中,此函数定义排序规则。
参数2:boolean值。定义排序顺序。true为升序,false降序。
参数3:Integer 整数,指定分区数量。

案例

将集合中的元素降序排序。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    
    @PostConstruct
    public void sortByDemo() {
	    // 创建无序集合
        List<Integer> arrs = Arrays.asList(7,2,5,4,1,9,6,3,8);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // 使用 sortBy 进行排序,按元素值排序,并返回一个RDD。
        JavaRDD<Integer> sortBy = parallelize.sortBy(item -> item, false, 1);

        // 打印结果
        sortBy.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

mapToPair 操作

说明

mapToPair()是将一个普通的 RDD 转换为 JavaPairRDD 的一个方法。 JavaPairRDD 中的每个元素都是一个键值对。
mapToPair对RDD的元素进行映射成一个由键值对组成的 RDD(即映射成JavaPairRDD),即将每个元素转换成一个 Tuple2 对象,其中第一个元素是键(key),第二个元素是值(value)。

案例

将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔,形成键值对,姓名为键,值为成绩。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void mapToPairDemo() {
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "王五:85", "赵六:95", "孙七:90");
        
        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);

        // 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对
        JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair((PairFunction<String, String, Integer>) s -> {
            // 根据冒号分隔学生姓名和成绩,返回一个 (姓名, 成绩) 的元组
            String[] parts = s.split(":");
            // 返回一个 (姓名, 成绩) 的元组
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        });

        // 打印结果
        studentsPairRDD.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

mapValues操作

说明

mapValues() 是一个用于处理 JavaPairRDD 的方法。它可以对 RDD 中的每个键值对的 值(value) 进行转换,同时保留原来的 键(key) 不变。

案例

将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔,形成键值对,姓名为键,值为成绩。并将成绩在原分数上+5分。


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void mapToPairDemo() {
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "王五:85", "赵六:95", "孙七:90");

        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);

        // 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对
        JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair((PairFunction<String, String, Integer>) s -> {
            // 根据冒号分隔学生姓名和成绩,返回一个 (姓名, 成绩) 的元组
            String[] parts = s.split(":");
            // 返回一个 (姓名, 成绩) 的元组
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        });

        // 打印结果
        System.out.println("原始分数:");
        studentsPairRDD.collect().forEach(System.out::println);

        System.out.println("\n+5分后:");
        // item 表示每个键值对中的值,即分数
        JavaPairRDD<String, Integer> newValeRdd = studentsPairRDD.mapValues(item -> item + 5);

        // 打印结果
        newValeRdd.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

groupByKey操作

说明

groupByKey() 是一个用于处理 JavaPairRDD 的方法。它根据键对数据进行分组,将所有具有相同键的元素聚集到一起,生成一个新的 RDD,其中每个键对应一个包含所有相同键值的集合。
每个键对应的值收集到一个 Iterable 容器中,然后返回一个新的 RDD,其中每个键对应一个包含该键的所有值的 Iterable。

案例

将集合 [“张三:85”, “李四:90”, “陈八:95”, “王五:85”, “黄六:92”] 中的学生和成绩按照冒号分隔,形成键值对,成绩为键,值为姓名。并将键值对中的键进行分类,将相同分数的学生归成一组。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void groupByKeyDemo(){
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "陈八:95", "王五:85", "黄六:92");

        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);

        // 使用 mapToPair 将每个元素转换为 (成绩, 姓名) 的键值对
        JavaPairRDD<Integer, String> studentsPairRDD = studentsRDD.mapToPair(s -> {
            String[] parts = s.split(":");
            return new Tuple2<Integer, String>(Integer.parseInt(parts[1]), parts[0]);
        });
        
        // 根据键值对中的键进行分类
        JavaPairRDD<Integer, Iterable<String>> groupedRDD = studentsPairRDD.groupByKey();
		
		// 打印结果
        groupedRDD.collect().forEach(pair -> {
            System.out.println("成绩: " + pair._1() + ", 姓名: " + pair._2());
        });
  	}
}

执行结果:
在这里插入图片描述

reduceByKey操作

说明

reduceByKey()用于对 JavaPairRDD 数据进行聚合的一个方法。它根据键对值进行合并,并在分区内进行局部聚合,从而减少了跨节点的数据传输,通常比 groupByKey() 更高效。

案例

计算学生的总分。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void groupByKeyDemo(){
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "张三:95", "王五:85", "李四:92");

        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);
        
        // 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对
        JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair(s -> {
            String[] parts = s.split(":");
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        });

        // 使用 groupByKey 按学生姓名进行成绩求和
        JavaPairRDD<String, Integer> groupedRDD = studentsPairRDD.reduceByKey((a, b) -> a + b);

        // 打印结果
        groupedRDD.collect().forEach(pair -> {
            System.out.println("姓名: " + pair._1() + ", 成绩: " + pair._2());
        });
    }
}

运行结果:
在这里插入图片描述

sortByKey操作

说明

对 JavaPairRDD 数据按键进行排序的方法。默认为升序。
传入参数为Boolean值:true 升序 ,false降序

案例

将学生成绩按照成绩从高到低降序排序

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    @PostConstruct
    public void sortByKeyDemo(){
    	// 定义原始数据
        List<Tuple2<String, Integer>> students = Arrays.asList(
                new Tuple2<>("张三", 85),
                new Tuple2<>("李四", 90),
                new Tuple2<>("王五", 65),
                new Tuple2<>("赵六", 95),
                new Tuple2<>("孙七", 90)
        );
		// 生成RDD
        JavaPairRDD<String, Integer> studentsPairRDD = javaSparkContext.parallelizePairs(students);

        // 将学生原集合(姓名,成绩)格式的数据转换为(成绩,姓名)格式的键值对
        JavaPairRDD<Integer, String> integerStringJavaPairRDD = studentsPairRDD.mapToPair(item -> new Tuple2<>(item._2(), item._1()));
		
		// 将(成绩,姓名)格式的键值对按照键降序排序,即按照成绩降序排序
        JavaPairRDD<Integer, String> stringIntegerJavaPairRDD = integerStringJavaPairRDD.sortByKey(false);
		
		// 打印结果
        stringIntegerJavaPairRDD .collect() .forEach(pair -> {
            System.out.println("成绩: " + pair._1() + ", 姓名: " + pair._2());
        });
    }
}

运行结果:
在这里插入图片描述

行动算子 操作

行动算子(Action)是指会触发 Spark 作业的执行,并且会产生一个结果或者副作用的操作。与 转换算子(Transformation)不同,转换算子只会定义数据转换的计算逻辑,而不会立即执行。只有在遇到行动算子时,Spark 才会真正开始计算,并将结果返回给用户或写入外部存储。

当你调用一个行动算子时,Spark 会从头开始执行所有必要的转换操作,并将结果返回给你或者存储到外部系统(如 HDFS、数据库等)。

行动算子通常会返回一个具体的结果,例如一个列表、一个数值,或者在某些情况下,可能会执行一些副作用操作(例如将数据写入磁盘)。

collect 操作

说明

将分布式 RDD 中的所有数据项拉取到本地驱动程序(Driver)中,通常作为一个数组、列表或其他集合类型。因为 collect() 会将整个 RDD 数据集拉到本地,所以如果数据量非常大,可能会导致内存溢出(OutOfMemoryError)。

注意事项:
数据量大时的风险:如果 RDD 中包含的数据量非常大,调用 collect() 会导致所有数据被加载到本地驱动程序的内存中,这可能会导致内存溢出错误(OutOfMemoryError)。因此,建议在数据集非常大的情况下谨慎使用 collect()。

建议:对于大规模数据集,通常会使用其他行动算子(如 take())获取一个数据的子集,避免一次性加载所有数据。

并行计算的代价:尽管 collect() 会将所有数据从分布式环境中拉回到单个节点,但它不会对数据进行额外的计算,只会执行之前定义的转换操作。因此,它本质上是将整个数据集的计算结果从集群中汇总回来。

案例

收集RDD数据并打印。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    @PostConstruct
    public void collectDemo(){
        // 定义一个整数集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // collect()方法,用于将RDD中的数据收集到Driver端,并返回一个List。
        List<Integer> collect = parallelize.collect();

        // 打印结果
        collect.forEach(item -> System.out.println(item));
    }
}

执行结果:
在这里插入图片描述

count 操作

说明

统计RDD中的元素数量。

案例

统计RDD中的元素数量,并输出


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    @PostConstruct
    public void countDemo(){
        // 定义一个整数集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // count()方法,统计数量。
        Long total = parallelize.count();

        // 打印结果
        System.out.println("元素数量 = " + total);
    }
}

运行结果:
在这里插入图片描述

first操作

说明

返回RDD中的第一个元素

案例

获取RDD中的第一个元素并打印

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void firstDemo(){
        // 创建一个集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // 获取第一个元素
        Integer first = parallelize.first();

        // 打印结果
        System.out.println("第一个元素 = " + first);
    }
}

运行结果:在这里插入图片描述

take操作

说明

在RDD中从头获取指定数量的元素,返回获取的元素集合。

案例

获取前3个元素

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void takeDemo(){
        // 创建一个集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // 获取前三个元素
        List<Integer> takes = parallelize.take(3);

        // 打印结果
        System.out.println("前三个元素 = " + takes);
    }
}

运行结果:
在这里插入图片描述

countByKey操作

说明

统计RDD中每种键的数量。

案例

根据键值对,统计键值对中不同键的数量。并打印。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void countByKeyDemo(){
        // 创建一个集合
        List<Tuple2<String,Integer>> arrs = Arrays.asList(
                new Tuple2<String,Integer>("张三",15),
                new Tuple2<String,Integer>("张三",20),
                new Tuple2<String,Integer>("李四",20),
                new Tuple2<String,Integer>("李四",30),
                new Tuple2<String,Integer>("李四",50),
                new Tuple2<String,Integer>("王五",10)
        );

        // 创建JavaPairRDD 对象,JavaPairRDD对象的元素为键值对。
        JavaPairRDD<String, Integer> parallelize = javaSparkContext.parallelizePairs(arrs);

        // 使用 countByKey() 方法,统计相同键的数量。
        Map<String, Long> countByKey = parallelize.countByKey();

        // 打印结果
        countByKey.forEach((key, value) -> System.out.println("键: " + key + ", 数量: " + value));
    }
}

运行结果:
在这里插入图片描述

saveAsTextFile 操作

说明

将 RDD 的数据以txt文件保存到外部存储系统。传入指定路径,文件会生成到该路径下。

注意
输出路径不能存在:如果输出路径已经存在,saveAsTextFile 会抛出异常
文件分区:Spark 会将每个分区的数据写入一个独立的文件。因此,如果 RDD 有多个分区,它会生成多个文件,每个文件对应一个分区的数据 。 文件名会根据分区编号进行自动命名,通常形式是:part-00***
文本格式:保存时,RDD 中的每个元素会被转换为文本行。默认情况下,Spark 会把 RDD 中的每个元素的 toString() 输出到文件中。
路径支持分布式存储:saveAsTextFile 支持将数据保存到本地文件系统、HDFS、S3 等分布式存储中。路径的格式取决于存储系统的类型。例如,如果要保存到 HDFS,路径应该以 hdfs:// 开头。

案例

将RDD存入外部文件。

以下​案例中,将基于集合生成的RDD保存到saveRddDemo文件夹中​。具体参考本文中的 RDD存入外部文件中 内容

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");
        
        // parallelize(): 创建RDD,RDD中创建默认分区数
        // parallelize( 元素, 分区数): 创建RDD,RDD中指定分区数
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
        
        parallelize.collect().forEach(System.out::println); // 打印

        // 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。
        String fileSavePath="I:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo";
        parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 
    }
}

foreach操作

说明

循环遍历RDD中的元素

案例

以上案例中大部分用到此方法,是否用方式看以上案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/923779.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++ 红黑树:红黑树的插入及应用(map与set的封装)

目录 红黑树 红黑树的概念 红黑树的性质 红黑树节点的定义 一、如果默认给黑色 二、如果默认给红色 红黑树的插入操作 1.按搜索树的规则进行插入 2.检测新节点插入后&#xff0c;红黑树的性质是否造到破坏 情况一&#xff1a;cur为红&#xff0c;parent为红&#xff…

elementUI非常规数据格式渲染复杂表格(副表头、合并单元格)

效果 数据源 前端代码 (展示以及表格处理/数据处理) 标签 <el-table :data"dataList" style"width: 100%" :span-method"objectSpanMethod"><template v-for"(item, index) in headers"><el-table-column prop"…

HTML详解(1)

1.HTML定义 HTML&#xff1a;超文本标记语言。超文本&#xff1a;通过链接可以把多个网页链接到一起标记&#xff1a;标签&#xff0c;带括号的文本后缀&#xff1a;.html 标签语法&#xff1a;<strong>需加粗文字</strong> 成对出现&#xff0c;中间包裹内容&l…

两数之和--leetcode100题

一&#xff0c;前置知识 1&#xff0c;vector向量 二&#xff0c;题目 1. 两数之和https://leetcode.cn/problems/two-sum/ 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下…

微信小程序条件渲染与列表渲染的全面教程

微信小程序条件渲染与列表渲染的全面教程 引言 在微信小程序的开发中,条件渲染和列表渲染是构建动态用户界面的重要技术。通过条件渲染,我们可以根据不同的状态展示不同的内容,而列表渲染则使得我们能够高效地展示一组数据。本文将详细讲解这两种渲染方式的用法,结合实例…

李宏毅机器学习课程知识点摘要(14-18集)

线性回归&#xff0c;逻辑回归&#xff08;线性回归sigmoid&#xff09;&#xff0c;神经网络 linear regression &#xff0c; logistic regression &#xff0c; neutral network 里面的偏导的相量有几百万维&#xff0c;这就是neutral network的不同&#xff0c;他是…

Bean的生命周期详解保姆级教程,结合spring boot和spring.xml两种方式讲解,5/7/10大小阶段详细分析

文章目录 Spring Bean的生命周期一、为什么知道 Bean 的生命周期&#xff1f;二、生命周期大致了解三、详细分析生命周期3.1 ① 初步划分为 5 步&#xff1a;3.1.1 spring 框架中怎么理解3.1.2 spring boot 项目中怎么理解 3.2 ② 细分 5 步为 7 步&#xff1a;3.2.1 spring 框…

gRPC 双向流(Bidirectional Streaming RPC)的使用方法

gRPC 是一个支持多种语言的高性能 RPC 框架&#xff0c;拥有丰富的 API 来简化服务端和客户端的开发过程。gRPC 支持四种 RPC 类型&#xff1a;Unary RPC、Server Streaming RPC、Client Streaming RPC 和 Bidirectional Streaming RPC。下面是双向流 API 的使用方法。 双向流…

ffmpeg视频滤镜:替换部分帧-freezeframes

滤镜描述 freezeframes 官网地址 > FFmpeg Filters Documentation 这个滤镜接收两个输入&#xff0c;然后会将第一个视频中的部分帧替换为第二个视频的某一帧。 滤镜使用 参数 freezeframes AVOptions:first <int64> ..FV....... set first fra…

解决SpringBoot连接Websocket报:请求路径 404 No static resource websocket.

问题发现 最近在工作中用到了WebSocket进行前后端的消息通信&#xff0c;后端代码编写完后&#xff0c;测试一下是否连接成功&#xff0c;发现报No static resource websocket.&#xff0c;看这个错貌似将接口变成了静态资源来访问了&#xff0c;第一时间觉得是端点没有注册成…

Leetcode322.零钱兑换(HOT100)

链接 代码&#xff1a; class Solution { public:int coinChange(vector<int>& coins, int amount) {vector<int> dp(amount1,amount1);//要兑换amount元硬币&#xff0c;我们就算是全选择1元的硬币&#xff0c;也不过是amount个&#xff0c;所以初始化amoun…

网络安全期末复习

第1章 网络安全概括 &#xff08;1&#xff09;用户模式切换到系统配置模式&#xff08;enable&#xff09;。 &#xff08;2&#xff09;显示当前位置的设置信息&#xff0c;很方便了解系统设置&#xff08;show running-config&#xff09;。 &#xff08;3&#xff09;显…

鸿蒙进阶篇-自定义组件

大家好&#xff0c;我是鸿蒙开天组&#xff0c;今天咱们来学习自定义组件。 一、自定义组件定义 在ArkUI中&#xff0c;UI显示的内容均为组件&#xff0c;由框架直接提供的称为系统组件&#xff0c;由开发者定义的称为自定义组件。在进行 UI 界面开发时&#xff0c;通常不是简…

深入浅出 WebSocket:构建实时数据大屏的高级实践

简介 请参考下方&#xff0c;学习入门操作 基于 Flask 和 Socket.IO 的 WebSocket 实时数据更新实现 在当今数字化时代&#xff0c;实时性是衡量互联网应用的重要指标之一。无论是股票交易、在线游戏&#xff0c;还是实时监控大屏&#xff0c;WebSocket 已成为实现高效、双向…

一键AI换脸软件,支持表情控制,唇形同步Facefusion-3.0.0发布!支持N卡和CPU,一键启动包

嗨,小伙伴们!还记得小编之前介绍的FaceFusion 2.6.1吗?今天给大家带来超级exciting的消息 —— FaceFusion 3.0.0闪亮登场啦! &#x1f31f; 3.0.0版本更新 &#x1f3d7;️ 全面重构:修复了不少小虫子,运行更稳定,再也不怕突然罢工啦! &#x1f600; Live Portrait功能:新增…

spring boot框架漏洞复现

spring - java开源框架有五种 Spring MVC、SpringBoot、SpringFramework、SpringSecurity、SpringCloud spring boot版本 版本1: 直接就在根下 / 版本2:根下的必须目录 /actuator/ 端口:9093 spring boot搭建 1:直接下载源码打包 2:运行编译好的jar包:actuator-testb…

hhdb数据库介绍(10-8)

首页 管理平台通过数据可视方式在首页功能中实时展示计算节点集群的数据量、访问流量、集群组件状态、告警事件、安全防控等用户关心的信息。 集群安全 邮件通知&#xff1a;根据通知设置中监控开关是否打开判断&#xff0c;分为&#xff1a;全部开启、未开启、部分开启&…

Vue前端开发-slot传参

slot 又称插槽&#xff0c;它是在子组件中为父组件提供的一个占位符&#xff0c;使用来表示&#xff0c;通过这个占位符&#xff0c;父组件可以向中填充任意的内容代码&#xff0c;这些代码将自动替换占位符的位置&#xff0c;从而轻松实现在父组件中控制子组件内容的需求。 作…

18:(标准库)DMA二:DMA+串口收发数据

DMA串口收发数据 1、DMA串口发送数据2、DMA中断串口接收定长数据包3、串口空闲中断DMA接收不定长数据包 1、DMA串口发送数据 当串口的波特率大于115200时&#xff0c;可以通过DMA1进行数据搬运&#xff0c;以防止数据的丢失。如上图所示&#xff1a;UART1的Tx发送请求使用DMA1的…

2024 java大厂面试复习总结(一)(持续更新)

10年java程序员&#xff0c;2024年正好35岁&#xff0c;2024年11月公司裁员&#xff0c;记录自己找工作时候复习的一些要点。 java基础 hashCode()与equals()的相关规定 如果两个对象相等&#xff0c;则hashcode一定也是相同的两个对象相等&#xff0c;对两个对象分别调用eq…