文章目录
1. 打开项目 2. 查看数据集 2.1 查看JSON格式数据 2.2 查看CSV格式数据 2.3 查看TXT格式数据
3. 添加单元测试依赖 4. 创建数据加载与保存对象 4.1 创建Spark会话对象 4.2 创建加载JSON数据方法 4.3 创建加载CSV数据方法 4.4 创建加载Text数据方法 4.5 创建加载JSON数据扩展方法 4.6 创建加载CSV数据扩展方法 4.7 创建加载Text数据扩展方法 4.8 创建保存文本文件方法 4.9 查看程序完整代码
5. 实战小结
1. 打开项目
打开SparkSQLDataSource
项目
2. 查看数据集
2.1 查看JSON格式数据
查看users.json
文件
{ "name" : "李小玲" , "gender" : "女" , "age" : 45 }
{ "name" : "童安格" , "gender" : "男" , "age" : 26 }
{ "name" : "陈燕文" , "gender" : "女" , "age" : 18 }
{ "name" : "王晓明" , "gender" : "男" , "age" : 32 }
{ "name" : "张丽华" , "gender" : "女" , "age" : 29 }
{ "name" : "刘伟强" , "gender" : "男" , "age" : 40 }
{ "name" : "赵静怡" , "gender" : "女" , "age" : 22 }
{ "name" : "孙强东" , "gender" : "男" , "age" : 35 }
2.2 查看CSV格式数据
查看users.csv
文件
name, gender, age
李小玲, 女, 45
童安格, 男, 26
陈燕文, 女, 18
王晓明, 男, 32
张丽华, 女, 29
刘伟强, 男, 40
赵静怡, 女, 22
孙强东, 男, 35
2.3 查看TXT格式数据
查看users.txt
文件
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35
3. 添加单元测试依赖
在pom.xml
里添加单元测试框架依赖
< dependency>
< groupId> junit</ groupId>
< artifactId> junit</ artifactId>
< version> 4.13.2</ version>
</ dependency>
刷新项目依赖
4. 创建数据加载与保存对象
创建net.huawei.practice
包 在practice
子包里创建DataLoadAndSave
对象 创建DataLoadAndSave
伴生类
4.1 创建Spark会话对象
创建spark
常量
val spark = SparkSession. builder( )
. appName( "DataLoadAndSave" )
. master( "local[*]" )
. getOrCreate( )
4.2 创建加载JSON数据方法
创建loadJSONData()
方法
def loadJSONData( filePath: String ) : DataFrame = {
spark. read. json( filePath)
}
在伴生类里创建单元测试方法testLoadJSONData()
方法
@Test
def testLoadJSONData( ) : Unit = {
val df = DataLoadAndSave. loadJSONData( "data/users.json" )
df. show( )
}
运行testLoadJSONData()
测试方法,查看结果
4.3 创建加载CSV数据方法
创建loadCSVData()
方法
def loadCSVData( filePath: String ) : DataFrame = {
spark. read
. option( "header" , "true" )
. option( "inferSchema" , "true" )
. csv( filePath)
}
在伴生类里创建单元测试方法testLoadCSVData()
方法
@Test
def testLoadCSVData( ) : Unit = {
val df = DataLoadAndSave. loadCSVData( "data/users.csv" )
df. show( )
}
运行testLoadCSVData()
测试方法,查看结果
4.4 创建加载Text数据方法
创建loadTextData()
方法
def loadTextData( filePath: String ) : DataFrame = {
spark. read. text( filePath)
}
在伴生类里创建单元测试方法testLoadTextData()
方法 运行testLoadTextData()
测试方法,查看结果
4.5 创建加载JSON数据扩展方法
创建loadJSONDataExpand()
方法
def loadJSONDataExpand( filePath: String ) : DataFrame = {
spark. read. format( "json" ) . load( filePath)
}
在伴生类里创建单元测试方法testLoadJSONDataExpand()
方法 运行testLoadJSONDataExpand()
测试方法,查看结果
4.6 创建加载CSV数据扩展方法
创建loadCSVDataExpand()
方法
def loadCSVDataExpand( filePath: String ) : DataFrame = {
spark. read. format( "csv" )
. option( "header" , "true" )
. option( "inferSchema" , "true" )
. load( filePath)
}
在伴生类里创建单元测试方法testLoadCSVDataExpand()
方法 运行testLoadCSVDataExpand()
测试方法,查看结果
4.7 创建加载Text数据扩展方法
创建loadTextDataExpand()
方法
def loadTextDataExpand( filePath: String ) : DataFrame = {
spark. read. format( "text" ) . load( filePath)
}
在伴生类里创建单元测试方法testLoadTextDataExpand()
方法 运行testLoadTextDataExpand()
测试方法,查看结果
4.8 创建保存文本文件方法
创建saveTextFile()
方法
def saveTextFile( inputPath: String , outputPath: String ) : Unit = {
val df = spark. read. format( "text" ) . load( inputPath)
df. write. mode( "overwrite" ) . format( "text" ) . save( outputPath)
}
在伴生类里创建单元测试方法testSaveTextFile()
方法 运行testSaveTextFile()
测试方法,查看结果
4.9 查看程序完整代码
package net. huawei. practice
import org. apache. spark. sql. { DataFrame, SparkSession}
import org. junit. Test
object DataLoadAndSave {
val spark = SparkSession. builder( )
. appName( "DataLoadAndSave" )
. master( "local[*]" )
. getOrCreate( )
def loadJSONData( filePath: String ) : DataFrame = {
spark. read. json( filePath)
}
def loadCSVData( filePath: String ) : DataFrame = {
spark. read
. option( "header" , "true" )
. option( "inferSchema" , "true" )
. csv( filePath)
}
def loadTextData( filePath: String ) : DataFrame = {
spark. read. text( filePath)
}
def loadJSONDataExpand( filePath: String ) : DataFrame = {
spark. read. format( "json" ) . load( filePath)
}
def loadCSVDataExpand( filePath: String ) : DataFrame = {
spark. read. format( "csv" )
. option( "header" , "true" )
. option( "inferSchema" , "true" )
. load( filePath)
}
def loadTextDataExpand( filePath: String ) : DataFrame = {
spark. read. format( "text" ) . load( filePath)
}
def saveTextFile( inputPath: String , outputPath: String ) : Unit = {
val df = spark. read. format( "text" ) . load( inputPath)
df. write. mode( "overwrite" ) . format( "text" ) . save( outputPath)
}
}
class DataLoadAndSave {
@Test
def testLoadJSONData( ) : Unit = {
val df = DataLoadAndSave. loadJSONData( "data/users.json" )
df. show( )
}
@Test
def testLoadCSVData( ) : Unit = {
val df = DataLoadAndSave. loadCSVData( "data/users.csv" )
df. show( )
}
@Test
def testLoadTextData( ) : Unit = {
val df = DataLoadAndSave. loadTextData( "data/users.txt" )
df. show( )
}
@Test
def testLoadJSONDataExpand( ) : Unit = {
val df = DataLoadAndSave. loadJSONDataExpand( "data/users.json" )
df. show( )
}
@Test
def testLoadCSVDataExpand( ) : Unit = {
val df = DataLoadAndSave. loadCSVDataExpand( "data/users.csv" )
df. show( )
}
@Test
def testLoadTextDataExpand( ) : Unit = {
val df = DataLoadAndSave. loadTextDataExpand( "data/users.txt" )
df. show( )
}
@Test
def testSaveTextFile( ) : Unit = {
DataLoadAndSave. saveTextFile( "data/users.txt" , "result/users" )
}
}
5. 实战小结
在本次实战中,我们通过SparkSQLDataSource
项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave
对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()
、loadCSVData()
和loadTextData()
,分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()
方法灵活加载数据,并实现了数据保存功能,如saveTextFile()
方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。