Spark MLlib

目录

一、Spark MLlib简介

(一)什么是机器学习

(二)基于大数据的机器学习

(三)Spark机器学习库MLlib

二、机器学习流水线

(一)机器学习流水线概念

(二)流水线工作过程

(三)构建一个机器学习流水线

三、特征提取和转换

(一)特征提取:TF-IDF

(二)特征转换:标签和索引的转化

四、分类与回归

(一)逻辑斯蒂回归分类器

(二)决策树分类器


一、Spark MLlib简介

(一)什么是机器学习

        机器学习可以看做是一门人工智能的科学,该领域的主要研究对象是人工智能。机器学习利用数据或以往的经验,以此优化计算机程序的性能标准。

        机器学习强调三个关键词:算法、经验、性能,其处理过程如上图所示。在数据的基础上,通过算法构建出模型并对模型进行评估。评估的性能如果达到要求,就用该模型来测试其他的数据;如果达不到要求,就要调整算法来重新建立模型,再次进行评估。如此循环往复,最终获得满意的经验来处理其他的数据。 

(二)基于大数据的机器学习

        传统的机器学习算法,由于技术和单机存储的限制,只能在少量数据上使用,因此,传统的统计、机器学习算法依赖于数据抽样。但是在实际应用中,往往很难做到样本随机,导致学习的模型不是很准确,测试数据的效果也不太好。随着HDFS等分布式文件系统的出现,我们可以对海量数据进行存储和管理,并利用MapReduce框架在全量数据上进行机器学习,这在一定程度上解决了统计随机性的问题,提高了机器学习的精度。但是,MapReduce自身存在缺陷,延迟高,磁盘开销大,无法高效支持迭代计算,这使MapReduce无法很好地实现分布式机器学习算法。这是因为在通常情况下,机器学习算法参数学习的过程都是迭代计算,本次计算的结果要作为下- 次迭代的输入。在这个过程中,MapReduce只能把中间结果存储到磁盘中,然后在下一次计算的时候重新从磁盘读取数据;对于迭代频发的算法,这是制约其性能的瓶颈。相比而言,Spark 立足于内存计算,天然地适用于迭代式计算,能很好地与机器学习算法相匹配。这也是近年来Spark平台流行的重要原因之一,业界的很多业务纷纷从Hadoop平台转向Spark平台。
        基于大数据的机器学习,需要处理全量数据并进行大量的迭代计算,这就要求机器学习平台具备强大的处理能力和分布式计算能力。然而,对于普通开发者来说,实现一个分布式机器学习算法,仍然是一件极具挑战性的工作。为此,Spark提供了一个基于海量数据的机器学习库,它提供了常用机器学习算法的分布式实现,对于开发者而言,只需要具有Spark编程基础,并且了解机器学习算法的基本原理和方法中相关参数的含义,就可以轻松地通过调用相应的API,来实现基于海量数据的机器学习过程。同时,spark-shell也提供即席(Ad Hoc)查询的功能,算法工程师可以边编写代码、边运行、边看结果。Spark提供的各种高效的工具,使机器学习过程更加直观和便捷,比如,可以通过sample函数非常方便地进行抽样。 Spark发展到今天,已经拥有了实时批计算、批处理、算法库、SQL流计算等模块,成了一个全平台系统,把机器学习作为关键模块加入Spark中也是大势所趋。

(三)Spark机器学习库MLlib

        需要注意的是,MLlib中只包含能够在集群上运行良好的并行算法,这一点很重要 有些经典的机器学习算法没有包含在其中,就是因为它们不能并行执行 相反地,一些较新的研究得出的算法因为适用于集群,也被包含在MLlib中,例如分布式随机森林算法、最小交替二乘算法。这样的选择使得MLlib中的每一个算法都适用于大规模数据集 如果是小规模数据集上训练各机器学习模型,最好还是在各个节点上使用单节点的机器学习算法库(比如Weka)

        MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作 MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的流水线(Pipeline)API,具体如下:

(1)算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;
(2)特征化工具:特征提取、转化、降维和选择工具;
(3)流水线(Pipeline):用于构建、评估和调整机器学习工作流的工具;
(4)持久性:保存和加载算法、模型和管道;
(5)实用工具:线性代数、统计、数据处理等工具。

        Spark在机器学习方面的发展非常快,已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的升源机器学习库,MLlib以计算效率高而著称。MLlib目前支持常见的机器学习算法,包括分类、回归、聚类和协同过滤等。如下表列出了目前MLlib支持的主要的机器学习算法。

        Spark 机器学习库从1.2 版本以后被分为两个包:

(1)spark.mllib 包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的RDD。
(2)spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。

        MLlib目前支持4种常见的机器学习问题:分类、回归、聚类和协同过滤。

        Spark MLlib架构由底层基础、算法库和应用程序三部分构成。基层基础包括Spark运行库、进行线性代数相关技术的矩阵库和向量库。算法库包括Spark Mllib实现的具体机器学习算法,以及为这些算法提供的各类评估方法。应用程序包括测试数据的生成以及外部数据的加载等。

二、机器学习流水线

(一)机器学习流水线概念

在介绍流水线之前,先来了解几个重要概念:

DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。较之RDD,DataFrame包含了schema 信息,更类似传统数据库中的二维表格。它被ML Pipeline用来存储源数据。例如,DataFrame中的列可以是存储的文本、特征向量、真实标签和预测的标签等。

Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个Transformer。它可以把一个不包含预测标签的测试数据集DataFrame打上标签,转化成另一个包含预测标签的DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。

Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作DataFrame数据并生成一个Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。比如,一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

Parameter:Parameter 被用来设置Transformer或者Estimator的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

PipeLine:翻译为流水线或者管道。流水线将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

(二)流水线工作过程

        要构建一个Pipeline流水线,首先需要定义Pipeline中的各个流水线阶段PipelineStage(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和评估器,就可以按照具体的处理逻辑有序地组织PipelineStages并创建一个Pipeline。

>>> pipeline = Pipeline(stages=[stage1,stage2,stage3])

        然后就可以把训练数据集作为输入参数,调用 Pipeline 实例的fit方法来开始以流的方式来处理源训练数据。这个调用会返回一个PipelineModel类实例,进而被用来预测测试数据的标签。

        流水线的各个阶段按顺序运行,输入的DataFrame在它通过每个阶段时被转换。

        值得注意的是,流水线本身也可以看做是一个估计器。在流水线的fit()方法运行之后,它产生一个PipelineModel,它是一个Transformer。 这个管道模型将在测试数据的时候使用。 下图说明了这种用法。

(三)构建一个机器学习流水线

        以逻辑斯蒂回归为例,构建一个典型的机器学习过程,来具体介绍一下流水线是如何应用的。

任务描述

        查找出所有包含"spark"的句子,即将包含"spark"的句子的标签设为1,没有"spark"的句子的标签设为0。

        需要使用SparkSession对象。Spark2.0以上版本的pyspark在启动时会自动创建一个名为spark的SparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示: 

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()

        pyspark.ml依赖numpy包,执行如下命令安装:

pip3 install numpy

(1)引入要包含的包并构建训练数据集。

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
 
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

(2)定义 Pipeline 中的各个流水线阶段PipelineStage,包括转换器和评估器,具体地,包含tokenizer, hashingTF和lr。

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

(3)按照具体的处理逻辑有序地组织PipelineStages,并创建一个Pipeline。

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

        现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer。

model = pipeline.fit(training)

        可以看到,model的类型是一个PipelineModel,这个流水线模型将在测试数据的时候使用。

(4)构建测试数据。

test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

(5)调用之前训练好的PipelineModel的transform()方法,让测试数据按顺序通过拟合的流水线,生成预测结果。

prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
 
(4, spark i j k) --> prob=[0.155543713844,0.844456286156], prediction=1.000000
(5, l m n) --> prob=[0.830707735211,0.169292264789], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.0696218406195,0.93037815938], prediction=1.000000
(7, apache hadoop) --> prob=[0.981518350351,0.018481649649], prediction=0.000000

三、特征提取和转换

