Hudi入门

一、Hudi编译安装

1.下载

https://archive.apache.org/dist/hudi/0.9.0/hudi-0.9.0.src.tgz

2.maven编译

mvn clean install -DskipTests -Dscala2.12 -Dspark3

3.配置spark与hudi依赖包

[root@master hudi-spark-jars]# ll
total 37876
-rw-r--r-- 1 root root 38615211 Oct 27 16:13 hudi-spark3-bundle_2.12-0.9.0.jar
-rw-r--r-- 1 root root   161826 Oct 27 16:13 spark-avro_2.12-3.0.1.jar
-rw-r--r-- 1 root root     2777 Oct 27 16:13 spark_unused-1.0.0.jar

二、Hudi基础使用

1.启动cli

[root@master hudi-cli]# hudi-cli.sh

2.启动spark-shell添加hudi-jars

spark-shell \
--master local[2] \
--jars /usr/local/src/hudi/hudi-spark-jars/hudi-spark3-bundle_2.12-0.9.0.jar,/usr/local/src/hudi/hudi-spark-jars/spark-avro_2.12-3.0.1.jar,/usr/local/src/hudi/hudi-spark-jars/spark_unused-1.0.0.jar \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

3.模拟产生数据

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName="hudi_trips_cow"
val basePath="hdfs://master:9000/hudi-warehouse/hudi_trips_cow"

val dataGen=new DataGenerator

val inserts=convertToStringList(dataGen.generateInserts(10))

val df=spark.read.json(spark.sparkContext.parallelize(inserts,2))

df.printSchema()
-----------------------------------------------------------------------------------------
root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
-----------------------------------------------------------------------------------------

df.select("rider","begin_lat","begin_lon","driver","fare","uuid","ts").show(10,truncate=false)

4.保存到hudi表

df.write
  .mode(Overwrite)
  .format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(TABLE_NAME, tableName)
  .save(basePath)

5.查询hudi数据

val tripsSnapshotDF = spark.read.format("hudi").load("hdfs://master:9000/hudi-warehouse/hudi_trips_cow" + "/*/*/*/*")

tripsSnapshotDF.printSchema()
-----------------------------------------------------------------------------------------
root
 |-- _hoodie_commit_time: string (nullable = true)    --提交数据的提交时间 
 |-- _hoodie_commit_seqno: string (nullable = true)   --提交数据的编号 
 |-- _hoodie_record_key: string (nullable = true)     --提交数据的key 
 |-- _hoodie_partition_path: string (nullable = true) --提交数据的存储路径
 |-- _hoodie_file_name: string (nullable = true)      --提交数据的所在文件名称
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
-----------------------------------------------------------------------------------------

6.注册为临时视图

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

7.查询任务

乘车费用 大于 20 信息数据

scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+-------------+
|              fare|          begin_lon|          begin_lat|           ts|
+------------------+-------------------+-------------------+-------------+
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1698046206939|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|1698296387405|
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1697991665477|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1697865605719|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1698233221527|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1697912700216|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|1697805433844|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|1698234304674|
+------------------+-------------------+-------------------+-------------+

选取字段查询数据

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

8.表数据结构

.hoodie文件

.hoodie 文件:由于CRUD的零散性,每一次的操作都会生成一个文件,这些小文件越来越多后,会严重影响HDFS的性能,Hudi设计了一套文件合并机制。 .hoodie文件夹中存放了对应的文件合并操作相关的日志文件。

Hudi把随着时间流逝,对表的一系列CRUD操作叫做Timeline。Timeline中某一次的操作,叫做Instant。Instant包含以下信息:

Instant Action,记录本次操作是一次数据提交(COMMITS),还是文件合并(COMPACTION),或者是文件清理(CLEANS);

Instant Time,本次操作发生的时间;

State,操作的状态,发起(REQUESTED),进行中(INFLIGHT),还是已完成(COMPLETED);

amricas和asia文件

amricas和asia相关的路径是实际的数据文件,按分区存储,分区的路径key是可以指定的。

三、基于IDEA使用Hudi

maven项目xml

主语scala版本相对应,否则会报错Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.saddam.hudi</groupId>
  <artifactId>Hudi-Learning</artifactId>
  <version>1.0.0</version>

<repositories>
  <repository>
    <id>aliyun</id>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  </repository>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
  <repository>
    <id>jboss</id>
    <url>http://repository.jboss.com/nexus/content/groups/public</url>
  </repository>
</repositories>

<properties>
<scala.version>2.12.1</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.1</spark.version>
<hadoop.version>3.2.1</hadoop.version>
<hudi.version>0.9.0</hudi.version>
</properties>

<dependencies>
<!-- 依赖Scala语言 -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.1</version>
</dependency>
<!-- Spark Core 依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.1.1</version>
</dependency>
<!-- Spark SQL 依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.1.1</version>
</dependency>

<!-- Hadoop Client 依赖 -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>

<!-- hudi-spark3 -->
<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-spark3-bundle_2.12</artifactId>
  <version>${hudi.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.12</artifactId>
  <version>3.1.1</version>
</dependency>

</dependencies>

<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
  <resource>
    <directory>${project.basedir}/src/main/resources</directory>
  </resource>
</resources>
<!-- Maven 编译的插件 -->
<plugins>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.0</version>
    <configuration>
      <source>1.8</source>
      <target>1.8</target>
      <encoding>UTF-8</encoding>
    </configuration>
  </plugin>
  <plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.2.0</version>
    <executions>
      <execution>
        <goals>
          <goal>compile</goal>
          <goal>testCompile</goal>
        </goals>
      </execution>
    </executions>
  </plugin>
</plugins>

</build>
</project>

1.main方法

object HudiSparkDemo {

  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "root")

    val spark=SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      // 设置序列化方式:Kryo
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import spark.implicits._

    //表名称
    val tableName: String = "tbl_trips_cow"

    //表存储路径
    val tablePath: String = "hdfs://192.168.184.135:9000/hudi-warehouse/hudi_trips_cow"


    // 构建数据生成器,为例模拟产生插入和更新数据
    import org.apache.hudi.QuickstartUtils._

    //TODO 任务一:模拟数据,插入Hudi表,采用COW模式
    //insertData(spark, tableName, tablePath)

    //TODO 任务二:快照方式查询(Snapshot Query)数据,采用DSL方式
    //queryData(spark, tablePath)
    queryDataByTime(spark, tablePath)

    //Thread.sleep(10000)
    //TODO 任务三:更新(Update)数据
    //val dataGen: DataGenerator = new DataGenerator()
    //insertData(spark, tableName, tablePath, dataGen)
    //updateData(spark, tableName, tablePath, dataGen)

    //TODO 任务四:增量查询(Incremental Query)数据,采用SQL方式
    //incrementalQueryData(spark, tablePath)

    //TODO 任务五:删除(Delete)数据
    //deleteData(spark, tableName, tablePath)

    // 应用结束,关闭资源
    spark.stop()

  }

2.模拟数据

在编写代码过程中,指定数据写入到HDFS路径时***直接写“/xxdir”***不要写“hdfs://mycluster/xxdir”,后期会报错“java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie.temp/2022xxxxxxxxxx/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/2022xxxxxxxxxx”,可以将对应的hdfs-site.xml、core-site.xml放在resources目录下,直接会找HDFS路径。

/**
    * 官方案例:模拟产生数据,插入Hudi表,表的类型COW
    */
  def insertData(spark: SparkSession, table: String, path: String): Unit = {
    import spark.implicits._

    // TODO: a. 模拟乘车数据
    import org.apache.hudi.QuickstartUtils._

    val dataGen: DataGenerator = new DataGenerator()
    val inserts: util.List[String] = convertToStringList(dataGen.generateInserts(100))

    import scala.collection.JavaConverters._
    val insertDF: DataFrame = spark.read
      .json(spark.sparkContext.parallelize(inserts.asScala, 2).toDS())
    //insertDF.printSchema()
    //insertDF.show(10, truncate = false)

    // TODO: b. 插入数据至Hudi表
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    insertDF.write
      .mode(SaveMode.Append)
      .format("hudi") // 指定数据源为Hudi
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi 表的属性设置
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)
  }

2.查询数据

def queryData(spark: SparkSession, path: String): Unit = {
    import spark.implicits._

    val tripsDF: DataFrame = spark.read.format("hudi").load(path)
    //tripsDF.printSchema()
    //tripsDF.show(10, truncate = false)

    // 查询费用大于20,小于50的乘车数据
    tripsDF
      .filter($"fare" >= 20 && $"fare" <= 50)
      .select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
      .orderBy($"fare".desc, $"_hoodie_commit_time".desc)
      .show(20, truncate = false)
  }

通过时间查询数据

def queryDataByTime(spark: SparkSession, path: String):Unit ={
    import org.apache.spark.sql.functions._

    // 方式一:指定字符串,格式 yyyyMMddHHmmss
    val df1 = spark.read
      .format("hudi")
      .option("as.of.instant", "20231027172433")
      .load(path)
      .sort(col("_hoodie_commit_time").desc)
    df1.printSchema()
    df1.show(5,false)

    // 方式二:指定字符串,格式yyyy-MM-dd HH:mm:ss
    val df2 = spark.read
      .format("hudi")
      .option("as.of.instant", "2023-10-27 17:24:33")
      .load(path)
      .sort(col("_hoodie_commit_time").desc)
    df2.printSchema()
    df2.show(5,false)
  }

3.更新数据

/**
    * 重新覆盖插入数据,然后更新
   */
  def insertData2(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
    import spark.implicits._

    // TODO: a. 模拟乘车数据
    import org.apache.hudi.QuickstartUtils._
    val inserts = convertToStringList(dataGen.generateInserts(100))

    import scala.collection.JavaConverters._
    val insertDF: DataFrame = spark.read
      .json(spark.sparkContext.parallelize(inserts.asScala, 2).toDS())
    //insertDF.printSchema()
    //insertDF.show(10, truncate = false)

    // TODO: b. 插入数据至Hudi表
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    insertDF.write
      .mode(SaveMode.Ignore)
      .format("hudi") // 指定数据源为Hudi
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi 表的属性设置
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)
  }

  /**
    * 官方案例:更新Hudi数据,运行程序时,必须要求与插入数据使用同一个DataGenerator对象,更新数据Key是存在的
    */
  def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
    import spark.implicits._

    // TODO: a、模拟产生更新数据
    import org.apache.hudi.QuickstartUtils._

    import scala.collection.JavaConverters._
    val updates = convertToStringList(dataGen.generateUpdates(100))//更新
    val updateDF = spark.read.json(spark.sparkContext.parallelize(updates.asScala, 2).toDS())
    // TODO: b、更新数据至Hudi表
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    updateDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)
  }

4.删除数据

/**
 * 官方案例:删除Hudi表数据,依据主键UUID进行删除,如果是分区表,指定分区路径
 */
def deleteData(spark: SparkSession, table: String, path: String): Unit = {
   import spark.implicits._
   
   // TODO: a. 加载Hudi表数据,获取条目数
   val tripsDF: DataFrame = spark.read.format("hudi").load(path)
   println(s"Count = ${tripsDF.count()}")
   
   // TODO: b. 模拟要删除的数据
   val dataframe: DataFrame = tripsDF.select($"uuid", $"partitionpath").limit(2)
   import org.apache.hudi.QuickstartUtils._

   val dataGen: DataGenerator = new DataGenerator()
   val deletes = dataGen.generateDeletes(dataframe.collectAsList())
   
   import scala.collection.JavaConverters._
   val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2))
   
   // TODO: c. 保存数据至Hudi表,设置操作类型为:DELETE
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   deleteDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // 设置数据操作类型为delete,默认值为upsert
      .option(OPERATION.key(), "delete")
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)
   
   // TODO: d. 再次加载Hudi表数据,统计条目数,查看是否减少2条
   val hudiDF: DataFrame = spark.read.format("hudi").load(path)
   println(s"Delete After Count = ${hudiDF.count()}")
}

知乎案例

https://www.zhihu.com/question/479484283/answer/2519394483

四、Spark滴滴运营数据分析

hive

配置文件

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
  </property>

 <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
  </property>

 <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
  </property>

 <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>xxxxxx</value>
  </property>
<property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
</property>
<property>
	<name>hive.server2.thrift.bind.host</name>
	<value>master</value>
</property>
<property>
	<name>hive.metastore.uris</name>
	<value>thrift://master:9083</value>
</property>
<property>
		<name>hive.mapred.mode</name>
		<value>strict</value>
	</property>
	<property>
		<name>hive.exec.mode.local.auto</name>
		<value>true</value>
	</property>
	<property>
		<name>hive.fetch.task.conversion</name>
		<value>more</value>
	</property>
	    <property>
        <name>hive.server2.thrift.client.user</name>
        <value>root</value>
    </property>
    <property>
        <name>hive.server2.thrift.client.password</name>
        <value>32419</value>
    </property>

<property>
	<name>hive.metastore.event.db.notification.api.auth</name>
	<value>false</value>
</property>
</configuration>

脚本

start-beeline.sh
#!/bin/bash

/usr/local/src/hive/bin/beeline -u jdbc:hive2://master:10000 -n root -p xxxxxx
start-hiveserver2.sh
#!/bin/sh 

HIVE_HOME=/usr/local/src/hive

EXEC_CMD=hiveserver2

## 启动服务的时间
DATE_STR=`/bin/date '+%Y%m%d%H%M%S'`
# 日志文件名称(包含存储路径)
# HIVE_LOG=${HIVE_HOME}/logs/${EXEC_CMD}-${DATE_STR}.log
HIVE_LOG=${HIVE_HOME}/logs/${EXEC_CMD}.log

# 创建日志目录
/usr/bin/mkdir -p ${HIVE_HOME}/logs
## 启动服务
/usr/bin/nohup ${HIVE_HOME}/bin/hive --service ${EXEC_CMD} > ${HIVE_LOG} 2>&1 &
start-metastore.sh
#!/bin/sh 

HIVE_HOME=/usr/local/src/hive

EXEC_CMD=metastore

## 启动服务的时间
DATE_STR=`/bin/date '+%Y%m%d%H%M%S'`
# 日志文件名称(包含存储路径)
HIVE_LOG=${HIVE_HOME}/logs/${EXEC_CMD}-${DATE_STR}.log

# 创建日志目录
/usr/bin/mkdir -p ${HIVE_HOME}/logs
## 启动服务
/usr/bin/nohup ${HIVE_HOME}/bin/hive --service ${EXEC_CMD} > ${HIVE_LOG} 2>&1 &

数据字段介绍

在这里插入图片描述

Spark读取数据并加载至Hudi

SparkUtils

package cn.saddam.hudi.spark.didi

import org.apache.spark.sql.SparkSession

/**
  * SparkSQL操作数据(加载读取和保存写入)时工具类,比如获取SparkSession实例对象等
  */
object SparkUtils {
  /**
    * 构建SparkSession实例对象,默认情况下本地模式运行
    */
  def createSparkSession(clazz: Class[_], master: String = "local[4]", partitions: Int = 4): SparkSession ={
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", partitions)
      .getOrCreate()


  }
  
  def main(args: Array[String]): Unit = {
    val spark=createSparkSession(this.getClass)
    print(spark)

    Thread.sleep(1000000)

    spark.stop()
  }
}

readCsvFile

/**
    * 读取CSV格式文本文件数据,封装到DataFrame数据集
    */
  def readCsvFile(spark: SparkSession, path: String): DataFrame = {
    spark.read
      // 设置分隔符为逗号
      .option("sep", "\\t")
      // 文件首行为列名称
      .option("header", "true")
      // 依据数值自动推断数据类型
      .option("inferSchema", "true")
      // 指定文件路径
      .csv(path)
  }

process

/**
    * 对滴滴出行海口数据进行ETL转换操作:指定ts和partitionpath 列
    */
  def process(dataframe: DataFrame): DataFrame = {
    dataframe
      // 添加分区列:三级分区 -> yyyy/MM/dd
      .withColumn(
      "partitionpath",  // 列名称
      concat_ws("-", col("year"), col("month"), col("day"))
    )
      // 删除列:year, month, day
      .drop("year", "month", "day")
      // 添加timestamp列,作为Hudi表记录数据与合并时字段,使用发车时间
      .withColumn(
      "ts",
      unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss")
    )
  }

saveToHudi

/**
    * 将数据集DataFrame保存值Hudi表中,表的类型:COW
    */
  def saveToHudi(dataframe: DataFrame, table: String, path: String): Unit = {
    // 导入包
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    // 保存数据
    dataframe.write
      .mode(SaveMode.Overwrite)
      .format("hudi") // 指定数据源为Hudi
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi 表的属性设置
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "order_id")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      // 表的名称和路径
      .option(TBL_NAME.key(), table)
      .save(path)
  }

main方法

	System.setProperty("HADOOP_USER_NAME", "root")

  // 滴滴数据路径(file意思为本读文件系统)
  val datasPath: String = "file:/F:\\A-大数据学习\\Hudi\\Hudi-Learning\\datas\\DiDi\\dwv_order_make_haikou_1.txt"

  // Hudi中表的属性
  val hudiTableName: String = "tbl_didi_haikou"
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    //TODO step1. 构建SparkSession实例对象(集成Hudi和HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
    import spark.implicits._

    //TODO step2. 加载本地CSV文件格式滴滴出行数据
    val didiDF: DataFrame = readCsvFile(spark, datasPath)
    //didiDF.printSchema()
    //didiDF.show(10, truncate = false)

    //TODO step3. 滴滴出行数据ETL处理并保存至Hudi表
    val etlDF: DataFrame = process(didiDF)
    //etlDF.printSchema()
    //etlDF.show(10, truncate = false)

    //TODO stpe4. 保存转换后数据至Hudi表
    saveToHudi(etlDF, hudiTableName, hudiTablePath)

    // stpe5. 应用结束,关闭资源
    spark.stop()
  }

Spark加载Hudi数据并需求统计

从Hudi表加载数据

/**
    * 从Hudi表加载数据,指定数据存在路径
    */
  def readFromHudi(spark: SparkSession, hudiTablePath: String): DataFrame ={
    // a. 指定路径,加载数据,封装至DataFrame
    val didiDF = spark.read.format("hudi").load(hudiTablePath)

    // b. 选择字段
    didiDF
      .select(
      "order_id", "product_id",
      "type", "traffic_type", "pre_total_fee",
        "start_dest_distance", "departure_time"
    )
  }

订单类型统计

/**
    *  订单类型统计,字段:product_id
    *  对海口市滴滴出行数据,按照订单类型统计,
    *  使用字段:product_id,其中值【1滴滴专车, 2滴滴企业专车, 3滴滴快车, 4滴滴企业快车】
    */
  def reportProduct(dataframe: DataFrame) = {
    // a. 按照产品线ID分组统计
    val reportDF: DataFrame = dataframe.groupBy("product_id").count()

    // b. 自定义UDF函数,转换名称

    val to_name =udf(
      // 1滴滴专车, 2滴滴企业专车, 3滴滴快车, 4滴滴企业快车
      (productId: Int) => {
        productId match {
          case 1 =>  "滴滴专车"
          case 2 =>  "滴滴企业专车"
          case 3 =>  "滴滴快车"
          case 4 =>  "滴滴企业快车"
        }
      }
    )

    // c. 转换名称,应用函数
    val resultDF: DataFrame = reportDF.select(
      to_name(col("product_id")).as("order_type"),
      col("count").as("total")
    )
//    resultDF.printSchema()
//    resultDF.show(10, truncate = false)
    resultDF.write
      .format("jdbc")
      .option("driver","com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://192.168.184.135:3306/Hudi_DiDi?createDatabaseIfNotExist=true&characterEncoding=utf8&useSSL=false")
      .option("dbtable", "reportProduct")
      .option("user", "root")
      .option("password", "xxxxxx")
      .save()
  }

订单时效性统计

