物流实时数仓:数仓搭建(DWD)一

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一


文章目录

  • 系列文章目录
  • 前言
  • 一、文件编写
    • 1.目录创建
    • 2.bean文件
      • 1.DwdOrderDetailOriginBean
      • 2.DwdOrderInfoOriginBean
      • 3.DwdTradeCancelDetailBean
      • 4.DwdTradeOrderDetailBean
      • 5.DwdTradePaySucDetailBean
      • 6.DwdTransBoundFinishDetailBean
      • 7.DwdTransDeliverSucDetailBean
      • 8.DwdTransDispatchDetailBean
      • 9.DwdTransReceiveDetailBean
      • 10.DwdTransSignDetailBean
    • 3.DwdOrderRelevantApp
  • 二、代码测试
    • 1.环境启动
    • 2.kafka消费者
    • 3.修改配置
    • 4.测试结果
  • 总结


前言

这次博客我们进行DWD层的搭建,内容比较多,一次可能写不完。
在这里插入图片描述
以上就是本次博客需要完成的内容,简单来说就是,从kafka读取数据,然后根据不同的关键字,将其从主流中进行分离,然后在写入各自的kafka中以便后续的操作


一、文件编写

1.目录创建

我们现在beans中创建后边需要的的bean
在这里插入图片描述
然后在dwd目录中创建此次需要的app
在这里插入图片描述

2.bean文件

1.DwdOrderDetailOriginBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 *订单货物明细实体类
 */

@Data
public class DwdOrderDetailOriginBean {
    // 编号(主键)
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumnLength;

    // 宽cm
    Integer volumnWidth;

    // 高cm
    Integer volumnHeight;

    // 重量 kg
    BigDecimal weight;

    // 创建时间
    String createTime;

    // 更新时间
    String updateTime;

    // 是否删除
    String isDeleted;
}

2.DwdOrderInfoOriginBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 * 订单实体类
 */
@Data
public class DwdOrderInfoOriginBean {
    // 编号(主键)
    String id;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    Long estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 创建时间
    String createTime;

    // 更新时间
    String updateTime;

    // 是否删除
    String isDeleted;
}

3.DwdTradeCancelDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * 交易域:取消运单事务事实表实体类
 */
@Data
public class DwdTradeCancelDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 取消时间
    String cancelTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.cancelTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

4.DwdTradeOrderDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 *交易域:下单事务事实表实体类
 */
@Data
public class DwdTradeOrderDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 下单时间
    String orderTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;
        this.orderTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        detailOriginBean.createTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                detailOriginBean.createTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
    }
}

5.DwdTradePaySucDetailBean

package com.atguigu.tms.realtime.beans;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 *交易域:支付成功事务事实表实体类
 */
@Data
public class DwdTradePaySucDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 支付时间
    String payTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;

        this.payTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

6.DwdTransBoundFinishDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;
/**
 *物流域:转运完成事务事实表实体类
 */
@Data
public class DwdTransBoundFinishDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 发单时间
    String boundFinishTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.boundFinishTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

7.DwdTransDeliverSucDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;
/**
 *物流域:派送成功事务事实表实体类
 */
@Data
public class DwdTransDeliverSucDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 派送成功时间
    String deliverTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.deliverTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

8.DwdTransDispatchDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;

import lombok.Data;

import java.math.BigDecimal;

/**
 *物流域:发单事务事实表实体类
 */
@Data
public class DwdTransDispatchDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 发单时间
    String dispatchTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.dispatchTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

9.DwdTransReceiveDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 *物流域:揽收(接单)事务事实表实体类
 */
@Data
public class DwdTransReceiveDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 揽收时间
    String receiveTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.receiveTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

10.DwdTransSignDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * 物流域:签收事务事实表实体类
 */
@Data
public class DwdTransSignDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 签收时间
    String signTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.signTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

3.DwdOrderRelevantApp

