pyspark分布式部署随机森林算法

前言

分布式算法的文章我早就想写了,但是一直比较忙,没有写,最近一个项目又用到了,就记录一下运用Spark部署机器学习分类算法-随机森林的记录过程,写了一个demo。

基于pyspark的随机森林算法预测客户

本次实验采用的数据集链接:https://pan.baidu.com/s/13blFf0VC3VcqRTMkniIPTA
提取码:DJNB

数据集说明
某运营商提供了不同用户3个月的使用信息,共34个特征,1个标签列,其中存在一定的重复值、缺失值与异常值。各个特征的说明如下:
MONTH_ID 月份
USER_ID 用户id
INNET_MONT 在网时长
IS_AGREE 是否合约有效客户
AGREE_EXP_DATE 合约计划到期时间
CREDIT_LEVEL 信用等级
VIP_LVL vip等级
ACCT_FEE 本月费用(元)
CALL_DURA 通话时长(秒)
NO_ROAM_LOCAL_CALL_DURA 本地通话时长(秒)
NO_ROAM_GN_LONG_CALL_DURA 国内长途通话时长(秒)
GN_ROAM_CALL_DURA 国内漫游通话时长(秒)
CDR_NUM 通话次数(次)
NO_ROAM_CDR_NUM 非漫游通话次数(次)
NO_ROAM_LOCAL_CDR_NUM 本地通话次数(次)
NO_ROAM_GN_LONG_CDR_NUM 国内长途通话次数(次)
GN_ROAM_CDR_NUM 国内漫游通话次数(次)
P2P_SMS_CNT_UP 短信发送数(条)
TOTAL_FLUX 上网流量(MB)
LOCAL_FLUX 本地非漫游上网流量(MB)
GN_ROAM_FLUX 国内漫游上网流量(MB)
CALL_DAYS 有通话天数
CALLING_DAYS 有主叫天数
CALLED_DAYS 有被叫天数
CALL_RING 语音呼叫圈
CALLING_RING 主叫呼叫圈
CALLED_RING 被叫呼叫圈
CUST_SEX 性别
CERT_AGE 年龄
CONSTELLATION_DESC 星座
MANU_NAME 手机品牌名称
MODEL_NAME 手机型号名称
OS_DESC 操作系统描述
TERM_TYPE 硬件系统类型(0=无法区分,4=4g,3=dg,2=2g)
IS_LOST 用户在3月中是否流失标记(1=是,0=否),1月和2月值为空(标签)

数据字段打印
在这里插入图片描述
将数据集放到hadoop的HDFS中,通过Saprk读取HDFS文件里面的CSV格式的数据集,通过hadoop命令上传本地数据集到HDFS:

hadoop fs -put ./USER_INFO_M.csv /data/test/USER_INFO_M.csv

查看HDFS中的数据集CSV文件:

hadoop fs -ls /data/test

在这里插入图片描述

Spark中搭建分布式随机森林模型

从上面的数据集可以看到,数据是一个二分类数据,IS_LOST就是需要预测的标签,所以只需要构建一个随机森林二分类模型就可以了。Spark中提供了用于机器学习算法的库MLlib,这个库里面包含了许多机器学习算法,监督学习和无监督学习算法都有,例如线性回归、随机森林、GBDT、K-means等等(没有sklearn中提供的算法多),但是和sklearn中的随机森林模型构建有区别的是spark中程序底层是基于RDD弹性分布式计算单元,所以基于RDD的DataFrame数据结构和python中的DataFrame结构不一样,写法就不一样,python程序写的随机森林算法是不能直接在Spark中运行的,我们需要按照Spark中的写法来实现随机森林模型的构建,直接看代码:

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
import time

start_time = time.time()
# 创建SparkSession
spark = SparkSession.builder.appName("RandomForestExample").getOrCreate()

# 读取数据集,数据集放在HDFS上
data = spark.read.csv("/data/test/USER_INFO_M.csv", header=True, inferSchema=True, encoding='gbk')
print('=====================================================')
data.show()
# 去除包含缺失值的行
data = data.na.drop(subset=["IS_LOST"])
# 选择特征列和标签列
data = data.select([col for col in data.columns if col not in ['MONTH_ID', 'USER_ID','CONSTELLATION_DESC','MANU_NAME','MODEL_NAME','OS_DESC']])
label_col = "IS_LOST"
feature_cols=['CONSTELLATION_DESC','MANU_NAME','MODEL_NAME','OS_DESC']

data = data.fillna(-1)

# 创建特征向量列
assembler = VectorAssembler(inputCols=[col for col in data.columns if col not in ["IS_LOST"]], outputCol="features")
data = assembler.transform(data)

# 选择特征向量列和标签列
data = data.select("features", label_col)

