Spark的转换算子和操作算子

Transformation转换算子

1.1 Value类型

1)创建包名:com.shangjack.value

1.1.1 map()映射

参数f是一个函数可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。

1)具体实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

public class Test01_Map {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<String> lineRDD = sc.textFile("input/1.txt");

        // 需求:每行结尾拼接||

        // 两种写法  lambda表达式写法(匿名函数) 

        JavaRDD<String> mapRDD = lineRDD.map(s -> s + "||");

        // 匿名函数写法 

        JavaRDD<String> mapRDD1 = lineRDD.map(new Function<String, String>() {

            @Override

            public String call(String v1) throws Exception {

                return v1 + "||";

            }

        });

        for (String s : mapRDD.collect()) {

            System.out.println(s);

        }

        // 输出数据的函数写法

        mapRDD1.collect().forEach(a -> System.out.println(a));

        mapRDD1.collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.1.2 flatMap()扁平化

1)功能说明

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

2)需求说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。

4)具体实现:

package com.shangjack.value;

import org.apache.commons.collections.ListUtils;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Iterator;

import java.util.List;

public class Test02_FlatMap {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        ArrayList<List<String>> arrayLists = new ArrayList<>();

        arrayLists.add(Arrays.asList("1","2","3"));

        arrayLists.add(Arrays.asList("4","5","6"));

        JavaRDD<List<String>> listJavaRDD = sc.parallelize(arrayLists,2);

        // 对于集合嵌套的RDD 可以将元素打散

        // 泛型为打散之后的元素类型

        JavaRDD<String> stringJavaRDD = listJavaRDD.flatMap(new FlatMapFunction<List<String>, String>() {

            @Override

            public Iterator<String> call(List<String> strings) throws Exception {

                return strings.iterator();

            }

        });

        stringJavaRDD. collect().forEach(System.out::println);

        // 通常情况下需要自己将元素转换为集合

        JavaRDD<String> lineRDD = sc.textFile("input/2.txt");

        JavaRDD<String> stringJavaRDD1 = lineRDD.flatMap(new FlatMapFunction<String, String>() {

            @Override

            public Iterator<String> call(String s) throws Exception {

                String[] s1 = s.split(" ");

                return Arrays.asList(s1).iterator();

            }

        });

        stringJavaRDD1. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.1.3 groupBy()分组

1)功能说明:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

2)需求说明:创建一个RDD,按照元素模以2的值进行分组。

3)具体实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class Test03_GroupBy {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        // 泛型为分组标记的类型

        JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = integerJavaRDD.groupBy(new Function<Integer, Integer>() {

            @Override

            public Integer call(Integer v1) throws Exception {

                return v1 % 2;

            }

        });

        groupByRDD.collect().forEach(System.out::println);

        // 类型可以任意修改

        JavaPairRDD<Boolean, Iterable<Integer>> groupByRDD1 = integerJavaRDD.groupBy(new Function<Integer, Boolean>() {

            @Override

            public Boolean call(Integer v1) throws Exception {

                return v1 % 2 == 0;

            }

        });

        groupByRDD1. collect().forEach(System.out::println);

Thread.sleep(600000);

        // 4. 关闭sc

        sc.stop();

    }

}

  • groupBy会存在shuffle过程
  • shuffle:将不同的分区数据进行打乱重组的过程
  • shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。

1.1.4 filter()过滤

1)功能说明

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

2)需求说明:创建一个RDD,过滤出对2取余等于0的数据

3)代码实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class Test04_Filter {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        JavaRDD<Integer> filterRDD = integerJavaRDD.filter(new Function<Integer, Boolean>() {

            @Override

            public Boolean call(Integer v1) throws Exception {

                return v1 % 2 == 0;

            }

        });

        filterRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.1.5 distinct()去重

1)功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。

2)代码实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class Test05_Distinct {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // 底层使用分布式分组去重  所有速度比较慢,但是不会OOM

        JavaRDD<Integer> distinct = integerJavaRDD.distinct();

        distinct. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

注意:distinct会存在shuffle过程

1.1.6 sortBy()排序

1)功能说明

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。

