站浏览量 站访问人数
目录
  1. 1. Flink引入开源包
  2. 2. 使用hanlp的分词
    1. 2.1. 自定义词添加
    2. 2.2. 分词
    3. 2.3. 词性过滤
  3. 3. 最后的话

本文讲述在Flink下使用hanlp包,一个文本分析开源工具,主要是分词,语法分词,依存分词,命名实体识别。本文将只讲分词的应用,其他应用可参考官网文档,自行实现。

Flink引入开源包

Flink项目引入开源的方式,可以在本地项目中导入;或者在构建项目时把第三方包带上。这里以在sbt构建的项目中,引入第三方开源包hanlp为例,更多参考可见搭建项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在sbt构建的子项目中,加入想要引入的第三方包,
lazy val flinkseg = project.
settings(commonSettings: _*).
settings(flinkSettings: _*).
settings(
libraryDependencies ++= Seq(
"com.hankcs" % "hanlp" % "portable-1.2.8",
"net.databinder.dispatch" %% "dispatch-core" % "0.11.2"
)).
settings(
assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) {
(old) => {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
).
settings(runMain in Compile <<= Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run)))

使用hanlp的分词

在hanlp中,包含了多种分词方法,以下只是使用其中之一,最大熵模型分词,此外有网络的,隐马尔可夫的。

自定义词添加

通过文件形式,把新词保存进,然后在程序启动时加以读取,以便在分词时可以识别出新词。用到了CustomDictionary方法,可以添加词以及对应的词性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

def reReadDefineDictionarysFromFile(env:ExecutionEnvironment,stopWordPath:String,newWordPath:String):Unit={
//CustomDictionary,
val mustSeg = env.readTextFile(newWordPath).collect()
.map { wordAndTag =>
val word = wordAndTag.split("\t").apply(0)
val tag = wordAndTag.split("\t").apply(1)
CustomDictionary.insert(word, tag)
//println(word)
//CustomDictionary.add(word)
}
//mustSeg.count() //executor
//CoreStopWordDictionary
//word \t tag
val mustSop = env.readTextFile(stopWordPath).collect()
.map{ wordAndTag =>
val word = wordAndTag.split("\t").apply(0)
CoreStopWordDictionary.add(word)
}
//mustSop.count() //executor

}

def reReadDefineDictionarysBy():Unit={
//CustomDictionary
CustomDictionary.insert("测试", "vg 1024")
//CoreStopWordDictionary
CoreStopWordDictionary.add("是否")//停用词,
}

分词

分词使用的是hanlp默认的分词方法,HanLP.newSegment(),可以对一个串分词,或者对数据集进行分词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

/**单个串分词**/
def segFromString(str:String):List[Term]={
val segment = HanLP.newSegment()//.enableOrganizationRecognize(true)
val termList = segment.seg(str)
termList.asScala.toList
}

/**文档集分词**/
def segFromString(data:DataSet[(String,String)]):DataSet[(String,List[Term])]={
val res = data.map{ x=>
val segment = HanLP.newSegment()
val termList = segment.seg(x._2).asScala.toList
(x._1,termList)
}
res
}

词性过滤

当分词完毕后,需要对词性进行挑选,即挑选有用的词性或者自定义的词性的词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

不是标点符号
if(!'w'.equals(tagFirst))


//留下实体词性,即认为代表有意义的词性
//n-名词,g-学术词,j-简略语,i-成语,l-习用语,t-时间,s-处所,f-方位,v-动词,
//a-形容词,b区别词,z状态词,d副词,c连词,




ns 地名
nt 机构团体名
nh 医药疾病等健康相关名词
nn 工作相关名词
nz 其他专名
g 学术词汇
v 动词
a 形容词
j 简称略语
i 成语
l 习用语


n 名词
g 学术词汇
t 时间
s 处所
f 方位
b 区别词
z 状态词
v 动词
p 介词
a 形容词
j 简称略语
i 成语
l 习用语

最后的话

hanlp是一个java开源库,可以自己写,自己打包上线使用;一个很好试用于实践的库,如果其他应用参考可分享。hanlp在Spark下亦可同样使用,因为都是scala编写的。但是,需注意,不论是Flink还是Spark,在其算子操作下,目前只能是在算子里定义实例来分词,是由于源库中的函数没有使用序列化,对这方面性能或者想深究的可以把原库相关方法加上序列化,再打包使用,之前实践过一次是可行。