站浏览量 站访问人数
目录

1,dataset迪卡尔积计算,出现混乱,
首先,两个大的数据集不建议使用cross操作算子,不仅浪费资源,而且很满很慢.
然后,如果是从文件读取记录,一行一条记录,在broadcast下计算是正常的,flink的dataset是按照记录来计算的;然后,当这个dataset是从其他shuffle过程得到,比如join,造成混乱,再broadcast会造成数据紊乱,解决措施,设置并行度为1;或者再多次使用broadcast保证一条记录计算.

2,flink-1.2.0在提交yarn运行时,将habse的包在运行命令里带上,会出现Filter错误,这是因为版本问题(flink-1.1.1没有这个问题),habse的包需要在sbt里带上,即去除provided,这样就可以了.

3,flink下的背压问题,首先造成背压是因为上游数据太快产生,下游数据处理过慢,导致挤压反向.解决措施在上游和下游设置同样大小内存,即jobmanager和taskmanager内存设置一样(不晓得为啥,job的大,task的小就产生背压)

4,akka.client.timeout,可以在conf下配置,注意时间是空格带上s或者ms.但是,也不一定能解决,主要是构建job花时间,不要在job起始时使用一个费时间的操作,比如读取很大的数据集,后续却只用其中一部分;或者大数据集生成dataset,是很花时间的.

5,flink的包冲突或者包版本不对,首先找到该包定位属于那个,然后再唉sbt里进行exclude,但是,一般不晓得在哪个,一个一个去尝试(最笨但是最有效的方法啊)

6,flink下稀疏向量的计算比密集型的向量计算快一倍至多,各种结构或者算法,优先考虑稀疏的,

7,flink下的map等各种算子操作,其中带的方法必须是序列化的,否则会出错或者不能运行或者结果达不到预期,尤其是调用外部包,需确定是否实现了序列化(可以自己实现,在java包等下常见)

8,flink通过每条记录(doc–>array[String])形式计算tf-idf,非常缓慢,主要是在每条doc需要遍历array[String],一小时才计算2000左右;更改计算方法,先单独计算tf和idf,非常快捷;然后对idf进行broadcast,计算tf-idf,再计算tf-idf-vec;但是,在idf计算时已经计算了所有词,无需再次进行计算tf-idf-vec;更改措施是再加上broadcast,即两个广播数据集,idf和词典.

9,flink下的string不建议用+来连接,

10,flink在程序中设置job失败重启, env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
重启策略

11,flink的流操作使用窗口,必须是KeyedStream,另外自定义窗口函数,用到apply,需重写,
//IN = keyStream的数据类型
//Out=操作后的类型
//key=keyStream的key分组的key类型
//w=窗口类型

简单的窗口例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

val lines = readFromKafkaDataStream(env, ZOOKEEPER_HOST, KAFKA_BROKER, TRANSACTION_GROUP, topic)

//默认给一个key,相同的key,是由于窗口必须对keyed类型操作,
val logs = lines.map{ line =>
sojParsers.parseLine(line)
}.filter(new MustCountFilter).map { x => (1,x) }

//设置时间类型,没有水位设置,使用处理时间。
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val htKeyedLogs = logs.keyBy(_._1)
.timeWindow(Time.seconds(3))
.apply(new MyWindowFunction())

//自定义窗口函数
class MyWindowFunction extends WindowFunction[(Int, CountImp),String,Int,TimeWindow] {
override def apply(key: Int, window: TimeWindow, input: Iterable[(Int, CountImp)], out: Collector[String]):Unit={
val Sojs = input.map(_._2)

val rows = Sojs.map { log =>
......
}.toList
out.collect(rows.size.toString())
}
}

12, scala 的json处理包, “org.scalatestplus.play” %% “scalatestplus-play” % “2.0.0”