Scala 练习一 将Mysql表数据导入HBase

Scala 练习一 将Mysql表数据导入HBase

续第一篇:Java代码将Mysql表数据导入HBase表

源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase

一、整体介绍

在这里插入图片描述

  1. HBase特质

    连接HBase, 创建HBase执行对象

    1. 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
      Configuration conf = HBaseConfiguration.create()
      conf.set(String, String)
    2. 创建连接:多个连接(池化)
      Connection con = ConnectionFactory.createConnection()
    3. 创建数据表:表名: String
      Table table = con.getTable(TableName)
    def build(): HBase		// 初始化配置信息
    def initPool(): HBase	// 初始化连接池
    def finish(): Executor	// 完成 返回执行对象
    
  2. Executor特质

    对HBase进行操作的方法: 包含如下函数

    def exists(tableName: String): Boolean	// 验证数据表是否存在
    def create(tableName: String, columnFamilies: Seq[String]): Boolean	// 创建数据表
    def drop(tableName: String): Boolean	// 删除数据表
    def put(tableName: String, data: util.List[Put]): Boolean	// 批量插入数据
    
  3. Jdbc 封装

    Jdbc封装

    1. 初始化连接
      driver : com.mysql.cj.jdbc.Driver
      参数:url, username, password
      创建连接
    2. 初始化执行器
      sql, parameters
      创建执行器【初始化参数】
    3. 执行操作并返回【结果】
      DML: 返回影响数据库表行数
      DQL: 返回查询的数据集合
      EX: 出现异常结果
  4. MyHBase用于实现HBaseExecutor特质

  5. 测试数据格式

    mysql表

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    DROP TABLE IF EXISTS `test_table_for_hbase`;
    CREATE TABLE `test_table_for_hbase`  (
      `test_id` int NULL DEFAULT NULL,
      `test_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
      `test_age` int NULL DEFAULT NULL,
      `test_gender` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
      `test_phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
    
    INSERT INTO `test_table_for_hbase` VALUES (1, 'testName1', 26, 'male', '18011111112');
    INSERT INTO `test_table_for_hbase` VALUES (2, 'testName2', 25, 'female', '18011111113');
    INSERT INTO `test_table_for_hbase` VALUES (3, 'testName3', 27, 'male', '18011111114');
    INSERT INTO `test_table_for_hbase` VALUES (4, 'testName4', 35, 'male', '18011111115');
    -- .... 省略以下数据部分
    

    hbase表

    # 创建表  库名:表名, 列族1, 列族2
    create "hbase_test:tranfer_from_mysql","baseInfo","scoreInfo"	
    truncate 'hbase_test:tranfer_from_mysql'  # 清空hbase_test命名空间下的tranfer_from_mysql表
    scan 'hbase_test:tranfer_from_mysql'	  # 查看表
    

二、依赖

<dependencies>
    <!-- HBase 驱动 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.3.5</version>
    </dependency>
    <!-- Hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>3.1.3</version>
    </dependency>
    <!-- mysql -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.0.33</version>
    </dependency>

    <!-- zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.6.3</version>
    </dependency>
</dependencies>

三、测试结果

终端有个日志的小警告(无伤大雅hh),输出为 true
在这里插入图片描述

查看hbase表,发现数据正常导入

在这里插入图片描述

四、源码

scala代码较简单这里直接上源码了,去除了部分注释,更多请去仓库下载

Executor

package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
trait Executor {
  def exists(tableName: String): Boolean
  def create(tableName: String, columnFamilies: Seq[String]): Boolean
  def drop(tableName: String): Boolean
  def put(tableName: String, data: util.List[Put]): Boolean
}

HBase

package hbase
import org.apache.hadoop.hbase.client.Connection
trait HBase {
  protected var statusCode: Int = -1
  def build(): HBase
  case class PoolCon(var available: Boolean, con: Connection) {
    def out = {
      available = false
      this
    }
    def in = available = true
  }
  def initPool(): HBase
  def finish(): Executor
}

MyHBase

package hbase.impl

import hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

import java.util
import scala.collection.mutable.ArrayBuffer

class MyHBase (conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase{
  private lazy val config: Configuration = HBaseConfiguration.create()
  private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()
  
  override def build(): HBase = {
    if(statusCode == -1){
      conf.foreach(t => config.set(t._1, t._2))
      statusCode = 0
      this
    }else{
      throw new HBaseException("build() function must be invoked first")
    }
  }
  
  override def initPool(): HBase = {
    if(statusCode == 0){
      val POOL_SIZE = if (pooled) {
        if (poolSize <= 0) 3 else poolSize
      } else 1
      for (i <- 1 to POOL_SIZE) {
        pool.append(PoolCon(available = true, ConnectionFactory.createConnection(config)))
      }
      statusCode = 1
      this
    }else{
      throw new HBaseException("initPool() function must be invoked only after build()")
    }

  }
  
  override def finish(): Executor = {
    if (statusCode == 1) {
      statusCode = 2
      new Executor {
        override def exists(tableName: String): Boolean = {
          var pc: PoolCon = null
          try{
            pc = getCon
            val exists = pc.con.getAdmin.tableExists(TableName.valueOf(tableName))
            pc.in
            exists
          }catch {
            case e: Exception => e.printStackTrace()
              false
          }finally {
            close(pc)
          }
        }

        override def create(tableName: String, columnFamilies: Seq[String]): Boolean = {
          if (exists(tableName)) {
            return false
          }
          var pc: PoolCon = null
          try {
            pc = getCon
            val builder: TableDescriptorBuilder = TableDescriptorBuilder
              .newBuilder(TableName.valueOf(tableName))

            columnFamilies.foreach(
              cf => builder.setColumnFamily(
                ColumnFamilyDescriptorBuilder.of(cf)
              )
            )
            pc.con.getAdmin.createTable(builder.build())
            true
          } catch {
            case e: Exception => e.printStackTrace()
              false
          } finally {
              close(pc)
          }
        }
        override def drop(tableName: String): Boolean = {
          if(!exists(tableName)){
            return false
          }
          var pc: PoolCon = null
          try {
            pc = getCon
            pc.con.getAdmin.deleteTable(TableName.valueOf(tableName))
            true
          } catch {
            case e: Exception => e.printStackTrace()
              false
          } finally {
            close(pc)
          }
        }

        override def put(tableName: String, data: util.List[Put]): Boolean = {
          if(!exists(tableName)){
            return false
          }
          var pc: PoolCon = null
          try {
            pc = getCon
            pc.con.getTable(TableName.valueOf(tableName)).put(data)
            true
          } catch {
            case e: Exception => e.printStackTrace()
              false
          } finally {
            close(pc)
          }
        }
      }
    }
    else {
      throw new HBaseException("finish() function must be invoked only after initPool()")
    }
  }
  private def getCon = {
    val left: ArrayBuffer[PoolCon] = pool.filter(_.available)
    if (left.isEmpty) {
      throw new HBaseException("no available connection")
    }
    left.apply(0).out
  }

  private def close(con: PoolCon) = {
    if (null != con) {
      con.in
    }
  }
}

object MyHBase{
  def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}

Jdbc

package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
object Jdbc {
  object Result extends Enumeration {
    val EX = Value(0) 
    val DML = Value(1) 
    val DQL = Value(2) 
  }
  // 3种结果(异常,DML,DQL)封装
  case class ResThree(rst: Result.Value) {
    def to[T <: ResThree]: T = this.asInstanceOf[T]
  }
  class Ex(throwable: Throwable) extends ResThree(Result.EX)
  object Ex {
    def apply(throwable: Throwable): Ex = new Ex(throwable)
  }

  class Dml(affectedRows: Int) extends ResThree(Result.DML) {
    def update = affectedRows
  }
  object Dml {
    def apply(affectedRows: Int): Dml = new Dml(affectedRows)
  }

  class Dql(set: ResultSet) extends ResThree(Result.DQL) {
    def generate[T](f: ResultSet => T) = {
      val list: util.List[T] = new util.ArrayList()
      while (set.next()) {
        list.add(f(set))
      }
      list
    }
  }
  object Dql {
    def apply(set: ResultSet): Dql = new Dql(set)
  }
  // JDBC 函数封装
  def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {
    def con() = {
      // 1.1 显式加载 JDBC 驱动程序(只需要一次)
      Class.forName("com.mysql.cj.jdbc.Driver")
      // 1.2 创建连接对象
      DriverManager.getConnection(url, user, password)
    }
    def pst(con: Connection) = {
      // 2.1 创建执行对象
      val pst = con.prepareStatement(sql)
      // 2.2 初始化 SQL 参数
      if (null != params && params.nonEmpty) {
        params.zipWithIndex.foreach(t => pst.setObject(t._2 + 1, t._1))
      }
      pst
    }
    try {
      val connect = con()
      val prepared = pst(connect)
      sql match {
        case sql if sql.matches("^(insert|INSERT|delete|DELETE|update|UPDATE) .*")
        => Dml(prepared.executeUpdate())
        case sql if sql.matches("^(select|SELECT) .*")
        => Dql(prepared.executeQuery())
        case _ => Ex(new SQLException(s"illegal sql command : $sql"))
      }

    } catch {
      case e: Exception => Ex(e)
    }

  }

}

Test

import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.util

object Test {
  def main(args: Array[String]): Unit = {
    // 初始化MySQL JDBC操作函数
    val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(
      user = "root",
      url = "jdbc:mysql://localhost:3306/test_db_for_bigdata",
      password = "123456"
    )

    // 执行SQL查询,并将结果封装在ResThree对象中
    val toEntity: ResThree = jdbcOpr(
      "select * from test_table_for_hbase where test_id between ? and ?",
      Seq(2, 4)
    )

    // 判断ResThree对象中的结果是否为异常
    if (toEntity.rst == Result.EX) {
      // 如果异常,执行异常结果处理
      toEntity.to[Ex]
      println("出现异常结果处理")
    } else {
      // 如果没有异常,将查询结果转换为HBase的Put对象列表
      val puts: util.List[Put] = toEntity.to[Dql].generate(rst => {
        // 创建一个Put对象,表示HBase中的一行
        val put = new Put(
          Bytes.toBytes(rst.getInt("test_id")), // row key设置为test_id
          System.currentTimeMillis() // 设置时间戳
        )
        // 向Put对象中添加列值
        // baseInfo是列族名,test_name、test_age、test_gender、test_phone是列名
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_name"),
          Bytes.toBytes(rst.getString("test_name"))
        )
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_age"),
          Bytes.toBytes(rst.getString("test_age")) // 注意:这里假设test_age是字符串类型,但通常应为整数类型
        )
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_gender"),
          Bytes.toBytes(rst.getString("test_gender"))
        )
        put.addColumn(
          Bytes.toBytes("baseInfo"), Bytes.toBytes("test_phone"),
          Bytes.toBytes(rst.getString("test_phone"))
        )
        // 返回构建好的Put对象
        put
      })

      // 如果有数据需要插入HBase
      if (puts.size() > 0) {
        // 初始化HBase连接池并执行Put操作
        val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1)
          .build()
          .initPool()
          .finish()

        // 执行Put操作,并返回是否成功
        val bool = exe.put("hbase_test:tranfer_from_mysql", puts)

        // 打印操作结果
        println(bool)
      } else {
        // 如果没有数据需要插入
        println("查无数据")
      }
    }
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/680339.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软件测试——Java单元测试(常用注解+断言)待续

