温馨提示:本文基于 Kafka 2.2.1 版本。如果觉得源码阅读比较枯燥,本文的中间有 Sender 线程的工作流程图。
上文 已经详细介绍了 KafkaProducer send 方法的流程,该方法只是将消息追加到 KafKaProducer 的缓存中,并未真正的向 broker 发送消息,本文将来探讨 Kafka 的 Sender 线程。
在 KafkaProducer 中会启动一个单独的线程,其名称为 “kafka-producer-network-thread | clientID”,其中 clientID 为生产者的 id 。
1、Sender 线程详解
1.1 类图
我们先来看一下其各个属性的含义:
KafkaClient client kafka 网络通信客户端,主要封装与 broker 的网络通信。
RecordAccumulator accumulator 消息记录累积器,消息追加的入口(RecordAccumulator 的 append 方法)。
Metadata metadata 元数据管理器,即 topic 的路由分区信息。
boolean guaranteeMessageOrder 是否需要保证消息的顺序性。
int maxRequestSize 调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。通过参数 max.request.size 来设置。
short acks 用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下0、-1、1.
int retries 重试次数。
Time time 时间工具类。
boolean running 该线程状态,为 true 表示运行中。
boolean forceClose 是否强制关闭,此时会忽略正在发送中的消息。
SenderMetrics sensors 消息发送相关的统计指标收集器。
int requestTimeoutMs 请求的超时时间。
long retryBackoffMs 请求失败时在重试之前等待的时间。
ApiVersions apiVersions API版本信息。
TransactionManager transactionManager 事务处理器。
Map TopicPartition, List ProducerBatch inFlightBatches 正在执行发送相关的消息批次。
1.2 run 方法详解
Sender#run
public void run() {
log.debug("Starting Kafka producer I/O thread.");
while (running) {
try {
runOnce(); // @1
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() 0)) { // @2
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) { // @3
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close(); // @4
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。
代码@2:如果主动关闭 Sender 线程,如果不是强制关闭,则如果缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。
代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。
代码@4:关闭 Kafka Client 即网络通信对象。
接下来将分别探讨其上述方法的实现细节。
1.2.1 runOnce 详解
Sender#runOnce
void runOnce() {
// 此处省略与事务消息相关的逻辑
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs); // @1
client.poll(pollTimeout, currentTimeMs); // @2
}
本文不关注事务消息的实现原理,故省略了该部分的代码。
代码@1:调用 sendProducerData 方法发送消息。
代码@2:调用这个方法的作用?
接下来分别对上述两个方法进行深入探究。
1.1.2.1 sendProducerData
接下来将详细分析其实现步骤。
Sender#sendProducerData
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
Step1:首先根据当前时间,根据缓存队列中的数据判断哪些 topic 的 哪些分区已经达到发送条件。达到可发送的条件将在 2.1.1.1 节详细分析。
Sender#sendProducerData
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
Step2:如果在待发送的消息未找到其路由信息,则需要首先去 broker 服务器拉取对应的路由信息(分区的 leader 节点信息)。
Sender#sendProducerData
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
Step3:移除在网络层面没有准备好的分区,并且计算在接下来多久的时间间隔内,该分区都将处于未准备状态。
1、在网络环节没有准备好的标准如下:
2、client pollDelayMs 预估分区在接下来多久的时间间隔内都将处于未转变好状态(not ready),其标准如下:
Sender#sendProducerData
// create produce requests
MapInteger, ListProducerBatch batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
Step4:根据已准备的分区,从缓存区中抽取待发送的消息批次( ProducerBatch ),并且按照 nodeId:List 组织,注意,抽取后的 ProducerBatch 将不能再追加消息了,就算还有剩余空间可用,具体抽取将在下文在详细介绍。
Sender#sendProducerData
addToInflightBatches(batches);
public void addToInflightBatches(MapInteger, ListProducerBatch batches) {
for (ListProducerBatch batchList : batches.values()) {
addToInflightBatches(batchList);
}
}
private void addToInflightBatches(ListProducerBatch batches) {
for (ProducerBatch batch : batches) {
ListProducerBatch inflightBatchList = inFlightBatches.get(batch.topicPartition);
if (inflightBatchList == null) {
inflightBatchList = new ArrayList();
inFlightBatches.put(batch.topicPartition, inflightBatchList);
}
inflightBatchList.add(batch);
}
}
Step5:将抽取的 ProducerBatch 加入到 inFlightBatches 数据结构,该属性的声明如下:MapTopicPartition, List ProducerBatch inFlightBatches,即按照 topic-分区 为键,存放已抽取的 ProducerBatch,这个属性的含义就是存储待发送的消息批次。可以根据该数据结构得知在消息发送时以分区为维度反馈 Sender 线程的“积压情况”,max.in.flight.requests.per.connection 就是来控制积压的最大数量,如果积压达到这个数值,针对该队列的消息发送会限流。
Sender#sendProducerData
accumulator.resetNextBatchExpiryTime();
ListProducerBatch expiredInflightBatches = getExpiredInflightBatches(now);
ListProducerBatch expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
Step6:从 inflightBatches 与 batches 中查找已过期的消息批次( ProducerBatch ),判断是否过期的标准是系统当前时间与 ProducerBatch 创建时间之差是否超过120s,过期时间可以通过参数 delivery.timeout.ms 设置。
Sender#sendProducerData
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
Step7:处理已超时的消息批次,通知该批消息发送失败,即通过设置 KafkaProducer#send 方法返回的凭证中的 FutureRecordMetadata 中的 ProduceRequestResult result,使之调用其 get 方法不会阻塞。
Sender#sendProducerData
sensors.updateProduceRequestMetrics(batches);
Step8:收集统计指标,本文不打算详细分析,但后续会专门对 Kafka 的 Metrics 设计进行一个深入的探讨与学习。
Sender#sendProducerData
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
Step9:设置下一次的发送延时,待补充详细分析。
Sender#sendProducerData
sendProduceRequests(batches, now);
private void sendProduceRequests(MapInteger, ListProducerBatch collated, long now) {
for (Map.EntryInteger, ListProducerBatch entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
Step10:该步骤按照 brokerId 分别构建发送请求,即每一个 broker 会将多个 ProducerBatch 一起封装成一个请求进行发送,同一时间,每一个 与 broker 连接只会只能发送一个请求,注意,这里只是构建请求,并最终会通过 NetworkClient#send 方法,将该批数据设置到 NetworkClient 的待发送数据中,此时并没有触发真正的网络调用。
sendProducerData 方法就介绍到这里了,既然这里还没有进行真正的网络请求,那在什么时候触发呢?
我们继续回到 runOnce 方法。
1.2.1.2 NetworkClient 的 poll 方法
public ListClientResponse poll(long timeout, long now) {
ensureActive();
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
ListClientResponse responses = new ArrayList();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
long metadataTimeout = metadataUpdater.maybeUpdate(now); // @1
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // @2
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
ListClientResponse responses = new ArrayList(); // @3
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses); // @4
return responses;
}
本文并不会详细深入探讨其网络实现部分,Kafka 的 网络通讯后续我会专门详细的介绍,在这里先点出其关键点。
代码@1:尝试更新云数据。
代码@2:触发真正的网络通讯,该方法中会通过收到调用 NIO 中的 Selector#select() 方法,对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker。
代码@3:然后会消息发送,消息接收、断开连接、API版本,超时等结果进行收集。
代码@4:并依次对结果进行唤醒,此时会将响应结果设置到 KafkaProducer#send 方法返回的凭证中,从而唤醒发送客户端,完成一次完整的消息发送流程。
Sender 发送线程的流程就介绍到这里了,接下来首先给出一张流程图,然后对上述流程中一些关键的方法再补充深入探讨一下。
1.2.2 run 方法流程图
根据上面的源码分析得出上述流程图,图中对重点步骤也详细标注了其关键点。下面我们对上述流程图中 Sender 线程依赖的相关类的核心方法进行解读,以便加深 Sender 线程的理解。
由于在讲解 Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,故接下来重点对上述涉及到 RecordAccumulator 的方法进行一个详细剖析,加强对 Sender 流程的理解。
2、RecordAccumulator 核心方法详解
2.1 RecordAccumulator 的 ready 方法详解
该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。
RecordAccumulator#ready
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
SetNode readyNodes = new HashSet();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
SetString unknownLeaderTopics = new HashSet();
boolean exhausted = this.free.queued() 0;
for (Map.EntryTopicPartition, DequeProducerBatch entry : this.batches.entrySet()) { // @1
TopicPartition part = entry.getKey();
DequeProducerBatch deque = entry.getValue();
Node leader = cluster.leaderFor(part); // @2
synchronized (deque) {
if (leader == null && !deque.isEmpty()) { // @3
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) { // @4
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() 0 && waitedTimeMs retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() 1 || batch.isFull();
boolean expired = waitedTimeMs = timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) { // @5
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
代码@1:对生产者缓存区 ConcurrentHashMapTopicPartition, Deque ProducerBatch batches 遍历,从中挑选已准备好的消息批次。
代码@2:从生产者元数据缓存中尝试查找分区(TopicPartition) 的 leader 信息,如果不存在,当将该 topic 添加到 unknownLeaderTopics (代码@3),稍后会发送元数据更新请求去 broker 端查找分区的路由信息。
代码@4:如果不在 readyNodes 中就需要判断是否满足条件,isMuted 与顺序消息有关,本文暂时不关注,在后面的顺序消息部分会重点探讨。
代码@5:这里就是判断是否准备好的条件,先一个一个来解读局部变量的含义。
long waitedTimeMs 该 ProducerBatch 已等待的时长,等于当前时间戳 与 ProducerBatch 的 lastAttemptMs 之差,在 ProducerBatch 创建时或需要重试时会将当前的时间赋值给lastAttemptMs。
retryBackoffMs 当发生异常时发起重试之前的等待时间,默认为 100ms,可通过属性 retry.backoff.ms 配置。
batch.attempts() 该批次当前已重试的次数。
backingOff 后台发送是否关闭,即如果需要重试并且等待时间小于 retryBackoffMs ,则 backingOff = true,也意味着该批次未准备好。
timeToWaitMs send 线程发送消息需要的等待时间,如果 backingOff 为 true,表示该批次是在重试,并且等待时间小于系统设置的需要等待时间,这种情况下 timeToWaitMs = retryBackoffMs 。否则需要等待的时间为 lingerMs。
- Deque ProducerBatch 该队列的个数大于1,表示肯定有一个 ProducerBatch 已写满。
- ProducerBatch 已写满。
- 该批次已写满。(full = true)。
- 已等待系统规定的时长。(expired = true)
- 发送者内部缓存区已耗尽并且有新的线程需要申请(exhausted = true)。
- 该发送者的 close 方法被调用(close = true)。
- 该发送者的 flush 方法被调用。
boolean expired
是否过期,等于已经等待的时间是否大于需要等待的时间,如果把发送看成定时发送的话,expired 为 true 表示定时器已到达触发点,即需要执行。
boolean exhausted
当前生产者缓存已不够,创建新的 ProducerBatch 时阻塞在申请缓存空间的线程大于0,此时应立即将缓存区中的消息立即发送到服务器。
boolean sendable 是否可发送。其满足下面的任意一个条件即可:
2.2 RecordAccumulator 的 drain方法详解
RecordAccumulator#drain
public MapInteger, ListProducerBatch drain(Cluster cluster, SetNode nodes, int maxSize, long now) { // @1
if (nodes.isEmpty())
return Collections.emptyMap();
MapInteger, ListProducerBatch batches = new HashMap();
for (Node node : nodes) {
ListProducerBatch ready = drainBatchesForOneNode(cluster, node, maxSize, now); // @2
batches.put(node.id(), ready);
}
return batches;
}
代码@1:我们首先来介绍该方法的参数:
Cluster cluster 集群信息。
Set Node nodes 已准备好的节点集合。
int maxSize 一次请求最大的字节数。
long now 当前时间。
代码@2:遍历所有节点,调用 drainBatchesForOneNode 方法抽取数据,组装成 MapInteger /** brokerId */, List ProducerBatch batches。
接下来重点来看一下 drainBatchesForOneNode。
RecordAccumulator#drainBatchesForOneNode
private ListProducerBatch drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
ListPartitionInfo parts = cluster.partitionsForNode(node.id()); // @1
ListProducerBatch ready = new ArrayList();
int start = drainIndex = drainIndex % parts.size(); // @2
do { // @3
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();
if (isMuted(tp, now))
continue;
DequeProducerBatch deque = getDeque(tp); // @4
if (deque == null)
continue;
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst(); // @5
if (first == null)
continue;
// first != null
boolean backoff = first.attempts() 0 && first.waitedTimeMs(now) retryBackoffMs; // @6
// Only drain the batch if it is not during backoff period.
if (backoff)
continue;
if (size + first.estimatedSizeInBytes() maxSize && !ready.isEmpty()) { // @7
break;
} else {
if (shouldStopDrainBatchesForPartition(first, tp))
break;
// 这里省略与事务消息相关的代码,后续会重点学习。
batch.close(); // @8
size += batch.records().sizeInBytes();
ready.add(batch);
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}
代码@1:根据 brokerId 获取该 broker 上的所有主分区。
代码@2:初始化 start。这里首先来阐述一下 start 与 drainIndex 。
代码@3:循环从缓存区抽取对应分区中累积的数据。
代码@4:根据 topic + 分区号从生产者发送缓存区中获取已累积的双端Queue。
代码@5:从双端队列的头部获取一个元素。(消息追加时是追加到队列尾部)。
代码@6:如果当前批次是重试,并且还未到阻塞时间,则跳过该分区。
代码@7:如果当前已抽取的消息总大小 加上新的消息已超过 maxRequestSize,则结束抽取。
代码@8:将当前批次加入到已准备集合中,并关闭该批次,即不在允许向该批次中追加消息。
关于消息发送就介绍到这里,NetworkClient 的 poll 方法内部会调用 Selector 执行就绪事件的选择,并将抽取的消息通过网络发送到 Broker 服务器,关于网络后面的具体实现,将在后续文章中单独介绍。
如果文章对您有所帮助的话,麻烦帮忙点【在看】,谢谢您的认可与支持。
中间件兴趣圈已经陆续发表了源码分析Dubbo、ElasticJob、Mybatis、RocketMQ系列,RocketMQ实战与案例分析、ElasticSearch使用指南等。
原文始发于微信公众号(中间件兴趣圈):