kafka 应用场景
- 日志收集
- 消息系统 解耦生产者和消费者、缓存消息。
- 用户活动跟踪: 就是我们在做的。
- 运营指标:生产各种操作的集中反馈。
- 流式处理:比如spark steaming
kafka的发布对象是topic。每类数据我们可以归为一个topic。向topic发送消息的我们称为生产者、从topic订阅消息的称为consumer。producer 和 consumer 可以同时读写数据。
- topic: 消息主题。
- producer: 生产者到topic的一方。
- consumer: 订阅topic消费消息的一方。
- broker
kafka topic & partition
kafka 集群会保存所有消息,不管消息有没有被消费;通过设置消息过期时间,可以来定制的删除消息。比如我们设置过期时间为2天。
一个消息被生产出来,写入到多个partition。消息就是以partition作为存储单位,每个partition可以通过调整以适应它所在的机器,而一个topic对应多个partition,这样整个集群就可以适应各个大小的数据了。第二,也可以提高并发,因为可以以partition 为单位来读写了。
Kafka 核心组件
replications partitions and leaders
怎么实现持久化?
kafka能够做数据持久化。可以为每个topic设置副本容量。 如果副本容量设为3,那么一份数据就会被放在3台不同的机器上。一般设为2.
关于partition。
topic的存放形式是partition。每一个topic都可以设置partition数量。partition的数量决定了log的数量。producer 在生产消息时,会把消息发布到topic的各个partition中。上面说的副本都是以partition为单位的,不过只有一个partition的副本会被选为leader作为读写用。
kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties中配置。
default.replication.factor = 1
如何设置partition值要考虑的因素?
一个partition只能被一个消费者消费(但是一个消费者可以同时消费多个partition。),所以,运行的partition的数量要大于运行的comsumer的数量,否则就会有消费者消费不到数据。另一方面,建议partition的数量大于broker 的数量。这样leader partition 的数据就能均匀的分布在各个broker中,最终使得集群负载均衡。
(如果小于会怎样样,会造成比较集中的存储在单个broker之中吗。)。注意:kafka需要为每个partition分配一些内存来缓存消息数据,如果parttion数量越大,分配更大的heap space。
partition每一个都会保存作为一个repilca么? 不是的。partition的概念是根据partition 方法来将数据分布存储。
producers
producer发送消息。
producer 可以直接发送到broker对应的leader partition中,不需要经历任何一个中介的转发。为实现这个特性,每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的你leader partition都在哪。现阶段哪些leader partition 是可以直接访问的?
如果访问的不是leader partition 怎么搞? 而且我看是可以指定多个进行访问的。
producer 和 partition
producer 可以控制以什么样的将消息推送到客户端。实现方法包括随机,实现一类随机负载均衡的算法,或者指定一些分区算法。kafka 提供了用户自定义分区的方法,用户可以为每一个消息指定一个partitionkey,通过这个key来实现一些hash 分区算法。
效率。
batch的方式将有效的提高效率,减少网络和磁盘io的占用。这里batch的大小,可以再producer来设置,比如焕春100s,缓存1000条,或者数据的大小。
关于消息的完整性。
producer 可以异步的并行的向kafka发送消息,但是通常你producer在发送完消息之后会得到一个future的响应,返回的是offset或者发送过程遇到的错误。这里,acks 这参数很重要,这个参数决定了producer要求leader partition收到的确认副本数。如果acks设置数量为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发送成功,这有可能会导致数据丢失,但这也是吞吐量最大的方式。
如果acks设置为1,表示producer 和laeder partition收到消息的时得到的broker的一个确认,这样会有更好的可靠性。如果设置为-1,则组要等待所有partition收到消息。这样能保持最高的可靠性。
kafka 消息。
kafka消息有一个定长的header和变长的字节组成。kafka没有限定单个消息的大小,但一般不超过一mb,通常控制在1-10kb之间。
Consumers
kafka 提供了两套api。sample api 。是一套无状态的api。每次请求都需要指定offset。所以也是最灵活的。
在kafka中,当前消息的offset是由consumer来维护的。consumer可以自己决定读哪些数据。比如,consumer 可以重新消费已经消费国的数据。这些数据有一个过期限制。这个限制是可配置的。
high-level api 封装了对集群的访问。可以透明的消费一个topic。自己本身维持了一个消费队列,每次消费下一个。
这里consumer 用组来模拟了广播和订阅两个功能。组是嫁接topic和consumer 的桥梁。 组对topic是来说是组内的成员都可以接受到消息,相当于广播,组对成员来说,是订阅,即你在这个组里才能接受到这个消息。所以都在一个组,就相当于一个大广播。
kafka 的核心特性
压缩
kafka 支持以batch的方式来发送消息。在此之上,还支持对消息的压缩。 producer端进行压缩之后,在consumer端进行解压。这么做的好处是,往往大数据的瓶颈在于网络,而不是cpu(所以会损耗一定的cpu。)
消息压缩的信息,存储在消息头部的描述压缩属性字节。这个字节的后两位表示消息的压缩采用的编码,若后两位为0,则表示消息未被压缩。
消息可靠性
在消息系统中,保证消息的可靠性是很重要的。在实际消息的传递过程中,会出现如下3种情况:
- 一个消息传递失败
- 一个消息被发送多次
- exactly once,一个消息发送成功并且仅发送了一次。
有许多系统声称它们实现了exactly-once,但一般没有考虑生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个producer成功发送一个消息,但消息丢失,或者成功发送到broker,也被consumer成功取走,但是这个consumer在处理消息时失败了。
这里从两个角度来分析这个。
从producer的角度:在发送端,看producer会等待broker成功接收到消息的反馈,如果没有接到broker的反馈信息,producer 会重新发送,(我们知道kafka有备份机制,可以通过参数设置是否等待所有节点都收到消息,而本身的消息也有缓存)
从consumer的角度:因为consumer 可以调整offset,所以可以重复消费消息。也保证了,一条消息被发送一次就ok。
备份机制
备份机制是Kafka0.8版本的新特性,备份机制的出现大大提高了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。
kafka 高效性相关设计
消息持久化
首先这里,kafka是高度依赖文件系统和缓存的。
文件系统的速度。文件系统的速度并不是想象中的慢或者快。对于,顺序写入和随机写入两者有很大速度差。一个7200的硬盘顺序写入有600m/s的速度,随机写入有100k/s的速度。
缓存思路。所以,基本的数据写入思路是,先拿内存缓存数据再刷新到磁盘。但是,众所周知,内存的垃圾回收的代价很大,尤其当数据量过大的时候,垃圾回收会非常昂贵。
感觉这块理解的不是很好
基于以上,得到的一个结论就是利用文件系统并且依靠页缓存比维护一个内存缓存或者其他结构要好。而事实上,数据被传输到内核页,稍后会被刷新。这里加上了一个配置项来控制让系统的用户来控制数据在什么时候被刷新到物理硬盘上。
常数时间性能保证
没太理解
消息系统中持久化数据结构的设计通常是维护者一个和消费队列有关的B树或者其它能够随机存取结构的元数据信息。B树是一个很好的结构,可以用在事务型与非事务型的语义中。但是它需要一个很高的花费,尽管B树的操作需要O(logN)。通常情况下,这被认为与常数时间等价,但这对磁盘操作来说是不对的。磁盘寻道一次需要10ms,并且一次只能寻一个,因此并行化是受限的。 直觉上来讲,一个持久化的队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。尽管和B树相比,这种结构不能支持丰富的语义,但是它有一个优点,所有的操作都是常数时间,并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完全无关,服务器可以充分利用廉价的硬盘来提供高效的消息服务。
事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着我们可以提供一般消息系统无法提供的特性。比如说,消息被消费后不是立马被删除,我们可以将这些消息保留一段相对比较长的时间(比如一个星期)。
进一步提高效率
在web开发中,每次一条log都会产生一次写操作,这些小的写操作的量非常大,另外这些log也要至少被一个或以上consumer消费。
所以,这里出现了两个比较低效的场景。
- 太多小的io操作。
- 过多的字节拷贝。
太多小的io操作。可以通过发送messageset来搞定。所以对消息的处理,这里没有分开的序列化和反序列化的上步骤,消息的字段可以按需反序列化(如果没有需要,可以不用反序列化)。
过多的字节拷贝。 为了解决这个问题,kafka设计了一个标准字节消息。producer,broker,consumer共享这一种消息格式。kafka的message log 在broker端就是一些目录文件。这些文件都是按照message set 来存的。
而这种通用的方式,非常重要: 持久化log块的网络传输。这传输通过一钟非常搞笑的途径来实现页面缓存和socket之间的数据传递。 叫sendfile
这里解释下sendfile的作用,先声明下一般的数据从文件到socket的路径:
- 操作系统将数据从磁盘读到内核空间的页缓存中。
- 应用将数据从内核空间读到用户空间的页缓存中。
- 应用将数据从用户空间写到内核空间的socket缓存。
- 操作系统将数据从socket缓存写入到网卡缓存中。
这钟方式非常低效,因为这里有四次拷贝,两次系统调用。如果使用sendfile,就可以避免两次拷贝:操作系统将数据直接从页缓存发送到网络上。所以这个过程,只有第一步和最后一步是需要的。利用上述zero copy,数据只需要拷贝到页缓存一次,就可以重复被consumer利用。这样通过页缓存和sendfile的结合,下游有很多consumers,也不会对整个集群服务造成压力。
kafka 集群部署
为了提高性能,尽量与hadoop的集群分开部署。如果共享节点的话,会影响其使用页面缓存的性能。
kafka的性能主要在磁盘上。
kafka依赖于zookper,一般使用专用服务器来管理。zookeeper集群的节点采用偶数个。注意,zookeeper集群越大其读写性能越慢,因为zookeeper 要在节点之间同步数据。一个3节点的zookeeper集群允许一个节点失败,一个5节点的集群允许2个节点失败。
集群大小
衡量kafka集群所需的大小,最好是用模拟负载来测算一下。如果不想用模拟实验,最好的方法是根据磁盘。
kafka 主要配置
broker config
log.dirs /tmp/kafka-logs Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到当前存放partition最少的目录。