学习大数据DAY61 宽表加工

目录

模型设计

加工宽表

任务调度:


大表 - 把很多数据整合起来
方便后续的明细查询和指标计算

模型设计

设计 建模
设计: excel 文档去编写
建模: 使用建模工具 PowerDesigner Navicat 在线画图工具... 把表结构给绘
制出来
共享\项目课工具\pd

加工宽表

数据层 DWS 层
dws_lijinquan.dws_xbd_mxm_memberinfo_dim_t
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
import os
import re
# 宽表加工
# pyspark + spark sql
# 宽表加工
# pyspark + spark sql
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 会员检测临时表
def do_member_tmp_check():
sql = '''
SELECT
member_id AS member, -- 会员卡号
MIN(CASE WHEN detect_time = max_detect_time THEN erp_code END)
AS rec_detect_store, -- 最近检测门店
max(detect_time) AS rec_detect_date, -- 最近检测时间
count(1) AS check_count, -- 累计检测次数
min(substr(detect_time,1,10)) AS filing_date, -- 建档时间
min(CASE WHEN detect_time = min_detect_time THEN erp_code END)
AS store_name, -- 建档门店名称
max(extend) AS is_anamnesis, -- 有无既往病史
CASE WHEN COUNT(bec_chr_mbr_date) > 0 THEN 1 ELSE 0 END AS
is_chr_mbr -- 是否特慢病会员
FROM (
SELECT
*,
MIN(detect_time) OVER (PARTITION BY member_id) AS
min_detect_time,
MAX(detect_time) OVER (PARTITION BY member_id) AS
max_detect_time
FROMchange_shihaihong.his_chronic_patient_info_new
)
GROUP BY member_id
'''
df = spark.sql(sql)
df.show()
# 保存到 hive: change_shihaihong.member_check
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/member_check")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/member_check")
\
.saveAsTable("change_shihaihong.member_check")
print("写入 hive 表成功")
# 会员订单情况临时表
def do_member_tmp_sale():
@F.udf()
def handle_pay_fav_type(val1,val2,val3):
payments = {
"银行卡": val1,
"手机支付": val2,
"现金": val3
}
payment_tuples = list(payments.items())
payment_tuples.sort(key=lambda x: (-x[1], x[0])) #使用
负值来确保从大到小排序
result_strings = [method for method, _ in payment_tuples]
# 使用>符号连接字符串result = '>'.join(result_strings)
return result
sql='''
select
m.member, -- 会员卡号
count(1) as order_total, -- 总订单数
round(sum(m.precash),2) as order_amount, -- 消费总额
max(m.starttime) as last_order_date, -- 最后一单日期
min(m.starttime) as first_order_date, -- 首单日期
count(case when m.starttime >= date_sub('2018-01-01',
30) then 1 end) as order_30, -- 30 天订单量
count(case when m.starttime >= date_sub('2018-01-01',
90) then 1 end) as order_90, -- 90 天订单量
sum(case when m.starttime >= date_sub('2018-01-01', 30)
then round(m.precash,2) else 0 end) as amount_30, -- 30 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01', 90)
then round(m.precash,2) else 0 end) as amount_90, -- 90 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01',
180) then round(m.precash,2) else 0 end) as amount_180, -- 180
天消费金额
count(case when eusp.paytype = '银行卡 ' then 1 end) as
bank_count,
count(case when eusp.paytype = '手机支付' then 1 end)
as credit_card_count,
count(case when eusp.paytype = '现金 ' then 1 end) as
cash_count,
'' as pay_fav_type
from change_shihaihong.erp_u_sale_m_inc_new m
left join change_shihaihong.erp_u_sale_pay_inc_new eusp
on m.saleno = eusp.saleno -- 确保连接条件是正确的
group by m.member
'''
df = spark.sql(sql)
df = df.withColumn("pay_fav_type",
handle_pay_fav_type("bank_count","credit_card_count","cash_cou
nt"))
df.drop("bank_count")
df.drop("credit_card_count")
df.drop("cash_count")
df.show()# 保存到 Hive: change_shihaihong.member_sale
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat", "parquet") \
.option("location",
"/zhiyun/shihaihong/change/member_sale") \
.saveAsTable("change_shihaihong.member_sale")
print("临时表保存成功")
print("写入 hive 表成功")
# 宽表加工
def do_member_table():
# 主表可以列最多的那个表
# 会员的订单情况可以用子查询统计出来, 也可以使用临时表
sql = '''
with t as (select * from
change_shihaihong.crm_user_base_info_his_new )
select
t.user_id as mbr_code, -- 会员编码
t.user_type as mbr_type, -- 会员类型
t.source as mbr_resource, -- 会员来源
m.memcardno as mbr_cardno, -- 会员卡号
t.erp_code as store_code, -- 注册门店编码
t.active_time as sto_reg_date, -- 门店注册日期
"" as reg_platform, -- 注册外部平台
"" as platform_reg_date, -- 外部平台注册时间
t.name as name, -- 姓名
t.sex as gender, -- 性别
t.birthday as birthdate, -- 出生日期
t.age as age, -- 年龄
t.id_card_no as mbr_id_card, -- 身份证号
t.social_insurance_no as social_security_no, -- 社保卡号
t.education as edu_background, -- 教育背景
t.job as profession, -- 职业
"未知" as is_marriage, -- 婚姻状况
"无" as have_children, -- 是否有孩
t.address as address, -- 通信地址
"" as region, -- 区域
m.province as province, -- 省
m.city as city, -- 城市t.last_subscribe_time as cancel_date, -- 注销时间
m.tel as phone, -- 联系电话
m.handset as cell_phone, -- 手机号
t.email as email, -- 邮箱
t.wechat as wechat, -- 微信账号
t.webo as weibo, -- 微博账号
"" as alipay, -- 支付宝账号
"" as app, -- APP 账号
sale.order_total as order_total, -- 总订单数
sale.order_amount as order_amount, -- 消费总额
sale.last_order_date as last_order_date, -- 最后一单日期
sale.first_order_date as first_order_date, -- 首单日期
sale.order_30 as order_30, -- 30 天订单量
sale.order_90 as order_90, -- 90 天订单量
sale.amount_30 as amount_30, -- 30 天消费金额
sale.amount_90 as amount_90, -- 90 天消费金额
sale.amount_180 as amount_180, -- 180 天消费金额
sale.pay_fav_type as pay_fav_type, -- 付款方式偏爱排行
g.groupname as group_name, -- 会员分组
"" as ware_buy_sort, -- 药品购买排行
m.ness as sickness_motion, -- 疾病关注
check.rec_detect_store as rec_detect_store, -- 最近检测门
店
check.rec_detect_date as rec_detect_date, -- 最近检测时间
check.check_count as check_count, -- 累计检测次数
check.filing_date as filing_date, -- 建档时间
check.store_name as store_name, -- 建档门店名称
check.is_anamnesis as is_anamnesis, -- 有无既往病史
check.is_chr_mbr as is_chr_mbr, -- 是否特慢病会员
current_timestamp as etl_time, -- ETL 时间
"ETL by qinyuxiao" as comments -- 备注信息
from t
left join change_shihaihong.erp_u_memcard_reg_full_new m on
m.scrm_userid = t.user_id
left join change_shihaihong.member_sale sale on sale.member =
m.memcardno
left join change_shihaihong.member_check check on
check.member = m.memcardno
left join dwd_qinyuxiao.erp_c_memcard_class_group g on
sale.order_amount >=g.lg and sale.order_amount < g.gt
'''
df = spark.sql(sql)df.show()
# 保存
# 保存到 hive: dws_xbd_mxm_memberinfo_dim_t
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/dws/dws_xbd_mxm_memberinfo_dim_t")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists dws_shihaihong location
"/zhiyun/shihaihong/dws";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/dws/dws_xbd_mxm_memb
erinfo_dim_t"). \
saveAsTable("dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t")
print("写入 hive 表成功")
# 验证数据
# 注意总数据量应该跟 CRM 表一致 168W 整 多一条都不行
# 计算原表的总记录数
original_count_sql = "select count(1) from
change_shihaihong.crm_user_base_info_his_new"
original_count =
spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}")
do_member_tmp_check()
do_member_tmp_sale()
do_member_table()
print("宽表加工完成")# 部署

