Spark 从入门到精通

Spark 从入门到精通

环境搭建

准备工作

创建安装目录
mkdir /opt/soft
cd /opt/soft
下载scala
wget https://downloads.lightbend.com/scala/2.13.10/scala-2.13.10.tgz -P /opt/soft
解压scala
tar -zxvf scala-2.13.10.tgz
修改scala目录名称
mv scala-2.13.10 scala-2
下载spark
wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz -P /opt/soft
解压spark
tar -zxvf spark-3.4.0-bin-hadoop3-scala2.13.tgz 
修改目录名称
mv spark-3.4.0-bin-hadoop3-scala2.13 spark3
修改环境遍历
vim /etc/profile
export JAVA_HOME=/opt/soft/jdk8
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export ZOOKEEPER_HOME=/opt/soft/zookeeper

export HADOOP_HOME=/opt/soft/hadoop3

export HADOOP_INSTALL=${HADOOP_HOME}
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

export HIVE_HOME=/opt/soft/hive3
export HCAT_HOME=/opt/soft/hive3/hcatalog

export SQOOP_HOME=/opt/soft/sqoop-1

export FLUME_HOME=/opt/soft/flume

export HBASE_HOME=/opt/soft/hbase2

export PHOENIX_HOME=/opt/soft/phoenix

export SCALA_HOME=/opt/soft/scala-2

export SPARK_HOME=/opt/soft/spark3
export SPARKPYTHON=/opt/soft/spark3/python

export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$SQOOP_HOME/bin:$FLUME_HOME/bin:$HBASE_HOME/bin:$PHOENIX_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SPARKPYTHON
source /etc/profile

Local模式

scala java
启动
spark-shell

sparkl local spark-shell

页面地址:http://spark01:4040

![sparkl local spark-shell

退出
:quit

sparkl local spark-shell

pyspark
启动
pyspark

spark local pyspark

页面地址:http://spark01:4040

spark local pyspark

退出
quit() or Ctrl-D

spark local pyspark

本地模式提交应用

在spark目录下执行

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
  1. –class表示要执行程序的主类,此处可以更换为咱们自己写的应用程序
  2. –master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量
  3. spark-examples_2.13-3.4.0.jar 运行的应用类所在的jar包,实际使用时,可以设定为咱们自己打的jar包
  4. 数字10表示程序的入口参数,用于设定当前应用的任务数量

Standalone模式

编写核心配置文件

cont目录下

cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRAY_PATH=/opt/soft/hadoop3/lib/native

export SPARK_MASTER_HOST=spark01
export SPARK_MASTER_PORT=7077

export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633

编辑slaves
cp workers.template workers
vim workers
spark01
spark02
spark03

配置历史日志
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
修改启动文件名称
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
分发搭配其他节点
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc

在其它节点刷新环境遍历

source /etc/profile
启动
start-spark.sh
start-history-server.sh
webui

http://spark01:6633

spark standlone webui

http://spark01:18080

spark standlone history server

提交作业到集群
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10

spark standlone webui

spark standlone history server

HA模式

编写核心配置文件

cont目录下

cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRAY_PATH=/opt/soft/hadoop3/lib/native

SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=spark01:2181,spark02:2181,spark03:2181 
-Dspark.deploy.zookeeper.dir=/spark3"

export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633

hdfs dfs -mkdir /spark3
编辑slaves
cp workers.template workers
vim workers
spark01
spark02
spark03

配置历史日志
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
修改启动文件名称
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
分发搭配其他节点
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc

在其它节点刷新环境遍历

source /etc/profile
启动
start-spark.sh
start-history-server.sh
webui

http://spark01:6633

spark ha webui

http://spark01:18080

spark ha history server

提交作业到集群
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
提交作业到Yarn
bin/spark-submit --master yarn \
--class  org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.13-3.4.0.jar 10

spark ha webui

spark ha history server

spark-code

spark-core

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lihaozhe</groupId>
    <artifactId>spark-code</artifactId>
    <version>1.0.0</version>

    <properties>
        <jdk.version>1.8</jdk.version>
        <scala.version>2.13.10</scala.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>

        <commons-lang3.version>3.12.0</commons-lang3.version>
        <java-testdata-generator.version>1.1.2</java-testdata-generator.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.binarywang</groupId>
            <artifactId>java-testdata-generator</artifactId>
            <version>${java-testdata-generator.version}</version>
        </dependency>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.4.0</version>
        </dependency>
        <!-- junit-jupiter-api -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.9.3</version>
            <scope>test</scope>
        </dependency>
        <!-- junit-jupiter-engine -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.9.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.14.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.5</version>
        </dependency>
        <!-- commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.3</version>
        </dependency>
        <!-- commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <!--<outputDirectory>../package</outputDirectory>-->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <!-- 设置编译字符编码 -->
                    <encoding>UTF-8</encoding>
                    <!-- 设置编译jdk版本 -->
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-clean-plugin</artifactId>
                <version>3.2.0</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.3.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>3.3.2</version>
            </plugin>
            <!-- 编译级别 -->
            <!-- 打包的时候跳过测试junit begin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <configuration>
                    <scalaCompatVersion>2.13</scalaCompatVersion>
                    <scalaVersion>2.13.10</scalaVersion>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.5.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

hdfs-conf

在 resources 目录下存放 hdfs 核心配置文件 core-site.xml 和hdfs-site.xml

被引入的hdfs配置文件为测试集群配置文件

由于生产环境与测试环境不同,项目打包的时候排除hdfs配置文件

rdd
数据集方式构建RDD
package com.lihaozhe.course01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * 借助并行数据集 Parallelized Collections 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午5:23
 */
public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "rdd";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // 借助并行数据集 Parallelized Collections 构建 RDD
            JavaRDD<Integer> javaRDD = sc.parallelize(data);
            List<Integer> result = javaRDD.collect();
            result.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 借助并行数据集 Parallelized Collections 构建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午4:59
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName: String = "rdd"
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    // spark基础配置
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)

    val data = Array(1, 2, 3, 4, 5)
    // 从集合中创建 RDD
    // Parallelized Collections
    val distData = sc.parallelize(data)
    data.foreach(println)
  }
}

