IDEA编写各种WordCount运行

目录

一、编写WordCount(Spark_scala)提交到spark高可用集群

1.项目结构

2.导入依赖

3.编写scala版的WordCount

4.maven打包

5.运行jar包

​6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果


搞了一个晚上加一个白天,总算搞出来了,呼~~

本地编写IDEA之前需要在windows下安装scala、hadoop和spark环境,参考文章如下:

《Scala安装》 《Windows环境部署Hadoop-3.3.2和Spark3.3.2》

一、编写WordCount(Spark_scala)提交到spark高可用集群

首先安装好scala,然后在IDEA创建一个maven项目,开始编写代码

1.项目结构

2.导入依赖

<name>spark-in-action</name>
    <url>http://maven.apache.org</url>

    <!-- 定义的一些常量 -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <spark.version>3.3.2</spark.version>
        <scala.version>2.12.15</scala.version>
    </properties>

    <dependencies>
        <!-- scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- spark core 即为spark内核 ,其他高级组件都要依赖spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <!-- 配置Maven的镜像库 -->
    <!-- 依赖下载国内镜像库 -->
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <layout>default</layout>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </repository>
    </repositories>

    <!-- maven插件下载国内镜像库 -->
    <pluginRepositories>
        <pluginRepository>
            <id>ali-plugin</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3.编写scala版的WordCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    // 1.创建SparkContext
    val sc = new SparkContext(conf)

    // 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3.对RDD进行操作,调用RDD的方法
    // -------------Transformation (转换算子开始) --------------
    // 切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 将单词和1组合放入到元组中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    // 将key相同的数据进行分组聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 按照次数排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序
    // -------------Transformation (转换算子结束) --------------

    // 4.调用Action
    // 将数据写入到外部的存储系统中
    sorted.saveAsTextFile(args(1))

    // 5.释放资源
    sc.stop()
  }
}

4.maven打包

这个是胖包,除了项目本身的依赖,还有其他依赖,上面的original是瘦包,只有项目本身的依赖

5.运行jar包

首先启动zk、hdfs和spark高可用集群,这里我搭建的是standalone模式的高可用集群,不是on Yarn的

创建/opt/soft/spark-3.2.3/submit目录,将jar包上传到该目录下

提交命令

[root@node141 submit]# ../bin/spark-submit --master spark://node141:7077 --class cn.doitedu.day01.WordCount --executor-memory 1g --total-executor-cores 4 ./spark-in-action-1.0.jar hdfs://node141:9000/words.txt hdfs://node141:9000/out-1

--master spark://node141:7077     spark的master节点

--class cn.doitedu.day01.WordCount   运行的类名

--executor-memory 1g   占用的内存

--total-executor-cores 4  占用的核数

./spark-in-action-1.0.jar  运行的jar包地址

hdfs://node141:9000/words.txt   代码中args[0]对应的参数

hdfs://node141:9000/out-1  代码中args[1]对应的参数

6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

package cn.doitedu.day01

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用本地模式运行Spark程序(开发调试的时候使用)
 */
object LocalWordCount {
  def main(args: Array[String]): Unit = {
    // 指定当前用户为root
    System.setProperty("HADOOP_USER_NAME", "root")

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") // 本地模式,*表示根据当前机器的核数开多个线程

    // 1.创建SparkContext
    val sc = new SparkContext(conf)

    // 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3.对RDD进行操作,调用RDD的方法
    // -------------Transformation (转换算子开始) --------------
    // 切分压平
    val words: RDD[String] = lines.flatMap(line => {
      val words = line.split(" ")
      println(words)  // debug
      words
    })

    // 将单词和1组合放入到元祖中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    // 将key相同的数据进行分组聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 按照次数排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序
    // -------------Transformation (转换算子结束) --------------

    // 4.调用Action
    // 将数据写入到外部的存储系统中
    sorted.saveAsTextFile(args(1))

    // 5.释放资源
    sc.stop()
  }
}

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

查看运行结果

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String lines) throws Exception {
                String[] words = lines.split("\\s+");
                return Arrays.asList(words).iterator();
            }
        });

        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 将原来的kv顺序颠倒  (flink,3)  ----> (3,flink)
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap(); // 交换
            }
        });

        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);

        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaLambdaWordCount")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