package com.atguigu.tms.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.*;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DwdOrderRelevantApp {
    public static void main(String[] args) throws Exception {
        // 1.环境准备
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 2.从Kafka读数据
        String topic = "tms_ods";
        String groupId = "dwd_order_relevant_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // 3.筛选订单和订单明细数据
        SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter((FilterFunction<String>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            String tableName = jsonObj.getJSONObject("source").getString("table");
            return "order_info".equals(tableName) || "order_cargo".equals(tableName);
        });
//        filterDS.print(">>>");
        // 4.对流中的数据类型进行转换 jsonStr->jsonObj
        SingleOutputStreamOperator<JSONObject> jsonObjDS = filterDS.map((MapFunction<String, JSONObject>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            String tableName = jsonObj.getJSONObject("source").getString("table");
            jsonObj.put("table", tableName);
            jsonObj.remove("source");
            jsonObj.remove("transaction");
            return jsonObj;
        });

//        jsonObjDS.print(">>>");

        // 5.按照order_id进行分组
        KeyedStream<JSONObject, String> keyDS = jsonObjDS.keyBy((KeySelector<JSONObject, String>) jsonObj -> {
            String table = jsonObj.getString("table");
            if ("order_info".equals(table)) {
                return jsonObj.getJSONObject("after").getString("id");

            }
            return jsonObj.getJSONObject("after").getString("order_id");
        });
//        keyDS.print(">>>");

        // 6.定义侧输出流标签 下单放到主流,支付成功、取消运单、揽收(接单)、发单 转运完成、派送成功、签收放到侧输出流
        // 支付成功明细流标签
        OutputTag<String> paySucTag = new OutputTag<String>("dwd_trade_pay_suc_detail") {
        };
        // 取消运单明细流标签
        OutputTag<String> cancelDetailTag = new OutputTag<String>("dwd_trade_cancel_detail") {
        };
        // 揽收明细流标签
        OutputTag<String> receiveDetailTag = new OutputTag<String>("dwd_trans_receive_detail") {
        };
        // 发单明细流标签
        OutputTag<String> dispatchDetailTag = new OutputTag<String>("dwd_trans_dispatch_detail") {
        };
        // 转运完成明细流标签
        OutputTag<String> boundFinishDetailTag = new OutputTag<String>("dwd_trans_bound_finish_detail") {
        };
        // 派送成功明细流标签
        OutputTag<String> deliverSucDetailTag = new OutputTag<String>("dwd_trans_deliver_detail") {
        };
        // 签收明细流标签
        OutputTag<String> signDetailTag = new OutputTag<String>("dwd_trans_sign_detail") {
        };

        // 7.分流
        SingleOutputStreamOperator<String> orderDetailDS = keyDS.process(
                new KeyedProcessFunction<String, JSONObject, String>() {

                    private ValueState<DwdOrderInfoOriginBean> infoBeanState;
                    private ValueState<DwdOrderDetailOriginBean> detailBeanState;

                    @Override
                    public void open(Configuration parameters) {
                        ValueStateDescriptor<DwdOrderInfoOriginBean> InfoOriginBeanStateDescriptor
                                = new ValueStateDescriptor<>("infoBeanState", DwdOrderInfoOriginBean.class);
                        InfoOriginBeanStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(5)).build());

                        infoBeanState = getRuntimeContext().getState(InfoOriginBeanStateDescriptor);

                        ValueStateDescriptor detailBeanStateDescriptor
                                = new ValueStateDescriptor<>("detailBeanState", DwdOrderDetailOriginBean.class);
                        detailBeanState = getRuntimeContext().getState(detailBeanStateDescriptor);

                    }

                    @Override
                    public void processElement(JSONObject jsonObj, KeyedProcessFunction<String, JSONObject, String>.Context ctx, Collector<String> out) throws Exception {
                        String table = jsonObj.getString("table");
                        String op = jsonObj.getString("op");
                        JSONObject data = jsonObj.getJSONObject("after");
                        if ("order_info".equals(table)) {
                            //处理的是订单数据
                            DwdOrderInfoOriginBean infoOriginBean = data.toJavaObject(DwdOrderInfoOriginBean.class);

                            // 脱敏
                            String senderName = infoOriginBean.getSenderName();
                            String receiverName = infoOriginBean.getReceiverName();

                            senderName = senderName.charAt(0) + senderName.substring(1).replaceAll(".", "\\*");
                            receiverName = receiverName.charAt(0) + receiverName.substring(1).replaceAll(".", "\\*");

                            infoOriginBean.setSenderName(senderName);
                            infoOriginBean.setReceiverName(receiverName);

                            DwdOrderDetailOriginBean detailOriginBean = detailBeanState.value();
                            if ("c".equals(op)) {
                                // 下单操作
                                if (detailOriginBean == null) {
                                    // 订单数据 比明细数据先到,将订单数据放到状态中
                                    infoBeanState.update(infoOriginBean);
                                } else {
                                    // 说明订单数据来之前,明细数据已经来到了,直接关联
                                    DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();
                                    dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                    // 将下单业务过程数据 放到主流中
                                    out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));
                                }

                            } else if ("u".equals(op) && detailOriginBean != null) {
                                // 其它操作
                                // 获取修改前的数据
                                JSONObject oldData = jsonObj.getJSONObject("before");
                                // 获取修改前的状态值
                                String oldStatus = oldData.getString("status");
                                String status = infoOriginBean.getStatus();
                                if (!oldStatus.equals(status)) {
                                    // 说明修改的是status字段
                                    String changeLog = oldStatus + " -> " + status;
                                    switch (changeLog) {
                                        case "60010 -> 60020":
                                            // 处理支付成功数据
                                            DwdTradePaySucDetailBean dwdTradePaySucDetailBean = new DwdTradePaySucDetailBean();
                                            dwdTradePaySucDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(paySucTag, JSON.toJSONString(dwdTradePaySucDetailBean));
                                            break;
                                        case "60020 -> 60030":
                                            // 处理揽收明细数据
                                            DwdTransReceiveDetailBean dwdTransReceiveDetailBean = new DwdTransReceiveDetailBean();
                                            dwdTransReceiveDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(receiveDetailTag, JSON.toJSONString(dwdTransReceiveDetailBean));
                                            break;
                                        case "60040 -> 60050":
                                            // 处理发单明细数据
                                            DwdTransDispatchDetailBean dispatchDetailBean = new DwdTransDispatchDetailBean();
                                            dispatchDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(dispatchDetailTag, JSON.toJSONString(dispatchDetailBean));
                                            break;
                                        case "60050 -> 60060":
                                            // 处理转运完成明细数据
                                            DwdTransBoundFinishDetailBean boundFinishDetailBean = new DwdTransBoundFinishDetailBean();
                                            boundFinishDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(boundFinishDetailTag, JSON.toJSONString(boundFinishDetailBean));
                                            break;
                                        case "60060 -> 60070":
                                            // 处理派送成功数据
                                            DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean = new DwdTransDeliverSucDetailBean();
                                            dwdTransDeliverSucDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(deliverSucDetailTag, JSON.toJSONString(dwdTransDeliverSucDetailBean));
                                            break;
                                        case "60070 -> 60080":
                                            // 处理签收明细数据
                                            DwdTransSignDetailBean dwdTransSignDetailBean = new DwdTransSignDetailBean();
                                            dwdTransSignDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(signDetailTag, JSON.toJSONString(dwdTransSignDetailBean));
                                            // 签收后订单数据不会再发生变化,状态可以清除
                                            detailBeanState.clear();
                                            break;
                                        default:
                                            if (status.equals("60999")) {
                                                DwdTradeCancelDetailBean dwdTradeCancelDetailBean = new DwdTradeCancelDetailBean();
                                                dwdTradeCancelDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                                ctx.output(cancelDetailTag, JSON.toJSONString(dwdTradeCancelDetailBean));
                                                // 取消后订单数据不会再发生变化,状态可以清除
                                                detailBeanState.clear();
                                            }
                                            break;
                                    }
                                }
                            }
                        } else {
                            // 处理订单明细
                            DwdOrderDetailOriginBean detailOriginBean = data.toJavaObject(DwdOrderDetailOriginBean.class);
                            if ("c".equals(op)) {
                                detailBeanState.update(detailOriginBean);
                                // 获取状态中存放的订单数据 注意:只有下单操作,并且订单数据先到,明细数据后到的情况,才会从状态中拿到订单数据
                                DwdOrderInfoOriginBean infoOriginBean = infoBeanState.value();
                                if (infoOriginBean != null) {
                                    //属于下单业务过程
                                    DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();
                                    dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                    // 将下单业务过程数据 放到主流中
                                    out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));
                                }
                            }
                        }
                    }
                }
        ).uid("process_data");

        // 8.从主流中提取侧输出流
        // 支付成功明细流
        //8.1 支付成功明细流
        SideOutputDataStream<String> paySucDS = orderDetailDS.getSideOutput(paySucTag);

        // 8.2 取消运单明细流
        SideOutputDataStream<String> cancelDetailDS = orderDetailDS.getSideOutput(cancelDetailTag);
        // 8.3 揽收明细流
        SideOutputDataStream<String> receiveDetailDS = orderDetailDS.getSideOutput(receiveDetailTag);
        // 8.4 发单明细流
        SideOutputDataStream<String> dispatchDetailDS = orderDetailDS.getSideOutput(dispatchDetailTag);
        // 8.5 转运成功明细流
        SideOutputDataStream<String> boundFinishDetailDS = orderDetailDS.getSideOutput(boundFinishDetailTag);
        // 8.6 派送成功明细流
        SideOutputDataStream<String> deliverSucDetailDS = orderDetailDS.getSideOutput(deliverSucDetailTag);
        // 8.7 签收明细流
        SideOutputDataStream<String> signDetailDS = orderDetailDS.getSideOutput(signDetailTag);

        // 9.将不同流的数据写到kafka的不同主题中
        // 9.1.1 交易域下单明细主题
        String detailTopic = "tms_dwd_trade_order_detail";
        // 9.1.2 交易域支付成功明细主题
        String paySucDetailTopic = "tms_dwd_trade_pay_suc_detail";
        // 9.1.3 交易域取消运单明细主题
        String cancelDetailTopic = "tms_dwd_trade_cancel_detail";
        // 9.1.4 物流域接单(揽收)明细主题
        String receiveDetailTopic = "tms_dwd_trans_receive_detail";
        // 9.1.5 物流域发单明细主题
        String dispatchDetailTopic = "tms_dwd_trans_dispatch_detail";
        // 9.1.6 物流域转运完成明细主题
        String boundFinishDetailTopic = "tms_dwd_trans_bound_finish_detail";
        // 9.1.7 物流域派送成功明细主题
        String deliverSucDetailTopic = "tms_dwd_trans_deliver_detail";
        // 9.1.8 物流域签收明细主题
        String signDetailTopic = "tms_dwd_trans_sign_detail";

        // 9.2 发送数据到 Kafka
        // 9.2.1 运单明细数据
        KafkaSink<String> kafkaProducer = KafkaUtil.getKafkaSink(detailTopic, args);
        orderDetailDS.print("~~");
        orderDetailDS
                .sinkTo(kafkaProducer)
                .uid("order_detail_sink");

        // 9.2.2 支付成功明细数据
        KafkaSink<String> paySucKafkaProducer = KafkaUtil.getKafkaSink(paySucDetailTopic, args);
        paySucDS.print("!!");
        paySucDS
                .sinkTo(paySucKafkaProducer)
                .uid("pay_suc_detail_sink");

        // 9.2.3 取消运单明细数据
        KafkaSink<String> cancelKafkaProducer = KafkaUtil.getKafkaSink(cancelDetailTopic, args);
        cancelDetailDS.print("@@");
        cancelDetailDS
                .sinkTo(cancelKafkaProducer)
                .uid("cancel_detail_sink");

        // 9.2.4 揽收明细数据
        KafkaSink<String> receiveKafkaProducer = KafkaUtil.getKafkaSink(receiveDetailTopic, args);
        receiveDetailDS.print("##");
        receiveDetailDS
                .sinkTo(receiveKafkaProducer)
                .uid("reveive_detail_sink");

        // 9.2.5 发单明细数据
        KafkaSink<String> dispatchKafkaProducer = KafkaUtil.getKafkaSink(dispatchDetailTopic, args);
        dispatchDetailDS.print("$$");
        dispatchDetailDS
                .sinkTo(dispatchKafkaProducer)
                .uid("dispatch_detail_sink");

        // 9.2.6 转运完成明细主题
        KafkaSink<String> boundFinishKafkaProducer = KafkaUtil.getKafkaSink(boundFinishDetailTopic, args);
        boundFinishDetailDS.print("%%");
        boundFinishDetailDS
                .sinkTo(boundFinishKafkaProducer)
                .uid("bound_finish_detail_sink");

        // 9.2.7 派送成功明细数据
        KafkaSink<String> deliverSucKafkaProducer = KafkaUtil.getKafkaSink(deliverSucDetailTopic, args);
        deliverSucDetailDS.print("^^");
        deliverSucDetailDS
                .sinkTo(deliverSucKafkaProducer)
                .uid("deliver_suc_detail_sink");

        // 9.2.8 签收明细数据
        KafkaSink<String> signKafkaProducer = KafkaUtil.getKafkaSink(signDetailTopic, args);
        signDetailDS.print("&&");
        signDetailDS
                .sinkTo(signKafkaProducer)
                .uid("sign_detail_sink");
        env.execute();
    }
}

