站浏览量 站访问人数
目录
  1. 1. flink 下datastrema的window操作
    1. 1.1. flink下的windows的time类型,
  2. 2. flink下的窗口定义,
  3. 3. WindowedStream
  4. 4. flink运行的参数意义,
  5. 5. flink架构重新学习
    1. 5.1. 并行是怎么操作的?
  6. 6. 分布式,yarn,集群
  7. 7. 非常好的解读flink下的函数和参数
  8. 8. 参数传递,在构造函数中使用
  9. 9. 使用withParameters(Configuration)
  10. 10. 全局参数
  11. 11. richfunction的使用
  12. 12. state的使用
  13. 13. flink 下的向量,矩阵计算
  14. 14. flink 读取csv文件的参数含义,
  15. 15. 遍历目录下的所有文件,

讲述flink在实践中的一些经验,对入门级的有一定的认识;
flink的窗口,提交运行的命令含义,参数传递,函数重新编写,向量计算,读取csv文件等;
主要是一些实践的笔记,比较粗略,后续再考虑逐步细写下,分成更多的小点。

首先从统计单词出发,这个统计是实时接收socket的消息来统计的,需要一个窗口,这里设定为每隔5秒统计一次;

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.of(5, TimeUnit.SECONDS))
.sum(1);

dataStream.print();
env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}

}

new Splitter()和Splitter implements FlatMapFunction<String, Tuple2<String, Integer>>

可以参考这个文档

flink下的windows的time类型,

import org.apache.flink.streaming.api.TimeCharacteristic

  • processing time,当执行转换算子的时间,分布式和异步下不能满足目的。

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    
  • event time,事件时间,独立事件的时间,通常是进入到flink前就有了,嵌入在记录里或者内容里

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    时间顺序,无序也能正确处理(需要缓存),有延迟,
    
  • Ingestion time,进入到flink的时间,介于前两者之间,
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    

watermark 时间水印,由源发出,没有时间戳的时间

对于事件时间event time,需如下步骤

1
2
3
4
5
Set env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Use DataStream.assignTimestamps(...) in order to tell Flink how timestamps relate to events
(e.g., which record field is the timestamp),这是记录中包含时间戳的例子,
Set enableTimestamps(), as well the interval for watermark emission (
setAutoWatermarkInterval(long milliseconds)) in ExecutionConfig.

flink下的窗口定义,

窗口中的元素实际存储在 Key/Value State 中,key为Window,value为元素集合(或聚合值)
当一个元素进入流时,
1,根据window决定去哪个窗口;
窗口只是一个标示,不存储数据,也会存时间。
2,每个窗口有一个trigger,决定窗口的生存时间(该窗口计算和清除时间);
trigger返回continue(等待),fire(处理窗口数据),purge(清除窗口),
如果是fire,则窗口保留,数据一直存在,会被计算多次,直到purge。
3,如果窗口被fire(处理),窗口中的元素指定给evictor,
Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算
4,计算函数,经过(window,trigger,evictor),接收到窗口元素,计算其中元素,返回给下游,
计算值可返回多个或一个,计算函数有max,min,基于WindowFunction实现的。。
(注意,evictor需要对窗口内所有元素保留,除了聚合sum,min只需保留最后值,)

WindowedStream

其中 T 数据流中元素的数据类型
K 是元素的key的类型
W是窗口类型
R 初始值(fold时用到)

1
2
3
4
5
6
7
8
9
10
11
apply (6个apply方法)  (k : Long, w : TimeWindow, T: Iterable[(Long, Long, Long)], out : Collector[(Long,String,String,Double)])

apply(WindowFunction<T, R, K, W> function)

apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType)

apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType)

apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function)
apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType)

Blink 阿里

you can rely on the RichFunction’s open() method’s to load such data directly from a distributed file system.
DataStream没有DistributedCache,但是可以在RichFunction’s open() 方法中读取分布式存储的文件…

flink运行的参数意义,

目前用到:flink 提交yarn集群的参数

1
2
3
4
5
6
7
-yn,--yarncontainer <arg>            Number of YARN container to allocate(=Number of Task Managers)

-ys,--yarnslots <arg> Number of slots per TaskManager

-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [inMB]

-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container [inMB]

yn是taskmanager的个数,即worker的个数,ys是slot个数,即每个taskmanager由几个slot来计算,
yjm,
ytm,每一个taskmanager内存大小,

flink架构重新学习

flink的数据块是流,dataset也是流,特殊的流,开始一个或多个源,结束一个或多个sinks,一个转换有多个转换算子组成…

1
2
3
4
5
流的运算流程是,source->
transformation(flatmap,map,....算子operators)->
...->
transformation(flatmap,map,....算子operators)->
sink

并行度(一个task被split的并行实例个数),可以在不同层级上设定;
一个task可split多个并行实例,一个实例处理inputData的子集,(实例处理子集,有合并没?)

