使用spark进行hbase的bulkload

使用spark进行hbase的bulkload

一、 背景

HBase 是一个面向列,schemaless,高吞吐,高可靠可水平扩展的 NoSQL 数据库,用户可以通过 HBase client 提供的 put get 等 api 实现在数据的实时读写。在过去的几年里,HBase 有了长足的发展,它在越来越多的公司里扮演者越来越重要的角色。
HBase 擅长于海量数据的实时读取,原生 HBase 没有二级索引,复杂查询场景支持的不好。同时因为 split,磁盘,网络抖动,Java GC 等多方面的因素会影响其 RT 表现,所以通常我们在使用HBase的同时也会使用其他的存储中间件,比如 ES,Reids,Mysql 等等。避免 HBase 成为信息孤岛,我们需要数据导入导出的工具在这些中间件之间做数据迁移,而最常用的莫过于阿里开源的 DataX。Datax从 其他数据源迁移数据到 HBase 实际上是走的 HBase 原生 api 接口,在少量数据的情况下没有问题,但当我们需要从 Hive 里,或者其他异构存储里批量导入几亿,几十亿的数据,那么用 DataX 这里就显得不那么适合,因为走原生接口为了避免影响生产集群的稳定性一定要做好限流,那么海量数据的迁移就很很慢,同时数据的持续写入会因为 flush,compaction 等机制占用较多的系统资源。为了解决批量导入的场景,Bulkload 应运而生。

二、HBase Bulkload
在大量数据需要写入HBase时,通常有 put方式和bulkLoad 两种方式。

1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息 写入WAL ,
在写入到WAL后, 数据就会被放到MemStore中 ,当MemStore满后数据就会被 flush到磁盘
(即形成HFile文件) ,在这种写操作过程会涉及到flush、split、compaction等操作,容易造
成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统
性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。


2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量
生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。

  • Extract,异构数据源数据导入到 HDFS 之上。
  • Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
  • Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。

 三、实践