二、代码测试

1.环境启动

hadoop,zk,kf全部启动
根据流程图可以看到,流程中没有使用到dim层的内容,所以我们不需要启动hbase。

2.kafka消费者

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail

一共需要查看8个消费者主题,你可以开8个窗口,也可以一个一个看,kafka如果没有消费者,会先将数据保存,等待消费,所以不需要8个主题同时消费。

3.修改配置

在这里插入图片描述

4.测试结果

先启动OdsApp和DwdOrderRelevantApp,然后生成模拟数据,之后查看kakfa消费者,有些数据可能要多生成几次才行。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
在这里插入图片描述

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
这个主题是特殊情况,正常可能没有输出。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
在这里插入图片描述

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail
在这里插入图片描述


总结

至此这篇博客的内容结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/245924.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

亚信科技AntDB数据库——深入了解AntDB-M元数据锁的实现(一)

锁的获取 5.1 锁的强弱 当线程已经持有的锁比新申请的锁更强时&#xff0c;认为已经持有了锁&#xff0c;无需再对申请锁类型加锁。锁的强弱指持有的锁与其他锁的不兼容集合大小&#xff0c;集合相同锁相同&#xff0c;集合更大锁更强&#xff0c;否则无强弱关系。通过锁的兼…

Kafka-Kafka基本原理与集群快速搭建