2)需求说明:创建一个RDD,按照数字大小分别实现正序和倒序排序

3)代码实现:

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class Test6_SortBy {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(5, 8, 1, 11, 20), 2);

        // (1)泛型为以谁作为标准排序  (2) true为正序  (3) 排序之后的分区个数

        JavaRDD<Integer> sortByRDD = integerJavaRDD.sortBy(new Function<Integer, Integer>() {

            @Override

            public Integer call(Integer v1) throws Exception {

                return v1;

            }

        }, true, 2);

        sortByRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2 Key-Value类型

1)创建包名:com.shangjack.keyvalue

要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test01_pairRDD{

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        JavaPairRDD<Integer, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {

            @Override

            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {

                return new Tuple2<>(integer, integer);

            }

        });

        pairRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.1 mapValues()只对V进行操作

1)功能说明:针对于(K,V)形式的类型只对V进行操作

2)需求说明:创建一个pairRDD,并将value添加字符串"|||"

4)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

import java.util.Arrays;

public class Test02_MapValues {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<String, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("k", "v"), new Tuple2<>("k1", "v1"), new Tuple2<>("k2", "v2")));

        // 只修改value 不修改key

        JavaPairRDD<String, String> mapValuesRDD = javaPairRDD.mapValues(new Function<String, String>() {

            @Override

            public String call(String v1) throws Exception {

                return v1 + "|||";

            }

        });

        mapValuesRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.2 groupByKey()按照K重新分组

1)功能说明

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。

该操作可以指定分区器或者分区数(默认使用HashPartitioner)

2)需求说明:统计单词出现次数

4)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test03_GroupByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);

        // 统计单词出现次数

        JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {

            @Override

            public Tuple2<String, Integer> call(String s) throws Exception {

                return new Tuple2<>(s, 1);

            }

        });

        // 聚合相同的key

        JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();

        // 合并值

        JavaPairRDD<String, Integer> result = groupByKeyRDD.mapValues(new Function<Iterable<Integer>, Integer>() {

            @Override

            public Integer call(Iterable<Integer> v1) throws Exception {

                Integer sum = 0;

                for (Integer integer : v1) {

                    sum += integer;

                }

                return sum;

            }

        });

        result. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}}

1.2.3 reduceByKey()按照K聚合V

1)功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

2)需求说明:统计单词出现次数

3)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test04_ReduceByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);

        // 统计单词出现次数

        JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {

            @Override

            public Tuple2<String, Integer> call(String s) throws Exception {

                return new Tuple2<>(s, 1);

            }

        });

        // 聚合相同的key

        JavaPairRDD<String, Integer> result = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override

            public Integer call(Integer v1, Integer v2) throws Exception {

                return v1 + v2;

            }

        });

        result. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.4 reduceByKey和groupByKey区别

1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。

2)groupByKey:按照key进行分组,直接进行shuffle。

3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test06_ReduceByKeyAvg {

    public static void main(String[] args) throws InterruptedException {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hi", 96), new Tuple2<>("hi", 97), new Tuple2<>("hello", 95), new Tuple2<>("hello", 195)));

        // ("hi",(96,1))

        JavaPairRDD<String, Tuple2<Integer, Integer>> tuple2JavaPairRDD = javaPairRDD.mapValues(new Function<Integer, Tuple2<Integer, Integer>>() {

            @Override

            public Tuple2<Integer, Integer> call(Integer v1) throws Exception {

                return new Tuple2<>(v1, 1);

            }

        });

        // 聚合RDD

        JavaPairRDD<String, Tuple2<Integer, Integer>> reduceRDD = tuple2JavaPairRDD.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {

            @Override

            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {

                return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);

            }

        });

        // 相除

        JavaPairRDD<String, Double> result = reduceRDD.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {

            @Override

            public Double call(Tuple2<Integer, Integer> v1) throws Exception {

                return (new Double(v1._1) / v1._2);

            }

        });

        result. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.5 sortByKey()按照K进行排序

1)功能说明

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

