物流实时数仓:数仓搭建(DWS)一

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一
物流实时数仓:数仓搭建(DWD)二
物流实时数仓:数仓搭建(DWS)一


文章目录

  • 系列文章目录
  • 前言
  • 一、代码编写
    • 1.文件创建
      • 1.主程序
      • 2.实体类
      • 3.自定义触发器
      • 4.自定义聚合函数
      • 5.在HbaseUtil中添加查询方法
      • 6.JedisUtil工具类
      • 7.封装DimUtil工具类,使用旁路缓存优化查询维度
      • 8.修改实体类TmsConfigDimBean
      • 9.传递op
      • 10.获取线程池的工具类
      • 11.异步关联函数DimAsyncFunction
      • 12.ClickHouseUtil工具类
      • 13. DimSinkFunction
      • 14. TransientSink注解
    • 2.主程序
    • 3.开窗聚合
      • 1.MyTriggerFunction
      • 2.MyAggregationFunction
    • 4.关联维度信息
      • 1.DimAsyncFunction
      • 2.DimJoinFunction
      • 3.ThreadPoolUtil
      • 4.DimUtil
      • 5.JedisUtil
      • 6. HbaseUtil
      • 7. DwsBoundOrgSortDayBean
      • 8.补充维度字段
      • 9. MyBroadcastProcessFunction
      • 10. DimSinkFunction
    • 5.写入CK
      • 1. ClickHouseUtil
      • 2.TransientSink
  • 二、代码测试
    • 1.程序启动
    • 2.修改kafka分区
    • 3.ck建表
      • 1.建库
      • 2.建表
      • 3.物化视图
      • 4.查看结果
  • 总结


前言

这次博客,我们进行各机构分拣次数的统计。统计当日各机构的分拣次数,并补充城市、省份等维度信息,写入ClickHouse对应表。要求每十秒钟更新一次统计结果。

大体流程如图。
在这里插入图片描述


一、代码编写

1.文件创建

1.主程序

在这里插入图片描述

2.实体类

在这里插入图片描述

3.自定义触发器

在这里插入图片描述

4.自定义聚合函数

在这里插入图片描述

5.在HbaseUtil中添加查询方法

在这里插入图片描述

6.JedisUtil工具类

在这里插入图片描述

7.封装DimUtil工具类,使用旁路缓存优化查询维度

在这里插入图片描述

8.修改实体类TmsConfigDimBean

在这里插入图片描述

9.传递op

在这里插入图片描述

10.获取线程池的工具类

在这里插入图片描述

11.异步关联函数DimAsyncFunction

在这里插入图片描述

12.ClickHouseUtil工具类

在这里插入图片描述以上就是这次博客要更改或创建的java文件。

13. DimSinkFunction

当维度数据更新时,删除redis中的对应数据。
在这里插入图片描述

14. TransientSink注解

某些字段不需要写入ClickHouse,但对统计有帮助,我们可以通过添加自定义注解,在写出时获取字段的TransientSink注解,通过判断是否注解是否为空在写出时忽略指定字段。
在这里插入图片描述

2.主程序

DwsBoundOrgSortDay需要完成的任务如以下流程图。
在这里插入图片描述

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwsBoundOrgSortDayBean;
import com.atguigu.tms.realtime.utils.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class DwsBoundOrgSortDay {
    public static void main(String[] args) throws Exception {
        // 环境准备
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // kafka读取数据
        String topic = "tms_dwd_bound_sort";
        String groupId = "dws_tms_dwd_bound_sort";

        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");


        // 对流中的数据进行类型转换 jsonStr-> 实体类
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> dwsBoundOrgSortDayBeanSingleOutputStreamOperator = kafkaStrDS.map(
                new MapFunction<String, DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean map(String jsonStr) throws Exception {
                        DwdBoundSortBean dwdBoundSortBean = JSON.parseObject(jsonStr, DwdBoundSortBean.class);
                        return DwsBoundOrgSortDayBean.builder()
                                .orgId(dwdBoundSortBean.getOrgId())
                                .sortCountBase(1L)
                                .ts(dwdBoundSortBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                    }
                }
        );


        // 指定Watermark以及提取事件事件字段
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withWatermarkDS = dwsBoundOrgSortDayBeanSingleOutputStreamOperator.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<DwsBoundOrgSortDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<DwsBoundOrgSortDayBean>() {
                                    @Override
                                    public long extractTimestamp(DwsBoundOrgSortDayBean boundOrgSortDayBean, long recordTimestamp) {
                                        return boundOrgSortDayBean.getTs();
                                    }
                                }
                        )

        );