/**
    *  订单时效性统计,字段:type
    */
  def reportType(dataframe: DataFrame): DataFrame = {
    // a. 按照产品线ID分组统计
    val reportDF: DataFrame = dataframe.groupBy("type").count()
    // b. 自定义UDF函数,转换名称
    val to_name = udf(
      // 0实时,1预约
      (realtimeType: Int) => {
        realtimeType match {
          case 0 =>  "实时"
          case 1 =>  "预约"
        }
      }
    )
    // c. 转换名称,应用函数
    val resultDF: DataFrame = reportDF.select(
      to_name(col("type")).as("order_realtime"),
      col("count").as("total")
    )
//    resultDF.printSchema()
//    resultDF.show(10, truncate = false)
    resultDF
  }

交通类型统计

/**
    *  交通类型统计,字段:traffic_type
    */
  def reportTraffic(dataframe: DataFrame): DataFrame = {
    // a. 按照产品线ID分组统计
    val reportDF: DataFrame = dataframe.groupBy("traffic_type").count()

    // b. 自定义UDF函数,转换名称
    val to_name = udf(
      // 1企业时租,2企业接机套餐,3企业送机套餐,4拼车,5接机,6送机,302跨城拼车
      (trafficType: Int) => {
        trafficType match {
          case 0 =>  "普通散客"
          case 1 =>  "企业时租"
          case 2 =>  "企业接机套餐"
          case 3 =>  "企业送机套餐"
          case 4 =>  "拼车"
          case 5 =>  "接机"
          case 6 =>  "送机"
          case 302 =>  "跨城拼车"
          case _ => "未知"
        }
      }
    )

    // c. 转换名称,应用函数
    val resultDF: DataFrame = reportDF.select(
      to_name(col("traffic_type")).as("traffic_type"), //
      col("count").as("total") //
    )
//    resultDF.printSchema()
//    resultDF.show(10, truncate = false)
    resultDF
  }

订单价格统计

/**
    * 订单价格统计,将价格分阶段统计,字段:pre_total_fee
    */
  def reportPrice(dataframe: DataFrame): DataFrame = {
    val resultDF: DataFrame = dataframe
      .agg(
        // 价格:0 ~ 15
        sum(
          when(
            col("pre_total_fee").between(0, 15), 1
          ).otherwise(0)
        ).as("0~15"),
        // 价格:16 ~ 30
        sum(
          when(
            col("pre_total_fee").between(16, 30), 1
          ).otherwise(0)
        ).as("16~30"),
        // 价格:31 ~ 50
        sum(
          when(
            col("pre_total_fee").between(31, 50), 1
          ).otherwise(0)
        ).as("31~50"),
        // 价格:50 ~ 100
        sum(
          when(
            col("pre_total_fee").between(51, 100), 1
          ).otherwise(0)
        ).as("51~100"),
        // 价格:100+
        sum(
          when(
            col("pre_total_fee").gt(100), 1
          ).otherwise(0)
        ).as("100+")
      )

//    resultDF.printSchema()
//    resultDF.show(10, truncate = false)
    resultDF
  }

订单距离统计

/**
    * 订单距离统计,将价格分阶段统计,字段:start_dest_distance
    */
  def reportDistance(dataframe: DataFrame): DataFrame = {
    val resultDF: DataFrame = dataframe
      .agg(
        // 价格:0 ~ 15
        sum(
          when(
            col("start_dest_distance").between(0, 10000), 1
          ).otherwise(0)
        ).as("0~10km"),
        // 价格:16 ~ 30
        sum(
          when(
            col("start_dest_distance").between(10001, 20000), 1
          ).otherwise(0)
        ).as("10~20km"),
        // 价格:31 ~ 50
        sum(
          when(
            col("start_dest_distance").between(200001, 30000), 1
          ).otherwise(0)
        ).as("20~30km"),
        // 价格:50 ~ 100
        sum(
          when(
            col("start_dest_distance").between(30001, 5000), 1
          ).otherwise(0)
        ).as("30~50km"),
        // 价格:100+
        sum(
          when(
            col("start_dest_distance").gt(50000), 1
          ).otherwise(0)
        ).as("50+km")
      )

//    resultDF.printSchema()
//    resultDF.show(10, truncate = false)
    resultDF
  }

订单星期分组统计

/**
    *  订单星期分组统计,字段:departure_time
    */
  def reportWeek(dataframe: DataFrame): DataFrame = {

    // a. 自定义UDF函数,转换日期为星期
    val to_week: UserDefinedFunction = udf(
      // 0实时,1预约
      (dateStr: String) => {
        val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
        val calendar: Calendar = Calendar.getInstance()

        val date: Date = format.parse(dateStr)
        calendar.setTime(date)

        val dayWeek: String = calendar.get(Calendar.DAY_OF_WEEK) match {
          case 1 => "星期日"
          case 2 => "星期一"
          case 3 => "星期二"
          case 4 => "星期三"
          case 5 => "星期四"
          case 6 => "星期五"
          case 7 => "星期六"
        }
        // 返回星期
        dayWeek
      }
    )

    // b. 转换日期为星期,并分组和统计
    val resultDF: DataFrame = dataframe
      .select(
        to_week(col("departure_time")).as("week")
      )
      .groupBy(col("week")).count()
      .select(
        col("week"), col("count").as("total") //
      )
//    resultDF.printSchema()
//    resultDF.show(10, truncate = false)
    resultDF
  }

main方法

// Hudi中表的属性
val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

def main(args: Array[String]): Unit = {
    //TODO step1. 构建SparkSession实例对象(集成Hudi和HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass, partitions = 8)
    import spark.implicits._

    //TODO step2. 依据指定字段从Hudi表中加载数据
    val hudiDF: DataFrame = readFromHudi(spark, hudiTablePath)
    //hudiDF.printSchema()
    //hudiDF.show(false)

    //TODO  step3. 按照业务指标进行数据统计分析

    // 指标1:订单类型统计
//    reportProduct(hudiDF)
//    SparkUtils.saveToMysql(spark,reportType(hudiDF),"reportProduct")


    // 指标2:订单时效统计
//    reportType(hudiDF).show(false)
//    SparkUtils.saveToMysql(spark,reportType(hudiDF),"reportType")

    // 指标3:交通类型统计
//    reportTraffic(hudiDF)
    SparkUtils.saveToMysql(spark,reportTraffic(hudiDF),"reportTraffic")

    // 指标4:订单价格统计
//    reportPrice(hudiDF)
    SparkUtils.saveToMysql(spark,reportPrice(hudiDF),"reportPrice")

    // 指标5:订单距离统计
//    reportDistance(hudiDF)
    SparkUtils.saveToMysql(spark,reportDistance(hudiDF),"reportDistance")

    // 指标6:日期类型:星期,进行统计
//    reportWeek(hudiDF)
    SparkUtils.saveToMysql(spark,reportWeek(hudiDF),"reportWeek")


    //TODO step4. 应用结束关闭资源
    spark.stop()
  }

五、Hive滴滴运营数据分析

Idea连接hive

启动metastore和hiveserver2和beeline

2-master-hive

root

xxxxxx

jdbc:hive2://192.168.184.135:10000

hive加载数据

# 1. 创建数据库
create database db_hudi

# 2. 使用数据库
use db_hudi

# 3. 创建外部表
CREATE EXTERNAL TABLE db_hudi.tbl_hudi_didi(
order_id bigint          ,
product_id int           ,
city_id int              ,
district int             ,
county int               ,
type int                 ,
combo_type int           ,
traffic_type int         ,
passenger_count int      ,
driver_product_id int    ,
start_dest_distance int  ,
arrive_time string       ,
departure_time string    ,
pre_total_fee double     ,
normal_time string       ,
bubble_trace_id string   ,
product_1level int       ,
dest_lng double          ,
dest_lat double          ,
starting_lng double      ,
starting_lat double      ,
partitionpath string     ,
ts bigint
)
PARTITIONED BY (date_str string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'/hudi-warehouse/tbl_didi_haikou'

# 5. 添加分区
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-22') location '/hudi-warehouse/tbl_didi_haikou/2017-5-22' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-23') location '/hudi-warehouse/tbl_didi_haikou/2017-5-23' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-24') location '/hudi-warehouse/tbl_didi_haikou/2017-5-24' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-25') location '/hudi-warehouse/tbl_didi_haikou/2017-5-25' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-26') location '/hudi-warehouse/tbl_didi_haikou/2017-5-26' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-27') location '/hudi-warehouse/tbl_didi_haikou/2017-5-27' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-28') location '/hudi-warehouse/tbl_didi_haikou/2017-5-28' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-29') location '/hudi-warehouse/tbl_didi_haikou/2017-5-29' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-30') location '/hudi-warehouse/tbl_didi_haikou/2017-5-30' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-5-31') location '/hudi-warehouse/tbl_didi_haikou/2017-5-31' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-1') location '/hudi-warehouse/tbl_didi_haikou/2017-6-1' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-2') location '/hudi-warehouse/tbl_didi_haikou/2017-6-2' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-3') location '/hudi-warehouse/tbl_didi_haikou/2017-6-3' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-4') location '/hudi-warehouse/tbl_didi_haikou/2017-6-4' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-5') location '/hudi-warehouse/tbl_didi_haikou/2017-6-5' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-6') location '/hudi-warehouse/tbl_didi_haikou/2017-6-6' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-7') location '/hudi-warehouse/tbl_didi_haikou/2017-6-7' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-8') location '/hudi-warehouse/tbl_didi_haikou/2017-6-8' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-9') location '/hudi-warehouse/tbl_didi_haikou/2017-6-9' ;
alter table db_hudi.tbl_hudi_didi add if not exists partition(date_str='2017-6-10') location '/hudi-warehouse/tbl_didi_haikou/2017-6-10' ;
  
# 设置非严格模式
set hive.mapred.mode = nonstrict ;

# SQL查询前10条数据
select order_id, product_id, type, traffic_type, pre_total_fee, start_dest_distance, departure_time from db_hudi.tbl_hudi_didi limit 10 ;

HiveQL 分析

SparkSQL连接Hudi 把hudi-spark3-bundle_2.12-0.9.0.jar拷贝到spark/jars

spark-sql  \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

指标一:订单类型统计

WITH tmp AS (
  SELECT product_id, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY product_id
)
SELECT 
  CASE product_id
    WHEN 1 THEN "滴滴专车"
    WHEN 2 THEN "滴滴企业专车"
    WHEN 3 THEN "滴滴快车"
    WHEN 4 THEN "滴滴企业快车"
  END AS order_type,
  total
FROM tmp ;
         
滴滴专车        15615
滴滴快车        1298383
Time taken: 2.721 seconds, Fetched 2 row(s)

指标二:订单时效性统计

WITH tmp AS (
  SELECT type AS order_realtime, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY type
)
SELECT 
  CASE order_realtime
    WHEN 0 THEN "实时"
    WHEN 1 THEN "预约"
  END AS order_realtime,
  total
FROM tmp ;

预约    28488
实时    1285510
Time taken: 1.001 seconds, Fetched 2 row(s)

指标三:订单交通类型统计

WITH tmp AS (
  SELECT traffic_type, COUNT(1) AS total FROM db_hudi.tbl_hudi_didi GROUP BY traffic_type
)
SELECT 
  CASE traffic_type
   WHEN 0 THEN  "普通散客" 
   WHEN 1 THEN  "企业时租"
   WHEN 2 THEN  "企业接机套餐"
   WHEN 3 THEN  "企业送机套餐"
   WHEN 4 THEN  "拼车"
   WHEN 5 THEN  "接机"
   WHEN 6 THEN  "送机"
   WHEN 302 THEN  "跨城拼车"
   ELSE "未知"
  END AS traffic_type,
  total
FROM tmp ;

送机    37469
接机    19694
普通散客        1256835
Time taken: 1.115 seconds, Fetched 3 row(s)

指标四:订单价格统计

SELECT 
  SUM(
    CASE WHEN pre_total_fee BETWEEN 1 AND 15 THEN 1 ELSE 0 END
  ) AS 0_15,
  SUM(
    CASE WHEN pre_total_fee BETWEEN 16 AND 30 THEN 1 ELSE 0 END
  ) AS 16_30,
  SUM(
    CASE WHEN pre_total_fee BETWEEN 31 AND 50 THEN 1 ELSE 0 END
  ) AS 31_150,
  SUM(
    CASE WHEN pre_total_fee BETWEEN 51 AND 100 THEN 1 ELSE 0 END
  ) AS 51_100,
  SUM(
    CASE WHEN pre_total_fee > 100 THEN 1 ELSE 0 END
  )  AS 100_
FROM 
  db_hudi.tbl_hudi_didi;

六、Spark结构化流写入Hudi

启动zookeeper

--单机版本(此用)--
[root@node1 conf]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vim zoo.cfg
	修改内容:
	dataDir=/export/server/zookeeper/datas
[root@node1 conf]# mkdir -p /export/server/zookeeper/datas

#启动zookeeper
[root@master ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/local/src/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

#查看zookeeper状态信息
[root@master kafka]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/src/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

--分布式版本--
[root@node1 conf]# vim zoo.cfg
	修改内容:
	dataDir=/export/server/zookeeper/datas
	server.0=master:2888:3888
	server.1=slave1:2888:3888
	server.2=slave2:2888:3888

启动kafka

zookeeper.connect=192.168.184.135:2181/kafka

创建topic要加上/kafka --zookeeper master:2181/kafka

#server.properties修改
listeners=PLAINTEXT://192.168.184.135:9092
log.dirs=/usr/local/src/kafka/kafka-logs
zookeeper.connect=192.168.184.135:2181/kafka

#启动kafka
kafka-server-start.sh /usr/local/src/kafka/config/server.properties

#查看所有topic
kafka-topics.sh --list --zookeeper master:2181/kafka

#创建topic
kafka-topics.sh --create --zookeeper master:2181/kafka --replication-factor 1 --partitions 1 --topic order_topic

#删除topic
kafka-topics.sh --delete --zookeeper master:2181/kafka --topic order_topic

kafka tool工具

chroot path /kafka对应zookeeper连接地址后2181/kafka

在这里插入图片描述

订单数据模拟生成器

package cn.saddam.hudi.spark_streaming


import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random


/**
  * 订单实体类(Case Class)
  *
  * @param orderId     订单ID
  * @param userId      用户ID
  * @param orderTime   订单日期时间
  * @param ip          下单IP地址
  * @param orderMoney  订单金额
  * @param orderStatus 订单状态
  */
case class OrderRecord(
                        orderId: String,
                        userId: String,
                        orderTime: String,
                        ip: String,
                        orderMoney: Double,
                        orderStatus: Int
                      )

/**
  * 模拟生产订单数据,发送到Kafka Topic中
  *      Topic中每条数据Message类型为String,以JSON格式数据发送
  * 数据转换:
  *      将Order类实例对象转换为JSON格式字符串数据(可以使用json4s类库)
  */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {

    var producer: KafkaProducer[String, String] = null
    try {
      // 1. Kafka Client Producer 配置信息
      val props = new Properties()
      props.put("bootstrap.servers", "192.168.184.135:9092")
      props.put("acks", "1")
      props.put("retries", "3")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      // 2. 创建KafkaProducer对象,传入配置信息
      producer = new KafkaProducer[String, String](props)

      // 随机数实例对象
      val random: Random = new Random()
      // 订单状态:订单打开 0,订单取消 1,订单关闭 2,订单完成 3
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

      while (true) {
        // 每次循环 模拟产生的订单数目
//        val batchNumber: Int = random.nextInt(1) + 1
        val batchNumber: Int = random.nextInt(1) + 20
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
          val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
          // 3. 订单记录数据
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          // 转换为JSON格式数据
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)
          // 4. 构建ProducerRecord对象
          val record = new ProducerRecord[String, String]("order-topic", orderId, orderJson)
          // 5. 发送数据:def send(messages: KeyedMessage[K,V]*), 将数据发送到Topic
          producer.send(record)
        }
//        Thread.sleep(random.nextInt(500) + 5000)
        Thread.sleep(random.nextInt(500))
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** =================获取当前时间================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // 格式化日期
    formatDate
  }

  /** ================= 获取随机IP地址 ================= */
  def getRandomIp: String = {
    // ip范围
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079), //36.56.0.0-36.63.255.255
      (1038614528, 1039007743), //61.232.0.0-61.237.255.255
      (1783627776, 1784676351), //106.80.0.0-106.95.255.255
      (2035023872, 2035154943), //121.76.0.0-121.77.255.255
      (2078801920, 2079064063), //123.232.0.0-123.235.255.255
      (-1950089216, -1948778497), //139.196.0.0-139.215.255.255
      (-1425539072, -1425014785), //171.8.0.0-171.15.255.255
      (-1236271104, -1235419137), //182.80.0.0-182.92.255.255
      (-770113536, -768606209), //210.25.0.0-210.47.255.255
      (-569376768, -564133889) //222.16.0.0-222.95.255.255
    )
    // 随机数:IP地址范围下标
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)

    // 转换Int类型IP地址为IPv4格式
    number2IpString(ipNumber)
  }

  /** =================将Int类型IPv4地址转换为字符串类型================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // 返回IPv4地址
    buffer.mkString(".")
  }
}

结构化流实时从Kafka消费数据

package cn.saddam.hudi.spark_streaming

import cn.saddam.hudi.spark.didi.SparkUtils
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

/*
基于StructuredStreaming结构化流实时从Kafka消费数据,经过ETL转换后,存储至Hudi表
*/
object HudiStructuredDemo {


  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "root")

    //TODO step1、构建SparkSession实例对象
    val spark=SparkUtils.createSparkSession(this.getClass)

    //TODO step2、从Kafka实时消费数据
    val kafkaStreamDF: DataFrame =readFromKafka(spark,"order-topic")

    //TODO step3、提取数据,转换数据类型
    val streamDF: DataFrame = process(kafkaStreamDF)

    //TODO step4、保存数据至Hudi表中:COW(写入时拷贝)和MOR(读取时保存)
    saveToHudi(streamDF)


    //TODO step5、流式应用启动以后,等待终止
    spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running ............."))
    spark.streams.awaitAnyTermination()
  }

  /**
    * 指定Kafka Topic名称,实时消费数据
    */
  def readFromKafka(spark: SparkSession, topicName: String) = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.184.135:9092")
      .option("subscribe", topicName)
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 100000)
      .option("failOnDataLoss", "false")
      .load()
  }

  /**
    * 对Kafka获取数据,进行转换操作,获取所有字段的值,转换为String,以便保存Hudi表
    */
  def process(streamDF: DataFrame) = {
    /* 从Kafka消费数据后,字段信息如
             key -> binary,value -> binary
             topic -> string, partition -> int, offset -> long
             timestamp -> long, timestampType -> int
           */
    streamDF
      // 选择字段,转换类型为String
      .selectExpr(
      "CAST(key AS STRING) order_id", //
      "CAST(value AS STRING) message", //
      "topic", "partition", "offset", "timestamp"//
    )
      // 解析Message,提取字段内置
      .withColumn("user_id", get_json_object(col("message"), "$.userId"))
      .withColumn("order_time", get_json_object(col("message"), "$.orderTime"))
      .withColumn("ip", get_json_object(col("message"), "$.ip"))
      .withColumn("order_money", get_json_object(col("message"), "$.orderMoney"))
      .withColumn("order_status", get_json_object(col("message"), "$.orderStatus"))
      // 删除Message列
      .drop(col("message"))
      // 转换订单日期时间格式为Long类型,作为Hudi表中合并数据字段
      .withColumn("ts", to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
      // 订单日期时间提取分区日期:yyyyMMdd
      .withColumn("day", substring(col("order_time"), 0, 10))
  }

  /**
    * 将流式数据集DataFrame保存至Hudi表,分别表类型:COW和MOR
    */
  def saveToHudi(streamDF: DataFrame): Unit = {
    streamDF.writeStream
      .outputMode(OutputMode.Append())
      .queryName("query-hudi-streaming")
      // 针对每微批次数据保存
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
      println(s"============== BatchId: ${batchId} start ==============")
      writeHudiMor(batchDF) // TODO:表的类型MOR
    })
      .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-100")
      .start()
  }

  /**
    * 将数据集DataFrame保存到Hudi表中,表的类型:MOR(读取时合并)
    */
  def writeHudiMor(dataframe: DataFrame): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions._

    dataframe.write
      .format("hudi")
      .mode(SaveMode.Append)
      // 表的名称
      .option(TBL_NAME.key, "tbl_kafka_mor")
      // 设置表的类型
      .option(TABLE_TYPE.key(), "MERGE_ON_READ")
      // 每条数据主键字段名称
      .option(RECORDKEY_FIELD_NAME.key(), "order_id")
      // 数据合并时,依据时间字段
      .option(PRECOMBINE_FIELD_NAME.key(), "ts")
      // 分区字段名称
      .option(PARTITIONPATH_FIELD_NAME.key(), "day")
      // 分区值对应目录格式,是否与Hive分区策略一致
      .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
      // 插入数据,产生shuffle时,分区数目
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // 表数据存储路径
      .save("/hudi-warehouse/tbl_order_mor")
  }
}

