转 Spark Streaming场景应用-Kafka数据读取方式
4390 | 0 | 0
此两种读取方式存在很大的不同,当然也各有优劣。接下来就让我们具体剖解这两种数据读取方式。

/*读取kafka数据函数*/
def getKafkaInputStream(zookeeper: String,
topic: String,
groupId: String,
numRecivers: Int,
partition: Int,
ssc: StreamingContext): DStream[String] = {
val kafkaParams = Map(
("zookeeper.connect", zookeeper),
("auto.offset.reset", "largest"),
("zookeeper.connection.timeout.ms", "30000"),
("fetch.message.max.bytes", (1024 * 1024 * 50).toString),
("group.id", groupId)
)
val topics = Map(topic -> partition / numRecivers)
val kafkaDstreams = (1 to numRecivers).map { _ =>
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
kafkaParams,
topics,
StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
}
ssc.union(kafkaDstreams)
}
以上几个参数主要用来连接Kafka并读取Kafka数据。具体执行的步骤如下:
1、Kafka相关读取参数配置,其中 zookeeper.connect即传入进来的zookeeper参数;auto.offset.reset设置从topic的最新处开始读取数据;zookeeper.connection.timeout.ms指zookeepr连接超时时间,以防止网络不稳定的情况;fetch.message.max.bytes则是指单次读取数据的大小;group.id则是指定consumer。
2、指定topic的并发数,当指定receivers个数之后,但是由于receivers个数小于topic的partition个数,所以在每个receiver上面会起相应的线程来读取不同的partition。
3、读取Kafka数据,numReceivers的参数在此用于指定我们需要多少Executor来作为Receivers,开多个Receivers是为了提高应用吞吐量。
Receiver-based 读取问题
1.防数据丢失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable参数;
2.提高receiver数据吞吐量。采用MEMORY_AND_DISK_SER方式读取数据、提高单Receiver的内存或是调大并行度,将数据分散到多个Receiver中去。
配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度
单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。
提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。
Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。
为了回辟以上问题,降低资源使用,我们后来采用Direct Approach来读取Kafka的数据,具体接下来细说。

def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag,
R: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
val cleanedHandler = ssc.sc.clean(messageHandler)
new DirectKafkaInputDStream[K, V, KD, VD, R](
ssc, kafkaParams, fromOffsets, cleanedHandler)
}
def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}
应用重启。因资源、网络等其他原因导致程序失败重启时,需要保证从上次的offsets处开始读取数据,此时就需要采用第一种方法来保证我们的场景。
/**
* 读取kafka数据,从最新的offset开始读
*
* @param ssc : StreamingContext
* @param kafkaParams : kafka参数
* @param topics : kafka topic
* @return : 返回流数据
*/
private def getDirectStream(ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]): DStream[String] = {
val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics
)
kafkaDStreams.map(_._2)
}
/**
* 如果已有offset,则从offset开始读数据
*
* @param ssc : StreamingContext
* @param kafkaParams : kafkaParams配置参数
* @param fromOffsets : 已有的offsets
* @return : 返回流数据
*/
private def getDirectStreamWithOffsets(ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long]): DStream[String] = {
val kfkData = try {
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
ssc,
kafkaParams,
fromOffsets,
(mmd: MessageAndMetadata[String, String]) => mmd.message()
)
} catch { //offsets失效, 从最新的offsets读。
case _: Exception =>
val topics = fromOffsets.map { case (tap, _) =>
tap.topic
}.toSet
getDirectStream(ssc, kafkaParams, topics)
}
kfkData
}
val fromOffsets = offsets.map { consumerInfo =>
TopicAndPartition(consumerInfo.topic, consumerInfo.part) -> consumerInfo.until_offset
}.toMap
2.降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。
3.鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
2.监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。
徐胜国,大连理工大学硕士毕业,360大数据中心数据研发工程师,主要负责基于Spark Streaming的项目架构及研发工作。邮箱 : xshguo_better@yeah.net。如有问题,可邮件联系,欢迎交流。
文章来源:https://my.oschina.net/u/1250040/blog/908571
0

wsr
0人已关注
领课教育 32666
10425
update 47870
5231
领课教育 18538
husheng 21224
请更新代码 41921
凯哥Java 2504
凯哥Java 2932
凯哥Java 2206