1.软件及环境 软件&#xff1a;IDEA 环境&#xff1a;JDK1.8&#xff0c;Junit 4.13 2.环境配置 这里我们采用IDEA编辑器&#xff0c;利用Maven对项目进行构建&#xff0c;如下&#xff1a; 然后项目构建完之后&#xff0c;首先第一步是进入pom文件&#xff0c;添加Junit4.13依…

JavaSE——【逻辑控制】(知识)

目录 前言 一、顺序结构 二、分支结构 三、循环结构 总结 前言 公元 3050 年&#xff0c;地球的科技已经发展到令人难以想象的地步。这天&#xff0c;艾米莉在自己的房间里启动了最新的虚拟旅行装置&#xff0c;下一秒&#xff0c;她发现小奥奇的博客更新了。立即放弃了虚…

Linux部署调度工具xxl-job

背景&#xff1a; Pentaho Data Integration&#xff08;kettle&#xff09;作为用户规模最多的开源ETL工具&#xff0c;强大简洁的功能深受广大ETL从业者的欢迎。但kettle本身的调度监控功能却非常弱。Pentaho官方都建议采用crontab(Unix&#xff0c;linux平台)和计划任务(Win…

Warning:成交前,永远相信意外即将发生

作为一名首次次创业者&#xff0c;随着创业进入深层次阶段&#xff0c;越来越感觉到&#xff1a;创业是一条不归路&#xff0c;因为路上不止有惊喜&#xff0c;还有风尘。创业之前我认为世界是“天圆地方”的&#xff0c; 创业后你猜我怎么看这个世界的&#xff1f; 创业前我一…