订单数据查询分析(spark-shell)

//启动spark-shell
spark-shell \
--master local[2] \
--jars /usr/local/src/hudi/hudi-spark-jars/hudi-spark3-bundle_2.12-0.9.0.jar,/usr/local/src/hudi/hudi-spark-jars/spark-avro_2.12-3.0.1.jar,/usr/local/src/hudi/hudi-spark-jars/spark_unused-1.0.0.jar \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

//指定Hudi表数据存储目录,加载数据
val ordersDF = spark.read.format("hudi").load("/hudi-warehouse/tbl_order_mor/day=2023-11-02")

//查看Schema信息
ordersDF.printSchema()

//查看订单表前10条数据,选择订单相关字段
ordersDF.select("order_id", "user_id", "order_time", "ip", "order_money", "order_status", "day").show(false)

//查看数据总条目数
ordersDF.count()

//注册临时视图
ordersDF.createOrReplaceTempView("view_tmp_orders")

//交易订单数据基本聚合统计:最大金额max、最小金额min、平均金额avg
spark.sql("""
  with tmp AS (
    SELECT CAST(order_money AS DOUBLE) FROM view_tmp_orders WHERE order_status = '0'
  )
  select 
    max(order_money) as max_money, 
    min(order_money) as min_money, 
    round(avg(order_money), 2) as avg_money 
  from tmp 
""").show()
+---------+---------+---------+
|max_money|min_money|avg_money|
+---------+---------+---------+
|   504.97|     5.05|   255.95|
+---------+---------+---------+

DeltaStreamer 工具类

在这里插入图片描述

七、Hudi集成SparkSQL

启动spark-sql

spark-sql \
--master local[2] \
--jars /usr/local/src/hudi/hudi-spark-jars/hudi-spark3-bundle_2.12-0.9.0.jar,/usr/local/src/hudi/hudi-spark-jars/spark-avro_2.12-3.0.1.jar,/usr/local/src/hudi/hudi-spark-jars/spark_unused-1.0.0.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

#Hudi默认upsert/insert/delete的并发度是1500,对于演示小规模数据集设置更小的并发度。
set hoodie.upsert.shuffle.parallelism = 1;
set hoodie.insert.shuffle.parallelism = 1;
set hoodie.delete.shuffle.parallelism = 1;

#设置不同步Hudi表元数据
set hoodie.datasource.meta.sync.enable=false;

创建表

--编写DDL语句,创建Hudi表,表的类型:MOR和分区表,主键为id,分区字段为dt,合并字段默认为ts。
create table test_hudi_table (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  primaryKey = 'id',
  type = 'mor'
 )
location 'hdfs://192.168.184.135:9000/hudi-warehouse/test2_hudi_table' ;

--创建Hudi表后查看创建的Hudi表
show create table test_hudi_table; 

CREATE TABLE `default`.`test_hudi_table` (
  `_hoodie_commit_time` STRING,
  `_hoodie_commit_seqno` STRING,
  `_hoodie_record_key` STRING,
  `_hoodie_partition_path` STRING,
  `_hoodie_file_name` STRING,
  `id` INT,
  `name` STRING,
  `price` DOUBLE,
  `ts` BIGINT,
  `dt` STRING)
USING hudi
OPTIONS (
  `type` 'mor',
  `primaryKey` 'id')
PARTITIONED BY (dt)
LOCATION 'hdfs://192.168.184.135:9000/hudi-warehouse/test_hudi_table'

Time taken: 0.217 seconds, Fetched 1 row(s)

插入数据

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V

insert into test_hudi_table select 1 as id, 'hudi' as name, 10 as price, 1000 as ts, '2021-11-01' as dt;

insert into test_hudi_table select 2 as id, 'spark' as name, 20 as price, 1100 as ts, '2021-11-01' as dt;

insert into test_hudi_table select 3 as id, 'flink' as name, 30 as price, 1200 as ts, '2021-11-01' as dt;

insert into test_hudi_table select 4 as id, 'sql' as name, 40 as price, 1400 as ts, '2021-11-01' as dt;

查询数据

--使用SQL查询Hudi表数据,全表扫描查询
select * from test_hudi_table ;

--查看表中字段结构,使用DESC语句
desc test_hudi_table ;

--指定查询字段,查询表中前几天数据
SELECT _hoodie_record_key,_hoodie_partition_path, id, name, price, ts, dt FROM test_hudi_table ;

更新数据

--使用DELETE语句,将id=1的记录删除,命令如下
delete from test_hudi_table where id = 1 ;

--再次查询Hudi表数据,查看数据是否更新
SELECT COUNT(1) AS total from test_hudi_table WHERE id = 1;

DDL创建表

在spark-sql中编写DDL语句,创建Hudi表数据,核心三个属性参数

核心参数

在这里插入图片描述

Hudi表类型

在这里插入图片描述

创建COW类型Hudi表

在这里插入图片描述

创建MOR类型Hudi表
 options (
  primaryKey = 'id',
  type = 'mor'
 )
管理表与外部表

创建表时,指定location存储路径,表就是外部表
在这里插入图片描述

创建表时设置为分区表

在这里插入图片描述

支持使用CTAS

在这里插入图片描述

在实际应用使用时,合理选择创建表的方式,建议创建外部及分区表,便于数据管理和安全。

DDL-DML-DQL-DCL区别

一、DQL
DQL(data Query Language) 数据查询语言
就是我们最经常用到的 SELECT(查)语句 。主要用来对数据库中的数据进行查询操作。
二、DML
DML(data manipulation language)数据操纵语言:
就是我们最经常用到的 INSERT(增)、DELETE(删)、UPDATE(改)。主要用来对数据库重表的数据进行一些增删改操作。

三、DDL
DDL(data definition language)数据库定义语言:
就是我们在创建表的时候用到的一些sql,比如说:CREATE、ALTER、DROP等。主要是用在定义或改变表的结构,数据类型,表之间的链接和约束等初始化工作上。
 

四、DCL
DCL(Data Control Language)数据库控制语言:
是用来设置或更改数据库用户或角色权限的语句,包括(grant(授予权限),deny(拒绝权限),revoke(收回权限)等)语句。这个比较少用到。

MergeInto 语句

Merge Into Insert

--当不满足条件时(关联条件不匹配),插入数据到Hudi表中
merge into test_hudi_table as t0
using (
 select 1 as id, 'hadoop' as name, 1 as price, 9000 as ts, '2021-11-02' as dt
) as s0
on t0.id = s0.id
when not matched then insert * ;

Merge Into Update

--当满足条件时(关联条件匹配),对数据进行更新操作
merge into test_hudi_table as t0
using (
 select 1 as id, 'hadoop3' as name, 1000 as price, 9999 as ts, '2021-11-02' as dt
) as s0
on t0.id = s0.id
when matched then update set *

Merge Into Delete

--当满足条件时(关联条件匹配),对数据进行删除操作
merge into test_hudi_table t0
using (
 select 1 as s_id, 'hadoop3' as s_name, 8888 as s_price, 9999 as s_ts, '2021-11-02' as dt
) s0
on t0.id = s0.s_id
when matched and s_ts = 9999 then delete

八、Hudi集成Flink

[flink学习之sql-client之踩坑记录_flink sql-client_cclovezbf的博客-CSDN博客](https://blog.csdn.net/cclovezbf/article/details/127887149)

安装Flink 1.12

使用Flink 1.12版本,部署Flink Standalone集群模式,启动服务,步骤如下

step1、下载安装包

https://archive.apache.org/dist/flink/flink-1.12.2/

step2、上传软件包

step3、解压

step5、添加hadoop依赖jar包
往Flink中的lib目录里添加两个jar包:
flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
commons-cli-1.4.jar

集群--添加完后,将lib目录分发给其他虚拟机。虚拟机上也需要添加上面两个jar包

下载仓库分别是:
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.1.0-327-9.0
https://mvnrepository.com/artifact/commons-cli/commons-cli/1.4


cd flink/lib

flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

启动Flink

start-cluster.sh

[root@master lib]# jps
53121 StandaloneSessionClusterEntrypoint
3218 DataNode
2979 NameNode
53622 Jps
53401 TaskManagerRunner
28107 QuorumPeerMain
5918 RunJar

stop-cluster.sh

词频统计WordCount

flink run /usr/local/src/flink/examples/batch/WordCount.jar

java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder; 解决:flink/lib 下添加commons-cli-1.4.jar

Flink快速入门

环境准备

Jar包和配置文件

hudi-flink-bundle_2.12-0.9.0.jar

[root@master target]# cp hudi-flink-bundle_2.12-0.9.0.jar /usr/local/src/flink/lib
[root@master target]# pwd
/usr/local/src/hudi/packaging/hudi-flink-bundle/target

flink-conf.yaml

接下来使用Flink SQL Client提供SQL命令行与Hudi集成,需要启动Flink Standalone集群,其中需要修改配置文件【$FLINK_HOME/conf/flink-conf.yaml】,TaskManager分配Slots数目为4。

taskmanager.numberOfTaskSlots: 4

修改后重启flink
第一步、启动HDFS集群
[root@master ~]# hadoop-daemon.sh start namenode 
[root@master ~]# hadoop-daemon.sh start datanode
第二步、启动Flink 集群

由于Flink需要连接HDFS文件系统,所以先设置HADOOP_CLASSPATH变量,再启动Standalone集群服务。

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

#启动flink
start-cluster.sh
第三步、启动Flink SQL Cli

embedded:嵌入式方式

#启动flink-sql客户端
sql-client.sh embedded shell

#在SQL Cli设置分析结果展示模式为tableau:
set execution.result-mode=tableau;

Flink SQL> set execution.result-mode=tableau;
[INFO] Session property has been set.


-------------------------------------exit报错---------------------------------------------
Flink SQL> exit;
[INFO] Exiting Flink SQL CLI Client...

Shutting down the session...
done.
Exception in thread "Thread-6" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields.

解决办法: 在 flink 配置文件里 flink-conf.yaml设置
classloader.check-leaked-classloader: false
SQL Cli-tableau模式
set execution.result-mode=tableau;

创建表并插入数据

创建表

创建表:t1,数据存储到Hudi表中,底层HDFS存储,表的类型:MOR

CREATE TABLE t1(
  uuid VARCHAR(20), 
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://192.168.184.135:9000/hudi-warehouse/hudi-t1',
  'write.tasks' = '1',
  'compaction.tasks' = '1', 
  'table.type' = 'MERGE_ON_READ',
    'hive-conf-dir' = '/usr/hdp/3.1.5.0-152/hive/conf'
);

show tables;

--查看表及结构
desc t1;

Flink SQL> desc t1;
+-----------+--------------+------+-----+--------+-----------+
|      name |         type | null | key | extras | watermark |
+-----------+--------------+------+-----+--------+-----------+
|      uuid |  VARCHAR(20) | true |     |        |           |
|      name |  VARCHAR(10) | true |     |        |           |
|       age |          INT | true |     |        |           |
|        ts | TIMESTAMP(3) | true |     |        |           |
| partition |  VARCHAR(20) | true |     |        |           |
+-----------+--------------+------+-----+--------+-----------+
5 rows in set

插入数据

t1中插入数据,其中t1表为分区表,字段名称:**partition**,插入数据时字段值有:【**part1、part2、part3和part4**】

INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

批量插入报错:org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.

hdfs-site.xml插入

<property>
 <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
<value>true</value>
</property>

<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>
查询数据
select * from t1;

select * from t1 where `partition` = 'par1' ;
更新数据

更新数据用insert--将id1的年龄更新为30岁

Flink SQL> select uuid,name,age from t1 where uuid='id1';
+-----+----------------------+----------------------+-------------+
| +/- |                 uuid |                 name |         age |
+-----+----------------------+----------------------+-------------+
|   + |                  id1 |                Danny |          27 |
+-----+----------------------+----------------------+-------------+

Flink SQL> insert into t1 values ('id1','Danny',30,TIMESTAMP '1970-01-01 00:00:01','par1');

Flink SQL> select uuid,name,age from t1 where uuid='id1';
+-----+----------------------+----------------------+-------------+
| +/- |                 uuid |                 name |         age |
+-----+----------------------+----------------------+-------------+
|   + |                  id1 |                Danny |          30 |
+-----+----------------------+----------------------+-------------+
Received a total of 1 rows

流式查询SteamingQuery

Flink插入Hudi表数据时,支持以流的方式加载数据,增量查询分析

创建表

流式表

CREATE TABLE t2(
  uuid VARCHAR(20), 
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://192.168.184.135:9000/hudi-warehouse/hudi-t1',
  'table.type' = 'MERGE_ON_READ',
  'read.tasks' = '1', 
  'read.streaming.enabled' = 'true',
  'read.streaming.start-commit' = '20210316134557',
  'read.streaming.check-interval' = '4' 
    
);

--核心参数选项说明:
read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据; 
read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s;
table.type 设置表类型为 MERGE_ON_READ;
插入数据

重新打开一个终端,然后创建一个表非流式表,path与之前的地址一样,然后新的终端中插入新的数据id9,之前创建的t2表会流式插入新的数据

CREATE TABLE t1(
  uuid VARCHAR(20), 
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://192.168.184.135:9000/hudi-warehouse/hudi-t1',
  'write.tasks' = '1',
  'compaction.tasks' = '1', 
  'table.type' = 'MERGE_ON_READ'
);

insert into t1 values ('id9','test',27,TIMESTAMP '1970-01-01 00:00:01','par5');

insert into t1 values ('id10','saddam',23,TIMESTAMP '2023-11-05 23:07:01','par5');

Flink SQL Writer

Flink SQL集成Kafka

第一步、创建Topic
#启动zookeeper
[root@master ~]# zkServer.sh start

#启动kafka
kafka-server-start.sh /usr/local/src/kafka/config/server.properties

#创建topic:flink-topic
kafka-topics.sh --create --zookeeper master:2181/kafka --replication-factor 1 --partitions 1 --topic flink-topic

#工具创建
.....
第二步、启动HDFS集群
start-dfs.sh
第三步、启动Flink 集群
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

start-cluster.sh
第四步、启动Flink SQL Cli

采用指定参数【-j xx.jar】方式加载hudi-flink集成包

sql-client.sh embedded -j /usr/local/src/flink/flink-Jars/flink-sql-connector-kafka_2.12-1.12.2.jar shell

set execution.result-mode=tableau;
第五步、创建表,映射到Kafka Topic

其中Kafka Topic中数据是CSV文件格式,有三个字段:user_id、item_id、behavior,从Kafka消费数据时,设置从最新偏移量开始

CREATE TABLE tbl_kafka (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink-topic',
  'properties.bootstrap.servers' = '192.168.184.135:9092',
  'properties.group.id' = 'test-group-10001',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
第六步、实时向Topic发送数据,并在FlinkSQL查询

首先,在FlinkSQL页面,执行SELECT查询语句

Flink SQL> select * from tbl_kafka;

其次,通过Kafka Console Producer向Topic发送数据

-- 生产者发送数据
kafka-console-producer.sh --broker-list 192.168.184.135:9092 --topic flink-topic
/*
1001,90001,click
1001,90001,browser
1001,90001,click
1002,90002,click
1002,90003,click
1003,90001,order
1004,90001,order
*/

Flink SQL写入Hudi-IDEAJava开发

Maven开发pom文件
		<!-- Flink Client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <!-- MySQL/FastJson/lombok -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.32</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- slf4j及log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
消费Kafka数据

启动zookeeper,kafka,然后启动数据模拟生成器,再运行FlinkSQLKafakDemo

package flink_kafka_hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.*;

/**
 * 基于Flink SQL Connector实现:实时消费Topic中数据,转换处理后,实时存储Hudi表中
 */
public class FlinkSQLKafakDemo {
    public static void main(String[] args) {
        //TODO 1-获取表执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings) ;

        //TODO 2-创建输入表, 从Kafka消费数据
        tableEnv.executeSql(
                "CREATE TABLE order_kafka_source (\n" +
                        "  orderId STRING,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'order-topic',\n" +
                        "  'properties.bootstrap.servers' = '192.168.184.135:9092',\n" +
                        "  'properties.group.id' = 'gid-1001',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")"
        );

        //TODO 3-数据转换:提取订单时间中订单日期,作为Hudi表分区字段值
        Table etlTable = tableEnv
                .from("order_kafka_source")
                .addColumns(
                        $("orderTime").substring(0, 10).as("partition_day")
                )
                .addColumns(
                        $("orderId").substring(0, 17).as("ts")
                );
        tableEnv.createTemporaryView("view_order", etlTable);

        //TODO 4-查询数据
        tableEnv.executeSql("SELECT * FROM view_order").print();
    }
}
Flink写入hudi并读取

启动数据生成器用kafka消费

存入hudi
package flink_kafka_hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * 基于Flink SQL Connector实现:实时消费Topic中数据,转换处理后,实时存储到Hudi表中
 */
public class FlinkSQLHudiDemo {

    public static void main(String[] args) {
        // 1-获取表执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO: 由于增量将数据写入到Hudi表,所以需要启动Flink Checkpoint检查点
        env.setParallelism(1);
        env.enableCheckpointing(5000) ;
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode() // 设置流式模式
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 2-创建输入表,TODO:从Kafka消费数据
        tableEnv.executeSql(
                "CREATE TABLE order_kafka_source (\n" +
                        "  orderId STRING,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'order-topic',\n" +
                        "  'properties.bootstrap.servers' = '192.168.184.135:9092',\n" +
                        "  'properties.group.id' = 'gid-1002',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")"
        );

        // 3-转换数据:可以使用SQL,也可以时Table API
        Table etlTable = tableEnv
                .from("order_kafka_source")
                // 添加字段:Hudi表数据合并字段,时间戳, "orderId": "20211122103434136000001" ->  20211122103434136
                .addColumns(
                        $("orderId").substring(0, 17).as("ts")
                )
                // 添加字段:Hudi表分区字段, "orderTime": "2021-11-22 10:34:34.136" -> 021-11-22
                .addColumns(
                        $("orderTime").substring(0, 10).as("partition_day")
                );
        tableEnv.createTemporaryView("view_order", etlTable);

        // 4-创建输出表,TODO: 关联到Hudi表,指定Hudi表名称,存储路径,字段名称等等信息
        tableEnv.executeSql(
                "CREATE TABLE order_hudi_sink (\n" +
                        "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "  userId STRING,\n" +
                        "  orderTime STRING,\n" +
                        "  ip STRING,\n" +
                        "  orderMoney DOUBLE,\n" +
                        "  orderStatus INT,\n" +
                        "  ts STRING,\n" +
                        "  partition_day STRING\n" +
                        ")\n" +
                        "PARTITIONED BY (partition_day)\n" +
                        "WITH (\n" +
                        "    'connector' = 'hudi',\n" +
                        "    'path' = 'hdfs://192.168.184.135:9000/hudi-warehouse/flink_hudi_order',\n" +
                        "    'table.type' = 'MERGE_ON_READ',\n" +
                        "    'write.operation' = 'upsert',\n" +
                        "    'hoodie.datasource.write.recordkey.field'= 'orderId',\n" +
                        "    'write.precombine.field' = 'ts',\n" +
                        "    'write.tasks'= '1'\n" +
                        ")"
        );

        // 5-通过子查询方式,将数据写入输出表
        tableEnv.executeSql(
                "INSERT INTO order_hudi_sink\n" +
                        "SELECT\n" +
                        "  orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day\n" +
                        "FROM view_order"
        );

    }

}
读取hudi
package flink_kafka_hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * 基于Flink SQL Connector实现:从Hudi表中加载数据,编写SQL查询
 */
