学习大数据DAY62 指标计算

客户需求

第一张汇总报表需要的指标 - 决策报表 汇总表 每次计算只有一天的记录 -
大 BOSS:
- 全部会员数 新增会员数
- 有效会员数 有效会员占比
- 流失会员数: 倒推一年含一年无消费记录的会员
- 净增有效会员数
- 会员消费级别分类人数 (A >=2000 B >=1000 < 2000 C >=500 <1000
D >=100 <500 E <100)
- 会员消费总额
- 会员消费总次数
- 会员消费单价
- 流失会员数
- 新增流失会员
- 60 天会员复购率
- 180 天会员复购率
- 365 天会员复购率
第二张报表用于市场营销 - 明细报表, 普通报表 - 市场部同事
- 筛选大于 30笔的会员或者消费总金额大于 1500 的会员作为目标用户 用于电 话营销
- 字段: 姓名 手机号 消费总额 消费次数 城市 门店 付款偏好 (手机 刷卡 现
金..) 关注的疾病
- 该会员最近 3 个月的月消费订单数和额度 m1_total m1_sale m2_total
m2_sale
第三张报表用于市场营销 - 2022.1-2023.12 每个月消费前 20 的会员名单
24X20=480 条 - 市场部经理
- T+1(月) yyyy-mm 月份
- 会员姓名 联系方式... 消费级别分类, 最近 30 天消费订单数和总额
- 该会员当月前 30 天消费, 60 天消费, 90 天消费 (困难点)
- 报表排序方式: 默认按消费总额倒序 / 按消费次数倒序
- 报表默认显示 2021 年 1 月份的数据, 可选 2 年内任何一个月的数据查看
指标表需要存放在 DM 层
dm_lijinquan.xxx
应该有三个指标表 保存有通过 SQL 计算的结果数据
理解指标的业务的意思 =>
SQL 计算
=> 得到结果 => 保存到结果表
4.指标说明.xlsx

作业