一、Kafka介绍 ​ ChatGPT对于Apache Kafka的介绍&#xff1a; Apache Kafka是一个分布式流处理平台&#xff0c;最初由LinkedIn开发并于2011年开源。它主要用于解决大规模数据的实时流式处理和数据管道问题。 Kafka是一个分布式的发布-订阅消息系统&#xff0c;可以快速地处理…

JAVA:深入探讨Map的多种遍历方式

1、简述 在现代编程中&#xff0c;Map&#xff08;映射&#xff09;是一种常见的数据结构&#xff0c;用于存储键-值对。在许多编程语言中&#xff0c;Map提供了灵活的数据组织方式&#xff0c;但为了充分发挥其功能&#xff0c;我们需要了解多种遍历方式。本文将深入探讨Map的…

龙迅LT2611UXC 双PORT LVDS转HDMI(2.0)+音频

描述&#xff1a; LT2611UXC是一个高性能的LVDS到HDMI2.0的转换器&#xff0c;用于STB&#xff0c;DVD应用程序。 LVDS输入可配置为单端口或双端口&#xff0c;有1个高速时钟通道&#xff0c;3~4个高速数据通道&#xff0c;最大运行1.2Gbps/通道&#xff0c;可支持高达9.6Gbp…

牛客网SQL训练2—SQL基础进阶

