SparkSQL案例
**案例描述:**某系统存储有用户的基本信息,包括用户的姓名、身份证号、手机号码。
数据集: 有用得着的评论或私信即可
需求分析:
- 将表中的数据进行过滤,只保留 80 后、90 后、00 后的用户信息,并存入新的 Hive 分区表中,以年代为分区字段。
- 查询过滤后的表中,90 后的占比。
- 查询过滤后的表中,各个省份的人数及占比。
- 查询 00 后中性别的占比。
Hive建表语句
# 用户基础信息表 create table if not exists users( username string, idcard string, phone string ) row format delimited fields terminated by ',' lines terminated by '\n'; # 身份证地址信息对照表 create table if not exists idcard_info( idcard string, province string, city string, country string ) row format delimited fields terminated by ',' lines terminated by '\n'; # 新的分区表准备 create table if not exists filtered_users( username string, idcard string, phone string, birthday string, age int, gender string, province string, city string, country string ) partitioned by (era string) row format delimited fields terminated by ',' lines terminated by '\n';
"""
案例描述: 在某项目中有用户信息表、身份证地址对照表
用户信息表 users:
- 用户名 username
- 身份证号 idcard
- 手机号 phone
身份证地址对照表 idcard_info:
- 身份证号 idcard
- 省份 province
- 市 city
- 区县 country
- 将表中的数据进行过滤,只保留 80 后、90 后、00 后的用户信息,并存入新的 Hive 分区表中,以年代为分区字段。
- 查询过滤后的表中,每个年龄段的占比。
- 查询过滤后的表中,各个省份的人数及占比。
- 查询 00 后中女性的占比。
"""
import os
import re
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, BooleanType
os.environ.setdefault("HADOOP_USER_NAME", "root")
def idcard_checker(idcard: str) -> bool:
"""
检查一个身份证号是否合法
:param idcard: 身份证号
:return: 检验结果
"""
check_res = re.fullmatch(r'(\d{6})'
r'(?P<year>(19|20)\d{2})(?P<month>0[1-9]|1[0-2])(?P<day>[012][0-9]|10|20|30|31)\d{2}'
r'(?P<gender>\d)[0-9xX]',
idcard)
return check_res is not None
def get_year(idcard: str) -> int:
"""
从一个身份证中查询年
:param idcard: 身份证号
:return: 年
"""
return int(idcard[6:10])
def get_month(idcard: str) -> int:
"""
从一个身份证中查询月
:param idcard: 身份证号
:return: 月
"""
return int(idcard[10:12])
def get_day(idcard: str) -> int:
"""
从一个身份证中查询日
:param idcard: 身份证号
:return: 日
"""
return int(idcard[12:14])
def get_birthday(idcard: str) -> str:
"""
从一个身份证中查询生日
:param idcard: 身份证号
:return: 生日
"""
return "-".join([idcard[6:10], idcard[10:12], idcard[12:14]])
def get_gender(idcard: str) -> str:
"""
从一个身份证中查询性别
:param idcard: 身份证号
:return: 性别
"""
return '男' if int(idcard[-2]) % 2 != 0 else '女'
def get_era(idcard: str) -> str:
"""
从一个身份证中查询年代
:param idcard: 身份证号
:return: 年代
"""
return f"{idcard[8]}0"
def get_age(idcard: str) -> int:
"""
从一个身份证中查询年龄
:param idcard: 身份证号
:return: 年龄
"""
year = get_year(idcard)
month = get_month(idcard)
day = get_month(idcard)
now = datetime.datetime.now()
age = now.year - year
if now.month < month:
age -= 1
elif now.month == month and now.day < day:
age -= 1
return age
def get_addr(idcard: str) -> str:
"""
从一个身份证中查询地址信息
:param idcard: 身份证号
:return: 地址信息
"""
return idcard[0:6]
with SparkSession\
.builder.master("local[*]")\
.appName("exercise")\
.enableHiveSupport()\
.config("hive.exec.dynamic.partition.mode", "nonstrict")\
.getOrCreate() as spark:
# 注册 UDF 函数
spark.udf.register("get_year", get_year, IntegerType())
spark.udf.register("get_month", get_month, IntegerType())
spark.udf.register("get_day", get_day, IntegerType())
spark.udf.register("get_gender", get_gender, StringType())
spark.udf.register("get_age", get_age, IntegerType())
spark.udf.register("get_era", get_era, StringType())
spark.udf.register("get_birthday", get_birthday, StringType())
spark.udf.register("idcard_checker", idcard_checker, BooleanType())
spark.udf.register("get_addr", get_addr, StringType())
# 将身份证中的信息都提取出来
spark.sql("""
select
username,
idcard,
phone,
get_birthday(idcard) birthday,
get_age(idcard) age,
get_gender(idcard) gender,
get_era(idcard) era
from
mydb.users
where
idcard_checker(idcard) == true
""").createTempView("tmp_user")
# 连接上地址信息进行查询,并将结果写出到表中
# spark.sql("""
# insert into mydb.filtered_users partition(era)
# select
# username,
# tmp_user.idcard,
# phone,
# birthday,
# age,
# gender,
# province,
# city,
# country,
# era
# from
# tmp_user
# join
# mydb.idcard_info
# on
# mydb.idcard_info.idcard == get_addr(tmp_user.idcard)
# where
# tmp_user.era == 10 or tmp_user.era == 80 or tmp_user.era == 90 or tmp_user.era == 00
# """)
# - 查询过滤后的表中,每个年龄段的占比。
# spark.sql("""
# select distinct
# era,
# count(*) over(partition by era) / count(*) over() rate
# from
# mydb.filtered_users
# """).show()
# - 查询过滤后的表中,各个省份的人数及占比。
# spark.sql("""
# select distinct
# province,
# count(*) over(partition by province) / count(*) over() rate
# from
# mydb.filtered_users
# order by
# rate desc
# """).show(50)
# - 查询00后中男女的占比。
spark.sql("""
select distinct
gender,
count(*) over(partition by gender) / count(*) over() rate
from
mydb.filtered_users
where era = '00';
""").show()