(一)特征提取:TF-IDF

        “词频-逆向文件频率”(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,它可以体现一个文档中词语在语料库中的重要程度。

        词语由t表示,文档由d表示,语料库由D表示。词频TF(t,d)是词语t在文档d中出现的次数。文件频率DF(t,D)是包含词语的文档的个数。

        TF-IDF就是在数值化文档信息,衡量词语能提供多少信息以区分文档。其定义如下:

        IDF(t,D)=log_{2}\tfrac{|D|+1}{DF(t,D)+1}

        TF-IDF 度量值表示如下:

        TFIDF(t,d,D)=TF(t,d)\cdot IDF(t,D)

        在Spark ML库中,TF-IDF被分成两部分: TF (+hashing)和IDF

        TF: HashingTF 是一个Transformer,在文本处理中,接收词条的集合然后把这些集合转化成固定长度的特征向量。这个算法在哈希的同时会统计各个词条的词频。

        IDF: IDF是一个Estimator,在一个数据集上应用它的fit()方法,产生一个IDFModel。 该IDFModel 接收特征向量(由HashingTF产生),然后计算每一个词在文档中出现的频次。IDF会减少那些在语料库中出现频率较高的词的权重。

        过程描述: 在下面的代码段中,我们以一组句子开始。首先使用分解器Tokenizer把句子划分为单个词语。对每一个句子(词袋),使用HashingTF将句子转换为特征向量。最后使用IDF重新调整特征向量(这种转换通常可以提高使用文本特征的性能)。

(1)导入TF-IDF所需要的包

>>> from pyspark.ml.feature import HashingTF,IDF,Tokenizer

(2)创建一个简单的DataFrame,每一个句子代表一个文档

>>> sentenceData = spark.createDataFrame([(0, "I heard about Spark and I love Spark"),(0, "I wish Java could use case classes"),(1, "Logistic regression models are neat")]).toDF("label", "sentence")

(3)得到文档集合后,即可用tokenizer对句子进行分词

>>> tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
>>> wordsData = tokenizer.transform(sentenceData)
>>> wordsData.show()
+-----+--------------------+--------------------+                               
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|I heard about Spa...|[i, heard, about,...|
|    0|I wish Java could...|[i, wish, java, c...|
|    1|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+

(4)得到分词后的文档序列后,即可使用HashingTF的transform()方法把句子哈希成特征向量,这里设置哈希表的桶数为2000

>>> hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2000)
>>> featurizedData = hashingTF.transform(wordsData)
>>> featurizedData.select("words","rawFeatures").show(truncate=False)
+---------------------------------------------+---------------------------------------------------------------------+
|words                                        |rawFeatures                                                          |
+---------------------------------------------+---------------------------------------------------------------------+
|[i, heard, about, spark, and, i, love, spark]|(2000,[240,333,1105,1329,1357,1777],[1.0,1.0,2.0,2.0,1.0,1.0])       |
|[i, wish, java, could, use, case, classes]   |(2000,[213,342,489,495,1329,1809,1967],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[logistic, regression, models, are, neat]    |(2000,[286,695,1138,1193,1604],[1.0,1.0,1.0,1.0,1.0])                |
+---------------------------------------------+---------------------------------------------------------------------+

(5)调用IDF方法来重新构造特征向量的规模,生成的变量idf是一个评估器,在特征向量上应用它的fit()方法,会产生一个IDFModel(名称为idfModel)。

>>> idf = IDF(inputCol="rawFeatures", outputCol="features")
>>> idfModel = idf.fit(featurizedData)

(6)调用IDFModel的transform()方法,可以得到每一个单词对应的TF-IDF度量值。

>>> rescaledData = idfModel.transform(featurizedData)
>>> rescaledData.select("features", "label").show(truncate=False)
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                       |label|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(2000,[240,333,1105,1329,1357,1777],[0.6931471805599453,0.6931471805599453,1.3862943611198906,0.5753641449035617,0.6931471805599453,0.6931471805599453])                       |0    |
|(2000,[213,342,489,495,1329,1809,1967],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453])|0    |
|(2000,[286,695,1138,1193,1604],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                               |1    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+

(二)特征转换:标签和索引的转化

        在机器学习处理过程中,为了方便相关算法的实现,经常需要把标签数据(一般是字符串)转化成整数索引,或是在计算结束后将整数索引还原为相应的标签。

        Spark ML包中提供了几个相关的转换器,例如:StringIndexer、IndexToString、OneHotEncoder、VectorIndexer,它们提供了十分方便的特征转换功能,这些转换器类都位于org.apache.spark.ml.feature包下。

        值得注意的是,用于特征转换的转换器和其他的机器学习算法一样,也属于ML Pipeline模型的一部分,可以用来构成机器学习流水线,以StringIndexer为例,其存储着进行标签数值化过程的相关超参数,是一个Estimator,对其调用fit(..)方法即可生成相应的模型StringIndexerModel类,很显然,它存储了用于DataFrame进行相关处理的 参数,是一个Transformer(其他转换器也是同一原理)。

1、StringIndexer

        StringIndexer转换器可以把一列类别型的特征(或标签)进行编码,使其数值化,索引的范围从0开始,该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。

        索引构建的顺序为标签的频率,优先编码频率较大的标签,所以出现频率最高的标签为0号。如果输入的是数值型的,会首先把它转化成字符型,然后再对其进行编码。

(1)首先,引入所需要使用的类 。

>>> from pyspark.ml.feature import StringIndexer

(2)其次,构建1个DataFrame,设置StringIndexer的输入列和输出列的名字。

>>> df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],["id", "category"])
>>> indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")

(3)然后,通过fit()方法进行模型训练,用训练出的模型对原数据集进行处理,并通过indexed.show()进行展示。

>>> model = indexer.fit(df)
>>> indexed = model.transform(df)
>>> indexed.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

2、IndexToString

        与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。其主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。

>>> from pyspark.ml.feature import IndexToString, StringIndexer
>>> toString = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
>>> indexString = toString.transform(indexed)
>>> indexString.select("id", "originalCategory").show()
+---+----------------+
| id|originalCategory|
+---+----------------+
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|
+---+----------------+

3、VectorIndexer

        之前介绍的StringIndexer是针对单个类别型特征进行转换,倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了VectorIndexer类来解决向量数据集中的类别性特征转换。

        通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。

(1)首先引入所需要的类,并构建数据集。

>>> from pyspark.ml.feature import VectorIndexer
>>> from pyspark.ml.linalg import Vector, Vectors
>>> df = spark.createDataFrame([ \
... (Vectors.dense(-1.0, 1.0, 1.0),), \
... (Vectors.dense(-1.0, 3.0, 1.0),), \
... (Vectors.dense(0.0, 5.0, 1.0), )], ["features"])

(2)然后,构建VectorIndexer转换器,设置输入和输出列,并进行模型训练。

>>> indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=2)
>>> indexerModel = indexer.fit(df)