文章目录 一、基本查询二、数据过滤三&#xff1a;函数四&#xff1a;分组聚合五&#xff1a;子查询六&#xff1a;多表连接七&#xff1a;组合查询八&#xff1a;技能专项-case when使用九&#xff1a;多表连接-窗口函数十&#xff1a;技能专项-having子句十一&#xff1a;技能…

桂电|《操作系统》实验一:UNIX/LINUX及其使用环境(实验报告)

桂林电子科技大学2023-2024学年 第 一 学期 操作系统A 实验报告 实验名称 实验一 UNIX/LINUX及其使用环境 实验指导老师&#xff1a; 成绩 院 系 计算机与信息安全学院 专业 计算机科学与技术(卓越工程) 学 号 姓名 课内序…

保姆级 Keras 实现 YOLO v3 三

保姆级 Keras 实现 YOLO v3 三 一. 分配 anchor box二. 正负样本匹配规则三. 为每一个 anchor box 打标签3.1 anchor box 长什么样?3.2 每一个 anchor box 标签需要填充的信息有哪些?3.3 ( Δ x , Δ y , Δ w , Δ h ) (\Delta x, \Delta y, \Delta w, \Delta h) (Δx,Δy,…

uniapp交互反馈api的使用示例

官方文档链接&#xff1a;uni.showToast(OBJECT) | uni-app官网 1.uni.showToast({}) 显示消息提示框。 常用属性&#xff1a; title:页面提示的内容 image&#xff1a;改变提示框默认的icon图标 duration&#xff1a;提示框在页面显示多少秒才让它消失 添加了image属性后。 注…

vs code 设置了自动格式化保存 但是json 配置文件不想自动格式化

vscode 点击文件>首选项>设置>点击打开设置 >转化成文本格式 "editor.formatOnSave": true, "[json]": { "editor.formatOnSave": false }, 即可

智安网络|计算机视觉在城市交通中的前景与变化

随着科技的不断发展&#xff0c;计算机视觉技术在各个领域扮演着越来越重要的角色。在交通领域&#xff0c;计算机视觉为智能交通系统带来了许多机遇&#xff0c;为城市交通管理和出行体验提供了前所未有的可能性。 首先&#xff0c;计算机视觉技术可以用于智能交通监控。传统…

力扣题:数字与字符串间转换-12.15

力扣题-12.15 [力扣刷题攻略] Re&#xff1a;从零开始的力扣刷题生活 力扣题1&#xff1a;592. 分数加减运算 解题思想&#xff1a;首先通过对表达式进行分离&#xff0c;然后利用分数的加法原则进行计算&#xff0c;最后除以最大公因数即可 class Solution(object):def fra…

【MATLAB】数据拟合第10期-二阶多项式的局部加权回归拟合算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 二阶多项式局部加权回归拟合算法是一种用于回归分析的方法&#xff0c;主要通过局部加权线性回归模型来实现。以下是对二阶多项式局部加权回归拟合算法的介绍&#xff1a; 局部加权线性回…

微信小程序 实现上传图片前裁剪功能

前言 技术支持&#xff1a; wx-cropper 裁剪 总体思路是&#xff1a;安装完wx-cropper之后就它当成组件使用。在使用页面的地方引入组件就行。上传图片的逻辑不变&#xff0c;在 通过wx.chooseMedia() Api 拿到图片之后传递给子组件&#xff0c;子组件在拿到图片进行裁剪处理等…

ChatGPT如何做科研??

2023年我们进入了AI2.0时代。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车&#xff0c;就有可能被淘汰在这个数字化时代&#xff0c;如何能高效地处理文本、文献查阅、PPT…

算法:单链表反转

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 一、问题描述 二、栈解法Stack 三、三指针法 总结 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、问题描述 有个单链表&#xff0c;现…

Gitlab基础篇: Gitlab docker 安装部署、Gitlab 设置账号密码

文章目录 1、环境准备2、配置1)、初始化2)、修改gitlab配置文件3)、修改docker配置的gitlab默认端口 gitlab进阶配置gitlab 设置账号密码 1、环境准备 安装docker gitlab前确保docker环境&#xff0c;如果没有搭建docker请查阅“Linux docker 安装文档” docker 下载 gitlab容…