//        withWatermarkDS.print("###");

        // 按照机构id进行分组
        KeyedStream<DwsBoundOrgSortDayBean, String> keyedDS = withWatermarkDS.keyBy(DwsBoundOrgSortDayBean::getOrgId);

        // 开窗
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.days(1L)));

        // 指定自定义触发器
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<>());

        // 聚合
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean add(DwsBoundOrgSortDayBean value, DwsBoundOrgSortDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setSortCountBase(accumulator.getSortCountBase() + 1);
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsBoundOrgSortDayBean, DwsBoundOrgSortDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsBoundOrgSortDayBean> elements, Collector<DwsBoundOrgSortDayBean> out) throws Exception {
                        for (DwsBoundOrgSortDayBean element : elements) {
                            // 获取窗口起始时间
                            long stt = context.window().getStart();
                            // 将窗口时间左移8小时 并转换格式
                            element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        );


        // 关联维度(城市、省份)
        // 关联机构维度 获取机构名称
        // 异步I/O
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withOrgNameDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setOrgName(dimInfoJsonObj.getString("org_name"));
                        String orgParentId = dimInfoJsonObj.getString("org_parent_id");
                        sortDayBean.setJoinOrgId(orgParentId != null?orgParentId:sortDayBean.getOrgId());
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getOrgId());
                    }
                },
                60,
                TimeUnit.SECONDS
        );


		// 补充城市ID
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityIdDS = AsyncDataStream.unorderedWait(
                withOrgNameDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityId(dimInfoJsonObj.getString("region_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getJoinOrgId());

                    }
                },
                60,
                TimeUnit.SECONDS
        );



        // 关联地区维度表 根据城市的id获取城市名称以及当前城市所属的省份id
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityNameAndProvinceIdDS = AsyncDataStream.unorderedWait(
                withCityIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityName(dimInfoJsonObj.getString("name"));
                        sortDayBean.setProvinceId(dimInfoJsonObj.getString("parent_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getCityId());

                    }
                },
                60, TimeUnit.SECONDS
        );


        // 关联地区维度表 根据省份的id获取省份的名称
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withProvinceDS = AsyncDataStream.unorderedWait(
                withCityNameAndProvinceIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setProvinceName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS

        );

        withProvinceDS.print(">>>>");


        // 将关联的结果写入ck中
        withProvinceDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_bound_org_sort_day_base values(?,?,?,?,?,?,?,?,?)")
        );

        env.execute();
    }
}

现在我们就按照主程序的调用来完成其他文件的编写。

3.开窗聚合

开窗之前没有用到新的函数,所以不说了。

1.MyTriggerFunction

自定义触发器

package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

 //自定义触发器 每10s触发一次窗口计算
public class MyTriggerFunction<T>  extends Trigger<T, TimeWindow> {

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueStateDescriptor<Boolean> valueStateDescriptor
                = new ValueStateDescriptor<Boolean>("isFirstState",Boolean.class);
        ValueState<Boolean> isFirstState = ctx.getPartitionedState(valueStateDescriptor);
        Boolean isFirst = isFirstState.value();
        if(isFirst == null){
            //如果是窗口中的第一个元素
            //将状态中的值进行更新
            isFirstState.update(true);
            //注册定时器  当前事件时间向下取整后 + 10s后执行
            ctx.registerEventTimeTimer(timestamp -timestamp%10000L  + 2000L);
        }else if(isFirst){
            isFirstState.update(false);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    //time 表示事件时间触发器 触发时间
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        long end = window.getEnd();
        if(time < end){
            if(time + 2000L < end){
                ctx.registerEventTimeTimer(time + 2000L);
            }
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

2.MyAggregationFunction

自定义聚合函数

package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.functions.AggregateFunction;

public abstract class MyAggregationFunction<T> implements AggregateFunction<T,T,T> {
    @Override
    public T createAccumulator() {
        return null;
    }

    @Override
    public T getResult(T accumulator) {
        return accumulator;
    }

    @Override
    public T merge(T a, T b) {
        return null;
    }
}

4.关联维度信息

1.DimAsyncFunction

异步I/O

package com.atguigu.tms.realtime.app.func;


import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DimJoinFunction;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
    private String tableName;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }


    private ExecutorService executorService;

    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = ThreadPoolUtil.getInstance();
    }

    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        // 从线程池中获取线程,发送异步请求
        executorService.submit(
                new Runnable() {
                    @Override
                    public void run() {
                        // 根据流中的对象获取要作为查询条件的主键或者外键
                        Tuple2<String, String> keyNameAndValue = getCondition(obj);
                        // 根据查询条件获取维度对象
                        JSONObject dimInfoJsonObj = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, tableName, keyNameAndValue);
                        // 将维度对象的属性补充到流中的对象上
                        if (dimInfoJsonObj != null) {
                            join(obj, dimInfoJsonObj);
                        }
                        // 向下游传递数据
                        resultFuture.complete(Collections.singleton(obj));

                    }
                }
        );
    }
}

2.DimJoinFunction

我们将需要DimAsyncFunction中一些需要子类实现的函数写入DimJoinFunction,当作接口接入。

package com.atguigu.tms.realtime.beans;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;

public interface DimJoinFunction<T> {

    void join(T obj, JSONObject dimInfoJsonObj);

    Tuple2<String, String> getCondition(T obj);
}

3.ThreadPoolUtil

异步I/O中用作创建线程池的工具类

package com.atguigu.tms.realtime.utils;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUtil {
    private static volatile ThreadPoolExecutor poolExecutor;

    public static synchronized ThreadPoolExecutor getInstance() {
        if (poolExecutor == null) {
            synchronized (ThreadPoolUtil.class){
                if (poolExecutor == null) {
                    poolExecutor = new ThreadPoolExecutor(
                            4,
                            20,
                            300,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE)
                    );
                }
            }
        }

        return poolExecutor;
    }
}

4.DimUtil

在维度关联时,我们需要从hbase中获取维度信息,为了为了优化查询速度,我们引入了redis,流程如图所示
在这里插入图片描述

