Spark Shell的简单使用

简介

        Spark shell是一个特别适合快速开发Spark原型程序的工具,可以帮助我们熟悉Scala语言。即使你对Scala不熟悉,仍然可以使用这个工具。Spark shell使得用户可以和Spark集群交互,提交查询,这便于调试,也便于初学者使用Spark。前一章介绍了运行Spark实例之前的准备工作,现在你可以开启一个Spark shell,然后用下面的命令连接你的集群:

spark-shell  spark://vm02:7077

格式:spark-shell  spark://host:port, 可以进入spark集群的任意一个节点
默认情况是进入到一个scala语言环境的一个交互窗口。

[hadoop@vm03 bin]$ spark-shell  spark://vm02:7077
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/21 20:06:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://vm03:4040
Spark context available as 'sc' (master = local[*], app id = local-1703160374523).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/
         
Using Scala version 2.12.18 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

以上进入spark交互窗口中,输出一些日志信息,包含指定APP ID信息。

        master = local[*], app id = local-1703160374523

local[*] 是一种运行模式,用于指定 Spark 应用程序在本地模式下运行,而 * 表示 Spark 应该使用所有可用的 CPU 核心。如果需要使用多线程运行模式需要指定运行的线程数量local[N].

加载一个简单的text文件

        在服务器上随便创建一个txt文件用于做演示

        随便造数据如下:

[hadoop@vm02 ~]$ vim text.txt

Name, Age, City, Occupation, Salary
John, 25, New York, Engineer, 80000
Alice, 30, San Francisco, Data Scientist, 90000
Bob, 28, Los Angeles, Software Developer, 85000
Eva, 22, Chicago, Student, 0
Michael, 35, Boston, Manager, 100000
Olivia, 29, Seattle, Designer, 95000
David, 31, Austin, Analyst, 88000
Sophia, 26, Denver, Teacher, 75000
Daniel, 33, Miami, Doctor, 120000
Emma, 27, Atlanta, Nurse, 70000
William, 32, Houston, Researcher, 95000
Ava, 24, Phoenix, Artist, 78000
James, 29, San Diego, Programmer, 92000
Grace, 28, Portland, Writer, 86000
Jackson, 30, Nashville, Musician, 110000
Lily, 26, Minneapolis, Chef, 89000
Ethan, 35, Detroit, Entrepreneur, 130000
Chloe, 23, Philadelphia, Student, 0
Logan, 31, Pittsburgh, Engineer, 98000
Harper, 27, Charlotte, Manager, 105000
Aiden, 28, Las Vegas, Developer, 90000
Mia, 25, Dallas, Scientist, 95000
Lucas, 30, San Antonio, Designer, 85000
Evelyn, 29, Raleigh, Teacher, 78000
Noah, 34, Orlando, Doctor, 115000
Amelia, 26, Sacramento, Analyst, 92000
Sophie, 32, Tampa, Nurse, 75000
Owen, 28, St. Louis, Researcher, 98000
Isabella, 31, Kansas City, Writer, 86000

使用spark-shell交互页面,进行读取该文件内容。

scala> val infile = sc.textFile("file:/home/hadoop/text.txt")
infile: org.apache.spark.rdd.RDD[String] = file:/home/hadoop/text.txt MapPartitionsRDD[1] at textFile at <console>:23

val infile = sc.textFile("/home/hadoop/text.txt")

        这段代码的目的是读取指定路径下的文本文件,创建一个Spark RDD(infile),该RDD包含文件中的每一行作为一个元素。这是在Spark中处理文本数据的一种常见方式。将text.txt文件中的每行作为一个RDD(Resilient Distributed Datasets)中的单独元素加载到Spark中,并返回一个名为infile的RDD。

       多副本范例

        注意当你连接到Spark的master之后,若集群中没有分布式文件系统,Spark会在集群中每一台机器上加载数据,所以要确保集群中的每个节点上都有完整数据。通常可以选择把数据放到HDFS、S3或者类似的分布式文件系统去避免这个问题。在本地模式下,可以将文件从本地直接加载,例如
        sc.textFile([filepah]),想让文件在所有机器上都有备份,请使用SparkContext类中的addFile函数,代码如下:        