hive表


 

 hbase表

 依赖

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<properties>

        <maven.compiler.source>1.8</maven.compiler.source>

        <maven.compiler.target>1.8</maven.compiler.target>

        <encoding>UTF-8</encoding>

        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>

        <log4j.version>1.7.30</log4j.version>

        <zk.version>3.4.5-cdh5.16.2</zk.version>

        <scala.version>2.12.10</scala.version>

        <scala.tools.version>2.12</scala.tools.version>

        <spark.version>3.2.0</spark.version>

        <hbase.version>1.2.0-cdh5.16.2</hbase.version>

        <config.version>1.4.0</config.version>

    </properties>

     

    <repositories>

        <repository>

            <id>nexus-aliyun</id>

            <url>http://maven.aliyun.com/nexus/content/groups/public</url>

        </repository>

        <repository>

            <id>cloudera</id>

            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

        </repository>

    </repositories>

    <dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>${hadoop.version}</version>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-log4j12</artifactId>

            <version>${log4j.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.zookeeper</groupId>

            <artifactId>zookeeper</artifactId>

            <version>${zk.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-core_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-server</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-sql_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

         

    </dependencies>

spark 代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

package com.jojo

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}

import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

 * Description:Hbase批量加载   同一列族多列

 */

object HbaseBulkLoadApp {

  val zookeeperQuorum = "cdh01,cdh02,cdh03"//zookeeper信息

  val dataSourcePath = "hdfs://cdh03:8020/user/hive/warehouse/sample_07" //源文件

  val hFilePath = "hdfs://cdh03:8020/tmp/result"//hfile的存储路径

  val hdfsRootPath = "hdfs://cdh03:8020/"//根路径

  val tableName = "sample_07"//表名

  val familyName = "basic"//列族

  val arr = Array("code","description""total_emp","salary")//列的名字集合

  def main(args: Array[String]): Unit = {

    //获取content

    val sparkConf = new SparkConf()

      .setAppName(s"${this.getClass.getSimpleName}")

      .setMaster("local")

      //指定序列化格式,默认是java序列化

      .set("spark.serializer""org.apache.spark.serializer.KryoSerializer")

      //告知哪些类型需要序列化

      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    val sc = new SparkContext(sparkConf)

    //hadoop配置

    val hadoopConf = new Configuration()

    hadoopConf.set("fs.defaultFS", hdfsRootPath)

    //获取输出路径

    val fileSystem = FileSystem.get(hadoopConf)

    //获取hbase配置

    val hconf = HBaseConfiguration.create()

    //设置zookeeper集群

    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)

    //设置端口

    hconf.set("hbase.zookeeper.property.clientPort""2181");

    //设置hfile最大个数

    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")

    //设置hfile的大小

    hconf.set("hbase.hregion.max.filesize","10737418240")

    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    //获取hbase连接

    val hbaseConn = ConnectionFactory.createConnection(hconf)

    val admin = hbaseConn.getAdmin

    /**

     * 保存生成的HFile文件

     * 注:bulk load  生成的HFile文件需要落地

     * 然后再通过LoadIncrementalHFiles类load进Hbase

     * 此处关于  sortBy 操作详解:

     * 0. Hbase查询是根据rowkey进行查询的,并且rowkey是有序,

     * 某种程度上来说rowkey就是一个索引,这是Hbase查询高效的一个原因,

     * 这就要求我们在插入数据的时候,要插在rowkey该在的位置。

     * 1. Put方式插入数据,会有WAL,同时在插入Hbase的时候会根据RowKey的值选择合适的位置,此方式本身就可以保证RowKey有序

     * 2. bulk load 方式没有WAL,它更像是hive通过load方式直接将底层文件HFile移动到制定的Hbase路径下,所以,在不东HFile的情况下,要保证本身有序才行

     * 之前写的时候只要rowkey有序即可,但是2.0.2版本的时候发现clounm也要有序,所以会有sortBy(x => (x._1, x._2.getKeyString), true)

     *

     * @param hfileRDD

     */

    // 0. 准备程序运行的环境

    // 如果 HBase 表不存在,就创建一个新表

    if (!admin.tableExists(TableName.valueOf(tableName))) {

      val desc = new HTableDescriptor(TableName.valueOf(tableName))

      val hcd = new HColumnDescriptor(familyName)

      desc.addFamily(hcd)

      admin.createTable(desc)

      print("创建了一个新表")

    }

    // 如果存放 HFile文件的路径已经存在,就删除掉

    if(fileSystem.exists(new Path(hFilePath))) {

      fileSystem.delete(new Path(hFilePath), true)

      print("删除hdfs上存在的路径")

    }

    // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:

    // java.io.IOException: Added a key not lexically larger than previous.

    val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath)

      .map(row => {

        // 处理数据的逻辑

        val arrs = row.split("\t")

        var kvlist: Seq[KeyValue] = List()//存储多个列

        var rowkey: Array[Byte] = null

        var cn: Array[Byte] = null

        var v: Array[Byte] = null

        var kv: KeyValue = null

        val cf = familyName.getBytes //列族

        rowkey = Bytes.toBytes(arrs(0)) //key

        for (i <- 1 to (arrs.length - 1)) {

          cn = arr(i).getBytes() //列的名称

          v = Bytes.toBytes(arrs(i)) //列的值

          //将rdd转换成HFile需要的格式,上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key

          kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value

          kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)

        }

        (new ImmutableBytesWritable(rowkey), kvlist)

      })

    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data

      .flatMapValues(_.iterator)

    // 2. Save Hfiles on HDFS

    val table = hbaseConn.getTable(TableName.valueOf(tableName))

    val job = Job.getInstance(hconf)

    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

    job.setMapOutputValueClass(classOf[KeyValue])

    HFileOutputFormat2.configureIncrementalLoadMap(job, table)

    hfileRDD

      .sortBy(x => (x._1, x._2.getKeyString), true//要保持 整体有序

      .saveAsNewAPIHadoopFile(hFilePath,

        classOf[ImmutableBytesWritable],

        classOf[KeyValue],

        classOf[HFileOutputFormat2],

        hconf)

    print("成功生成HFILE")

    val bulkLoader = new LoadIncrementalHFiles(hconf)

    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))

    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()

    sc.stop()

  }

}

 其中可能遇到的问题:

1

EndOfStreamException: Unable to read additional data from server sessionid 0x17f44ca01833e45, likely server has closed socket

 解决:

  主要是zk的版本不匹配,在依赖选择匹配的zk版本。

输出结果

https://www.cnblogs.com/huangguoming/articles/12967868.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/45378.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

主机漏洞利用演示MS17-010(永恒之蓝)

ms17-010危害&#xff1a;对被攻击方的电脑造成蓝屏&#xff01; 申明&#xff1a;本篇文章的用意仅做学习使用 网络搭建环境&#xff1a; 软件&#xff1a;Vmware Workstation 17 攻击机&#xff1a;Kali 靶机环境&#xff1a;Windows 7 Nmap软件的基本功能&#xff1a; …

ONNX Runtime 加速深度学习(C++ 、python)详细介绍

ONNX Runtime 加速深度学习(C 、python)详细介绍 本文在 https://blog.csdn.net/u013250861/article/details/127829944 基础上进行了更改&#xff0c;感谢原作&#xff01; ONNXRuntime(Open Neural Network Exchange)是微软推出的一款针对ONNX模型格式的推理框架&#xff0c…

3DVR全景旅游,最新数字化智慧文旅

导语&#xff1a; 随着科技的飞速发展&#xff0c;3DVR全景旅游正以其独特的特点和无限的优势&#xff0c;成为当今智慧文旅的领航者。穿戴上VR设备&#xff0c;只需一个轻轻的点击&#xff0c;你将被带入一个全新的数字世界&#xff0c;领略美景、探索奇迹。让我们一起深入了…

第119天:免杀对抗-二开CSShellcode函数修改生成模版修改反编译重打包(下)

知识点 #知识点&#xff1a; 1、CS-表面特征消除 2、CS-HTTP流量特征消除 3、CS-Shellcode特征消除#章节点&#xff1a; 编译代码面-ShellCode-混淆 编译代码面-编辑执行器-编写 编译代码面-分离加载器-编写 程序文件面-特征码定位-修改 程序文件面-加壳花指令-资源 代码加载面…

SSM企业固定资产智能管理系统的设计与实现【纯干货分享,M免费领取源码06298】

摘要 信息化社会内需要与之针对性的信息获取途径&#xff0c;但是途径的扩展基本上为人们所努力的方向&#xff0c;由于站在的角度存在偏差&#xff0c;人们经常能够获得不同类型信息&#xff0c;这也是技术最为难以攻克的课题。针对企业固定资产智能管理系统等问题&#xff0c…

springboot 项目启动不打印spring 启动日志

今天项目遇到一个很奇怪的问题&#xff0c;服务在启动时&#xff0c;不打印spring 的启动日志。经过排查发现是因为其他的依赖引入了 log4j 的依赖&#xff0c;因为我们的项目用的是logback&#xff0c;所以项目中没有log4j 的相关配置&#xff0c;所以干扰到了日志的打印 原因…

删除主表 子表外键没有索引的性能优化

整个表147M&#xff0c;执行时一个CPU耗尽&#xff0c; buffer gets 超过1个G&#xff0c; 启用并行也没有用 今天开发的同事问有个表上的数据为什么删不掉&#xff1f;我看了一下&#xff0c;也就不到100000条数据&#xff0c;表上有外键&#xff0c;等了5分钟hang在那里&…

python:基于GeoPandas和GeoViews库将GEDI激光高程数据映射到交互式地图

作者:CSDN @ _养乐多_ 本文将介绍 GEDI(Global Ecosystem Dynamics Investigation)激光雷达数据某数据点波形数据提取,并绘制图表,添加其他图表元素并使图表具有交互性。 在本文中,我们将探索如何打开、读取和处理GEDI数据,并利用地理信息处理库GeoPandas和地理空间数…

DevOps自动化平台开发之 Shell脚本执行的封装

基础知识 基于如下技术栈开发DevOps平台 Spring Boot Shell Ansible Git Gitlab Docker K8S Vue 1、spring boot starter的封装使用 2、Shell脚本的编写 3、Ansible 脚本的编写 4、Docker 的使用与封装设计 本篇介绍如何使用Java封装Linux命令和Shell脚本的使用 将其设计成…

