Flink实时电商数仓之DWS层

需求分析

  • 关键词
    在这里插入图片描述
  • 统计关键词出现的频率

IK分词

进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
</dependency>

<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
</dependency>

测试代码如下:

public class IkUtil {
    public static void main(String[] args) throws IOException {
        String s = "Apple 苹果15 5G手机";
        StringReader stringReader = new StringReader(s);

        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分

        Lexeme next = ikSegmenter.next();
        while (next!= null) {
            System.out.println(next.getLexemeText());
            next = ikSegmenter.next();
        }
    }
}

整体流程

  1. 创建自定义分词工具类IKUtil,IK是一个分词工具依赖
  2. 创建自定义函数类
  3. 注册函数
  4. 消费kafka DWD页面主题数据并设置水位线
  5. 从主流中过滤搜索行为
    • page[‘item’] is not null
    • item_type : “keyword”
    • last_page_id: “search”
  6. 使用分词函数对keyword进行拆分
  7. 对keyword进行分组开窗聚合
  8. 写出到doris
    • 创建doris sink
    • flink需要打开检查点才能将数据写出到doris

在这里插入图片描述

具体实现

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * title:
 *
 * @Author 浪拍岸
 * @Create 28/12/2023 上午11:06
 * @Version 1.0
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {
    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow().start(10021,4,"dws_traffic_source_keyword_page_view_window");
    }
    @Override
    public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        //1. 读取主流dwd页面主题数据
        tableEnv.executeSql("create table page_info(\n" +
                "    `common` map<string,string>,\n" +
                "    `page` map<string,string>,\n" +
                "    `ts` bigint,\n" +
                "    `row_time` as to_timestamp_ltz(ts,3),\n" +
                "     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));

        //测试是否获取到数据
        //tableEnv.executeSql("select * from page_info").print();

        //2. 筛选出关键字keywords
        Table keywrodTable = tableEnv.sqlQuery("select\n" +
                "    page['item'] keywords,\n" +
                "    `row_time`,\n" +
                "    ts\n" +
                " from page_info\n" +
                " where page['last_page_id'] = 'search'\n" +
                " and page['item_type'] = 'keyword'\n" +
                " and page['item'] is not null");
        tableEnv.createTemporaryView("keywords_table", keywrodTable);

        // 测试是否获取到数据
        //tableEnv.executeSql("select * from keywords_table").print();

        //3. 自定义分词函数并注册
        tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class );

        //4. 调用分词函数对keywords进行拆分
        Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +
                " from keywords_table" +
                " left join lateral Table(kwSplit(keywords)) on true");
        tableEnv.createTemporaryView("split_kw_table", splitKwTable);

        //tableEnv.executeSql("select * from split_kw_table").print();

        //5. 对keyword进行分组开窗聚合
        Table windowAggTable = tableEnv.sqlQuery("select\n" +
                "    keyword,\n" +
                "    cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +
                "    cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +
                "    cast(current_date as string)  cur_date,\n" +
                "    count(*) keyword_count\n" +
                "from split_kw_table\n" +
                "group by tumble(row_time, interval '10' second), keyword");

        //tableEnv.createTemporaryView("result_table",table);
        //tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();

        //6. 写出到doris
        tableEnv.executeSql("create table doris_sink\n" +
                "(\n" +
                "    keyword                STRING,\n" +
                "    wStart                 STRING,\n" +
                "    wEnd                   STRING,\n" +
                "    cur_date               STRING,\n" +
                "    keyword_count          BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));

        windowAggTable.insertInto("doris_sink").execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/283151.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

createElement, parentNode, removeChild, appendChild

1获取父节点 var childNode document.getElementById("child"); var parentNode childNode.parentNode; // 获取父节点利用dom获取元素要嵌套 引出&#xff1a;利用父子兄节点关系获取元素 标签&#xff0c;元素&#xff0c;元素节点空格&#xff0c;文本节点属性…

32个关于FPGA的学习网站

语言类学习网站 1、HDLbits 网站地址&#xff1a;https://hdlbits.01xz.net/wiki/Main_Page 在线作答、编译的学习Verilog的网站&#xff0c;题目很多&#xff0c;内容丰富。非常适合Verilog初学者&#xff01;&#xff01;&#xff01; 2、牛客网 网站地址&#xff1a;https:…

简述Redis备份策略以及对应的实现机制

引言 Redis作为高性能的内存数据库&#xff0c;数据的安全性至关重要。一旦数据丢失&#xff0c;可能会对业务造成重大影响。因此&#xff0c;备份Redis数据是每个Redis使用者都必须考虑的问题。本文将介绍Redis的备份策略以及对应的实现机制。 一、备份策略 1.1 定期备份 …

【华为OD机试真题2023CD卷 JAVAJS】测试用例执行计划

华为OD2023(C&D卷)机试题库全覆盖,刷题指南点这里 测试用例执行计划 时间限制:1s 空间限制:256MB 限定语言:不限 题目描述: 某个产品当前迭代周期内有N个特性()需要进行覆盖测试,每个特性都被评估了对应的优先级,特性使用其ID作为下标进行标识。 设计了M个测试用…

FileZilla的使用以及主动模式跟被动模式

目录 FileZilla的安装 下载路径 安装 FileZilla的基本使用 添加组 添加用户 设置权限 测试 开始测试 FileZilla的主动模式及被动模式 主动模式&#xff08;Active Mode&#xff09; 被动模式&#xff08;Passive Mode&#xff09; 思维导图 ​编辑 FileZilla的安装 下载…

【基础】【Python网络爬虫】【3.chrome 开发者工具】(详细笔记)

Python网络爬虫基础 chrome 开发者工具元素面板&#xff08;Elements)控制台面板&#xff08;Console&#xff09;资源面板&#xff08;Source&#xff09;网络面板&#xff08;Network&#xff09;工具栏Requests Table详情 chrome 开发者工具 ​ 当我们爬取不同的网站是&…

以太网转RS485通讯类库封装

最近选用有人科技的以太网转RS485模块做项目&#xff0c;设备真漂亮&#xff0c;国货之光。调通了通讯的代码&#xff0c;发到网上供大家参考&#xff0c;多多交流。 以下分别是配套的头文件与源文件&#xff1a; /*******************************************************…

软件测试/测试开发丨Pytest测试用例生命周期管理-Fixture

1、Fixture 用法 Fixture 特点及优势 1&#xff64;命令灵活&#xff1a;对于 setup,teardown,可以不起这两个名字2&#xff64;数据共享&#xff1a;在 conftest.py 配置⾥写⽅法可以实现数据共享&#xff0c;不需要 import 导⼊。可以跨⽂件共享3&#xff64;scope 的层次及…

扫雷(c语言)

先开一个test.c文件用来游戏的逻辑测试&#xff0c;在分别开一个game.c文件和game.h头文件用来实现游戏的逻辑 主要步骤&#xff1a; 游戏规则&#xff1a; 输入1&#xff08;0&#xff09;开始&#xff08;结束&#xff09;游戏&#xff0c;输入一个坐标&#xff0c;如果该坐…

MFC消息机制详细剖析

易语言程序的破解99%的时候都需要用到FF55FC5F5E这个特征码 新建一个MFC应用程序&#xff1a; 去编辑MFC的.rc资源文件来DIY窗体 静态编译的&#xff0c;把很多静态库的代码都添加进去了 &#xff0c;所以速度很慢 消息机制针对的是GUI程序&#xff08;比如窗口程序&#xf…

有道翻译web端 爬虫, js

以下内容写于2023-12-28, 原链接为:https://fanyi.youdao.com/index.html#/ 1 在输入框内输入hello world进行翻译,通过检查发出的网络请求可以看到翻译文字的http接口应该是: 2 复制下链接最后的路径,去js文件中搜索下: 可以看到这里是定义了一个函数B来做文字的翻译接口函数…

快速部署supervisord详解

Supervisor是一个用于监控和管理进程的工具。它可以在Unix-like系统中启动、停止、重启和管理后台进程&#xff0c;确保这些进程始终保持运行状态。 yum check-update 更新yum软件包索引 yum install epel-release -y 下载eprl源 yum install supervisor -y 直接yu…

给多行文本的每行添加指定的前缀textwrap.indent()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 给多行文本的每行添加指定的前缀 textwrap.indent() [太阳]选择题 请问以下代码输出的第一行结果是&#xff1f; import textwrap text welcome to China! print("【显示】text\n&quo…

WEB 3D技术 three.js通过 GLTFLoader 导入并应用 gltf/glb 3D资源

上文 WEB 3D技术 three.js 雾 基础使用讲解我们讲了雾的基本使用方法 但是 如果我们要做一个树林 一颗一颗树去加 那真的是要累死了 我们一定是在建模软件上 建模好这样的模型 然后将模型导入到场景中 官网中搜索 GLTFLoader 在我们日常WEB开发中 用的最多的3D格式 就是 GLTF…

Linux---进程控制

一、进程创建 fork函数 在Linux中fork函数是非常重要的函数&#xff0c;它从已存在进程中创建一个新进程&#xff0c;原进程为父进程 fork函数的功能&#xff1a; 分配新的内存和内核数据结构给子进程将父进程部分数据结构内容拷贝至子进程添加子进程到系统的进程列表中fork返…

B+树的插入删除

操作 插入 case2的原理,非叶子节点永远和最右边的最左边的节点的值相等。 case3:的基本原理 非叶子节点都是索引节点 底层的数据分裂之后 相当于向上方插入一个新的索引(你可以认为非叶子节点都是索引),反正第二层插入160 都要分裂,然后也需要再插入(因为索引部分不需要重…

x-cmd pkg | openssl - 密码学开源工具集

目录 简介首次用户技术特点竞品分析进一步阅读 简介 OpenSSL 是一个开源的密码库和 SSL/TLS 协议实现&#xff0c;它提供了一组密码学工具和加密功能&#xff0c;用于保护数据通信的安全性。项目发展历史可以追溯到 1998 年&#xff0c;源自 Eric A. Young 和 Tim J. Hudson 开…

【后端】Docker学习笔记

文章目录 Docker一、Docker安装&#xff08;Linux&#xff09;二、Docker概念三、Docker常用命令四、数据卷五、自定义镜像六、网络七、DockerCompose Docker Docker是一个开源平台&#xff0c;主要基于Go语言构建&#xff0c;它使开发者能够将应用程序及其依赖项打包到一个轻…

java零拷贝zero copy MappedByteBuffer

目录 调用操作系统的 mmap 未使用 mmap 的文件通过网络传输的过程 使用 mmap 的文件通过网络传输的过程 使用例子 调用操作系统的 sendfile() 在 java 中的具体实现 mmap的优劣 mmap 的不足 mmap 的优点 mmap 的使用场景 对于零拷贝&#xff08;zero copy&#xff09…

LVGL 在framebuffer设备上的移植

LVGL 在framebuffer设备上的移植 ItemDescDate2023-12-31Authorhongxi.zhuplatformNXP I.MX6ULLLCDSPI TFTLCD NV3030B 文章目录 LVGL 在framebuffer设备上的移植一、LVGL源码获取二、源码修改适配三、编译&运行补充 一、LVGL源码获取 新建lvgl_imx6ull文件夹&#xff0c…