Video source: 尚硅谷大数据项目《在线教育之实时数仓》 (Shangguigu Big Data Project: Real-Time Data Warehouse for Online Education), Bilibili
Contents
Chapter 9: Data Warehouse Development, DWD Layer
P053
P054
P055
P056
P057
P058
P059
P060
P061
P062
P063
P064
P065
Chapter 9: Data Warehouse Development, DWD Layer
P053
9.6 User Domain: User Registration Transaction Fact Table
9.6.1 Main Task
Read the user table data and the page log data, join the two tables to fill in the dimension attributes of the user registration action, and write the result to the Kafka user-registration topic.
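The programs in this chapter rely on the project helper class EnvUtil, which these notes do not reproduce. A minimal sketch of what it might look like, assuming it only wraps parallelism, checkpointing and table-state TTL; the method names match the calls below, while the checkpoint settings are assumptions:
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class EnvUtil {
    // Create a streaming environment with the given parallelism and (assumed) checkpointing.
    public static StreamExecutionEnvironment getExecutionEnvironment(int parallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.enableCheckpointing(60 * 1000L); // interval is an assumption
        return env;
    }
    // Set the idle-state retention (TTL) used by regular joins, e.g. "10s".
    public static void setTableEnvStateTtl(StreamTableEnvironment tableEnv, String ttl) {
        tableEnv.getConfig().setIdleStateRetention(Duration.parse("PT" + ttl.toUpperCase()));
    }
}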
P054
9.6.4 Code
Connector reference: Kafka | Apache Flink (Flink Kafka connector documentation)
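The SQL jobs in this chapter also rely on KafkaUtil to generate the connector DDL described in the Flink Kafka connector documentation referenced above. The helper is not shown in the notes; a minimal sketch, assuming JSON payloads and the cluster's bootstrap servers (the server list and connector options are assumptions, only the method signatures come from the calls below):
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class KafkaUtil {
    private static final String BOOTSTRAP_SERVERS = "node001:9092,node002:9092,node003:9092";
    // WITH clause for a plain "kafka" source table reading JSON.
    public static String getKafkaDDL(String topic, String groupId) {
        return " WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + topic + "',\n" +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',\n" +
                "  'properties.group.id' = '" + groupId + "',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")";
    }
    // WITH clause for an "upsert-kafka" sink table (requires a primary key).
    public static String getUpsertKafkaDDL(String topic) {
        return " WITH (\n" +
                "  'connector' = 'upsert-kafka',\n" +
                "  'topic' = '" + topic + "',\n" +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',\n" +
                "  'key.format' = 'json',\n" +
                "  'value.format' = 'json'\n" +
                ")";
    }
    // Registers the shared topic_db table (Maxwell change log) in the table environment.
    public static void createTopicDb(StreamTableEnvironment tableEnv, String groupId) {
        tableEnv.executeSql("CREATE TABLE topic_db (\n" +
                "  `database` string,\n" +
                "  `table` string,\n" +
                "  `type` string,\n" +
                "  `data` map<string,string>,\n" +
                "  `old` map<string,string>,\n" +
                "  `ts` string\n" +
                ")" + getKafkaDDL("topic_db", groupId));
    }
}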
P055
//TODO 4 Read the page topic data dwd_traffic_page_log
//TODO 5 Filter the user table data
//TODO 6 Filter the dimension info from the registration page log
P056
package com.atguigu.edu.realtime.app.dwd.db;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author yhm
* @create 2023-04-23 17:36
*/
public class DwdUserUserRegister {
public static void main(String[] args) {
//TODO 1 Create the execution environment and configure the state backend
StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//TODO 2 Set the table state TTL
// tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10L));
EnvUtil.setTableEnvStateTtl(tableEnv, "10s");
//TODO 3 Read the topic_db data
String groupId = "dwd_user_user_register2";
KafkaUtil.createTopicDb(tableEnv, groupId);
// tableEnv.executeSql("select * from topic_db").print();
//TODO 4 Read the page topic data dwd_traffic_page_log
tableEnv.executeSql("CREATE TABLE page_log (\n" +
" `common` map<string,string>,\n" +
" `page` string,\n" +
" `ts` string\n" +
")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));
//TODO 5 Filter the user table data
Table userRegister = tableEnv.sqlQuery("select \n" +
" data['id'] id,\n" +
" data['create_time'] create_time,\n" +
" date_format(data['create_time'],'yyyy-MM-dd') create_date,\n" +
" ts\n" +
"from topic_db\n" +
"where `table`='user_info'\n" +
"and `type`='insert'" +
"");
tableEnv.createTemporaryView("user_register", userRegister);
//TODO 6 Filter the dimension info from the registration page log
Table dimLog = tableEnv.sqlQuery("select \n" +
" common['uid'] user_id,\n" +
" common['ch'] channel, \n" +
" common['ar'] province_id, \n" +
" common['vc'] version_code, \n" +
" common['sc'] source_id, \n" +
" common['mid'] mid_id, \n" +
" common['ba'] brand, \n" +
" common['md'] model, \n" +
" common['os'] operate_system \n" +
"from page_log\n" +
"where common['uid'] is not null \n"
//"and page['page_id'] = 'register'"
);
tableEnv.createTemporaryView("dim_log", dimLog);
//TODO 7 Join the two tables
Table resultTable = tableEnv.sqlQuery("select \n" +
" ur.id user_id,\n" +
" create_time register_time,\n" +
" create_date register_date,\n" +
" channel,\n" +
" province_id,\n" +
" version_code,\n" +
" source_id,\n" +
" mid_id,\n" +
" brand,\n" +
" model,\n" +
" operate_system,\n" +
" ts, \n" +
" current_row_timestamp() row_op_ts \n" +
"from user_register ur \n" +
"left join dim_log pl \n" +
"on ur.id=pl.user_id");
tableEnv.createTemporaryView("result_table", resultTable);
//TODO 8 Write the result to Kafka
tableEnv.executeSql(" create table dwd_user_user_register(\n" +
" user_id string,\n" +
" register_time string,\n" +
" register_date string,\n" +
" channel string,\n" +
" province_id string,\n" +
" version_code string,\n" +
" source_id string,\n" +
" mid_id string,\n" +
" brand string,\n" +
" model string,\n" +
" operate_system string,\n" +
" ts string,\n" +
" row_op_ts TIMESTAMP_LTZ(3) ,\n" +
" PRIMARY KEY (user_id) NOT ENFORCED\n" +
")" + KafkaUtil.getUpsertKafkaDDL("dwd_user_user_register"));
tableEnv.executeSql("insert into dwd_user_user_register " +
"select * from result_table");
}
}
P057
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_user_user_register
P058
9.7 Trade Domain: Order Placement Transaction Fact Table
9.7.1 Main Task
Read the topic_db topic from Kafka and filter out the order detail and order info records; read the dwd_traffic_page_log topic and filter out the order-page logs; join the three tables to build the trade-domain order placement transaction fact table and write it to the corresponding Kafka topic.
P059
DwdTradeOrderDetail, TODO 1 ~ TODO 7
P060
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_order_detail
package com.atguigu.edu.realtime.app.dwd.db;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author yhm
* @create 2023-04-24 15:18
*/
public class DwdTradeOrderDetail {
public static void main(String[] args) {
//TODO 1 Create the execution environment and configure the state backend
StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//TODO 2 Set the table state TTL
EnvUtil.setTableEnvStateTtl(tableEnv, "10s");
//TODO 3 Read the business data topic_db from Kafka
String groupId = "dwd_trade_order_detail";
KafkaUtil.createTopicDb(tableEnv, groupId);
//TODO 4 Read the log data dwd_traffic_page_log from Kafka
tableEnv.executeSql("create table page_log(\n" +
" common map<String,String>,\n" +
" page map<String,String>,\n" +
" ts string\n" +
")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));
//TODO 5 Filter the order detail table
Table orderDetail = tableEnv.sqlQuery("select \n" +
" data['id'] id,\n" +
" data['course_id'] course_id,\n" +
" data['course_name'] course_name,\n" +
" data['order_id'] order_id,\n" +
" data['user_id'] user_id,\n" +
" data['origin_amount'] origin_amount,\n" +
" data['coupon_reduce'] coupon_reduce,\n" +
" data['final_amount'] final_amount,\n" +
" data['create_time'] create_time,\n" +
" date_format(data['create_time'], 'yyyy-MM-dd') create_date,\n" +
" ts\n" +
"from topic_db\n" +
"where `table`='order_detail'\n" +
"and type='insert'");
tableEnv.createTemporaryView("order_detail", orderDetail);
//TODO 6 Filter the order info table
Table orderInfo = tableEnv.sqlQuery("select \n" +
" data['id'] id, \n" +
" data['out_trade_no'] out_trade_no, \n" +
" data['trade_body'] trade_body, \n" +
" data['session_id'] session_id, \n" +
" data['province_id'] province_id\n" +
"from topic_db\n" +
"where `table`='order_info'\n" +
"and type='insert'");
tableEnv.createTemporaryView("order_info", orderInfo);
//TODO 7 Get the order-page logs
Table orderLog = tableEnv.sqlQuery("select \n" +
" common['sid'] session_id,\n" +
" common['sc'] source_id\n" +
"from page_log\n" +
"where page['page_id']='order'");
tableEnv.createTemporaryView("order_log", orderLog);
//TODO 8 Join the three tables
Table resultTable = tableEnv.sqlQuery("select \n" +
" od.id,\n" +
" od.course_id,\n" +
" od.course_name,\n" +
" od.order_id,\n" +
" od.user_id,\n" +
" od.origin_amount,\n" +
" od.coupon_reduce,\n" +
" od.final_amount,\n" +
" od.create_time,\n" +
" oi.out_trade_no,\n" +
" oi.trade_body,\n" +
" oi.session_id,\n" +
" oi.province_id,\n" +
" ol.source_id,\n" +
" ts,\n" +
" current_row_timestamp() row_op_ts \n" +
"from order_detail od\n" +
"join order_info oi\n" +
"on od.order_id=oi.id\n" +
"left join order_log ol\n" +
"on oi.session_id=ol.session_id");
tableEnv.createTemporaryView("result_table", resultTable);
//TODO 9 Create the upsert-kafka sink table
tableEnv.executeSql("create table dwd_trade_order_detail( \n" +
" id string,\n" +
" course_id string,\n" +
" course_name string,\n" +
" order_id string,\n" +
" user_id string,\n" +
" origin_amount string,\n" +
" coupon_reduce string,\n" +
" final_amount string,\n" +
" create_time string,\n" +
" out_trade_no string,\n" +
" trade_body string,\n" +
" session_id string,\n" +
" province_id string,\n" +
" source_id string,\n" +
" ts string,\n" +
" row_op_ts TIMESTAMP_LTZ(3) ,\n" +
" primary key(id) not enforced \n" +
")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_detail"));
//TODO 10 Write the result to Kafka
tableEnv.executeSql("insert into dwd_trade_order_detail " +
"select * from result_table");
}
}
P061
9.8 Trade Domain: Payment Success Transaction Fact Table
9.8.1 Main Task
Filter payment-success data from the Kafka topic_db topic, read the order fact data from the dwd_trade_order_detail topic, join the two tables into a payment-success wide table, and write it to the Kafka payment-success topic.
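The notes only show the verification commands for this table (below); the program itself is not reproduced here. A minimal sketch following the same pattern as DwdTradeOrderDetail; the class name, the payment_info field names, the success status code and the TTL value are assumptions:
package com.atguigu.edu.realtime.app.dwd.db;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class DwdTradePaySucDetail {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Payments can arrive well after the order, so the TTL must cover the payment window (value assumed).
        EnvUtil.setTableEnvStateTtl(tableEnv, "905s");
        String groupId = "dwd_trade_pay_suc_detail";
        // Business change log plus the order-detail fact topic produced in 9.7
        // (only the columns needed here are declared).
        KafkaUtil.createTopicDb(tableEnv, groupId);
        tableEnv.executeSql("create table dwd_trade_order_detail(\n" +
                "  id string,\n" +
                "  course_id string,\n" +
                "  user_id string,\n" +
                "  order_id string,\n" +
                "  final_amount string,\n" +
                "  ts string\n" +
                ")" + KafkaUtil.getKafkaDDL("dwd_trade_order_detail", groupId));
        // Successful payments from payment_info; field names and status code are assumed.
        Table paySuc = tableEnv.sqlQuery("select \n" +
                "  data['order_id'] order_id,\n" +
                "  data['payment_type'] payment_type,\n" +
                "  data['callback_time'] callback_time,\n" +
                "  ts\n" +
                "from topic_db\n" +
                "where `table`='payment_info'\n" +
                "and `type`='update'\n" +
                "and data['payment_status']='1602'");
        tableEnv.createTemporaryView("pay_suc", paySuc);
        // Join payments with order details.
        Table resultTable = tableEnv.sqlQuery("select \n" +
                "  od.id,\n" +
                "  od.course_id,\n" +
                "  od.user_id,\n" +
                "  od.order_id,\n" +
                "  od.final_amount,\n" +
                "  ps.payment_type,\n" +
                "  ps.callback_time,\n" +
                "  ps.ts,\n" +
                "  current_row_timestamp() row_op_ts\n" +
                "from pay_suc ps\n" +
                "join dwd_trade_order_detail od\n" +
                "on ps.order_id=od.order_id");
        tableEnv.createTemporaryView("result_table", resultTable);
        // Upsert-kafka sink for the payment-success wide table.
        tableEnv.executeSql("create table dwd_trade_pay_suc_detail(\n" +
                "  id string,\n" +
                "  course_id string,\n" +
                "  user_id string,\n" +
                "  order_id string,\n" +
                "  final_amount string,\n" +
                "  payment_type string,\n" +
                "  callback_time string,\n" +
                "  ts string,\n" +
                "  row_op_ts TIMESTAMP_LTZ(3),\n" +
                "  primary key(id) not enforced\n" +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_pay_suc_detail"));
        tableEnv.executeSql("insert into dwd_trade_pay_suc_detail select * from result_table");
    }
}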
P062
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_pay_suc_detail
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
P063
9.9 Dynamic Splitting of Fact Tables
9.9.1 Main Task
The remaining DWD fact tables all follow the same pattern: take the change data of a single business table from topic_db, filter it by some condition, and write it to the corresponding Kafka topic. Their processing logic is similar and fairly simple, so they can all be handled in one program that splits the stream dynamically based on a configuration table.
For example: read the coupon-use data and write it to the Kafka coupon-use topic.
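BaseDBApp (next pages) drives this split from a MySQL configuration table, edu_config.dwd_table_process, read with Flink CDC and broadcast to the main stream. The bean that carries one configuration row is not shown in the notes; a minimal sketch, assuming Lombok and field names analogous to the DIM-layer config table (the field names are assumptions):
import lombok.Data;
// Hypothetical layout of one row of edu_config.dwd_table_process.
@Data
public class DwdTableProcess {
    String sourceTable;   // source table name in topic_db, e.g. "coupon_use"
    String sourceType;    // change type to keep, e.g. "insert" or "update"
    String sinkTable;     // target Kafka topic, e.g. "dwd_trade_coupon_use"
    String sinkColumns;   // comma-separated list of columns to keep in the output JSON
}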
P064
BaseDBApp
//TODO 1 Create the execution environment and configure the state backend
//TODO 2 Read the main stream of business data from topic_db (the KafkaUtil helpers used for the source and the sink are sketched after this list)
//TODO 3 Clean and convert the topic_db data
//TODO 4 Read the dwd configuration table with Flink CDC
//TODO 5 Create the broadcast stream
//TODO 6 Connect the two streams
//TODO 7 Filter out the DWD table data that is needed
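The code on the next page also uses two DataStream-side KafkaUtil helpers that the notes never show. A minimal sketch on Flink's KafkaSource/KafkaSink API; the bootstrap servers, starting offsets and delivery settings are assumptions:
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
// Continuation of the KafkaUtil sketch from 9.6: only the DataStream-side helpers are shown here.
public class KafkaUtil {
    private static final String BOOTSTRAP_SERVERS = "node001:9092,node002:9092,node003:9092";
    // Source used in TODO 2: consume one topic as plain strings.
    public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())   // assumed
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
    // Sink used in TODO 8: the caller supplies the per-record topic routing.
    public static KafkaSink<JSONObject> getKafkaProducerBySchema(
            KafkaRecordSerializationSchema<JSONObject> schema, String transactionalIdPrefix) {
        return KafkaSink.<JSONObject>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setRecordSerializer(schema)
                .setTransactionalIdPrefix(transactionalIdPrefix)
                .build();
    }
}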
P065
package com.atguigu.edu.realtime.app.dwd.db;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DwdBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DwdTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @author yhm
* @create 2023-04-24 18:05
*/
public class BaseDBApp {
public static void main(String[] args) throws Exception {
//TODO 1 Create the execution environment and configure the state backend
StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
//TODO 2 Read the main stream of business data from topic_db
String groupId = "base_DB_app";
DataStreamSource<String> dbStream = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", groupId), WatermarkStrategy.noWatermarks(), "base_db");
//TODO 3 Clean and convert the topic_db data
SingleOutputStreamOperator<JSONObject> jsonObjStream = dbStream.flatMap(new FlatMapFunction<String, JSONObject>() {
@Override
public void flatMap(String value, Collector<JSONObject> out) throws Exception {
try {
JSONObject jsonObject = JSON.parseObject(value);
String type = jsonObject.getString("type");
// Drop Maxwell bootstrap records; only real change data is forwarded downstream
if (!("bootstrap-start".equals(type) || "bootstrap-insert".equals(type) || "bootstrap-complete".equals(type))) {
out.collect(jsonObject);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
jsonObjStream.print();
//TODO 4 Read the dwd configuration table data with Flink CDC
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("node001")
.port(3306)
.username("root")
.password("123456")
.databaseList("edu_config")
.tableList("edu_config.dwd_table_process")
// Deserialize the captured change data as JSON
.deserializer(new JsonDebeziumDeserializationSchema())
// Startup mode: take an initial snapshot, then read the binlog
.startupOptions(StartupOptions.initial())
.build();
//TODO 5 Create the broadcast stream
DataStreamSource<String> tableProcessStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "dwd_table_process");
MapStateDescriptor<String, DwdTableProcess> dwdTableProcessState = new MapStateDescriptor<>("dwd_table_process_state", String.class, DwdTableProcess.class);
BroadcastStream<String> broadcastDS = tableProcessStream.broadcast(dwdTableProcessState);
//TODO 6 Connect the two streams
BroadcastConnectedStream<JSONObject, String> connectStream = jsonObjStream.connect(broadcastDS);
//TODO 7 Filter out the DWD table data that is needed
SingleOutputStreamOperator<JSONObject> processStream = connectStream.process(new DwdBroadcastProcessFunction(dwdTableProcessState));
//TODO 8 Write the data to Kafka, routing each record to its sink_table topic
processStream.sinkTo(KafkaUtil.getKafkaProducerBySchema(new KafkaRecordSerializationSchema<JSONObject>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject element, KafkaSinkContext context, Long timestamp) {
String topic = element.getString("sink_table");
element.remove("sink_table");
return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());
}
}, "base_db_app_trans"));
//TODO 9 Execute the job
env.execute();
}
}
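TODO 7 above delegates the actual routing to DwdBroadcastProcessFunction, which the notes do not reproduce. A minimal sketch of its logic, assuming the DwdTableProcess fields sketched in 9.9.1; pre-loading the existing config rows in open() and filtering the output columns by sinkColumns are omitted for brevity:
package com.atguigu.edu.realtime.app.func;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DwdTableProcess;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class DwdBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private final MapStateDescriptor<String, DwdTableProcess> stateDescriptor;
    public DwdBroadcastProcessFunction(MapStateDescriptor<String, DwdTableProcess> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // Each FlinkCDC record describes one config row; store it keyed by source table and change type.
        BroadcastState<String, DwdTableProcess> state = ctx.getBroadcastState(stateDescriptor);
        DwdTableProcess config = JSON.parseObject(value).getObject("after", DwdTableProcess.class);
        if (config != null) {
            state.put(config.getSourceTable() + ":" + config.getSourceType(), config);
        }
    }
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // Forward a business change record only if the config table asks for it,
        // tagging it with the target topic so TODO 8 can route it.
        ReadOnlyBroadcastState<String, DwdTableProcess> state = ctx.getBroadcastState(stateDescriptor);
        DwdTableProcess config = state.get(value.getString("table") + ":" + value.getString("type"));
        if (config != null) {
            JSONObject result = value.getJSONObject("data");
            result.put("sink_table", config.getSinkTable());
            out.collect(result);
        }
    }
}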
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_cart_add
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
Start Maxwell.