大数据-268 实时数仓 - ODS层 将 Kafka 中的维度表写入 DIM

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

  • MyBatis 更新完毕
  • 目前开始更新 Spring,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 实时数仓(正在更新…)

章节内容

  • ODS
  • Lambda架构
  • Kappa架构

在这里插入图片描述

基本介绍

在 Kafka 中写入维度表(DIM)通常涉及将实时或批处理数据从 Kafka 主题(Topic)读取,并根据数据流中的信息更新维度表(DIM),这在数据仓库或数据湖的 ETL(提取、转换、加载)过程中非常常见。维度表(DIM)存储的是与业务数据相关的维度信息,例如客户、产品、地理位置等,用于支持 OLAP(联机分析处理)查询。

理解 Kafka 数据流

Kafka 是一个分布式流平台,用于高吞吐量的消息传递。在 ETL 过程中,Kafka 通常用作数据的消息队列或者流处理的来源。每当新数据生成时,它会被发布到 Kafka 中的某个主题(Topic),然后消费者(Consumer)可以从主题中获取数据进行处理。

设计维度表(DIM)

维度表通常包含业务实体的详细信息,如产品名称、客户信息、时间维度等。与事实表(Fact)不同,维度表的数据较为静态,但可能会随着时间更新(例如,客户地址变更或产品类别更新)。每个维度表通常有一个唯一的主键(如 customer_id 或 product_id)来标识记录。

Kafka 消费者(Consumer)

为了从 Kafka 中读取维度数据,需要创建一个消费者(Consumer),它会从 Kafka 的某个主题(Topic)中读取消息。这些消息通常是 JSON 格式,包含需要写入维度表的信息。消费者将从 Kafka 主题中获取数据,可能包括以下步骤:

  • 连接到 Kafka 集群。
  • 订阅一个或多个主题(Topics)。
  • 消费消息并将其传递给后续的处理逻辑。
  • 消费者的实现可以使用 Kafka 提供的客户端库,例如 Kafka 的 Java 客户端、Python 的 confluent-kafka 等。

数据处理和转换

在读取到 Kafka 消息后,消费者需要对数据进行必要的处理和转换。对于维度数据,处理逻辑可能包括:

  • 数据解析:将消息从 Kafka 中的格式(例如 JSON)解析成结构化数据。
  • 校验数据:检查数据是否符合业务规则,是否完整,是否有效。
  • 维度数据更新:如果 Kafka 中的消息包含的维度信息已经存在,则更新相关记录;如果是新维度,则插入新记录。

维度表的更新

维度表的更新通常有两种常见的方式:

  • 全量更新:每次从 Kafka 获取到新的数据时,都将其覆盖到维度表中。这种方式适用于数据变动较少或者可以接受重写的场景。
  • 增量更新:根据时间戳、有效性标志或版本号等信息,更新已有的维度记录。这种方式适用于数据会有更新(如地址或状态变更)的场
    景。

增量更新时,通常会执行以下操作:

  • 查找是否已有该维度记录(例如通过 dimension_id)。
  • 如果存在且数据发生变化,则更新该记录,同时更新 valid_to 时间,并插入一条新的记录,设置 valid_from 和 valid_to 时间。
  • 如果不存在该记录,则直接插入新的维度数据。

写入到目标存储(DIM)

在数据处理后,需要将更新后的维度数据写入目标存储。这通常是一个数据库(例如 MySQL、PostgreSQL 或 NoSQL 数据库)或数据仓库(例如 Snowflake、Google BigQuery、Redshift)中的维度表(DIM)。

数据存储更新(事务性考虑)

对于维度表的更新,通常需要确保数据的一致性。可以使用事务来确保数据在更新过程中的一致性,防止数据丢失或重复。例如,可以在事务中执行所有的更新和插入操作,确保如果操作失败,可以回滚。

TableObject

创建样例 TableObject

case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String) extends Serializable

AreaInfo

