站浏览量 站访问人数
目录
  1. 1. kafka简介
  2. 2. Flink消费kafka
  3. 3. Flink读取kafka消息的配置
  4. 4. 消息格式的处理
  5. 5. 使用例子

讲述Flink读取kafka下topic的消息,并进行处理。

kafka简介

分布式消息系统Apache Kafka ,在实际生产中常作为消息传递的工具,其稳定,高并发,接受多个数据源,并可保存数据(自定义数据的周期),在消费端,可以做到相互独立,基于数据的处理工具,可保证数据的有序性。

kafka消费数据以组为单位,一个组里可有多个消费者或消费实例,共享groupId,组内所有消费者一起消费topic的所有分区.一个分区只能由同一个消费组的一个consumer来消费.

Flink消费kafka

Flink是流处理框架,合适处理流式数据,在产生数据端把消息写进kafka的topic下,即是流的源头,Flink可实现对数据的处理。在读取消息中,Flink可设置模式,仅一次,至多一次,至少一次。

为体现Flink的流处理优势,即窗口,可以对流执行多样的处理,Flink的窗口比Spark丰富,不是简单的提供批处理,可以实现更复杂的窗口处理,比如计数,多种时间窗口等。

Flink设置checkpoint后,kafka的offset会保存在zookeeper中,消费端不必保存offset,这样新的消费组进来,会从最新的offset处开始消费;但是在Flink-1.3版本下,可以在程序中指定offset,也可以指定该消费组是否从开始出重新消费。

Flink读取kafka消息的配置

读取kafka消息,必须提供kafka的配置信息,broker,zookeeper,groupid,topic等参数,为屏蔽重要参数,在程序中把它们写进配置文件,采取Config()来读取值。

另外,在Flink下可读取多个topic下消息。以下是两个读取例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def readFromKafkaDataStream(env:StreamExecutionEnvironment,zookeeper:String,broker:String,groupid:String,topic:String):DataStream[String]={
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

val kafkaProps = new Properties()
kafkaProps.setProperty("zookeeper.connect", zookeeper)
kafkaProps.setProperty("bootstrap.servers", broker)
kafkaProps.setProperty("group.id", groupid)

env.addSource( new FlinkKafkaConsumer08[String](topic, new SimpleStringSchema(), kafkaProps))
}

def readFromKafkaDataStream(env:StreamExecutionEnvironment,zookeeper:String,broker:String,groupid:String,topic:List[String]):DataStream[String]={
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

val kafkaProps = new Properties()
kafkaProps.setProperty("zookeeper.connect", zookeeper)
kafkaProps.setProperty("bootstrap.servers", broker)
kafkaProps.setProperty("group.id", groupid)

env.addSource( new FlinkKafkaConsumer08[String](topic.asJava, new SimpleStringSchema(), kafkaProps))
}

读取的程序,采用仅一次模式,并且设置checkpoint,数据的格式SimpleStringSchema化。

消息格式的处理

kafka消息一般采用json串,在处理时需要对其进行解析,采取的是json4s。实践中,同一种类的消息格式应统一,可能数据来源不同;消息的格式应保证数据完整,能用一个topic解决就用一个topic,不要为一个业务而采取多个topic的方案。

使用例子

使用如下程序调用读取函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
object readMessage {
private val ZOOKEEPER_HOST = Config("***")
private val KAFKA_BROKER = Config("***")

private val TRANSACTION_GROUP = "***"
private val topics = List("***","***")

def main(args:Array[String]):Unit={

val env = StreamExecutionEnvironment.getExecutionEnvironment

//设置job失败重启策略
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
))

val opMysqlStream = readFromKafkaDataStream(env, ZOOKEEPER_HOST, KAFKA_BROKER, TRANSACTION_GROUP, topics)
.......
env.execute("read from kafka")
}
}