package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DimUtil {
    public static JSONObject getDimInfo(String namespace, String tableName, Tuple2<String, String> nameAndValue) {
        // 获取的查询条件中的字段名以及字段值
        String keyName = nameAndValue.f0;
        String keyValue = nameAndValue.f1;

        // 拼接从Redis中查询数据的Key
        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;

        // 操作Redis的客户端
        Jedis jedis = null;
        // 用于存放从Redis查询的维度数据
        String dimJsonStr = null;
        // 用于封装返回结果
        JSONObject dimJsonObj = null;

        //  先从缓存中查询维度数据
        try {
            jedis = JedisUtil.getJedis();
            dimJsonStr = jedis.get(redisKey);
            if (StringUtils.isNotEmpty(dimJsonStr)) {
                // 如果在缓存中能够找到要查询的维度
                dimJsonObj = JSON.parseObject(dimJsonStr);

            } else {
                // 如果在缓存中,没有找到要查询的维度数据
                if ("id".equals(keyName)) {
                    dimJsonObj = HbaseUtil.getRowByPrimaryKey(namespace, tableName, nameAndValue);
                } else {
                    dimJsonObj = HbaseUtil.getRowByForeignKey(namespace, tableName, nameAndValue);
                }

                if (dimJsonObj != null && jedis != null) {
                    jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString());
                }
            }
        } catch (Exception e) {
            log.error("从Redis中查询维度数据发生了一场", e);
        } finally {
            if (jedis != null) {
                System.out.println("关闭客户端");
                jedis.close();
            }
        }

        return dimJsonObj;


    }

    // 从Redis中删除缓存的维度数据
    public static void delCached(String tableName, Tuple2<String, String> keyNameAndValue) {
        String keyName = keyNameAndValue.f0;
        String keyValue = keyNameAndValue.f1;

        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;
        Jedis jedis = null;
        try {
            jedis = JedisUtil.getJedis();
            jedis.decr(redisKey);
        }catch (Exception e){
            log.error("清除Redis中缓存的维度数据时发生了异常", e);
        }finally {
            if (jedis != null) {
                jedis.close();
            }

        }

    }
}

5.JedisUtil

用于连接reids的jedis客户端。
先在pom中引入依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>

在这里插入图片描述

package com.atguigu.tms.realtime.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtil {
    private static JedisPool jedisPool;

    static {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(1000);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(5);
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setMaxWaitMillis(2000L);
        poolConfig.setTestOnBorrow(true);
        jedisPool=new JedisPool(poolConfig,"hadoop102",6379,10000);
    }

    public static Jedis getJedis(){
        System.out.println("创建Jedis客户端");
        Jedis jedis = jedisPool.getResource();
        return jedis;
    }

    public static void main(String[] args) {
        Jedis jedis = getJedis();
        String pong = jedis.ping();
        System.out.println(pong);
    }
}

6. HbaseUtil

之前我们在HbaseUtil完成了创建表和插入操作,现在来完成查询操作。

package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseUtil {
    private static Connection conn;

    static {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", TmsConfig.hbase_zookeeper_quorum);
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }


    // 创建表
    public static void createTable(String nameSpace, String tableName, String... families) {
        Admin admin = null;
        try {
            if (families.length < 1) {
                System.out.println("至少需要一个列族");
                return;
            }

            admin = conn.getAdmin();
            // 判断表是否存在
            if (admin.tableExists(TableName.valueOf(nameSpace, tableName))) {
                System.out.println(nameSpace + ":" + tableName + "已存在");
                return;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
            // 指定列族
            for (String family : families) {
                ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
                        .newBuilder(Bytes.toBytes(family)).build();
                builder.setColumnFamily(familyDescriptor);
            }


            admin.createTable(builder.build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

            }
        }
    }

    // 向hbase插入对象
    public static void putPow(String namespace, String tableName, Put put) {
        BufferedMutator mutator = null;
        try {
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(namespace, tableName));
            params.writeBufferSize(5 * 1024 * 1024);
            params.setWriteBufferPeriodicFlushTimeoutMs(3000L);

            mutator = conn.getBufferedMutator(params);
            mutator.mutate(put);


        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (mutator != null) {
                try {
                    mutator.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

            }
        }
    }

    // 根据主键从Hbase表中查询一行数据
    public static JSONObject getRowByPrimaryKey(String namespace, String tableName, Tuple2<String, String> rowKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        String rowKeyName = rowKeyNameAndKey.f0;
        String rowKeyValue = rowKeyNameAndKey.f1;

        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Result result = table.get(new Get(Bytes.toBytes(rowKeyValue)));
            Cell[] cells = result.rawCells();
            if (cells.length > 0) {
                dimJsonObj = new JSONObject();
                dimJsonObj.put(rowKeyName, rowKeyValue);
                for (Cell cell : cells) {
                    dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));

                }
            } else {
                System.out.println("从Hbase表中没有找到对应的维度数据");
            }


        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }


        return dimJsonObj;

    }

    //根据外键从hbase表中查询一行数据
    public static JSONObject getRowByForeignKey(String namespace, String tableName, Tuple2<String, String> foreignKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Scan scan = new Scan();
            String foreignKeyName = foreignKeyNameAndKey.f0;
            String foreignKeyValue = foreignKeyNameAndKey.f1;


            SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes(foreignKeyName), CompareOperator.EQUAL,Bytes.toBytes(foreignKeyValue));
            singleColumnValueFilter.setFilterIfMissing(true);
            scan.setFilter(singleColumnValueFilter);
            ResultScanner scanner = table.getScanner(scan);
            Result result = scanner.next();
            if (result!=null){
                Cell[] cells = result.rawCells();
                if (cells.length > 0) {
                    dimJsonObj = new JSONObject();
                    dimJsonObj.put("id", Bytes.toString(result.getRow()));
                    for (Cell cell : cells) {
                        dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));

                    }
                } else {
                    System.out.println("从Hbase表中没有找到对应的维度数据");
                }
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return dimJsonObj;

    }
}