case class AreaInfo(
  id: String,
  name: String,
  pid: String,
  sname: String,
  level: String,
  citycode: String,
  yzcode: String,
  mername: String,
  Lng: String,
  Lat: String,
  pinyin: String
  )

DataInfo

case class DataInfo(
  modifiedTime: String,
  orderNo: String,
  isPay: String,
  orderId: String,
  tradeSrc: String,
  payTime: String,
  productMoney: String,
  totalMoney: String,
  dataFlag: String,
  userId: String,
  areaId: String,
  createTime: String,
  payMethod: String,
  isRefund: String,
  tradeType: String,
  status: String
)

ConnHBase

class ConnHBase {
  def connToHbase:Connection ={
    val conf : Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum","h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
    conf.set("hbase.zookeeper.property.clientPort","2181")
    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)
    val connection = ConnectionFactory.createConnection(conf)
    connection
  }
}

SinkHBase

class SinkHBase extends RichSinkFunction[util.ArrayList[TableObject]] {

  var connection : Connection = _
  var hbTable : Table = _

  override def open(parameters: Configuration): Unit = {
    connection = new ConnHBase().connToHbase
    hbTable = connection.getTable(TableName.valueOf("wzk_area"))
  }

  override def close(): Unit = {
    if (hbTable != null) {
      hbTable.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def invoke(value: util.ArrayList[TableObject], context: SinkFunction.Context[_]): Unit = {
    value.forEach(x => {
      println(x.toString)
      val database: String = x.database
      val tableName: String = x.tableName
      val typeInfo: String = x.typeInfo
      if ((database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_trade_orders"))) {
        if (typeInfo.equalsIgnoreCase("insert")) {
          value.forEach(x => {
            val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])
            insertTradeOrders(hbTable, info)
          })
        } else if (typeInfo.equalsIgnoreCase("update")) {

        } else if (typeInfo.equalsIgnoreCase("delete")) {

        }
      }

      if (database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_area")) {
        if (typeInfo.equalsIgnoreCase("insert")) {
          value.forEach(x => {
            val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
            insertArea(hbTable, info)
          })
        } else if (typeInfo.equalsIgnoreCase("update")) {
          value.forEach(x => {
            val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
            insertArea(hbTable, info)
          })
        } else if (typeInfo.equalsIgnoreCase("delete")) {
          value.forEach(x => {
            val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
            deleteArea(hbTable, info)
          })

        }
      }
    })
  }

  def insertTradeOrders(hbTable: Table, dataInfo: DataInfo): Unit = {
    val tableName = "wzk_trade_orders"
    val columnFamily = "f1"
    // 如果表不存在则创建
    createTableIfNotExists(connection, tableName, columnFamily)

    val put = new Put(dataInfo.orderId.getBytes)
    put.addColumn("f1".getBytes, "modifiedTime".getBytes, dataInfo.modifiedTime.getBytes())
    put.addColumn("f1".getBytes, "orderNo".getBytes, dataInfo.orderNo.getBytes())
    put.addColumn("f1".getBytes, "isPay".getBytes, dataInfo.isPay.getBytes())
    put.addColumn("f1".getBytes, "orderId".getBytes, dataInfo.orderId.getBytes())
    put.addColumn("f1".getBytes, "tradeSrc".getBytes, dataInfo.tradeSrc.getBytes())
    put.addColumn("f1".getBytes, "payTime".getBytes, dataInfo.payTime.getBytes())
    put.addColumn("f1".getBytes, "productMoney".getBytes, dataInfo.productMoney.getBytes())
    put.addColumn("f1".getBytes, "totalMoney".getBytes, dataInfo.totalMoney.getBytes())
    put.addColumn("f1".getBytes, "dataFlag".getBytes, dataInfo.dataFlag.getBytes())
    put.addColumn("f1".getBytes, "userId".getBytes, dataInfo.userId.getBytes())
    put.addColumn("f1".getBytes, "areaId".getBytes, dataInfo.areaId.getBytes())
    put.addColumn("f1".getBytes, "createTime".getBytes, dataInfo.createTime.getBytes())
    put.addColumn("f1".getBytes, "payMethod".getBytes, dataInfo.payMethod.getBytes())
    put.addColumn("f1".getBytes, "isRefund".getBytes, dataInfo.isRefund.getBytes())
    put.addColumn("f1".getBytes, "tradeType".getBytes, dataInfo.tradeType.getBytes())
    put.addColumn("f1".getBytes, "status".getBytes, dataInfo.status.getBytes())
    hbTable.put(put)
  }

  def insertArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
    // val tableName = "wzk_area"
    // val columnFamily = "f1"
    // 如果表不存在则创建
    // createTableIfNotExists(connection, tableName, columnFamily)

    println(areaInfo.toString)
    val put = new Put(areaInfo.id.getBytes())
    put.addColumn("f1".getBytes(), "name".getBytes(), areaInfo.name.getBytes())
    put.addColumn("f1".getBytes(), "pid".getBytes(), areaInfo.pid.getBytes())
    put.addColumn("f1".getBytes(), "sname".getBytes(), areaInfo.sname.getBytes())
    put.addColumn("f1".getBytes(), "level".getBytes(), areaInfo.level.getBytes())
    put.addColumn("f1".getBytes(), "citycode".getBytes(), areaInfo.citycode.getBytes())
    put.addColumn("f1".getBytes(), "yzcode".getBytes(), areaInfo.yzcode.getBytes())
    put.addColumn("f1".getBytes(), "mername".getBytes(), areaInfo.mername.getBytes())
    put.addColumn("f1".getBytes(), "lng".getBytes(), areaInfo.Lng.getBytes())
    put.addColumn("f1".getBytes(), "lat".getBytes(), areaInfo.Lat.getBytes())
    put.addColumn("f1".getBytes(), "pinyin".getBytes(), areaInfo.pinyin.getBytes())
    hbTable.put(put)
  }