并行是怎么操作的?

分布式和并行下,数据是分区的,操作也变成操作subtask,(操作subtask是独立隔离的,不同线程,不同机器,不同containers)
操作subtask的个数是并行度,parallelism,由算子operators决定,一个程序会有多个并行,

1
2
3
-->    --->     --->
x y -->sink
--> ---> --->

流在两个operators(算子)之间可以使一对一前行,也可以是再分区前行,
相对的是Redistributing,即算子需要对数据进行重新规划,比如keyby,broadcast, rebalance

flink链,chains,把算子连在一起,起到优化作用,

分布式,yarn,集群

两个操作进程类型,master,JobManagers,协调分布式执行,checkpoint,故障恢复,
必须有一个,可以设置多个,一个是主,其他是备用(其中一个挂了,备用是不是可以当作主来重启运行???)
worker,TaskManagers,执行task或者执行subtasks,缓冲区,至少有一个,

The master and worker processes开始于集群,容器,或者yarn的资源管理器,

client客户端,不是运行时和程序的部分,准备发送流给master,然后断开或者保持连接等待信息反馈,

Each worker (TaskManager) is a JVM process, and may execute one or more subtasks in separate threads.

To control how many tasks a worker accepts, a worker has so called task slots (at least one).

slot是一个TaskManager可以运行几个task,一个taskTaskManager分配的内存是ytm,
每个slot是固定的资源子集,在大集合TaskManager中,也就是slot瓜分TaskManager的资源,而且是平分,
每个slot是不同thread执行.

Having one slot per TaskManager means each task group runs in a separate JVM,
Having multiple slots means more subtasks share the same JVM,share data sets and data structures..

共享slot,默认的配置,

迭代,大量的计算,大的分布式计算,做法是step-function和嵌入迭代算子(Iterate and Delta Iterate),

迭代例子

只用于dataset???

1,iterate operator,
1.1 Iteration Input: 初始化数据输入,来源前一个迭代或者前一个算子;
1.2 Step Function: 每一次迭代执行的函数. operators, map, reduce, join, …,
1.3 Next Partial Solution: 下一次迭代的数据,
1.4 Iteration Result: 最后一次迭代,数据输出到sink或者下一个算子,
终止条件,
1,最大迭代次数,自定义的,达到迭代次数,结束,输出结果;
2,自定义聚合和熟练准则,

1
2
3
4
5
6
7
8
val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
val result = iterationInput.map { i =>
val x = Math.random()
val y = Math.random()
i + (if (x * x + y * y < 1) 1 else 0)
}
result
}

2,Delta Iterate Operator,
每次迭代会选择部分数据计算,不是全量,适用于高级的算法,或者某些数据不是每次迭代对计算有用,只关注那些频繁和有用于计算的数据,舍弃无用,没计算的数据.
2.1 Iteration Input: 初始化数据输入,来源data source或者前一个算子;
2.2 Step Function: 每一次迭代执行的函数. operators, map, reduce, join, …,
2.3 Next Workset/Update Solution Set: 下一次迭代反馈workset,数据可通过不同算子更新 ,
2.4 Iteration Result: 最后一次迭代,数据输出到sink或者下一个算子,

终止条件,
1,最大迭代次数,自定义的,达到迭代次数,结束,输出结果;
2,下一次迭代workset收敛为0,也就是数据为空.(自定义聚合和收敛准则)

两个数据集,workset和solution,每次迭代得到newworkset和newsolution
To create a DeltaIteration call the iterateDelta(initialWorkset, maxIterations, key) on the initial solution set. The step function takes two parameters: (solutionSet, workset), and must return two values: (solutionSetDelta, newWorkset).

ParameterTool 整合参数

1
2
3
4
5
env.readCsvFile[Point](
params.get("points"),
fieldDelimiter = " ",
includedFields = Array(0, 1))
}

flink下的functions
single abstract method 单一的抽象方法,

1,coGroup(Iterable first, Iterable second, Collector out)

  set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());

注意,first:java.lang.Iterable[Int],
    然后,计算时,采取用
import scala.collection.JavaConverters._
val scalaT2 = first.asScala.toList

2,CombineFunction<IN, OUT>
把元素组合并返回,

非常好的解读flink下的函数和参数

链接1
链接2

1,使用Rich functions

Rich functions provide, in addition to the user-defined function (map, reduce, etc), four methods: open, close, getRuntimeContext, and setRuntimeContext.

Rich functions的使用和普通的function是一样的,不同的就是,多4个接口函数,可以用于一些特殊的场景,比如给function传参,或访问broadcast变量,accumulators和counter,因为这些场景你需要先getRuntimeContext

2,Broadcast Variables
一个算子中在并行实例下可以访问到数据,广播data set的结构是collection,
withBroadcastSet(DataSet, String)广播一个dataset
getRuntimeContext().getBroadcastVariable(String) at the target operator.访问广播的data set;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1. The DataSet to be broadcasted
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception { //open方法会被启用一次,
// 3. Access the broadcasted DataSet as a Collection
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
}


