Since version 2.1, Spark has shipped a rich set of machine learning modules that make it much easier to build your own models. The DataFrame-based API, which resembles a structured SQL table, also makes sample data easier to manipulate and keeps the code concise. Both the old and the new APIs support two training styles. The pipeline style treats the data as a flow: loading, training, and prediction all happen inside one pipeline, which speeds up offline and batch processing, but it is awkward for retrieving the model at serving time and for quickly scoring new samples. The other style is the conventional one: read the samples, train, save the model, then load the model again for prediction. It sacrifices some training time, but in practice loading the model and predicting new samples is much more convenient.
Creating build.sbt

We start the Spark model-training module by writing a build.sbt; other build tools would work just as well, sbt is simply what the author uses most often. Note that the Scala version should be 2.11, not 2.10.
```scala
lazy val commonSettings = Seq(
  scalaVersion := "2.11.8",
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.scalatest" %% "scalatest" % "2.2.1" % "test",
    "org.slf4j" % "slf4j-log4j12" % "1.7.12"
  )
)

val sparkVersion = "2.1.0"

lazy val sparkMilibMl = project.dependsOn(readOtherMl).
  settings(commonSettings: _*).
  settings(
    libraryDependencies ++= Seq(
      "net.databinder.dispatch" %% "dispatch-core" % "0.11.2",
      "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
    )
  ).
  settings(runMain in Compile <<= Defaults.runMainTask(fullClasspath in Compile, runner in (Compile, run)))
```
 
More information about sbt can be found in the sbt documentation.
Pipeline training

The pipeline style treats the data as a flow through a channel: reading the data, training, and the final prediction are all completed inside one pipeline, which shortens the overall run time. It works as follows:
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.linalg.Vector

object gBTClassify {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("randomForestClassify").getOrCreate()
    val train = spark.read.format("libsvm").load("/***/train_libsvm")
    val test  = spark.read.format("libsvm").load("/****/test_libsvm")

    // Index the string labels into label indices
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(train)
    // Index the feature vector (categorical features are identified automatically)
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .fit(train)

    val gbt = new GBTClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
      .setMaxIter(10)

    // Convert predicted indices back to the original label values
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

    val model = pipeline.fit(train)
    // Predictions on the test set (could be passed to an evaluator)
    val predictions = model.transform(test)

    model.write.overwrite().save("/****/gbt_classfy_model")
  }
}
```
 
The key to building the pipeline is
```scala
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))
```
 
which puts the data transformations and the model into the same pipeline.
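Since the fitted pipeline is saved at the end of the code above, it can later be reloaded and applied to new data. A minimal sketch of that step, assuming the same placeholder model path as above and a hypothetical libsvm file of new samples:

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").appName("loadPipeline").getOrCreate()

// Reload the whole fitted pipeline (indexers + GBT + label converter)
val savedPipeline = PipelineModel.load("/****/gbt_classfy_model")

// Score new samples; the input path is a hypothetical placeholder
val newData = spark.read.format("libsvm").load("/****/new_samples_libsvm")
savedPipeline.transform(newData)
  .select("predictedLabel", "prediction")
  .show(10)
```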
norm-model

Next is the conventional training style: read the data, train, save the model, load the model, and predict new samples. Some extra data conversion is needed here, because not all data is stored in libsvm format; a common layout is [id label feature1 feature2 ...], so the raw text has to be converted first.
In addition, the model is saved to and loaded from HDFS. Inside a Hadoop ecosystem HDFS is very convenient and needs no extra setup; placing the model on HDFS guarantees that every machine in the same cluster can reach it. Cross-cluster access is still being investigated, but it is presumably achievable given sufficient permissions (for distributed storage, the data can in practice be reached once you have access to ZooKeeper).
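As an illustration, saving to and loading from an explicit HDFS URI might look like the sketch below; the namenode address and model directory are hypothetical, and `model` stands for an already fitted GBTClassificationModel (for example the result of `gbt.fit(train)` in the code that follows):

```scala
import org.apache.spark.ml.classification.GBTClassificationModel

// Hypothetical namenode address and model directory
val modelPath = "hdfs://namenode-host:8020/models/gbt_classfy_model"

// Persist the fitted model, overwriting any previous version
model.write.overwrite().save(modelPath)

// Any machine in the same cluster can later load it back
val reloaded = GBTClassificationModel.load(modelPath)
```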
The detailed implementation:

```scala
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import classifyOperate.evaluateAUCAccuracyResult

object gbtClassify {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local") // do not use local[*] when debugging
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Each line of train.txt is: label \t feature1 \t feature2 ...
    val data = sc.textFile("/***/train.txt")
      .map { x =>
        val vs = x.split("\t")
        val numFeatures = vs.size
        val label = vs.apply(0).toDouble
        val features = vs.slice(1, numFeatures)
        // Treat the string "NULL" as 0.0, otherwise parse the numeric value
        val featu = Vectors.dense(features.map { y => if ("NULL".equals(y)) 0.0 else y.toDouble })
        (label, featu)
      }
    val train = sqlContext.createDataFrame(data).toDF("label", "features")

    val gbt = new GBTClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)

    val model = gbt.fit(train)
    model.save("/****/gbt_classfy_model")
    sc.stop()
  }
}
```
The samples are read and then converted into a DataFrame; just make sure the column names match the shape of the tuples in the RDD.
```scala
val train = sqlContext.createDataFrame(data).toDF("label", "features")
```
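The GBTClassifier in the code above only sets the number of boosting iterations. If more control is wanted, additional hyperparameters can be set in the same builder style; the values below are only illustrative, not tuned recommendations:

```scala
import org.apache.spark.ml.classification.GBTClassifier

val gbtTuned = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)          // number of boosting iterations
  .setMaxDepth(5)          // depth of each individual tree
  .setStepSize(0.1)        // learning rate
  .setSubsamplingRate(0.8) // fraction of the data used for each tree
```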
 
The implementation above trains the model on the training set and saves it to HDFS. Next, the trained model is loaded back, new samples are predicted, and the prediction quality is evaluated.
```scala
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import classifyOperate.evaluateAUCAccuracyResult
import org.apache.spark.ml.linalg.{Vector, Vectors}

object judgeModelCorrect {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local") // do not use local[*] when debugging
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Load the trained model
    val gbtModel = GBTClassificationModel.load("/****/gbt_classfy_model")

    // Each line of test.txt is: report_id \t label \t feature1 \t feature2 ...
    val data = sc.textFile("/****/test.txt")
      .map { x =>
        val vs = x.split("\t")
        val numFeatures = vs.size
        val report_id = vs.apply(0)
        val label = vs.apply(1).toDouble
        val features = vs.slice(2, numFeatures)
        val featu = Vectors.dense(features.map { y => if ("NULL".equals(y)) 0.0 else y.toDouble })
        (report_id, label, featu)
      }
    val test = sqlContext.createDataFrame(data).toDF("reportId", "label", "features")

    val testResult = gbtModel.transform(test).select("reportId", "label", "features", "prediction")
    val auc = evaluateAUCAccuracyResult.getPredictAUC(testResult, "label", "prediction")
    val accuracy = evaluateAUCAccuracyResult.getPredictAccuracy(testResult, "label", "prediction")
    val f1 = evaluateAUCAccuracyResult.getPredictF1(testResult, "label", "prediction")
    evaluateAUCAccuracyResult.getPredictConfusionMatrix(testResult, "label", "prediction")
    println("auc=" + auc + ",accuracy=" + accuracy + ",f1=" + f1)
    // testResult.toJSON.rdd.saveAsTextFile("/***/test_data_judge_result2")
    sc.stop()
  }
}
```
 
Note that the training and test source files have different layouts: the test file carries an extra sample id column, so when parsing you must account for where the features actually start, otherwise the columns will be misaligned.
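To avoid that kind of off-by-one mistake, the parsing logic can be factored into one helper that takes the feature start index as a parameter. This is just an illustrative sketch (for example in the spark-shell or inside a utility object), not part of the original code:

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Parse one tab-separated line into (label, features).
// `featureStart` is 1 for the training file (label, f1, f2, ...)
// and 2 for the test file (id, label, f1, f2, ...).
def parseLine(line: String, featureStart: Int): (Double, Vector) = {
  val vs = line.split("\t")
  val label = vs(featureStart - 1).toDouble
  val values = vs.slice(featureStart, vs.length)
    .map(v => if ("NULL".equals(v)) 0.0 else v.toDouble)
  (label, Vectors.dense(values))
}
```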
Inspecting the model structure

Once training is finished, the model's parameters and structure can be printed; the same works for a model that has been loaded back from disk. For example:

```scala
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Load the trained model
val gbtModel = GBTClassificationModel.load("*****")

// Number of features the model was trained on
println("numFeatures=" + gbtModel.numFeatures)
// Full tree structure of the ensemble
println("model structure=" + gbtModel.toDebugString)
// All parameters of the GBT model
val paramesMap = gbtModel.extractParamMap()
println("paramesMap=" + paramesMap.toString())
sc.stop()
```
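Beyond the tree structure, it can also be useful to see which features the ensemble relies on. A short sketch, assuming `gbtModel` is the loaded GBTClassificationModel from above and that its `featureImportances` field (Spark 2.x ml API) is available:

```scala
// Estimate of each feature's importance, normalized to sum to 1
val importances = gbtModel.featureImportances

// Print the ten most important feature indices with their weights
importances.toArray.zipWithIndex
  .sortBy(-_._1)
  .take(10)
  .foreach { case (weight, idx) => println(s"feature $idx -> $weight") }
```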
Evaluating the model

Below is the collection of evaluation helpers. The metrics are gathered into a single utility object so they can easily be extended:
```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.sql.DataFrame
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.functions._

object evaluateAUCAccuracyResult {

  /** AUC (area under the ROC curve) */
  def getPredictAUC(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluatorAUC = new BinaryClassificationEvaluator()
      .setLabelCol(label)
      .setRawPredictionCol(predict)
      .setMetricName("areaUnderROC")
    evaluatorAUC.evaluate(predictions)
  }

  /** Accuracy */
  def getPredictAccuracy(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluatorAccuracy = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("accuracy")
    evaluatorAccuracy.evaluate(predictions)
  }

  /** F1 score */
  def getPredictF1(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("f1")
    evaluator.evaluate(predictions)
  }

  /** Weighted recall */
  def getPredictRecall(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("weightedRecall")
    evaluator.evaluate(predictions)
  }

  /** Weighted precision */
  def getPredictPrecision(predictions: DataFrame, label: String, predict: String): Double = {
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(label)
      .setPredictionCol(predict)
      .setMetricName("weightedPrecision")
    evaluator.evaluate(predictions)
  }

  /** Print the confusion matrix */
  def getPredictConfusionMatrix(predictions: DataFrame, label: String, predict: String): Unit = {
    val predictionAndLabels = predictions.select(col(predict), col(label).cast(DoubleType)).rdd.map {
      // Score column given as a probability/raw-prediction vector: take the class-1 score
      case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
      // Score column given as a plain double (e.g. the hard "prediction" column)
      case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
    }
    val metrics = new MulticlassMetrics(predictionAndLabels)
    println("Confusion matrix:")
    println(metrics.confusionMatrix)
  }
}
```
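One caveat worth noting: in judgeModelCorrect above, getPredictAUC is fed the hard "prediction" column (0/1 values) as the raw prediction, which reduces the ROC curve to a single operating point. With a classifier that exposes a probability column (for example RandomForestClassificationModel, or GBT on newer Spark releases; in Spark 2.1 the GBT model only outputs the hard prediction), the evaluator could be pointed at the scores instead. A hedged sketch, where `predictions` stands for such a model's output:

```scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Hypothetical: `predictions` comes from a model that outputs a "probability" column
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("probability") // score column instead of the hard 0/1 prediction
  .setMetricName("areaUnderROC")
val aucFromScores = evaluator.evaluate(predictions)
println("AUC from probability scores = " + aucFromScores)
```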