2)需求说明:创建一个pairRDD,按照key的正序和倒序进行排序

3)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.Arrays;

public class Test05_SortByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<Integer, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(4, "a"), new Tuple2<>(3, "c"), new Tuple2<>(2, "d")));

        // 填写布尔类型选择正序倒序

        JavaPairRDD<Integer, String> pairRDD = javaPairRDD.sortByKey(false);

        pairRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

2 Action行动算子

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

1)创建包名:com.shangjack.action

2.1 collect()以数组的形式返回数据集

1)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。

注意:所有的数据都会被拉取到Driver端,慎用

2)需求说明:创建一个RDD,并将RDD内容收集到Driver端打印

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

import java.util.List;

public class Test01_Collect {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        List<Integer> collect = integerJavaRDD.collect();

        for (Integer integer : collect) {

            System.out.println(integer);

        }

        // 4. 关闭sc

        sc.stop();

    }

}

2.2 count()返回RDD中元素个数

1)功能说明:返回RDD中元素的个数

3)需求说明:创建一个RDD,统计该RDD的条数

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class Test02_Count {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        long count = integerJavaRDD.count();

        System.out.println(count);

        // 4. 关闭sc

        sc.stop();

    }

}

2.3 first()返回RDD中的第一个元素

1)功能说明:返回RDD中的第一个元素

2)需求说明:创建一个RDD,返回该RDD中的第一个元素

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class Test03_First {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        Integer first = integerJavaRDD.first();

        System.out.println(first);

        // 4. 关闭sc

        sc.stop();

    }

}

2.4 take()返回由RDD前n个元素组成的数组

1)功能说明:返回一个由RDD的前n个元素组成的数组

2)需求说明:创建一个RDD,取出前两个元素

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

import java.util.List;

public class Test04_Take {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        List<Integer> list = integerJavaRDD.take(3);

        list.forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

2.5 countByKey()统计每种key的个数

1)功能说明:统计每种key的个数

2)需求说明:创建一个PairRDD,统计每种key的个数

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.Arrays;

import java.util.Map;

public class Test05_CountByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 8), new Tuple2<>("b", 8), new Tuple2<>("a", 8), new Tuple2<>("d", 8)));

        Map<String, Long> map = pairRDD.countByKey();

        System.out.println(map);

        // 4. 关闭sc

        sc.stop();

    }

}

2.6 save相关算子

1)saveAsTextFile(path)保存成Text文件

功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

2)saveAsObjectFile(path) 序列化成对象保存到文件

功能说明:用于将RDD中的元素序列化成对象,存储到文件中。

3)代码实现

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.Arrays;

public class Test06_Save {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        integerJavaRDD.saveAsTextFile("output");

        integerJavaRDD.saveAsObjectFile("output1");

        // 4. 关闭sc

        sc.stop();

    }

}

2.7 foreach()遍历RDD中每一个元素

2)需求说明:创建一个RDD,对每个元素进行打印

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;

public class Test07_Foreach {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),4);

        integerJavaRDD.foreach(new VoidFunction<Integer>() {

            @Override

            public void call(Integer integer) throws Exception {

                System.out.println(integer);

            }

        });

        // 4. 关闭sc

        sc.stop();

    }

}

2.8 foreachPartition ()遍历RDD中每一个分区

package com.shangjack.spark.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;

import java.util.Iterator;

public class Test08_ForeachPartition {

