【Kafka】LinkedIn开源分布式消息系统Kafka ---用途、系统架构与应用 | Yet Another Thoughts
导论
互联网公司通常每天都会生成海量的日志数据。文中举了几个例子:Facebook每天会聚集多达几乎6TB的日志数据,来自各式各样的用户活动记录;中国移动每天也会收集6~8TB的通话记录。通常,这些日志数据包括:
- 用户的活动事件,包括登录,浏览,点击,”喜欢“,分享,评论与搜索查询;
- 各项操作指标,包括服务的调用栈记录,调用延迟,错误, 还有系统运行指标,包括每台机器的CPU,内存,网络,或者硬盘利用率等。
这些日志数据是数据分析的一个重要部分,用于用户参与度、系统利用率及其他指标的分析计算。传统的日志分析工作常常是离线运算的。许多相关工作已经对离线日志分析提供了非常完善的工具,包括Facebook的Scribe(已不再更新),Yahoo的Data Highway,Cloudera的Flume,等。这些工具首要设计成收集日志数据,并载入到数据仓库或者Hadoop集群中用来离线处理。
然而,在LinkedIn及其他的许多应用场景中,除了离线的日子数据分析,在低延迟下提供支持实时的日志数据分析的需求日益强烈。互联网应用中,越来越多地将这些日志数据分析工作融入到生产数据流水线中做到实时与低响应,成为应用的特征。这些应用场景包括但不限于:
- 搜索相关性分析;
- 由物品流行度或者最近的活动流中的出现频率驱动的推荐系统;
- 在线广告的目标投放与记录;
- 安全相关的应用,防止应用遭到恶意滥用,比如垃圾邮件,或者未授权的数据爬取;
- 社交用户的新鲜事实时推送,聚集朋友或者关注者的新近的状态更新、行为推送到用户面前。
正是这样的实时的日志数据的分析、处理需求促成了Kafka在LinkedIn的诞生。Kafka是一个卓有成效的用于日志处理的消息系统,结合了传统的离线日志分析的支持与实时线上日志分析的能力。
在InfoQ文章《Kafka剖析(一):Kafka背景及架构介绍》中,有以下关于Kafka的说明:
Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展。
文章中还从工程与工业界角度说明了为何要使用消息系统:
- 解耦 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 冗余 有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
- 扩展性 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
- 灵活性 & 峰值处理能力 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
- 可恢复性 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 顺序保证 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。 缓冲在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
7, 异步通信 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
Kafka已经开源(链接)。本文写作时的最新版本号是0.8.2.1. Kafka是基于JVM语言Scala实现的,它有一个基于java的克隆版本Jafka。阿里巴巴公司在他们的应用场景下基于Kafka的思想开发了广泛应用于淘宝、支付宝等产品中的消息系统RocketMQ,在《RocketMQ与Kafka对比(18项差异)》中罗列了与Kafka的不同之处。
相关工作
论文中主要介绍了以下几个消息系统:
- IBM Wehsphere MQ 这个系统是有消息传输的保障的。
- JMS(Java Message Service) JMS允许每一个消息个体再被处理后产生通知,存在乱序的可能性。这样的特性可能对于收集日志信息过犹不及。
上面的系统并没有非常强烈地关注吞吐量作为系统设计的重要考量,一些其它的并非那么必要的约束会降低吞吐量,增加API与实现的复杂度。
已经有了一些专门做日志收集的工具,包括以下几个: - Facebook Scribe 每一个前端的机器向一序列的Scribe服务器通过Socket发送日志信息,每个Scribe服务器收集这些日志单元并定期将它们dump到HDFS或者NFS设备。
- Yahoo’s Data Highway 有着与Scribe类似的数据流程图。
- Flume Flume比较新,支持可扩展的“管道(pipes)”和“数据槽(sinks)”,同时使得日志数据流非常地灵活。它有着更加基层的分布式支持。
但是上述的工具更多地用于支持离线处理日志数据,并会不必要地暴露实现的细节。它们大多使用推送模型(push),每个broker将数据推送给日志的消费者(consumer)。在linkedIn的场合及其他许多场合,拉取模型(pull)更适合,这样日志数据的处理者可以以一个合适的处理速率从broker那里获取日志数据,防止被数据淹没造成数据丢失。同时,这个模型使得重新绑定一个处理者变得更容易。
在文章《Kafka剖析(一):Kafka背景及架构介绍》中,还提到了一下几个相关的消息系统:
- RabbitMQ RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
- Redis Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
- ZeroMQ ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。
- ActiveMQ ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
Kafka的系统架构与设计原则
一个特定类型的消息流定义为一个Topic。一个生产者(producer)可以在某一个Topic发布消息。这些消息在一个序列的叫经纪人(broker)服务器中被分布、排序。而一个消费者(consumer)可以在经纪人那里订阅一个或者多个Topic的消息,然后通过从经纪人(broker)那里拉取数据的方式消费订阅的消息。 系统架构如下图所示。
一个典型的Kafka集群中包含若干Producer,若干Broker(支持水平扩展,一般Broker数量越多,集群吞吐率越高),若干Consumer分组,以及一个Zookeeper集群。Zookeeper用于在一个分布式集群中提供用于维护配置信息、命名空间、提供分布式同步服务、提供分组服务等服务的集中式服务。Kafka通过Zookeeper管理集群配置,选举leader,以及在Comsumer分组发生变化时进行重新平衡。Producer使用push模式将消息发布到Broker,Consumer使用pull模式从Broker订阅并消费消息。 为了平衡负载,在Broker中,一个Topic会被分到多个partition中,每个Broker存储处理一个或者多个partition。
下面是Producer和Comsumer的示例代码。
Sample Producer code(link):
//TestProducer.javaimport java.util.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { long events = Long.parseLong(args[0]); Random rnd = new Random(); Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip; KeyedMessage<String, String> data = new KeyedMessage <String, String>("page_visits", ip, msg); producer.send(data); } producer.close(); }}//SimplePartitioner.javaimport kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) {} public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; }}在上面Producer的例子中,包含了两个类——TestProducer和SimplePartitioner。
Partitioner
SimplePartitioner是kafka.producer.Partitioner的实现,它用来决定当前信息应该发送该Topic的哪一个partition。如果设置合理,通过Partitioner可以均匀分布消息到不同的partition里,实现负载平衡。通过partition,对同一个Topic的请求会并行写入到不同Partition里,不会成为IO瓶颈。例子中,随机生成了给定数目的ip地址作为消息的key,并把消息发送到了Topic page_visits下。
Sample Consumer Code(link):
package com.test.groups;import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()) System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message ())); System.out.println("Shutting down Thread: " + m_threadNumber); }}public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer .createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerGroupExample example = new ConsumerGroupExample (zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) {} example.shutdown(); }}上面的代码中,展示了一个简单的Consumer示例代码。一个Consumer可以订阅多个Topic的信息,通过一个Map传入订阅的Topic和对应消息数目,然后得到各个Topic对应的Stream,每个Stream提供一个迭代器iterator来获取对应若干消息。
Consumer Group
Kafka提供了high level的Consumer group的API,同一个Topic的一条消息只能被同一个group里的一个Consumer所消费,但多个group可以同时消费一个消息。这是Kafka用来实现一个Topic消息的广播/单播的手段。一个Topic可以对应多个Comsumer group。要实现广播,只需要一个Consumer组成一个group就好了。而单播只要所有Consumer都在同一个group里。用group还可以将Consumer自由分组。这样可以用一个Storm这种实时流处理系统做实时处理,Spark进行快速的离线处理,同时将数据实时备份到另一个数据中心,保证这三种操作分布在不同的Consumer group里即可。
单个partition上的效率
Kafka通过以下几个关键的决策提升整个系统的效率。
简单的存储
Kafka的消息存储有着非常简单的存储布局。一个Topic下的每个partition对应一个逻辑上的日志文件。物理上,一个partition对应一个文件夹,包含了消息的信息,还包含了其对应的索引文件。每次Producer向一个partition发布一条消息,Broker简单地把它添加到最后一个段文件的尾部。每个日志文件包含了一个序列的log entrie,每个entry都有一个当前partition下唯一的64字节offset,指明了这条消息的起始位置。不同于传统的消息系统,每个消息并没有一个ID所标识,这样省去了维护附加的、随机读取、高频查询的消息ID索引。 而逻辑上的日志文件被分成了若干个大小均为1GB左右的段文件,每个段文件由其包含的第一条消息的offset + “.kafka”作为文件名。索引文件包含了每个segment下包含的log entry的offset范围。
为了更好的性能,在预设数量的消息被发布或者预设时间到了才会一次性把这些段文件保存到硬盘上。只有清空缓存后的消息才会暴露给对应的Consumer。
Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,删除过期日志与提高性能无关,删除策略至于磁盘及具体需求有关。
下图显示了Broker里的存储的结构的大致示意图。
一个Consumer总是会顺序地从一个特定partition消费消息。如果一个Consumer获得了一个特定的offset,说明它已经获取了那个offset之前的全部消息。背后,消费者会异步地向Broker发出pull请求,来获取让应用消费的缓存下的数据。每一个pull请求包含要消费的消息开始的消息的offset和可以接受的消息的大小字节数。每个Broker都在内存中保存了一个排序的offset列表,包含了每个段文件的第一个消息的offset。Broker通过搜索offset列表定位到请求的消息所在的段文件,将要求的数据发回给Consumer。当Consumer收到了消息,它会计算出下一段消息的Offset,以待下一次pull请求使用。
高效的数据传输
在数据传输中,首先尽量批量地拉取消息集合。
另外一个反传统的决定是避免在Kafka这个层面上明确地在内存中缓存数据。相反地,依赖文件系统的页缓存机制。这样做可以避免缓存两份数据——消息只被缓存在了文件系统的页缓存中。同时,即使Broker重启了,仍然可以得到额外的好处——消息数据可能仍然会有热的缓存。Kafka避免了在进程中缓存数据,也避免了垃圾回收的大量开销,使得基于虚拟机的语言实现更加高效。而Producer和Consumer都会顺序地读写段文件,同时Consumer经常会一定程度拖慢Producer,一般情况下操作系统的缓存机制会非常有效(尤其是直写缓存和预读)。对于数TB的数据,发现Producer和Consumer都会有着稳定的线性于数据容量的性能表现。
另外,针对Consumer优化了网络连接的性能。Kafka是支持多订阅的,一个单独的消息可能会被多个Consumer应用所消费。由一个本地文件向远程socket发送字节的典型步骤是: 1)从存储媒介将数据读取到操作系统的页缓存; 2)将数据从页缓存复制到应用的缓存;3)将应用缓存复制到内核缓存;4)从内核缓存发送数据到socket。这包含了4次复制于2次系统调用。在Linux/Unix系统中,都有一个sendfile API支持直接从文件的管道传输文件到socket管道。Kafka利用了这样的API来更高效地从Broker中的消息段文件向Consumer传输字节。
无状态的Broker
不同于其他的消息系统,Broker并不会保存每个Consumer消费了多少消息,而是由各个Consumer保存这个状态,读取全部由Consumer通过offset控制。这样的设计减少了很多复杂性并且避免了Broker的过载。产生的问题是Broker并不知道某一个消息是否被消费过从而不知道该不该删除它。Kafka使用了一种基于时间的SLA机制作为删留机制。如果一个消息被留在Broker超过一定时间,则会被删除。由于Kafka读取特定消息的时间复杂度为O(1),使得消息留长时间积存在Broker上不会影响性能,而即使是离线处理,通常消息都会在一天内、一小时内甚至实时地被消费。
这样的设计还带来了额外的好处——一个Consumer可以从容地回退到之前的offset,再次消费数据。这虽然违背了一个队列的通常的定义,但却对许多Consumer应用来说非常必要的。比如处理消息出现了异常,Consumer可以在问题修复后再次消费刚才的消息。比如Consumer通常定期地才会将消费完的结果存储到持久的存储中,而如果Consumer崩了,未flush的数据会丢失,所以从上次消费的offset重新处理消息是很必要的。使用pull模型比push模型实现这样的特性更容易。
分布式的协调
之前提到过,每一个Producer可以向一个随机选择的partition或者被一个特定的partitioning关键字和partitioning函数决定的partition发布消息。同时,Consumer被分成Consumer group,目标是将broker上存储的消息均衡地分配给各个Consumer而尽量避免过多的协调。
LinkedIn工程师们做出的第一个设计上的决策是让一个Topic下的一个partition成为并行的最小单元,即任何时候一个partition都只被每个Consumer group的一个Consumer所消费。否则,则存在锁、消息分配、过多的状态信息维护等问题。相对地,这个设计只会在consumer试图重新平衡负载时需要协调,而这种情况并不多见。为了平衡负载,对一个Topic需要比对应的任一个group中的Consumer更多数目的段文件。
第二个决策是不设置“主”节点(master node),而使用去中心化的方法协调各个Consumer。LinkedIn使用了分布式的高可用的协同服务Apache Zookeeper。通过它创建、修改、读取、删除一个或若干路径。Zookeeper有以下若干有趣的地方:(a)可以注册一个监视器到一个路径上可以在该路径或者其子路径被改动时得到通知;(b)通过Zookeeper建立的路径可以自动随着创建者的消失而消失;(c)Zookeeper自动备份其信息到多个服务器上,使得数据可用、可靠。
Kafka在以下几个地方使用了Zookeeper:(1)检测Broker和Consumer的添加与移除;(2)当上述事件发生时触发每个Consumer的再平衡过程;(3)维护Consumer消费关系并记录每个partition的消费offset。Broker的注册服务器存储了若干关于Broker的元信息、配置信息,包括存储的Topic、partition。Consumer的注册服务器包含了Consumer与Consumer Group间的从属关系,各自订阅的Topic。每一个group在Zookeeper对应一个从属关系注册服务器和一个offset注册服务器。从属关系注册服务器对每个订阅的partition都有一个路径(路径值是当前消费这个partition的Consumer的ID)。
对Broker注册服务器、Consumer注册服务器、从属关系注册服务器来说,在Zookeeper创建的路径是临时的,而对offset注册服务器来说是持久的。一个Broker崩了其所有的partition会自动从Broker注册服务器删除。一个Consumer崩了,也会删除Consumer注册服务器和从属关系注册服务器上对应的信息。每个Consumer都会在Broker注册服务器、Consumer服务器上设置一个Zookeeper监视器,当Broker或者Consumer group发生变化时都会被通知。
每次一个Consumer初始化进程或者一个Consumer被通知Consumer/Broker变化时,便会启动一个重新平衡的过程,来决定它应当消费的新的partition子集。下面的伪代码很简单,描述了重新平衡的过程。
Rebalance process for Consumer C_i in group G
For each topic T that Consumer C_i subscribes to { remove partitions owned by C_i from the ownership registry read the Broker and the Consumer registries from Zookeeper compute P_T = prtitions available in all brokers under topic T compute C_T = all consumers in G that subscribe to topic T sort P_T and C_T let j be the index position of C_i in C_T and let N = |P_T|/|C_T| assign partitions from j*N to (j+1)*N-1 in P_T to consumer C_i for each assigned partition p { set the owner of p to C_i in the ownership registry let O_p = the offset of partition p stored in the offset registry invoke a thread to pull data in partition p from offset O_p }} 由于一个group中有多个Consumer,它们的每一个都会收到Broker/Consumer变化的通知,且是异步收到,可能会造成一个Consumer尝试去占有被另一个Consumer占有的partition。出现这种情况时,第一个Consumer只需释放它占有的所有partition,等待一段时间重启重新平衡的过程。一般实际中只需要几次的重试。
当新的group创建时,在offset的注册服务器并没有可用的offset。通常,Consumer会从每个订阅的Partition的可用的或者最小或者最大(看配置文件)的offset开始。
交付保障
通常,Kafka只提供at-least-once的保障,即消息绝不会丢,但可能重复传输。而exactly once的保障,需要一个两阶段的数据提交。大多数时候,对每个Consumer group一个消息只会传送一次。当一个Consumer崩了并且关闭地不干净的话,那些“接盘侠”(接过没有处理完的消息的Consumer)可能会处理若干个已经被处理过但还没有提交给Zookeeper正确的offset更新而当前offset值之后的消息。如果一个应用需要保证exactly once,它必须添加自己的重复消息删除机制,或者使用Kafka返回给Consumer的offset,或者消息内的关键字信息。这通常比两阶段数据提交更高效一些。
在博客《Kafka剖析(一):Kafka背景及架构介绍》中,关于交付保障,有这样的描述:
有这么几种可能的delivery guarantee:
At most once 消息可能会丢,但绝不会重复传输
At least once 消息绝不会丢,但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
- 当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。
- 接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
- 读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
- 读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)
- 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
- 总之,Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。
Kafka在每条消息中都保存了CRC元信息以避免日志“生锈”。当发生I/O错误时,Kafka会启动恢复过程删除CRC不一致的消息。这也可以用来在消息被产生或者消费时检查网络错误。磁盘上存储的消息格式如下:
message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
如果一个Broker崩了,它存储的所有未消费的消息会变得不可用。如果是存储介质永久性损坏,则未消费的消息会永久丢失。在Kafka的0.8+版本中提供了数据备份的设计提高数据的可用性与可靠性,冗余地存储消息在多个Broker上。
在LinkedIn Kafka的应用于未来
如下图所示,简单展示了Kafka的部署。分为实时处理的集群和离线处理的集群两部分。不同的前段的服务会批量提交不同的消息通过一个硬件实现的负载均衡器平衡分发给Kafka集群的Broker服务器。会有不同的实时服务作为Consumer消费这些信息。
离线的Kafka集群离Hadoop集群和数据仓库集群更近,它们作为Consumer而存在,从Kafka的replica集群拉取数据,用于数据统计分析、对原始的时间流进行处理以支持ad-hoc查询等工作。不需要太多的tuning,便可以将整个流水线的端对端的延迟大约平均在10秒左右,足够满足LinkedIn的需求。
LinkedIn的数据跟踪系统还包括审查整个流水线没有数据的丢失。每个消息都会携带时间戳和生产的服务器标识。Producer监控的信息也会作为特定Topic的消息发送给Kafka。会有专门监控信息的Consumer来验证、核对这些监控信息。
工程师们设计了专门的工具来将数据从Kafka的数据格式直接导入到Hadoop集群。而数据和offset只有在任务成功完成后才会存储在HDFS上。
Kafka使用Apache Avro!作为序列化的协议,它非常高效并且支持schema的进化。Kafka在payload中存储每个消息的Avro! schema ID和序列化后的字节。这个schema确保了Producer和Consumer的数据兼容性。LinkedIn使用了轻量的schema注册服务来将schema ID映射到实际的schema。当Consumer得到一个消息时,它从schema的注册服务中获取对应的schema,用来将字节数据转化成对象。
LinkedIn Engineering的官方博客在2015年年初的博文Kafka at LinkedIn: Current and Future详细介绍了Kafka当前在LinkedIn的使用状况、在LinkedIn Kafka的生态,Kafka未来发展的重点关注点。在LinkedIn,Kafka通常在以下发挥杠杆作用:
- 监控。所有格的服务器都提交适用于对应系统的标准与应用的健康状况给Kafka。这些数据用于生成监控控制板和警告。这里有更深入的介绍。LinkedIn还使用Apache Samza 来做call graph analysis ,实时地处理事件。
- 传统的消息服务。包含搜索、内容feed,相关度等应用都使用Kafka做标准的队列式的和pub-sub消息服务。
- 数据。使用Kafka来提供给离线的Hadoop集群做数据分析。
- 作为多种分布式应用/平台的构成要素。比如LinkedIn的大数据数据仓库解决方案Pinot,这里是Pinot相关介绍。在分布式数据库Espresso也使用了Kafka作为内部的备份、改变等事件的广播层。
在2015年,LinkedIn对于Kafka的进化主要关注以下几个方面: 1)安全;2)磁盘配额;3)可靠性与可用性;4)核心功能强化;5)更加高效;6)新的提议与想法的实现与验证;7)提升操作性。详细信息请参考原官方博文。
References
- Kafka: A Distributed Messaging System for Log Processing, Jay Kreps, Neha Narkhede, Jun Rao from LinkedIn, at NetDB workshop 2011
- Kafka剖析(一):Kafka背景及架构介绍
- Kafka at LinkedIn: Current and Future