7. DwsBoundOrgSortDayBean

自定义的工具类,其中包含我们要写入ck的字段

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DwsBoundOrgSortDayBean {
    // 统计日期
    String curDate;

    // 机构 ID
    String orgId;

    // 机构名称
    String orgName;

    // 用于关联获取省份信息的机构 ID
    @TransientSink
    String joinOrgId;

    // 城市 ID
    String cityId;

    // 城市名称
    String cityName;

    // 省份 ID
    String provinceId;

    // 省份名称
    String provinceName;

    // 分拣次数
    Long sortCountBase;

    // 时间戳
    Long ts;
}

8.补充维度字段

我代码编写我们需要维度表的外键字段,所以我们重新修改mysql维度表,添加外键字段。

DROP TABLE IF EXISTS `tms_config_dim`;
CREATE TABLE `tms_config_dim` (
  `source_table` varchar(200) NOT NULL COMMENT '数据源表',
  `sink_table` varchar(200) DEFAULT NULL COMMENT '目标表',
  `sink_family` varchar(200) DEFAULT NULL COMMENT '目标表列族',
  `sink_columns` varchar(200) DEFAULT NULL COMMENT '目标表列',
  `sink_pk` varchar(256) DEFAULT NULL COMMENT '主键字段',
  `foreign_keys` varchar(256) DEFAULT NULL COMMENT '外键查询字段',
  PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='物流实时配置表';

然后从新导入数据。
在这里插入图片描述
在这里插入图片描述

然后我们使用dimapp同步一下数据即可,具体方法可看Dim层搭建。

9. MyBroadcastProcessFunction

我们之前在DIM层的搭建中,在MyBroadcastProcessFunction的processElement函数中过滤掉了外键,但现在需要他,我们把它加上。
在传递前添加一段代码

// 清除Redis缓存的准备工作(传递操作类型、外键字段的k-v)
String op = jsonObj.getString("op");
if ("u".equals(op)) {
    afterJsonObj.put("op", op);

    // 从配置表中获取当前维度表关联的外键名
    String foreignKeys = tmsConfigDimBean.getForeignKeys();
    // 定义个json对象,用于存储当前维度表对应的外键名以及值
    JSONObject foreignjsonObj = new JSONObject();
    if (StringUtils.isNotEmpty(foreignKeys)) {
        String[] foreignNameArr = foreignKeys.split(",");
        for (String foreignName : foreignNameArr) {
            // 获取修改前的数据
            JSONObject before = jsonObj.getJSONObject("before");
            String foreignKeyBefore = before.getString(foreignName);
            String foreignKeyAfter = afterJsonObj.getString(foreignName);

            if (!foreignKeyBefore.equals(foreignKeyAfter)) {
                // 如果修改的是外键
                foreignjsonObj.put(foreignName, foreignKeyBefore);
            }else {
                foreignjsonObj.put(foreignName, foreignKeyBefore);

            }

        }

    }
    afterJsonObj.put("foreign_key", foreignjsonObj);
}

完成代码

package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.TmsConfigDimBean;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;


// 自定义类 完成主流和广播流的处理
public class MyBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor;

    private Map<String, TmsConfigDimBean> configMap = new HashMap<>();

    private String username;
    private String password;

    public MyBroadcastProcessFunction(MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor, String[] args) {
        this.mapStateDescriptor = mapStateDescriptor;
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        this.username = parameterTool.get("mysql-username", "root");
        this.password = parameterTool.get("mysql-password", "000000");

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 将配置表中的数据进行预加载-JDBC

        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://hadoop102:3306/tms_config?useSSL=false&useUnicode=true" +
                "&user=" + username + "&password=" + password +
                "&charset=utf8&TimeZone=Asia/Shanghai";
        Connection conn = DriverManager.getConnection(url);

        PreparedStatement ps = conn.prepareStatement("select * from tms_config.tms_config_dim");

        ResultSet rs = ps.executeQuery();

        ResultSetMetaData metaData = rs.getMetaData();

        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                Object columValue = rs.getObject(i);
                jsonObj.put(columnName, columValue);
            }

            TmsConfigDimBean tmsConfigDimBean = jsonObj.toJavaObject(TmsConfigDimBean.class);
            configMap.put(tmsConfigDimBean.getSourceTable(), tmsConfigDimBean);

        }
        rs.close();
        ps.close();
        conn.close();

        super.open(parameters);
    }

    @Override
    public void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // 获取操作的业务数据库的表名
        String table = jsonObj.getString("table");
        // 获取广播状态
        ReadOnlyBroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 根据操作的业务数据库的表名 到广播状态中获取对应的配置信息
        TmsConfigDimBean tmsConfigDimBean;

        if ((tmsConfigDimBean = broadcastState.get(table)) != null || (tmsConfigDimBean = configMap.get(table)) != null) {
            // 如果对应的配置信息不为空 是维度信息

            // 获取after对象,对应的是影响的业务数据表中的一条记录
            JSONObject afterJsonObj = jsonObj.getJSONObject("after");

            // 数据脱敏
            switch (table) {
                // 员工表信息脱敏
                case "employee_info":
                    String empPassword = afterJsonObj.getString("password");
                    String empRealName = afterJsonObj.getString("real_name");
                    String idCard = afterJsonObj.getString("id_card");
                    String phone = afterJsonObj.getString("phone");

                    // 脱敏
                    empPassword = DigestUtils.md5Hex(empPassword);
                    empRealName = empRealName.charAt(0) +
                            empRealName.substring(1).replaceAll(".", "\\*");
                    //知道有这个操作  idCard是随机生成的,和标准的格式不一样 所以这里注释掉
                    // idCard = idCard.matches("(^[1-9]\\d{5}(18|19|([23]\\d))\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{2}$)")
                    //     ? DigestUtils.md5Hex(idCard) : null;
                    phone = phone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phone) : null;

                    afterJsonObj.put("password", empPassword);
                    afterJsonObj.put("real_name", empRealName);
                    afterJsonObj.put("id_card", idCard);
                    afterJsonObj.put("phone", phone);
                    break;
                // 快递员信息脱敏
                case "express_courier":
                    String workingPhone = afterJsonObj.getString("working_phone");
                    workingPhone = workingPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(workingPhone) : null;
                    afterJsonObj.put("working_phone", workingPhone);
                    break;
                // 卡车司机信息脱敏
                case "truck_driver":
                    String licenseNo = afterJsonObj.getString("license_no");
                    licenseNo = DigestUtils.md5Hex(licenseNo);
                    afterJsonObj.put("license_no", licenseNo);
                    break;
                // 卡车信息脱敏
                case "truck_info":
                    String truckNo = afterJsonObj.getString("truck_no");
                    String deviceGpsId = afterJsonObj.getString("device_gps_id");
                    String engineNo = afterJsonObj.getString("engine_no");

                    truckNo = DigestUtils.md5Hex(truckNo);
                    deviceGpsId = DigestUtils.md5Hex(deviceGpsId);
                    engineNo = DigestUtils.md5Hex(engineNo);

                    afterJsonObj.put("truck_no", truckNo);
                    afterJsonObj.put("device_gps_id", deviceGpsId);
                    afterJsonObj.put("engine_no", engineNo);
                    break;
                // 卡车型号信息脱敏
                case "truck_model":
                    String modelNo = afterJsonObj.getString("model_no");
                    modelNo = DigestUtils.md5Hex(modelNo);
                    afterJsonObj.put("model_no", modelNo);
                    break;
                // 用户地址信息脱敏
                case "user_address":
                    String addressPhone = afterJsonObj.getString("phone");
                    addressPhone = addressPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(addressPhone) : null;
                    afterJsonObj.put("phone", addressPhone);
                    break;
                // 用户信息脱敏
                case "user_info":
                    String passwd = afterJsonObj.getString("passwd");
                    String realName = afterJsonObj.getString("real_name");
                    String phoneNum = afterJsonObj.getString("phone_num");
                    String email = afterJsonObj.getString("email");

                    // 脱敏
                    passwd = DigestUtils.md5Hex(passwd);
                    if (StringUtils.isNotEmpty(realName)) {
                        realName = DigestUtils.md5Hex(realName);
                        afterJsonObj.put("real_name", realName);
                    }
                    phoneNum = phoneNum.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phoneNum) : null;
                    email = email.matches("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$")
                            ? DigestUtils.md5Hex(email) : null;

                    afterJsonObj.put("birthday", DateFormatUtil.toDate(afterJsonObj.getInteger("birthday") * 24 * 60 * 60 * 1000L));
                    afterJsonObj.put("passwd", passwd);
                    afterJsonObj.put("phone_num", phoneNum);
                    afterJsonObj.put("email", email);
                    break;
            }

            // 过滤不需要的维度属性
            String sinkColumns = tmsConfigDimBean.getSinkColumns();
            filterColum(afterJsonObj, sinkColumns);

            // 补充输出目的的表名
            String sinkTable = tmsConfigDimBean.getSinkTable();
            afterJsonObj.put("sink_table", sinkTable);

            // 补充rowKey
            String sinkPk = tmsConfigDimBean.getSinkPk();
            afterJsonObj.put("sink_pk", sinkPk);

            // 清除Redis缓存的准备工作(传递操作类型、外键字段的k-v)
            String op = jsonObj.getString("op");
            if ("u".equals(op)) {
                afterJsonObj.put("op", op);

                // 从配置表中获取当前维度表关联的外键名
                String foreignKeys = tmsConfigDimBean.getForeignKeys();
                // 定义个json对象,用于存储当前维度表对应的外键名以及值
                JSONObject foreignjsonObj = new JSONObject();
                if (StringUtils.isNotEmpty(foreignKeys)) {
                    String[] foreignNameArr = foreignKeys.split(",");
                    for (String foreignName : foreignNameArr) {
                        // 获取修改前的数据
                        JSONObject before = jsonObj.getJSONObject("before");
                        String foreignKeyBefore = before.getString(foreignName);
                        String foreignKeyAfter = afterJsonObj.getString(foreignName);

                        if (!foreignKeyBefore.equals(foreignKeyAfter)) {
                            // 如果修改的是外键
                            foreignjsonObj.put(foreignName, foreignKeyBefore);
                        }else {
                            foreignjsonObj.put(foreignName, foreignKeyBefore);

                        }

                    }

                }
                afterJsonObj.put("foreign_key", foreignjsonObj);
            }

            // 将维度数据传递
            out.collect(afterJsonObj);
        }

    }

    private void filterColum(JSONObject afterJsonObj, String sinkColumns) {
        String[] fieldArr = sinkColumns.split(",");
        List<String> fieldList = Arrays.asList(fieldArr);
        Set<Map.Entry<String, Object>> entrySet = afterJsonObj.entrySet();
        entrySet.removeIf(entry -> !fieldList.contains(entry.getKey()));

    }

    @Override
    public void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
        JSONObject jsonObj = JSON.parseObject(jsonStr);

        // 获取广播状态
        BroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);


        // 获取对配置表的操作类型
        String op = jsonObj.getString("op");
        if ("d".equals(op)) {
            String sourceTable = jsonObj.getJSONObject("before").getString("source_table");
            broadcastState.remove(sourceTable);
            configMap.remove(sourceTable);
        } else {
            TmsConfigDimBean configDimBean = jsonObj.getObject("after", TmsConfigDimBean.class);
            String sourceTable = configDimBean.getSourceTable();
            broadcastState.put(sourceTable, configDimBean);
            configMap.put(sourceTable, configDimBean);
        }
    }
}