输入a,b,c3个整数,按由大到小的顺序输出

解题思路&#xff1a; 用3个指针变量指向3个整型变量&#xff0c;然后用swap函数来实现互换3个整型变量的值。 编写程序&#xff1a; 运行结果&#xff1a; 程序分析&#xff1a; exchange函数的作用是使指针变量p1,p2,p3所指向的整型变量按由大到小的顺序交换它们的值…

ARM32开发——串口库封装(初级)

&#x1f3ac; 秋野酱&#xff1a;《个人主页》 &#x1f525; 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 开发流程分组创建 接口定义完整代码 开发流程 在文件系统中&#xff0c;创建库目录Library在keil工程中&#xff0c;创建分组管理…

【Vue】组件通信

文章目录 一、组件之间如何通信二、组件关系分类三、通信解决方案四、父子通信流程五、父向子通信代码示例六、子向父通信代码示例 组件通信&#xff0c;就是指组件与组件之间的数据传递 组件的数据是独立的&#xff0c;无法直接访问其他组件的数据。想使用其他组件的数据&…

Java集合简略记录

一、集合体系结构 单列集合&#xff1a;Collection 双列集合&#xff1a;Map 二、单列集合 List系列集合&#xff1a;添加的元素是有序、可重复、有索引 有序指的是存和取的顺序是一致的&#xff0c;和之前排序的从小到大是没有任何关系的 Set系列集合&#xff1a;添加的元素是…