public class FlinkSQLReadDemo {

	public static void main(String[] args) {
		// 1-获取表执行环境
		EnvironmentSettings settings = EnvironmentSettings
			.newInstance()
			.inStreamingMode()
			.build();
		TableEnvironment tableEnv = TableEnvironment.create(settings) ;

		// 2-创建输入表,TODO:加载Hudi表数据
		tableEnv.executeSql(
			"CREATE TABLE order_hudi(\n" +
				"  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
				"  userId STRING,\n" +
				"  orderTime STRING,\n" +
				"  ip STRING,\n" +
				"  orderMoney DOUBLE,\n" +
				"  orderStatus INT,\n" +
				"  ts STRING,\n" +
				"  partition_day STRING\n" +
				")\n" +
				"PARTITIONED BY (partition_day)\n" +
				"WITH (\n" +
				"    'connector' = 'hudi',\n" +
				"    'path' = 'hdfs://192.168.184.135:9000/hudi-warehouse/flink_hudi_order',\n" +
				"    'table.type' = 'MERGE_ON_READ',\n" +
				"    'read.streaming.enabled' = 'true',\n" +
				"    'read.streaming.check-interval' = '4'\n" +
				")"
		);

		// 3-执行查询语句,读取流式读取Hudi表数据
		tableEnv.executeSql(
			"SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi"
		).print() ;
	}

}

基于Flink实时增量入湖流程图

Flink SQL写入Hudi-FlinkSQL开发

集成环境
#修改$FLINK_HOME/conf/flink-conf.yaml文件
jobmanager.rpc.address: node1.itcast.cn
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 4

classloader.check-leaked-classloader: false
classloader.resolve-order: parent-first

execution.checkpointing.interval: 3000
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:9000/flink/flink-checkpoints
state.savepoints.dir: hdfs://master:9000/flink/flink-savepoints
state.backend.incremental: true

#jar包
将Hudi与Flink集成jar包及其他相关jar包,放置到$FLINK_HOME/lib目录
hudi-flink-bundle_2.12-0.9.0.jar
flink-sql-connector-kafka_2.12-1.12.2.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

#启动Standalone集群
export HADOOP_CLASSPATH=`/usr/local/src/hadoop/bin/hadoop classpath`
start-cluster.sh

#启动SQL Client,最好再次指定Hudi集成jar包
sql-client.sh embedded -j /usr/local/src/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell

#设置属性
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
执行SQL

首先创建输入表:从Kafka消费数据,其次编写SQL提取字段值,再创建输出表:将数据保存值Hudi表中,最后编写SQL查询Hudi表数据。

第1步、创建输入表,关联Kafka Topic
-- 输入表:Kafka Source
CREATE TABLE order_kafka_source (
  orderId STRING,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-topic',
  'properties.bootstrap.servers' = '192.168.184.135:9092',
  'properties.group.id' = 'gid-1001',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus FROM order_kafka_source ;
第2步、处理获取Kafka消息数据,提取字段值
SELECT 
  orderId, userId, orderTime, ip, orderMoney, orderStatus, 
  substring(orderId, 0, 17) AS ts, substring(orderTime, 0, 10) AS partition_day 
FROM order_kafka_source ;
第3步、创建输出表,保存数据至Hudi表,设置相关属性
-- 输出表:Hudi Sink
CREATE TABLE order_hudi_sink (
  orderId STRING PRIMARY KEY NOT ENFORCED,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT,
  ts STRING,
  partition_day STRING
)
PARTITIONED BY (partition_day) 
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://192.168.184.135:9000/hudi-warehouse/order_hudi_sink',
  'table.type' = 'MERGE_ON_READ',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field'= 'orderId',
  'write.precombine.field' = 'ts',
  'write.tasks'= '1',
  'compaction.tasks' = '1', 
  'compaction.async.enabled' = 'true', 
  'compaction.trigger.strategy' = 'num_commits', 
  'compaction.delta_commits' = '1'
);
第4步、使用INSERT INTO语句,将数据保存Hudi表
-- 子查询插入INSERT ... SELECT ...
INSERT INTO order_hudi_sink 
SELECT
  orderId, userId, orderTime, ip, orderMoney, orderStatus,
  substring(orderId, 0, 17) AS ts, substring(orderTime, 0, 10) AS partition_day 
FROM order_kafka_source ;

Flink CDC Hudi

CDC的全称是Change data Capture,即变更数据捕获,主要面向数据库的变更,是是数据库领域非常常见的技术,主要用于捕获数据库的一些变更,然后可以把变更数据发送到下游。

流程图

环境准备

#修改Hudi集成flink和Hive编译依赖版本配置
原因:现在版本Hudi,在编译的时候本身默认已经集成的flink-SQL-connector-hive的包,会和Flink lib包下的flink-SQL-connector-hive冲突。所以,编译的过程中只修改hive编译版本。

文件:hudi-0.9.0/packaging/hudi-flink-bundle/pom.xml

 <hive.version>3.1.2</hive.version> #hive版本修改为自己的版本
 
然后进入hudi-0.9.0/packaging/hudi-flink-bundle/ 再编译Hudi源码:
mvn clean install -DskipTests -Drat.skip=true -Dscala-2.12 -Dspark3 -Pflink-bundle-shade-hive3


#将Flink CDC MySQL对应jar包,放到$FLINK_HOME/lib目录中
flink-sql-connector-mysql-cdc-1.3.0.jar

#hive 需要用来读hudi数据,放到$HIVE_HOME/lib目录中
hudi-hadoop-mr-bundle-0.9.0.jar

#flink 用来写入和读取数据,将其拷贝至$FLINK_HOME/lib目录中,如果以前有同名jar包,先删除再拷贝。
hudi-flink-bundle_2.12-0.9.0.jar

#启动
dfs
zk
kafka
flink
metastore
hiveserver2

创建 MySQL 表

首先开启MySQL数据库binlog日志,再重启MySQL数据库服务,最后创建表。

第一步、开启MySQL binlog日志
[root@node1 ~]# vim /etc/my.cnf [mysqld]下面添加内容:

server-id=2
log-bin=mysql-bin
binlog_format=row
expire_logs_days=15
binlog_row_image=full
第二步、重启MySQL Server
service mysqld restart
第三步、在MySQL数据库,创建表
-- MySQL 数据库创建表
create database test;
create table test.tbl_users(
   id bigint auto_increment primary key,
   name varchar(20) null,
   birthday timestamp default CURRENT_TIMESTAMP not null,
   ts timestamp default CURRENT_TIMESTAMP not null
);

创建 CDC 表

先启动HDFS服务、Hive MetaStore和HiveServer2服务和Flink Standalone集群,再运行SQL Client,最后创建表关联MySQL表,采用MySQL CDC方式。

启动相关服务
#启动HDFS服务,分别启动NameNode和DataNode
hadoop-daemon.sh start namenode 
hadoop-daemon.sh start datanode

#启动Hive服务:元数据MetaStore和HiveServer2
hive/bin/start-metastore.sh 
hive/bin/start-hiveserver2.sh

#启动Flink Standalone集群
export HADOOP_CLASSPATH=`/usr/local/src/hadoop/bin/hadoop classpath`
start-cluster.sh

#启动SQL Client客户端
sql-client.sh embedded -j /usr/local/src/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell
设置属性:
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
创建输入表,关联MySQL表,采用MySQL CDC 关联
-- Flink SQL Client创建表
CREATE TABLE users_source_mysql (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.184.135',
'port' = '3306',
'username' = 'root',
'password' = 'xxxxxx',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'test',
'table-name' = 'tbl_users'
);
开启MySQL Client客户端,执行DML语句,插入数据
insert into test.tbl_users (name) values ('zhangsan')
insert into test.tbl_users (name) values ('lisi');
insert into test.tbl_users (name) values ('wangwu');
insert into test.tbl_users (name) values ('laoda');
insert into test.tbl_users (name) values ('laoer');
查询CDC表数据
-- 查询数据
select * from users_source_mysql;

创建视图

创建一个临时视图,增加分区列part,方便后续同步hive分区表。

-- 创建一个临时视图,增加分区列 方便后续同步hive分区表
create view view_users_cdc 
AS 
SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM users_source_mysql;

select * from view_users_cdc;

创建 Hudi 表

创建 CDC Hudi Sink表,并自动同步hive分区表

CREATE TABLE users_sink_hudi_hive(
id bigint ,
name string,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
part VARCHAR(20),
primary key(id) not enforced
)
PARTITIONED BY (part)
with(
'connector'='hudi',
'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/users_sink_hudi_hive', 
'table.type'= 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field'= 'id', 
'write.precombine.field'= 'ts',
'write.tasks'= '1',
'write.rate.limit'= '2000', 
'compaction.tasks'= '1', 
'compaction.async.enabled'= 'true',
'compaction.trigger.strategy'= 'num_commits',
'compaction.delta_commits'= '1',
'changelog.enabled'= 'true',
'read.streaming.enabled'= 'true',
'read.streaming.check-interval'= '3',
'hive_sync.enable'= 'true',
'hive_sync.mode'= 'hms',
'hive_sync.metastore.uris'= 'thrift://192.168.184.135:9083',
'hive_sync.jdbc_url'= 'jdbc:hive2://192.168.184.135:10000',
'hive_sync.table'= 'users_sink_hudi_hive',
'hive_sync.db'= 'default',
'hive_sync.username'= 'root',
'hive_sync.password'= 'xxxxxx',
'hive_sync.support_timestamp'= 'true'
);

此处Hudi表类型:MOR,Merge on Read (读时合并),快照查询+增量查询+读取优化查询(近实时)。使用列式存储(parquet)+行式文件(arvo)组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。

数据写入Hudi表

编写INSERT语句,从视图中查询数据,再写入Hudi表中

insert into users_sink_hudi_hive select id, name, birthday, ts, part from view_users_cdc;

Hive 表查询

需要引入hudi-hadoop-mr-bundle-0.9.0.jar包,放到$HIVE_HOME/lib下

--启动Hive中beeline客户端,连接HiveServer2服务 已自动生产hudi MOR模式的2张表:

users_sink_hudi_hive_ro,ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可;

users_sink_hudi_hive_rt,rt表示增量视图,主要针对增量查询的rt表;ro表只能查parquet文件数据, rt表 parquet文件数据和log文件数据都可查;

查看自动生成表users_sink_hudi_hive_ro结构

show create table users_sink_hudi_hive_ro;

查看自动生成表的分区信息

show partitions users_sink_hudi_hive_ro ;
show partitions users_sink_hudi_hive_rt ;

查询Hive 分区表数据

set hive.exec.mode.local.auto=true;
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hive.mapred.mode=nonstrict ;

select id, name, birthday, ts, `part` from users_sink_hudi_hive_ro;

指定分区字段过滤,查询数据

select name, ts from users_sink_hudi_hive_ro where part ='20231110';
select name, ts from users_sink_hudi_hive_rt where part ='20231110';

Hudi Client操作Hudi表

进入Hudi客户端命令行:hudi/hudi-cli/hudi-cli.sh

连接Hudi表,查看表信息

connect --path hdfs://192.168.184.135:9000/hudi-warehouse/users_sink_hudi_hive

查看Hudi compactions 计划

compactions show all

查看Hudi commit信息

commits show --sortBy "CommitTime"

help

hudi:users_sink_hudi_hive->help
2023-11-10 21:13:57,140 INFO core.SimpleParser: * ! - Allows execution of operating sy
* // - Inline comment markers (start of line only)
* ; - Inline comment markers (start of line only)
* bootstrap index showmapping - Show bootstrap index mapping
* bootstrap index showpartitions - Show bootstrap indexed partitions
* bootstrap run - Run a bootstrap action for current Hudi table
* clean showpartitions - Show partition level details of a clean
* cleans refresh - Refresh table metadata
* cleans run - run clean
* cleans show - Show the cleans
* clear - Clears the console
* cls - Clears the console
* clustering run - Run Clustering
* clustering schedule - Schedule Clustering
* commit rollback - Rollback a commit
* commits compare - Compare commits with another Hoodie table
* commit show_write_stats - Show write stats of a commit
* commit showfiles - Show file level details of a commit
* commit showpartitions - Show partition level details of a commit
* commits refresh - Refresh table metadata
* commits show - Show the commits
* commits showarchived - Show the archived commits
* commits sync - Compare commits with another Hoodie table
* compaction repair - Renames the files to make them consistent with the timeline as d when compaction unschedule fails partially.
* compaction run - Run Compaction for given instant time
* compaction schedule - Schedule Compaction
* compaction show - Shows compaction details for a specific compaction instant
* compaction showarchived - Shows compaction details for a specific compaction instant
* compactions show all - Shows all compactions that are in active timeline
* compactions showarchived - Shows compaction details for specified time window
* compaction unschedule - Unschedule Compaction
* compaction unscheduleFileId - UnSchedule Compaction for a fileId
* compaction validate - Validate Compaction
* connect - Connect to a hoodie table
* create - Create a hoodie table if not present
* date - Displays the local date and time
* desc - Describe Hoodie Table properties
* downgrade table - Downgrades a table
* exit - Exits the shell
* export instants - Export Instants and their metadata from the Timeline
* fetch table schema - Fetches latest table schema
* hdfsparquetimport - Imports Parquet table to a hoodie table
* help - List all commands usage
* metadata create - Create the Metadata Table if it does not exist
* metadata delete - Remove the Metadata Table
* metadata init - Update the metadata table from commits since the creation
* metadata list-files - Print a list of all files in a partition from the metadata
* metadata list-partitions - Print a list of all partitions from the metadata
* metadata refresh - Refresh table metadata
* metadata set - Set options for Metadata Table
* metadata stats - Print stats about the metadata
* quit - Exits the shell
* refresh - Refresh table metadata
* repair addpartitionmeta - Add partition metadata to a table, if not present
* repair corrupted clean files - repair corrupted clean files
* repair deduplicate - De-duplicate a partition path contains duplicates & produce rep
* repair overwrite-hoodie-props - Overwrite hoodie.properties with provided file. Riskon!
* savepoint create - Savepoint a commit
* savepoint delete - Delete the savepoint
* savepoint rollback - Savepoint a commit
* savepoints refresh - Refresh table metadata
* savepoints show - Show the savepoints
* script - Parses the specified resource file and executes its commands
* set - Set spark launcher env to cli
* show archived commits - Read commits from archived files and show details
* show archived commit stats - Read commits from archived files and show details
* show env - Show spark launcher env by key
* show envs all - Show spark launcher envs
* show fsview all - Show entire file-system view
* show fsview latest - Show latest file-system view
* show logfile metadata - Read commit metadata from log files
* show logfile records - Read records from log files
* show rollback - Show details of a rollback instant
* show rollbacks - List all rollback instants
* stats filesizes - File Sizes. Display summary stats on sizes of files
* stats wa - Write Amplification. Ratio of how many records were upserted to how many
* sync validate - Validate the sync by counting the number of records
* system properties - Shows the shell's properties
* temp_delete - Delete view name
* temp_query - query against created temp view
* temp delete - Delete view name
* temp query - query against created temp view
* temps_show - Show all views name
* temps show - Show all views name
* upgrade table - Upgrades a table
* utils loadClass - Load a class
* version - Displays shell version

九、Hudi案例实战一

七陌社交是一家专门做客服系统的公司, 传智教育是基于七陌社交构建客服系统,每天都有非常多的的用户进行聊天, 传智教育目前想要对这些聊天记录进行存储, 同时还需要对每天的消息量进行实时统计分析, 请您来设计如何实现数据的存储以及实时的数据统计分析工作。
需求如下:
1)  选择合理的存储容器进行数据存储, 并让其支持基本数据查询工作
2)  进行实时统计消息总量
3)  进行实时统计各个地区收 发 消息的总量
4)  进行实时统计每一位客户发送和接收消息数量

1、案例架构

实时采集七陌用户聊天信息数据,存储消息队列Kafka,再实时将数据处理转换,将其消息存储Hudi表中,最终使用Hive和Spark业务指标统计,基于FanBI可视化报表展示。
在这里插入图片描述

1、Apache Flume:分布式实时日志数据采集框架
由于业务端数据在不断的在往一个目录下进行生产, 我们需要实时的进行数据采集, 而flume就是一个专门用于数据采集工具,比如就可以监控某个目录下文件, 一旦有新的文件产生即可立即采集。

2、Apache Kafka:分布式消息队列
Flume 采集过程中, 如果消息非常的快, Flume也会高效的将数据进行采集, 那么就需要一个能够快速承载数据容器, 而且后续还要对数据进行相关处理转换操作, 此时可以将flume采集过来的数据写入到Kafka中,进行消息数据传输,而Kafka也是整个集团中心所有业务线统一使用的消息系统, 用来对接后续的业务(离线或者实时)。

3、Apache Spark:分布式内存计算引擎,离线和流式数据分析处理
整个七陌社交案例, 需要进行实时采集,那么此时也就意味着数据来一条就需要处理一条, 来一条处理一条, 此时就需要一些流式处理的框架,Structured Streaming或者Flink均可。
此外,七陌案例中,对每日用户消息数据按照业务指标分析,最终存储MySQL数据库中,选择SparkSQL。

4、Apache Hudi:数据湖框架
七陌用户聊天消息数据,最终存储到Hudi表(底层存储:HDFS分布式文件系统),统一管理数据文件,后期与Spark和Hive集成,进行业务指标分析。

5、Apache Hive:大数据数仓框架
与Hudi表集成,对七陌聊天数据进行分析,直接编写SQL即可。

6、MySQL:关系型数据库
将业务指标分析结果存储在MySQL数据库中,后期便于指标报表展示。

7、FineBI:报表工具
帆软公司的一款商业图表工具, 让图表制作更加简单

2、业务数据

用户聊天数据以文本格式存储日志文件中,包含20个字段,下图所示 各个字段之间分割符号为:**\001**
在这里插入图片描述

3、数据生成

运行jar包:7Mo_DataGen.jar,指定参数信息,模拟生成用户聊天信息数据,写入日志文件

第一步、创建原始文件目录

mkdir -p /usr/local/src/datas/7mo_init

第二步、上传模拟数据程序

#7mo_init目录下
7Mo_DataGen.jar
7Mo_Data.xlsx

第三步、创建模拟数据目录

mkdir -p /usr/local/src/datas/7mo_data
touch MOMO_DATA.dat #注意权限 需要写入这个文件

第四步、运行程序生成数据

# 1. 语法
java -jar /usr/local/src/datas/7mo_init/7Mo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
  	
# 2. 测试:每500ms生成一条数据
java -jar /usr/local/src/datas/7mo_init/7Mo_DataGen.jar \
/usr/local/src/datas/7mo_init/7Mo_Data.xlsx \
/usr/local/src/datas/7mo_data \
500

第五步、查看产生数据

[root@master 7mo_data]# pwd
/usr/local/src/datas/7mo_data
[root@master 7mo_data]# head -3 MOMO_DATA.dat

4、七陌数据采集

Apache Flume 是什么

在这里插入图片描述

 Aapche Flume是由Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件,网址:http://flume.apache.org/
 
 Flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。
 
 当前Flume有两个版本:
Flume 0.9X版本的统称Flume OG(original generation)
Flume1.X版本的统称Flume NG(next generation)
由于Flume NG经过核心组件、核心配置以及代码架构重构,与Flume OG有很大不同。改动的另一原因是将Flume纳入 apache 旗下,Cloudera Flume 改名为 Apache Flume。

Apache Flume 运行机制

Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。

每一个agent相当于一个数据传递员,内部有三个组件:
Source:采集源,用于跟数据源对接,以获取数据;
Sink:下沉地,采集数据的传送目的,用于往下一级agent或者往最终存储系统传递数据;
Channel:agent内部的数据传输通道,用于从source将数据传递到sink;
在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。

在这里插入图片描述

event将传输的数据进行封装,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

在这里插入图片描述

一个完整的event包括:event headers、event body,其中event body是flume收集到的日记记录。

Apache Flume 安装部署

#第一步、上传解压
# 上传
cd /export/software
rz apache-flume-1.9.0-bin.tar.gz

# 解压,重命名及创建软链接
tar -zxf apache-flume-1.9.0-bin.tar.gz -C /export/server

cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin
ln -s flume-1.9.0-bin flume 

#第二步、修改flume-env.sh 
cd /export/server/flume/conf
mv flume-env.sh.template  flume-env.sh

vim flume-env.sh
# 22行:修改JDK路径
export JAVA_HOME=/export/server/jdk

下载软件包:
	http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
官方文档:
	https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
数据源source:
	https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
数据缓冲Channel:
	https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
			内存Memory
			文件File
数据终端sink:
	https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
			HDFS文件
			Kafka消息队列

Apache Flume 入门程序

需求说明: 监听服务器上某一个端口号(例如: 44444), 采集发向此端口的数据。

在这里插入图片描述

第1步、确定三大组件
source组件: 需要一个能够监听端口号的组件(网络组件)
使用Apache Flume提供的 : NetCat TCP Source

channel组件: 需要一个传输速度更快的管道(内存组件)
使用Apache Flume提供的 : Memory Channel

sink组件 : 此处我们只需要打印出来即可(日志组件)
使用Apache Flume提供的 : Logger Sink
第2步、编写采集配置文件

netcat_source_logger_sink.properties

# 第一部分: 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#第二部分:  描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444

# 第三部分: 描述和配置sink组件:k1
a1.sinks.k1.type = logger

# 第四部分: 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 第五部分: 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1
第3步、启动flume: 指定采集配置文件
flume-ng agent -n a1  \
-c /usr/local/src/flume/conf \
-f /usr/local/src/flume/conf/netcat_source_logger_sink.properties \
-Dflume.root.logger=INFO,console

参数说明: 	
  -c conf   指定flume自身的配置文件所在目录	
  -f conf/netcat-logger.con  指定我们所描述的采集方案	
  -n a1  指定我们这个agent的名字
第4步、agent启动之后, 连接测试
#安装telnet
yum -y install telnet

#随便在一个能跟agent节点通信的机器上,执行如下命令
telnet master  44444

5、七陌社交数据采集

七陌社交数据源特点:持续不断的向某一个目录下得一个文件输出消息。功能要求:实时监控某一个目录下的文件, 一旦发现有新的文件,立即将其进行采集到Kafka中。

在这里插入图片描述

第1步、确定三大组件
source组件:  能够监控某个目录的文件source组件   
使用Apache Flume提供的 : taildir

channel组件:  一般都是选择 内存组件 (更高效)
使用Apache Flume提供 : Memory Channel

sink组件:  输出到 Kafka的sink组件
使用Apache Flume提供:Kafka Sink
第2步、编写采集配置文件

7mo_mem_kafka.properties

# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1

#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /usr/local/src/flume/position/taildir_7mo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /usr/local/src/datas/7mo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = 7mo
a1.sources.s1.fileHeader = true

#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = 7MO-MSG
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100

#bind
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
第3步、启动ZK服务和Kafka服务
zkServer.sh start 
kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties
第4步、创建topic
kafka-topics.sh --create \
--zookeeper master:2181/kafka \
--partitions 3 --replication-factor 1 \
--topic 7MO-MSG  
第5步、启动flume: 指定采集配置文件
flume-ng agent \
-n a1 \
-c /usr/local/src/flume/conf/ \
-f /usr/local/src/flume/conf/7mo_mem_kafka.properties \
-Dflume.root.logger=INFO,console
第6步、启动模拟数据
java -jar /usr/local/src/datas/7mo_init/7Mo_DataGen.jar \
/usr/local/src/datas/7mo_init/7Mo_Data.xlsx \
/usr/local/src/datas/7mo_data \
5000

6、实时存储七陌数据

编写Spark中流式程序:StructuredStreaming,实时从Kafka消费获取社交数据,经过转换(数据字段提取等)处理,最终保存到Hudi表中,表的格式:**ROM**。

在这里插入图片描述

在IDEA中创建一个模块

6.1-封装实体类

封装Momo聊天记录实体样例类CaseClass

package cn.saddam.hudi.momo

/**
  * 封装Momo聊天记录实体样例类CaseClass
  */
case class MomoMessage(
                        msg_time: String,
                        sender_nickyname: String,
                        sender_account: String,
                        sender_sex: String,
                        sender_ip: String,
                        sender_os: String,
                        sender_phone_type: String,
                        sender_network: String,
                        sender_gps: String,
                        receiver_nickyname: String,
                        receiver_ip: String,
                        receiver_account: String,
                        receiver_os: String,
                        receiver_phone_type: String,
                        receiver_network: String,
                        receiver_gps: String,
                        receiver_sex: String,
                        msg_type: String,
                        distance: String,
                        message: String
                      )

6.2-编写流式程序

创建SparkSession
 /**
    * 创建SparkSession会话实例对象,基本属性设置
    */
  def createSparkSession(clazz: Class[_], master: String = "local[4]", partitions: Int = 4): SparkSession ={
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", partitions)
      .getOrCreate()

  }
kafka消费数据
/**
    * 指定Kafka Topic名称,实时消费数据
    */
  def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.184.135:9092")
      .option("subscribe", topicName)
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 100000)
      .option("failOnDataLoss", "false")
      .load()
  }
Kafka获取数据,进行转换操作
/**
    * 对Kafka获取数据,进行转换操作,获取所有字段的值,转换为String,以便保存Hudi表
    */
  def process(streamDF: DataFrame): DataFrame = {
    import streamDF.sparkSession.implicits._

    /*
       2021-11-25 20:52:58牛星海17870843110女156.35.36.204IOS 9.0华为 荣耀Play4T4G91.319474,29.033363成紫57.54.100.313946849234Android 6.0OPPO A11X4G84.696447,30.573691 女TEXT78.22KM有一种想见不敢见的伤痛,这一种爱还埋藏在我心中,让我对你的思念越来越浓,我却只能把你你放在我心中。
     */
    // 1-提取Message消息数据
    val messageStreamDF: DataFrame = streamDF.selectExpr("CAST(value AS STRING) message")

    // 2-解析数据,封装实体类
    val momoStreamDS: Dataset[MomoMessage] = messageStreamDF
      .as[String] // 转换为Dataset
      .map(message => {
      val array = message.split("\001")
      val momoMessage = MomoMessage(
        array(0), array(1), array(2), array(3), array(4), array(5), array(6), array(7),
        array(8), array(9), array(10), array(11), array(12), array(13), array(14),
        array(15), array(16), array(17), array(18), array(19)
      )
      // 返回实体类
      momoMessage
    })
      
       // 3-为Hudi表添加字段:主键id、数据聚合字段ts、分区字段day
    val hudiStreamDF = momoStreamDS.toDF()
      .withColumn("ts", unix_timestamp($"msg_time").cast(StringType))
      .withColumn(
        "message_id",
        concat($"sender_account", lit("_"), $"ts", lit("_"), $"receiver_account")
      )
      .withColumn("day", substring($"msg_time", 0, 10))

    hudiStreamDF
  }
测试方式,将数据打印到控制台
/**
    * 测试方式,将数据打印到控制台
    *
    * @param streamDF
    */
  def printToConsole(streamDF: DataFrame): Unit = {
    streamDF.writeStream
      .outputMode(OutputMode.Append())
      .queryName("query-hudi-momo")
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .option("checkpointLocation", "/datas/hudi-struct-ckpt-0")
      .start()
  }
保存至Hudi表
/**
    * 将流式数据集DataFrame保存至Hudi表,分别表类型:COW和MOR
    */
  def saveToHudi(streamDF: DataFrame): Unit = {
    streamDF.writeStream
      .outputMode(OutputMode.Append())
      .queryName("query-hudi-7mo")
      // 针对每微批次数据保存
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
      println(s"============== BatchId: ${batchId} start ==============")
      writeHudiMor(batchDF) // TODO:表的类型MOR
    })
      .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-100")
      .start()
  }

  /**
    * 将数据集DataFrame保存到Hudi表中,表的类型:MOR(读取时合并)
    */
  def writeHudiMor(dataframe: DataFrame): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions._

    dataframe.write
      .format("hudi")
      .mode(SaveMode.Append)
      // 表的名称
      .option(TBL_NAME.key, "7mo_msg_hudi")
      // 设置表的类型
      .option(TABLE_TYPE.key(), "MERGE_ON_READ")
      // 每条数据主键字段名称
      .option(RECORDKEY_FIELD_NAME.key(), "message_id")
      // 数据合并时,依据时间字段
      .option(PRECOMBINE_FIELD_NAME.key(), "ts")
      // 分区字段名称
      .option(PARTITIONPATH_FIELD_NAME.key(), "day")
      // 分区值对应目录格式,是否与Hive分区策略一致
      .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
      // 插入数据,产生shuffle时,分区数目
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // 表数据存储路径
      .save("file:///F:\\momo\\7mo_msg_hudi")
  }
main方法
package cn.saddam.hudi.momo

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StringType
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._

def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "root")

    //TODO step1、构建SparkSession实例对象
    val spark: SparkSession = createSparkSession(this.getClass)
    spark.sparkContext.setLogLevel("WARN")

    //TODO step2、从Kafka实时消费数据
    val kafkaStreamDF: DataFrame = readFromKafka(spark, "7MO-MSG")

    // step3、提取数据,转换数据类型
    val streamDF: DataFrame = process(kafkaStreamDF)

    // step4、保存数据至Hudi表中:MOR(读取时保存)
    //printToConsole(streamDF)
    saveToHudi(streamDF)

    // step5、流式应用启动以后,等待终止
    spark.streams.active.foreach(
      query => println(s"Query: ${query.name} is Running .............")
    )
    spark.streams.awaitAnyTermination()

  }

7、集成Hive指标分析

将Hudi表数据,与Hive表进行关联,使用beeline等客户端,编写SQL分析Hudi表数据。

在这里插入图片描述

7.1-创建Hive表

启动Hive MetaStore服务和HiveServer2服务,再启动beeline客户端

start-metastore.sh
start-hiveserver2.sh
start-beeline.sh

编写DDL语句,创建Hive表,关联Hudi表,其中设置InputFormat实现类。

--创建Hive表,映射到Hudi表
CREATE EXTERNAL TABLE db_hudi.tbl_7mo_hudi(
  msg_time             String,
  sender_nickyname     String,
  sender_account       String,
  sender_sex           String,
  sender_ip            String,
  sender_os            String,
  sender_phone_type    String,
  sender_network       String,
  sender_gps           String,
  receiver_nickyname   String,
  receiver_ip          String,
  receiver_account     String,
  receiver_os          String,
  receiver_phone_type  String,
  receiver_network     String,
  receiver_gps         String,
  receiver_sex         String,
  msg_type             String,
  distance             String,
  message              String,
  message_id           String,
  ts                   String       
)
PARTITIONED BY (day string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/7mo_msg_hudi' ;

--由于Hudi是分区表,需要手动添加分区信息
alter table db_hudi.tbl_7mo_hudi 
add if not exists partition(day = '2023-11-12') location '/hudi-warehouse/7mo_msg_hudi/day=2023-11-11' ;

alter table db_hudi.tbl_7mo_hudi 
add if not exists partition(day = '2023-11-12') location '/hudi-warehouse/7mo_msg_hudi/day=2023-11-12' ;

--查询数据
SELECT
  msg_time, sender_nickyname, receiver_nickyname, ts 
FROM db_hudi.tbl_7mo_hudi 
WHERE day = '2023-11-12'
limit 10 ;

load data inpath '/home/ec2-user/total/cn.txt' into table stu partition(cuntry='cn');

7.2-业务指标分析

hive优化

编写SQL,对七陌社交数据进行简易指标统计分析,由于数据流较小,设置本地模式执

set hive.exec.mode.local.auto=true;
set hive.mapred.mode=nonstrict;
set hive.exec.mode.local.auto.input.files.max=15;
指标1:统计总消息量
WITH tmp AS (
  SELECT COUNT(1) AS momo_total  FROM db_hudi.tbl_7mo_hudi
)
SELECT "全国" AS momo_name, momo_total FROM tmp;
指标2:统计各个用户, 发送消息量
WITH tmp AS (
  SELECT 
    sender_nickyname, COUNT(1) momo_total 
  FROM db_hudi.tbl_7mo_hudi 
  GROUP BY sender_nickyname
)
SELECT 
  sender_nickyname AS momo_name, momo_total
FROM tmp 
ORDER BY momo_total DESC LIMIT 10;
指标3:统计各个用户, 接收消息量
WITH tmp AS (
  SELECT 
    receiver_nickyname, COUNT(1) momo_total 
  FROM db_hudi.tbl_7mo_hudi 
  GROUP BY receiver_nickyname
)
SELECT 
  receiver_nickyname AS momo_name, momo_total  
FROM tmp 
ORDER BY momo_total DESC LIMIT 10;
指标4:统计男女发送信息量
SELECT 
  sender_sex, receiver_sex, COUNT(1) momo_total 
FROM db_hudi.tbl_7mo_hudi 
GROUP BY sender_sex, receiver_sex;

8、Spark 离线指标分析

编写SparkSQL程序,加载Hudi表数据封装到DataFrame中,按照业务指标需要,编写SQL分析数据,最终保存到MySQL数据库表中,流程示意图如下

在这里插入图片描述

8.1-需求说明

对七陌社交消息数据的实时统计操作, 如下统计需求:
1)、统计消息的总条数
2)、根据IP地址统计各个地区(省) 发送的消息数和接收的消息数
3)、统计七陌社交消息中各个用户发送多少条和接收多少条

8.2-创建数据库表

其中字段:7mo_category 表示指标类型:
1:表示全国信息量统计
2:表示各省份发送信息量统计
3:表示各省份接收信息量统计
4:表示用户发送信息量统计
5:表示用户接收信息量统计

将上述业务需求,最终结果存储到MySQL数据库1张表中:7mo.7mo_report