使用模型工具设计这 3 张表 建模 字段名称随意 表名随意 存储的格式为
textfile
使用 spark sql 计算里面的结果, 部署到调度平台
Summary_table:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
#汇总表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()def temp_table():
table_name='total_report_table'
@F.udf()
def a_division_b(val1,val2):
return f"{(val1/val2):.2f}"
@F.udf()
def handle_inc_vaild_members(val1,val2):
return val1-val2
sql='''
--today is 2023-07-24
with
t as(
select member,
count(case when stamp >= trunc('2023-07-24','MM') and
stamp <= last_day('2023-07-24') then 1 end) as
this_months_members,
--当月有消费的会员数
count(case when stamp >= date_sub('2023-07-24',60) and
stamp <= date_sub('2023-07-24', 1) then 1 end) as 60days,
count(case when stamp >= date_sub('2023-07-24',180) and
stamp <= date_sub('2023-07-24', 1) then 1 end) as 180days,
count(case when stamp >= date_sub('2023-07-24',365) and
stamp <= date_sub('2023-07-24', 1) then 1 end) as 365days
from dwd_tanghongjian.erp_u_sale_m eusm
group by member),
tt as (
select
count(case when this_months_members>0 then 1 end) as
this_months_members,
count(case when this_months_members>0 and 60days > 0 then
1 end) as 60day,
count(case when this_months_members>0 and 180days > 0 then
1 end) as 180day,
count(case when this_months_members>0 and 365days > 0 then
1 end) as 365day
from t),
a as (select * from
dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t),
aa as (
select
count(a.mbr_cardno) as all_members,
--全部会员数
count(case when a.first_order_date >=
trunc('2023-07-24','MM') and a.first_order_date <=
last_day('2023-07-24') then 1 end) as inc_members, --新增会员数
(当月新增首次消费会员数)
count(case when a.last_order_date >=
date_sub(last_day('2023-07-24'), 180) then 1 end ) as
vaild_members,
--有效会员数 (按当月月末,倒推最近 180 天有消费的会
员)
"" as per_members,
--有效会员占比 (有效会员数 / 全
部会员数)
count(case when a.last_order_date <=
date_sub(last_day('2023-07-24'), 365) then 1 end ) as
loss_members,
--流失会员数 (按当月月末,倒推一年含一年)无消费
记录的会员)
"" as inc_vaild_members,
--净增有效会员数 (新增有效
会员 - 流失有效会员)
count(case when a.group_name='A 星用户组' then 1 end)
as A_members,
count(case when a.group_name='B 星用户组' then 1 end)
as B_members,
count(case when a.group_name='C 星用户组' then 1 end)
as C_members,
count(case when a.group_name='D 星用户组' then 1 end)
as D_members,
count(case when a.group_name='E 星用户组' then 1 end)
as E_members,
round(sum(a.order_amount),2) as total_consume,
--
会员消费总额
sum(a.order_total) as total_consume_times,
--会员消
费总次数
"" as consume_unit_price,
--会员消费单价(会员含税销
售收入 / 会员来客数)
count(case when a.last_order_date <=
date_sub(last_day('2023-07-24'), 365) and a.last_order_date >
add_months(last_day('2023-07-24'), -13) then 1 end ) as
new_loss_members,
--新增流失会员--当月有消费的会员向前 60 天内仍有消费的会员数/当月有消费
的会员数、如客户 A 在 10 月 7 日有消费,则向前推 8 月 7 日-10 月 6 日有消费,
则视为 60 天复购会员
"" as 60_days,
--60 天会员复购率
"" as 180_days,
--180 天会员复购率
"" as 365_days
--365 天会员复购率
from a)
select date_format(current_date(), 'yyyy-MM-dd') as
time_,aa.*,tt.* from aa cross join tt
'''
df = spark.sql(sql)
#有效会员占比 (有效会员数 / 全部会员数)
df = df.withColumn("per_members",
a_division_b("vaild_members","all_members"))
#净有效会员数 (有效会员 - 流失会员)
df = df.withColumn("inc_vaild_members",
handle_inc_vaild_members("vaild_members","loss_members"))
#会员消费单价
df = df.withColumn("consume_unit_price",
a_division_b("total_consume","all_members"))
#60 天会员复购率
df = df.withColumn("60_days",
a_division_b("60day","this_months_members"))
#90 天会员复购率
df = df.withColumn("180_days",
a_division_b("180day","this_months_members"))
#180 天会员复购率
df = df.withColumn("365_days",
a_division_b("365day","this_months_members"))
del_columns=['this_months_members','60day','180day','365day
']
dfs = df.drop(*del_columns)
dfs.show()
saveAsTable(dfs,table_name)
def saveAsTable(df,table_name):
# 保存
# 创建 HDFS 路径os.system(f"hadoop fs -mkdir -p
/zhiyun/shihaihong/dm/{table_name}")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists dm_shihaihong location
"/zhiyun/shihaihong/dm";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc
因为 spark 会把 orc 表的版本存成高版
本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","TextFile") \
.option("location",f"/zhiyun/shihaihong/dm/{table_name}
"). \
saveAsTable(f"dm_shihaihong.{table_name}")
print("保存成功")
temp_table()

任务调度:

Sales_Analysis_Report:

Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
#筛选大于 30 笔的会员或者消费总金额大于 1500 的会员作为目标用户spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
def temp_table():
table_name='datail_table'
sql='''
with t as (
select member,
count(case when stamp >= add_months('2023-07-24', -1) and
stamp <= '2023-07-24' then 1 end) as m1_total,
sum(case when stamp >= add_months('2023-07-24', -1) and
stamp <= '2023-07-24' then precash end) as m1_sale,
count(case when stamp >= add_months('2023-07-24', -2) and
stamp <= add_months('2023-07-24', -1) then 1 end) as m2_total,
sum(case when stamp >= add_months('2023-07-24', -2) and
stamp <= add_months('2023-07-24', -1) then precash end) as m2_sale,
count(case when stamp >= add_months('2023-07-24', -3) and
stamp <= add_months('2023-07-24', -2) then 1 end) as m3_total,
sum(case when stamp >= add_months('2023-07-24', -3) and
stamp <= add_months('2023-07-24', -2) then precash end) as m3_sale
from dwd_tanghongjian.erp_u_sale_m eusm
group by member
),a as (
select * from dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t
dxmmdt
WHERE order_total > 30 or order_amount>1500)
SELECT
a.name,
a.phone,
a.order_total,
a.order_amount,
a.address,a.store_code,
a.pay_fav_type,
a.sickness_motion,
t.m1_total,
coalesce(t.m1_sale, 0) AS m1_sale,
t.m2_total,
coalesce(t.m2_sale, 0)+coalesce(t.m1_sale, 0) AS m2_sale,
t.m3_total,
coalesce(t.m3_sale, 0)+coalesce(t.m1_sale,
0)+coalesce(t.m2_sale, 0) AS m3_sale
from a left join t on a.mbr_cardno = t.member
'''
df = spark.sql(sql)
df.show()
saveAsTable(df,table_name)
def saveAsTable(df,table_name):
# 保存
# 创建 HDFS 路径
os.system(f"hadoop fs -mkdir -p
/zhiyun/shihaihong/dm/{table_name}")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists dm_shihaihong location
"/zhiyun/shihaihong/dm";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc
因为 spark 会把 orc 表的版本存成高版
本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","TextFile") \
.option("location",f"/zhiyun/shihaihong/dm/{table_name}
"). \
saveAsTable(f"dm_shihaihong.{table_name}")
print("保存成功")temp_table()
任务调度:

python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as Fimport os
#月消费情况 - 报表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
def temp_table():
table_name='front_20'
sql='''
with a as (
select member,
date_format(stamp, 'yyyy-MM') as year_month,
sum(precash) as total_consumption
from dwd_tanghongjian.erp_u_sale_m
where stamp between '2022-01-01' and '2023-12-31'
group by member,date_format(stamp, 'yyyy-MM')),
aa as (
select member,year_month,total_consumption,
row_number()over(partition by year_month order by
total_consumption desc) as r
from a),
re as (
select member,year_month,total_consumption
from aa where r<=20 order by year_month,total_consumption
desc),
t as (
select member,year_month,total_consumption,
lead(member,1,0)over(partition by member order by
year_month) as next_member,
lead(year_month,1,0)over(partition by member order by
year_month) as one_month,lead(year_month,2,0)over(partition by member order by
year_month) as two_month,
lead(year_month,3,0)over(partition by member order by
year_month) as three_month,
lead(total_consumption,1,0)over(partition by member
order by year_month) as one_month_total,
lead(total_consumption,2,0)over(partition by member
order by year_month) as two_month_total,
lead(total_consumption,3,0)over(partition by member
order by year_month) as three_month_total
from a ),
tt as (
select member,year_month,total_consumption,
case when member = next_member then
case when
(substr(one_month, 1, 4) = substr(year_month, 1, 4) AND
cast(substr(one_month, 6) as int) = cast(substr(year_month, 6) as
int) + 1)
OR
(substr(one_month, 1, 4) = cast(substr(year_month, 1,
4) as int) + 1 AND cast(substr(one_month, 6) as int) = 1)
then one_month_total else 0 end else 0 end as
one_month_total,
case when member = next_member then
case when
(substr(two_month, 1, 4) = substr(year_month, 1, 4) AND
cast(substr(two_month, 6) as int) = cast(substr(year_month, 6) as
int) + 2)
OR
(substr(two_month, 1, 4) = cast(substr(year_month, 1,
4) as int) + 1 AND cast(substr(two_month, 6) as int) = 2)
then two_month_total else 0 end else 0 end as
two_month_total,
case when member = next_member then
case when
(substr(three_month, 1, 4) = substr(year_month, 1, 4)
AND cast(substr(three_month, 6) as int) = cast(substr(year_month,
6) as int) + 3)
OR
(substr(three_month, 1, 4) = cast(substr(year_month,
1, 4) as int) + 1 AND cast(substr(three_month, 6) as int) = 3)
then three_month_total else 0 end else 0 end as
three_month_totalfrom t),
re2 as (
select member,year_month,total_consumption,
one_month_total as 30_days,one_month_total +
two_month_total as 60_days,
one_month_total + two_month_total + three_month_total as
90_days
from tt)
select
re.member,
ddd.name,
ddd.phone,
ddd.order_total,
ddd.order_amount,
ddd.address,
ddd.store_code,
ddd.pay_fav_type,
ddd.sickness_motion,
ddd.order_30,
ddd.amount_30,
re2.30_days,
re2.60_days,
re2.90_days
from re left join re2 on re.member = re2.member and
re.year_month = re2.year_month
left join dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t
ddd on re.member = ddd.mbr_cardno
'''
df = spark.sql(sql)
df.show()
saveAsTable(df,table_name)
def saveAsTable(df,table_name):
# 保存
# 创建 HDFS 路径
os.system(f"hadoop fs -mkdir -p
/zhiyun/shihaihong/dm/{table_name}")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e 'create database if not exists dm_shihaihong location
"/zhiyun/shihaihong/dm";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc
因为 spark 会把 orc 表的版本存成高版
本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","TextFile") \
.option("location",f"/zhiyun/shihaihong/dm/{table_name}
"). \
saveAsTable(f"dm_shihaihong.{table_name}")
print("保存成功")
temp_table()
任务调度:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/917817.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

快速上手 Vue 3 的高效组件库Element Plus

目录 前言1. 什么是组件&#xff1f;2. 安装与引入 Element Plus2.1 安装 Element Plus2.2 在 main.js 中引入 Element Plus 3. 使用 Element Plus 组件3.1 组件的基本使用3.2 控制组件状态 4. 常用组件实例解析4.1 表单与输入框4.2 表格与分页 5. 组件库的扩展性结语 前言 在…

自动驾驶车载SoC设计功能安全

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 所有人的看法和评价都是暂时的&#xff0c;只有自己的经历是伴随一生的&#xff0c;几乎所有的担忧和畏惧…

【开源免费】基于Vue和SpringBoot的私人健身与教练预约管理系统(附论文)

本文项目编号 T 618 &#xff0c;文末自助获取源码 \color{red}{T618&#xff0c;文末自助获取源码} T618&#xff0c;文末自助获取源码 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息…

【项目实战】基于 LLaMA-Factory 通过 LoRA 微调 Qwen2

【项目实战】基于 LLaMAFactory 通过 LoRA 微调 Qwen2 一、项目介绍二、环境准备1、环境准备2、安装LLaMa-Factory3、准备模型数据集3.1 模型准备3.2 数据集准备 三、微调1、启动webui2、选择参数3、训练 四、测试五、总结 一、项目介绍 LLaMA-Factory是一个由北京航空航天大学…

《Probing the 3D Awareness of Visual Foundation Models》论文解析——多视图一致性

一、论文简介 论文讨论了大规模预训练产生的视觉基础模型在处理任意图像时的强大能力&#xff0c;这些模型不仅能够完成训练任务&#xff0c;其中间表示还对其他视觉任务&#xff08;如检测和分割&#xff09;有用。研究者们提出了一个问题&#xff1a;这些模型是否能够表示物体…

C++ | Leetcode C++题解之第565题数组嵌套

题目&#xff1a; 题解&#xff1a; class Solution { public:int arrayNesting(vector<int> &nums) {int ans 0, n nums.size();for (int i 0; i < n; i) {int cnt 0;while (nums[i] < n) {int num nums[i];nums[i] n;i num;cnt;}ans max(ans, cnt);…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-04

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-04 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-04目录1. Alopex: A Computational Framework for Enabling On-Device Function Calls with LLMs摘要&#xff1a;研究背景&…

智能运维:提升效率与响应速度的关键能力

在当今这个信息化高速发展的时代&#xff0c;运维工作的重要性日益凸显。一个高效、智能的运维系统不仅能够确保企业IT环境的稳定运行&#xff0c;还能在出现问题时迅速响应&#xff0c;最小化业务中断的影响。本文将深入探讨现代运维系统应具备的关键能力&#xff0c;包括告警…

Linux 下网络套接字(Socket) 与udp和tcp 相关接口

文章目录 1. socket常见API2 sockaddr结构体及其子类1. sockaddr结构体定义&#xff08;基类&#xff09;2. 子类 sockaddr_in结构体用于(IPv4)3 子类 sockaddr_un(Unix域套接字)4. 总结画出其结构体 3.实现一个简单的tcp Echo 服务器和客户端(cpp&#xff09;3.1 客户端3.2 服…

IPv6基础知识

IPv6是由IEIF提出的互聯網協議第六版&#xff0c;用來替代IPv4的下一代協議&#xff0c;它的提出不僅解決了網絡地址資源匱乏問題&#xff0c;也解決了多種接入設備接入互聯網的障礙。IPv6的地址長度為128位&#xff0c;可支持340多萬億個地址。如下圖&#xff0c;3ffe:1900:fe…

24首届数证杯(流量分析部分)

目录 流量分析 流量分析 1、分析网络流量包检材&#xff0c;写出抓取该流量包时所花费的秒数?(填写数字&#xff0c;答案格式:10) 3504相加即可 2、分析网络流量包检材&#xff0c;抓取该流量包时使用计算机操作系统的build版本是多少? 23F793、分析网络流量包检材&#x…

Linux(CentOS)安装达梦数据库 dm8

CentOS版本&#xff1a;CentOS 7&#xff0c;查看操作系统版本信息&#xff0c;请查阅 查看Linux内核版本信息 达梦数据库版本&#xff1a;dm8 一、获取 dm8 安装文件 1、下载安装文件 打开达梦官网&#xff1a;https://www.dameng.com/ 下载的文件 解压后的文件 2、上传安…

vue-i18n下载完报错

解决方法&#xff1a; 这是i18n版本太高了&#xff0c;与当前VUE版本不谦容&#xff1b; 查看版本&#xff1a;npm view vue-i18n versions 选择其中一个低版本&#xff0c;不要太低的 npm install vue-i18n7.3.22.可以删掉依赖包重新下载试试 报错类似如下&#xff1a; 1…/…

Docker环境搭建Cloudreve网盘服务(附shell脚本一键搭建)

Docker搭建Cloudreve Cloudreve介绍&#xff1a; Cloudreve 是一个基于 ThinkPHP 框架构建的开源网盘系统&#xff0c;旨在帮助用户以较低的成本快速搭建起既能满足个人也能满足企业需求的网盘服务。Cloudreve 支持多种存储介质&#xff0c;包括但不限于本地存储、阿里云OSS、…

凹凸/高度贴图、法线贴图、视差贴图、置换贴图异同

参考&#xff1a; 凹凸贴图、法线贴图、置换贴图-CSDN博客 视差贴图 - LearnOpenGL CN 1,Learn about Parallax(视差贴图) - 知乎 “视差贴图”的工作流程及原理(OpenGL) - 哔哩哔哩 法线与置换贴图原理讲解以及烘焙制作&#xff01; - 知乎 1. Bump Mapping 凹凸贴图 BumpMap…

Vant组件

结合项目学习下Vant组件。 Vue2&#xff1a;Vant 2 - Mobile UI Components built on Vue Vue3&#xff1a;Vant 4 - A lightweight, customizable Vue UI library for mobile web apps. 课程地址&#xff1a;【vue-vant组件库】 https://www.bilibili.com/video/BV1q5411E7…

【DEKF算法】DEKF(双扩展卡尔曼滤波算法)估计锂电池荷电状态,SOC与SOH联合仿真

摘要 本文研究了基于双扩展卡尔曼滤波&#xff08;DEKF&#xff09;算法对锂电池荷电状态&#xff08;SOC&#xff09;和健康状态&#xff08;SOH&#xff09;的估计问题。通过构建锂电池的等效电路模型&#xff08;ECM&#xff09;&#xff0c;将SOC与SOH联合估计&#xff0c…

4-3 AUTOSAR BSW IO抽象

返回总目录->返回总目录<- 目录 一、概述 二、示例接口 一、概述 在AUTOSAR中,IO抽象模块的主要作用是提供对硬件设备的控制和访问。它包括了以下几个主要模块: DIO(Digital Input/Output):用于控制数字输入和输出信号,例如控制LED灯的开关或读取按键状态…

【动手学深度学习Pytorch】1. 线性回归代码

零实现 导入所需要的包&#xff1a; # %matplotlib inline import random import torch from d2l import torch as d2l import matplotlib.pyplot as plt import matplotlib import os构造人造数据集&#xff1a;假设w[2, -3.4]&#xff0c;b4.2&#xff0c;存在随机噪音&…

【数据结构】树——顺序存储二叉树

写在前面 在学习数据结构前&#xff0c;我们早就听说大名鼎鼎的树&#xff0c;例如什么什么手撕红黑树大佬呀&#xff0c;那这篇笔记不才就深入浅出的介绍二叉树。 文章目录 写在前面一、树的概念及结构1.1、数的相关概念1.2、数的表示1.3 树在实际中的运用&#xff08;表示文…