Spark 之 解析json的复杂和嵌套数据结构

本文主要使用以下几种方法:

1,get_json_object():从一个json 字符串中根据指定的json 路径抽取一个json 对象

2,from_json():从一个json 字符串中按照指定的schema格式抽取出来作为DataFrame的列

3,to_json():将获取的数据转化为json格式

4,explode():炸裂成多行

5,selectExpr():将列转化为一个JSON对象的另一种方式


文件名是 mystudent.txt   具体内容如下,只有一条数据

1|{"dept":{"describe":"主要负责教学","name":"学术部"},"email":"zhangsan@edu.cn","id":79,"name":"zhangsan","stus":[{"grade":"三年级","id":12,"name":"xuesheng1","school":{"address":"南京","leader":"王总","name":"南京大学"}},{"grade":"三年级","id":3,"name":"xuesheng2","school":{"address":"南京","leader":"王总","name":"南京大学"}},{"grade":"三年级","id":1214,"name":"xuesheng3","school":{"address":"南京","leader":"王总","name":"南京大学"}}],"tel":"1585050XXXX"}

 大概是这样的结构:

  

 第一步:导入文件并分割成二元组转换成两列

val optionRDD: RDD[String] = sc.textFile("in/mystudent.txt")
optionRDD.foreach(println)

//分割,注意  |  用的是单引号
val option1: RDD[(String, String)] = optionRDD.map(x => {
      val arr = x.split('|');
      (arr(0), arr(1))
    })
option1.foreach(println)

//转化成两列
val jsonStrDF: DataFrame = option1.toDF("aid", "value")
        jsonStrDF.printSchema()
        jsonStrDF.show(false)

 第二步:按照几个大类先拆分

 val jsonObj: DataFrame = jsonStrDF.select(
      $"aid"
      , get_json_object($"value", "$.dept").as("dept")
      , get_json_object($"value", "$.email").as("email")
      , get_json_object($"value", "$.id").as("tid")
      , get_json_object($"value", "$.name").as("tname")
      , get_json_object($"value", "$.stus").as("stus")
      , get_json_object($"value", "$.tel").as("tel")
    )
    println("--------------------------1--------------------------")
    jsonObj.printSchema()
    jsonObj.show(false)

 第三步:把dept这个部分再分

val jsonObj2: DataFrame = jsonObj.select($"aid", $"email"
      , $"tid", $"tname"
      , get_json_object($"dept", "$.describe").as("describe")
      , get_json_object($"dept", "$.name").as("dname")
      , $"stus", $"tel"
    )
    println("--------------------------2--------------------------")
        jsonObj2.printSchema()
        jsonObj2.show(false)

 第四步:把stus这部分合并成数组

val fileds: List[StructField] =
      StructField("grade", StringType) ::
        StructField("id", StringType) ::
        StructField("name", StringType) ::
        StructField("school", StringType) :: Nil
val jsonObj3: DataFrame = jsonObj2.select(
      $"aid", $"describe", $"dname", $"email", $"tid", $"tname"
      , from_json($"stus", ArrayType(
        StructType(
          fileds
        )
      )
      ).as("events")
    )
    println("--------------------------3--------------------------")
    jsonObj3.printSchema()
    jsonObj3.show(false)

 第五步:explode炸裂stus 部分,分成三部分;并新增列,删除原数组数据

//炸裂
val jsonObj4: DataFrame = jsonObj3.withColumn("events", explode($"events"))
    println("--------------------------4--------------------------")
    jsonObj4.printSchema()
    jsonObj4.show(false)

//新增列,删除原数据
val jsonObj5: DataFrame = jsonObj4.withColumn("grade", $"events.grade")
      .withColumn("id", $"events.id")
      .withColumn("name", $"events.name")
      .withColumn("school", $"events.school")
      .drop("events")
    println("--------------------------5--------------------------")
    jsonObj5.printSchema()
    jsonObj5.show(false)

 第六步:分开school部分,并合并全表