本地文件构建RDD

words.txt

linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
sqoop flink
base phoenix
scala spark
sqoop flink
linux shell
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto
flume kafka
hbase phoenix
scala spark
java mysql jdbc
hadoop hdfs mapreduce
java mysql jdbc
hadoop hdfs mapreduce
hive presto

package com.lihaozhe.course01;

import org.apache.spark.Partition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;


/**
 * 从文件中读取数据集创建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午5:23
 */
public class JavaDemo02 {
    public static void main(String[] args) {
        String appName = "rdd";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从文件中读取数据集创建 RDD
            JavaRDD<String> javaRDD = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt");
            List<String> lines = javaRDD.collect();
            lines.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 从文件中读取数据集创建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午4:59
 */
object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    val appName: String = "rdd"
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    // spark基础配置
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)

    // 从文件中读取数据集创建 RDD
    val distFile = sc.textFile("file:///home/lsl/IdeaProjects/spark-code/words.txt")
    // 遍历输出
    distFile.foreach(println)
  }
}

HDFS文件构建RDD
package com.lihaozhe.course01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;


/**
 * 从文件中读取数据集创建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午5:23
 */
public class JavaDemo03 {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "rdd";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从文件中读取数据集创建 RDD
            // JavaRDD<String> javaRDD = sc.textFile("hdfs://lihaozhe/data/words.txt");
            JavaRDD<String> javaRDD = sc.textFile("data/words.txt");
            List<String> lines = javaRDD.collect();
            lines.forEach(System.out::println);
        }
    }
}

package com.lihaozhe.course01

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 从文件中读取数据集创建 RDD
 *
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午4:59
 */
