// Obtain the stream execution environment that the rest of the job is built on.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Set the job's restart strategy on failure.
// The call below was previously fused onto the same physical line as this
// comment, so it was commented out and never executed; it is restored here as
// live code.
// NOTE(review): requires RestartStrategies, Time and java.util.concurrent.TimeUnit
// to be in scope — confirm against the file's imports (not visible in this excerpt).
env.setRestartStrategy(
  RestartStrategies.failureRateRestart(
    3, // max failures per unit
    Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
    Time.of(10, TimeUnit.SECONDS) // delay
  )
)
// Call readFromRabbitMQDataStream with the execution environment and the
// connection settings looked up from Config under the "rmq.*" keys.
val data = readFromRabbitMQDataStream(
  env,
  Config("rmq.host"),
  Config("rmq.port"),
  Config("rmq.user"),
  Config("rmq.pass"),
  Config("rmq.queueName")
)
// NOTE(review): the original snippet elides the intermediate steps here; the
// marker below is kept verbatim and must be replaced by the real code.
.......
// Trigger execution of the job, passing "read from rabbitMQ" as its name.
env.execute("read from rabbitMQ")
} // closes a scope opened before this excerpt
} // closes a scope opened before this excerpt