import org.apache.spark.SparkFiles;
val file =sc.addFile("file:/home/hadoop/text.txt")
val inFile=sc.textFile(SparkFiles.get("text.txt"))

         addFile可以把文件分发到各个worker当中,然后worker会把文件存放在临时目录下。之后可以通过SparkFiles.get()获取文件

import org.apache.spark.SparkFiles


// 获取文件在工作节点上的本地路径
val localFilePath = SparkFiles.get("text.txt")

// 打印路径
println(s"File is distributed to: $localFilePath")

        在其他节点,可以通过  SparkFiles的get()函数获取其存储路径

         文件内容读取范例

        在读取文件的时候,需要所有节点均存在该文件,不然后报错文件不存在,本spark基于hadoop for hdfs的分布式文件系统进行演练,首先需要将文件上传到hdfs文件系统中去


[hadoop@vm02 ~]$ hdfs dfs -mkdir /hadoop 
[hadoop@vm02 ~]$ hdfs dfs -ls /
Found 3 items
drwxr-xr-x   - hadoop supergroup          0 2023-12-21 22:31 /hadoop
drwxr-xr-x   - hadoop supergroup          0 2023-12-18 10:06 /hbase
drwxr-xr-x   - hadoop supergroup          0 2023-11-28 09:33 /home
[hadoop@vm02 ~]$ hdfs dfs -put /home/hadoop/text.txt  /hadoop/
[hadoop@vm02 ~]$ hdfs dfs -ls /hadoop 
Found 1 items
-rw-r--r--   3 hadoop supergroup       1119 2023-12-21 22:31 /hadoop/text.txt

将文件上传到hdfs中去,使用first进行查看文件内容表头信息

import org.apache.spark.SparkFiles; 
val infile = sc.textFile("hdfs://vm02:8020/hadoop/text.txt") 
infile.first() 

这里的8020是hdfs的rpc端口。 

spark-shell的逻辑回归 

        在 Spark 中,逻辑回归是一种用于二分类问题的机器学习算法。尽管它的名字中包含"回归",但实际上它是一种分类算法,用于预测一个二元目标变量的概率。


scala> import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegression

scala> import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.VectorAssembler

scala> import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.{SparkSession, DataFrame}

scala> 

scala> 

scala> val spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
23/12/22 00:15:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@13f05e8e



scala> val data = Seq(
     |   (1.0, 0.1, 0.5),
     |   (0.0, 0.2, 0.6),
     |   (1.0, 0.3, 0.7),
     |   (0.0, 0.4, 0.8)
     | )
data: Seq[(Double, Double, Double)] = List((1.0,0.1,0.5), (0.0,0.2,0.6), (1.0,0.3,0.7), (0.0,0.4,0.8))

scala> 

scala> val columns = Seq("label", "feature1", "feature2")
columns: Seq[String] = List(label, feature1, feature2)

scala> 

scala> val df: DataFrame = data.toDF(columns: _*)
df: org.apache.spark.sql.DataFrame = [label: double, feature1: double ... 1 more field]

scala> df.show()
+-----+--------+--------+
|label|feature1|feature2|
+-----+--------+--------+
|  1.0|     0.1|     0.5|
|  0.0|     0.2|     0.6|
|  1.0|     0.3|     0.7|
|  0.0|     0.4|     0.8|
+-----+--------+--------+



scala> val assembler = new VectorAssembler()
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_dc7bc810fe30, handleInvalid=error

scala>   .setInputCols(Array("feature1", "feature2"))
res1: assembler.type = VectorAssembler: uid=vecAssembler_dc7bc810fe30, handleInvalid=error, numInputCols=2

scala>   .setOutputCol("features")
res2: res1.type = VectorAssembler: uid=vecAssembler_dc7bc810fe30, handleInvalid=error, numInputCols=2

scala> 

scala> val assembledData = assembler.transform(df)
assembledData: org.apache.spark.sql.DataFrame = [label: double, feature1: double ... 2 more fields]

scala> assembledData.show()
+-----+--------+--------+---------+
|label|feature1|feature2| features|
+-----+--------+--------+---------+
|  1.0|     0.1|     0.5|[0.1,0.5]|
|  0.0|     0.2|     0.6|[0.2,0.6]|
|  1.0|     0.3|     0.7|[0.3,0.7]|
|  0.0|     0.4|     0.8|[0.4,0.8]|
+-----+--------+--------+---------+



