4. Programming with the Spark Shell
4.1 What Is the Spark Shell
The Spark shell is Spark's interactive command-line client. You can write Spark programs in Scala directly inside it. When the shell starts, a SparkContext has already been created for you, with the alias sc.
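For example, once the shell is up you can use the pre-created sc right away. This is a minimal interactive check, not part of the original notes:

```scala
// sc is already created by the shell, so an RDD can be built immediately
val nums = sc.parallelize(1 to 10)
// reduce is an action: it triggers the computation and returns the result to the driver
nums.reduce(_ + _)   // res0: Int = 55
```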
4.2 Starting the Spark Shell
```shell
/opt/apps/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--master spark://node-1.51doit.cn:7077 --executor-memory 1g \
--total-executor-cores 3
```
If the Master is configured for HA (high availability), you must specify both Masters, because either of them may be in the Active state.
```shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--master spark://node-1.51doit.cn:7077,node-2.51doit.cn:7077 \
--executor-memory 1g \
--total-executor-cores 3
```
Parameter description:
- --master: the Master's address and port; the protocol is spark:// and the port is the RPC communication port
- --executor-memory: the amount of memory each executor uses
- --total-executor-cores: the total number of cores used by the entire application
Example: a WordCount job run directly in the Spark shell, reading from and writing to HDFS:

```scala
sc.textFile("hdfs://node-1.51doit.cn:9000/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile("hdfs://node-1.51doit.cn:9000/out")
```
5. Getting Started with Spark Programming
5.1 Writing Spark WordCount in Scala
5.1.1 Create a Maven Project
5.1.2 Add Dependencies and Plugins to pom.xml
```xml
<!-- Constants used by the build -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <spark.version>3.2.3</spark.version>
    <scala.version>2.12.15</scala.version>
</properties>

<dependencies>
    <!-- Scala dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- Spark Core, the Spark kernel; all higher-level components depend on it -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<!-- Maven mirror configuration -->
<!-- Domestic (Aliyun) mirror for dependency downloads -->
<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <layout>default</layout>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </repository>
</repositories>

<!-- Domestic (Aliyun) mirror for Maven plugin downloads -->
<pluginRepositories>
    <pluginRepository>
        <id>ali-plugin</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </pluginRepository>
</pluginRepositories>

<build>
    <pluginManagement>
        <plugins>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Plugin for compiling Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <!-- Plugin for building the (shaded) jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
5.1.3 Create a scala Directory
Select the scala directory, right-click it and mark it as a sources root, or click Maven's refresh button.
5.1.4 Write the Spark Program
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1. Create a SparkContext
 * 2. Create RDDs
 * 3. Call the RDDs' Transformation(s)
 * 4. Call an Action
 * 5. Release resources
 */
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("WordCount")
    // Create the SparkContext; RDDs are created through it
    val sc: SparkContext = new SparkContext(conf)
    // Writing a Spark program means programming against the RDD, an abstract distributed
    // collection, by calling its highly encapsulated API
    // Create an RDD with the SparkContext
    val lines: RDD[String] = sc.textFile(args(0))

    // Transformations start //
    // Split and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with 1 in a tuple
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group and aggregate; reduceByKey aggregates locally first, then globally
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // Transformations end //

    // Call an Action to save the result to HDFS
    sorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}
```
5.1.5 Package with Maven
- Package from the IDEA graphical interface: run Maven's package lifecycle phase (equivalent to running mvn clean package on the command line).
5.1.6 Submit the Job
- Upload the jar to the server, then submit the job with the spark-submit command.
```shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 4 \
--class cn._51doit.spark.day01.WordCount \
/root/spark-in-action-1.0.jar hdfs://node-1.51doit.cn:9000/words.txt hdfs://node-1.51doit.cn:9000/out
```
Parameter description:
- --master: the Master's address and port; the protocol is spark:// and the port is the RPC communication port
- --executor-memory: the amount of memory each executor uses
- --total-executor-cores: the total number of cores used by the entire application
- --class: the fully qualified name of the class containing the program's main method
- After the options come the jar path and the program arguments: here args(0) is the input path and args(1) is the output path
5.2 Writing Spark WordCount in Java
5.2.1 Using Anonymous Inner Classes
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // Create an RDD with the JavaSparkContext
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Call Transformation(s)
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return Tuple2.apply(word, 1);
                    }
                });
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        // To sort by count, first swap K and V so the count becomes the key
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Then sort
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap back
        JavaPairRDD<String, Integer> result = sorted.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Trigger an Action to save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```
5.2.2 Using Lambda Expressions
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split and flatten
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Pair each word with 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        // Swap K and V so the count becomes the key
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        // Sort
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap back
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        // Save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}
```
5.3 Running and Debugging Spark Locally
Packaging the Spark program and submitting it to the cluster every time is cumbersome and makes debugging awkward. Spark can also run in local mode, which is convenient for testing and debugging.
5.3.1 Running Locally
```scala
// Run the Spark program in local mode; local[*] runs locally using multiple threads
val conf: SparkConf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") // execute in local mode
```
5.3.2 Reading Data from HDFS
Because writing data to HDFS is subject to permission checks, set the user in the code to the user that owns the HDFS directory.
```scala
// When writing to HDFS, run the program as the same user that owns the HDFS directory
System.setProperty("HADOOP_USER_NAME", "root")
```
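Putting 5.3.1 and 5.3.2 together, a minimal local-mode WordCount might look like the sketch below. The input path is a placeholder for illustration and not from the original notes:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LocalWordCount {

  def main(args: Array[String]): Unit = {
    // Run as the HDFS directory owner if the job reads from or writes to HDFS
    System.setProperty("HADOOP_USER_NAME", "root")
    // local[*] runs the job in this JVM with one thread per CPU core
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    // Placeholder path: replace with a real input file
    val lines: RDD[String] = sc.textFile("file:///root/words.txt")
    val result: RDD[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    // collect() brings the result to the driver so it can be inspected while debugging
    result.collect().foreach(println)
    sc.stop()
  }
}
```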
5.4 Using PySpark (Optional)
5.4.1 Configure the Python Environment
① Install Python 3 on all nodes; the version must be Python 3.6 or later.
```shell
yum install -y python3
```
② Modify the environment variables on all nodes.
```shell
export JAVA_HOME=/usr/local/jdk1.8.0_251
export PYSPARK_PYTHON=python3
export HADOOP_HOME=/bigdata/hadoop-3.2.1
export HADOOP_CONF_DIR=/bigdata/hadoop-3.2.1/etc/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
```
5.4.2 Using the pyspark Shell
```shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/pyspark \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 10
```
Write WordCount in Python in the pyspark shell:
```python
sc.textFile("hdfs://node-1.51doit.cn:8020/data/wc").flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda t: t[1], False).saveAsTextFile('hdfs://node-1.51doit.cn:8020/out01')
```
5.4.3 Configure the PyCharm Development Environment
① Configure the Python environment.
② Configure the pyspark dependencies.
Open Project Structure and add py4j-*-src.zip and pyspark.zip from the python/lib directory of the Spark installation.
③ Add environment variables.
Click Edit Configuration.
Write WordCount in Python in PyCharm:
```python
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName('WordCount').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    lines = sc.textFile('file:///Users/star/Desktop/data.txt')
    words = lines.flatMap(lambda line: line.split(' '))
    wordAndOne = words.map(lambda word: (word, 1))
    reduced = wordAndOne.reduceByKey(lambda x, y: x + y)
    result = reduced.sortBy(lambda t: t[1], False)
    print(result.collect())
    # release resources
    sc.stop()
```