# 将数据集分为训练集和测试集
(trainingData, testData) = data.randomSplit([0.8, 0.2])

# 创建随机森林分类器
rf = RandomForestClassifier(labelCol=label_col, featuresCol="features")

# 训练模型
model = rf.fit(trainingData)

# 在测试集上进行预测
predictions = model.transform(testData)

# 评估模型
evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# 打印准确率
print("测试集准确率为: {:.2f}".format(accuracy))
end_time = time.time()
print("代码运行时间: {:.2f}".format(end_time - start_time))
# 关闭SparkSession
spark.stop()

上面是通过python代码构建的Spark中的随机森林模型,Spark支持scala、java、R和python语言,python最简洁,所以直接用pyspark进行程序实现。将上面的代码放到自己的路径下,然后通过spark-submit命令提交.py文件运行即可:

./spark-submit     
	--master yarn     
	--deploy-mode client    
	--num-executors 4    
	/data/rf/spark_m.py

提交:
在这里插入图片描述

拓展:Spark中还支持提交Python环境,而不需要每个spark分布式集群节点都安装适配的python环境,spark-submit命令可以支持将python解释器连同整个配置好了的环境都提交到集群上面然后下发给其他节点,命令如下:

./spark-submit \
    --master yarn \
    --deploy-mode client\
    --num-executors 4\
    --queue default \
    --verbose \
    --conf spark.pyspark.driver.python=/anaconda/bin/python \
    --conf spark.pyspark.python=/anaconda/bin/python \
    /test.py

其中spark.pyspark.python和spark.pyspark.driver.python两个参数就是配置提交机器的python环境的路径,还可以通过将python环境打包放到HDFS路径下,Spark直接读取HDFS中的python环境包。

模型运行结果

将数据集按照2-8分为测试集和训练集,在测试集上的预测准确率为97%,运行时间80s。
在这里插入图片描述
同时登录集群查看提交的Spark任务运行情况,访问http://localhost:8088/cluster查看如下:
在这里插入图片描述
可以看到,RandomForestExample任务就是我们创建的任务,运行完成了,成功!

写在最后

在大规模数据的情况下如果需要用机器学习算法,Spark是一个很好的选择,可以大大提升任务的运行速度,工业环境中效率往往是最需要的,Spark可以解决我们的分布式算法部署需求。

本人才疏学浅,如果有不对的地方请指证!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/412438.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

目标检测-Transformer-ViT和DETR

文章目录 前言一、ViT应用和结论结构及创新点 二、DETR应用和结论结构及创新点 总结 前言 随着Transformer爆火以来,NLP领域迎来了大模型时代,成为AI目前最先进和火爆的领域,介于Transformer的先进性,基于Transformer架构的CV模型…

240Hz高刷电竞显示器 - HKC VG253KM

🎉🎉🎉 各位电竞爱好者们,今天给大家带来一款神秘武器,一款能够让你在游戏中大展拳脚的高刷电竞显示器 - HKC VG253KM!🔥🔥🔥 这款显示器,哎呀,真…

【Vue3】插槽使用和animate使用

插槽使用 插槽slot匿名插槽具名插槽插槽作用域简写 动态插槽transition动画组件自定义过渡class类名如何使用animate动画库组件动画生命周期appear transition- group过渡列表 插槽slot 插槽就是子组件中提供给父组件使用的一个占位符父组件可以在这个占位符智能填充任何模板代…

深度学习 精选笔记(3)线性神经网络-线性回归