scala> val lr = new LogisticRegression()
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_29b7d06469ba

scala>   .setLabelCol("label")
res4: org.apache.spark.ml.classification.LogisticRegression = logreg_29b7d06469ba

scala>   .setFeaturesCol("features")
res5: org.apache.spark.ml.classification.LogisticRegression = logreg_29b7d06469ba

scala>   .setMaxIter(10)
res6: res5.type = logreg_29b7d06469ba

scala>   .setRegParam(0.01)
res7: res6.type = logreg_29b7d06469ba



scala> val lrModel = lr.fit(assembledData)
23/12/22 00:15:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_29b7d06469ba, numClasses=2, numFeatures=2



scala> val summary = lrModel.summary
summary: org.apache.spark.ml.classification.LogisticRegressionTrainingSummary = org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummaryImpl@4369db27

scala> println(s"Coefficients: ${lrModel.coefficients}")
Coefficients: [-4.371555225626981,-4.37155522562698]

scala> println(s"Intercept: ${lrModel.intercept}")
Intercept: 3.9343997030642823

scala> println(s"Objective History: ${summary.objectiveHistory.mkString(", ")}")
Objective History: 0.6931471805599453, 0.5954136109155707, 0.5904687934140505, 0.5901819039583514, 0.5901795791081599, 0.5901795782746598

        在进行 拟合模型的时候,会占用较高的内存,如果内存不足,会导致内存溢出而退出spark-shell会话。通过以下命令,增加算子内存

spark-shell --conf spark.executor.memory=4g

但是不能超过可用内存

free -h 

代码含义解释

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.{SparkSession, DataFrame}

此部分导入了必要的Spark MLlib类和Spark SQL类。

val spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

这创建了一个Spark会话,应用程序的名称为"LogisticRegressionExample"。

val data = Seq(
  (1.0, 0.1, 0.5),
  (0.0, 0.2, 0.6),
  (1.0, 0.3, 0.7),
  (0.0, 0.4, 0.8)
)

val columns = Seq("label", "feature1", "feature2")

val df: DataFrame = data.toDF(columns: _*)
df.show()

此部分使用示例数据创建了一个名为df的DataFrame,其中每一行表示一个数据点,具有标签("label")和两个特征("feature1"和"feature2")。show()方法用于显示DataFrame。

val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2"))
  .setOutputCol("features")

val assembledData = assembler.transform(df)
assembledData.show()

使用VectorAssembler将"feature1"和"feature2"列组合成名为"features"的单列。结果的DataFrame存储在assembledData中,并显示出来。

val lr = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)
  .setRegParam(0.01)

此部分创建了一个逻辑回归模型(lr)并设置了一些参数,例如标签列,特征列,最大迭代次数(setMaxIter)和正则化参数(setRegParam)。

val lrModel = lr.fit(assembledData)

使用fit方法在组合数据(assembledData)上训练逻辑回归模型。

val summary = lrModel.summary
println(s"Coefficients: ${lrModel.coefficients}")
println(s"Intercept: ${lrModel.intercept}")
println(s"Objective History: ${summary.objectiveHistory.mkString(", ")}")

        此部分输出逻辑回归模型训练的各种结果。显示了系数,截距和训练过程中目标函数的历史记录。summary对象提供了有关训练摘要的其他信息。

这里使用scala 语法相当繁琐,转换为python的语法就会简单很多

python示例

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression



# 创建 Spark 会话
spark = SparkSession.builder \
    .appName("LogisticRegressionExample") \
    .master("spark://10.0.0.102:7077") \
.getOrCreate()

# 创建包含一些示例数据的 DataFrame
data = [
    (1.0, 0.1, 0.5),
    (0.0, 0.2, 0.6),
    (1.0, 0.3, 0.7),
    (0.0, 0.4, 0.8)
]

columns = ["label", "feature1", "feature2"]

df = spark.createDataFrame(data, columns)
df.show()

# 使用 VectorAssembler 将特征列合并成一个特征向量
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
assembledData = assembler.transform(df)
assembledData.show()

# 创建逻辑回归模型
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.01)

# 拟合模型
lrModel = lr.fit(assembledData)

