站浏览量 站访问人数
目录
  1. 1. RabbitMQ
  2. 2. Flink消费rabbitMQ消息
  3. 3. 消息处理
  4. 4. 使用例子

讲述Flink消费RabbitMQ消息,并进行处理。

RabbitMQ

RabbitMQ另一种消息发送系统,与kafka类似,但有差别。kafka的消息可以有保存期限,rabbitMQ的消息一般在消费后就焚毁。另外,rabbitMQ的消息有一个唯一id,而kafka则是靠offset来标记数据。

Flink消费rabbitMQ消息

与读取kafka的topic相同,也需要配置一些数据,但是,更严,需要密码和账号。

1
2
3
4
def readFromRabbitMQDataStream(env:StreamExecutionEnvironment,host:String,port:String,user:String,pass:String,queueName:String):DataStream[String]= {
val rmqConnectionConfig=new RMQConnectionConfig.Builder().setHost(host).setPort(port.toInt).setUserName(user).setPassword(pass).setVirtualHost("/").build()
env.addSource(new RMQSource(rmqConnectionConfig,queueName,new SimpleStringSchema))
}

rabbitMQ的消息识别用队列(kafka是topic),同样可以设置checkpoint和消费模式等。

消息处理

与kafka一致,消息一般是json格式,保证数据的完整。

使用例子

使用如下程序调用读取函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
object readMessage {

def main(args:Array[String]):Unit={

val env = StreamExecutionEnvironment.getExecutionEnvironment

//设置job失败重启策略
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
))

val data = readFromRabbitMQDataStream(env,Config("rmq.host"),Config("rmq.port"), Config("rmq.user"), Config("rmq.pass"),Config("rmq.queueName"))
.......
env.execute("read from rabbitMQ")
}
}