-- 创建数据库
CREATE DATABASE IF NOT EXISTS 7mo ;
-- 创建表
CREATE TABLE IF NOT EXISTS `7mo`.`7mo_report` (
    `7mo_name` varchar(100) NOT NULL,
    `7mo_total` bigint(20) NOT NULL,
    `7mo_category` varchar(100) NOT NULL,
    PRIMARY KEY (`7mo_name`, `7mo_category`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ;

8.3-编写指标分析程序

创建对象object:MomoSQLHudi,编写MAIN方法,按照编写流式程序5个步骤,写出代码结构

解析IP地址及选择字段

解析IP地址为【省份】,推荐使用【**ip2region**】第三方工具库,官网网址:<https://gitee.com/lionsoul/ip2region/>,引入使用IP2Region第三方库

第一步、复制IP数据集【ip2region.db】到工程下的【dataset】目录

第二步、在Maven中添加依赖

 <dependency>
    <groupId>org.lionsoul</groupId>
    <artifactId>ip2region</artifactId>
    <version>1.7.2</version>
</dependency>

------------------------------------------
<dependency>
            <groupId>com.ggstar</groupId>
            <artifactId>ipdatabase</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.14</version>
        </dependency>
加载Hudi表数据
package cn.saddam.hudi.momo

import org.apache.spark.sql.SparkSession

object MoMoReadHudi {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder()
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", 2)
      .config("spark.default.parallelism", 2)
      .getOrCreate()


    val hudiDF=spark.read
      .format("hudi")
      .load("hdfs://192.168.184.135:9000/hudi-warehouse/7mo_msg_hudi")
    
    hudiDF.write.save("file:home/saddam/Hudi-Study/datas/7mo_msg_hudi")
    
    spark.stop()
  }
}
清洗数据

解析ip地址,选择需要字段

package cn.saddam.hudi.momo

import com.ggstar.util.ip.IpHelper
import org.apache.spark.sql.SparkSession

object MoMoIpParse {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder()
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", 2)
      .config("spark.default.parallelism", 2)
      .getOrCreate()

    // 1-读取hudi数据
    val HUdiDF = spark.read
      .parquet("file:home/saddam/Hudi-Study/datas/7mo_msg_hudi")

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2-注册udf
    val ip_to_province = udf(getCity _)

    // 3-解析IP
    val ipParseDF = HUdiDF
      .withColumn("sender_province", ip_to_province('sender_ip))
      .withColumn("receiver_province", ip_to_province('receiver_ip))
      .select(
        "day", "sender_nickyname", "receiver_nickyname", "sender_province", "receiver_province"
      )
    
    // 4-保存数据
    ipParseDF.write.save("file:home/saddam/Hudi-Study/datas/7mo_msg_hudi_IpParse")


    spark.stop()
  }

  /**
    * IP解析
    * @param ip
    * @return
    */
  def getCity(ip:String): String ={
    IpHelper.findRegionByIp(ip)
  }

}
指标分析
package cn.saddam.hudi.momo

import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

object MoMoCalculation {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder()
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", 2)
      .config("spark.default.parallelism", 2)
      .getOrCreate()

    // TODO 读取hudi数据
    val HudiCleanDF = spark.read
      .parquet("file:home/saddam/Hudi-Study/datas/7mo_msg_hudi_IpParse")

    // TODO 指标分析
    //指标1:统计总消息量
//    reportAllTotalDF(HudiCleanDF).show()

    //指标2:统计各省份发送消息量
//    reportSenderProvinceTotalDF(HudiCleanDF).show()

    //指标3:统计各省份接收消息量
//    reportReceiverProvinceTotalDF(HudiCleanDF).show()

    //指标4:统计各个用户, 发送消息量
//    reportSenderNickyNameTotalDF(HudiCleanDF).show()

    //指标5:统计各个用户, 接收消息量
//    reportReceiverNickyNameTotalDF(HudiCleanDF).show()


    import org.apache.spark.sql.functions._

    // TODO 五个业务需求整合为一张表
    val reportTotalDF= reportAllTotalDF(HudiCleanDF)
      .union(reportSenderProvinceTotalDF(HudiCleanDF))
      .union(reportReceiverProvinceTotalDF(HudiCleanDF))
      .union(reportSenderNickyNameTotalDF(HudiCleanDF))
      .union(reportReceiverNickyNameTotalDF(HudiCleanDF))


    // TODO 保存报表至MySQL数据库
    reportTotalDF
      .coalesce(1)
      .write
      .mode(SaveMode.Append)
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url",
        "jdbc:mysql://192.168.184.135:3306/?useUnicode=true&characterEncoding=utf-8&useSSL=false")
      .option("dbtable", "7mo.7mo_report")
      .option("user", "root")
      .option("password", "xxxxxx")
      .save()



    spark.stop()
  }

  //指标1:统计总消息量
  def reportAllTotalDF(dataframe: DataFrame): DataFrame = {
    val spark: SparkSession = dataframe.sparkSession

    dataframe.createOrReplaceTempView("view_tmp_etl")

    val reportAllTotalDF: DataFrame = spark.sql(
      """
        |WITH tmp AS (
        |  SELECT COUNT(1) AS 7mo_total  FROM view_tmp_etl
        |)
        |SELECT "全国" AS 7mo_name, 7mo_total, "1" AS 7mo_category FROM tmp;
        |""".stripMargin
    )
    reportAllTotalDF

  }

  //指标2:统计各省份发送消息量
  def reportSenderProvinceTotalDF(dataframe: DataFrame): DataFrame = {
    val spark: SparkSession = dataframe.sparkSession

    dataframe.createOrReplaceTempView("view_tmp_etl")

    val reportSenderProvinceTotalDF: DataFrame = spark.sql(
      """
        |WITH tmp AS (
        |  SELECT sender_province, COUNT(1) AS 7mo_total FROM view_tmp_etl GROUP BY sender_province
        |)
        |SELECT sender_province AS 7mo_name, 7mo_total, "2" AS 7mo_category FROM tmp;
        |""".stripMargin
    )

    reportSenderProvinceTotalDF

  }

  //指标3:统计各省份接收消息量
  def reportReceiverProvinceTotalDF(dataframe: DataFrame): DataFrame = {
    val spark: SparkSession = dataframe.sparkSession

    dataframe.createOrReplaceTempView("view_tmp_etl")

    val reportReceiverProvinceTotalDF: DataFrame = spark.sql(
      """
        |WITH tmp AS (
        |  SELECT receiver_province, COUNT(1) AS 7mo_total FROM view_tmp_etl GROUP BY receiver_province
        |)
        |SELECT receiver_province AS 7mo_name, 7mo_total, "3" AS 7mo_category FROM tmp;
        |""".stripMargin
    )

    reportReceiverProvinceTotalDF

  }

  //指标4:统计各个用户, 发送消息量
  def reportSenderNickyNameTotalDF(dataframe: DataFrame): DataFrame = {
    val spark: SparkSession = dataframe.sparkSession

    dataframe.createOrReplaceTempView("view_tmp_etl")

    val reportSenderNickyNameTotalDF: DataFrame = spark.sql(
      """
        |WITH tmp AS (
        |  SELECT sender_nickyname, COUNT(1) AS 7mo_total FROM view_tmp_etl GROUP BY sender_nickyname
        |)
        |SELECT sender_nickyname AS 7mo_name, 7mo_total, "4" AS 7mo_category FROM tmp;
        |""".stripMargin
    )

    reportSenderNickyNameTotalDF

  }

  //指标5:统计各个用户, 接收消息量
  def reportReceiverNickyNameTotalDF(dataframe: DataFrame): DataFrame= {
    val spark: SparkSession = dataframe.sparkSession

    dataframe.createOrReplaceTempView("view_tmp_etl")

    val reportReceiverNickyNameTotalDF: DataFrame = spark.sql(
      """
        |WITH tmp AS (
        |  SELECT receiver_nickyname, COUNT(1) AS 7mo_total FROM view_tmp_etl GROUP BY receiver_nickyname
        |)
        |SELECT receiver_nickyname AS 7mo_name, 7mo_total, "5" AS 7mo_category FROM tmp;
        |""".stripMargin
    )

    reportReceiverNickyNameTotalDF

  }
}
MYSQL数据统计

查询各个指标前5条数据

(SELECT 7mo_name, 7mo_total, "全国总信息量" AS "7mo.category"
FROM 7mo.7mo_report WHERE 7mo_category = 1)
UNION
(SELECT 7mo_name, 7mo_total, "省份发送信息量" AS "7mo.category"
FROM 7mo.7mo_report WHERE 7mo_category = 2 ORDER BY 7mo_total DESC LIMIT 5)
UNION
(SELECT 7mo_name, 7mo_total, "省份接收信息量" AS "7mo.category"
 FROM 7mo.7mo_report WHERE 7mo_category = 3 ORDER BY 7mo_total DESC LIMIT 5)
UNION
(SELECT 7mo_name, 7mo_total, "用户发送信息量" AS "7mo.category"
 FROM 7mo.7mo_report WHERE 7mo_category = 4 ORDER BY 7mo_total DESC LIMIT 5)
UNION
(SELECT 7mo_name, 7mo_total, "用户接收信息量" AS "7mo.category"
 FROM 7mo.7mo_report WHERE 7mo_category = 5 ORDER BY 7mo_total DESC LIMIT 5);

9、FineBI 报表可视化

使用FineBI,连接数据MySQL数据库,加载业务指标报表数据,以不同图表展示。

安装FineBI

报表

在这里插入图片描述

十、Hudi实战案例二

传智教育大数据分析平台,突出的是“真”,此项目是传智教育联合三方K12教育机构共同研发,并在上线发布后转换为课程,过程真实细致,采用主流的大数据技术和工具,主要针对客户(主要是学生)访问、咨询、线索、意向、报名、考勤等各类业务数据分析,根据分析结果优化平台的服务质量,最终满足用户的需求。教育大数据分析平台项目就是将大数据技术应用于教育培训领域,为企业经营提供数据支撑。

1、案例架构

本案例基于Flink SQL 与Hudi整合,将MySQL数据库业务数据,实时采集存储到Hudi表中,使用Presto和Flink SQL分别进行离线查询分析和流式查询数据,最后报表存储到MySQL数据库,使用FineBI整合进行可视化展示。

在这里插入图片描述

1、MySQL数据库:
传智教育客户业务数据存储及离线实时分析报表结果存储,对接可视化FineBI工具展示。

2、Flink SQL 引擎
使用Flink SQL中CDC实时采集MySQL数据库表数据到Hudi表,此外基于Flink SQL Connector整合Hudi与MySQL,数据存储和查询。

3、Apache Hudi:数据湖框架
传智教育业务数据,最终存储到Hudi表(底层存储:HDFS分布式文件系统),统一管理数据文件,后期与Spark和Hive集成,进行业务指标分析。

4、Presto 分析引擎
一个Facebook开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节。
本案例中直接从Hudi表加载数据,其中依赖Hive MetaStore管理元数据。其中Presto可以集成多数据源,方便数据交互处理。

5、FineBI:报表工具
帆软公司的一款商业图表工具, 让图表制作更加简单

2、业务数据

2.1-客户信息表

CREATE TABLE IF NOT EXISTS itcast_nev.customer (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `customer_relationship_id` int(11) DEFAULT NULL COMMENT '当前意向id',
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否被删除(禁用)',
  `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '姓名',
  `idcard` varchar(24) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '身份证号',
  `birth_year` int(5) DEFAULT NULL COMMENT '出生年份',
  `gender` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT 'MAN' COMMENT '性别',
  `phone` varchar(24) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '手机号',
  `wechat` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '微信',
  `qq` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'qq号',
  `email` varchar(56) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '邮箱',
  `area` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '所在区域',
  `leave_school_date` date DEFAULT NULL COMMENT '离校时间',
  `graduation_date` date DEFAULT NULL COMMENT '毕业时间',
  `bxg_student_id` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '博学谷学员ID,可能未关联到,不存在',
  `creator` int(11) DEFAULT NULL COMMENT '创建人ID',
  `origin_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '数据来源',
  `origin_channel` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '来源渠道',
  `tenant` int(11) NOT NULL DEFAULT '0',
  `md_id` int(11) DEFAULT '0' COMMENT '中台id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

预先导入客户信息数据至表中,使用命令:**source**

source /usr/local/src/mysql_sql/1-customer.sql ;

2.2-客户意向表

CREATE TABLE IF NOT EXISTS itcast_nev.customer_relationship(
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否被删除(禁用)',
  `customer_id` int(11) NOT NULL DEFAULT '0' COMMENT '所属客户id',
  `first_id` int(11) DEFAULT NULL COMMENT '第一条客户关系id',
  `belonger` int(11) DEFAULT NULL COMMENT '归属人',
  `belonger_name` varchar(10) DEFAULT NULL COMMENT '归属人姓名',
  `initial_belonger` int(11) DEFAULT NULL COMMENT '初始归属人',
  `distribution_handler` int(11) DEFAULT NULL COMMENT '分配处理人',
  `business_scrm_department_id` int(11) DEFAULT '0' COMMENT '归属部门',
  `last_visit_time` datetime DEFAULT NULL COMMENT '最后回访时间',
  `next_visit_time` datetime DEFAULT NULL COMMENT '下次回访时间',
  `origin_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '数据来源',
  `itcast_school_id` int(11) DEFAULT NULL COMMENT '校区Id',
  `itcast_subject_id` int(11) DEFAULT NULL COMMENT '学科Id',
  `intention_study_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '意向学习方式',
  `anticipat_signup_date` date DEFAULT NULL COMMENT '预计报名时间',
  `level` varchar(8) DEFAULT NULL COMMENT '客户级别',
  `creator` int(11) DEFAULT NULL COMMENT '创建人',
  `current_creator` int(11) DEFAULT NULL COMMENT '当前创建人:初始==创建人,当在公海拉回时为 拉回人',
  `creator_name` varchar(32) DEFAULT '' COMMENT '创建者姓名',
  `origin_channel` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '来源渠道',
  `comment` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '备注',
  `first_customer_clue_id` int(11) DEFAULT '0' COMMENT '第一条线索id',
  `last_customer_clue_id` int(11) DEFAULT '0' COMMENT '最后一条线索id',
  `process_state` varchar(32) DEFAULT NULL COMMENT '处理状态',
  `process_time` datetime DEFAULT NULL COMMENT '处理状态变动时间',
  `payment_state` varchar(32) DEFAULT NULL COMMENT '支付状态',
  `payment_time` datetime DEFAULT NULL COMMENT '支付状态变动时间',
  `signup_state` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '报名状态',
  `signup_time` datetime DEFAULT NULL COMMENT '报名时间',
  `notice_state` varchar(32) DEFAULT NULL COMMENT '通知状态',
  `notice_time` datetime DEFAULT NULL COMMENT '通知状态变动时间',
  `lock_state` bit(1) DEFAULT b'0' COMMENT '锁定状态',
  `lock_time` datetime DEFAULT NULL COMMENT '锁定状态修改时间',
  `itcast_clazz_id` int(11) DEFAULT NULL COMMENT '所属ems班级id',
  `itcast_clazz_time` datetime DEFAULT NULL COMMENT '报班时间',
  `payment_url` varchar(1024) DEFAULT '' COMMENT '付款链接',
  `payment_url_time` datetime DEFAULT NULL COMMENT '支付链接生成时间',
  `ems_student_id` int(11) DEFAULT NULL COMMENT 'ems的学生id',
  `delete_reason` varchar(64) DEFAULT NULL COMMENT '删除原因',
  `deleter` int(11) DEFAULT NULL COMMENT '删除人',
  `deleter_name` varchar(32) DEFAULT NULL COMMENT '删除人姓名',
  `delete_time` datetime DEFAULT NULL COMMENT '删除时间',
  `course_id` int(11) DEFAULT NULL COMMENT '课程ID',
  `course_name` varchar(64) DEFAULT NULL COMMENT '课程名称',
  `delete_comment` varchar(255) DEFAULT '' COMMENT '删除原因说明',
  `close_state` varchar(32) DEFAULT NULL COMMENT '关闭装填',
  `close_time` datetime DEFAULT NULL COMMENT '关闭状态变动时间',
  `appeal_id` int(11) DEFAULT NULL COMMENT '申诉id',
  `tenant` int(11) NOT NULL DEFAULT '0' COMMENT '租户',
  `total_fee` decimal(19,0) DEFAULT NULL COMMENT '报名费总金额',
  `belonged` int(11) DEFAULT NULL COMMENT '小周期归属人',
  `belonged_time` datetime DEFAULT NULL COMMENT '归属时间',
  `belonger_time` datetime DEFAULT NULL COMMENT '归属时间',
  `transfer` int(11) DEFAULT NULL COMMENT '转移人',
  `transfer_time` datetime DEFAULT NULL COMMENT '转移时间',
  `follow_type` int(4) DEFAULT '0' COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  `transfer_bxg_oa_account` varchar(64) DEFAULT NULL COMMENT '转移到博学谷归属人OA账号',
  `transfer_bxg_belonger_name` varchar(64) DEFAULT NULL COMMENT '转移到博学谷归属人OA姓名',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

预先导入客户意向数据至表中,使用命令:**source**

source /usr/local/src/mysql_sql/2-customer_relationship.sql ;

2.3-客户线索表

CREATE TABLE IF NOT EXISTS itcast_nev.customer_clue(
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否被删除(禁用)',
  `customer_id` int(11) DEFAULT NULL COMMENT '客户id',
  `customer_relationship_id` int(11) DEFAULT NULL COMMENT '客户关系id',
  `session_id` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT '七陌会话id',
  `sid` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT '访客id',
  `status` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  `user` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '所属坐席',
  `create_time` datetime DEFAULT NULL COMMENT '七陌创建时间',
  `platform` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  `s_name` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '用户名称',
  `seo_source` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '搜索来源',
  `seo_keywords` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '关键字',
  `ip` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT 'IP地址',
  `referrer` text COLLATE utf8_bin COMMENT '上级来源页面',
  `from_url` text COLLATE utf8_bin COMMENT '会话来源页面',
  `landing_page_url` text COLLATE utf8_bin COMMENT '访客着陆页面',
  `url_title` varchar(1024) COLLATE utf8_bin DEFAULT '' COMMENT '咨询页面title',
  `to_peer` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '所属技能组',
  `manual_time` datetime DEFAULT NULL COMMENT '人工开始时间',
  `begin_time` datetime DEFAULT NULL COMMENT '坐席领取时间 ',
  `reply_msg_count` int(11) DEFAULT '0' COMMENT '客服回复消息数',
  `total_msg_count` int(11) DEFAULT '0' COMMENT '消息总数',
  `msg_count` int(11) DEFAULT '0' COMMENT '客户发送消息数',
  `comment` varchar(1024) COLLATE utf8_bin DEFAULT '' COMMENT '备注',
  `finish_reason` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '结束类型',
  `finish_user` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '结束坐席',
  `end_time` datetime DEFAULT NULL COMMENT '会话结束时间',
  `platform_description` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '客户平台信息',
  `browser_name` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '浏览器名称',
  `os_info` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '系统名称',
  `area` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '区域',
  `country` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '所在国家',
  `province` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '省',
  `city` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '城市',
  `creator` int(11) DEFAULT '0' COMMENT '创建人',
  `name` varchar(64) COLLATE utf8_bin DEFAULT '' COMMENT '客户姓名',
  `idcard` varchar(24) COLLATE utf8_bin DEFAULT '' COMMENT '身份证号',
  `phone` varchar(24) COLLATE utf8_bin DEFAULT '' COMMENT '手机号',
  `itcast_school_id` int(11) DEFAULT NULL COMMENT '校区Id',
  `itcast_school` varchar(128) COLLATE utf8_bin DEFAULT '' COMMENT '校区',
  `itcast_subject_id` int(11) DEFAULT NULL COMMENT '学科Id',
  `itcast_subject` varchar(128) COLLATE utf8_bin DEFAULT '' COMMENT '学科',
  `wechat` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '微信',
  `qq` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'qq号',
  `email` varchar(56) COLLATE utf8_bin DEFAULT '' COMMENT '邮箱',
  `gender` varchar(8) COLLATE utf8_bin DEFAULT 'MAN' COMMENT '性别',
  `level` varchar(8) COLLATE utf8_bin DEFAULT NULL COMMENT '客户级别',
  `origin_type` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '数据来源渠道',
  `information_way` varchar(32) COLLATE utf8_bin DEFAULT NULL COMMENT '资讯方式',
  `working_years` date DEFAULT NULL COMMENT '开始工作时间',
  `technical_directions` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '技术方向',
  `customer_state` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '当前客户状态',
  `valid` bit(1) DEFAULT b'0' COMMENT '该线索是否是网资有效线索',
  `anticipat_signup_date` date DEFAULT NULL COMMENT '预计报名时间',
  `clue_state` varchar(32) COLLATE utf8_bin DEFAULT 'NOT_SUBMIT' COMMENT '线索状态',
  `scrm_department_id` int(11) DEFAULT NULL COMMENT 'SCRM内部部门id',
  `superior_url` text COLLATE utf8_bin COMMENT '诸葛获取上级页面URL',
  `superior_source` varchar(1024) COLLATE utf8_bin DEFAULT NULL COMMENT '诸葛获取上级页面URL标题',
  `landing_url` text COLLATE utf8_bin COMMENT '诸葛获取着陆页面URL',
  `landing_source` varchar(1024) COLLATE utf8_bin DEFAULT NULL COMMENT '诸葛获取着陆页面URL来源',
  `info_url` text COLLATE utf8_bin COMMENT '诸葛获取留咨页URL',
  `info_source` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '诸葛获取留咨页URL标题',
  `origin_channel` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '投放渠道',
  `course_id` int(32) DEFAULT NULL,
  `course_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
  `zhuge_session_id` varchar(500) COLLATE utf8_bin DEFAULT NULL,
  `is_repeat` int(4) NOT NULL DEFAULT '0' COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  `tenant` int(11) NOT NULL DEFAULT '0' COMMENT '租户id',
  `activity_id` varchar(16) COLLATE utf8_bin DEFAULT NULL COMMENT '活动id',
  `activity_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '活动名称',
  `follow_type` int(4) DEFAULT '0' COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  `shunt_mode_id` int(11) DEFAULT NULL COMMENT '匹配到的技能组id',
  `shunt_employee_group_id` int(11) DEFAULT NULL COMMENT '所属分流员工组',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

预先导入客户意向数据至表中,使用命令:**source**

source /usr/local/src/mysql_sql/3-customer_clue.sql ;

2.4-线索申诉表

CREATE TABLE IF NOT EXISTS itcast_nev.customer_appeal
(
  id int auto_increment primary key COMMENT '主键',
  customer_relationship_first_id int not NULL COMMENT '第一条客户关系id',
  employee_id int NULL COMMENT '申诉人',
  employee_name varchar(64) NULL COMMENT '申诉人姓名',
  employee_department_id int NULL COMMENT '申诉人部门',
  employee_tdepart_id int NULL COMMENT '申诉人所属部门',
  appeal_status int(1) not NULL COMMENT '申诉状态,0:待稽核 1:无效 2:有效',
  audit_id int NULL COMMENT '稽核人id',
  audit_name varchar(255) NULL COMMENT '稽核人姓名',
  audit_department_id int NULL COMMENT '稽核人所在部门',
  audit_department_name varchar(255) NULL COMMENT '稽核人部门名称',
  audit_date_time datetime NULL COMMENT '稽核时间',
  create_date_time datetime DEFAULT CURRENT_TIMESTAMP NULL COMMENT '创建时间(申诉时间)',
  update_date_time timestamp DEFAULT CURRENT_TIMESTAMP NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  deleted bit DEFAULT b'0'  not NULL COMMENT '删除标志位',
  tenant int DEFAULT 0 not NULL
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

预先导入客户意向数据至表中,使用命令:**source**

source /usr/local/src/mysql_sql/4-customer_appeal.sql ;

2.5-客户访问咨询记录表

create table IF NOT EXISTS itcast_nev.web_chat_ems(
  id int auto_increment primary key comment '主键' ,
  create_date_time timestamp null comment '数据创建时间',
  session_id varchar(48) default '' not null comment '七陌sessionId',
  sid varchar(48) collate utf8_bin  default '' not null comment '访客id',
  create_time datetime null comment '会话创建时间',
  seo_source varchar(255) collate utf8_bin default '' null comment '搜索来源',
  seo_keywords varchar(512) collate utf8_bin default '' null comment '关键字',
  ip varchar(48) collate utf8_bin  default '' null comment 'IP地址',
  area varchar(255) collate utf8_bin default '' null comment '地域',
  country varchar(16) collate utf8_bin  default '' null comment '所在国家',
  province varchar(16) collate utf8_bin  default '' null comment '省',
  city varchar(255) collate utf8_bin default '' null comment '城市',
  origin_channel varchar(32) collate utf8_bin  default '' null comment '投放渠道',
  user varchar(255) collate utf8_bin default '' null comment '所属坐席',
  manual_time datetime null comment '人工开始时间',
  begin_time datetime null comment '坐席领取时间 ',
  end_time datetime null comment '会话结束时间',
  last_customer_msg_time_stamp datetime null comment '客户最后一条消息的时间',
  last_agent_msg_time_stamp datetime null comment '坐席最后一下回复的时间',
  reply_msg_count int(12) default 0  null comment '客服回复消息数',
  msg_count int(12) default 0  null comment '客户发送消息数',
  browser_name varchar(255) collate utf8_bin default '' null comment '浏览器名称',
  os_info varchar(255) collate utf8_bin default '' null comment '系统名称'
);

预先导入客户意向数据至表中,使用命令:source

source /usr/local/src/mysql_sql/5-web_chat_ems.sql ;

3、Flink CDC 实时数据采集

在这里插入图片描述

3.1-开启MySQL binlog

[root@node1 ~]# vim /etc/my.cnf [mysqld]下面添加内容:

server-id=2
log-bin=mysql-bin
binlog_format=row
expire_logs_days=15
binlog_row_image=full

重启MySQL Server

service mysqld restart

下载Flink CDC MySQL Jar包

由于使用Flink 1.12.2版本,目前支持Flink CDC 版本:1.3.0,添加maven 依赖
<!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>

如果使用Flink SQL Client,需要将jar包放到 $FLINK_HOME/lib 目录中
flink-sql-connector-mysql-cdc-1.3.0.jar

3.2-环境准备

实时数据采集,既可以编写Java程序,又可以直接运行DDL语句。

方式一:启动Flink SQL Client
-- 启动HDFS服务
hadoop-daemon.sh start namenode 
hadoop-daemon.sh start datanode

-- 启动Flink Standalone集群
export HADOOP_CLASSPATH=`/usr/local/src/hadoop/bin/hadoop classpath`
/usr/loacl/src/flink/bin/start-cluster.sh

-- 启动SQL Client
/usr/local/src/flink/bin/sql-client.sh embedded \
-j /usr/local/src/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell

-- 设置属性
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
SET execution.runtime-mode = streaming; 
方式二:使用IDEA创建Maven工程
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>

编写程序,实现数据实时采集同步,主要三个步骤:**输入表InputTable、输出表outputTable,查询插入INSERT...SELECT语句**

在这里插入图片描述

3.3-实时采集数据

基于Flink CDC 实时采集数据,需要创建输入Input和输出Output两张表,再编写INSERT...SELECT 插入查询语句

在这里插入图片描述

接下来将MySQL数据库5张业务数据表数据,实时采集同步到Hudi表中(存储HDFS文件系统)

3.3.1-客户信息表
第一步、输入表InputTable
create table tbl_customer_mysql (
  id STRING PRIMARY KEY NOT ENFORCED,
  customer_relationship_id STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  name STRING,
  idcard STRING,
  birth_year STRING,
  gender STRING,
  phone STRING,
  wechat STRING,
  qq STRING,
  email STRING,
  area STRING,
  leave_school_date STRING,
  graduation_date STRING,
  bxg_student_id STRING,
  creator STRING,
  origin_type STRING,
  origin_channel STRING,
  tenant STRING,
  md_id STRING
)WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.184.135',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxxx',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'itcast_nev',
  'table-name' = 'customer'
);
第二步、输出表OutputTable
CREATE TABLE edu_customer_hudi(
  id STRING PRIMARY KEY NOT ENFORCED,
  customer_relationship_id STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  name STRING,
  idcard STRING,
  birth_year STRING,
  gender STRING,
  phone STRING,
  wechat STRING,
  qq STRING,
  email STRING,
  area STRING,
  leave_school_date STRING,
  graduation_date STRING,
  bxg_student_id STRING,
  creator STRING,
  origin_type STRING,
  origin_channel STRING,
  tenant STRING,
  md_id STRING,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);
第三步、插入查询语句
insert into edu_customer_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_mysql;
3.3.2-客户意向表
第一步、输入表InputTable
create table tbl_customer_relationship_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  itcast_school_id string,
  itcast_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,
  `level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,
  `comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  itcast_clazz_id string,
  itcast_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string
)WITH(
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.184.135',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxxx',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'itcast_nev',
  'table-name' = 'customer_relationship'
);
第二步、输出表OutputTable
create table edu_customer_relationship_hudi(
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  itcast_school_id string,
  itcast_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,
  `level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,
  `comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  itcast_clazz_id string,
  itcast_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_relationship_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);
第三步、插入查询语句
insert into edu_customer_relationship_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_relationship_mysql;
3.3.3-客户线索表
第一步、输入表InputTable
create table tbl_customer_clue_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,
  status string,
  `user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,
  `comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  itcast_school_id string,
  itcast_school string,
  itcast_subject_id string,
  itcast_subject string,
  wechat string,
  qq string,
  email string,
  gender string,
  `level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string
)WITH(
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.184.135',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxxx',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'itcast_nev',
  'table-name' = 'customer_clue'
);
第二步、输出表OutputTable
create table edu_customer_clue_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,
  status string,
  `user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,
  `comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  itcast_school_id string,
  itcast_school string,
  itcast_subject_id string,
  itcast_subject string,
  wechat string,
  qq string,
  email string,
  gender string,
  `level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_clue_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);
第三步、插入查询语句客户意向表
insert into edu_customer_clue_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_clue_mysql;
3.3.4-客户申诉表
第一步、输入表InputTable
create table tbl_customer_appeal_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  customer_relationship_first_id string,
  employee_id string,
  employee_name string,
  employee_department_id string,
  employee_tdepart_id string,
  appeal_status string,
  audit_id string,
  audit_name string,
  audit_department_id string,
  audit_department_name string,
  audit_date_time string,
  create_date_time string,
  update_date_time string,
  deleted string,
  tenant string
)WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.184.135',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxxx',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'itcast_nev',
  'table-name' = 'customer_appeal'
);
第二步、输出表OutputTable
create table edu_customer_appeal_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  customer_relationship_first_id STRING,
  employee_id STRING,
  employee_name STRING,
  employee_department_id STRING,
  employee_tdepart_id STRING,
  appeal_status STRING,
  audit_id STRING,
  audit_name STRING,
  audit_department_id STRING,
  audit_department_name STRING,
  audit_date_time STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  tenant STRING,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_appeal_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);