# 查看模型的训练结果
print("Coefficients: {}".format(lrModel.coefficients))
print("Intercept: {}".format(lrModel.intercept))
print("Objective History: {}".format(lrModel.summary.objectiveHistory()))

此时可以登录到spark web上查看任务情况

http://10.0.0.102:8081/

spark web ui 的端口信息可以通过以下方式查看 

ps -ef |grep webui-port

当资源不足时,执行代码过程中没五秒钟会输出一次提示信息(不影响代码执行)

23/12/22 00:54:47 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/265948.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Apache Flink 进阶教程(七):网络流控及反压剖析

目录 前言 网络流控的概念与背景 为什么需要网络流控 网络流控的实现&#xff1a;静态限速 网络流控的实现&#xff1a;动态反馈/自动反压 案例一&#xff1a;Storm 反压实现 案例二&#xff1a;Spark Streaming 反压实现 疑问&#xff1a;为什么 Flink&#xff08;bef…

15-Echarts简化系列之:geo 地理坐标系,地图资源基本绘制和配置项使用

Echarts版本&#xff1a;5.4.3 geo&#xff1a;地理坐标系组件用于地图的绘制&#xff0c;支持在地理坐标系上绘制散点图&#xff0c;线集。绘制地图的数据源可支持 geojson和 svg 格式。 本文章中提供 实例代码 和地图 静态资源 &#xff0c;项目以 react ts 为主&#xff0…

html旋转相册

一、实验题目 做一个旋转的3d相册&#xff0c;当鼠标停留在相册时&#xff0c;相册向四面散开 二、实验代码 <!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" con…

大一C语言查缺补漏 12.23

遗留下来的问题&#xff1a; 3-1 6 3-2 2 3-2 5 在C语言中&#xff0c;标识符的第一个字符是有什么规范吗&#xff1f; 在C语言中&#xff0c;标识符的第一个字符必须是以下两种情况之一&#xff1a; 字母&#xff08;a~z或A~Z&#xff09; 下划线 (_) 在C语言中&…

FFmpeg——视频处理工具安装以及简单命令学习。

FFmpeg 是一个免费、开源且高度可定制的多媒体处理工具&#xff0c;它是一个强大的跨平台框架&#xff0c;用于处理音频、视频、多媒体流和图像。FFmpeg 的主要功能包括解码、编码、转码、流处理、多路复用、分离、合并、过滤等&#xff0c;支持多种音视频格式&#xff0c;包括…

[Angular] 笔记 7:模块

Angular 中的模块(modules) 是代码在逻辑上的最大划分&#xff0c;它类似于C, C# 中的名字空间&#xff1a; module 可分为如下几种不同的类型&#xff1a; 使用模块的第一个原因是要对代码进行逻辑上的划分&#xff0c;第二个非常重要的原因是为了实现懒惰加载(lazy loading)&…

OpenHarmony南向之Audio

音频架构 Audio驱动框架基于HDF驱动框架实现&#xff0c;包含内核态&#xff08;KHDF&#xff09;&#xff0c;和用户态&#xff08;UHDF&#xff09;&#xff0c; 对北向提供音频HDI接口 音频框架图 驱动架构主要由以下几部分组成。 HDI adapter&#xff1a;实现Audio HAL层…

Gradle - 安装、环境变量、配置国内源、常用命令

目录 一、Gradle 1.1、安装&环境变量 1.2、配置国内源 1.3、Gradle 项目文件介绍 1.4、Gradle 中的常用指令 一、Gradle 1.1、安装&环境变量 a&#xff09;从 Gradle 官网下载对应的版本&#xff1a;Gradle | Releases 这里以 8.0 版本为例&#xff0c;下载附带…

01背包详解,状态设计,滚动数组优化,通用问题求解

文章目录 0/1背包前言一、0/1背包的状态设计1、状态设计2、状态转移方程3、初始状态4、代码实现5、滚动数组优化二维优化为两个一维二维优化为一个一维&#xff0c;倒序递推 二、0/1背包的通用问题求最大值求最小值求方案数 0/1背包 前言 0/1包问题&#xff0c;作为动态规划问…

什么是MVC?MVC框架的优势和特点

