计算机毕设 flink大数据淘宝用户行为数据实时分析与可视化

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH (
    'connector.type' = 'kafka', 
    'connector.version' = 'universal',  
    'connector.topic' = 'user_behavior',  
    'connector.startup-mode' = 'earliest-offset', 
    'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 
    'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 
    'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'update-mode' = 'append',
    'format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',    
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (
    category_name  STRING,
    buy_cnt  BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/103228.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RetentionPolicy枚举类

包名package java.lang.annotation 作用 注释保留策略。此枚举类型的常量描述用于保留注释的各种策略。它们被使用与{ Retention}元注释类型一起指定注释要保留多长时间。 属性 SOURCE编译器将丢弃注释。CLASS注释将由编译器记录在类文件…

Docker网络与资源控制

Docker网络与资源控制 一、Docker网络模式1.1、Docker网络实现原理1.2、Docker的网络模式1.2.1、host模式1.2.2、Container模式1.2.3、None模式1.2.4、 Bridge模式1.2.5、自定义网络 二、资源控制2.1、CPU 资源控制2.1.1、设置CPU使用率上限2.1.2、进行CPU压力测试2.1.3、设置C…

vue首页多模块布局(标题布局)

<template><div class"box"><div class"content"><div class"box1" style"background-color: rgb(245,23,156)">第一个</div><div class"box2" style"background-color: rgb(12,233,…

霸王条款惹品牌争议,京东双11站在商家对立面?

作者 | 江北 来源 | 洞见新研社 双11活动第一天&#xff0c;京东就站上了风口浪尖。 与烘焙烤箱品牌海氏的话题接连登上微博热搜&#xff0c;海氏控诉京东滥用市场竞争地位&#xff0c;破坏市场竞争秩序。在海氏的声明中&#xff0c;京东的行为让吃瓜群众大开眼界&#xff1a…

智能井盖监测系统功能,万宾科技传感器效果

智能井盖传感器的出现是高科技产品的更新换代&#xff0c;同时也是智慧城市建设中的需求。在智慧城市建设过程之中&#xff0c;高科技产品的应用数不胜数&#xff0c;智能井盖传感器的出现&#xff0c;解决了城市道路安全保护着城市地下生命线&#xff0c;改善着传统井盖带来的…

Hive安装配置笔记

版本说明 hadoop-3.3.6&#xff08;已安装&#xff09; mysql-8&#xff08;已安装&#xff09; hive-3.1.3 将hive解压到对应目录后做如下配置&#xff1a; 基本配置与操作 1、hive-site <configuration><!-- jdbc连接的URL --><property><name>ja…

微信小程序菜单导航选中自动居中

菜单导航选中自动居中 示例库 代码片段

Android Studio模拟器/虚拟设备连接互联网的方法

如图&#xff0c;无线、网络都无法联网 找到本机的DNS 找到emu-launch-params.txt&#xff0c;添加DNS -dns-server 192.168.124.1 重启虚拟机&#xff0c;关闭无线

智慧公厕:打造更美好的城市生活环境

在信息技术迅猛发展的今天&#xff0c;智慧公厕作为一种创新的城市管理模式&#xff0c;正逐渐受到人们的关注。以物联网、大数据、人工智能为基础&#xff0c;智慧公厕正逐步改变传统公厕的面貌&#xff0c;为城市居民提供更便捷、舒适的公共服务。本文将以智慧公厕源头厂家广…

机械设计制造,设计行业图纸透明加密保护。防止内部终端核心文件数据、资料外泄

当下互联网时代&#xff0c;许多设计单位的设计图纸都是以电子文件的形式存在于终端电脑和服务器上。在图纸的设计生产过程中&#xff0c;必定会经过多个部门人员之手&#xff0c;此过程中就隐藏着巨大的风险。所以&#xff0c;设计单位需要使用专业的图纸加密软件来保护内部图…

【软考系统架构设计师】2021年系统架构师综合知识真题及解析

本文主要分享2021年下半年系统架构师综合知识历年真题以及本人在做题时的所思所想。题目序号有点混乱&#xff0c;可忽略 【01】.某计算机系统页面大小为4K&#xff0c;进程P1的页面变换表如下图所示&#xff0c;看P1要访问数据的逻辑地址为十六进制1B1AH,那么该逻辑地址经过变…

DDOS直接攻击系统资源

DDOS ——直接攻击系统资源 思路&#xff1a; 攻击机利用三次握手机制&#xff0c;产生大量半连接&#xff0c;挤占受害者系统资源&#xff0c;使其无法正常提供服务。 1、先体验下受害者的正常网速。在受害者主机上执行以下命令 (1)开启Apache。 systemctl start apache2 (2…

【QT】其他常用控件2

新建项目 lineEdit 什么都不显示&#xff08;linux password&#xff09; password textEdit和plainTextEdit spinBox和doubleSpinBox timeEdit、dateEdit、dateTimeEdit label 显示图案&#xff0c;导入资源&#xff1a;【QT】资源文件导入_复制其他项目中的文件到qt项目中_St…

rabbitmq-3.8.15集群、集群镜像模式安装部署

目录 一、环境 1、映射、域名、三墙 2、Erlang和socat安装&#xff08;三台服务器都实行&#xff09; 二、部署三台rabbitmq-3.8.15实例 1、rabbitmq官网下载地址 &#xff1a; 2、解压rabbitmq 3、添加系统变量 4、启动web插件、启动rabbitmq 5、在rabbitmq1上添加用…

HackTheBox---Starting Point-- Tier 0---Meow

文章目录 一 题目二 实验过程 一 题目 Tags Telnet、Network、Protocols、Reconnaissance、Weak Credentials、Misconfiguration译文&#xff1a;标签、远程登录、网络、协议、侦察、弱凭证、配置错误Connect To attack the target machine, you must be on the same networ…

在pycharm中创建python模板文件

File——>Setting——>File and Code Templates——>Python Scripts 在文本框中输入模板内容

RabbitMQ 笔记

一、win10安装erlang 1.1 安装erLang语言&#xff0c;配置环境变量 erLang官网地址 1.2 配置环境变量 &#xff08;1&#xff09;添加系统变量ERLANG_HOME &#xff08;2&#xff09;path路径&#xff0c;指向bin目录 1.3 配置完成后再cmd命令窗口erl -version可以查看…

Lua与C++交互

文章目录 1、Lua和C交互2、基础练习2.1、加载Lua脚本并传递参数2.2、加载脚本到stable&#xff08;包&#xff09;2.3、Lua调用c语言接口2.4、Lua实现面向对象2.5、向脚本中注册c的类 1、Lua和C交互 1、lua和c交互机制是基于一个虚拟栈&#xff0c;C和lua之间的所有数据交互都通…

【proteus】8086 写一个汇编程序并调试

参考书籍&#xff1a;微机原理与接口技术——基于8086和Proteus仿真&#xff08;第3版&#xff09;p103-105&#xff0c;p119-122. 参考程序是p70&#xff0c;例4-1 在上一篇的基础上&#xff1a; 创建项目和汇编文件 写一个汇编程序并编译 双击8086的元件图&#xff1a; …

挑战没有免费的午餐定理?南洋理工提出扩散模型增强方法FreeU

论文名称&#xff1a;FreeU: Free Lunch in Diffusion U-Net 文章链接&#xff1a;https://arxiv.org/abs/2309.11497 代码仓库&#xff1a;https://github.com/ChenyangSi/FreeU 项目主页&#xff1a;https://chenyangsi.top/FreeU 机器学习领域中一个著名的基本原理就是“没…