//        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(Integer::sum);
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(Tuple2::swap);
        // 排序
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // 调回来
        JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);
        result.saveAsTextFile(args[1]);
        jsc.stop();
    }
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

好啦~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/458312.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HDFS的架构优势与基本操作

目录 写在前面一、 HDFS概述1.1 HDFS简介1.2 HDFS优缺点1.2.1 优点1.2.2 缺点 1.3 HDFS组成架构1.4 HDFS文件块大小 二、HDFS的Shell操作&#xff08;开发重点&#xff09;2.1 基本语法2.2 命令大全2.3 常用命令实操2.3.1 上传2.3.2 下载2.3.3 HDFS直接操作 三、HDFS的API操作3…

influxdb2使用

&#xff08;作者&#xff1a;陈玓玏&#xff09; influxdb2首次使用时&#xff0c;通过k8s部署的&#xff0c;所以进入pod内部执行命令。 先在k8sdashboard找到influx的pod&#xff0c;点击执行&#xff0c;即可进入命令行界面。 首次连接时&#xff0c;通过influx setup启动…

Tomcat部署web项目与idea中配置web项目方法【通俗易懂】

✨前言✨   本文章主要介绍tomcat环境的配置&#xff0c;idea配置web项目&#xff0c;idea一般项目中配置tomcat&#xff0c;内容有点长&#xff0c;建议点击目录跳转阅读&#xff0c;文中所含均为官方文件&#xff0c;请放心使用。 &#x1f352;欢迎点赞 &#x1f44d; 收藏…

Visual Studio 2022 配置“Debug|x64”的 Designtime 生成失败。IntelliSense 可能不可用。

今天写代码&#xff0c;无缘无故就给我整个这个错误出来&#xff0c;我一头雾水。 经过我几个小时的奋战&#xff0c;终于解决问题 原因就是这个Q_INTERFACES(&#xff09;宏&#xff0c;我本想使用Q_DECLARE_INTERFACE Q_INTERFACES这两个Qt宏实现不继承QObject也能使用qobjec…

【AI+CAD】(二)LLM和VLM生成结构化数据结构(PPT/CAD/DXF)

当前LLM和VLM在PPT生成任务上已经小有成效,如ChatPPT。 @TOC 1. PPT-LLM LLM根据用户的instruction生成规范的绘制ppt的API语句:即使是最强的GPT-4 + CoT也只能达到20-30%的内容准确度。 LLM输入:User_instruction(当前+过去)、PPT_content、PPT_reader_API。其中 PPT_rea…

STM32F103 CubeMX 定时器输出PWM控制呼吸灯

STM32F103 CubeMX 定时器输出PWM控制呼吸灯 1. 生成代码1. 1 配置外部时钟&#xff0c;SWD口1.2 配置定时器31.3 配置定时器2 2. 代码编写的部分 使用的cubmx 软件的版本&#xff1a;6.2.0 最新版本 6.10&#xff08;2024年3月11日&#xff09; 1. 生成代码 1. 1 配置外部时钟…

Opencascade基础教程(9):切换视图

1、切换视图 1、1 增加视图切换按钮&#xff0c;并添加消息响应函数。 void COCCDemoView::OnButtonFrontview() {//前视图m_View->SetProj(V3d_Yneg);m_View->FitAll(); }void COCCDemoView::OnButtonRearview() {//后视图m_View->SetProj(V3d_Ypos);m_View->Fit…

Spring Web MVC入门(1)

什么是Spring Web MVC? 定义:Spring Web MVC是基于Servlet构建的原始Web框架,从一开始就包含在Spring框架中.它的正式名称"Spring Web MVC"来自其源模块的名称(Spring-webmvc),但是它通常被称为"Spring MVC". 什么是Servlet? Servlet是一种实现动态页面…

【linux线程(二)】线程互斥与线程同步

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:Linux从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学更多操作系统知识   &#x1f51d;&#x1f51d; Linux线程 1. 前言2. 多线程互…

比Let‘s Encrypt更简单更齐全的免费证书申请教程