任务调度:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/919000.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ChromeDriver驱动下载地址更新(保持最新最全)

说明&#xff1a; ChromeDriver 是 Selenium WebDriver 用于控制 Chrome 的独立可执行文件。 为了方便下载使用&#xff0c;本文保持ChromeDriver的最新版本更新&#xff0c;并提供115.0.5763.0-133.0.6841.0版本的下载地址&#xff1a; 所有版本和下载地址&#xff1a; &am…

QT基本绘图

QT绘图 1.概述 这篇文章介绍如何绘图 2.绘图基本操作 创建一个普通的widget类型的项目 在widget.h 文件中重写绘图事件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : p…

[ACTF2020]Upload 1--详细解析

信息收集 题目告诉我们是一道upload&#xff0c;也就是文件上传漏洞题目。 进入界面&#xff0c;是一个灯泡&#xff0c;将鼠标放在图标上就会出现文件上传的相应位置&#xff1a; 思路 文件上传漏洞&#xff0c;先看看有没有前端校验。 在js源码中找到了前端校验&#xff…

Android Studio开发学习(五)———LinearLayout(线性布局)

一、布局 认识了解一下Android中的布局&#xff0c;分别是: LinearLayout(线性布局)&#xff0c;RelativeLayout(相对布局)&#xff0c;TableLayout(表格布局)&#xff0c; FrameLayout(帧布局)&#xff0c;AbsoluteLayout(绝对布局)&#xff0c;GridLayout(网格布局) 等。 二、…