object ScalaDemo03 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val appName: String = "rdd"
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    // spark基础配置
    val conf = new SparkConf().setAppName(appName)
    // 本地运行
    conf.setMaster("local")
    // 构建 SparkContext spark 上下文
    val sc = new SparkContext(conf)

    // 从文件中读取数据集创建 RDD
    // val distFile = sc.textFile("hdfs://lihaozhe/data/words.txt")
    // hdfs://lihaozhe/user/root/data/words.txt
    val distFile = sc.textFile("hdata/words.txt")
    // 遍历输出
    distFile.foreach(println)
  }
}

WordCount
JavaWordCount
package com.lihaozhe.course02;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午9:37
 */
public class JavaWordCount {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String appName = "rdd";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // spark基础配置
        SparkConf conf = new SparkConf().setAppName(appName);
        // 本地运行
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 从文件中读取数据集创建 RDD
            JavaRDD<String> javaRDD = sc.textFile("data/words.txt");
            // FlatMapFunction 方法中 第一个参数输入RDD数据类型 第二个参数为输出RDD数据类型
            // javaRDD.flatMap(new FlatMapFunction<String, String>() {
            //     @Override
            //     public Iterator<String> call(String s) throws Exception {
            //         return null;
            //    }
            // });
            JavaRDD<String> wordsRdd = javaRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).listIterator());
            // (love,1)
            // wordsRdd.mapToPair(new PairFunction<String, String, Integer>() {
            //     @Override
            //     public Tuple2<String, Integer> call(String s) throws Exception {
            //         return null;
            //    }
            // });
            JavaPairRDD<String, Integer> pairRDD = wordsRdd.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
            // 根据 word 分组
            // pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            //     @Override
            //     public Integer call(Integer integer, Integer integer2) throws Exception {
            //         return null;
            //     }
            // });

            // pairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
            JavaPairRDD<String, Integer> javaPairRDD = pairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
            List<Tuple2<String, Integer>> collect = javaPairRDD.collect();
            collect.forEach(System.out::println);
            javaPairRDD.saveAsTextFile("result");
        }
    }
}

ScalaWordCount
package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午9:38
 */
object ScalaWordCount01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // val conf = new SparkConf().setAppName("词频统计").setMaster("local")
    val conf = new SparkConf().setAppName("词频统计")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val content = sc.textFile("data/words.txt")
    // 遍历输出
    content.foreach(println)
    // 讲结果转为 flatMap
    val words = content.flatMap(_.split(" "))
    words.foreach(println)
    // 转为 key value结果
    // (love,Seq(love, love, love, love, love))
    val wordGroup = words.groupBy(word => word)
    wordGroup.foreach(println)
    val wordCount = wordGroup.mapValues(_.size)
    wordCount.foreach(println)
    // 将计算结果落地本地磁盘
    wordCount.saveAsTextFile("result")
    sc.stop()
  }
}

package com.lihaozhe.course02

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author 李昊哲
 * @version 1.0.0  2023/5/12 下午9:38
 */
object ScalaWordCount02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // val conf = new SparkConf().setAppName("词频统计").setMaster("local")
    val conf = new SparkConf().setAppName("词频统计")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val content = sc.textFile("data/words.txt")
    // 遍历输出
    content.foreach(println)
    // 讲结果转为 flatMap
    val words = content.flatMap(_.split(" "))
    words.foreach(println)
    // 转为 key value结果
    // 通过隐式转换 讲列表中的单词 转为map 返回
    // (China,1)
    val wordMap = words.map((_, 1))
    wordMap.foreach(println)
    val wordCount = wordMap.reduceByKey(_ + _)
    wordCount.foreach(println)
    // 将计算结果落地本地磁盘
    wordCount.saveAsTextFile("result")
    sc.stop()
  }
}

项目打包发布
mvn package

上传jar文件到集群

在集群上提交

spark-submit --master yarn --class com.lihaozhe.course02.JavaWordCount spark-code.jar 
spark-submit --master yarn --class com.lihaozhe.course02.ScalaWordCount01 spark-code.jar 
spark-submit --master yarn --class com.lihaozhe.course02.ScalaWordCount02 spark-code.jar 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/19041.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

进程(二)

