讲述Flink读取kafka下topic的消息,并进行处理。
kafka简介
分布式消息系统Apache Kafka ,在实际生产中常作为消息传递的工具,其稳定,高并发,接受多个数据源,并可保存数据(自定义数据的周期),在消费端,可以做到相互独立,基于数据的处理工具,可保证数据的有序性。
kafka消费数据以组为单位,一个组里可有多个消费者或消费实例,共享groupId,组内所有消费者一起消费topic的所有分区.一个分区只能由同一个消费组的一个consumer来消费.
Flink消费kafka
Flink是流处理框架,合适处理流式数据,在产生数据端把消息写进kafka的topic下,即是流的源头,Flink可实现对数据的处理。在读取消息中,Flink可设置模式,仅一次,至多一次,至少一次。
为体现Flink的流处理优势,即窗口,可以对流执行多样的处理,Flink的窗口比Spark丰富,不是简单的提供批处理,可以实现更复杂的窗口处理,比如计数,多种时间窗口等。
Flink设置checkpoint后,kafka的offset会保存在zookeeper中,消费端不必保存offset,这样新的消费组进来,会从最新的offset处开始消费;但是在Flink-1.3版本下,可以在程序中指定offset,也可以指定该消费组是否从开始出重新消费。
Flink读取kafka消息的配置
读取kafka消息,必须提供kafka的配置信息,broker,zookeeper,groupid,topic等参数,为屏蔽重要参数,在程序中把它们写进配置文件,采取Config()来读取值。
另外,在Flink下可读取多个topic下消息。以下是两个读取例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| def readFromKafkaDataStream(env:StreamExecutionEnvironment,zookeeper:String,broker:String,groupid:String,topic:String):DataStream[String]={ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.enableCheckpointing(5000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect", zookeeper) kafkaProps.setProperty("bootstrap.servers", broker) kafkaProps.setProperty("group.id", groupid)
env.addSource( new FlinkKafkaConsumer08[String](topic, new SimpleStringSchema(), kafkaProps)) }
def readFromKafkaDataStream(env:StreamExecutionEnvironment,zookeeper:String,broker:String,groupid:String,topic:List[String]):DataStream[String]={ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.enableCheckpointing(5000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect", zookeeper) kafkaProps.setProperty("bootstrap.servers", broker) kafkaProps.setProperty("group.id", groupid)
env.addSource( new FlinkKafkaConsumer08[String](topic.asJava, new SimpleStringSchema(), kafkaProps)) }
|
读取的程序,采用仅一次模式,并且设置checkpoint,数据的格式SimpleStringSchema化。
消息格式的处理
kafka消息一般采用json串,在处理时需要对其进行解析,采取的是json4s。实践中,同一种类的消息格式应统一,可能数据来源不同;消息的格式应保证数据完整,能用一个topic解决就用一个topic,不要为一个业务而采取多个topic的方案。
使用例子
使用如下程序调用读取函数,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| object readMessage { private val ZOOKEEPER_HOST = Config("***") private val KAFKA_BROKER = Config("***")
private val TRANSACTION_GROUP = "***" private val topics = List("***","***")
def main(args:Array[String]):Unit={
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置job失败重启策略 env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // max failures per unit Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate Time.of(10, TimeUnit.SECONDS) // delay ))
val opMysqlStream = readFromKafkaDataStream(env, ZOOKEEPER_HOST, KAFKA_BROKER, TRANSACTION_GROUP, topics) ....... env.execute("read from kafka") } }
|