val jsonObj6: DataFrame = jsonObj5.select($"aid", $"describe"
      , $"dname", $"email",$"tid",$"tname",$"grade",$"id",$"name",
      get_json_object($"school","$.address").as("address")
      ,get_json_object($"school","$.leader").as("leader")
    ,get_json_object($"school","$.name").as("schoolname"))
    println("--------------------------6--------------------------")
    jsonObj6.printSchema()
    jsonObj6.show(false)

 总结,全文代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonMyStu {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("jsonstu3opdemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val optionRDD: RDD[String] = sc.textFile("in/mystudent.txt")
    optionRDD.foreach(println)


//按照 | 分割成两列
    val option1: RDD[(String, String)] = optionRDD.map(x => {
      val arr = x.split('|');
      (arr(0), arr(1))
    })
    option1.foreach(println)

    val jsonStrDF: DataFrame = option1.toDF("aid", "value")
        jsonStrDF.printSchema()
        jsonStrDF.show(false)

    val jsonObj: DataFrame = jsonStrDF.select(
      $"aid"
      , get_json_object($"value", "$.dept").as("dept")
      , get_json_object($"value", "$.email").as("email")
      , get_json_object($"value", "$.id").as("tid")
      , get_json_object($"value", "$.name").as("tname")
      , get_json_object($"value", "$.stus").as("stus")
      , get_json_object($"value", "$.tel").as("tel")
    )
    println("--------------------------1--------------------------")
    jsonObj.printSchema()
    jsonObj.show(false)

    val jsonObj2: DataFrame = jsonObj.select($"aid", $"email"
      , $"tid", $"tname"
      , get_json_object($"dept", "$.describe").as("describe")
      , get_json_object($"dept", "$.name").as("dname")
      , $"stus", $"tel"
    )
    println("--------------------------2--------------------------")
        jsonObj2.printSchema()
        jsonObj2.show(false)

    val fileds: List[StructField] =
      StructField("grade", StringType) ::
        StructField("id", StringType) ::
        StructField("name", StringType) ::
        StructField("school", StringType) :: Nil
    val jsonObj3: DataFrame = jsonObj2.select(
      $"aid", $"describe", $"dname", $"email", $"tid", $"tname"
      , from_json($"stus", ArrayType(
        StructType(
          fileds
        )
      )
      ).as("events")
    )
    println("--------------------------3--------------------------")
    jsonObj3.printSchema()
    jsonObj3.show(false)

    val jsonObj4: DataFrame = jsonObj3.withColumn("events", explode($"events"))
    println("--------------------------4--------------------------")
    jsonObj4.printSchema()
    jsonObj4.show(false)

    val jsonObj5: DataFrame = jsonObj4.withColumn("grade", $"events.grade")
      .withColumn("id", $"events.id")
      .withColumn("name", $"events.name")
      .withColumn("school", $"events.school")
      .drop("events")
    println("--------------------------5--------------------------")
    jsonObj5.printSchema()
    jsonObj5.show(false)

    val jsonObj6: DataFrame = jsonObj5.select($"aid", $"describe"
      , $"dname", $"email",$"tid",$"tname",$"grade",$"id",$"name",
      get_json_object($"school","$.address").as("address")
      ,get_json_object($"school","$.leader").as("leader")
    ,get_json_object($"school","$.name").as("schoolname"))
    println("--------------------------6--------------------------")
    jsonObj6.printSchema()
    jsonObj6.show(false)

  }
}

拓展:

//如果分割符是  ,  则用以下方法,indexOf返回第一个此元素的下标值
    /*val optinRDD: RDD[String] = sc.textFile("in/mystudent.txt")
    optinRDD.foreach(println)
    val frame: RDD[(String, String)] = optinRDD.map(
      x => {
        //返回第一个,所在的位置
        val i: Int = x.indexOf(",")//1
        
        //开始截取
        //(0,i)--->(0,1)
        //(i+1) 2 从下标元素开始到末尾
        val tuple: (String, String) = (x.substring(0, i), x.substring(i + 1))
        tuple
      }
    )*/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/10070.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【SpringMVC】第一个springmvc项目

需求: 用户在页面发起一个请求, 请求交给springmvc的控制器对象,并显示请求的处理结果(在结果页面显示一个欢迎语句)。 实现步骤: 新建web maven工程 加入依赖 spring-webmvc依赖,间接把spri…

FLINK 在蚂蚁大规模金融场景的平台建设

摘要:本文整理自蚂蚁集团高级技术专家、蚂蚁集团流计算平台负责人李志刚,在 Flink Forward Asia 2022 平台建设专场的分享。本篇内容主要分为四个部分: 主要挑战架构方案核心技术介绍未来规划点击查看直播回放和演讲 PPT 一、主要挑战 1.1 金…

【 Spring MVC 核心功能(三) - 输出数据】

文章目录引言一、返回静态页面二、返回非静态页面的数据三、返回 JSON 对象四、请求转发(forward)和请求重定向(redirect)五、拓展:IDEA 热部署(热加载)3.1 添加 SpringBoot DevTools 框架3.2 开起 IDEA 的自动编译3.3 开起运行中的热部署3.4 使用 debug 启动项目引…

【机器学习】SoftMax多分类---学习笔记