进程二 2.6 调度的概念、层次2.6.1 基本概念2.6.2 三个层次2.6.3 三层调度的联系、对比2.6.4 补充知识2.6.5 本小节总结 2.7 进程调度的时机、切换与过程、方式2.7.1 进程调度的时机2.7.2 切换与过程2.7.3 进程调度的方式2.7.4 总结 2.8 调度器/调度程序/闲逛线程2.9 调度算法的…

Python基础入门编程代码练习(六)

一、模拟房产经纪人来管理房屋信息 编写业务实现 家具类&#xff1a;HouseItem 属性&#xff1a;名字 name&#xff0c;占地面积 area 方法&#xff1a;__init__ , __str__ 类名&#xff1a;房子类 House 属性&#xff1a;户型 name&#xff0c;总面积&#xff1a;total_are…

Word怎么分页,提高效率就靠这3种方法!

案例&#xff1a;Word怎么分页 【文档要进行分页处理&#xff0c;但是我尝试了好多次还是不行&#xff01;大家知道Word怎么分页吗&#xff1f;】 在使用Microsoft Word处理文档时&#xff0c;我们常常需要进行分页操作。Word的分页功能可以将文档分成多个页面&#xff0c;以…

【Selenium上】——全栈开发——如桃花来

目录索引 Selenium是什么&#xff1a;下载和配置环境变量&#xff1a;1. 基本使用&#xff1a;导入五个常用包&#xff1a;基本代码&#xff1a; 实例引入&#xff1a;声明不同浏览器对象&#xff1a;访问页面&#xff1a; Selenium是什么&#xff1a; Selenium是一个用于Web应…

怎么把pdf中的某一页分出来?

怎么把pdf中的某一页分出来&#xff1f;PDF格式的文档在日常生活中是非常常见的&#xff0c;相信大家都对其有所了解&#xff0c;并且经常使用。它的主要特点是不允许用户随意编辑其中的内容&#xff0c;当我们仅需要阅读时&#xff0c;PDF文档无疑是十分方便的&#xff0c;尤其…

Python爬虫解读

爬虫&#xff1a; Python爬虫是指利用计算机程序或者脚本自动抓取网站数据的一种行为&#xff0c;通常是为了提取网站数据或者进行数据分析等目的。 Python 爬虫可以分为手动爬虫和自动爬虫两种。手动爬虫是指完全由人工编写代码来实现的爬虫&#xff0c;这种方式需要编写大量的…

NAS私有云存储 - 搭建Nextcloud私有云盘并公网远程访问

文章目录 摘要视频教程1. 环境搭建2. 测试局域网访问3. 内网穿透3.1 ubuntu本地安装cpolar3.2 创建隧道3.3 测试公网访问 4 配置固定http公网地址4.1 保留一个二级子域名4.1 配置固定二级子域名4.3 测试访问公网固定二级子域名 转载自内网穿透工具的文章&#xff1a;使用Nextcl…

Selenium自动化测试中的PageObject模式

PageObject模式简介 众所周知&#xff0c;UI页面元素常常是不稳定的&#xff0c;在使用Selenium编写WebUI自动化测试用例时&#xff0c;随着测试脚本的增加&#xff0c;维护和更新这些元素便成为一个令人头疼的问题。 在普通模式下&#xff0c;脚本直接定位并操作元素&#xf…

进程(一)

进程&#xff08;一&#xff09; 2.1 进程的定义、组成、组织方式、特征2.1.1 定义2.1.2 组成2.1.3 组织方式2.1.4 特征2.1.5 本小节总结 2.2 进程的状态与转换2.2.1 进程的状态2.2.3 进程状态的转换2.2.4 本小节总结 2.3 进程控制2.3.1 基本概念2.3.2 进程控制相关的原语2.3.3…

【Java零基础入门篇】第 ⑤ 期 - 抽象类和接口(二)

博主&#xff1a;命运之光 专栏&#xff1a;Java零基础入门 学习目标 1.了解什么是抽象类&#xff0c;什么是接口&#xff1b; 2.掌握抽象类和接口的定义方法&#xff1b; 3.理解接口和抽象类的使用场景&#xff1b; 4.掌握多态的含义和用法&#xff1b; 5.掌握内部类的定义方法…

