# Machine Learning with PySpark (Regression)

2018-02-27 11:47:07 | Source: http://blog.csdn.net/littlely_ll/article/details/78161574 | Author: littlely_ll

# DecisionTreeRegressor

``class pyspark.ml.regression.DecisionTreeRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", seed=None, varianceCol=None)``

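## Parameters

- `fit(dataset, params=None)` method: fits a model to the input dataset.
- `impurity`: criterion used to compute information gain; supported option: `variance`.
- `maxBins`: maximum number of bins used to discretize continuous features; must be >= 2 and >= the number of categories of any categorical feature.
- `maxDepth`: maximum depth of the tree.
- `minInfoGain`: minimum information gain required to split a node.
- `minInstancesPerNode`: minimum number of instances each child node must have after a split.
- Setter and getter methods for the parameters.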
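To make `impurity`, `minInfoGain` and `minInstancesPerNode` concrete, here is a minimal plain-Python sketch of how a candidate split could be scored with the variance criterion. The function names are hypothetical, and the rule that a split is kept when its gain is at least `min_info_gain` is an assumption, not a statement about Spark's internals.

```python
def variance(labels):
    """Population variance of the labels in a node (the "variance" impurity)."""
    mean = sum(labels) / len(labels)
    return sum((y - mean) ** 2 for y in labels) / len(labels)


def split_gain(left, right):
    """Impurity of the parent minus the size-weighted impurity of the children."""
    parent = left + right
    n = len(parent)
    return (variance(parent)
            - len(left) / n * variance(left)
            - len(right) / n * variance(right))


def is_valid_split(left, right, min_info_gain=0.0, min_instances_per_node=1):
    """Hypothetical helper: apply both thresholds to a candidate split."""
    if min(len(left), len(right)) < min_instances_per_node:
        return False
    # Assumption: a gain equal to the threshold is still accepted.
    return split_gain(left, right) >= min_info_gain


left, right = [0.0, 0.0], [1.0, 1.0]
print(split_gain(left, right))  # 0.25
print(is_valid_split(left, right, min_instances_per_node=3))  # False
```

Raising either threshold rejects more splits, which yields smaller trees.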

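## Methods and attributes of the fitted model

- `depth`: depth of the decision tree.
- `featureImportances`: estimate of the importance of each feature (see `DecisionTreeClassifier`).
- `numFeatures`: number of features the model was trained on; returns -1 if unknown.
- `numNodes`: number of nodes in the decision tree.
- `transform(dataset, params=None)` method.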
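The relation between `depth` and `numNodes` can be illustrated with a toy binary tree. The `Node` class below is hypothetical and only mimics the idea: a single leaf has depth 0, and one split gives depth 1 with 3 nodes.

```python
class Node:
    """Hypothetical tree node: a leaf stores a prediction, an internal
    node stores a feature index, a threshold and two children."""

    def __init__(self, prediction=None, feature=None, threshold=None,
                 left=None, right=None):
        self.prediction = prediction
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right

    def is_leaf(self):
        return self.left is None and self.right is None


def depth(node):
    """Number of edges on the longest root-to-leaf path."""
    if node.is_leaf():
        return 0
    return 1 + max(depth(node.left), depth(node.right))


def num_nodes(node):
    """Count internal nodes and leaves."""
    if node.is_leaf():
        return 1
    return 1 + num_nodes(node.left) + num_nodes(node.right)


def predict(node, features):
    """Walk from the root to a leaf and return its prediction."""
    while not node.is_leaf():
        go_left = features[node.feature] <= node.threshold
        node = node.left if go_left else node.right
    return node.prediction


# One split on feature 0; the threshold 0.5 is chosen for illustration.
tree = Node(feature=0, threshold=0.5,
            left=Node(prediction=0.0), right=Node(prediction=1.0))
print(depth(tree), num_nodes(tree))  # 1 3
print(predict(tree, [-1.0]), predict(tree, [1.0]))  # 0.0 1.0
```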

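## Code

```python
import tempfile

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import DecisionTreeRegressor, DecisionTreeRegressionModel
from pyspark.sql import SparkSession

# In the pyspark shell `spark` already exists; getOrCreate() reuses it.
spark = SparkSession.builder.getOrCreate()
temp_path = tempfile.mkdtemp()  # scratch directory for the save/load calls

df = spark.createDataFrame([
    (1.0, Vectors.dense(1.0)),
    (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
model = dt.fit(df)
model.depth  # 1
model.numNodes  # 3
model.featureImportances  # SparseVector(1, {0: 1.0})
model.numFeatures  # 1

test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction  # 0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction  # 1.0

# Save and reload the estimator.
dtr_path = temp_path + "/dtr"
dt.save(dtr_path)
dt2 = DecisionTreeRegressor.load(dtr_path)
dt2.getMaxDepth()  # 2

# Save and reload the fitted model.
model_path = temp_path + "/dtr_model"
model.save(model_path)
model2 = DecisionTreeRegressionModel.load(model_path)
model.numNodes == model2.numNodes  # True
model.depth == model2.depth  # True
model.transform(test1).head().variance  # 0.0
```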

# GBTRegressor

``class pyspark.ml.regression.GBTRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, subsamplingRate=1.0, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1, seed=None, impurity="variance")``

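## Parameters

- `fit(dataset, params=None)` method.
- `impurity`: same as `DecisionTreeRegressor`.
- `lossType`: loss function that GBT minimizes; options: `squared`, `absolute`.
- `maxBins`: same as `DecisionTreeRegressor`.
- `maxDepth`: same as `DecisionTreeRegressor`.
- `maxIter`: maximum number of iterations.
- `minInfoGain`: same as `DecisionTreeRegressor`.
- `minInstancesPerNode`: same as `DecisionTreeRegressor`.
- `stepSize`: step size of each optimization iteration.
- `subsamplingRate`: fraction of the training data used to train each decision tree, in the range (0, 1].
- Setter and getter methods.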
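The two `lossType` options differ in the signal each new tree is trained on. The sketch below writes both losses and their (sub)gradients in plain Python; the function names are hypothetical, and returning 0.0 for the absolute loss when prediction and label coincide is a choice made here, not something taken from Spark.

```python
def squared_loss(label, prediction):
    return (label - prediction) ** 2


def squared_gradient(label, prediction):
    """Derivative of the squared loss with respect to the prediction."""
    return -2.0 * (label - prediction)


def absolute_loss(label, prediction):
    return abs(label - prediction)


def absolute_gradient(label, prediction):
    """A subgradient of the absolute loss with respect to the prediction."""
    if prediction > label:
        return 1.0
    if prediction < label:
        return -1.0
    return 0.0


def pseudo_residuals(labels, predictions, gradient):
    """Negative gradients: the targets the next tree would be fitted to."""
    return [-gradient(y, p) for y, p in zip(labels, predictions)]


labels, predictions = [3.0, 1.0], [1.0, 1.5]
print(pseudo_residuals(labels, predictions, squared_gradient))   # [4.0, -1.0]
print(pseudo_residuals(labels, predictions, absolute_gradient))  # [1.0, -1.0]
```

With `absolute`, every example pulls with the same magnitude, so large outliers influence the next tree less than with `squared`.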

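## Methods and attributes of the fitted model

- `featureImportances`: same as `DecisionTreeRegressor`.
- `getNumTrees`: number of trees in the ensemble.
- `numFeatures`: same as `DecisionTreeRegressor`.
- `totalNumNodes`: total number of nodes, summed over all trees in the ensemble.
- `transform(dataset, params=None)` method.
- `treeWeights`: weight of each tree.
- `trees`: the trees in the ensemble.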
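A rough way to read `treeWeights` and `trees` together: assuming the ensemble prediction is the weighted sum of the per-tree predictions, a boosted model could be evaluated as sketched below. The callables standing in for fitted trees and the weight vector (first tree 1.0, later trees equal to a step size of 0.1) are hypothetical.

```python
def gbt_predict(trees, tree_weights, features):
    """Weighted sum of the individual tree predictions."""
    return sum(weight * tree(features)
               for tree, weight in zip(trees, tree_weights))


# Hypothetical stand-ins for fitted trees: the first one separates the
# two training points, the later ones have nothing left to correct.
first_tree = lambda x: 1.0 if x[0] > 0.5 else 0.0
later_tree = lambda x: 0.0
trees = [first_tree] + [later_tree] * 4
tree_weights = [1.0, 0.1, 0.1, 0.1, 0.1]

print(gbt_predict(trees, tree_weights, [1.0]))   # 1.0
print(gbt_predict(trees, tree_weights, [-1.0]))  # 0.0
```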

## Code

```python
import tempfile

from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, GBTRegressionModel
from pyspark.sql import SparkSession

# In the pyspark shell `spark` already exists; getOrCreate() reuses it.
spark = SparkSession.builder.getOrCreate()
temp_path = tempfile.mkdtemp()  # scratch directory for the save/load calls

df = spark.createDataFrame([
    (1.0, Vectors.dense(1.0)),
    (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
print(gbt.getImpurity())  # variance
model = gbt.fit(df)
model.featureImportances  # SparseVector(1, {0: 1.0})
model.numFeatures  # 1
allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])  # True

test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction  # 0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction  # 1.0

# Save and reload the estimator.
gbtr_path = temp_path + "/gbtr"
gbt.save(gbtr_path)
gbt2 = GBTRegressor.load(gbtr_path)
gbt2.getMaxDepth()  # 2

# Save and reload the fitted model.
model_path = temp_path + "/gbtr_model"
model.save(model_path)
model2 = GBTRegressionModel.load(model_path)
model.featureImportances == model2.featureImportances  # True
model.treeWeights == model2.treeWeights  # True
model.trees  # [DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
```

# GeneralizedLinearRegression

``class pyspark.ml.regression.GeneralizedLinearRegression(self, labelCol="label", featuresCol="features", predictionCol="prediction", family="gaussian", link=None, fitIntercept=True, maxIter=25, tol=1e-6, regParam=0.0, weightCol=None, solver="irls", linkPredictionCol=None)``

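## Parameters

- `fit(dataset, params=None)` method.
- `family`: name of the error distribution used in the model; options: `gaussian` (default), `binomial`, `poisson`, `gamma`.
- `fitIntercept`: whether to fit an intercept term.
- `link`: name of the link function, which relates the linear predictor to the mean of the distribution; options: `identity`, `log`, `inverse`, `logit`, `probit`, `cloglog`, `sqrt`.
- `maxIter`: maximum number of iterations.
- `solver`: optimization algorithm (default `"irls"`, per the signature above).
- `tol`: convergence tolerance of the iterations.
- Setter and getter methods.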
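Each `link` name stands for a function g that maps the mean of the distribution to the linear predictor, together with its inverse. The sketch below spells out the standard textbook definitions in plain Python; the `LINKS` table and helper names are hypothetical and are not part of the PySpark API.

```python
import math
from statistics import NormalDist

_STD_NORMAL = NormalDist()  # standard normal, used by the probit link

# name -> (link g: mean -> linear predictor, inverse link: predictor -> mean)
LINKS = {
    "identity": (lambda mu: mu, lambda eta: eta),
    "log": (math.log, math.exp),
    "inverse": (lambda mu: 1.0 / mu, lambda eta: 1.0 / eta),
    "logit": (lambda mu: math.log(mu / (1.0 - mu)),
              lambda eta: 1.0 / (1.0 + math.exp(-eta))),
    "probit": (_STD_NORMAL.inv_cdf, _STD_NORMAL.cdf),
    "cloglog": (lambda mu: math.log(-math.log(1.0 - mu)),
                lambda eta: 1.0 - math.exp(-math.exp(eta))),
    "sqrt": (math.sqrt, lambda eta: eta * eta),
}


def linear_predictor(link, mu):
    """Apply the link function to a mean value."""
    return LINKS[link][0](mu)


def mean_from_predictor(link, eta):
    """Apply the inverse link to a linear predictor."""
    return LINKS[link][1](eta)


# Round trip: the inverse link undoes the link for every option.
for name in LINKS:
    eta = linear_predictor(name, 0.3)
    print(name, round(mean_from_predictor(name, eta), 6))
```

The mean 0.3 is used because it lies in the valid domain of all seven links.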

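## Methods and attributes of the fitted model (experimental)

- `coefficients`: model coefficients.
- `evaluate(dataset)`: evaluates the model on a test dataset.
- `hasSummary`: whether a summary is available.
- `intercept`: model intercept.
- `numFeatures`: number of features the model was trained on.
- `summary`: gets the summary.
- `transform(dataset, params=None)` method.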

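## Summary attributes

- `aic`: Akaike information criterion (AIC) of the fitted model.
- `degreesOfFreedom`: degrees of freedom.
- `deviance`: deviance of the fitted model.
- `dispersion`: dispersion of the fitted model; fixed at 1 for the `binomial` and `poisson` families, otherwise estimated from the Pearson chi-squared statistic of the residuals.
- `predictions`: predictions produced by the model's `transform` method.
- `rank`: numeric rank of the fitted linear model.
- `residualDegreeOfFreedom`: residual degrees of freedom.
- `residuals(residualType="deviance")`: residuals of the fitted model; `residualType` selects the kind of residual returned: `deviance` (default), `pearson`, `working`, `response`.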
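The residual types named above can be illustrated with the usual GLM definitions. The following is a minimal sketch for one observation of a Poisson model with log link and unit weight; the function is hypothetical, and that Spark's `residuals` output follows exactly these formulas is an assumption.

```python
import math


def poisson_residual(label, mu, residual_type="deviance"):
    """Residual of one observation with fitted mean `mu` (Poisson, log link)."""
    if residual_type == "response":
        return label - mu
    if residual_type == "pearson":
        # Variance function of the Poisson family: V(mu) = mu.
        return (label - mu) / math.sqrt(mu)
    if residual_type == "working":
        # (y - mu) * g'(mu), with g = log so g'(mu) = 1 / mu.
        return (label - mu) / mu
    if residual_type == "deviance":
        y_log = label * math.log(label / mu) if label > 0 else 0.0
        unit_deviance = 2.0 * (y_log - (label - mu))
        # Signed square root of this observation's deviance contribution.
        return math.copysign(math.sqrt(max(unit_deviance, 0.0)), label - mu)
    raise ValueError("unknown residual type: " + residual_type)


for kind in ("deviance", "pearson", "working", "response"):
    print(kind, poisson_residual(3.0, 2.0, kind))
```

Under these definitions the squared deviance residuals add up to the model deviance, and the squared Pearson residuals add up to the Pearson chi-squared statistic mentioned under `dispersion`.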

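## trainingSummary attributes

- `aic`: AIC of the fitted model.
- `coefficientStandardErrors`: standard errors of the estimated coefficients and intercept.
- `degreesOfFreedom`
- `deviance`
- `dispersion`
- `pValues`: two-sided p-values of the estimated coefficients and intercept.
- `predictions`
- `rank`
- `residualDegreeOfFreedom`
- `residuals(residualType="deviance")`
- `solver`
- `tValues`: T-statistics of the estimated coefficients and intercept.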

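## Code

```python
import tempfile

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import (GeneralizedLinearRegression,
                                   GeneralizedLinearRegressionModel)
from pyspark.sql import SparkSession

# In the pyspark shell `spark` already exists; getOrCreate() reuses it.
spark = SparkSession.builder.getOrCreate()
temp_path = tempfile.mkdtemp()  # scratch directory for the save/load calls

df = spark.createDataFrame([
    (1.0, Vectors.dense(0.0, 0.0)),
    (1.0, Vectors.dense(1.0, 2.0)),
    (2.0, Vectors.dense(0.0, 0.0)),
    (2.0, Vectors.dense(1.0, 1.0)),
], ["label", "features"])
# linkPredictionCol adds a column "p" holding the linear predictor.
glr = GeneralizedLinearRegression(family="gaussian", link="identity",
                                  linkPredictionCol="p")
model = glr.fit(df)
transformed = model.transform(df)
abs(transformed.head().prediction - 1.5) < 0.001  # True
abs(transformed.head().p - 1.5) < 0.001  # True
model.coefficients  # DenseVector([1.5..., -1.0...])
model.numFeatures  # 2
abs(model.intercept - 1.5) < 0.001  # True

# Save and reload the estimator.
glr_path = temp_path + "/glr"
glr.save(glr_path)
glr2 = GeneralizedLinearRegression.load(glr_path)
glr.getFamily() == glr2.getFamily()  # True

# Save and reload the fitted model.
model_path = temp_path + "/glr_model"
model.save(model_path)
model2 = GeneralizedLinearRegressionModel.load(model_path)
model.intercept == model2.intercept  # True
model.coefficients[0] == model2.coefficients[0]  # True
```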

# LinearRegression

``class pyspark.ml.regression.LinearRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2)``

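## Parameters

- `aggregationDepth`: depth used for tree aggregation, >= 2.
- `elasticNetParam`: ElasticNet mixing parameter alpha, in the range [0, 1]; alpha = 0 gives an L2 penalty, alpha = 1 gives an L1 penalty.
- `fit(dataset, params=None)` method.
- `fitIntercept`: whether to fit an intercept.
- `maxIter`: maximum number of iterations.
- `regParam`: regularization parameter, >= 0.
- `solver`: optimization algorithm; `"auto"` is used if it is unset or empty.
- `standardization`: whether to standardize the features before fitting the model.
- Setter and getter methods.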
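How `regParam` and `elasticNetParam` interact is easiest to see in the penalty term itself. The sketch below uses the common elastic-net form, regParam * (alpha * L1 + (1 - alpha) / 2 * L2 squared); the function name is hypothetical and the exact scaling inside Spark's optimizer is assumed rather than verified here.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Penalty added to the loss for a given coefficient vector."""
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)


coefficients = [3.0, -4.0]
# alpha = 0: pure L2 (ridge); alpha = 1: pure L1 (lasso).
ridge = elastic_net_penalty(coefficients, reg_param=0.1, elastic_net_param=0.0)
lasso = elastic_net_penalty(coefficients, reg_param=0.1, elastic_net_param=1.0)
mixed = elastic_net_penalty(coefficients, reg_param=0.1, elastic_net_param=0.5)
print(ridge, lasso, mixed)
```

With `regParam=0.0`, as in the signature default, the penalty vanishes and the fit reduces to ordinary least squares.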

## Methods and attributes of the fitted model

- `coefficients`
- `evaluate(dataset)`
- `hasSummary`
- `intercept`
- `numFeatures`
- `summary`
- `transform(dataset, params=None)`

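## Summary attributes (experimental)

- `coefficientStandardErrors`
- `devianceResiduals`: weighted residuals.
- `explainedVariance`: explained-variance regression score, $1 - \mathrm{variance}(y - \hat{y}) / \mathrm{variance}(y)$.
- `meanAbsoluteError`: mean absolute error.
- `meanSquaredError`: mean squared error.
- `numInstances`: number of instances in the predictions.
- `pValues`: two-sided p-values of the coefficients and intercept; available only with the `"normal"` solver.
- `predictions`: predictions returned by the model's `transform` method.
- `r2`: R squared.
- `residuals`: residuals.
- `rootMeanSquaredError`: square root of the mean squared error.
- `tValues`: T-statistics.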
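The error metrics in this list follow directly from labels and predictions. Below is a minimal plain-Python sketch using the textbook definitions, with the explained-variance score written as in the formula given above; the function name is hypothetical, and the values Spark reports (for `explainedVariance` in particular) may be computed differently.

```python
import math


def regression_metrics(labels, predictions):
    """Return MAE, MSE, RMSE, R squared and the explained-variance score."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n

    def variance(values):
        mean = sum(values) / len(values)
        return sum((v - mean) ** 2 for v in values) / len(values)

    label_variance = variance(labels)
    return {
        "meanAbsoluteError": mae,
        "meanSquaredError": mse,
        "rootMeanSquaredError": math.sqrt(mse),
        # 1 - (residual sum of squares) / (total sum of squares)
        "r2": 1.0 - mse / label_variance,
        "explainedVariance": 1.0 - variance(errors) / label_variance,
    }


metrics = regression_metrics([1.0, 2.0, 3.0, 4.0], [1.5, 2.0, 2.5, 4.0])
for name, value in metrics.items():
    print(name, value)
```

`r2` and the explained-variance score coincide here because the errors average to zero; they differ when the predictions are biased.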

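## trainingSummary attributes

- `coefficientStandardErrors`
- `devianceResiduals`
- `explainedVariance`
- `meanAbsoluteError`
- `meanSquaredError`
- `numInstances`
- `objectiveHistory`: value of the objective function at each iteration; available only with the `"l-bfgs"` solver.
- `pValues`
- `predictions`
- `r2`
- `residuals`
- `rootMeanSquaredError`
- `tValues`
- `totalIterations`: total number of iterations run before termination.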

## Code

```python
import tempfile

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.sql import SparkSession

# In the pyspark shell `spark` already exists; getOrCreate() reuses it.
spark = SparkSession.builder.getOrCreate()
temp_path = tempfile.mkdtemp()  # scratch directory for the save/load calls

df = spark.createDataFrame([
    (1.0, 2.0, Vectors.dense(1.0)),
    (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
model = lr.fit(df)

test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
abs(model.transform(test0).head().prediction - (-1.0)) < 0.001  # True
abs(model.coefficients[0] - 1.0) < 0.001  # True
abs(model.intercept - 0.0) < 0.001  # True
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
abs(model.transform(test1).head().prediction - 1.0) < 0.001  # True

# setParams accepts keyword arguments only; a positional argument fails.
try:
    lr.setParams("vector")
except TypeError as err:
    print(err)  # Method setParams forces keyword arguments.

# Save and reload the estimator.
lr_path = temp_path + "/lr"
lr.save(lr_path)
lr2 = LinearRegression.load(lr_path)
lr2.getMaxIter()  # 5

# Save and reload the fitted model.
model_path = temp_path + "/lr_model"
model.save(model_path)
model2 = LinearRegressionModel.load(model_path)
model.coefficients[0] == model2.coefficients[0]  # True
model.intercept == model2.intercept  # True
model.numFeatures  # 1
```
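The coefficient and intercept in the example above can be checked by hand. The sketch below solves weighted least squares in closed form for a single feature with an intercept and no regularization; it is a plain-Python illustration with a hypothetical function name, not Spark's implementation of the `"normal"` solver.

```python
def weighted_simple_regression(xs, ys, weights):
    """Closed-form weighted least squares for y = slope * x + intercept."""
    total = sum(weights)
    x_mean = sum(w * x for w, x in zip(weights, xs)) / total
    y_mean = sum(w * y for w, y in zip(weights, ys)) / total
    sxy = sum(w * (x - x_mean) * (y - y_mean)
              for w, x, y in zip(weights, xs, ys))
    sxx = sum(w * (x - x_mean) ** 2 for w, x in zip(weights, xs))
    slope = sxy / sxx
    intercept = y_mean - slope * x_mean
    return slope, intercept


# Same two rows as the DataFrame above: (label, weight, feature).
slope, intercept = weighted_simple_regression(
    xs=[1.0, 0.0], ys=[1.0, 0.0], weights=[2.0, 2.0])
print(slope, intercept)          # 1.0 0.0
print(slope * -1.0 + intercept)  # -1.0, the prediction for feature -1.0
```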

# RandomForestRegressor

``class pyspark.ml.regression.RandomForestRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20, featureSubsetStrategy="auto")``

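## Parameters

- `fit(dataset, params=None)` method.
- `featureSubsetStrategy`: number of features to consider for splits at each tree node; options: `auto`, `all`, `onethird`, `sqrt`, `log2`, a fraction in (0.0, 1.0], or an integer in [1, n].
- `impurity`: criterion used to compute information gain; supported option: `variance`.
- `maxBins`: maximum number of bins used to discretize continuous features.
- `maxDepth`: maximum depth of each tree.
- `minInfoGain`: minimum information gain required to split a tree node.
- `minInstancesPerNode`: minimum number of instances each node must have.
- `numTrees`: number of trees to train.
- `subsamplingRate`: fraction of the samples used to learn each decision tree.
- Setter and getter methods.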
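As a rough guide to what the `featureSubsetStrategy` options mean in terms of feature counts, the sketch below maps each option to a number of candidate features per node. The helper is hypothetical; the rounding up, the integer-before-fraction parsing, and the resolution of `auto` for regression (`all` for a single tree, `onethird` otherwise) are assumptions about Spark's behaviour.

```python
import math


def features_per_node(strategy, num_features, num_trees=20):
    """Number of features considered at each split for a regression forest."""
    if strategy == "auto":
        strategy = "all" if num_trees == 1 else "onethird"
    if strategy == "all":
        return num_features
    if strategy == "sqrt":
        return math.ceil(math.sqrt(num_features))
    if strategy == "log2":
        return max(1, math.ceil(math.log2(num_features)))
    if strategy == "onethird":
        return math.ceil(num_features / 3.0)
    try:
        count = int(strategy)          # e.g. "3": an absolute feature count
    except ValueError:
        fraction = float(strategy)     # e.g. "0.5": a fraction of the features
        if not 0.0 < fraction <= 1.0:
            raise ValueError("fraction must be in (0.0, 1.0]")
        return math.ceil(fraction * num_features)
    if count < 1:
        raise ValueError("feature count must be at least 1")
    return min(count, num_features)


for option in ("auto", "all", "onethird", "sqrt", "log2", "0.5", "3"):
    print(option, features_per_node(option, num_features=10))
```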

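## Methods and attributes of the fitted model

- `featureImportances`: see `DecisionTreeRegressor`.
- `numFeatures`: number of features in the model; returns -1 if unknown.
- `totalNumNodes`: total number of nodes.
- `transform(dataset, params=None)` method.
- `treeWeights`: weight of each tree.
- `trees`: the trees in the forest.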
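For a regression forest, `trees` and `treeWeights` combine in a simple way if one assumes that every tree carries weight 1.0 and the forest prediction is the mean of the tree predictions. The sketch below uses hypothetical callables in place of fitted trees.

```python
def forest_predict(trees, features):
    """Average the predictions of all trees in the forest."""
    return sum(tree(features) for tree in trees) / len(trees)


# Hypothetical stand-ins for two fitted trees: one found a split, the
# other (trained on a different bootstrap sample) predicts a constant.
trees = [
    lambda x: 1.0 if x[0] > 0.5 else 0.0,
    lambda x: 0.0,
]

print(forest_predict(trees, [-1.0]))  # 0.0
print(forest_predict(trees, [1.0]))   # 0.5
```

Averaging is what lets a forest return values, such as 0.5, that no single tree predicts.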

## Code

```python
import tempfile

from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.sql import SparkSession

# In the pyspark shell `spark` already exists; getOrCreate() reuses it.
spark = SparkSession.builder.getOrCreate()
temp_path = tempfile.mkdtemp()  # scratch directory for the save/load calls

df = spark.createDataFrame([
    (1.0, Vectors.dense(1.0)),
    (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
rf = RandomForestRegressor(numTrees=2, maxDepth=2, seed=42)
model = rf.fit(df)
model.featureImportances  # SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 1.0])  # True

test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction  # 0.0
model.numFeatures  # 1
model.trees  # [DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
model.getNumTrees  # 2
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction  # 0.5

# Save and reload the estimator.
rfr_path = temp_path + "/rfr"
rf.save(rfr_path)
rf2 = RandomForestRegressor.load(rfr_path)
rf2.getNumTrees()  # 2

# Save and reload the fitted model.
model_path = temp_path + "/rfr_model"
model.save(model_path)
model2 = RandomForestRegressionModel.load(model_path)
model.featureImportances == model2.featureImportances  # True
```