El Psy Congroo

Spring-kafka学习笔记

Spring-kafka

KafkaMessageListenerContainer

  • AbstractMessageListenerContainer

    • enum AckMode
    • ContainerProperties containerProperties
      • topics等kafka相关配置
      • messageListener
      • consumerTaskExecutor ==拉消息线程 autoCommit模式下同时处理消息==
      • listenerTaskExecutor ==处理消息线程 非autoCommit模式==
      • errorHandler
      • consumerRebalanceListener
      • offsetCommitCallback
  • KafkaMessageListenerContainer

    • consumerFactory
    • listenerConsumer ==在consumerTaskExecutor线程中运行==
      • consumer
      • containerProperties,listener,errorHandler等container中设置的属性
      • ListenerInvoker invoker ==非autoCommit模式下在listenerTaskExecutor中运行onMessage方法==
      • listenerInvokerFuture
      • rebalanceListener
    • listenerConsumerFuture
    • listener
    • acknowledgingMessageListener
  • Consumer相关参数

    • enable.auto.commit 自动提交,默认5000ms一次,通过auto.commit.interval.ms设置
    • session.timeout.ms 客户端超时时间,broker超过该时间未收到客户端心跳,即认为客户端挂了,触发group rebalance。==心跳是在consumer.poll期间发送的,因此消息处理耗时过长也会引起超时==
    • session.timeout.ms request超时时间,需要大于session超时
    • heartbeat.interval.ms 心跳间隔,默认3秒,需要小于1/3的session超时,避免丢包导致超时
    • max.partition.fetch.bytes 每个分区单次获取最大字节,默认1M,==注意必须大于最大消息大小==,不然会导致消费阻塞,单次poll最大字节等于分区数量乘以该值
  • Container相关参数

    • pollTimeout 单次poll无数据时的block等待时间,有数据则立刻返回
    • pauseAfter offer到待处理队列的等待时间,默认10秒,超时则pause消费者,避免session超时
    • shutdownTimeout 默认10秒,consumer和listener线程关闭超时
  • Producer相关参数

    • retries 发送重试次数,默认0,只有遇到继承RetriableException的异常才会重试