文章目录
- day12_调度和可视化
- 一、任务调度
- 1、开启进程
- 2、登入UI界面
- 3、配置租户
- 4、创建项目
- 5、创建工作流
- 5.1 HiveSQL部署(掌握)
- 5.2 SparkDSL部署(掌握)
- 5.3 SparkSQL部署(熟悉)
- 5.4 SeaTunnel部署(掌握)
- 5.5 任务上线
- 二、数据可视化
- 1、用户画像管理系统(了解)
- 2、使用Doris分析ElasticSearch
- 2.1 创建Catalog
- 2.2 使用示例
- 3、FineBI可视化
- 3.1 安装FineBI
- 3.2 创建数据连接
- 3.3 创建公共数据(掌握)
- 3.3.1 Doris数据集
- 3.3.2 ElasticSearch数据集
- 3.3.3 ElasticSearch和MySQL标签关联的数据集
- 3.3.4 数据集更新
- 3.4 创建分析主题
- 3.5 创建各个组件
- 3.6 创建仪表盘
- 3.7 图表发布
day12_调度和可视化
一、任务调度
1、开启进程
按顺序启动如下服务。注意:如果已经启动了,不用再次启动,其他的例如ElasticSearch、Doris、Hive等不要停。
- 启动Zookeeper
nohup /export/server/kafka/bin/zookeeper-server-start.sh ../config/zookeeper.sql > /dev/null 2>&1 &
注意: 这是一条命令
- 启动Hadoop
start-all.sh
- 启动Hive
启动metastore服务
nohup /export/server/hive/bin/hive --service metastore > /tmp/hive-metastore.log 2>&1 &
启动Hiveserver2服务
nohup /export/server/hive/bin/hive --service hiveserver2 > /tmp/hive-hiveserver2.log 2>&1 &
- 启动DS海豚调度器
/export/server/dolphinscheduler/bin/start-all.sh
- 如需写出到Doris,则还需要启动Doris;如需写出到ES,则还需启动ES
启动Doris
/export/server/doris/fe/bin/start_fe.sh --daemon
/export/server/doris/be/bin/start_be.sh --daemon
/export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon
----------------------------------------------------------
启动ES
1- 切换用户
su es
2- 进入目录
cd /home/es/elasticsearch-7.10.2/bin
3- 启动
elasticsearch -d
4- 切回为root
exit
5- 验证
jps
----------------------------------------------------------
启动Kafka
cd /export/server/kafka/bin
nohup ./kafka-server-start.sh ../config/server.sql 2>&1 &
2、登入UI界面
http://192.168.88.166:12345/dolphinscheduler/ui/login
账号:admin
密码:dolphinscheduler123
3、配置租户
租户对应的是 Linux 的用户,用于 worker 提交作业所使用的用户。注意一定要是root。
然后设置admin的租户为root。
4、创建项目
项目管理->创建项目->输入名称即可
5、创建工作流
5.1 HiveSQL部署(掌握)
演示用的HiveSQL语句
insert overwrite table dwm.dwm_mem_first_buy_i partition(dt)
select
t.zt_id,
t.trade_date_time,
t.trade_date,
t.week_trade_date,
t.month_trade_date,
t.store_no,
t.sale_amount,
t.order_no,
t.source_type,
'2025-03-01' as dt
from
(select
zt_id,
create_time as trade_date_time,
trade_date,
week_trade_date,
month_trade_date,
store_no,
real_paid_amount as sale_amount,
order_no,
source_type,
row_number() over(partition by zt_id order by create_time) as rn
from dwm.dwm_sell_o2o_order_i
where dt<'${inputdate}' and member_type = 1) t
left join dwm.dwm_mem_first_buy_i f
on t.zt_id=f.zt_id and f.dt < '${inputdate}'
where t.rn=1
and f.zt_id is null
配置过程如下:
-
创建数据源:点击数据源中心,然后点击创建数据源
-
填写数据源信息
数据源:选择hive/impala
数据源名称: hive
主机名:up01
端口:10000
用户名:root
数据库名:dwm
配置完成后点击测试连接,没问题后点击提交。
- 创建工作流:点击工作流定义,然后点击创建工作流。
- 拖拽一个SQL类型到面板,然后填写节点设置。
节点名称为表名
设置失败重试次数为3次
设置延时告警
设置数据源类型为HIVE,数据源实例选择hive
SQL类型选择非查询
SQL语句填写计算的语句,注意不要带上;
如果使用到UDF函数,可以先注册后进行选择使用
可以在任务中自定义参数,也可以在工作流中进行整体设置。
如果有前置SQL语句或后置SQL语句,可以添加在对应的地方,这里可以把set语句写在前置SQL中
set hive.exec.dynamic.partition.mode=nonstrict;
在工作中,需要配置前置任务,这里的前置任务应该是 dwm_sell_o2o_order_i 。
- 保存工作流:点击右上角保存,填写基本信息
工作流名称填写具体的主题或需求名称
租户选择root
执行策略选择并行
全局变量设置key为inputdate,value为$[yyyy-MM-dd-12]。注意使用的是中括号。
- 测试:在工作流定义页面,选择member工作流,点击上线,然后点击运行,进行测试。
- 结果验证
DS调度前后执行如下的SQL,核对数据条数是否发生变化
select count(1) from dwm.dwm_mem_first_buy_i;
5.2 SparkDSL部署(掌握)
将程序部署到Yarn上,需要注意如下两点:
- 注意1:需要将代码中的setMaster内容改为yarn,或者直接删除。推荐直接删除
- 注意2:在程序代码中与文件路径有关的地方,需要全部都改为分布式文件系统,例如:HDFS。否则会出现找不到文件的异常
本次项目中,也就是需要修改基类中的master("local[*]"),这里推荐删除。具体代码如下:
创建工作流,以及保存工作流、测试工作流的方式都一样,最大的区别是任务的类型及配置。
-
首先需要将整体的代码打成zip包,因为计算标签的代码中使用到了相关的代码。这里只将tags文件夹打成zip包即可。压缩包的名称不要有中文。
-
将zip包上传到资源中心
- 上传标签计算代码
将标签计算的代码,也就是每个四级标签对应的py文件上传到资源中心。可以按照在PyCharm中的文件结构组织文件。
比如创建tags文件夹,在tags文件夹下创建match文件夹,在match文件夹中上传匹配类标签。
- 创建计算节点
新建一个工作流,在其中拖拽一个SPARK类型。然后填写节点设置。
选项参数如下:
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
--py-files tags.zip
注意: --py-files tags.zip要改成你自己的压缩包
在资源处选择打好并上传的zip包
注意:Spark版本一定要选择SPARK1
- 保存工作流:使用相同的方法,配置其他几个标签计算的节点,配置完成后,点击保存。填写工作流,选择root租户,点击确定。
- 上线测试:在工作流定义页面,选择match工作流,点击上线,然后点击运行,进行测试。
**拓展:**也可以直接使用spark-submit命令来提交作业,假设将tags.zip上传到了hdfs的/tags路径,而age_tag.py也上传到了/tags路径,则命令如下:
cd /export/server/spark
bin/spark-submit --master yarn \
--deploy-mode client \
--driver-cores 1 --driver-memory 512M --num-executors 2 --executor-cores 2 --executor-memory 2G \
--name age_tags \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
--py-files hdfs://up01:8020/tags/tags.zip \
hdfs://up01:8020/tags/age_tag.py
5.3 SparkSQL部署(熟悉)
整体与HiveSQL部署类似。唯一区别是程序类型选择SQL,Spark版本选择SPARK1。
演示用的SQL:
insert overwrite table ads.ads_mem_user_rfm_i
select
mem.zt_id,
if(t.zt_id is null, '121', t.tags_id) as tags_id, -- 121理解为默认标签值
'2025-03-01' as dt
from dwd.dwd_mem_member_union_i mem
left join (
select
zt_id,
case when r = 1 and f = 1 and m = 1 then '114'
when r = 1 and f = 0 and m = 1 then '115'
when r = 0 and f = 1 and m = 1 then '116'
when r = 0 and f = 0 and m = 1 then '117'
when r = 1 and f = 1 and m = 0 then '118'
when r = 1 and f = 0 and m = 0 then '119'
when r = 0 and f = 1 and m = 0 then '120'
when r = 0 and f = 0 and m = 0 then '121'
end as tags_id
from (
select
zt_id,
if(min(r) over (partition by zt_id) < 7, 1, 0) as r, -- 取最小的时间差作为r,如果小于7则为1
if(f > sum_f * 1.00 / user_count, 1, 0) as f, -- 与平均值对比,如果大于平均值则为1
if(m > sum_m * 1.00 / user_count, 1, 0) as m, -- 与平均值对比,如果大于平均值则为1
row_number() over (partition by zt_id order by dt desc ) as rn -- 按照日期进行逆序排列
from (
-- 这里就算出了每个用户的R、F、M的值是多少
select
zt_id,
datediff(current_date(), '2025-01-19') as r, -- 距离当天的时间差
sum(consume_times) over (partition by zt_id) as f, -- 单个用户完单单量
sum(consume_times) over () as sum_f, -- 所有用户完单单量
sum(consume_amount) over (partition by zt_id) as m, -- 单个用户完单金额
sum(consume_amount) over () as sum_m, -- 所有用户完单金额
t2.user_count, -- 总用户数
t1.dt
from dwm.dwm_mem_member_behavior_day_i as t1
cross join (
select count(distinct zt_id) as user_count -- 总用户数
from dwm.dwm_mem_member_behavior_day_i -- 用户行为表:下单、付款、浏览、退货等各种信息
where datediff(current_date(), to_date(dt)) <= 30
) t2
where datediff(current_date(), to_date(t1.dt)) <= 30
) t3
) t4 where rn = 1 -- 只取最新的一条记录
) t on mem.zt_id = t.zt_id
where mem.start_date <= '${inputdate}' and mem.end_date > '2025-01-19'
5.4 SeaTunnel部署(掌握)
在工作流中拖拽一个SHELL节点,然后设置节点。
-
设置节点名称
-
配置重试次数,这里可以只设置1次
-
配置资源信息
-
配置超时策略,勾选超时告警和超时失败(防止因任务卡住而占用资源,如果超时会自动被杀死)
-
填写脚本,注意将日期配置成参数形式
首先配置好hive2es.config,然后在shell中填写命令。
cd /export/server/apache-seatunnel-2.3.5
./bin/seatunnel.sh --config ./config/job/hive2es.config -e local -i pt=${inputdate}
- 设置前置任务
5.5 任务上线
- 上线任务
任务上线后,就不可编辑,这样可以避免正在执行的任务被修改
- 定时
-
时间设定,DS已经可视化处理,我们直接点击相应的时间即可。
-
定时好了后我么可以点击执行时间,查看它任务模拟的执行时间是否正确,执行得到结果如下:
- 上线定时
整个流程到此就结束了。到指定的时间点,任务会被执行。
二、数据可视化
1、用户画像管理系统(了解)
画像web产品主要使用java和vue实现前后端交互和标签可视化。其中主要有3方面可视化内容:单个用户标签可视化、单个标签可视化、组合标签可视化。
-
单个用户分析
-
标签分析
- 圈人精准营销
- 使用Elasticsearch的DSL语句进行查询
案例: 【查用户】查询id为 16682885 的用户的所有标签
POST /user_profile_tags/_search
{
"query": {
"term": {
"user_id": 16682885
}
}
}
2、使用Doris分析ElasticSearch
启动Doris
/export/server/doris/fe/bin/start_fe.sh --daemon
/export/server/doris/be/bin/start_be.sh --daemon
/export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon
2.1 创建Catalog
- 在Doris中执行如下命令:
CREATE CATALOG es sql (
"type"="es",
"hosts"="http://up01:9200",
"enable_docvalue_scan" = "true",
"enable_keyword_sniff" = "true"
);
注: 具体参数解释查看课件
- 验证集成结果
-- 查看Doris与数据源的整合情况
show catalogs;
-- Catalog名称.数据库名称.数据表名称
select * from es.default_db.user_profile_tags limit 10;
2.2 使用示例
完成在Doris中建立ES外表后,除了无法使用Doris中的数据模型(rollup、预聚合、物化视图等)外并无区别。使用示例如下
- 切换到es的catalog
switch es;
- 【查用户】查询id为 16682885 的用户的所有标签
select user_id,tags_id_times,tags_id_once,tags_id_streaming from es.default_db.user_profile_tags where user_id=16682885;
- 【查标签】查询标签为18的用户
select user_id,tags_id_times,tags_id_once,tags_id_streaming from es.default_db.user_profile_tags where tags_id_times='18' limit 10;
3、FineBI可视化
3.1 安装FineBI
-
1- 执行FineBI安装包, 将其安装到windows上。下一步安装即可。安装完以后启动
-
2- 设置管理员账户
3.2 创建数据连接
- 点击左侧的管理系统,然后点击数据连接,点击数据连接管理
- 点击新建数据连接,选择所有,点击Doris。
- 根据下图进行配置,完成后点击右上角测试链接,通过后点击保存。
- 注意一:如果要配置Doris与ElasticSearch集成后的数据库,数据库名称前面需要带Catalog名称。如下图中的红色框
- 注意二:如果FineBI不支持Doris,直接使用MySQL连接即可。端口号要改成9030
3.3 创建公共数据(掌握)
3.3.1 Doris数据集
- 点击公共数据,然后点击新建文件夹,新建xtzg文件夹。
- 点击xtzg文件夹右边的+号,选择数据库表。
-
数据连接选择Doris_DB_log_analysis_db,将右边的所有表选上,点击确定,完成数据库表数据的添加。
3.3.2 ElasticSearch数据集
因为es数据属于外部表,所以不能直接通过数据库表来配置,需要通过SQL数据集的方式来配置。
前提是先要创建es的catalog。
- 点击【添加表】,创建SQL数据集
-
填写表名为:user_profile_tags
-
填写SQL语句
select * from es.default_db.user_profile_tags
- 点击右上角预览,没有问题后,点击确定
3.3.3 ElasticSearch和MySQL标签关联的数据集
- 前提准备
在/export/server/doris/fe/lib/目录中上传mysql-connector-java-5.1.49.jar。
- 创建catalog
CREATE CATALOG jdbc_mysql sql (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://up01:3306/tags_info",
"driver_url" = "file:///export/server/doris/fe/lib/mysql-connector-java-5.1.49.jar",
"driver_class" = "com.mysql.jdbc.Driver"
);
-- 集成以后的验证语句
select * from jdbc_mysql.tags_info.tbl_basic_tag limit 10;
- 有了mysql catalog后,就可以查询mysql中的数据。
- 查询某个标签下的所有用户及对应的标签名称:如查询每个用户的年龄段标签,其中年龄段标签的pid为15,对应的sql如下:
select t1.user_id,t2.name
from
(
select
user_id,
tags_id_times,
cast(tag as int) as tag
from es.default_db.user_profile_tags
lateral view explode_split(tags_id_times,',') t as tag
) t1
join jdbc_mysql.tags_info.tbl_basic_tag t2
on t1.tag=t2.id
where t2.pid=15
- 配置数据集:跟配置es的数据集相同,只是sql不同。
3.3.4 数据集更新
可以设置缓存策略,从而实现数据的自动更新。
点击数据表,然后点击缓存设置,然后点击编辑,设置缓存策略。
3.4 创建分析主题
-
点击仪表板,然后点击新建文件夹,创建xtzg文件夹。
-
点击xtzg文件夹进去,然后新建仪表板,创建小兔智购用户画像分析主题。
- 选择数据。点击公共数据,选择xtzg文件夹下的doris_log_user_event_result,然后确定。
3.5 创建各个组件
- 绘制漏斗图
-
绘制柱状图
-
用同样的方式,可以绘制其他图
3.6 创建仪表盘
- 点击下方添加仪表盘,重命名为:小兔智购用户画像
- 添加各个组件:通过拖拽的方式,添加绘制好的各个组件
3.7 图表发布
可以按照文档操作https://help.fanruan.com/finebi/doc-view-2108.html