// Make custom configuration values available to every RichFunction at runtime
// by registering them as global job parameters on the execution config
// (retrievable later via getRuntimeContext().getExecutionConfig).
val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)
// Flink broadcast-variable example (fragment): the anonymous RichMapFunction is
// opened here, but its map() override and the closing of the data.map(...) call
// lie outside this chunk.
// NOTE(review): `broadcastSet` starts as null and is only populated in open();
// prefer an Option over a nullable var — any access before open() would NPE.
data.map(new RichMapFunction[String, String]() {
  var broadcastSet: Traversable[String] = null

  // open() runs once per parallel task before any map() call: fetch the DataSet
  // broadcast under the name "broadcastSetName" and view the returned Java
  // Collection as a Scala Traversable via asScala.
  override def open(config: Configuration): Unit = {
    // 3. Access the broadcasted DataSet as a Collection
    broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
  }
// Load the raw corpus. Source format: one document per line,
// "id<TAB>word1 word2 word3" (words separated by single spaces).
val env = ExecutionEnvironment.getExecutionEnvironment
val file = "key_word_test.txt"
val mails = env.readTextFile(file).map { line =>
  val fields = line.split("\t")
  val id = fields(0)
  val words = fields(1).split(" ")
  (id, words)
}.name("docs")
// Input: a single document as (docId, words).
// Output: one (docId, word, occurrences-of-word-in-doc) triple per distinct word.
// Result order is unspecified (the triples come out of a Map).
def computeTF(data: (String, Array[String])): List[(String, String, Int)] = {
  val (docId, words) = data
  words
    .groupBy(identity)
    .iterator
    .map { case (word, occurrences) => (docId, word, occurrences.length) }
    .toList
}
// Per-document TF-IDF against the broadcast corpus `toDataBroad`
// (populated in open(), not visible in this chunk).
// Returns (docId, Array[(word, tfidf)]) for every distinct word in the document.
override def map(from: (String, Array[String])): (String, Array[(String, Double)]) = {
  val docId = from._1
  // term frequency: word -> occurrence count within this document
  val docWordCount = from._2.groupBy { w => w }.map { e => (e._1, e._2.length) }
  // total number of documents in the broadcast corpus
  val total = toDataBroad.length.toDouble
  val words = from._2.toSet
  // Precompute the vocabulary Set of every OTHER document once.
  // The previous code rebuilt `otherDoc._2.toSet` inside the per-word loop,
  // i.e. O(words * docs * docLen); hoisting it makes the scan O(docs * docLen + words * docs).
  val otherDocWordSets = toDataBroad.filter(_._1 != docId).map(_._2.toSet)
  val returnData = words.map { word =>
    // document frequency with the original +1 smoothing (self-document counted as 1)
    val docFreq = 1.0 + otherDocWordSets.count(_.contains(word))
    // word is guaranteed present in docWordCount (both derive from from._2),
    // so direct apply replaces the previous .get(word).get
    val tfidf = docWordCount(word) * Math.log((total + 1.0) / (docFreq + 1.0))
    (word, tfidf)
  }
  (docId, returnData.toArray)
}
}).withBroadcastSet(dataBroad, "toData")
result
}
// Fragment: interior of a cosine-similarity function whose `def` line is outside
// this chunk. `from` appears to be (docId, vectorSize, Array[(index, value)]);
// BSV is presumably Breeze's SparseVector — TODO confirm against the imports.
// NOTE(review): `fromId` is computed but never used (the code reads from._1 directly).
val fromId = from._1
val size = from._2
// split the (index, value) pairs into parallel arrays for the sparse constructor
val fromIndexs = from._3.map(_._1)
val fromDatas = from._3.map(_._2)
val bdv1 = new BSV[Double](fromIndexs, fromDatas, size)
// compare against every OTHER document in the broadcast corpus,
// then keep the top-firstN most similar, best first
val res = toDataBroad.filter(_._1 != from._1)
  .map { to =>
    val toIndexs = to._3.map(_._1)
    val toDatas = to._3.map(_._2)
    val bdv2 = new BSV[Double](toIndexs, toDatas, size)
    // cosine similarity = dot / (|v1| * |v2|); getCosSimValue is defined elsewhere
    val cosSim = getCosSimValue(bdv1.dot(bdv2) / (norm(bdv1) * norm(bdv2)))
    (to._1, cosSim)
  }.sortWith(_._2 > _._2).take(firstN).toArray
(from._1, res)
}
// Cosine similarity of this document's tf-idf vector against every other
// document in the broadcast corpus `toDataBroad`; returns the top-firstN
// most similar (docId, similarity) pairs, best first.
override def map(from: (String, Array[Double])): (String, Array[(String, Double)]) = {
  //val bdv1 = new BDV(from._2)  // dense variant kept for reference
  val bdv1 = BSV.apply(from._2)
  // Hoisted: norm(bdv1) is loop-invariant; the previous code recomputed it
  // once per compared document.
  val norm1 = norm(bdv1)
  val res = toDataBroad.filter(_._1 != from._1)
    .map { to =>
      //val bdv2 = new BDV(to._2)  // dense variant kept for reference
      val bdv2 = BSV.apply(to._2)
      val cosSim = getCosSimValue(bdv1.dot(bdv2) / (norm1 * norm(bdv2)))
      (to._1, cosSim)
    }.sortWith(_._2 > _._2).take(firstN).toArray
  (from._1, res)
}
}).withBroadcastSet(dataBroad, "toData")
result
}
// Load the input file given as the first CLI argument.
// Lines starting with '#' are treated as comments and dropped;
// the rest are split as "docId<TAB>text".
val env = ExecutionEnvironment.getExecutionEnvironment
val file = args(0)
val data = env.readTextFile(file)
  .filter(!_.startsWith("#"))
  .map { line =>
    val fields = line.split("\t")
    (fields(0), fields(1))
  }.name("origin data")
// End-to-end pipeline: segment text (punctuation filtered) -> per-document
// tf-idf -> tf-idf vectors -> pairwise cosine similarity (top 20 per document)
// -> HBase row strings.
val dataSeg = docSegFilterPunctuation(env, data).name("seg data")
val tfidf = computeTFIDFOneByOne(dataSeg).name("doc tfidf")
val tfidfvec = computeTFIDFVector(tfidf).name("doc tfidf vec")
// NOTE(review): operator name "doc cos sim " carries a trailing space — kept as-is
// since external tooling may match on the exact name.
val tfidfSim = tfidfCosDistance(tfidfvec,20).name("doc cos sim ")
val tfidfRes = tfidfSim.map(HBaseWriter.stringProcess(_))