第三步、插入查询语句客户意向表
insert into edu_customer_appeal_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_appeal_mysql;
3.3.5-客户访问咨询记录表
第一步、输入表InputTable
create table tbl_web_chat_ems_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,
  `user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string
)WITH(
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.184.135',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxx',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'itcast_nev',
  'table-name' = 'web_chat_ems'
);
第二步、输出表OutputTable
create table edu_web_chat_ems_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,
  `user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_web_chat_ems_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);
第三步、插入查询语句
insert into edu_web_chat_ems_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_web_chat_ems_mysql;
3.3.6-测试Hudi数据
-- 启动HDFS服务
hadoop-daemon.sh start namenode 
hadoop-daemon.sh start datanode

-- 启动Flink Standalone集群
export HADOOP_CLASSPATH=`/usr/local/src/hadoop/bin/hadoop classpath`
/usr/loacl/src/flink/bin/start-cluster.sh

-- 启动SQL Client
/usr/local/src/flink/bin/sql-client.sh embedded \
-j /usr/local/src/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell

-- 设置属性
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
SET execution.runtime-mode = batch;   --此处不是steaming 是批处理

-- 1. 客户信息表【customer】
CREATE TABLE edu_customer(
  id STRING PRIMARY KEY NOT ENFORCED,
  customer_relationship_id STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  name STRING,
  idcard STRING,
  birth_year STRING,
  gender STRING,
  phone STRING,
  wechat STRING,
  qq STRING,
  email STRING,
  area STRING,
  leave_school_date STRING,
  graduation_date STRING,
  bxg_student_id STRING,
  creator STRING,
  origin_type STRING,
  origin_channel STRING,
  tenant STRING,
  md_id STRING,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);

SELECT count(1) AS total FROM edu_customer ;
SELECT id, name, gender, create_date_time FROM edu_customer LIMIT 10;



-- 2. 客户意向表【customer_relationship】
create table edu_customer_relationship(
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  itcast_school_id string,
  itcast_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,
  `level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,
  `comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  itcast_clazz_id string,
  itcast_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_relationship_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);

SELECT count(1) AS total FROM edu_customer_relationship ;
SELECT id, course_name, origin_type, create_date_time FROM edu_customer_relationship LIMIT 10;


-- 3. 客户线索表【customer_clue】
create table edu_customer_clue(
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,
  status string,
  `user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,
  `comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  itcast_school_id string,
  itcast_school string,
  itcast_subject_id string,
  itcast_subject string,
  wechat string,
  qq string,
  email string,
  gender string,
  `level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_clue_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);

SELECT count(1) AS total FROM edu_customer_clue ;
SELECT id, customer_id, s_name, create_date_time FROM edu_customer_clue LIMIT 10;

-- 4.客户申诉表【customer_appeal】
create table edu_customer_appeal(
  id string PRIMARY KEY NOT ENFORCED,
  customer_relationship_first_id STRING,
  employee_id STRING,
  employee_name STRING,
  employee_department_id STRING,
  employee_tdepart_id STRING,
  appeal_status STRING,
  audit_id STRING,
  audit_name STRING,
  audit_department_id STRING,
  audit_department_name STRING,
  audit_date_time STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  tenant STRING,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_appeal_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'write.tasks'= '1',
  'write.rate.limit'= '2000', 
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);

SELECT count(1) AS total FROM edu_customer_appeal ;
SELECT id, employee_id, employee_name, create_date_time FROM edu_customer_appeal LIMIT 10;

-- 5. 客服访问咨询记录表【web_chat_ems】
create table edu_web_chat_ems (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,
  `user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_web_chat_ems_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'compaction.tasks'= '1', 
  'compaction.async.enabled'= 'true',
  'compaction.trigger.strategy'= 'num_commits',
  'compaction.delta_commits'= '1',
  'changelog.enabled'= 'true'
);

SELECT count(1) AS total FROM edu_web_chat_ems ;
SELECT id, session_id, ip, province FROM edu_web_chat_ems LIMIT 10;

4、Presto 即席分析

使用Presto 分析Hudi表数据,最终将结果直接存储到MySQL数据库表中,示意图如下

在这里插入图片描述

第一、Hive 中创建表,关联Hudi表
第二、Presto集成Hive,加载Hive表数据
第三、Presto集成MySQL,读取或者保存数据

4.1-Presto 是什么

Presto是一款Facebook开源的MPP架构的OLAP查询引擎,可针对不同数据源执行大容量数据集的一款分布式SQL执行引擎。适用于交互式分析查询,数据量支持GB到PB字节。

1、清晰的架构,是一个能够独立运行的系统,不依赖于任何其他外部系统。例如调度,presto自身提供了对集群的监控,可以根据监控信息完成调度。
2、简单的数据结构,列式存储,逻辑行,大部分数据都可以轻易的转化成presto所需要的这种数据结构。
3、丰富的插件接口,完美对接外部存储系统,或者添加自定义的函数。

官网:https://prestodb.io/ 

Presto采用典型的master-slave模型,由一个Coordinator节点,一个Discovery Server节点,多个Worker节点组成,Discovery Server通常内嵌于Coordinator节点中。

在这里插入图片描述

1、coordinator(master)负责meta管理,worker管理,query的解析和调度
2、worker则负责计算和读写
3、discovery server, 通常内嵌于coordinator节点中,也可以单独部署,用于节点心跳。在下文中,默认discovery和coordinator共享一台机器。

Presto 数据模型:采取三层表结构

在这里插入图片描述

1、catalog 对应某一类数据源,例如hive的数据,或mysql的数据
2、schema 对应mysql中的数据库
3、table 对应mysql中的表

4.2-Presto 安装部署

采用单节点部署安装Presto,服务器名称:master,IP地址:192.168.184.135

1. Presto 分析引擎
	官方网站:
		https://prestodb.io/
	下载地址:
		https://prestodb.io/download.html
	SERVER:服务
			Master(Coordinator)协调节点
			Workers工作节点
			https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.266.1/presto-server-0.266.1.tar.gz(服务包)

	命令行客户端
			https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.266.1/presto-cli-0.266.1-executable.jar(客户端包)
			
	JDBC DRIVER
			通过JDBC连接服务,编写DDL、DML及DQL语句,发送执行
			https://repo1.maven.org/maven2/com/facebook/presto/presto-jdbc/0.266.1/presto-jdbc-0.266.1.jar (jdbc包)
4.2.1-上传解压Presto安装包
# yum安装上传文件插件lrzsz
yum install -y lrzsz

# 上传安装包到master的/usr/local/src/software-jar目录
presto-server-0.245.1.tar.gz

# 解压、重命名
tar -xzvf presto-server-0.245.1.tar.gz
ln -s presto-server-0.245.1 presto 
mv presto-server-0.245.1/ presto

#创建配置文件存储目录
mkdir -p /usr/local/src/presto/etc
4.2.2-配置presto
config.properties

vim /usr/local/src/presto/etc/config.properties

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
query.max-memory=6GB
query.max-memory-per-node=2GB
query.max-total-memory-per-node=2GB
discovery-server.enabled=true
discovery.uri=http://192.168.184.135:8090
jvm.config

vim /usr/local/src/presto/etc/jvm.config

-server
-Xmx3G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
node.properties

vim /usr/local/src/presto/etc/node.properties

node.environment=hudipresto
node.id=presto-master
node.data-dir=/usr/local/src/presto/data
hive.properties

mkdir -p /usr/local/src/presto/etc/catalog

vim /usr/local/src/presto/etc/catalog/hive.properties

connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.184.135:9083
hive.parquet.use-column-names=true
hive.config.resources=/usr/local/src/presto/etc/catalog/core-site.xml,/export/server/presto/etc/catalog/hdfs-site.xml
mysql.properties

vim /usr/local/src/presto/etc/catalog/mysql.properties

connector.name=mysql
connection-url=jdbc:mysql://192.168.184.135:3306
connection-user=root
connection-password=xxxxxx
4.2.3-启动服务
launcher start

使用jps查看进程是否存在,进程名称:PrestoServer

此外WEB UI界面:
http://192.168.184.135:8090/ui/
4.2.4-Presto CLI命令行客户端
#客户端Jar

presto-cli-0.241-executable.jar

#上传presto-cli-0.245.1-executable.jar到/usr/local/src/presto/bin

mv presto-cli-0.245.1-executable.jar presto
chmod u+x presto

#CLI客户端启动

/usr/local/src/presto/bin/presto --server 192.168.184.135:8090

#展示catalogs
presto> show catalogs;
 Catalog
---------
 hive
 mysql
 system
(3 rows)

Query 20231124_163247_00000_gz4bb, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

#查询hive schema,需提前启动hive metastore

presto> show schemas from hive;
       Schema
--------------------
 db_hudi
 default
 information_schema
 saddam
(4 rows)

4.3-Hive 创建表

为了让Presto分析Hudi表中数据,需要将Hudi表映射关联到Hive表中。接下来,再Hive中创建5张传智教育客户业务数据表,映射关联到Hudi表

启动HDFS服务、HiveMetaStore和HiveServer服务,运行Beeline命令行

-- 启动HDFS服务
start-dfs.sh

-- Hive服务
start-metastore.sh 
start-hiveserver2.sh

-- 启动Beeline客户端
start-beeline.sh

-- 设置Hive本地模式
set hive.exec.mode.local.auto=true;
set hive.mapred.mode=nonstrict;
set hive.exec.mode.local.auto.inputbytes.max=50000000;
4.3.1-创建数据库
-- 创建数据库
CREATE DATABASE IF NOT EXISTS edu_hudi ;
-- 使用数据库
USE edu_hudi ;
4.3.2-客户信息表

编写DDL语句创建表