28 hive安装-本地模式

1.安装mysql&#xff08;参考文章&#xff1a;centos7.8安装Mysql8.4-CSDN博客&#xff09; 2.将mysql驱动拷贝到/opt/module/hive/lib目录下 &#xff08;直接windows通过finalShell上传&#xff09; 3./opt/module/hive/conf目录下新建hive-site.xml文件&#xff0c;进行配置…

c#与汇川plc通信 使用官网API库

前言 上位机开发中有时会要求与PLC进行通信&#xff0c;汇川官网也有好用的API库方便大家使用。记录一下开发过程。 1.下载资料 汇川官网地址&#xff1a;汇川技术 - 推进工业文明 共创美好生活 打开后选择&#xff1a;服务与支持-》资料下载-》 资料下载 这里可以直接搜索&am…

傅立叶变换矩阵的频谱响应

傅立叶变换矩阵的频谱响应 线性变换可以用矩阵表示&#xff0c;傅立叶变换是一种线性变换&#xff0c;因此也可以使用矩阵表示。具体可以参考&#xff1a;离散傅立叶变换和线性变换的关系&#xff1a;什么是线性空间&#xff1f; 1、傅立叶矩阵 X [ k ] ∑ n 0 N − 1 x [ …

实测有效:Win11一键恢复win10经典右键菜单,让Win11右键默认显示更多设置教程!