@Override //map调用的次数是data中的元素个数,
public String map(String value) throws Exception {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

广播数据集,就是有些不大的公共数据,是要被所有的实例访问到的,比如一些查询表,(不大的公共数据)
上面的例子,会将toBroadcast设置为广播变量broadcastSetName,这样在运行时,可以用getRuntimeContext().getBroadcastVariable获取该变量使用

参数传递,在构造函数中使用

1
2
3
4
5
6
7
8
9
10
11
12
13
private static class MyFilter implements FilterFunction<Integer> {

private final int limit; //参数

public MyFilter(int limit) {//构造函数
this.limit = limit;
}

@Override
public boolean filter(Integer value) throws Exception {
return value > limit;
}
}

使用withParameters(Configuration)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {
private int limit;

@Override
public void open(Configuration parameters) throws Exception {
limit = parameters.getInteger("limit", 0);
}

@Override
public boolean filter(Integer value) throws Exception {
return value > limit;
}
}).withParameters(config);

可以用withParameters将定义好的config传入function

然后用RichFunction的Open接口,将参数解析出来使用


全局参数

(全局参数只能用于参数形式,广播变量可以是任意dataset),但是全局参数,可用于更大的数据集,比如几百万的表数据,广播的话
花费就大.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);


public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

private String mykey;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
Configuration globConf = (Configuration) globalParams;
mykey = globConf.getString("mykey", null);
}
// ... more here ...

richfunction的使用

Rich functions的使用和普通的function是一样的,不同的就是,多4个接口函数,可以用于一些特殊的场景,比如给function传参,或访问broadcast变量,accumulators和counter,因为这些场景你需要先getRuntimeContext,

four methods: open, close, getRuntimeContext, and setRuntimeContext.

state的使用

Key/Value state 只能用于KeyedStream,
3种不同类型的state,

ValueState,单值的state,可以通过update(T)或T value()来操作

ListState<T>, 多值的state,通过add(T)或Iterable<T> get()来操作和访问
  访问当前存储的元素,
ReducingState<T>,多值状态,但是只保留reduce的结果
    聚集的单个元素,

并且所有的state,都有clear,来清除状态数据

这些state对象只能被状态接口使用,
并且取出的状态对象,取决于input element的key;所以不同的调用user function 得到的state value是不一样的,因为element的key 可能不同

对于state,需要一个StateDescriptor ,作为name用于reference这个state,如果你定义多个state,他们的StateDescriptor 必须是unique的。
不同类型的state,有不同类型的StateDescriptor

ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)

State对象通过RuntimeContext的接口获取到,当然不同类型的state,对应于不同的接口;
关键是,如果要使用state,必须要使用rich function,用普通的function是无法获取到的

http://blog.csdn.net/lmalds/article/details/52229827
ListState的使用
http://blog.csdn.net/codemosi/article/details/51602204
三种state的使用,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
引入包,

("org.apache.flink" %% "flink-ml" % flinkVersion).
exclude("org.apache.flink","flink-shaded-hadoop1_2.10"),

import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.math.VectorBuilder
import org.apache.flink.ml.math.DenseVector

val vec1 = "12.3,45,55".split(",").map(_.toDouble)
val flinkVec1 = DenseVector.apply(vec1)
val vec2 = "12.3,45,55".split(",").map(_.toDouble)
val flinkVec2 = DenseVector.apply(vec2)

println(flinkVec1.dot(flinkVec2))

推特的

雅虎的

2015flink会议

使用 RichGroupReduceFunction 实现GroupCombineFunction interface,
在shuffle前combining,减少网络负载;
另外,reduce实现reduceGroup也可以提高性能,
结果再次使用.setCombineHint(CombineHint.HASH) after the reduce.

val csvInput = env.readCsvFile(String, Double)) // take the first and the fourth field

lineDelimiter: 行与行之间的分隔符,默认是’\n’.

fieldDelimiter: 每行数据间的分隔符,默认是 ‘,’.

includeFields: Array[Int] 需要过滤的字段,也就是每行分割所要留下的哪几列.

pojoFields: Array[String] 对象实例所要留下的字段.

parseQuotedStrings: Character enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or tailing whitespaces are not trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character. If quoted string parsing is enabled and the first character of the field is not the quoting string, the string is parsed as unquoted string. By default, quoted string parsing is disabled.

ignoreComments: String specifies a comment prefix. All lines that start with the specified comment prefix are not parsed and ignored. By default, no lines are ignored.

lenient: Boolean enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.

ignoreFirstLine: 是否忽略第一行,因为第一行有时是为了解释用的.

遍历目录下的所有文件,

1
2
3
4
5
6
7
8
9
10
11
// enable recursive enumeration of nested input files
val env = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true) //将递归遍历目录设置为true

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)