步骤一 打开JoySSL官网&#xff0c;注册属于你的专属账号&#xff1b; 永久免费SSL证书申请地址真正完全且永久免费&#xff01;不用您花一分钱&#xff0c;SSL证书免费使用90天&#xff0c;并且还支持连续签发。JoySSL携手全球权威可信顶级根&#xff0c;自研新一代SSL证书&…

Elasticsearch:使用标记修剪提高文本扩展性能

作者&#xff1a;来自 Elastic Kathleen DeRusso 本博客讨论了 ELSER 性能的令人兴奋的新增强功能&#xff0c;该增强功能即将在 Elasticsearch 的下一版本中推出&#xff01; 标记&#xff08;token&#xff09;修剪背后的策略 我们已经详细讨论了 Elasticsearch 中的词汇和…

【Java 并发】AbstractQueuedSynchronizer 中的 Condition

1 简介 任何一个 Java 对象都天然继承于 Object 类, 在线程间实现通信的往往会应用到 Object 的几个方法, 比如 wait(), wait(long timeout), wait(long timeout, int nanos) 与 notify(), notifyAll() 几个方法实现等待 / 通知机制。同样的, 在 Java Lock 体系下也有同样的方…

【Python】进阶学习:计算一个人BMI(身体质量指数)指数

【Python】进阶学习&#xff1a;计算一个人BMI&#xff08;身体质量指数&#xff09;指数 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教…

考研失败, 学点Java打小工——Day3

1 编码规范——卫语句 表达异常分支时&#xff0c;少用if-else方式。   比如成绩判断中对于非法输入的处理&#xff1a; /*>90 <100 优秀>80 <90 良好>70 <80 一般>60 <70 及格<60 不及格*/Testpu…

蓝桥杯深度优先搜索|剪枝|N皇后问题|路径之谜(C++)

搜索&#xff1a;暴力法算法思想的具体实现 搜索&#xff1a;通用的方法&#xff0c;一个问题如果比较难&#xff0c;那么先尝试一下搜索&#xff0c;或许能启发出更好的算法 技巧&#xff1a;竞赛时遇到不会的难题&#xff0c;用搜索提交一下&#xff0c;说不定部分判题数据很…

李三清研究引领力学定律新篇章,光子模型图揭秘

一周期内&#xff0c;垂直&#xff0c;曲率不变&#xff0c;方向转向互变&#xff0c;正向反向互变&#xff0c;左旋右旋互变。变无限粗或变无限厚才发生质变&#xff0c;且属于由内向外变换&#xff0c;所以对应变换就是由内点向外点变换。 由于方向转向不能分割&#xff0c;…

画图实战-Python实现某产品全年销量数据多种样式可视化

画图实战-Python实现某产品全年销量数据多种样式可视化 学习心得Matplotlib说明什么是Matplotlib&#xff1f;Matplotlib特性Matplotlib安装 产品订单量-折线图某产品全年订单量数据数据提取和分析绘制折线图 产品订单&销售额-条形图某产品全年订单&销售额数据绘制条形…

Ollama管理本地开源大模型,用Open WebUI访问Ollama接口

现在开源大模型一个接一个的&#xff0c;而且各个都说自己的性能非常厉害&#xff0c;但是对于我们这些使用者&#xff0c;用起来就比较尴尬了。因为一个模型一个调用的方式&#xff0c;先得下载模型&#xff0c;下完模型&#xff0c;写加载代码&#xff0c;麻烦得很。 对于程…

windows中如何将已安装的node.js版本进行更换

第一步&#xff1a;先清除已经安装好的node.js版本 1.按健winR弹出窗口&#xff0c;键盘输入cmd,然后敲回车&#xff08;或者鼠标直接点击电脑桌面最左下角的win窗口图标弹出&#xff0c;输入cmd再点击回车键&#xff09; 然后进入命令控制行窗口&#xff0c;并输入where node…

upload文件上传漏洞复现

什么是文件上传漏洞&#xff1a; 文件上传漏洞是指由于程序员在对用户文件上传部分的控制不足或者处理缺陷&#xff0c;而导致的用户可以越过其本身权限向服务器上上传可执行的动态脚本文件。这里上传的文件可以是木马&#xff0c;病毒&#xff0c;恶意脚本或者WebShell等。“…