El Psy Congroo

Kafka学习笔记

概述

Kafka本质上是一个append-only的日志系统,消息就是日志,按顺序追加到log文件中。为了能快速定位到消息,每个log文件还对应一个index文件,稀疏记录log文件中消息的offset,因此单个log文件也不能太大,log文件对应segment这个概念。为了提高并发性能,那就需要同时提供多组log文件以供写入,这就是topic中的partition概念,消息会hash到固定的partition中。为了保证高可用,每个partition都需要有备份,根据配置会复制1至N份到其他broker,保持同步的节点群共同组成ISR,当leader失效时从ISR中选出新leader。消息写入时,可以按对可靠性的要求,选择仅写入leader就返回,还是需要等待写入至少N个或全部replica。消息消费时,kafka使用pull的模式,服务端不负责记录消费者offset(但是提供了用于记录的基础设施,以前是zk,现在是特殊的topic),并且一个partition只允许一个消费者,这保证了消息是partition有序的。新版本kafka对zk的依赖很少,仅用于节点发现,集群协调工作都是由选举出的一台broker(controller)来负责的。kafka依赖操作系统中的基础设施来保证高性能,例如利用顺序读取的log文件来规避磁盘性能问题,pagecache做缓存,zero-copy提高发送性能,因此部署时也要考虑这些特性,避免使用raid(将顺序读写分散了),保留足够内存用于pagecache,选择合适的replica份数在空间和可靠性中取得均衡,规划带宽时不要忘了节点间同步的开销等。

kafka conception

  • multiple consumers on one partition

    • like mutliple threads with a synchronize block
  • Message Delivery Semantics

    • at-least-once
      • Producer 发送重试,ack=all
      • Consumer 关闭auto commit,处理完消息后commit
    • at-most-once
      • 无重试,自动提交
    • exactly-once
      • 重试加基于消息主键的幂等发送/消费
  • Consumer Offset Tracking

high watermark: message alread copy to all replicas, consumer can only read up to this position

  • Consumer Liveness

    • 通过心跳机制实现,coordinator超过session.timeout.ms时间没有收到心跳即认为consumer失效,默认30s
    • consumer的心跳信息是在调用poll的过程中发送的,并没有使用后台线程,因此只要consumer没有按时执行poll(消息处理太慢,或抛出异常导致线程终止),就会被认为失效,触发group的重平衡,因此需要根据消息处理速度来合理配置timeout
    • spring-kafka使用consumer(poll)和listener(process)两个线程来处理消息,当listener处理耗时较长时,consumer线程可以向服务端发送pause,使poll调用不会返回新消息,仅发送心跳,避免了超时
  • Leader Election

    • 用zk从broker中选出一个controller,负责partition的leader选举,topic增删及rebalance replica
    • 为什么partition的leader选举不用zk(所有follower watch leader对应的ephemeral node)
      • herd effect,一个broker挂了会触发大量partition选举
      • partition过多时需要注册大量watch,有性能压力
  • Offset Management

    • consumer向controller获取consumer metadata,找到offset manager,向manager提交及查询(有cache)offset
    • manager变更时(offset分区leader变更),consumer提交失败,重复上一步骤
    • consumer offset默认保存24小时(consumer下线超过24小时删除offset),通过offsets.retention.minutes设置
  • Partition Assignment

    目前有两种策略range(默认)及roundrobin,通过partition.assignment.strategy设置

    • range从单个topic角度对partition进行均分,因此当一个consumer group监听多个分区数不一致的topic时,会导致consumer压力不均
    • roundrobin会拿所有topic的分区进行均分
  • High Availability

    • 使用ISR机制替代多数派写,容忍更多节点挂掉
    • 所有replica宕机后的leader选举:等待ISR中的节点活过来,或选择第一个活过来的节点作为leader,由unclean.leader.election.enable控制,默认true
    • ack=all不保证所有replica收到消息,仅保证ISR中存活的replica收到消息,可以通过min.insync.replicas设置需要多少个replica存活才允许ack,默认是1

kafka design

  • 持久化 - 依靠文件系统
    • 利用操作系统的IO调度(合并排序),pagecache,规避了机械硬盘随机访问的性能弱点
    • 避免了在内存中维护一份数据带来的数据结构的开销,GC问题,以及内存浪费(同一份持久化数据内存中一份,pagecache中一份)
    • 使用队列而非Btree,将访问复杂度从O(logN)降为O(1),节约了存储空间,访问磁盘的效率也更高
  • 效率
    • 批量发送及接收消息,减少网络开销
    • sendfile / zero-copy 将pagecache中的数据直接发送到socket,避免了在kernel和user-space之间来回copy,并且数据只需copy到pagecache中一次(即可应对后续多次消费),因此kafka消费速度几乎只受网络带宽限制
    • 支持消息批量压缩(默认未启用)
  • 生产者
    • 负载均衡(hash),无中间路由,直接发到leader
    • 异步发送(积累消息批量发送)
  • 消费者
    • pull
      • 避免对下游造成过大压力,并且可以积累消息批量接收,不过在无消息时需要通过阻塞的long poll避免客户端忙等待
      • push的实时性更好,但缺少上述优势
    • consumer position
      • 在客户端维护offset,避免由服务端来判断一个消息是否被消费(sent/ack),减少了服务端的性能开销
      • partition中消息顺序存储且只有一个消费者,因此offset管理开销很低
      • 附带的好处是客户端可以回放消息
  • 复制
  • 日志压缩
  • 限流

kafka command-line

1
2
3
4
5
6
7
8
9
# 列出当前在线的group
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.8.12.139:9092 --list
# 查看指定在线group的offset
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.8.12.139:9092 --describe --group console1
# 查看任意group在指定topic下的offset
# topic必填,并且topic下不能有无数据的分区,不然会报zk node不存在的错误
./kafka-consumer-offset-checker.sh --zookeeper localhost --group console1 --topic test123

kafka producer