计算机视觉在自动驾驶汽车中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 计算机视觉在自动驾驶汽车中的应用 计算机视觉在自动驾驶汽车中的应用 计算机视觉在自动驾驶汽车中的应用 引言 计算机视觉在自动…

表格的选择弹窗,选中后返显到表格中

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 表格的下拉框可以直接显示选项&#xff0c;那如果选择框不是下拉的&#xff0c;而是弹窗&#xff0c;那么在表格中如何返显呢&#xff1f; 问题描述 如上图所示&#xff0c;点击表格中的选择&#xf…

金融领域先锋!海云安成功入选2024年人工智能先锋案例集

近日&#xff0c;中国人工智能产业发展联盟《2024年人工智能先锋案例集》&#xff08;以下简称“AIIA先锋案例集”&#xff09;在中国人工智能产业发展联盟第十三次全体会议上正式发布。该案例集由人工智能产业发展联盟&#xff08;AIIA&#xff09;、工业和信息化部新闻宣传中…

HarmonyOs鸿蒙开发实战(16)=>沉浸式效果第一种方案一窗口全屏布局方案

1.沉浸式效果的目的 开发应用沉浸式效果主要指通过调整状态栏、应用界面和导航条的显示效果来减少状态栏导航条等系统界面的突兀感&#xff0c;从而使用户获得最佳的UI体验。 2.窗口全屏布局方案介绍 调整布局系统为全屏布局&#xff0c;界面元素延伸到状态栏和导航条区域实现沉…

OpenAI震撼发布:桌面版ChatGPT,Windows macOS双平台AI编程体验!

【雪球导读】 「OpenAI推出ChatGPT桌面端」 OpenAI重磅推出ChatGPT桌面端&#xff0c;全面支持Windows和macOS系统&#xff01;这款新工具为用户在日常生活和工作中提供了前所未有的无缝交互体验。对于那些依赖桌面端进行开发工作的专业人士来说&#xff0c;这一更新带来了令人…

Android OpenGLES2.0开发(八):Camera预览

严以律己&#xff0c;宽以待人 引言 终于到该章节了&#xff0c;还记得Android OpenGLES2.0开发&#xff08;一&#xff09;&#xff1a;艰难的开始章节说的吗&#xff1f;写这个系列的初衷就是因为每次用到GLSurfaceViewCamera预览时&#xff0c;总是CtrlC、CtrlV从来没有研究…

