Starting with version 2.1, Spark ships a rich set of machine-learning modules that make it much easier to build your own models. It also introduces the DataFrame storage structure, which resembles a structured SQL table, making sample data easier to manipulate and the code more concise. Whether you use the old or the new API, there are two ways to train. The pipeline approach treats training as a stream, which speeds up offline and batch jobs, but it is awkward to get the fitted model back and to quickly score new samples when the model is applied. The other approach is the ordinary one: read the samples, train, save the model, then load it again for prediction. It costs a little more training time, but in practice loading the model and scoring new samples is much more convenient.
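As a quick illustration of the DataFrame point, here is a minimal sketch (not from the original post; the toy samples and column names are made up):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: samples become rows of a SQL-table-like DataFrame that can be
// filtered and projected by column name. The data here is purely hypothetical.
object dataFrameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("dataFrameSketch").getOrCreate()
    import spark.implicits._

    val samples = Seq((1.0, 0.5, 2.3), (0.0, 1.2, 0.7)).toDF("label", "f1", "f2")
    samples.filter($"label" === 1.0).select("f1", "f2").show()

    spark.stop()
  }
}
```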
Creating build.sbt

A Spark model-training module starts with its build.sbt (any other build tool works as well; the author simply uses sbt most often). Note that the Scala version should be 2.11, not 2.10.
```scala
lazy val commonSettings = Seq(
  scalaVersion := "2.11.8",
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.scalatest" %% "scalatest" % "2.2.1" % "test",
    "org.slf4j" % "slf4j-log4j12" % "1.7.12"
  )
)

val sparkVersion = "2.1.0"

lazy val sparkMilibMl = project.dependsOn(readOtherMl).
  settings(commonSettings: _*).
  settings(
    libraryDependencies ++= Seq(
      "net.databinder.dispatch" %% "dispatch-core" % "0.11.2",
      "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
    )
  ).
  settings(runMain in Compile <<= Defaults.runMainTask(fullClasspath in Compile, runner in (Compile, run)))
```
More details on sbt can be found in its documentation.
Pipeline training

The pipeline approach treats the data as a flow through a single channel: reading the data, training, and the final prediction all happen in one pipeline, which saves time. It looks like this:
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession

object gBTClassify {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("randomForestClassify").getOrCreate()

    // read libsvm-formatted train/test samples
    val train = spark.read.format("libsvm").load("/***/train_libsvm")
    val test = spark.read.format("libsvm").load("/****/test_libsvm")

    // index the raw labels
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(train)

    // index the feature vector (identifies categorical features automatically)
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .fit(train)

    val gbt = new GBTClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
      .setMaxIter(10)

    // map predicted indices back to the original labels
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

    val model = pipeline.fit(train)
    val predictions = model.transform(test)

    model.write.overwrite().save("/****/gbt_classfy_model")
  }
}
```
The heart of the pipeline is

```scala
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))
```

which puts the data transformations and the model into the same channel.
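Since the whole pipeline is saved as one model, it can later be loaded back and applied to new samples in a single step. A small sketch, assuming the placeholder path and the `test` DataFrame from the code above:

```scala
import org.apache.spark.ml.PipelineModel

// Load the fitted pipeline (indexers + GBT + label converter) and score new data.
val savedModel = PipelineModel.load("/****/gbt_classfy_model")
val scoredNew = savedModel.transform(test)
scoredNew.select("predictedLabel", "features").show(5)
```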
norm-model

Next is the ordinary training flow: read the data, train, save the model, load the model, and predict new samples. An extra conversion step is needed here, because not all data is stored in libsvm format; the usual layout is [id label feature1 feature2 ...], so the raw text has to be converted first.
For saving and loading the model, HDFS is used. Inside a Hadoop ecosystem HDFS is very convenient and needs no extra setup; putting the model on HDFS guarantees that every machine in the same cluster can reach it. Cross-cluster access is still being investigated, but it is presumably possible given sufficient permissions (for distributed storage, once you can reach ZooKeeper you can usually get at the data).
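For example, a fragment (assuming the `model` trained in the code below and a hypothetical namenode address) showing an explicit HDFS URI, which every node in the cluster can resolve:

```scala
import org.apache.spark.ml.classification.GBTClassificationModel

// "namenode:8020" is a placeholder; use your cluster's namenode address.
val hdfsPath = "hdfs://namenode:8020/user/spark/models/gbt_classfy_model"
model.save(hdfsPath)                                  // after training (see below)
val restored = GBTClassificationModel.load(hdfsPath)  // works from any machine in the cluster
```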
The full implementation is as follows:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.linalg.Vectors

object gbtClassify {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local") // when debugging, do not use local[*]
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // each line: label \t feature1 \t feature2 ...
    val data = sc.textFile("/***/train.txt")
      .map { x =>
        val vs = x.split("\t")
        val numFeatures = vs.size
        val label = vs.apply(0).toDouble
        val features = vs.slice(1, numFeatures)
        // map missing values ("NULL") to 0.0
        val featu = Vectors.dense(features.map { y => if ("NULL".equals(y)) 0.0 else y.toDouble })
        (label, featu)
      }

    val train = sqlContext.createDataFrame(data).toDF("label", "features")

    val gbt = new GBTClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)

    val model = gbt.fit(train)
    model.save("/****/gbt_classfy_model")
    sc.stop()
  }
}
```
Read the samples and convert them to a DataFrame; just make sure the column names match the shape of the RDD rows.

```scala
val train = sqlContext.createDataFrame(data).toDF("label", "features")
```
The code above trains the model on the training set and saves it to HDFS. The next step is to load the trained model, predict on new samples, and evaluate the prediction quality.
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.classification.GBTClassificationModel
import org.apache.spark.ml.linalg.Vectors
import classifyOperate.evaluateAUCAccuracyResult

object judgeModelCorrect {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local") // when debugging, do not use local[*]
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // load the trained model
    val gbtModel = GBTClassificationModel.load("/****/gbt_classfy_model")

    // each line: report_id \t label \t feature1 \t feature2 ...
    val data = sc.textFile("/****/test.txt")
      .map { x =>
        val vs = x.split("\t")
        val numFeatures = vs.size
        val report_id = vs.apply(0)
        val label = vs.apply(1).toDouble
        val features = vs.slice(2, numFeatures)
        val featu = Vectors.dense(features.map { y => if ("NULL".equals(y)) 0.0 else y.toDouble })
        (report_id, label, featu)
      }

    val test = sqlContext.createDataFrame(data).toDF("reportId", "label", "features")
    val testResult = gbtModel.transform(test).select("reportId", "label", "features", "prediction")

    // evaluate the predictions
    val auc = evaluateAUCAccuracyResult.getPredictAUC(testResult, "label", "prediction")
    val accurcy = evaluateAUCAccuracyResult.getPredictAccuracy(testResult, "label", "prediction")
    val f1 = evaluateAUCAccuracyResult.getPredictF1(testResult, "label", "prediction")
    evaluateAUCAccuracyResult.getPredictConfusionMatrix(testResult, "label", "prediction")
    println("auc=" + auc + ",accurcy=" + accurcy + ",f1=" + f1)
    //testResult.toJSON.rdd.saveAsTextFile("/***/test_data_judge_result2")
    sc.stop()
  }
}
```
Note that the training and test files use different layouts: the test file has an extra sample id column, so the field offsets have to be adjusted when parsing, otherwise the features will be shifted.
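One way to avoid off-by-one mistakes is a shared parsing helper that knows whether the file carries an id column; a small sketch (not in the original code):

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

// hasId = true for the test layout [id label feature...], false for the train layout [label feature...]
def parseLine(line: String, hasId: Boolean): (String, Double, Vector) = {
  val vs = line.split("\t")
  val offset = if (hasId) 2 else 1
  val id = if (hasId) vs(0) else ""
  val label = vs(offset - 1).toDouble
  val features = Vectors.dense(vs.slice(offset, vs.length).map(v => if ("NULL".equals(v)) 0.0 else v.toDouble))
  (id, label, features)
}
```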
Printing the model structure

After training you can print the model's parameters and structure; you can also load a previously trained model and inspect it the same way:

```scala
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// load the model
val gbtModel = GBTClassificationModel.load("*****")

// number of features the model was trained on
println("numFeatures=" + gbtModel.numFeatures)
// full tree structure of the ensemble
println("structure=" + gbtModel.toDebugString)
// all of the GBT parameters
val paramesMap = gbtModel.extractParamMap()
println("paramesMap=" + paramesMap.toString())
sc.stop()
```
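Beyond `toDebugString`, the individual boosting trees and their weights can also be inspected; a short sketch (using the `gbtModel` loaded above):

```scala
// Each boosting iteration adds one regression tree with its own weight.
gbtModel.trees.zip(gbtModel.treeWeights).zipWithIndex.foreach { case ((tree, weight), i) =>
  println(s"tree $i: weight=$weight, depth=${tree.depth}, nodes=${tree.numNodes}")
}
// Per-feature importance scores (available on GBTClassificationModel in Spark 2.x)
println("featureImportances=" + gbtModel.featureImportances)
```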
Model evaluation

Below are the evaluation methods, gathered into a single object so they are easy to extend:
```scala
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

object evaluateAUCAccuracyResult {

  /** AUC (area under the ROC curve) */
  def getPredictAUC(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluatorAUC = new BinaryClassificationEvaluator()
      .setLabelCol(label)
      .setRawPredictionCol(predict)
      .setMetricName("areaUnderROC")
    evaluatorAUC.evaluate(predictions)
  }

  /** accuracy */
  def getPredictAccuracy(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluatorAccuracy = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("accuracy")
    evaluatorAccuracy.evaluate(predictions)
  }

  /** F1 score */
  def getPredictF1(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("f1")
    evaluator.evaluate(predictions)
  }

  /** weighted recall */
  def getPredictRecall(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("weightedRecall")
    evaluator.evaluate(predictions)
  }

  /** weighted precision */
  def getPredictPrecision(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("weightedPrecision")
    evaluator.evaluate(predictions)
  }

  /** confusion matrix */
  def getPredictConfusionMatrix(predictions: DataFrame, label: String, predict: String): Unit = {
    // accept either a raw-score vector or a plain double prediction column
    val predictionAndLabels = predictions.select(col(predict), col(label).cast(DoubleType)).rdd.map {
      case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
      case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
    }
    val metrics = new MulticlassMetrics(predictionAndLabels)
    println("Confusion matrix:")
    println(metrics.confusionMatrix)
  }
}
```
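The recall and precision helpers are not exercised in the test job above; using them takes only two more lines on the same scored DataFrame:

```scala
// Usage sketch with the testResult DataFrame produced in judgeModelCorrect
val recall = evaluateAUCAccuracyResult.getPredictRecall(testResult, "label", "prediction")
val precision = evaluateAUCAccuracyResult.getPredictPrecision(testResult, "label", "prediction")
println("weightedRecall=" + recall + ",weightedPrecision=" + precision)
```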