站浏览量 站访问人数
目录
  1. 1. 编辑一个简单的Flink程序
  2. 2. 打包执行
  3. 3. 提交包运行

讲述把一个Flink的job怎么打包提交到集群上运行,当然也可以在Eclipse本地运行。Flink本地运行不用像Spark那样需要设置本地为master,即setMaster(“local”),可以直接点击Run。

编辑一个简单的Flink程序

当sbt eclipse运行成功后,用eclipse导入项目,在eclipse里看到生成的子项目。在这些字项目中我们可以随意添加程序,即想操作的job。

比如我们在example的子项目中,添加一个统计单词次数的job,这时的操作步骤:

  • 新建一个包,
  • 新建一个scala Object
  • 编辑程序

比如在example下新建test包,然后新建test1.scala的Scala Object。

1
2
3
4
5
6
7
package test

object test1 {
def main(args:Array[String]):Unit={
println("in flink")
}
}

在项目构建完成后,building workspace,然后选择run scala application,一般用scala语言编辑Flink的job,
当然也可以用java语言编写。这时会出现输出的字样。

打包执行

本地执行Flink的程序,一是测试用或者练习用。Flink跑job一般是数据量大,需要分布式跑;而是流式项目job,我们可以通过本地socket模拟。

当我们在本地测试程序没有错误,能够得到正确要想的结果时,这时就可以将程序进行打包,提交到集群执行。打包的命令如下,

1
sbt 子项目名/assembly

一般在实践中,不会针对某个子项目来进行打包,而是为了通用化,会结合git来进行打包,达到线上和线下代码分离,因此,线下打包可以沿用上述命令。线上则需进行脚本的控制。

具体的控制有两个方面:

1,打包时需要拉取最新的代码,与git交互;

2,打包时带的一些参数,为了隐私和安全,一般在代码中会加以编码,打包时需要进行读取解析,这个后续会单独讲;

3,打包成功后,Flink的包是存储在子项目的target/scala-2.11目录下,名字会生产,需要进行重命名,以便识别;

线下打包的脚本配置
assembly.sh

1
2
3
4
5
6
7
8
9
10
11

#!/bin/bash

MODULE_NAME="$1"

install -D 配置文件移动 项目的配置文件存放

cd 项目的目录
sbt $MODULE_NAME/assembly

配置文件的重新移动,保证隐私

这里所说的配置文件,与Flink的job打包没有任何关系,而是有些job会涉及到一些重要参数,额外采取文件存放,打包时在程序里进行解析,打包后进行还原,达到保护重要参数的目的。如果,不注重参数保护,可以在参数中直接写明。建议把重要的配置参数进行单独文件存放,这需要自定义一个参数获取的函数,后续内容将会放出来。

提交包运行

当得到打包好的job时,一个jar文件,这时可以把它提交到集群或者YARN集群运行。如下命令:

1
2
3
4
5
6
7
8
9
/****/flink-1.1.1/bin/flink run \
-m yarn-cluster \
-yn 1 \
-ys 2 \
-yjm 4096 \
-ytm 4096 \
--class test.test1 \
--classpath file:///***.jar \
*****.jar

单独运行一个job的命令行。足够了,不建议开启一个Flink会话,然后提交job,一个Flink的job,一次提交比较安全。指定job manager和task manager,以及各自的内存大小即可。所带包可以带,建议不用带,打包时带上。

所带的参数详见Flink的官方文档.

最后,可以在集群的web页面查看Flink的job运行情况,具体看运行集群的web的设置,当然,Flink的配置也是可以的。最后可参考文章.