  def deleteArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
    val delete = new Delete(areaInfo.id.getBytes)
    hbTable.delete(delete)
  }

  def createTableIfNotExists(connection: Connection, tableName: String, columnFamily: String): Unit = {
    val admin = connection.getAdmin
    try {
      val table = TableName.valueOf(tableName)

      // 检查表是否存在
      if (!admin.tableExists(table)) {
        val tableDescriptor = new HTableDescriptor(table)
        val columnDescriptor = new HColumnDescriptor(columnFamily.getBytes())
        tableDescriptor.addFamily(columnDescriptor)

        // 创建表
        admin.createTable(tableDescriptor)
        println(s"表 $tableName 创建成功")
      } else {
        println(s"表 $tableName 已存在")
      }
    } finally {
      admin.close()
    }
  }

}

SourceKafka

class SourceKafka {

  def getKafkaSource(topicName: String) : FlinkKafkaConsumer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "h121.wzk.icu:9092")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("group.id", "hbase-test")
    props.setProperty("auto.offset.reset", "earliest")
    new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
  }

}

KafkaToHBase

object KafkaToHBase {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer = new SourceKafka().getKafkaSource("dwshow")
    kafkaConsumer.setStartFromLatest()
    val sourceStream = env.addSource(kafkaConsumer)
    val mapped: DataStream[util.ArrayList[TableObject]] = sourceStream.map(x => {
      val jsonObj: JSONObject = JSON.parseObject(x)
      val database: AnyRef = jsonObj.get("database")
      val table: AnyRef = jsonObj.get("table")
      val typeInfo: AnyRef = jsonObj.get("type")
      val objects = new util.ArrayList[TableObject]()
      jsonObj.getJSONArray("data").forEach(x => {
        objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))
        println(x.toString)
      })
      objects
    })
    mapped.addSink(new SinkHBase)
    env.execute()
  }
}

启动项目