    public static void main(String[] args) {

        // 1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");

        // 2. 创建sc环境

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // 多线程一起计算   分区间无序  单个分区有序

        parallelize.foreachPartition(new VoidFunction<Iterator<Integer>>() {

            @Override

            public void call(Iterator<Integer> integerIterator) throws Exception {

                // 一次处理一个分区的数据

                while (integerIterator.hasNext()) {

                    Integer next = integerIterator.next();

                    System.out.println(next);

                }

            }

        });

        // 4. 关闭sc

        sc.stop();

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/133213.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python Flask框架,调用MobileNetV2图像分类模型,实现前端上传图像分类

python Flask框架&#xff0c;调用MobileNetV2图像分类模型&#xff0c;实现前端上传图像分类 今天博主介绍一个图像分类的小项目 基于flask 和mobileNetV2模型的前端图像分类项目 环境配置如下&#xff1a; python版本3.7.6 安装库的版本如下&#xff1a; tensorflow 2.11.…

LabVIEW中NIGPIB设备与驱动程序不相关的MAX报错

LabVIEW中NIGPIB设备与驱动程序不相关的MAX报错 当插入GPIB-USB设备时&#xff0c;看到了NI MAX中列出该设备&#xff0c;但却显示了黄色警告指示&#xff0c;并且指出Windows没有与您的设备相关的驱动程序。 解决方案 需要安装能兼容的NI-488.2驱动程序。 通过交叉参考以下有…

STM32--时钟树

一、什么是时钟&#xff1f; 时钟是单片机的脉搏&#xff0c;是系统工作的同步节拍。单片机上至CPU&#xff0c;下至总线外设&#xff0c;它们工作时序的配合&#xff0c;都需要一个同步的时钟信号来统一指挥。时钟信号是周期性的脉冲信号。 二、什么是时钟树&#xff1f; S…

“Git实践指南:深入探索开发测试上线、分支管理与标签“

文章目录 引言一、Git的分支的使用1.分支2.标签3.分支与标签的关系4. 分支在实际中的作用5. 四个环境以及各自的功能特点6. 分支策略分支应用场景 二、Git的标签3.1 标签的基本使用3.3 标签的共享与推送 总结 引言 在现代软件开发中&#xff0c;版本控制是一个关键的环节&…

2023年【危险化学品经营单位主要负责人】免费试题及危险化学品经营单位主要负责人证考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年危险化学品经营单位主要负责人免费试题为正在备考危险化学品经营单位主要负责人操作证的学员准备的理论考试专题&#xff0c;每个月更新的危险化学品经营单位主要负责人证考试祝您顺利通过危险化学品经营单位主…

【扩散模型】万字长文全面理解与应用Stable Diffusion

万字长文全面理解与应用Stable Diffusion 1. Stable Diffusion简介1.1 基本概念1.2 主体结构1.3 训练细节1.4 模型评测1.5 模型应用1.6 模型版本1.7 其他类型的条件生成模型1.8 使用DreamBooth进行微调 2. 实战Stable Diffusion2.1 环境准备2.2 从文本生成图像2.3 Stable Diffu…

LIBGDX实时绘制字符、实时绘制中文

LIBGDX实时绘制字符、实时绘制中文 转自&#xff1a;https://lingkang.top/archives/libgdx-shi-shi-hui-zhi-zi-fu 注意&#xff0c;相比于贴图字体&#xff0c;实时绘制会有一定的失真、模糊 Maven项目依赖&#xff1a; <properties><maven.compiler.source>…

抢量双11!抖音商城「官方立减」 缘何成为“爆单神器”?

10月20日抖音商城双11好物节正式开跑&#xff0c;仅仅三天&#xff0c;抖音商城整体GMV对比去年同期提升了200%&#xff0c;而在开跑一周后&#xff0c;一些品牌的销售额已经超过了今年整个618&#xff0c;可谓增势迅猛。其中&#xff0c;平台官方特别推出的「官方立减」玩法&a…

基于51单片机的篮球比赛计分器积分器

wx供重浩&#xff1a;创享日记 对话框发送&#xff1a;单片机篮球 获取完整源程序仿真源文件原理图文件论文报告等 基于51单片机的篮球计分器 由STC89C51单片机数码管显示模块按键模块电源模块构成 具体功能&#xff1a; &#xff08;1&#xff09;能记录单节比赛的比赛时间&am…

ETW HOOK原理探析

ETW HOOK研究 文章目录 ETW HOOK研究前言原理探究内核开启ETW日志HOOK ETW修改ETW日志上下文代理GetCpuClock函数寻找SSDT和SSDT Shadow 总结参考 前言 关于ETW是什么我就不多说了&#xff0c;可以通过微软的相关文档了解到。据网上得知这项技术最早被披露于2345的驱动中&…

【字符串】【双指针翻转字符串+快慢指针】Leetcode 151 反转字符串中单词【好】

【字符串】【双指针翻转字符串快慢指针】Leetcode 151 反转字符串中单词 解法1 双指针翻转字符串快慢指针更新数组大小 ---------------&#x1f388;&#x1f388;题目链接&#x1f388;&#x1f388;------------------- ---------------&#x1f388;&#x1f388;解答链接…

UI自动化测试 | Jenkins配置优化

前一段时间帮助团队搭建了UI自动化环境&#xff0c;这里将Jenkins环境的一些配置分享给大家。 背景&#xff1a; 团队下半年的目标之一是实现自动化测试&#xff0c;这里要吐槽一下&#xff0c;之前开发的测试平台了&#xff0c;最初的目的是用来做接口自动化测试和性能测试&…

MySQL 数据库查询与数据操作:使用 ORDER BY 排序和 DELETE 删除记录

使用 ORDER BY 进行排序 使用 ORDER BY 语句按升序或降序对结果进行排序。 ORDER BY 关键字默认按升序排序。要按降序排序结果&#xff0c;使用 DESC 关键字。 示例按名称按字母顺序排序结果&#xff1a; import mysql.connectormydb mysql.connector.connect(host"l…

Linux 内核定时器

一个人总要走陌生的路&#xff0c;看陌生的风景&#xff0c;听陌生的歌&#xff0c;然后在某个不经意的瞬间&#xff0c;你会发现&#xff0c;原本费尽心机想要忘记的事情真的就这么忘记了。 ----小新 一、引言 Linux内核定时器是一种用于在特定时间间隔后触发特定事件的重要组…

数据结构之AVL树

map/multimap/set/multiset这几个容器有个共同点是: 其底层都是按照二叉搜索树来实现的,但是普通的二叉搜索树有其自身的缺陷, 假如往树中插入的元素有序或者接近有序, 二叉搜索树就会退化成单支树, 时间复杂度会退化成O(N),因此map、set等关联式容器的底层结构是对二叉树进行了…

JavaScript基础入门04

目录 1.WebAPI 背景知识 1.1什么是 WebAPI 1.2什么是 API 2.DOM 基本概念 2.1什么是 DOM 2.2DOM 树 3.获取元素 3.1querySelector 3.2querySelectorAll 4.事件初识 4.1基本概念 4.2事件三要素 4.3简单示例 5.操作元素 5.1获取/修改元素内容 5.2获取/修改元素属性…

记事本简单运行java代码,理解程序运行

1.记事本创建一个文件, 把后缀.txt改为.java 如果没有显示尾缀, 勾选出文件扩展名 2.在文件里面编辑java代码并保存 3.在当前目录打开cmd 4.执行 javac Test.java 会生成好编译的.class文件 5.执行下面代码, 就成功得到j编写ava打印的代码 java Test 6.注意上面的中文在cmd中…

Windows下安装Anaconda5.3.1+Python3.8+TensorFlow2.13.0-CPU版本总结

Python3.8安装可以参考博文https://janus.blog.csdn.net/article/details/55274849 进行安装即可。 【1】Anaconda 清华的开源软件镜像站&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/下载&#xff0c;这里选择的是5.3.1版本。 然后正常安装就可以&am…

C++17中std::optional的使用

模版类std::optional管理一个可选的(optional)存储值(contained value)&#xff0c;即可能存在也可能不存在的值。std::optional的一个常见用例是存储可能失败的函数的返回值。与其它方法相反(例如std::pair<T, bool>),std::optional可以很好地处理构造成本高昂的对象&am…

帧同步的思想与FIFO复位

02基于FDMA三缓存构架_哔哩哔哩_bilibili 图像从外部传输进来的时候&#xff0c;会产生若干延迟&#xff0c;可能会出现各种各样的问题&#xff08;断帧等&#xff09;&#xff0c;此时可以通过VS信号清空FIFO进行复位。 这个过程中的复位信号可能需要拓展&#xff0c;这是因为…