站浏览量 站访问人数
目录
  1. 1. 批处理
  2. 2. Rich 函数
    1. 2.1. 传递参数给函数
    2. 2.2. 全局变量
  3. 3. 广播
  4. 4. TF-IDF计算
    1. 4.1. Flink实现计算TF-IDF
  5. 5. TF-IDF向量相似度计算
  6. 6. 完整的计算tf-idf
  7. 7. 最后的话

本文讲述在Flink下怎么实现批处理操作,以计算文档的TF-IDF向量为例子。

批处理

Flink下批处理用的数据类型是DataSet,按照一条一条记录存储,可以在多个分区上。Flink操作算子分为不打乱数据的分区和打乱数据分区两种;像map,flatmap这样单条执行的不打乱原有数据分区;而像groupBy,join等,会出现数据的新分区。

Rich 函数

Flink下可拓展的函数,并且可以与环境相互操作。除自定义的函数外(如 map、reduce 等),富函数还提供了四种函数:open, close, getRuntimeContext,以及setRuntimeContext。这些对于参数化函数,创建和销毁本地状态,访问广播变量,以及访问运行时信息如累加器和计数器(参考 Accumulators 和 Counters),还有迭代信都是有很大帮助的。

富函数同样可以被定义成匿名类:

data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})

传递参数给函数

传递参数给函数可以使用构造函数或withParameters(Configuration) 。 这些参数被序列化作为函数对象的部分并被发送到每个task实例,(每个task都会被接受到)。

构造函数,即构造基本的操作算子,map,filter,flatmap,(因为算子是每个task都能执行的,参数也就带上)

1
2
3
4
5
6
7
val toFilter = env.fromElements(1, 2, 3)
toFilter.filter(new MyFilter(2))//2是传递的参数,在每个task中都能接受到,
class MyFilter(limit: Int) extends FilterFunction[Int] {
override def filter(value: Int): Boolean = {
value > limit
}
}

withParameters需要自定义,并且在richfunction中使用,离不开configuration对象,它是一个map, key类型是string,value是其他类型。

1
2
3
4
5
6
7
8
9
10
11
12
val toFilter = env.fromElements(1, 2, 3)
val c = new Configuration() //以Configuration作为媒介,传递参数
c.setInteger("limit", 2)
toFilter.filter(new RichFilterFunction[Int]() { //注意,需要是richfunction
var limit = 0
override def open(config: Configuration): Unit = {
limit = config.getInteger("limit", 0) //把参数读取出来,open只会执行一次,即在每个task中执行一次,
}
def filter(in: Int): Boolean = {
in > limit
}
}).withParameters(c) //withParameter方法,

全局变量

flink 同样允许传递实现ExecutionConfig接口的自定义对象到环境中。 这个执行config在所有的rich 用户函数中都是可以访问的, 它是所有函数都可以获取。

1
2
3
4
val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)

使用richfunction时,open或者方法中的参数值,必须是能够序列化的,如果不能序列化,则需要进行操作,

广播

广播数据集在Spark下也存在,分布式计算下,需要把一些数据分发到所有的计算节点下运行。在Flink下,广播DataSet实际上把数据当成collect广播到节点上。因此,广播的数据不能太大,会造成广播时间太长,后续处理的算子等待导致背压;另外,广播会占用网络传输资源。

广播的两个阶段:

Broadcast: 广播set通过withBroadcastSet(DataSet, String)来注册,
Access: 在一个函数内通过getRuntimeContext().getBroadcastVariable(String)来访问,

一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val toBroadcast = env.fromElements(1, 2, 3)

val data = env.fromElements("a", "b")