10. DimSinkFunction

添加清除代码

// 如果维度数据发生了变化,将Redis中缓存的维度数据清空掉
        if ("u".equals(op)) {
            // 删除当前维度数据在Redis中对应主键的缓存
            DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
            // 删除当前维度数据在Redis中对应外键的缓存
            Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();

            for (Map.Entry<String, Object> entry : set) {
                DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
            }
        }

完整代码

package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.HbaseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;

import java.util.Map;
import java.util.Set;

public class DimSinkFunction implements SinkFunction<JSONObject> {
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        // 获取输出目的地表名和rowKey
        String sinkTable = jsonObj.getString("sink_table");
        String sinkPk = jsonObj.getString("sink_pk");
        jsonObj.remove("sink_table");
        jsonObj.remove("sink_pk");

        String op = jsonObj.getString("op");
        jsonObj.remove("op");

        JSONObject foreignKeyJsonObj = jsonObj.getJSONObject("foreign_key");
        jsonObj.remove("foreign_key");

        // 获取json中的每一个键值对
        Set<Map.Entry<String, Object>> entrySet = jsonObj.entrySet();
        Put put = new Put(jsonObj.getString(sinkPk).getBytes());
        for (Map.Entry<String, Object> entry : entrySet) {
            if (!sinkPk.equals(entry.getKey())) {
                put.addColumn("info".getBytes(), entry.getKey().getBytes(), entry.getValue().toString().getBytes());
            }
        }
        System.out.println("向hbase表中插入数据");
        HbaseUtil.putPow(TmsConfig.HBASE_NAMESPACE, sinkTable, put);