CREATE EXTERNAL TABLE edu_hudi.tbl_customer(
  id string,
  customer_relationship_id string,
  create_date_time string,
  update_date_time string,
  deleted string,
  name string,
  idcard string,
  birth_year string,
  gender string,
  phone string,
  wechat string,
  qq string,
  email string,
  area string,
  leave_school_date string,
  graduation_date string,
  bxg_student_id string,
  creator string,
  origin_type string,
  origin_channel string,
  tenant string,
  md_id string
)PARTITIONED BY (day_str string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/hudi-warehouse/edu_customer_hudi' ;

由于是分区表,所以添加分区

ALTER TABLE edu_hudi.tbl_customer ADD IF NOT EXISTS PARTITION(day_str='2023-11-24') 
location '/hudi-warehouse/edu_customer_hudi/2023-11-24' ;
4.3.3-客户意向表

编写DDL语句创建表

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_relationship(
  id string,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  itcast_school_id string,
  itcast_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,
  `level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,
  `comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  itcast_clazz_id string,
  itcast_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string
)PARTITIONED BY (day_str string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/hudi-warehouse/edu_customer_relationship_hudi' ;

由于是分区表,所以添加分区

ALTER TABLE edu_hudi.tbl_customer_relationship ADD IF NOT EXISTS PARTITION(day_str='2023-11-24') 
location '/hudi-warehouse/edu_customer_relationship_hudi/2023-11-24' ;
4.3.4-客户线索表

编写DDL语句创建表

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_clue(
  id string,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,
  status string,
  `user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,
  `comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  itcast_school_id string,
  itcast_school string,
  itcast_subject_id string,
  itcast_subject string,
  wechat string,
  qq string,
  email string,
  gender string,
  `level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string
)
PARTITIONED BY (day_str string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/hudi-warehouse/edu_customer_clue_hudi' ;

由于是分区表,所以添加分区

ALTER TABLE edu_hudi.tbl_customer_clue ADD IF NOT EXISTS PARTITION(day_str='2023-11-24') 
location '/hudi-warehouse/edu_customer_clue_hudi/2023-11-24' ;
4.3.5-客户申诉表

编写DDL语句创建表

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_appeal(
  id string,
  customer_relationship_first_id STRING,
  employee_id STRING,
  employee_name STRING,
  employee_department_id STRING,
  employee_tdepart_id STRING,
  appeal_status STRING,
  audit_id STRING,
  audit_name STRING,
  audit_department_id STRING,
  audit_department_name STRING,
  audit_date_time STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  tenant STRING
)
PARTITIONED BY (day_str string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/hudi-warehouse/edu_customer_appeal_hudi' ;

由于是分区表,所以添加分区

ALTER TABLE edu_hudi.tbl_customer_appeal ADD IF NOT EXISTS PARTITION(day_str='2023-11-24') 
location '/hudi-warehouse/edu_customer_appeal_hudi/2023-11-24' ;
4.3.6-客户访问咨询记录表

编写DDL语句创建表

CREATE EXTERNAL TABLE edu_hudi.tbl_web_chat_ems (
  id string,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,
  `user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string
)
PARTITIONED BY (day_str string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/hudi-warehouse/edu_web_chat_ems_hudi' ;

由于是分区表,所以添加分区

ALTER TABLE edu_hudi.tbl_web_chat_ems ADD IF NOT EXISTS PARTITION(day_str='2023-11-24') 
location '/hudi-warehouse/edu_web_chat_ems_hudi/2023-11-24' ;

4.4-离线指标分析

使用Presto分析Hudi表数据,需要将集成jar包:hudi-presto-bundle-0.9.0.jar,放入到Presto插件目录:presto/plugin/hive-hadoop2中

#启动Presto Client 客户端命令行,查看Hive中创建数据库
launcher start

presto --server 192.168.184.135:8090

#展示catalogs
presto> show catalogs;

#查询hive的schemas
presto> show schemas from hive;
       Schema
--------------------
 db_hudi
 default
 edu_hudi
 information_schema
 saddam
(5 rows)

#使用数据库:edu_hudi,查看有哪些表
presto> use hive.edu_hudi;
USE
presto:edu_hudi> show tables;
           Table
---------------------------
 tbl_customer
 tbl_customer_appeal
 tbl_customer_clue
 tbl_customer_relationship
 tbl_web_chat_ems
(5 rows)

接下来,按照业务指标需求,使用Presto,分析Hudi表数据,将指标直接保存MySQL数据库

在这里插入图片描述

首先在MySQL数据库中,创建database,专门存储分析指标表

-- 创建数据库
CREATE DATABASE `itcast_rpt` /*!40100 DEFAULT CHARACTER SET utf8 */;
4.4.1-每日报名量

对客户意向表数据统计分析:每日客户报名量,先创建MySQL表,再编写SQL,最后保存数据

MySQL-创建表:itcast_rpt.stu_apply

CREATE TABLE  IF NOT EXISTS `itcast_rpt`.`stu_apply` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

presto-指标SQL语句

WITH tmp AS (
  SELECT 
    format_datetime(from_unixtime(cast(payment_time as bigint) / 1000),'yyyy-MM-dd')AS day_value, customer_id 
  FROM hive.edu_hudi.tbl_customer_relationship 
  WHERE 
    day_str = '2023-11-24' AND payment_time IS NOT NULL AND payment_state = 'PAID' AND deleted = 'false'
)
SELECT day_value, COUNT(customer_id) AS total FROM tmp GROUP BY day_value ;

presto-分析结果保存MySQL表

INSERT INTO mysql.itcast_rpt.stu_apply (report_date, report_total) 
SELECT day_value, total FROM (
  SELECT day_value, COUNT(customer_id) AS total FROM (
    SELECT 
      format_datetime(from_unixtime(cast(payment_time as bigint) / 1000), 'yyyy-MM-dd')AS day_value, customer_id 
    FROM hive.edu_hudi.tbl_customer_relationship 
    WHERE day_str = '2023-11-24' AND payment_time IS NOT NULL AND payment_state = 'PAID' AND deleted = 'false'
  ) GROUP BY day_value
) ;
4.4.2-每日访问量

MySQL-创建表:itcast_rpt.web_pv

CREATE TABLE  IF NOT EXISTS `itcast_rpt`.`web_pv` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

presto-指标SQL语句

WITH tmp AS (
  SELECT 
    id, format_datetime(from_unixtime(cast(create_time as bigint) / 1000), 'yyyy-MM-dd')AS day_value
  FROM hive.edu_hudi.tbl_web_chat_ems 
  WHERE day_str = '2023-11-24' 
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value ;

presto-分析结果保存MySQL表

INSERT INTO mysql.itcast_rpt.web_pv (report_date, report_total) 
SELECT day_value, COUNT(id) AS total FROM (
  SELECT 
    id, format_datetime(from_unixtime(cast(create_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value
  FROM hive.edu_hudi.tbl_web_chat_ems 
  WHERE day_str = '2023-11-24' 
) GROUP BY day_value ;
4.4.3-每日意向数

MySQL-创建表:itcast_rpt.stu_intention

CREATE TABLE  IF NOT EXISTS `itcast_rpt`.`stu_intention` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

presto-指标SQL语句

WITH tmp AS (
  SELECT 
    id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd')AS day_value
  FROM hive.edu_hudi.tbl_customer_relationship 
  WHERE day_str = '2023-11-24' AND create_date_time IS NOT NULL AND deleted = 'false'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value ;

presto-分析结果保存MySQL表

INSERT INTO mysql.itcast_rpt.stu_intention (report_date, report_total) 
SELECT day_value, COUNT(id) AS total FROM (
  SELECT 
    id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd')AS day_value
  FROM hive.edu_hudi.tbl_customer_relationship 
  WHERE day_str = '2023-11-24' AND create_date_time IS NOT NULL AND deleted = 'false'
) GROUP BY day_value ;
4.4.4-每日线索量

MySQL-创建表:itcast_rpt.stu_clue

CREATE TABLE IF NOT EXISTS `itcast_rpt`.`stu_clue` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

presto-指标SQL语句

WITH tmp AS (
  SELECT 
    id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd')AS day_value
  FROM hive.edu_hudi.tbl_customer_clue 
  WHERE day_str = '2023-11-24' AND clue_state IS NOT NULL AND deleted = 'false'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value ;

presto-分析结果保存MySQL表

INSERT INTO mysql.itcast_rpt.stu_clue (report_date, report_total) 
SELECT day_value, COUNT(id) AS total FROM (
  SELECT 
    id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd')AS day_value
  FROM hive.edu_hudi.tbl_customer_clue 
  WHERE day_str = '2023-11-24' AND clue_state IS NOT NULL AND deleted = 'false'
) GROUP BY day_value ;

5、Flink SQL 流式分析

使用Flink SQL流式查询Hudi表今日实时数据,统计离线指标对应今日实时指标,最后使用FineBI实时大屏展示

在这里插入图片描述

基于Flink SQL Connector与Hudi和MySQL集成,编写SQL流式查询分析,在SQL Clientk客户端命令行执行DDL语句和SELECT语句。

5.1-业务需求

实时对传智教育客户每日业务数据进行基本指标统计,如下所示

在这里插入图片描述

总共有5个指标,涉及到3张业务表:客户访问记录表、客户线索表和客户意向表,其中每个指标实时数据存储到MySQL数据库中一张表。

每个实时指标统计,分为三个步骤:

第1步、创建输入表,流式加载Hudi表数据;
第2步、创建输出表,实时保存数据至MySQL表;
第3步、依据业务,编写SQL语句,查询输入表数据,并将结果插入输出表;

在这里插入图片描述

5.2-创建MySQL表

每个实时指标存储到MySQL数据库一张表,首先创建5个指标对应的5张表,名称不一样,字段一样,DDL语句如下

--指标1:今日访问量
CREATE TABLE `itcast_rpt`.`realtime_web_pv` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--指标2:今日咨询量
CREATE TABLE `itcast_rpt`.`realtime_stu_consult` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--指标3:今日意向数
CREATE TABLE `itcast_rpt`.`realtime_stu_intention` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--指标4:今日报名人数
CREATE TABLE `itcast_rpt`.`realtime_stu_apply` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--指标5:今日有效线索量
CREATE TABLE `itcast_rpt`.`realtime_stu_clue` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.3-实时指标分析

具体演示,采用离线加载hudi表数据进行统计分析存储到mysql

实时统计5个指标,加载3个Hudi表数据,如下所示

在这里插入图片描述

1.今日访问量和今日咨询量,流式加载表:edu_web_chat_ems_hudi 数据

在这里插入图片描述

2.今日意向数和今日报名人数,流式加载表:edu_customer_relationship_hudi 数据

在这里插入图片描述

3.今日有效线索量,流式加载表:edu_customer_clue_hudi 数据

在这里插入图片描述

启动服务

启动HDFS服务和Standalone集群,运行SQL Client客户端,设置属性

-- 启动HDFS服务
hadoop-daemon.sh start namenode 
hadoop-daemon.sh start datanode

-- 启动Flink Standalone集群
export HADOOP_CLASSPATH=`/usr/local/src/hadoop/bin/hadoop classpath`
/usr/loacl/src/flink/bin/start-cluster.sh

-- 启动SQL Client
/usr/local/src/flink/bin/sql-client.sh embedded \
-j /usr/local/src/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell

-- 设置属性
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
-- 流处理模式
SET execution.runtime-mode = streaming; 
5.3.1-今日访问量

首先创建输入表:流式加载,Hudi表数据

CREATE TABLE edu_web_chat_ems_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,
  `user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_web_chat_ems_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',
  'read.tasks' = '1'
);


--流式才使用,此案例无法流式写入hudi数据,所以此处不添加流式
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '5',

统计结果,存储至视图View

CREATE VIEW IF NOT EXISTS view_tmp_web_pv AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_web_chat_ems_hudi
  WHERE part ='2023-11-24'
) GROUP BY  day_value;

--若是流式写数据,WHERE part = CAST(CURRENT_DATE AS STRING)

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_web_pv_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_web_pv'
);

-- INSERT INTO 插入
INSERT INTO  realtime_web_pv_mysql SELECT day_value, total FROM view_tmp_web_pv;

--插入报错Could not find any factory for identifier 'jdbc' that implements 
flink-connector-jdbc_2.11-1.12.2.jar放入flink/lib下,然后重启服务
5.3.2-今日咨询量

统计结果,存储至视图View

CREATE VIEW IF NOT EXISTS view_tmp_stu_consult AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_web_chat_ems_hudi
  WHERE part ='2023-11-24' AND msg_count > 0
) GROUP BY  day_value;

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_stu_consult_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_stu_consult'
);

-- INSERT INTO 插入
INSERT INTO  realtime_stu_consult_mysql SELECT day_value, total FROM view_tmp_stu_consult;
5.3.3-今日意向数

首先创建输入表:流式加载,Hudi表数据

create table edu_customer_relationship_hudi(
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  itcast_school_id string,
  itcast_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,
  `level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,
  `comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  itcast_clazz_id string,
  itcast_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_relationship_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time', 
  'read.tasks' = '1'
);

统计结果,存储至视图View

CREATE VIEW IF NOT EXISTS view_tmp_stu_intention AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_customer_relationship_hudi
  WHERE part ='2023-11-24' AND create_date_time IS NOT NULL AND deleted = 'false'
) GROUP BY  day_value;

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_stu_intention_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_stu_intention'
);

-- INSERT INTO 插入
INSERT INTO  realtime_stu_intention_mysql SELECT day_value, total 
FROM view_tmp_stu_intention;
5.3.4-今日报名人数

统计结果,存储至视图View

CREATE VIEW IF NOT EXISTS view_tmp_stu_apply AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(payment_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_customer_relationship_hudi
  WHERE part ='2023-11-24' AND payment_time IS NOT NULL 
AND payment_state = 'PAID' AND deleted = 'false'
) GROUP BY  day_value;

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_stu_apply_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_stu_apply'
);

-- INSERT INTO 插入
INSERT INTO  realtime_stu_apply_mysql SELECT day_value, total FROM view_tmp_stu_apply;
5.3.5-今日有效线索量

首先创建输入表:流式加载,Hudi表数据

create table edu_customer_clue_hudi(
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,
  status string,
  `user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,
  `comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  itcast_school_id string,
  itcast_school string,
  itcast_subject_id string,
  itcast_subject string,
  wechat string,
  qq string,
  email string,
  gender string,
  `level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_clue_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',  
  'read.tasks' = '1'
);

统计结果,存储至视图View

CREATE VIEW IF NOT EXISTS view_tmp_stu_clue AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_customer_clue_hudi
  WHERE part ='2023-11-24' AND clue_state IS NOT NULL AND deleted = 'false'
) GROUP BY  day_value;

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_stu_clue_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_stu_clue'
);

-- INSERT INTO 插入
INSERT INTO  realtime_stu_clue_mysql SELECT day_value, total FROM view_tmp_stu_clue;

6、FineBI 报表可视化

create_date_time string,
update_date_time string,
deleted string,
customer_id string,
first_id string,
belonger string,
belonger_name string,
initial_belonger string,
distribution_handler string,
business_scrm_department_id string,
last_visit_time string,
next_visit_time string,
origin_type string,
itcast_school_id string,
itcast_subject_id string,
intention_study_type string,
anticipat_signup_date string,
level string,
creator string,
current_creator string,
creator_name string,
origin_channel string,
comment string,
first_customer_clue_id string,
last_customer_clue_id string,
process_state string,
process_time string,
payment_state string,
payment_time string,
signup_state string,
signup_time string,
notice_state string,
notice_time string,
lock_state string,
lock_time string,
itcast_clazz_id string,
itcast_clazz_time string,
payment_url string,
payment_url_time string,
ems_student_id string,
delete_reason string,
deleter string,
deleter_name string,
delete_time string,
course_id string,
course_name string,
delete_comment string,
close_state string,
close_time string,
appeal_id string,
tenant string,
total_fee string,
belonged string,
belonged_time string,
belonger_time string,
transfer string,
transfer_time string,
follow_type string,
transfer_bxg_oa_account string,
transfer_bxg_belonger_name string,
part STRING
)
PARTITIONED BY (part)
WITH(
‘connector’=‘hudi’,
‘path’= ‘hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_relationship_hudi’,
‘table.type’= ‘MERGE_ON_READ’,
‘hoodie.datasource.write.recordkey.field’= ‘id’,
‘write.precombine.field’= ‘create_date_time’,
‘read.tasks’ = ‘1’
);


**统计结果,存储至视图View**

~~~sql
CREATE VIEW IF NOT EXISTS view_tmp_stu_intention AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_customer_relationship_hudi
  WHERE part ='2023-11-24' AND create_date_time IS NOT NULL AND deleted = 'false'
) GROUP BY  day_value;

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_stu_intention_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_stu_intention'
);

-- INSERT INTO 插入
INSERT INTO  realtime_stu_intention_mysql SELECT day_value, total 
FROM view_tmp_stu_intention;
5.3.4-今日报名人数

统计结果,存储至视图View

CREATE VIEW IF NOT EXISTS view_tmp_stu_apply AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(payment_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_customer_relationship_hudi
  WHERE part ='2023-11-24' AND payment_time IS NOT NULL 
AND payment_state = 'PAID' AND deleted = 'false'
) GROUP BY  day_value;

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_stu_apply_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_stu_apply'
);

-- INSERT INTO 插入
INSERT INTO  realtime_stu_apply_mysql SELECT day_value, total FROM view_tmp_stu_apply;
5.3.5-今日有效线索量

首先创建输入表:流式加载,Hudi表数据

create table edu_customer_clue_hudi(
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,
  status string,
  `user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,
  `comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  itcast_school_id string,
  itcast_school string,
  itcast_subject_id string,
  itcast_subject string,
  wechat string,
  qq string,
  email string,
  gender string,
  `level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string,
  part STRING
)
PARTITIONED BY (part)
WITH(
  'connector'='hudi',
  'path'= 'hdfs://192.168.184.135:9000/hudi-warehouse/edu_customer_clue_hudi', 
  'table.type'= 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field'= 'id', 
  'write.precombine.field'= 'create_date_time',  
  'read.tasks' = '1'
);

统计结果,存储至视图View

CREATE VIEW IF NOT EXISTS view_tmp_stu_clue AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id
  FROM edu_customer_clue_hudi
  WHERE part ='2023-11-24' AND clue_state IS NOT NULL AND deleted = 'false'
) GROUP BY  day_value;

保存MySQL数据库

-- SQL Connector MySQL
CREATE TABLE realtime_stu_clue_mysql (
  report_date STRING,
  report_total BIGINT, 
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.184.135:3306/itcast_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'xxxxxx',
  'table-name' = 'realtime_stu_clue'
);

-- INSERT INTO 插入
INSERT INTO  realtime_stu_clue_mysql SELECT day_value, total FROM view_tmp_stu_clue;

6、FineBI 报表可视化

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/446605.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Python从入门到进阶】50、当当网Scrapy项目实战(三)

接上篇《49、当当网Scrapy项目实战&#xff08;二&#xff09;》 上一篇我们讲解了的Spider与item之间的关系&#xff0c;以及如何使用item&#xff0c;以及使用pipelines管道进行数据下载的操作&#xff0c;本篇我们来讲解Scrapy的多页面下载如何实现。 一、多页面下载原理分…

【Leetcode每日一刷】滑动窗口:209.长度最小的子数组

一、209.长度最小的子数组 1.1&#xff1a;题目 题目链接 1.2&#xff1a;解题思路 题型&#xff1a;滑动窗口&#xff1b;时间复杂度&#xff1a;O(n) &#x1faa7; 滑动窗口本质也是双指针的一种技巧&#xff0c;特别适用于字串问题 ❗❗核心思想/ 关键&#xff1a;左右…

A5自媒体wordpress主题模板

一个简洁的wordpress个人博客主题&#xff0c;适合做个人博客&#xff0c;SEO优化效果挺不错的。 https://www.wpniu.com/themes/204.html

前端学习之行内和块级标签

行内标签 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>span</title> </head> <body><!-- 行内标签特点&#xff1a;1、不换行,一行可以放多个2、默认宽度内容撑开代表&#…

谈谈我的自媒体创作真实感悟

因为我有数十款APP和小程序&#xff0c;基本都是些辅助内容创作的工具&#xff0c;于是我就顺水推舟做了几个自媒体账号&#xff1a;微信公众号&#xff0c;抖音&#xff0c;知乎&#xff0c;小红书&#xff0c;CSDN等&#xff0c;账号的名字都是全赞工程师。 目前这些号有收入…

分享一些实用性的大语言模型(GitHub篇)

1.多模态大模型 GitHub网址&#xff1a;haotian-liu/LLaVA&#xff1a;[NeurIPS23 Oral] 视觉指令调优 &#xff08;LLaVA&#xff09; 构建&#xff0c;旨在实现 GPT-4V 级别及以上的能力。 (github.com) 下面是LLaVA模型的介绍&#xff0c;作者都有一直维护和更新&#xff0c…

CSS居中对齐 (垂直居中)

内部块级元素的高度要小于容器(父元素) 方案一&#xff1a;行高 容器高度&#xff08;单行内联元素&#xff09; 限制条件&#xff1a;仅用于单行内联元素 display:inline 和 display: inline-block; 给容器添加样式 height: 100px;line-height: 100px;<!DOCTYPE html>…

TimescaleDB 开源时序数据库

文章目录 1.TimescaleDB介绍2.Hypertable 和 chunk3.Hypertable4.Hypertable操作 开源中间件 # TimescaleDBhttps://iothub.org.cn/docs/middleware/ https://iothub.org.cn/docs/middleware/timescale/timescale-summary/1.TimescaleDB介绍 TimescaleDB是基于PostgreSQL数据…

MATH数据集分享

来源: AINLPer公众号&#xff08;每日干货分享&#xff01;&#xff01;&#xff09; 编辑: ShuYini 校稿: ShuYini 时间: 2024-3-10 很多创新性的研究都可能会遇到数学问题&#xff0c;但是这项技能对于计算机来说仍然是个不小的挑战。为了衡量模型在解决数学问题上的表现。UC…

C++11的简单介绍(上)

1.C11简介 在2003年C标准委员会曾经提交了一份技术勘误表(简称TC1)&#xff0c;使得C03这个名字已经取代了C98称为C11之前的最新C标准名称。不过由于C03(TC1)主要是对C98标准中的漏洞进行修复&#xff0c;语言的核心部分则没有改动&#xff0c;因此人们习惯性的把两个标准合并…

Go语言必知必会100问题-20 切片操作实战

前言 有很多gopher将切片的length和capacity混淆&#xff0c;没有彻底理清这两者的区别和联系。理清楚切片的长度和容量这两者的关系&#xff0c;有助于我们合理的对切片进行初始化、通过append追加元素以及进行复制等操作。如果没有深入理解它们&#xff0c;缺少高效操作切片…

一文彻底搞懂MyISAM和InnoDB区别

文章目录 1. 是否支持行级锁2. 是否支持事务3. 是否支持外键4. 是否支持数据库异常崩溃后的安全恢复5. 是否支持 MVCC6. 索引实现7. 常见的几种 MySQL 存储引擎对比 MySQL 5.5版本之前&#xff0c;MyISAM引擎是MySQL的默认存储引擎&#xff0c;拥有全文索引、压缩和空间函数等特…

力扣——合并k个升序链表

文章目录 题目解析题目链接题目解析 算法讲解暴力解法利用优先级队列进行优化 代码解析 题目解析 题目链接 首先先把题目连接给大家奉上题目链接 题目解析 严格来说这个题目是非常容易理解的相信大家有了合并两个升序链表来理解这个题目就会非常容易理解了。这个题目的意思就…

LeetCode 654.最大二叉树

给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中的最大值。 递归地在最大值 左边 的 子数组前缀上 构建左子树。 递归地在最大值 右边 的 子数组后缀上 构建右子树。 返回 nums 构建的 最大二叉树…

Spring Security | Oauth2 /oauth/token自定义授权实现底层源码浅析与实现

Spring Security Oauth2 /oauth/token自定义授权源码分析实现过程&#xff0c;看了网上很多文章&#xff0c;分析和实现肯定存在不完整地方&#xff0c;可以在评论区指出交流。 1 /oauth/token入口 org.springframework.security.oauth2.provider.endpoint.TokenEndpoint Token…

java的参数传递机制(引用类型)

1.除了非引用类型的形参传递&#xff0c;还有引用类型的变量形参传递&#xff0c;但引用类型的形参变量传递与非引用类型是不同的&#xff01;&#xff01;&#xff01; public class MethodDemo2 {public static void main(String[] args) {int[] arr new int[]{10,20,30,9}…

MySQL入门到中级知识汇总2024

文章目录 1.揭开MySQL的神秘面纱2. SQL的基本命令实操3. 数据库的备份与恢复4. MySQL常用的数据类型&#xff08;列类型&#xff09;5. 数据类型之小数类型的使用6. 表的创建7. 表的修改8. mysql事务9. mysql表类型和存储引擎10. mysql的视图11. mysql的管理 1.揭开MySQL的神秘…

20.2 nginx

20.2 nginx 1. 学习目标2. 介绍2.1 正向代理2.2 反向代理2.3 动态静态资源分离2.4 nginx优缺点3. 安装3.1 Linux安装****************************************************************************************************************************************************…

Charles-抓包工具的使用

文章目录 Charles简介与安装Charles的简介Charles的安装Charles 安装证书 抓包在PC端抓取HTTPS请求在移动端进行抓包移动端配置Androd端配置iOS端配置 Charles使用小技巧&#xff1a; 模拟慢速网络 Charles简介与安装 Charles的简介 Charles 是在 PC 端常用的网络封包截取工具…

块设备驱动(1)-什么是块设备驱动?块设备驱动概念总结

1.块设备驱动概念 块设备驱动是针对存储设备&#xff0c;例如SD卡、EMMC、NAND FLASH、NOR FLSASH。 块设备驱动以块为单位进行访问、最小寻址单位是扇区、一个块中包含多个扇区、支持随机访问、带缓冲区&#xff0c;&#xff0c;当发生写入操作时&#xff0c;并不会立马操作硬…