独立站干货:WordPress主机推荐

WordPress作为全球最受欢迎的独立站建设平台&#xff0c;提供了灵活性和强大的功能&#xff0c;使得建站变得简单而高效。本文将为您详细介绍WordPress建站的流程&#xff0c;并推荐几款实测后觉得好用的主机商。 WordPress建站流程 域名注册 首先需要注册一个域名&#xff0c…

细说STM32单片机DMA中断收发RTC实时时间并改善其鲁棒性的方法

目录 一、DMA基础知识 1、DMA简介 (1)DMA控制器 (2)DMA流 (3)DMA请求 (4)仲裁器 (5)DMA传输属性 2、源地址和目标地址 3、DMA传输模式 4、传输数据量的大小 5、数据宽度 6、地址指针递增 7、DMA工作模式 8、DMA流的优先级别 9、FIFO或直接模式 10、单次传输或突…

基于Spring Boot+Vue的多媒体素材管理系统的设计与实现

一.系统开发工具与环境搭建 1.系统设计开发工具 后端使用Java编程语言的Spring boot框架 项目架构&#xff1a;B/S架构 运行环境&#xff1a;win10/win11、jdk17 前端&#xff1a; 技术&#xff1a;框架Vue.js&#xff1b;UI库&#xff1a;ElementUI&#xff1b; 开发工具&…

如何禁用关闭奇安信天擎开机自启动教程

前言 公司要求我们员工每个电脑上都要安装奇安信防护软件&#xff0c;但是身为开发&#xff0c;这个软件占内存不说&#xff0c;还禁用我们电脑上todesk等远程软件&#xff0c;因为我们给客户部署的项目&#xff0c;部署的有软件服务&#xff0c;经常需要用到todesk等远程软件…

[Docker#8] 容器配置 | Mysql | Redis | C++ | 资源控制 | 命令对比

目录 一&#xff1a;Mysql 容器化安装 二&#xff1a;Redis 容器化安装 Redis 简介 Redis 容器创建 三&#xff1a;C容器制作 四&#xff1a;容器资源更新 常见问题 一&#xff1a;Mysql 容器化安装 进入 mysql 的镜像网站&#xff0c;查找 mysql 的镜像 mysql docker…

CentOS 修改服务器登录密码的完整指南

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务&#xff09; &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1…

深入理解Redis(七)----Redis实现分布式锁

基于Redis的实现方式 1、选用Redis实现分布式锁原因&#xff1a; &#xff08;1&#xff09;Redis有很高的性能&#xff1b; &#xff08;2&#xff09;Redis命令对此支持较好&#xff0c;实现起来比较方便 2、使用命令介绍&#xff1a; &#xff08;1&#xff09;SETNX SETNX …

Uniapp运行环境判断和解决跨端兼容性详解

Uniapp运行环境判断和解决跨端兼容性 开发环境和生产环境 uniapp可通过process.env.NODE_ENV判断当前环境是开发环境还是生产环境&#xff0c;一般用于链接测试服务器或者生产服务器的动态切换。在HX中&#xff0c;点击运行编译出来的代码是开发环境&#xff0c;点击发行编译…

WPF MVVM框架

一、MVVM简介 MVC Model View Control MVP MVVM即Model-View-ViewModel&#xff0c;MVVM模式与MVP&#xff08;Model-View-Presenter&#xff09;模式相似&#xff0c;主要目的是分离视图&#xff08;View&#xff09;和模型&#xff08;Model&#xff09;&#xff0c;具有低…

【nginx】client timed out和send_timeout的大小设置

websocket连接会断开&#xff0c;抓包检查后发现是中间的代理服务器nginx断开的&#xff0c;同时将后端和浏览器都断开了。将nginx日志调到debug级别后&#xff0c;有下面的断开信息。 [info] 125923#125923: *34 client timed out (110: Connection timed out) while proxyin…