vue学习

{{}} 插值语法&#xff0c;应用vue实例容器中&#xff0c;获取标签值 v-bind v-bind&#xff1a;单向绑定&#xff0c;实例对象key的对应的值&#xff0c;绑定到vue实例容器标签的属性中 简写&#xff1a;: v-model v-model:双向绑定 注意&#xff1a;v-model只能应用于‘表单…

MathType7简体中文版数学公式编辑器下载安装教程

MathType一款专业的数学公式编辑器&#xff0c;理科生专用的必备工具&#xff0c;可应用于教育教学、科研机构、工程学、论文写作、期刊排版、编辑理科试卷等领域。2018年2月&#xff0c;MathType 7简体中文版正式发布&#xff0c;给用户带来全新的体验。MathType 是Windows和M…

jenkins,gitlab,实时构建推送

首先jdk&#xff0c;jenkins安装好&#xff0c;新版jenkins不支持jdk8 然后安装环境maven&#xff0c;git 环境配置 插件安装 gitlab插件 Build Authorization Token Root插件 插件环境整好之后新建个任务 源码管理&#xff0c;填入仓库https地址&#xff0c;添加git…

shell函数

目录 一、shell函数 1.函数的作用 2.函数的优点 二、shell函数的格式 2.1函数返回值return 2.2函数变量 三、函数传参 四、递归函数 4.1查找输入目录下文件及其子目录下文件 4.2将IP地址转换为二进制 五、函数数据库 一、shell函数 1.函数的作用 把程序里需要多次…

C++Primer第20章 iostream库

第20章 iostream库 C中的IO流是通过多继承和虚拟继承实现的,下面是它的关系. 我们要学习的就是三个库,这里我会把重点的拿出来 iostream: 终端操作fstream:文件操作sstream:格式化操作 20.1 输出操作符<< 输出操作符可以接受任何内置数据类型的实参,包含我们的const …

怎么搭建个人小型渲染农场?搭建渲染农场配置

渲染农场是众多机器组成的渲染集群&#xff0c;通常用来渲染你的单帧效果图或动画项目&#xff0c;我们借助渲染农场的力量&#xff0c;可以满足3D项目交期时间迫在眉睫的需求&#xff0c;当你试着在自己的机器上渲染一个复杂的动画项目时&#xff0c;可能需要几十小时的等待时…

车载软件架构——闲聊几句AUTOSAR BSW(四)

我是穿拖鞋的汉子,魔都中坚持长期主义的工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 我们并不必要为了和谐,而时刻保持通情达理;我们需要具备的是,偶尔有肚量欣然承认在某些方面我们可能会有些不可理喻。该有主见的时候能掷地有声地镇得住场…

大数据赛项|2023年广东省大学生计算机设计大赛初赛结果公示

2023年广东省大学生计算机设计大赛 暨第16届中国大学生计算机设计大赛 粤港澳大湾区赛初赛结果公示 根据《广东省教育厅关于做好2023年广东省本科高校大学生学科竞赛工作的通知》&#xff0c;广东外语外贸大学承办2023年“广东省大学生计算机设计大赛”。 在广大师生的热情…

达摩院开源多模态对话大模型mPLUG-Owl

miniGPT-4的热度至今未减&#xff0c;距离LLaVA的推出也不到半个月&#xff0c;而新的看图聊天模型已经问世了。今天要介绍的模型是一款类似于miniGPT-4和LLaVA的多模态对话生成模型&#xff0c;它的名字叫mPLUG-Owl。 论文链接&#xff1a;https://arxiv.org/abs/2304.14178…

实时操作系统内存管理-TLSF算法

内存管理-TLSF算法 前言TLSF算法&#xff1a;为什么内存又叫内存“块”&#xff1a;O(1)查找空闲块&#xff1a;确定fl&#xff1a;确定sl&#xff1a;提级申请&#xff1a;分割块&#xff1a; 空闲块如何串成链表&#xff1f;减少外部碎片&#xff1a;查找上下块&#xff1a; …