public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    this.lock.lock(); /* acquire the lock */
    try {
        // check if we have a free buffer of the right size pooled
        /* the request is for exactly poolableSize bytes and the free queue has an idle ByteBuffer */
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst(); /* return a pooled ByteBuffer */

        /* the requested size is not poolableSize, so take the slower path below */

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        /* every ByteBuffer in the free queue is poolableSize bytes, so the total
         * pooled space can be computed directly */
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request
            /* freeUp() keeps releasing ByteBuffers from the free queue until
             * availableMemory is large enough to satisfy this request */
            freeUp(size);
            this.availableMemory -= size; /* decrease availableMemory */
            lock.unlock(); /* release the lock */
            /* no buffer from the free queue is reused here; a HeapByteBuffer of
             * exactly size bytes is allocated instead */
            return ByteBuffer.allocate(size);
        } else { /* not enough space, so this thread has to block */
            // we are out of memory and will have to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            /* add the Condition to the waiters queue */
            this.waiters.addLast(moreMemory);
            // loop over and over until we have a buffer or have reserved
            // enough memory to allocate one
            while (accumulated < size) { /* keep waiting in a loop */
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try { /* block */
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    /* interrupted: remove this thread's Condition */
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally { /* record the time spent blocked */
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }

                if (waitingTimeElapsed) { /* timed out, fail the allocation */
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time "
                                               + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // check if we can satisfy this request from the free list,
                // otherwise allocate memory
                /* the request is for poolableSize bytes and the free queue has an idle ByteBuffer */
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else { /* reserve part of the space now and keep waiting for the rest */
                    // we'll need to allocate memory, but we may only get
                    // part of what we need on this iteration
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            // remove the condition for this thread to let the next thread
            // in line start getting memory
            /* the space has been reserved, so remove this thread's Condition */
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // signal any additional waiters if there is more memory left
            // over for them
            /* if free space is still available, wake up the next waiting thread */
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }

            // unlock and return the buffer
            lock.unlock(); /* release the lock */
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally { /* release the lock if this thread still holds it */
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
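To see the pooling strategy in isolation, the following is a minimal sketch of the same idea, not the real BufferPool: it keeps the fast path for poolableSize requests and the freeUp-style release of pooled buffers, but replaces the blocking path with a simple failure. The class MiniBufferPool and all of its members are hypothetical names, introduced here only for illustration.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/* Hypothetical, non-blocking reduction of the BufferPool idea: requests of
 * exactly poolableSize are served from the free deque, everything else is
 * carved out of availableMemory. */
public class MiniBufferPool {
    private final long totalMemory;
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;

    public MiniBufferPool(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.availableMemory = totalMemory;
    }

    public synchronized ByteBuffer allocate(int size) {
        if (size > totalMemory)
            throw new IllegalArgumentException("request exceeds pool capacity");
        // Fast path: reuse a pooled buffer of exactly poolableSize.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        // Release pooled buffers until availableMemory can cover the request.
        while (availableMemory < size && !free.isEmpty())
            availableMemory += free.pollLast().capacity();
        if (availableMemory < size)
            throw new IllegalStateException("pool exhausted (the real pool would block here)");
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    public synchronized void deallocate(ByteBuffer buffer) {
        // Only poolableSize buffers are worth pooling; odd sizes just return raw capacity.
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        } else {
            availableMemory += buffer.capacity();
        }
    }
}

The design choice mirrors the real pool: only buffers of exactly poolableSize are cached, because in the common case every batch is batch.size bytes, so those buffers can be recycled endlessly while oversized one-off requests simply borrow and return capacity.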
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    /* count the threads currently appending to this RecordAccumulator */
    appendsInProgress.incrementAndGet();
    try {
        /* 1. look up the Deque for this TopicPartition */
        // check if we have an in-progress batch
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { /* 2. lock the Deque */
            /* sanity check */
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            /* 3. try to append the Record to the last RecordBatch in the Deque */
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                return appendResult; /* 5. the append succeeded, return directly */
        } /* 4. the synchronized block ends, releasing the lock automatically */

        // we don't have an in-progress record batch try to allocate a new batch
        /* 6. the append failed; request new space from the BufferPool */
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            /* sanity check */
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            /* 7. with the Deque locked again, call tryAppend() once more */
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) { /* 8. the retry succeeded, return */
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                free.deallocate(buffer); /* release the buffer allocated in step 6 */
                return appendResult;
            }
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            /* 9. append the Record to the newly created RecordBatch and add the batch to the batches collection */
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        /* this thread is no longer appending */
        appendsInProgress.decrementAndGet();
    }
}
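The shape of append() — try under the lock, allocate outside the lock, then re-check under the lock — is worth seeing without the Kafka plumbing. Below is a minimal sketch of the same pattern under stated assumptions: the SlotQueue class and all of its names are invented here purely for illustration.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/* Hypothetical sketch: append to the last buffer in a deque, allocating a
 * new buffer only when needed, without holding the lock during allocation. */
class SlotQueue {
    private final Deque<ByteBuffer> deque = new ArrayDeque<>();
    private final int slotSize = 1024;

    /* Must be called with the deque lock held. */
    private boolean tryAppend(byte[] payload) {
        ByteBuffer last = deque.peekLast();
        if (last != null && last.remaining() >= payload.length) {
            last.put(payload);
            return true;
        }
        return false; // caller must allocate a new slot
    }

    void append(byte[] payload) {
        synchronized (deque) {             // first attempt under the lock
            if (tryAppend(payload))
                return;
        }
        // Allocation may block (in BufferPool it can wait for memory),
        // so it happens outside the synchronized block.
        ByteBuffer fresh = ByteBuffer.allocate(Math.max(slotSize, payload.length));
        synchronized (deque) {             // re-check: another thread may have
            if (tryAppend(payload))        // added a slot while we allocated
                return;                    // (the real code returns fresh to the pool)
            fresh.put(payload);
            deque.addLast(fresh);
        }
    }
}

Allocating outside the synchronized block matters because free.allocate() can block for up to the configured max block time; holding the Deque lock across that wait would stall every other thread appending to the same partition. The price is the re-check in steps 7 and 8: another thread may have created a usable batch in the meantime, in which case the freshly allocated buffer is handed back to the pool.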
Node leader = cluster.leaderFor(part); /* find the Node hosting the partition's leader replica */
synchronized (deque) { /* lock the deque before reading its elements */
    /* check the leader against the Cluster metadata; without a known leader,
     * the messages cannot be sent */
    if (leader == null && !deque.isEmpty()) {
        // This is a partition for which leader is not known, but messages are available to send.
        // Note that entries are currently not removed from batches when deque is empty.
        /* a non-empty unknownLeaderTopics will later trigger a Metadata update */
        unknownLeaderTopics.add(part.topic());
    } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
        /* only the first RecordBatch in the Deque is examined */
        RecordBatch batch = deque.peekFirst();
        if (batch != null) {
            boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
            long waitedTimeMs = nowMs - batch.lastAttemptMs;
            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
            /* the Deque holds more than one RecordBatch, or its first RecordBatch is full */
            boolean full = deque.size() > 1 || batch.records.isFull();
            /* has the batch lingered long enough */
            boolean expired = waitedTimeMs >= timeToWaitMs;
            boolean sendable = full || expired
                || exhausted          /* another thread is waiting for the BufferPool to free space (the pool is exhausted) */
                || closed             /* the Sender thread is preparing to shut down */
                || flushInProgress(); /* a thread is waiting for a flush operation to complete */
            if (sendable && !backingOff) {
                readyNodes.add(leader);
            } else {
                // Note that this results in a conservative estimate since an un-sendable partition may have
                // a leader that will later be found to have sendable data. However, this is good enough
                // since we'll just wake up and then sleep again for the remaining time.
                /* record how long to wait before ready() needs to run again */
                nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
            }
        }
    }
}
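Because the readiness decision reduces to a handful of booleans, it can be restated as a pure function and reasoned about (or unit-tested) on its own. The ReadyCheck class and the isSendable signature below are hypothetical — a restatement of the predicate above, not anything in the Kafka code base:

/* Hypothetical restatement of ready()'s per-partition predicate: the first
 * batch of a partition may be drained when it is full, has lingered long
 * enough, or the producer is under pressure (pool exhausted, closing, or
 * flushing) -- unless it is still inside the retry backoff window. */
final class ReadyCheck {
    static boolean isSendable(boolean full,          // deque has >1 batch, or the first batch is full
                              long waitedTimeMs,     // time since the batch's last send attempt
                              long lingerMs,         // configured linger.ms
                              long retryBackoffMs,   // configured retry.backoff.ms
                              int attempts,          // prior send attempts for this batch
                              boolean exhausted,     // threads are blocked waiting on the BufferPool
                              boolean closed,        // the producer is closing
                              boolean flushInProgress) {
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress;
        return sendable && !backingOff;
    }
}

For example, a half-full batch that has waited 6 ms with linger.ms=5 is sendable even though it is not full, while a full batch on its second attempt stays unsendable until retry.backoff.ms has elapsed since the previous attempt.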
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    /* the result of the conversion */
    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) { /* iterate over the given set of ready Nodes */
        int size = 0;
        /* the partitions hosted on the current Node */
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        /* the RecordBatches to send */
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        /* drainIndex is an index into parts: it records where the previous drain
         * stopped, so the next drain resumes from that position; always starting
         * from index 0 could keep sending only the first few partitions' messages
         * and starve the rest */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex); /* the partition's details */
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {
                /* the RecordBatch queue for this partition */
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                /* sanity check */
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break; /* the request is full (maxSize is roughly one request's worth), end the loop */
                                } else {
                                    /* take one RecordBatch from the queue and add it to the ready collection */
                                    RecordBatch batch = deque.pollFirst();
                                    /* close the Compressor and its underlying stream, making the MemoryRecords read-only */
                                    batch.records.close();
                                    size += batch.records.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            /* advance drainIndex */
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
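The drainIndex bookkeeping is just a rotating start offset for a round-robin scan. Here is the same mechanism in isolation; the RoundRobinScanner class is hypothetical and exists only to illustrate the technique:

import java.util.List;
import java.util.function.Predicate;

/* Hypothetical sketch of drain()'s starvation avoidance: remember where the
 * previous scan stopped and resume there, so no element is permanently stuck
 * behind the others across successive calls. */
class RoundRobinScanner<T> {
    private int drainIndex = 0;

    /* Visit elements until the visitor declines one (like drain() hitting
     * maxSize), starting where the previous call left off. */
    void scan(List<T> items, Predicate<T> visitor) {
        if (items.isEmpty())
            return;
        int start = drainIndex = drainIndex % items.size();
        do {
            if (!visitor.test(items.get(drainIndex)))
                break; // budget exhausted; drainIndex stays here for the next call
            drainIndex = (drainIndex + 1) % items.size();
        } while (start != drainIndex);
    }
}

If a scan breaks off at partition i because the request filled up, the next scan starts again at partition i, so partitions that happen to sit late in the partitionsForNode() list still get drained first on a later call.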