我们对表进行修改:
在这里插入图片描述
可以看到控制台对饮输出了内容:
在这里插入图片描述
别的表也尝试修改一下:
在这里插入图片描述
查看 HBase 可以看到数据已经有了:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/947475.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

微博_14.12.2-内置猪手 会员版

微博猪手是一款作用于微博的 XposedLsposed 模块,可以支持未root用户和已root用户使用。进入【我的】页面,点击【右上角的设置】,点击【微博猪手】即可进一步设置其他功能。通过微博猪手模块可以实现去除各种广告(开屏、信息流等&…

计算机网络 (21)网络层的几个重要概念

前言 计算机网络中的网络层是OSI(开放系统互连)模型中的第三层,也是TCP/IP模型中的第二层,它位于数据链路层和传输层之间,负责数据包从源主机到目的主机的路径选择和数据转发。 一、网络层的主要功能 路由选择&#xf…

openwrt nginx UCI配置过程

openwrt 中nginx有2种配置方法,uci nginx uci /etc/config/nginx 如下: option uci_enable true‘ 如果是true就是使用UCI配置,如果 是false,就要使用/etc/nginx/nginx.conf,一般不要修改。 如果用UCI,其…

【深度学习进阶】基于CNN的猫狗图片分类项目

介绍 基于卷积神经网络(CNN)的猫狗图片分类项目是机器学习领域中的一种常见任务,它涉及图像处理和深度学习技术。以下是该项目的技术点和流程介绍: 技术点 卷积神经网络 (CNN): CNN 是一种专门用于处理具有类似网格结构的数据的…

uni-app 页面生命周期及组件生命周期汇总(Vue2、Vue3)

文章目录 一、前言🍃二、页面生命周期三、Vue2 页面及组件生命周期流程图四、Vue3 页面及组件生命周期流程图4.1 页面加载时序介绍4.2 页面加载常见问题4.3 onShow 和 onHide4.4 onInit4.5 onLoad4.6 onReachBottom4.7 onPageScroll4.8 onBackPress4.9 onTabItemTap…

缓存淘汰算法:次数除以时间差

记录缓存中的每一项的访问次数、最后访问时间,获取当前时间,可算出时间差,然后,用次数除以时间差,取最小的淘汰。 这一算法比较慢,需配合多级缓存。一级缓存不很大,使用此算法。二级缓存可以大…

uniapp 微信小程序开发使用高德地图、腾讯地图

一、高德地图 1.注册高德地图开放平台账号 (1)创建应用 这个key 第3步骤,配置到项目中locationGps.js 2.下载高德地图微信小程序插件 (1)下载地址 高德地图API | 微信小程序插件 (2)引入项目…

Mac iTerm2集成DeepSeek AI

1. 去deepseek官网申请api key,DeepSeek 2. 安装iTerm2 AI Plugin插件,https://iterm2.com/ai-plugin.html,插件解压后直接放到和iTerms相同的位置,默认就在/Applications 下 3. 配置iTerm2 4. 重启iTerm2,使用快捷键呼出AI对话…

树莓派 Pico RP2040 教程点灯 双核编程案例

双核点亮不同的 LED 示例,引脚分别是GP0跟GP1。 #include "pico/stdlib.h" #include "pico/multicore.h"#define LED1 0 // 核心 0 控制的 LED 引脚 #define LED2 1 // 核心 1 控制的 LED 引脚// the setup function runs once when you press …

简单使用linux

1.1 Linux的组成 Linux 内核:内核是系统的核心,是运行程序和管理 像磁盘和打印机等硬件设备的核心程序。 文件系统 : 文件存放在磁盘等存储设备上的组织方法。 Linux 能支持多种目前浒的文件系统,如 ext4 、 FAT 、 VFAT 、 ISO9660 、 NF…

ACM算法模板

ACM算法模板 起手式基础算法前缀和与差分二分查找三分查找求极值分治法:归并排序 动态规划基本线性 d p dp dp最长上升子序列I O ( n 2 ) O(n ^ 2) O(n2)最长上升子序列II O ( n l o g n ) O(nlogn) O(nlogn) 贪心二分最长公共子序列 背包背包求组合种类背包求排列…

《Vue3实战教程》19:Vue3组件 v-model

如果您有疑问&#xff0c;请观看视频教程《Vue3实战教程》 组件 v-model​ 基本用法​ v-model 可以在组件上使用以实现双向绑定。 从 Vue 3.4 开始&#xff0c;推荐的实现方式是使用 defineModel() 宏&#xff1a; vue <!-- Child.vue --> <script setup> co…

Docker 环境中搭建 Redis 哨兵模式集群的步骤与问题解决

在 Docker 环境中搭建 Redis 哨兵模式集群的步骤与问题解决 在 Redis 高可用架构中&#xff0c;哨兵模式&#xff08;Sentinel&#xff09;是确保 Redis 集群在出现故障时自动切换主节点的一种机制。通过使用 Redis 哨兵&#xff0c;我们可以实现 Redis 集群的监控、故障检测和…

数据结构:时间复杂度和空间复杂度

我们知道代码和代码之间算法的不同&#xff0c;一定影响了代码的执行效率&#xff0c;那么我们该如何评判算法的好坏呢&#xff1f;这就涉及到了我们算法效率的分析了。 &#x1f4d6;一、算法效率 所谓算法效率的分析分为两种&#xff1a;第一种时间效率&#xff0c;又称时间…

《Vue3实战教程》39:Vue3无障碍访问

如果您有疑问&#xff0c;请观看视频教程《Vue3实战教程》 无障碍访问​ Web 无障碍访问 (也称为 a11y) 是指创建可供任何人使用的网站的做法——无论是身患某种障碍、通过慢速的网络连接访问、使用老旧或损坏的硬件&#xff0c;还是仅处于某种不方便的环境。例如&#xff0c;…

GESP2024年6月认证C++五级( 第三部分编程题(1)黑白格)

参考程序&#xff08;二维前缀和&#xff09; #include <iostream> #include <vector> #include <algorithm> using namespace std;int main() {int n, m, k;cin >> n >> m >> k;// 输入网格图vector<vector<int>> grid(n, v…

二、SQL语言,《数据库系统概念》,原书第7版

文章目录 一、概览SQL语言1.1 SQL 语言概述1.1.1 SQL语言的提出和发展1.1.2 SQL 语言的功能概述 1.2 利用SQL语言建立数据库1.2.1 示例1.2.2 SQL-DDL1.2.2.1 CREATE DATABASE1.2.2.2 CREATE TABLE 1.2.3 SQL-DML1.2.3.1 INSERT INTO 1.3 用SQL 语言进行简单查询1.3.1 单表查询 …

js按日期按数量进行倒序排序,然后再新增一个字段,给这个字段赋值 10 到1

效果如下图&#xff1a; 实现思路&#xff1a; 汇总数据&#xff1a;使用 reduce 方法遍历原始数据数组&#xff0c;将相同日期的数据进行合并&#xff0c;并计算每个日期的总和。创建日期映射&#xff1a;创建一个映射 dateMap&#xff0c;存储每个日期的对象列表。排序并添加…

用uniapp写一个播放视频首页页面代码

效果如下图所示 首页有导航栏&#xff0c;搜索框&#xff0c;和视频列表&#xff0c; 导航栏如下图 搜索框如下图 视频列表如下图 文件目录 视频首页页面代码如下 <template> <view class"video-home"> <!-- 搜索栏 --> <view class…

【three.js】光源

光源 光源特点 当使用MeshLambertMaterial材质时&#xff0c;会受到光线的影响&#xff0c; 我们代码里面如果没有设置光线&#xff0c;则使用MeshLambertMaterial材质修饰的模型不可见&#xff0c;这个时候&#xff0c;我们添加光线后&#xff0c;便可以看见。 环境光 定义&a…