(3)接下来,通过VectorIndexerModel的categoryMaps成员来获得被转换的特征及其映射,这里可以看到,共有两个特征被转换,分别是0号和2号。

>>> categoricalFeatures = indexerModel.categoryMaps.keys()
>>> print ("Choose"+str(len(categoricalFeatures))+ \
... "categorical features:"+str(categoricalFeatures))
Chose 2 categorical features: [0, 2]

(4)最后,把模型应用于原有的数据,并打印结果。

>>> indexed = indexerModel.transform(df)
>>> indexed.show()
+--------------+-------------+
|      features|      indexed|
+--------------+-------------+
|[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
|[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
| [0.0,5.0,1.0]|[0.0,5.0,0.0]|
+--------------+-------------+

四、分类与回归

(一)逻辑斯蒂回归分类器

        逻辑斯蒂回归(logistic regression)是统计学习中的经典分类方法,属于对数线性模型。logistic回归的因变量可以是二分类的,也可以是多分类的。

任务描述

        以iris数据集(iris)为例进行分析。iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。为了便于理解,这里主要用后两个属性(花瓣的长度和宽度)来进行分类。

(1)首先我们先取其中的后两类数据,用二项逻辑斯蒂回归进行二分类分析。导入本地向量Vector和Vectors,导入所需要的类。

>>> from pyspark.ml.linalg import Vector,Vectors
>>> from pyspark.sql import Row,functions
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.feature import IndexToString, StringIndexer, \
... VectorIndexer,HashingTF, Tokenizer
>>> from pyspark.ml.classification import LogisticRegression, \
... LogisticRegressionModel,BinaryLogisticRegressionSummary, LogisticRegression

(2)我们定制一个函数,来返回一个指定的数据,然后读取文本文件,第一个map把每行的数据用“,”隔开,比如在我们的数据集中,每行被分成了5部分,前4部分是鸢尾花的4个特征,最后一部分是鸢尾花的分类;我们这里把特征存储在Vector中,创建一个Iris模式的RDD,然后转化成dataframe;最后调用show()方法来查看一下部分数据。

>>> def f(x):
...     rel = {}
...     rel['features']=Vectors. \
...     dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
...     rel['label'] = str(x[4])
...     return rel
>>> data = spark.sparkContext. \
... textFile("file:///usr/local/spark/iris.txt"). \
... map(lambda line: line.split(',')). \
... map(lambda p: Row(**f(p))). \
... toDF()
>>> data.show()
+-----------------+-----------+
|         features|      label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
………
+-----------------+-----------+
only showing top 20 rows

(3)分别获取标签列和特征列,进行索引并进行重命名。

>>> labelIndexer = StringIndexer(). \
... setInputCol("label"). \
... setOutputCol("indexedLabel"). \
... fit(data)
>>> featureIndexer = VectorIndexer(). \
... setInputCol("features"). \
... setOutputCol("indexedFeatures"). \
... fit(data)

(4)设置LogisticRegression算法的参数。这里设置了循环次数为100次,规范化项为0.3等,具体可以设置的参数,可以通过explainParams()来获取,还能看到程序已经设置的参数的结果。

>>> lr = LogisticRegression(). \
... setLabelCol("indexedLabel"). \
... setFeaturesCol("indexedFeatures"). \
... setMaxIter(100). \
... setRegParam(0.3). \
... setElasticNetParam(0.8)
>>> print("LogisticRegression parameters:\n" + lr.explainParams())

(5)设置一个IndexToString的转换器,把预测的类别重新转化成字符型的。构建一个机器学习流水线,设置各个阶段。上一个阶段的输出将是本阶段的输入。

>>> labelConverter = IndexToString(). \
... setInputCol("prediction"). \
... setOutputCol("predictedLabel"). \
... setLabels(labelIndexer.labels)
>>> lrPipeline = Pipeline(). \
... setStages([labelIndexer, featureIndexer, lr, labelConverter])

(6)把数据集随机分成训练集和测试集,其中训练集占70%。Pipeline本质上是一个评估器,当Pipeline调用fit()的时候就产生了一个PipelineModel,它是一个转换器。然后,这个PipelineModel就可以调用transform()来进行预测,生成一个新的DataFrame,即利用训练得到的模型对测试集进行验证。

>>> trainingData, testData = data.randomSplit([0.7, 0.3])
>>> lrPipelineModel = lrPipeline.fit(trainingData)
>>> lrPredictions = lrPipelineModel.transform(testData)

(7)输出预测的结果,其中,select选择要输出的列,collect获取所有行的数据,用foreach把每行打印出来。

>>> preRel = lrPredictions.select( \
... "predictedLabel", \
... "label", \
... "features", \
... "probability"). \
... collect()
>>> for item in preRel:
...     print(str(item['label'])+','+ \
...     str(item['features'])+'-->prob='+ \
...     str(item['probability'])+',predictedLabel'+ \
...     str(item['predictedLabel']))

(8)对训练的模型进行评估。创建一个MulticlassClassificationEvaluator实例,用setter方法把预测分类的列名和真实分类的列名进行设置,然后计算预测准确率。

>>> evaluator = MulticlassClassificationEvaluator(). \
... setLabelCol("indexedLabel"). \
... setPredictionCol("prediction")
>>> lrAccuracy = evaluator.evaluate(lrPredictions)
>>> lrAccuracy
0.7774712643678161 #模型预测的准确率

(9)可以通过model来获取训练得到的逻辑斯蒂模型。lrPipelineModel是一个PipelineModel,因此,可以通过调用它的stages方法来获取模型,具体如下:

>>> lrModel = lrPipelineModel.stages[2]
>>> print ("Coefficients: \n " + str(lrModel.coefficientMatrix)+ \
... "\nIntercept: "+str(lrModel.interceptVector)+ \
... "\n numClasses: "+str(lrModel.numClasses)+ \
... "\n numFeatures: "+str(lrModel.numFeatures))
 
Coefficients: 
 3 X 4 CSRMatrix
(1,3) 0.4332
(2,2) -0.2472
(2,3) -0.1689
Intercept: [-0.11530503231364186,-0.63496556499483,0.750270597308472]
 numClasses: 3
 numFeatures: 4

(二)决策树分类器

        决策树(decision tree)是一种基本的分类与回归方法,这里主要介绍用于分类的决策树。决策树模式呈树形结构,其中每个内部节点表示一个属性上的测试,每个分支代表一个测试输出,每个叶节点代表一种类别。学习时利用训练数据,根据损失函数最小化的原则建立决策树模型;预测时,对新的数据,利用决策树模型进行分类。

        决策树学习通常包括3个步骤:特征选择、决策树的生成和决策树的剪枝

任务描述

        我们以iris数据集(iris)为例进行分析。iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。

(1)导入需要的包

>>> from pyspark.ml.classification import DecisionTreeClassificationModel
>>> from pyspark.ml.classification import DecisionTreeClassifier
>>> from pyspark.ml import Pipeline,PipelineModel
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml.linalg import Vector,Vectors
>>> from pyspark.sql import Row
>>> from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

(2)读取文本文件,第一个map把每行的数据用“,”隔开,比如在我们的数据集中,每行被分成了5部分,前4部分是鸢尾花的4个特征,最后一部分是鸢尾花的分类;我们这里把特征存储在Vector中,创建一个Iris模式的RDD,然后转化成dataframe。

>>> def f(x):
...     rel = {}
...     rel['features']=Vectors. \
...     dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
...     rel['label'] = str(x[4])
...     return rel
>>> data = spark.sparkContext. \
... textFile("file:///usr/local/spark/iris.txt"). \
... map(lambda line: line.split(',')). \
... map(lambda p: Row(**f(p))). \
... toDF()

(3)进一步处理特征和标签,把数据集随机分成训练集和测试集,其中训练集占70%。

>>> labelIndexer = StringIndexer(). \
... setInputCol("label"). \
... setOutputCol("indexedLabel"). \
... fit(data)
>>> featureIndexer = VectorIndexer(). \
... setInputCol("features"). \
... setOutputCol("indexedFeatures"). \
... setMaxCategories(4). \
... fit(data)
>>> labelConverter = IndexToString(). \
... setInputCol("prediction"). \
... setOutputCol("predictedLabel"). \
... setLabels(labelIndexer.labels)
>>> trainingData, testData = data.randomSplit([0.7, 0.3])

(4)创建决策树模型DecisionTreeClassifier,通过setter的方法来设置决策树的参数,也可以用ParamMap来设置。这里仅需要设置特征列(FeaturesCol)和待预测列(LabelCol)。具体可以设置的参数可以通过explainParams()来获取。

>>> dtClassifier = DecisionTreeClassifier(). \
... setLabelCol("indexedLabel"). \
... setFeaturesCol("indexedFeatures")

(5)构建机器学习流水线(Pipeline),在训练数据集上调用fit()进行模型训练,并在测试数据集上调用transform()方法进行预测。

>>> dtPipeline = Pipeline(). \
... setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
>>> dtPipelineModel = dtPipeline.fit(trainingData)
>>> dtPredictions = dtPipelineModel.transform(testData)
>>> dtPredictions.select("predictedLabel", "label", "features").show(20)
+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.4,3.0,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.4,1.4,0.3]|
|    Iris-setosa|    Iris-setosa|[4.9,3.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[5.0,3.2,1.2,0.2]|
>>> evaluator = MulticlassClassificationEvaluator(). \
... setLabelCol("indexedLabel"). \
... setPredictionCol("prediction")
>>> dtAccuracy = evaluator.evaluate(dtPredictions)
>>> dtAccuracy
0.9726976552103888  #模型的预测准确率

(6)可以通过调用DecisionTreeClassificationModel的toDebugString方法,查看训练的决策树模型结构。

>>> treeModelClassifier = dtPipelineModel.stages[2]
>>> print("Learned classification tree model:\n" + \
... str(treeModelClassifier.toDebugString))
 
Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_5427198bb4c1) of depth 5 with 15 nodes
  If (feature 2 <= 2.45)
   Predict: 2.0
  Else (feature 2 > 2.45)
   If (feature 2 <= 4.75)
    Predict: 0.0
   Else (feature 2 > 4.75)
    If (feature 3 <= 1.75)
     If (feature 2 <= 4.95)
……

更多关于算法的介绍可参考我以下的博客:

【大数据分析与挖掘技术】Mahout推荐算法-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/Morse_Chen/article/details/135711426?spm=1001.2014.3001.5501【大数据分析与挖掘技术】Mahout聚类算法-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/Morse_Chen/article/details/135725821?spm=1001.2014.3001.5501【大数据分析与挖掘技术】Mahout分类算法-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/Morse_Chen/article/details/135762416?spm=1001.2014.3001.5501

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/384901.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Java程序设计】【C00249】基于Springboot的私人健身与教练预约管理系统(有论文)

基于Springboot的私人健身与教练预约管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的私人健身与教练预约管理系统 本系统分为系统功能模块、管理员功能模块、教练功能模块以及用户功能模块。 系统功能模…

小白速成法:剖析一个Android项目以快速上手

这是一个基于Tasmota的设备、用MQTT协议来通信控制的安卓应用程序。支持ON/OFF命令插座和基本的RGB LED控制。 源码点击此处 只需要关注SmartController-main\app\src的代码 项目解压之后如图 只需要关注“app”文件夹里的东西即可&#xff0c;“gradle”是配置文件&#xf…

【国产MCU】-CH32V307-基本定时器(BCTM)

基本定时器(BCTM) 文章目录 基本定时器(BCTM)1、基本定时器(BCTM)介绍2、基本定时器驱动API介绍3、基本定时器使用实例CH32V307的基本定时器模块包含一个16 位可自动重装的定时器(TIM6和TIM7),用于计数和在更新新事件产生中断或DMA 请求。 本文将详细介绍如何使用CH32…

服务治理中间件-Eureka

目录 简介 搭建Eureka服务 注册服务到Eureka 简介 Eureka是Spring团队开发的服务治理中间件&#xff0c;可以轻松在项目中&#xff0c;实现服务的注册与发现&#xff0c;相比于阿里巴巴的Nacos、Apache基金会的Zookeeper&#xff0c;更加契合Spring项目&#xff0c;缺点就是…

unity 点击事件

目录 点击按钮&#xff0c;显示图片功能教程 第1步添加ui button&#xff0c;添加ui RawImage 第2步 添加脚本&#xff1a; 第3步&#xff0c;把脚本拖拽到button&#xff0c;点击button&#xff0c;设置脚本的变量&#xff0c; GameObject添加 Component组件 点击按钮&am…

在程序中使用日志功能

在应用中&#xff0c;需要记录程序运行过程中的一些关键信息以及异常输出等。这些信息用来排查程序故障或者其他用途。 日志模块可以自己实现或者是借用第三方库&#xff0c;之前写过一个类似的使用Qt的打印重定向将打印输出到文件&#xff1a;Qt将打印信息输出到文件_qt log输…

随机过程及应用学习笔记(二)随机过程的基本概念

随机过程论就是研究随时间变化的动态系统中随机现象的统计规律的一门数学学科。 目录 前言 一、随机过程的定义及分类 1、定义 2、分类 二、随机过程的分布及其数字特征 1、分布函数 2、数字特征 均值函数和方差函数 协方差函数和相关函数 3、互协方差函数与互相关函…

java微服务面试篇

目录 目录 SpringCloud Spring Cloud 的5大组件 服务注册 Eureka Nacos Eureka和Nacos的对比 负载均衡 负载均衡流程 Ribbon负载均衡策略 自定义负载均衡策略 熔断、降级 服务雪崩 服务降级 服务熔断 服务监控 为什么需要监控 服务监控的组件 skywalking 业务…

【MySQL进阶之路】详解执行计划 type 列

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

BUUCTF-Real-[Jupyter]notebook-rce

1、简介 Jupyter Notebook&#xff08;此前被称为 IPython notebook&#xff09;是一个交互式笔记本&#xff0c;支持运行 40 多种编程语言。 如果管理员未为Jupyter Notebook配置密码&#xff0c;将导致未授权访问漏洞&#xff0c;游客可在其中创建一个console并执行任意Pytho…

python - 模块使用详解

前言 Python有非常强大的第三方库&#xff0c;也有非常多的内置模块帮助开发人员实现某些功能&#xff0c;无需开发人员自己造轮子。本文介绍Python的模块。 什么是模块 模块简单来说就是一系列功能的集合体&#xff0c;如果将程序的开发比喻成拼图&#xff0c;模块就是各种…

C++STL速查手册

本文参考cppreference&#xff0c;整理了一些常用的STL容器及其内置函数与算法&#xff0c;方便查用。 CSTL速查手册 什么是STL&#xff1f;STL模板 什么是STL&#xff1f; 在C中&#xff0c;STL 是指标准模板库&#xff08;Standard Template Library&#xff09;。STL 是 C 标…

CSS之盒模型

盒模型概念 浏览器盒模型&#xff08;Box Model&#xff09;是CSS中的基本概念&#xff0c;它描述了元素在布局过程中如何占据空间。盒模型由内容&#xff08;content&#xff09;、内边距&#xff08;padding&#xff09;、边框&#xff08;border&#xff09;、和外边距&…

mysql Day05

sql性能分析 sql执行频率 show global status like Com_______ 慢查询日志 执行时间超过10秒的sql语句 profile详情 show profiles帮助我们了解时间都耗费到哪里了 #查看每一条sql的耗时情况 show profiles#查看指定query_id的sql语句各个阶段的耗时情况 show profile fo…

【项目日记(九)】项目整体测试,优化以及缺陷分析

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:项目日记-高并发内存池⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你做项目   &#x1f51d;&#x1f51d; 开发环境: Visual Studio 2022 项目日…

Spring Cloud Gateway 网关路由

一、路由断言 路由断言就是判断路由转发的规则 二、路由过滤器 1. 路由过滤器可以实现对网关请求的处理&#xff0c;可以使用 Gateway 提供的&#xff0c;也可以自定义过滤器 2. 路由过滤器 GatewayFilter&#xff08;默认不生效&#xff0c;只有配置到路由后才会生效&#x…

无人机飞行原理,多旋翼无人机飞行原理详解

多旋翼无人机升空飞行的首要条件是动力&#xff0c;有了动力才能驱动旋粪旋转&#xff0c;才能产生克服重力所必需的升力。使旋翼产生升力&#xff0c;进而推动多旋翼无人机升空飞行的一套设备装置称为动力装置&#xff0c;包括多旋翼无人机的发动机以及保证发动机正常工作所必…

LibreOffice Calc 取消首字母自动大写 (Capitalize first letter of every sentence)

LibreOffice Calc 取消首字母自动大写 [Capitalize first letter of every sentence] 1. Tools -> AutoCorrect Options2. AutoCorrect -> Options -> Capitalize first letter of every sentenceReferences 1. Tools -> AutoCorrect Options 2. AutoCorrect ->…

论文介绍 One-step Diffusion 只需单步扩散生成!

论文介绍 One-step Diffusion with Distribution Matching Distillation 关注微信公众号: DeepGo 源码地址&#xff1a; https://tianweiy.github.io/dmd/ 论文地址&#xff1a; https://arxiv.org/abs/2311.18828 这篇论文介绍了一种新的图像生成方法&#xff0c;名为分布匹配…

C++三剑客之std::optional(一) : 使用详解

相关文章系列 C三剑客之std::optional(一) : 使用详解 C三剑客之std::any(一) : 使用 C之std::tuple(一) : 使用精讲(全) C三剑客之std::variant(一) : 使用 C三剑客之std::variant(二)&#xff1a;深入剖析 目录 1.概述 2.构建方式 2.1.默认构造 2.2.移动构造 2.3.拷贝构…