Machine learning with pyspark (regression)

2018-02-27 11:47:07 | Source: http://blog.csdn.net/littlely_ll/article/details/78161574 | Author: littlely_ll


See also: Machine learning with pyspark (classification) · Machine learning with pyspark (clustering)


DecisionTreeRegressor

class pyspark.ml.regression.DecisionTreeRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", seed=None, varianceCol=None)

Supports both continuous and categorical features.

Parameters


• fit(dataset, params=None) method
• impurity: criterion used for information gain calculation; supported option: variance
• maxBins: maximum number of bins for discretizing continuous features; must be >= 2 and >= the number of categories of any categorical feature
• maxDepth: maximum depth of the tree
• minInfoGain: minimum information gain required to split a node
• minInstancesPerNode: minimum number of instances each child must have after a split
• setter and getter methods (see the sketch below)
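Parameters can be set in the constructor or through the setter methods and read back with the corresponding getters. A minimal sketch, with illustrative values only:

from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor()
# setParams only accepts keyword arguments; the values below are illustrative
dt.setParams(maxDepth=3, maxBins=16, minInstancesPerNode=2)
print(dt.getMaxDepth())   # 3
print(dt.getMaxBins())    # 16
print(dt.getImpurity())   # variance (the only supported option)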

Methods and attributes of the fitted model


• depth: depth of the decision tree
• featureImportances: estimate of the importance of each feature (see DecisionTreeClassifier)
• numFeatures: number of features the model was trained on; returns -1 if unknown
• numNodes: number of nodes in the decision tree
• transform(dataset, params=None) method

Code

# Assumes an active SparkSession `spark` and a writable directory `temp_path`
from pyspark.ml.regression import DecisionTreeRegressor, DecisionTreeRegressionModel
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)),(0.0, Vectors.sparse(1, [], []))], ["label", "features"])
dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
model = dt.fit(df)
model.depth
#1
model.numNodes
#3
model.featureImportances
#SparseVector(1, {0: 1.0})
model.numFeatures
#1
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
dtr_path = temp_path + "/dtr"
dt.save(dtr_path)
dt2 = DecisionTreeRegressor.load(dtr_path)
dt2.getMaxDepth()
#2
model_path = temp_path + "/dtr_model"
model.save(model_path)
model2 = DecisionTreeRegressionModel.load(model_path)
model.numNodes == model2.numNodes
#True
model.depth == model2.depth
#True
model.transform(test1).head().variance
#0.0

GBTRegressor

class pyspark.ml.regression.GBTRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, subsamplingRate=1.0, checkpointInterval=10, lossType="squared", maxIter=20, stepSize=0.1, seed=None, impurity="variance")

Parameters


• fit(dataset, params=None) method
• impurity: same as DecisionTreeRegressor
• lossType: loss function that GBT tries to minimize; options: squared, absolute
• maxBins: same as DecisionTreeRegressor
• maxDepth: same as DecisionTreeRegressor
• maxIter: maximum number of iterations
• minInfoGain: same as DecisionTreeRegressor
• minInstancesPerNode: same as DecisionTreeRegressor
• stepSize: step size for each optimization iteration
• subsamplingRate: fraction of the training data used to learn each decision tree, in the range (0, 1]
• setter and getter methods (see the sketch below)
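A minimal sketch of configuring the boosting-specific parameters and reading them back through the standard getters; the values are illustrative only:

from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(maxIter=10, stepSize=0.05, subsamplingRate=0.8, lossType="absolute")
print(gbt.getMaxIter())          # 10
print(gbt.getStepSize())         # 0.05
print(gbt.getSubsamplingRate())  # 0.8
print(gbt.getLossType())         # absolute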

Methods and attributes of the fitted model


• featureImportances: same as DecisionTreeRegressor
• getNumTrees: number of trees in the ensemble
• numFeatures: same as DecisionTreeRegressor
• totalNumNodes: total number of nodes across all trees in the ensemble
• transform(dataset, params=None) method
• treeWeights: weight of each tree
• trees: the individual trees

Code

# Assumes an active SparkSession `spark` and a writable directory `temp_path`
from numpy import allclose
from pyspark.ml.regression import GBTRegressor, GBTRegressionModel
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)), (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
print(gbt.getImpurity())
#variance
model = gbt.fit(df)
model.featureImportances
#SparseVector(1, {0: 1.0})
model.numFeatures
#1
allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)],["features"])
model.transform(test1).head().prediction
#1.0
gbtr_path = temp_path + "/gbtr"
gbt.save(gbtr_path)
gbt2 = GBTRegressor.load(gbtr_path)
gbt2.getMaxDepth()
#2
model_path = temp_path + "/gbtr_model"
model.save(model_path)
model2 = GBTRegressionModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
model.treeWeights == model2.treeWeights
#True
model.trees
#[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]

GeneralizedLinearRegression

class pyspark.ml.regression.GeneralizedLinearRegression(self, labelCol="label", featuresCol="features", predictionCol="prediction", family="gaussian", link=None, fitIntercept=True, maxIter=25, tol=1e-6, regParam=0.0, weightCol=None, solver="irls", linkPredictionCol=None)

Fits a generalized linear model specified by a family (error distribution) and a link function. The link functions supported for each family are listed below; the first one listed is the default (a usage sketch follows the list):
• "gaussian" -> "identity", "log", "inverse"
• "binomial" -> "logit", "probit", "cloglog"
• "poisson" -> "log", "identity", "sqrt"
• "gamma" -> "inverse", "identity", "log"
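For example, count data is typically modeled with the poisson family and the log link. A minimal sketch, assuming an active SparkSession `spark`; the data is made up for illustration:

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
# hypothetical count data: the label is a non-negative count
counts = spark.createDataFrame([
    (1.0, Vectors.dense(0.0, 1.0)),
    (2.0, Vectors.dense(1.0, 2.0)),
    (4.0, Vectors.dense(2.0, 3.0)),
], ["label", "features"])
glr_poisson = GeneralizedLinearRegression(family="poisson", link="log", maxIter=10)
poisson_model = glr_poisson.fit(counts)
print(poisson_model.coefficients, poisson_model.intercept)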

Parameters


• fit(dataset, params=None) method
• family: name of the error distribution; options: gaussian (default), binomial, poisson, gamma
• fitIntercept: whether to fit an intercept term
• link: name of the relationship between the linear predictor and the mean of the distribution function; options: identity, log, inverse, logit, probit, cloglog, sqrt
• maxIter: maximum number of iterations
• solver: optimization algorithm; currently only "irls" (iteratively reweighted least squares) is supported
• tol: convergence tolerance for the iterations
• setter and getter methods

Methods and attributes of the fitted model (experimental)


• coefficients: model coefficients
• evaluate(dataset): evaluate the model on a test dataset
• hasSummary: whether a training summary is available
• intercept: model intercept
• numFeatures: number of features the model was trained on
• summary: the training summary
• transform(dataset, params=None) method

Summary attributes


• aic: the model's AIC (Akaike information criterion)
• degreesOfFreedom: degrees of freedom
• deviance: deviance of the fitted model
• dispersion: dispersion; 1.0 for the binomial and poisson families, otherwise estimated by the Pearson Chi-squared statistic of the residuals
• predictions: predictions output by the model's transform method
• rank: numerical rank of the fitted linear model
• residualDegreeOfFreedom: residual degrees of freedom
• residuals(residualType='deviance'): residuals of the fitted model; residualType is the type of residuals to return, options: pearson, working, response

trainingSummary attributes


• aic: AIC
• coefficientStandardErrors: standard errors of the estimated coefficients and intercept
• degreesOfFreedom
• deviance
• dispersion
• pValues: two-sided p-values of the estimated coefficients and intercept
• predictions
• rank
• residualDegreeOfFreedom
• residuals(residualType='deviance')
• solver
• tValues: t-statistics of the estimated coefficients and intercept (a usage sketch follows this list)
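A minimal sketch of reading the training summary and evaluating on new data, assuming `model` is a fitted GeneralizedLinearRegressionModel and `test_df` is a DataFrame with the same "label" and "features" columns:

summary = model.summary                  # training summary
print(summary.aic, summary.deviance, summary.dispersion)
print(summary.coefficientStandardErrors)
print(summary.pValues)
print(summary.tValues)
test_summary = model.evaluate(test_df)   # summary computed on new data
print(test_summary.deviance)
test_summary.residuals().show()          # deviance residuals by default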

Code

# Assumes an active SparkSession `spark` and a writable directory `temp_path`
from pyspark.ml.regression import GeneralizedLinearRegression, GeneralizedLinearRegressionModel
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([ (1.0, Vectors.dense(0.0, 0.0)), (1.0, Vectors.dense(1.0, 2.0)),(2.0, Vectors.dense(0.0, 0.0)),(2.0, Vectors.dense(1.0, 1.0)),], ["label", "features"])
glr = GeneralizedLinearRegression(family="gaussian", link="identity", linkPredictionCol="p")
model = glr.fit(df)
transformed = model.transform(df)
abs(transformed.head().prediction - 1.5) < 0.001
#True
abs(transformed.head().p - 1.5) < 0.001
#True
model.coefficients
#DenseVector([1.5..., -1.0...])
model.numFeatures
#2
abs(model.intercept - 1.5) < 0.001
#True
glr_path = temp_path + "/glr"
glr.save(glr_path)
glr2 = GeneralizedLinearRegression.load(glr_path)
glr.getFamily() == glr2.getFamily()
#True
model_path = temp_path + "/glr_model"
model.save(model_path)
model2 = GeneralizedLinearRegressionModel.load(model_path)
model.intercept == model2.intercept
#True
model.coefficients[0] == model2.coefficients[0]
#True

LinearRegression

class pyspark.ml.regression.LinearRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2)

Supports several types of regularization (see the sketch below):
• None: ordinary least squares (OLS)
• L2: ridge regression
• L1: Lasso regression
• L1 + L2: elastic net
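The type of regularization is controlled jointly by regParam (the overall penalty strength) and elasticNetParam (the L1/L2 mix). A minimal sketch with illustrative values:

from pyspark.ml.regression import LinearRegression
ols     = LinearRegression(regParam=0.0)                        # no penalty: ordinary least squares
ridge   = LinearRegression(regParam=0.1, elasticNetParam=0.0)   # pure L2 penalty: ridge regression
lasso   = LinearRegression(regParam=0.1, elasticNetParam=1.0)   # pure L1 penalty: Lasso
elastic = LinearRegression(regParam=0.1, elasticNetParam=0.5)   # mix of L1 and L2: elastic net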

Parameters


• aggregationDepth: suggested depth for tree aggregation, >= 2
• elasticNetParam: ElasticNet mixing parameter in the range [0, 1]; alpha = 0 gives an L2 penalty, alpha = 1 gives an L1 penalty
• fit(dataset, params=None) method
• fitIntercept: whether to fit an intercept term
• maxIter: maximum number of iterations
• regParam: regularization parameter, >= 0
• solver: optimization algorithm; "auto" is used if unset or empty
• standardization: whether to standardize the features before fitting the model
• setter and getter methods

Methods and attributes of the fitted model


• coefficients
• evaluate(dataset)
• hasSummary
• intercept
• numFeatures
• summary
• transform(dataset, params=None)

Summary attributes (experimental)


• coefficientStandardErrors
• devianceResiduals: weighted residuals
• explainedVariance: explained variance regression score, explainedVariance = 1 - variance(y - ŷ) / variance(y)
• meanAbsoluteError: mean absolute error
• meanSquaredError: mean squared error
• numInstances: number of instances predicted
• pValues: two-sided p-values of the coefficients and intercept, only available when the "normal" solver is used
• predictions: predictions returned by the model's transform method
• r2: R²
• residuals: residuals
• rootMeanSquaredError: root mean squared error
• tValues: t-statistics

trainingSummary attributes


• coefficientStandardErrors
• devianceResiduals
• explainedVariance
• meanAbsoluteError
• meanSquaredError
• numInstances
• objectiveHistory: objective function value for each iteration, only available when the "l-bfgs" solver is used
• pValues
• predictions
• r2
• residuals
• rootMeanSquaredError
• tValues
• totalIterations: total number of iterations before termination (a usage sketch follows this list)
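A minimal sketch of reading the training summary, assuming `model` is a fitted LinearRegressionModel (as noted above, pValues/tValues require the "normal" solver and objectiveHistory requires the "l-bfgs" solver):

s = model.summary
print(s.r2, s.rootMeanSquaredError, s.meanAbsoluteError)
print(s.numInstances, s.totalIterations)
s.residuals.show()   # residuals as a DataFrame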

Code

# Assumes an active SparkSession `spark` and a writable directory `temp_path`
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([ (1.0, 2.0, Vectors.dense(1.0)),(0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
model = lr.fit(df)
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
abs(model.transform(test0).head().prediction - (-1.0)) < 0.001
#True
abs(model.coefficients[0] - 1.0) < 0.001
#True
abs(model.intercept - 0.0) < 0.001
#True
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
abs(model.transform(test1).head().prediction - 1.0) < 0.001
#True
lr.setParams("vector")
#Traceback (most recent call last):
# ...
#TypeError: Method setParams forces keyword arguments.
lr_path = temp_path + "/lr"
lr.save(lr_path)
lr2 = LinearRegression.load(lr_path)
lr2.getMaxIter()
#5
model_path = temp_path + "/lr_model"
model.save(model_path)
model2 = LinearRegressionModel.load(model_path)
model.coefficients[0] == model2.coefficients[0]
#True
model.intercept == model2.intercept
#True
model.numFeatures
#1

RandomForestRegressor

class pyspark.ml.regression.RandomForestRegressor(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20, featureSubsetStrategy="auto")

Parameters


• fit(dataset, params=None) method
• featureSubsetStrategy: number of features to consider for splits at each tree node; options: auto, all, onethird, sqrt, log2, a fraction in (0.0, 1.0], or an integer n in [1, n]
• impurity: criterion used for information gain calculation; supported option: variance
• maxBins: maximum number of bins for discretizing continuous features
• maxDepth: maximum depth of the tree
• minInfoGain: minimum information gain required to split a node
• minInstancesPerNode: minimum number of instances each node must have
• numTrees: number of trees to train
• subsamplingRate: fraction of the training data used to learn each decision tree
• setter and getter methods (see the sketch below)
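A minimal sketch of the ensemble-specific parameters and their getters; the values are illustrative only:

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=50, featureSubsetStrategy="onethird",
                           subsamplingRate=0.8, maxDepth=4, seed=42)
print(rf.getNumTrees())               # 50
print(rf.getFeatureSubsetStrategy())  # onethird
print(rf.getSubsamplingRate())        # 0.8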

Methods and attributes of the fitted model


• featureImportances: see DecisionTreeRegressor
• numFeatures: number of features in the model; returns -1 if unknown
• totalNumNodes: total number of nodes across all trees in the ensemble
• transform(dataset, params=None) method
• treeWeights: weight of each tree
• trees: the individual trees

Code

# Assumes an active SparkSession `spark` and a writable directory `temp_path`
from numpy import allclose
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)), (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
rf = RandomForestRegressor(numTrees=2, maxDepth=2, seed=42)
model = rf.fit(df)
model.featureImportances
#SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 1.0])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
model.numFeatures
#1
model.trees
#[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]
model.getNumTrees
#2
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#0.5
rfr_path = temp_path + "/rfr"
rf.save(rfr_path)
rf2 = RandomForestRegressor.load(rfr_path)
rf2.getNumTrees()
#2
model_path = temp_path + "/rfr_model"
model.save(model_path)
model2 = RandomForestRegressionModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