学习参考: 动手学深度学习2.0Deep-Learning-with-TensorFlow-bookpytorchlightning ①如有冒犯、请联系侵删。 ②已写完的笔记文章会不定时一直修订修改(删、改、增),以达到集多方教程的精华于一文的目的。 ③非常推荐上面(学习参考&#x…

spring boot 集成科大讯飞星火认知大模型

首先到官网https://console.xfyun.cn/services/aidoc申请key 一、安装依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance&…

记录 | docker权限原因导致service ssh start失败

【报错】 容器内启 ssh server 报错 有两个错&#xff1a; &#xff08;1&#xff09;/etc/ssh/sshd_host_rsa_key 权限太高&#xff1b; &#xff08;2&#xff09;/run/sshd用户组不为 root 解决方法&#xff1a; 方法一&#xff1a; 各自容器内对/etc/ssh/sshd_host_r…

41.仿简道云公式函数实战-数学函数-SUMIF

1. SUMIF函数 SUMIF 函数可用于计算子表单中满足某一条件的数字相加并返回和。 2. 函数用法 SUMIF(range, criteria, [sum_range]) 其中各参数的含义及使用方法如下&#xff1a; range&#xff1a;必需&#xff1b;根据 criteria 的条件规则进行检测的判断字段。支持的字段…

Spring篇----第九篇

系列文章目录 文章目录 系列文章目录前言一、@Qualifier 注解有什么用?二、@RequestMapping 注解有什么用?三、spring DAO 有什么用?四、列举 Spring DAO 抛出的异常。前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到…

精益思维引领AI创新浪潮:从理念到实践的蜕变!

在人工智能&#xff08;AI&#xff09;飞速发展的今天&#xff0c;精益思维作为一种追求卓越、持续改进的管理哲学&#xff0c;正逐渐成为推动AI创新的重要动力。本文&#xff0c;天行健咨询将探讨精益思维如何与AI创新相结合&#xff0c;以及这种结合如何推动科技进步和社会发…

JetBrains系列工具,配置PlantUML绘图

PlantUML是一个很强大的绘图工具&#xff0c;各种图都可以绘制&#xff0c;具体的可以去官网看看&#xff0c;或者百度。 PlantUML简述 https://plantuml.com/zh/ PlantUML语言参考指引 https://plantuml.com/zh/guide PlantUML语言是依赖Graphviz进行解析的。Graphviz是开源…

每日一题 2867统计树中的合法路径

2867. 统计树中的合法路径数目 题目描述&#xff1a; 给你一棵 n 个节点的无向树&#xff0c;节点编号为 1 到 n 。给你一个整数 n 和一个长度为 n - 1 的二维整数数组 edges &#xff0c;其中 edges[i] [ui, vi] 表示节点 ui 和 vi 在树中有一条边。 请你返回树中的 合法路…

【Linux深入剖析】进程优先级 | 命令行参数 | 环境变量

&#x1f4d9; 作者简介 &#xff1a;RO-BERRY &#x1f4d7; 学习方向&#xff1a;致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f4d2; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;欢迎各位关注&#xff0c;谢谢各位的支持 目录 1.进程优先级2.Linux…

hot100刷题记录-哈希

一、两数之和 题目&#xff1a;https://leetcode.cn/problems/two-sum/description/?envTypestudy-plan-v2&envIdtop-100-liked 方法1&#xff1a;枚举 class Solution:def twoSum(self, nums: List[int], target: int) -> List[int]:for id, num in enumerate(nums)…

jmeter 按线程数阶梯式压测数据库

当前版本&#xff1a; jmeter 5.6.3mysql 5.7.39 简介 JMeter 通过 bzm - Concurrency Thread Group 来实现阶梯式压测&#xff0c;它并不是JMeter的官方插件&#xff0c;而是一种由Blazemeter提供的高级线程组插件。可以在不同的时间内并发执行不同数量的线程&#xff0c;模拟…

相册图片怎么压缩?3种方法教你压缩图片

相册图片怎么压缩&#xff1f;相册图片压缩在日常生活中扮演着至关重要的角色。它不仅能够帮助我们节省手机或电脑的存储空间&#xff0c;避免设备因存储空间不足而运行缓慢&#xff0c;还能显著减少图片在上传、下载或分享时的时间。此外&#xff0c;压缩图片还能在一定程度上…

[算法沉淀记录] 排序算法 —— 选择排序

排序算法 —— 选择排序 基本概念 选择排序是一种简单的排序算法&#xff0c;它的工作原理是每次从待排序的列表中选择最小&#xff08;或最大&#xff09;的元素&#xff0c;将其与列表中的第一个位置交换&#xff0c;然后继续对剩余的元素进行排序&#xff0c;直到整个列表…

【Java程序员面试专栏 算法思维】四 高频面试算法题:回溯算法

一轮的算法训练完成后,对相关的题目有了一个初步理解了,接下来进行专题训练,以下这些题目就是汇总的高频题目,本篇主要聊聊回溯算法,主要就是排列组合问题,所以放到一篇Blog中集中练习 题目关键字解题思路时间空间岛屿数量网格搜索分别向上下左右四个方向探索,遇到海洋…

R绘图 | 单列数据的分布图,对A变量分bin求B变量的平均值

问题1&#xff1a;单个向量的 density 分布图&#xff1f; (1) 模拟数据 set.seed(202402) datdiamonds[sample(nrow(diamonds), 1000),]> head(dat) # A tibble: 6 10carat cut color clarity depth table price x y z<dbl> <ord> &l…

数据可视化引领智慧仓储新时代

随着科技的飞速发展&#xff0c;数据可视化已然成为智慧仓储领域的璀璨明珠&#xff0c;其强大的功能和多面的作用让智慧仓储焕发出勃勃生机。让我们一同探索&#xff0c;数据可视化究竟在智慧仓储中起到了怎样的作用。下面我就以可视化从业者的角度来简单谈谈这个话题。 在这…