云计算——ACA学习 数据中心概述

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 写在前面 课程目标 学前了解 一.数据中心定义 二.数据中心涉及的主要标准与规范 …

和chatgpt学架构04-路由开发

目录 1 什么是路由2 如何设置路由2.1 安装依赖2.2 创建路由文件2.3 创建首页2.4 编写HomePage2.5 更新路由配置2.6 让路由生效 3 测试总结 要想使用vue实现页面的灵活跳转&#xff0c;其中路由配置是必不可少的&#xff0c;我们在做开发的时候&#xff0c;先需要了解知识点&…

十、数据结构——链式队列

数据结构中的链式队列 目录 一、链式队列的定义 二、链式队列的实现 三、链式队列的基本操作 ①初始化 ②判空 ③入队 ④出队 ⑤获取长度 ⑥打印 四、循环队列的应用 五、总结 六、全部代码 七、结果 在数据结构中&#xff0c;队列&#xff08;Queue&#xff09;是一种常见…

【数据分享】1999—2021年地级市地方一般公共预算收支状况(科学技术支出/教育支出等)

在之前的文章中&#xff0c;我们分享过基于2000-2022年《中国城市统计年鉴》整理的1999-2021年地级市的人口相关数据、各类用地面积数据、污染物排放和环境治理相关数据、房地产投资情况和商品房销售面积、社会消费品零售总额和年末金融机构存贷款余额&#xff08;可查看之前的…

STM32CubeIDE(串口)

目录 一、轮询模式 1.1 配置USART2为异步模式 1.2 500ms发送一次消息 1.3 通信结果 1.4 串口控制LED 二、中断收发 2.1 开启中断 2.2 中断发送接收 2.2.1 中断发送只需要调用接口 2.2.2 中断接收 2.3 实验结果 三、DMA模式与收发不定长数据 3.1 DMA通道配置 3.2 DMA…

【MATLAB绘图】

MATLAB绘图函数&#xff1a;Plot函数详解 介绍 MATLAB是一种常用的科学计算和数据可视化工具&#xff0c;它提供了强大的绘图函数&#xff0c;使用户能够创建各种类型的图表和图形。 基本语法 plot函数的基本语法如下&#xff1a; plot(x, y)其中&#xff0c;x和y是长度相…

Vue 本地应用 图片切换 v-show v-bind实践

点击切换图片的本质&#xff0c;其实修改的是img标签的src属性。 图片的地址有很多个&#xff0c;在js当中通过数组来保存多个数据&#xff0c;数组的取值结合索引&#xff0c;根据索引可以来判断是否是第一张还是最后一张。 图片的变化本质是src属性被修改了&#xff0c;属性…

Shell输出帮助手册

代码&#xff1a; 帮助手册雏形 function help(){echo -e "Help manual":echo -e " -h. -- help View the help manual"echo -e " install Installation"echo -e " uninstall Uninstall" }case "$1&qu…

设计模式——单例模式

1 概述 单例模式就是保证一个类只有一个对象实例。 为了保证无法创建多余的对象实例&#xff0c;单例类中需要自己创建对象实例&#xff0c;并把自己的构造方法私有化以防止其他地方调用创建对象&#xff0c;且需要提供一个公共的方法给其他类来获取该单例类的实例。 同时单例…

初识TDMQ

目录 一&#xff1a;需求背景二&#xff1a;相关文档三&#xff1a;验证TDMQ广播消息 一&#xff1a;需求背景 目前公司需要将决策引擎处理的结果&#xff0c; 一部分数据交给下游分析/入黑/通知等功能。因此就需要决策引擎生产结果让多方下游去消费。 而我需要实现下游的一部…

flutter开发实战-jsontodart及 生成Dart Model类

flutter开发实战-jsontodart及 生成Dart Model类。 在开发中&#xff0c;经常遇到请求的数据Json需要转换成model类。这里记录一下Jsontodart生成Dart Model类的方案。 一、JSON生成Dart Model类 在开发中经常用到将json转成map或者list。通过json.decode() 可以方便 JSON 字…