        // 如果维度数据发生了变化,将Redis中缓存的维度数据清空掉
        if ("u".equals(op)) {
            // 删除当前维度数据在Redis中对应主键的缓存
            DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
            // 删除当前维度数据在Redis中对应外键的缓存
            Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();

            for (Map.Entry<String, Object> entry : set) {
                DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
            }
        }
    }
}

5.写入CK

1. ClickHouseUtil

先导入需要的依赖。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2</version>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

在这里插入图片描述
ClickHouseUtil

package com.atguigu.tms.realtime.utils;

import com.atguigu.tms.realtime.beans.TransientSink;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ClickHouseUtil {
    // 获取SinkFunction
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T obj) throws SQLException {
                        // 将流中对象的属性给问号占位符赋值
                        // 获取单签流中对象岁数的类型 以及类中的属性
                        Field[] fieldsArr = obj.getClass().getDeclaredFields();
                        // 遍历所有属性
                        int skipNum = 0;
                        for (int i = 0; i < fieldsArr.length; i++) {
                            Field field = fieldsArr[i];
                            // 判断当前属性是否需要向流中保存
                            TransientSink transientSink = field.getAnnotation(TransientSink.class);
                            if (transientSink != null) {
                                skipNum++;
                                continue;
                            }
                            // 设置私有属性的访问权限
                            field.setAccessible(true);
                            try {
                                Object fieldValue = field.get(obj);
                                ps.setObject(i + 1 - skipNum, fieldValue);
                            } catch (IllegalAccessException e) {
                                throw new RuntimeException(e);
                            }
                        }

                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(3000L)
                        .build()
                ,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(TmsConfig.CLICKHOUSE_DRIVER)
                        .withUrl(TmsConfig.CLICKHOUSE_URL)
                        .build()
        );
        return sinkFunction;
    }
}

2.TransientSink

package com.atguigu.tms.realtime.beans;

// 自定义主键 用于标记不需要向ck中保存的属性

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

二、代码测试

1.程序启动