data.map(new RichMapFunction[String, String]() {
var broadcastSet: Traversable[String] = null

override def open(config: Configuration): Unit = {
// 3. Access the broadcasted DataSet as a Collection
broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
}

def map(in: String): String = {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

TF-IDF计算

文本分析中不可或缺的一个特征即TF-IDF。计算方式有多种,这里采取的是标准的计算方式:

TF:文档的词频,tf[word,doc],即一个词word在该文档doc中出现的次数;
IDF:逆文档频率,即总的训练集中包含该词的文档数与总文档的比值,一般是log[(total+1)/(countDoc+1)];

代表的含义:

TF:如果某个词或短语在一篇文章中出现的频率TF高,并在其他文章中很少出现,则认为此词具有很好的区分能力。

TF-IDF更倾向过滤掉常见的词,留下重要的词。

Flink实现计算TF-IDF

在Flink下实现计算TF-IDF,用到的是批处理,是从静态的数据集中计算。分成三个方法进行。

读取数据集,并要做分词处理,分词后续会讲解,

1
2
3
4
5
6
7
8
9
val env = ExecutionEnvironment.getExecutionEnvironment
//源数据存储的格式 id\tword1空格word2空格word3
val file = "key_word_test.txt"
val mails = env.readTextFile(file).map{one =>
val values = one.split("\t")
val id = values.apply(0)
val words = values.apply(1).split(" ")
(id,words)
}.name("docs")

计算TF,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//输入的是多篇文档
//(文档id,词在文档中的id,词在文档中出现的次数)
def computeTF(data:DataSet[(String, Array[String])]):DataSet[(String, String, Int)]={
data.flatMap{ m=>
m._2.groupBy { w => w}.map{e=>(m._1,e._1,e._2.length)}
}
}

//输入的是一篇文档
//(文档id,词在文档中的id,词在文档中出现的次数)
def computeTF(data:(String, Array[String])):List[(String, String, Int)]={
val id = data._1
val result = data._2.groupBy { w => w}.map{ e =>
(id,e._1,e._2.length)
}
result.toList
}

计算IDF,

1
2
3
4
5
6
7
8
9
//输入的是多篇文档
//1,每篇文档中词进行去重(是单独一篇文档,不是全部文档)
//2,计算词在整个集中出现的次数(即单个词出现的文档数量)
def computeIDF(data:DataSet[(String, Array[String])]):DataSet[(String, Int)]={
data.flatMap{m=>m._2.toSet}
.map { m => (m,1) }
.groupBy(0)
.sum(1)
}

计算TF-IDF,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
  /**
计算文档的TF-IDF,返回文档id,词,词的tf-idf值;[tf和idf逐条计算]
**/
def computeTFIDFOneByOne(data:DataSet[(String, Array[String])]):DataSet[(String,Array[(String,Double)])]={

import scala.collection.JavaConverters._
import breeze.linalg._

val dataBroad = data.map{x=>x}.name("broad cast")

val result = data.map(new RichMapFunction[(String, Array[String]),(String,Array[(String,Double)])](){

var toDataBroad:scala.collection.mutable.Buffer[(String, Array[String])]= null

override def open(config: Configuration): Unit = {
toDataBroad = getRuntimeContext().getBroadcastVariable[(String, Array[String])]("toData").asScala
}

override def map(from:(String, Array[String])):(String,Array[(String,Double)])={
val docWordCount = from._2.groupBy {w => w}.map{ e =>(e._1,e._2.length)}
val total = toDataBroad.length.toDouble

val docId = from._1
val words = from._2.toSet
val res = toDataBroad.filter(_._1 != docId)
val returnData = words.map { word =>
var countIDF = 1.0
res.foreach{otherDoc=>
if (otherDoc._2.toSet.contains(word)){
countIDF = countIDF + 1.0
}
}
val tdidf = docWordCount.get(word).get*Math.log((total+1.0)/(countIDF+1.0))
(word,tdidf)
}
(docId,returnData.toArray)
}
}).withBroadcastSet(dataBroad, "toData")
result
}

计算TF-IDF向量,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def computeTFIDFVector(tfidfs:DataSet[(String,Array[(String,Double)])]):DataSet[(String,Int,Array[(Int, Double)])]={
val allWords = computeWordsDictionary(tfidfs)

val TFIDF_MODEL = tfidfs.map(new RichMapFunction[(String,Array[(String,Double)]),(String,Int,Array[(Int, Double)])](){
import scala.collection.JavaConverters._
var allWordsBroad:scala.collection.mutable.Buffer[String]= null

override def open(config: Configuration): Unit = {
allWordsBroad = getRuntimeContext().getBroadcastVariable[String]("allwords").asScala
}

override def map(value:(String,Array[(String,Double)])):(String,Int,Array[(Int, Double)])={
val docId = value._1
val idvalueMap = value._2.map{ x=>
val i = allWordsBroad.indexOf(x._1)
(i,x._2)
}.sortBy(_._1)
(docId,allWordsBroad.size,idvalueMap.toArray)
}

}).withBroadcastSet(allWords, "allwords")

TFIDF_MODEL
}

TF-IDF向量相似度计算

从上述,计算TF-IDF向量时,返回的类型是(String,Int,Array[(Int, Double)]),并非是一个向量。因此,在使用时,需要做一些处理。下面,采取的是稀疏型向量和密集型向量的计算。能用稀疏就用稀疏,稀疏的时间比密集的快2倍左右。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

import org.apache.flink.api.scala._
import breeze.linalg.{DenseVector => BDV,SparseVector =>BSV}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import basecommon.JsonUtils

/**
计算稀疏向量间的迪卡尔积,取排名前n个记录
**/
def tfidfCosDistance(data:DataSet[(String,Int,Array[(Int,Double)])],firstN:Int):DataSet[(String,Array[(String,Double)])]={
import scala.collection.JavaConverters._
import breeze.linalg._

val dataBroad = data.map(x=>x).name("broad cast")

val result = data.map(new RichMapFunction[(String,Int,Array[(Int,Double)]),(String,Array[(String,Double)])](){

var toDataBroad:scala.collection.mutable.Buffer[(String, Int,Array[(Int,Double)])]= null

override def open(config: Configuration): Unit = {
toDataBroad = getRuntimeContext().getBroadcastVariable[(String, Int,Array[(Int,Double)])]("toData").asScala
}

override def map(from:(String,Int,Array[(Int,Double)])):(String,Array[(String,Double)])={

val fromId = from._1
val size = from._2
val fromIndexs = from._3.map(_._1)
val fromDatas = from._3.map(_._2)
val bdv1 = new BSV[Double](fromIndexs,fromDatas,size)
val res = toDataBroad.filter(_._1!=from._1)
.map{ to =>
val toIndexs = to._3.map(_._1)
val toDatas = to._3.map(_._2)
val bdv2 = new BSV[Double](toIndexs,toDatas,size)
val cosSim = getCosSimValue(bdv1.dot(bdv2)/(norm(bdv1)*norm(bdv2)))
(to._1, cosSim)
}.sortWith(_._2>_._2).take(firstN).toArray
(from._1,res)
}

}).withBroadcastSet(dataBroad, "toData")

result
}

/**
计算密集向量间的迪卡尔积,取排名前n个记录
**/
def word2vecCosDistance(data:DataSet[(String,Array[Double])],firstN:Int):DataSet[(String,Array[(String,Double)])]={
import scala.collection.JavaConverters._
import breeze.linalg._

val dataBroad = data.map(x=>x).name("broad cast")

val result = data.map(new RichMapFunction[(String, Array[Double]),(String,Array[(String,Double)])](){

var toDataBroad:scala.collection.mutable.Buffer[(String, Array[Double])]= null

override def open(config: Configuration): Unit = {
toDataBroad = getRuntimeContext().getBroadcastVariable[(String, Array[Double])]("toData").asScala
}

override def map(from:(String, Array[Double])):(String,Array[(String,Double)])={
//val bdv1 = new BDV(from._2)
val bdv1 = BSV.apply(from._2)
val res = toDataBroad.filter(_._1!=from._1)
.map{ to =>
//val bdv2 = new BDV(to._2)
val bdv2 = BSV.apply(to._2)
val cosSim = getCosSimValue(bdv1.dot(bdv2)/(norm(bdv1)*norm(bdv2)))
(to._1, cosSim)
}.sortWith(_._2>_._2).take(firstN).toArray
(from._1,res)
}
}).withBroadcastSet(dataBroad, "toData")
result
}

/***
余弦值的范围限定
**/
def getCosSimValue(v:Double):Double={
if (v.isInfinity){
0.0
}else if (v>1) {
1.0
}else if (v.isNaN()){
0.0
}
else {
v
}
}

完整的计算tf-idf

根据上述的方法分解,当有一个训练集时(提前分好词),计算文档的TF-IDF相似度,就可以用如下方式来计算,并保存,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

def main(args:Array[String]):Unit={

val env = ExecutionEnvironment.getExecutionEnvironment
val file = args(0)
val data = env.readTextFile(file).filter(!_.startsWith("#")).map { x =>
val values = x.split("\t")
(values.apply(0),values.apply(1))
}.name("origin data")

val dataSeg = docSegFilterPunctuation(env, data).name("seg data")
val tfidf = computeTFIDFOneByOne(dataSeg).name("doc tfidf")
val tfidfvec = computeTFIDFVector(tfidf).name("doc tfidf vec")
val tfidfSim = tfidfCosDistance(tfidfvec,20).name("doc cos sim ")
val tfidfRes = tfidfSim.map(HBaseWriter.stringProcess(_))

tfidfRes.writeAsText(args(1))

env.execute()

分词将在后续讲解。

最后的话

Flink批处理实践中众多公司的结果是比Spark快,但是在计算TF-IDF下,Flink与Spark相差不大,或许是因为Flink是用自己写的方式算,而Spark下有自带的方法,因此,上述在Flink计算TF-IDF的函数有优化的空间,欢迎优化并实验对比Spark下的。