目录 一、什么是MVC 二、MVC模式的组成部分和工作原理 1、模型&#xff08;Model&#xff09; 2、视图&#xff08;View&#xff09; 3、控制器&#xff08;Controller&#xff09; 三、MVC模式的工作过程如下&#xff1a; 用户发送请求&#xff0c;请求由控制器处理。 …

少儿编程:从兴趣到升学的关键之路

随着科技的飞速发展&#xff0c;计算机编程已经逐渐渗透到我们生活的方方面面。对于新时代的少儿来说&#xff0c;掌握编程技能不仅可以开拓视野&#xff0c;提高思维能力&#xff0c;还可能成为他们未来升学和就业的重要砝码。6547网将探讨如何将少儿编程从兴趣培养成一种有力…

谷歌推大语言模型VideoPoet:文本图片皆可生成视频和音频

Google Research最近发布了一款名为VideoPoet的大型语言模型&#xff08;LLM&#xff09;&#xff0c;旨在解决当前视频生成领域的挑战。该领域近年来涌现出许多视频生成模型&#xff0c;但在生成连贯的大运动时仍存在瓶颈。现有领先模型要么生成较小的运动&#xff0c;要么在生…

图像识别与人工智能到底是何关系?有何区别?

图像识别是人工智能领域的一个重要应用领域&#xff0c;它利用人工智能技术和算法来分析和理解图像内容。图像识别是使计算机能够模拟和理解人类视觉系统的能力&#xff0c;并从图像中提取出有用的信息和特征。 人工智能在图像识别中扮演着至关重要的角色&#xff0c;主要体现…

【Sass】网易云动画播放器

简介 仿网易云播放动画 效果图 sass src/assets/style/musicPlay.sass // TODO 音乐播放器动画 // ? 动画停止class >>> .muscic-play-stop // HTML结构 // <div class"music-play"> // <div class"bg-primary"></div>…

二级分销的魅力:无限裂变创造十八亿的流水

有这么一个团队&#xff0c;仅靠这一个二级分销&#xff0c;六个月就打造了十八亿的流水。听着是不是很恐怖&#xff1f;十八亿确实是一个很大的数字&#xff0c;那么这个团队是怎么做到的呢&#xff1f;我们接着往下看。 这是一个销售减脂产品的团队。不靠网店&#xff0c;不…

运行游戏显示缺少d3dx9_42.dll怎么办,三步即可完美解决

在我们使用电脑玩游戏&#xff0c;工作的时候&#xff0c;偶尔会遇到一些错误提示&#xff0c;其中之一就是缺少d3dx9_42.dll。这个错误通常出现在运行某些游戏或应用程序时&#xff0c;它表示计算机缺少了DirectX 9组件中的d3dx9_42.dll文件。为了解决这个问题&#xff0c;下面…

【接口测试】Postman(三)-变量与集合

一、变量 ​ 变量这个概念相信大家都不陌生&#xff0c;因此在这里我们不介绍了。主要说一下在Postman中有哪几类变量&#xff0c;主要包括以下四类&#xff1a; Global&#xff08;全局&#xff09; Environment&#xff08;环境&#xff09; Local&#xff08;本地&#xf…

python打开opencv图像与QImage图像及其转化

目录 1、Qimage图像 2、opencv图像 3、python打开QImage图像通过Qlabel控件显示 4、python打开QImage图像通过opencv显示 5、python打开opencv图像并显示 6、python打开opencv图像通过Qlabel控件显示 1、Qimage图像 QImage是Qt库中用于存储和处理图像的类。它可以存储多种…

微软官方镜像下载大全(windows iso 官方镜像)

原本只是想下一个Windows Server 2022中文版的镜像&#xff0c;后面发现要么就是慢得一批的某盘&#xff0c;要么就是磁力&#xff0c;我想直接下载简简单单&#xff0c;找了一圈没有找到。官网下载需要注册、登录乱七八糟&#xff0c;最终终于找到下载方法了&#xff0c;适用于…

大型语言模型,MirrorBERT — 将模型转换为通用词汇和句子编码器

大型语言模型&#xff0c;MirrorBERT — 将模型转换为通用词汇和句子编码器 一、介绍 BERT 模型在现代 NLP 应用中发挥着基础作用&#xff0c;这已不是什么秘密。尽管它们在下游任务上表现出色&#xff0c;但大多数模型在没有微调的情况下在特定问题上并不是那么完美。从原始预…