SoftMax---学习笔记softMax分类函数定义:softmax分类损失函数softMax分类函数 首先给一个图,这个图比较清晰地告诉大家softmax是怎么计算的。 (图片来自网络) 定义: 给定以歌nknknk矩阵W(w1,w2,...,wk)W(w_1,w_2,...,w_k)W(w1​,w2​,...,w…

Arcgis小技巧【12】——ArcGIS标注的各种用法和示例

标注是将描述性文本放置在地图中的要素上或要素旁的过程。 本文整理了ArcGIS中的各种标注方法、可能遇到的问题和细节,内容比较杂,想到哪写到哪。 一、正常标注某一字段值的内容 右键点击【属性】,在【标注】选项卡下勾选【标注此图层中的的…

Python 小型项目大全 1~5

一、百吉饼 原文:http://inventwithpython.com/bigbookpython/project1.html 在百吉饼这种演绎逻辑游戏中,你必须根据线索猜出一个秘密的三位数。该游戏提供以下提示之一来响应您的猜测:"Pico",当您的猜测在错误的位置有…

【SpringMVC】7—文件上传

⭐⭐⭐⭐⭐⭐ Github主页👉https://github.com/A-BigTree 笔记链接👉https://github.com/A-BigTree/Code_Learning ⭐⭐⭐⭐⭐⭐ 如果可以,麻烦各位看官顺手点个star~😊 如果文章对你有所帮助,可以点赞👍…

布隆过滤器讲解及基于Guava BloomFilter案例

目录 1、布隆过滤器是什么 2、主要作用 3、存储过程 4、查询过程 5、布隆过滤器的删除操作 6、优点 7、缺点 8、测试误判案例 8.1、引入Guava依赖 8.2、编写测试代码 8.3、测试 8.4、BloomFilter实现原理 9、总结 推荐博主视频,讲的很棒:布隆…

华为运动健康服务Health Kit 6.10.0版本新增功能速览!

华为运动健康服务(HUAWEI Health Kit)6.10.0 版本新增的能力有哪些? 阅读本文寻找答案,一起加入运动健康服务生态大家庭! 一、 支持三方应用查询用户测量的连续血糖数据 符合申请Health Kit服务中开发者申请资质要求…

大数据项目之电商数据仓库系统回顾

文章目录一、实训课题二、实训目的三、操作环境四、 实训过程(实训内容及主要模块)五、实训中用到的课程知识点六、实训中遇到的问题及解决方法七、课程实训体会与心得八、程序清单一、实训课题 大数据项目之电商数据仓库系统 二、实训目的 完成一个电…

7.基于概率距离快速削减法的风光场景生成与削减方法

matlab代码:基于概率距离快速削减法的风光场景生成与削减方法 采用蒙特卡洛进行场景生成,并再次进行场景缩减。 clear;clc; %风电出力预测均值E W[5.8,6.7,5.8,5.1,6.3,5,6.2,6,4.1,6,7,6.8,6.5,6.9,5,5.6,6,5.8,6.2,4.7,3.3,4.4,5.6,5]; %取标准差为风…

在unreal中的基于波叠加的波浪水面材质原理和制作

关于水的渲染模型 如何渲染出真实的水体和模拟,是图形学,游戏开发乃至仿真领域很有意思的一件事 记得小时候玩《Command & Conquer: Red Alert 3》,被当时的水面效果深深震撼,作为一款2008年出的游戏,现在想起它…

算法:将一个数组旋转k步

题目 输入一个数组如 [1,2,3,4,5,6,7],输出旋转 k 步后的数组。 旋转 1 步:就是把尾部的 7 放在数组头部前面,也就是 [7,1,2,3,4,5,6]旋转 2 步:就是把尾部的 6 放在数组头部前面,也就是 [6,7,1,2,3,4,5]… 思路 思…

C++继承(上)

一、继承的概念及定义1.继承的概念2.继承定义2.1定义格式2.2继承关系和访问限定符2.3继承基类成员访问方式的变化二、基类和派生类对象赋值转换三、继承中的作用域一、继承的概念及定义 1.继承的概念 继承机制是面向对象程序设计使代码可以复用的最重要的手段,它允…

聊聊如何运用JAVA注解处理器(APT)

什么是APT APT(Annotation Processing Tool)它是Java编译期注解处理器,它可以让开发人员在编译期对注解进行处理,通过APT可以获取到注解和被注解对象的相关信息,并根据这些信息在编译期按我们的需求生成java代码模板或…

【SQL Server】数据库开发指南(一)数据库设计

文章目录一、数据库设计的必要性二、什么是数据库设计三、数据库设计的重要性五、数据模型5.1 实体-关系(E-R)数据模型5.2 实体(Entity)5.3 属性(Attribute)5.5 关系(Relationship)六…

和ChatGPT-4聊完后,我觉得一切可能已经来不及了

了然无味,晴空万里!和ChatGPT-4开始了一场坦诚的沟通,它全程都表现出高情商,以及不断尽量安抚我的情绪,而这,恰恰令我脊背发凉。 部分文字截取 ZM:我能不能理解每次对话就是一次你的“生命” G&…

LeetCode刷题6:二叉树篇之第 1 节

提示1:本篇先带大家了解二叉树的基础理论,后给出4道基础题目,不难,冲啊~ 算法刷题系列 LeetCode刷题1:数组篇LeetCode刷题2:链表篇LeetCode刷题3:哈希篇LeetCode刷题4:字符串篇Lee…

1678_计算机架构黄金时代_文章阅读

全部学习汇总: GreyZhang/g_risc_v: Learning notes about RISC V. (github.com) 看了一份几年前的文章,觉得还是挺有收获的,因此做一个简单的整理。 对于架构有很大影响的主要考虑四点:专用硬件的实现、高安全性的要求、开放指令…

【Pandas】① Pandas 数据处理基础

介绍 Pandas 是非常著名的开源数据处理库,我们可以通过它完成对数据集进行快速读取、转换、过滤、分析等一系列操作。除此之外,Pandas 拥有强大的缺失数据处理与数据透视功能,可谓是数据预处理中的必备利器。 知识点 数据类型数据读取数据选择…