Contents
  1. Create build.sbt
  2. Pipeline training
  3. norm-model
  4. Model structure output
  5. Model evaluation

Since spark-2.1, Spark ships a rich set of machine-learning modules that make it easier to build your own models. It also introduced the DataFrame storage structure, similar to a structured SQL table, which makes sample data easier to manipulate and keeps the code concise. Whether you use the old or the new API, there are two ways to train. The pipeline approach is akin to stream-style training and speeds up offline and batch jobs, but it is less convenient when you need to pull the model back out and quickly predict new samples in production. The other approach is the normal one: read the samples, train, save the model, then load it again for prediction. It sacrifices some training time, but in practice loading the model and predicting new samples is much easier to operate, which makes it better suited for real use.

Create build.sbt

Starting a Spark model-training module begins with the build.sbt; other build tools work just as well, the author simply uses sbt most often. Note that the Scala version should be 2.11, not 2.10.

val sparkVersion = "2.1.0"

lazy val commonSettings = Seq(
  scalaVersion := "2.11.8",
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"     % sparkVersion % "provided",
    "org.scalatest"    %% "scalatest"      % "2.2.1"      % "test",
    "org.slf4j"         % "slf4j-log4j12"  % "1.7.12"
  )
)

// sparkMilibMl depends on another sub-project (readOtherMl) in the same build
lazy val sparkMilibMl = project.dependsOn(readOtherMl).
  settings(commonSettings: _*).
  settings(
    libraryDependencies ++= Seq(
      "net.databinder.dispatch" %% "dispatch-core" % "0.11.2",
      "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
    )
  ).
  settings(runMain in Compile <<= Defaults.runMainTask(fullClasspath in Compile, runner in (Compile, run)))

For more information about sbt, refer to its documentation.

Pipeline training

The pipeline approach treats the data as a flow through a channel: reading the data, training, and the final prediction all happen inside one pipeline, which saves time. It looks like this:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession


object gBTClassify {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.master("local").appName("gBTClassify").getOrCreate()

    // training and test sets are stored in libsvm format
    val train = spark.read.format("libsvm").load("/***/train_libsvm")
    val test  = spark.read.format("libsvm").load("/****/test_libsvm")

    // index the label column
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(train)

    // index categorical features automatically
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .fit(train)

    val gbt = new GBTClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
      .setMaxIter(10)

    // map the indexed predictions back to the original labels
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

    val model = pipeline.fit(train)
    val predictions = model.transform(test)

    model.write.overwrite().save("/****/gbt_classfy_model")
  }
}

The key step in building the pipeline is

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

which puts the sample transformations and the model into the same channel.
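A saved pipeline can be loaded back with PipelineModel.load and applied to new samples directly. A minimal sketch (the object name loadPipelineModel is illustrative, the paths are the placeholders from above):

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession

object loadPipelineModel {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("loadPipelineModel").getOrCreate()

    // load the whole pipeline (indexers + GBT + label converter) in one step
    val model = PipelineModel.load("/****/gbt_classfy_model")

    // new samples only need the raw "label"/"features" columns;
    // all the indexing stages are replayed by the loaded pipeline
    val newSamples = spark.read.format("libsvm").load("/****/test_libsvm")
    val predicted = model.transform(newSamples)

    predicted.select("predictedLabel", "features").show(5)

    spark.stop()
  }
}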

norm-model

Next is the normal training approach: read the data, train, save the model, load the model, and predict new samples. Here an extra conversion step is needed, because not all data is stored in libsvm format; a common layout is [id label feature1 feature2 ...], so the samples have to be converted.

Also, the model is saved to and loaded from HDFS. Within the Hadoop ecosystem HDFS is convenient and needs no extra setup; putting the model on HDFS guarantees that every machine in the same cluster can reach it. Cross-cluster access is still something I am looking into, but it should be feasible as long as you have sufficient permissions (for distributed storage, once you can obtain access to ZooKeeper you can in fact get at the data).
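For instance, both save and load accept a full HDFS URI. A minimal fragment, assuming a hypothetical namenode address (model stands for the fitted GBTClassificationModel from the code below):

import org.apache.spark.ml.classification.GBTClassificationModel

// hypothetical HDFS URI -- replace namenode host/port and path with your own
val modelPath = "hdfs://namenode-host:8020/user/xxx/gbt_classfy_model"

// after fitting (as in the training code below), write the model to HDFS ...
// model.save(modelPath)

// ... and any machine that can reach the same HDFS can load it back
val gbtModel = GBTClassificationModel.load(modelPath)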

The full implementation is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.linalg.Vectors

object gbtClassify {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("test").setMaster("local") // do not use local[*] when debugging
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // each line of train.txt is: label \t feature1 \t feature2 \t ...
    val data = sc.textFile("/***/train.txt")
      .map { x =>
        val vs = x.split("\t")
        val numFeatures = vs.size
        val label = vs.apply(0).toDouble
        val features = vs.slice(1, numFeatures)
        // missing values ("NULL") are filled with 0.0
        val featu = Vectors.dense(features.map { y => if ("NULL".equals(y)) 0.0 else y.toDouble })
        (label, featu)
      }

    // column names must match the shape of the RDD tuples: (label, features)
    val train = sqlContext.createDataFrame(data).toDF("label", "features")

    val gbt = new GBTClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)

    val model = gbt.fit(train)
    model.save("/****/gbt_classfy_model")
    sc.stop()
  }
}

Read the samples and then convert them into a DataFrame; just make sure the column names match the layout of the RDD tuples.

val train = sqlContext.createDataFrame(data).toDF("label","features")
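Equivalently, with a SparkSession (as in the pipeline example) the conversion can go through the implicits. A minimal sketch on toy data (the values are made up; only the (label, features) shape matters):

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").appName("toDFExample").getOrCreate()
import spark.implicits._

// same rule as above: column names follow the tuple positions
val train = Seq(
  (1.0, Vectors.dense(0.5, 1.2)),
  (0.0, Vectors.dense(0.1, 0.3))
).toDF("label", "features")

train.printSchema()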

The code above trains the model on the training set and saves it to HDFS. Next, load the trained model, predict on the new samples, and evaluate how well the predictions perform.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.classification.GBTClassificationModel
import org.apache.spark.ml.linalg.Vectors
import classifyOperate.evaluateAUCAccuracyResult

object judgeModelCorrect {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("test").setMaster("local") // do not use local[*] when debugging
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // load the trained model
    val gbtModel = GBTClassificationModel.load("/****/gbt_classfy_model")

    // each line of test.txt is: report_id \t label \t feature1 \t feature2 \t ...
    val data = sc.textFile("/****/test.txt")
      .map { x =>
        val vs = x.split("\t")
        val numFeatures = vs.size
        val report_id = vs.apply(0)
        val label = vs.apply(1).toDouble
        val features = vs.slice(2, numFeatures)
        val featu = Vectors.dense(features.map { y => if ("NULL".equals(y)) 0.0 else y.toDouble })
        (report_id, label, featu)
      }
    val test = sqlContext.createDataFrame(data).toDF("reportId", "label", "features")

    // predict the new samples and keep only the columns needed for evaluation
    val testResult = gbtModel.transform(test).select("reportId", "label", "features", "prediction")

    // evaluate the predictions
    val auc = evaluateAUCAccuracyResult.getPredictAUC(testResult, "label", "prediction")
    val accuracy = evaluateAUCAccuracyResult.getPredictAccuracy(testResult, "label", "prediction")
    val f1 = evaluateAUCAccuracyResult.getPredictF1(testResult, "label", "prediction")
    evaluateAUCAccuracyResult.getPredictConfusionMatrix(testResult, "label", "prediction")
    println("auc=" + auc + ",accuracy=" + accuracy + ",f1=" + f1)

    //testResult.toJSON.rdd.saveAsTextFile("/***/test_data_judge_result2")
    sc.stop()
  }
}

Note that the training and test source files have different formats: the test file carries an extra sample id column, so pay attention to where the features start when slicing, otherwise the columns will be shifted.

Model structure output

After training you can print the model's parameters and structure; you can also load a previously trained model and inspect its structure. It works like this:


val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// load the trained model
val gbtModel = GBTClassificationModel.load("*****")

// print the model structure
println("number of features = " + gbtModel.numFeatures)
println("model structure = " + gbtModel.toDebugString)

// the full set of GBT parameters
val paramesMap = gbtModel.extractParamMap()
println("paramesMap = " + paramesMap.toString())
sc.stop()
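Beyond toDebugString, the individual boosting trees and their weights can also be inspected. A minimal sketch (gbtModel is the loaded model from above; the featureImportances line is an assumption about this Spark version):

// each boosting iteration adds one regression tree; trees and weights line up by index
val trees = gbtModel.trees
val weights = gbtModel.treeWeights

trees.zip(weights).zipWithIndex.foreach { case ((tree, w), i) =>
  println(s"tree $i: weight=$w, nodes=${tree.numNodes}, depth=${tree.depth}")
}

// assumption: featureImportances is available on GBTClassificationModel in this Spark version
println("feature importances = " + gbtModel.featureImportances)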

Model evaluation

Below is the collection of evaluation methods. All the metrics are wrapped in a single object, which can easily be extended:

import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

object evaluateAUCAccuracyResult {

  /** AUC (area under the ROC curve) */
  def getPredictAUC(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluatorAUC = new BinaryClassificationEvaluator()
      .setLabelCol(label)
      .setRawPredictionCol(predict)
      .setMetricName("areaUnderROC")
    evaluatorAUC.evaluate(predictions)
  }

  /** accuracy */
  def getPredictAccuracy(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluatorAccuracy = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("accuracy")
    evaluatorAccuracy.evaluate(predictions)
  }

  /** F1 score */
  def getPredictF1(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("f1")
    evaluator.evaluate(predictions)
  }

  /** weighted recall */
  def getPredictRecall(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("weightedRecall")
    evaluator.evaluate(predictions)
  }

  /** weighted precision */
  def getPredictPrecision(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("weightedPrecision")
    evaluator.evaluate(predictions)
  }

  /** confusion matrix */
  def getPredictConfusionMatrix(predictions: DataFrame, label: String, predict: String): Unit = {

    // the prediction column may hold either a raw-prediction vector or a plain double
    val predictionAndLabels = predictions.select(col(predict), col(label).cast(DoubleType)).rdd.map {
      case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
      case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
    }

    val metrics = new MulticlassMetrics(predictionAndLabels)
    println("Confusion matrix:")
    println(metrics.confusionMatrix)
  }
}
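As a quick sanity check, the helper can be exercised on a tiny hand-built DataFrame of labels and hard 0/1 predictions; a sketch with made-up toy values:

import org.apache.spark.sql.SparkSession

object evaluateDemo {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("evaluateDemo").getOrCreate()
    import spark.implicits._

    // toy (label, prediction) pairs: three correct, one false positive
    val toy = Seq((1.0, 1.0), (0.0, 0.0), (1.0, 1.0), (0.0, 1.0)).toDF("label", "prediction")

    println("auc      = " + evaluateAUCAccuracyResult.getPredictAUC(toy, "label", "prediction"))
    println("accuracy = " + evaluateAUCAccuracyResult.getPredictAccuracy(toy, "label", "prediction"))
    println("f1       = " + evaluateAUCAccuracyResult.getPredictF1(toy, "label", "prediction"))
    evaluateAUCAccuracyResult.getPredictConfusionMatrix(toy, "label", "prediction")

    spark.stop()
  }
}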