根据代码逻辑,我们需要启动以下程序。
hdfs、zk、kafka、hbase、redise、ck、OdsApp、DwdBoundRelevantApp、DimApp和DwsBoundOrgSortDay。其中DimApp只需启动一次完成维度数据更新即可。

2.修改kafka分区

再从kafka读取数据时,应该保证kafka有4个分区,不然聚合无法成功。

kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_bound_sort --partitions 4

3.ck建表

1.建库

DROP DATABASE IF EXISTS tms_realtime;
CREATE DATABASE IF NOT EXISTS tms_realtime;
USE tms_realtime;

2.建表

DROP TABLE IF EXISTS dws_bound_org_sort_day_base;
CREATE TABLE IF NOT EXISTS dws_bound_org_sort_day_base
(
    `cur_date` Date COMMENT '统计日期',
    `org_id` String COMMENT '机构ID',
    `org_name` String COMMENT '机构名称',
    `city_id` String COMMENT '城市ID',
    `city_name` String COMMENT '城市名称',
    `province_id` String COMMENT '省份ID',
    `province_name` String COMMENT '省份名称',
    `sort_count_base` UInt64 COMMENT '分拣次数',
    `ts` UInt64 COMMENT '时间戳'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

3.物化视图

DROP VIEW IF EXISTS dws_bound_org_sort_day;
CREATE MATERIALIZED VIEW IF NOT EXISTS dws_bound_org_sort_day
(
    `cur_date` Date, 
    `org_id` String, 
    `org_name` String, 
    `city_id` String, 
    `city_name` String, 
    `province_id` String, 
    `province_name` String, 
    `sort_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT 
    cur_date, 
    org_id, 
    org_name, 
    city_id, 
    city_name, 
    province_id, 
    province_name, 
    argMaxState(sort_count_base, ts) AS sort_count
FROM dws_bound_org_sort_day_base
GROUP BY 
    cur_date, 
    org_id, 
    org_name, 
    city_id, 
    city_name, 
    province_id, 
    province_name;

4.查看结果

当运行程序后,开始生成数据,等待执行完成之后,可以在ck中使用如下代码查看。

clickhouse-client -m

-m 参数代表可以使用回车。

SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxMerge(sort_count) AS sort_count
FROM dws_bound_org_sort_day
GROUP BY
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name
LIMIT 10;

在这里插入图片描述


总结

至此,Dws的部分搭建就结束了,为了方便进行文件管理,我把项目开源到了github上。
项目地址:https://github.com/lcc-666/tms-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/291679.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JMeter CSV 参数文件的使用方法

.在 JMeter 测试中&#xff0c;参数化是非常重要的&#xff0c;参数化允许我们模拟真实世界中的各种情况。本文我们将探讨如何在 JMeter 中使用 CSV 参数文件。 创建 CSV 文件 首先&#xff0c;我们需要创建一个逗号分隔的值&#xff08;CSV&#xff09;文件&#xff0c;其中…

创建企业邮箱帐户指南:常见问题与解决方法分享

专业的电子邮件地址可以帮助客户识别商务人士&#xff0c;并了解公司给他们发邮件的目的。如果你从事管理、信息技术或人力资源工作&#xff0c;你可能会负责为一个企业建立一个企业邮箱帐户。了解如何为新员工和现有员工设置电子邮件帐户可以帮助您简化公司内部的沟通。 在这篇…

python小工具之弱密码检测工具

一、引用的python模块 Crypto&#xff1a; Python中一个强大的加密模块&#xff0c;提供了许多常见的加密算法和工具。它建立在pyc.ypodome或pyc.ypto等底层加密库之上&#xff0c;为Python程序员提供了简单易用的API&#xff0c;使其可以轻松地实现各种加密功能。 commands…

牵绳遛狗你我他文明家园每一天,助力共建文明社区,基于YOLOv5开发构建公共场景下未牵绳遛狗检测识别系统

遛狗是每天要打卡的事情&#xff0c;狗狗生性活泼爱动&#xff0c;一天不遛就浑身难受&#xff0c;遛狗最重要的就是要拴绳了&#xff0c;牵紧文明绳是养犬人的必修课。外出遛狗时&#xff0c;主人手上的牵引绳更多是狗狗生命健康的一道重要屏障。每天的社区生活中&#xff0c;…

秋招复习之数据结构

目录 前言 1 数据结构分类 2 基本数据类型 3 数字编码 4 字符编码 总结 前言 秋招复习之数据结构&#xff0c;数据结构分类、基本数据类型、字符编码、数字编码等基础知识。 1 数据结构分类 数据结构分为逻辑结构和物理结构。 逻辑结构分为线性数据结构&#xff08;数组链表…

在word文档中自制代码段样式

附&#xff1a; 在word中插入高亮代码的网站 HighlightCode&#xff1a;https://highlightcode.com/ CodeInWord&#xff1a;http://codeinword.com/ 一、新建代码段样式 点击下拉按钮&#xff0c;选择创建样式&#xff0c;命名为代码段&#xff0c;然后点击修改 点击格式&a…

CSS transition详解

文章目录 属性transition-propertytransition-durationtransition-timing-functiontransition-delaytransition 简写属性 方法Element&#xff1a;transitionrun 事件Element&#xff1a;transitionstart 事件Element&#xff1a;transitionend 事件Element&#xff1a;transit…

数据表示和进制转换

输入计算机的数字、字符、符号等信息必须转换成0、1组合的数据形式才能被计算机接收、存储并进行运算。能够进行算术运算并且得到明确的数值的数据概念的信息叫数值数据&#xff0c;其余的信息成为非数值数据。 权&#xff1a;每位数的数值。 基数&#xff1a;指该进位制中允…

【C初阶——指针2】鹏哥C语言系列文章,基本语法知识全面讲解——指针(2)

崩刃的剑&#xff0c;依旧致命&#xff0c;锈蚀的盾&#xff0c;屹立如初&#xff08;王者荣耀李信&#xff09; 本文由睡觉待开机原创&#xff0c;转载请注明出处。 本内容在csdn网站首发 欢迎各位点赞—评论—收藏 如果存在不足之处请评论留言&#xff0c;共同进步&#xff0…

Elasticsearch:结合 ELSER 和 BM25 文本查询的相关搜索

Elastic Learned Spare EncodeR (ELSER) 允许你执行语义搜索以获得更相关的搜索结果。 然而&#xff0c;有时&#xff0c;将语义搜索结果与常规关键字搜索结果相结合以获得最佳结果会更有用。 问题是&#xff0c;如何结合文本和语义搜索结果&#xff1f; 首先&#xff0c;让我…

揭秘Linux软链接:如何轻松创建、删除和修改

揭秘Linux软链接&#xff1a;如何轻松创建、删除和修改 一、简介二、创建软链接三、删除软链接四、修改软链接五、Linux软链接的高级用法六、总结 一、简介 在Linux中&#xff0c;软链接&#xff08;Symbolic Link&#xff09;是一种特殊的文件类型&#xff0c;它是一个指向另…

初识对抗生成网络(GAN)

在研究语义通信的时候&#xff0c;发现解码端很多都是用GAN或基于GAN来完成的。带着对GAN的好奇&#xff0c;对GAN进行了一个初步学习。这篇文章介绍一下和GAN相关的一些常识吧~   本文围绕以下几个内容展开&#xff1a;     1.什么是GAN&#xff1f;     2.为什么要…

03-微服务-Ribbon负载均衡

Ribbon负载均衡 1.1.负载均衡原理 SpringCloud底层其实是利用了一个名为Ribbon的组件&#xff0c;来实现负载均衡功能的。 那么我们发出的请求明明是http://userservice/user/1&#xff0c;怎么变成了http://localhost:8081的呢&#xff1f; 1.2.源码跟踪 为什么我们只输入…

FOURANDSIX:2.01

靶场下载 FourAndSix: 2.01 ~ VulnHub 信息收集 # nmap -sn 192.168.1.0/24 -oN live.nmap Starting Nmap 7.94 ( https://nmap.org ) at 2024-01-02 10:42 CST Nmap scan report for 192.168.1.1 Host is up (0.00030s latency). MAC Address: 00…

JavaScript 基础三part1.数组

JavaScript 基础三part1.数组 2.1 数组是什么2.2 数组的基本使用&#xff08;1&#xff09;声明&#xff08;2&#xff09;取值&#xff08;3&#xff09;一些术语&#xff08;4&#xff09;遍历数组 2.3 操作数组&#xff08;1&#xff09;修改&#xff08;2&#xff09;新增&…

SpringBoot日志打印Logback详解【子节点详解】【附案例】

笑小枫的专属目录 1. 背景2. 什么是Logback3. SpringBoot使用logback介绍4. 自定义logback配置5. 如何把日志同步到ES中6. logback配置属性详解根节点< configuration>子节点:< property>子节点:< appender>filetargetappendprudentlayout和encoderlayout和e…

Spring的bean的生命周期!!!

一.单例模式 单例&#xff1a;[启动容器]--->通过构造方法&#xff08;创建对象&#xff09;---->调用set方法&#xff08;注入&#xff09;--->调用init方法&#xff08;初始化&#xff09;----[容器关闭]----->调用destroy方法&#xff08;销毁&#xff09; app…

任务需求分析中的流程图、用例图、er图、类图、时序图线段、图形的作用意义

任务需求分析中的流程图、用例图、er图、类图、时序图线段、图形的作用意义 流程图 流程图中各种图形的含义及用法解析 连接线符号 连接各要素&#xff0c;表示流程的顺序或过程的方向。 批注符号 批注或说明&#xff0c;也可以做条件叙述。 子流程 流程中一部分图形的逻辑…

SpringMVC通用后台管理系统源码

SpringMVC通用后台管理系统源码 整体的SSM后台管理框架功能已经初具雏形&#xff0c;前端界面风格采用了结构简单、 性能优良、页面美观大的Layui页面展示框架 数据库支持了SQLserver,只需修改配置文件即可实现数据库之间的转换。 系统工具中加入了定时任务管理和cron生成器&am…

代码随想录27期|Pthon|Day31|贪心算法|理论基础|455.分发饼干|376. 摆动序列|53. 最大子序和

理论基础 首先&#xff0c;贪心算法基本靠“做题感觉”&#xff0c;所以没有规范的总结和做题技巧&#xff0c;只能说见到过之后还能想起来。 一般情况可以看成是对于一个大的问题的子问题的局部最优的求解&#xff0c;然后可以推导出全局的最优。 这个过程没有证明&#xf…