Win11一键还原win10右键菜单&#xff1f;win11右键菜单怎么改&#xff1f;怎样让Win11右键默认显示更多选项&#xff1f;今天&#xff0c;我要给你们介绍一款专为Windows 11系统设计的小巧工具&#xff0c;它能让你的右键菜单瞬间回到Win10时代&#xff0c;那种熟悉的感觉&…

Python02:python代码初体验

0、python代码初体验 print(hello,world)看到执行结果输出&#xff0c;则OKK! 1、输出结果取消换行 当print多个执行结果&#xff0c;又希望它们在同一行展示时&#xff1a; print(hello,world, end)print(Hao are, end ) print(you, end?) print(I am fine.) # end参数可…

鸿蒙Ability Kit(程序框架服务)【Ability与ServiceExtensionAbility通信】

Ability与ServiceExtensionAbility通信 介绍 本示例展示通过[IDL的方式]和 [ohos.rpc] 等接口实现了Ability与ServiceExtensionAbility之间的通信。 效果预览 使用说明 1.启动应用后&#xff0c;首页展示城市的天气信息&#xff0c;当前温度每隔5S会刷新一次。 工程目录 …

短链接突然无法打开和域名有关系?短链接使用必读

在使用短链接的过程中&#xff0c;偶尔会遇到短链接无法正常访问的情形&#xff0c;打开之后&#xff0c;要么呈现该网页已然停止访问的提示&#xff0c;要么就是显示无法访问此网站&#xff0c;这究竟是何种缘由导致的呢&#xff1f;而当遇到这些状况时又该如何去妥善解决呢&a…

IC设计企业致力于解决的HPC数据防泄漏,到底该怎么做?

对于半导体IC设计企业来说&#xff0c;芯片设计、验证、仿真使用HPC环境现在已逐渐成为趋势&#xff0c;主要原因在于原来的工作流程存在较多的缺陷&#xff1a; 性能瓶颈&#xff1a;仿真、设计、验证、生产过程中&#xff0c;前端仿真需要小文件高并发低时延的读写和巨量元数…

2024年人文教育与管理科学国际会议(ICHEMS 2024)

2024年人文教育与管理科学国际会议 2024 International Conference on Humanities Education and Management Science 【1】会议简介 2024年人文教育与管理科学国际会议是一场集合了全球人文教育与管理科学领域精英的学术盛会。本次会议旨在搭建一个国际化的学术交流平台&#…

ElementUi el-tree动态加载节点数据 load方法触发机制

需求背景&#xff1a;需要根据点击后获取的数据动态渲染一个 el-tree&#xff0c;同时渲染出来的 el-tree&#xff0c;需要点击节点时才能获取该节点的层数的获取&#xff0c;如图所示&#xff0c;我需要点击“组”节点才能渲染“设备列表”树&#xff0c;同时“设备列表”树的…

企业数据安全管理容易忽视的关键点:云存储权限管控

云存储已经广泛应用于企业用户、教育领域、医疗领域以及政府和公共服务部门。具体应用场景包括文件共享、数据备份、在线课程、教学资源库、电子病历、医学影像、实验室数据、政务数据的集中管理和共享等。 云存储的优势非常明显&#xff1a; 可扩展性&#xff1a;云存储空间可…

Message forwarding mechanism (消息转发机制)

iOS的消息转发机制 iOS的消息转发机制是在消息发送给对象时&#xff0c;找不到对应的实例方法的情况下启动的。消息转发允许对象在运行时处理无法识别的消息&#xff0c;提供了一种动态的、灵活的消息处理方式。 消息转发机制主要分为三个阶段&#xff1a; 动态方法解析快速…