阿里云SLS日志服务之数据导入与加工处理

一、背景 采集vm虚拟机上的Log日志文本&#xff0c;如果需要经过特殊的加工处理&#xff0c;在本文主要讲述如何在SLS把kafka采集上来的数据经导入并加工后存储。 二、数据流转图 三、数据导入 服务地址&#xff1a;填写kafka集群的地址数据格式&#xff1a;json字符串&#…

libevent服务GET/POST的简单使用

目录 1、前言2、测试demo2.1、目录结构2.2、 测试源码2.2.1、http_server.cpp2.2.2、 http_server.h 2.3、 编译2.4、 运行结果2.4.1、测试POST2.4.2 、测试GET请求 1、前言 项目开发中经常需要使用到私有协议和Qt,Android等GUI前端通信&#xff0c;比较常用的使用POST和GET方式…

《算法通关村——透彻理解动态规划》

《算法通关村——透彻理解动态规划》 62. 不同路径 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xff…

Matlab示例-Examine 16-QAM Using MATLAB学习笔记

​工作之余学习16-QAM 写在前面 网上看到许多示例&#xff0c;但一般都比较难以跑通。所以&#xff0c;还是老方法&#xff0c;先将matlab自带的例子研究下。 Examine 16-QAM Using MATLAB Examine 16-QAM Using MATLAB 或者&#xff0c;在matlab中&#xff0c;键入&#x…