生产篇
使用
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.examples;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
private int numRecords;
private final CountDownLatch latch;
public Producer(final String topic,
final Boolean isAsync,
final String transactionalId,
final boolean enableIdempotency,
final int numRecords,
final int transactionTimeoutMs,
final CountDownLatch latch) {
Properties props = new Properties();
//指定Kafka集群节点列表(全部 or 部分均可),用于KafkaProducer初始获取Server端元数据(如完整节点列表、Partition分布等等)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
if (transactionTimeoutMs > 0) {
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
}
if (transactionalId != null) {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
}
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
//指定服务端有多少个副本完成同步,才算该Producer发出的消息写入成功
props.put(ProducerConfig.ACKS_CONFIG, "-1");
//失败重试次数;
props.put(ProducerConfig.RETRIES_CONFIG, "3");
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
this.numRecords = numRecords;
this.latch = latch;
}
KafkaProducer<Integer, String> get() {
return producer;
}
@Override
public void run() {
// key用来决定目标Partition
int messageKey = 0;
int recordsSent = 0;
while (recordsSent < numRecords) {
//传递业务数据
String messageStr = "Message_" + messageKey;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr), new DemoCallBack(startTime, messageKey, messageStr));
} else { // Send synchronously
try {
// KafkaProducer中各类send方法均返回Future,并不会直接返回发送结果,其原因便是线程模型设计。
producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr)).get();
System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
messageKey += 2;
recordsSent += 1;
}
System.out.println("Producer sent " + numRecords + " records successfully");
latch.countDown();
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
* metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
* with -1 value for all fields except for topicPartition will be returned if an error occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
图解
1)ProducerInterceptors:消息过滤器,对消息进行发送拦截;
2)Serializer:对消息key和value进行序列化;
3)Partitioner:为消息选择合适的分区;
4)RecordAccumulator:消息收集器,可以认为是主线程和Sender线程之间的消息缓冲区,对消息进行分批量发送;
5)Sender:负责从消息收集器获取批量消息发送;
6)创建ClientRequest;
7)将ClientRequest交给NetworkClient,准备发送;
8)NetworkClient将RequestSend发送给KafkaChannel缓存区域;
9)向Kafka服务执行网络IO请求;
10)收到响应,将CientResponse交给ClientRequest的回调函数;
11)调用RecordBatch的回调函数,最终会指定每条消息的回调函数;
RecordBatch
batch部分
1)baseOffset:当前RecordBatch的起始位移,Record中的offset delta与该baseOffset相加才能得到真正的offset值。当RecordBatch还在producer端的时候,offset是producer分配的一个值(不partition的offset);
2)batchLength:RecordBatch的总长度,从`partition leader epoch`到末尾的长度;
3)partitionLeaderEpoch:用于标记目标 partition中leader replica的纪元信息,可以看做是分区leader的版本号或者更新次数;
4)magic:魔数值,V0就是0,V1就是1,V2就是2;
5)crc32校验码:参与校验的部分是从attributes到RecordBatch末尾的全部数据;partitionLeaderEpoch 不在 CRC 里面是因为每次 broker 收到 RecordBatch 的时候,都会赋值 partitionLeaderEpoch,如果包含在 CRC 里面会导致需要重新计算CRC;
6)attributes:从 V1 版本中的 8 位扩展到 16 位,0~2 位表示压缩类型,第 3 位表示时间戳类型,第 4 位表示是否是事务型记录。所谓“事务”是Kafka的新功能,开启事务之后,只有在事务提交之后,事务型 consumer 才可以看到记录。5表示是否是 Control Record,这类记录总是单条出现,被包含在一个 control record batch 里面,它可以用于标记“事务是否已经提交”、“事务是否已经中止” 等,它只会在 broker 内处理,不会被传输给 consumer 和 producer,即对客户端是透明的;
7)lastOffsetDelta:RecordBatch最后一个Record的相对位移,用于broker确认RecordBatch中Records的组装正确性。
8)firstTimestamp:RecordBatch第一条Record的时间戳。
9)maxTimestamp:RecordBatch中最大的时间戳,一般是最后一条消息的时间戳,用于broker确认RecordBatch中 Records 的组装正确性。
10)producerID:生产者编号,用于支持事务和幂等性;
11)producerEpoch:生产者纪元,用于支持事务和幂等性;
12)baseSequence:基础序号,用于支持事务和幂等性,校验是否是重复Record;
13)recordCount:Record 的数量;
14)Records:消息Record集合,每个Record结构如下;
Record部分:
1)所有标识长度的字段都是变长字段(varint或者varlong)
2)timestamp和offset是偏移量,也叫做delta值;
3)attributes字段不再标识任务信息;
4)headers:增加的Header相关扩展,每个Header结构如下;
Header部分:
1)headerKeyLength:消息头key的长度;
2、headerKey:消息头key的值;
3、headerValueLength:消息头值的长度;
4、headerValue:消息头的值;
RecordAccumulator
业务线程(或者叫做主线程)使用KafkaProducer.send()方法发送message的时候,会先将其写入RecordAccumulator,然后主线程就从send方法中返回了,此时message还未真正发送到Kafka,而是暂存在消息收集器中了,然后主线程继续使用send方法发送message不断向RecordAccumulator追加消息,当RecordAccumulator中缓存的message达到一定阈值(batch大小/linger.ms时间)的时候,会唤醒Sender线程发送RecordAccumulator,发送到kafka,以达到减少网络请求开销,提高吞吐目的;
RecordAccumulator至少有主线程和Sender线程访问,所以要保证线程安全性。
重要成员变量
1)batchs
在RecordAccumulator内部,维护了一个Map集合batchs,用于缓存发送到Kafka服务端的批次消息,因为需要保证线程安全,所以类型是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,并且初始化的时候设置的对象是Kafka自定义的一个对象CopyOnWriteMap,Deque的实例是ArrayDeque,这个是非线程安全的,所以在操作的时候是要加锁的
2)free
free是缓冲池对象(BufferPool),用于存储消息字节,使用的是ByteBuffer数据结构,简单的说BufferPool就是管理ByteBuffer的分配和释放。
3)incomplete
存放未发送或者发送未Ack的ProducerBatch,类型是IncompleteBatches,在该类的内部维护了一个Set集合,存放这些ProducerBatch;
MemoryRecordsBuilder
1.在MemoryRecordsBuilder内,将ByteBuffer封装到ByteBufferOutputStream,ByteBufferOutputStream实现了OutputStream,所以可以按照流的方式写入数据。
2.向当前RecordBatch追加消息Record方法appendWithOffset
- 首先是是进行一些消息格式的验证:
- 当前不处理ControlBatch,ControlBatch有自己的逻辑处理;
- 当前Record要追加的记录肯定是要在最近一次lastOffset之后才是合理的;
- 消息Record的timestamp需是大于0的合法数字;
- 然后调用数据写入方法写入数据流DataOutputStream,更新当前RecordBatch的相关元数据;
3.该类里面还有一个重要的方法就是hasRoomFor方法,该方法用于判断当前的MemoryRecords是否有足够的空间追加新的消息;
4.判断当前RecordBatch的空间是否满的方法:isFull
5.预估已写入的字节大小方法:estimatedBytesWritten
6.获取MemoryRecords的build()
MemoryRecordsBuilder负责创建MemoryRecords,方法build()的作用就是返回创建和写入的MemoryRecords,要返回MemoryRecords了,那么之后写入相关的操作就要禁止了
public void close() {
if (aborted)
throw new IllegalStateException("Cannot close MemoryRecordsBuilder as it has already been aborted");
if (builtRecords != null)
return;
// 简单的参数校验
validateProducerState();
// 流资源的释放和关闭
closeForRecordAppends();
// 初始化records位置
if (numRecords == 0L) {
buffer().position(initialPosition);
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
else if (compressionType != CompressionType.NONE)
this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
//复制一份ByteBuffer出来,然后切换到读模式(flip),通过slice()方法得到新一个独立的ByteBuffer,设置给builtRecords,builtRecords对象里面持有对象ByteBuffer
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
buffer.position(initialPosition);
builtRecords = MemoryRecords.readableRecords(buffer.slice());
}
}
ProducerBatch
ProducerBatch是Sender线程发送的对象,对应就是上面提到的RecordBatch,ProducerBatch包含多条Record,生产者每个批次发送的消息大小通过batch.size决定,默认16KB;
在RecordAccumulator中的batchs队列中的每个元素就是ProducerBatch,第一次发送消息的时候会消息所在分区的ProducerBatch队列,并创建ProducerBatch将该条消息追加在ProducerBatch,然后有新的消息发送时,就会追加消息到对应TopicPartition的ProducerBatch队列里面最后一个ProducerBatch中,如果ProducerBatch空间满了,会再创建一个新的ProducerBatch来存放消息;
1.追加消息到ProducerBatch
2.拆分ProducerBatch(split方法)
当ProducerBatch过大时,可以通过slice方法将一个大的batch拆分为更小的batch。这个方式是在sender线程中发送失败返回”MESSAGE_TOO_LARGE"时,需要将batch拆分并重新加入消息收集器的batch队列
public Deque<ProducerBatch> split(int splitBatchSize) {
// 分割结果
Deque<ProducerBatch> batches = new ArrayDeque<>();
// 获取当前batch的MemoryRecords对象,也就是获取当前batch的ByteBuffer中存储的消息;
MemoryRecords memoryRecords = recordsBuilder.build();
Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();
if (!recordBatchIter.hasNext())
throw new IllegalStateException("Cannot split an empty producer batch.");
RecordBatch recordBatch = recordBatchIter.next();
if (recordBatch.magic() < MAGIC_VALUE_V2 && !recordBatch.isCompressed())
throw new IllegalArgumentException("Batch splitting cannot be used with non-compressed messages " +
"with version v0 and v1");
if (recordBatchIter.hasNext())
throw new IllegalArgumentException("A producer batch should only have one record batch.");
Iterator<Thunk> thunkIter = thunks.iterator();
// We always allocate batch size because we are already splitting a big batch.
// And we also Retain the create time of the original batch.
ProducerBatch batch = null;
// 遍历batch的record
for (Record record : recordBatch) {
assert thunkIter.hasNext();
Thunk thunk = thunkIter.next();
if (batch == null)
//首次循环会调用createBatchOffAccumulatorForRecord()方法来分配一个ByteBuffer内存空间,空间大小根据Record和splitBatchSize(batch.size)最大值来决定,然后创建MemoryRecordsBuilder和ProducerBatch对象,这里同正常创建MemoryRecordsBuilder和ProducerBatch一样,不同的是这里的Record记录的大小可能超过batch.size;
batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
//调用方法tryAppendForSplit来追加当前记录到新创建的更大的ProducerBatch,tryAppendForSplit追加Record的方式同上面的tryAppend基本差不多,不同的是这里的Thunk对象使用已经存在的即可,这里Thunk里面使用的是元数据FutureRecordMetadata 链,也就是加在原来batch的future后面;在batch拆分的情况下,此时的ProducerBatch应该只有一条Record的,如果在下次循环或者某个循环内,tryAppendForSplit失败的情况下,也是空间不足了,就再创建新的ProducerBatch,以此类推,直到把所有的Records记录循环完毕;
// A newly created batch can always host the first message.
if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
batches.add(batch);
batch.closeForRecordAppends();
batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
}
}
// 拆分后的batch存放在队列batches里面,并结束拆分前的batch的发送结果produceFuture,调用了ProduceRequestResult.done()方法之后,就会唤醒等待在ProduceRequestResult处理结果上的await的线程;
// Close the last batch and add it to the batch list after split.
if (batch != null) {
batches.add(batch);
batch.closeForRecordAppends();
}
produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, index -> new RecordBatchTooLargeException());
//会唤醒等待在ProduceRequestResult处理结果上的await的线程;
produceFuture.done();
// 当设置baseSequence基础序号的时候,需要设置ProducerState用于支持事务和幂等性,校验是否是重复Record;
if (hasSequence()) {
int sequence = baseSequence();
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch());
for (ProducerBatch newBatch : batches) {
newBatch.setProducerState(producerIdAndEpoch, sequence, isTransactional());
sequence += newBatch.recordCount;
}
}
return batches;
}
/**
* This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
* @return true if the record has been successfully appended, false otherwise.
*/
private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return false;
} else {
// No need to get the CRC.
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
key == null ? -1 : key.remaining(),
value == null ? -1 : value.remaining(),
Time.SYSTEM);
// Chain the future to the original thunk.
thunk.future.chain(future);
this.thunks.add(thunk);
this.recordCount++;
return true;
}
}
3.done() 方法
在sender线程将消息发送到kafka后,ProducerBatch需要进行一些善后工作,比如释放资源(例如ByteBuffer)、回调用户自定义的callback等等,这些就是done方法做的事情了。
1)设置发送结果状态ProducerBatch.FinalState,一共有3种结果:ABORTED, FAILED, SUCCEEDED,如果服务端返回了异常,那么设置为FAILED,否则就是SUCCEEDED;
在这里要注意的是:如果FinalState之前已经设置过且是SUCCEEDED,说明是一个已经处理的结果,会抛出非法状态异常
2)将服务端返回的offset、logAppendTime(这个在log类型是createTime时值为-1)设置给future元数据,以便执行future元数据的done方法;
3)回调用户自定义的callback
4)future的done上面也说过就是ProduceRequestResult.done()方法,会唤醒等待在ProduceRequestResult处理结果上的await的线程;
调用关系
KafkaProducer().send(ProducerRecord<K, V> record, Callback callback)
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
KafkaProducer().send包含:KafkaProducer().doSend(ProducerRecord<K, V> record, Callback callback)
/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
}
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
KafkaProducer().doSend包含:RecordAccumulater().append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs)
消息append是Accumulator的核心,将一条消息Record存放到指定TopicPartition的batchs队列,并返回append结果RecordAppendResult。该方法的源码如下:
/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* running the partitioner's onNewBatch method before trying to append again
* @param nowMs The current time, in milliseconds
*/
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
// 统计当前Accumulator中正在追加消息的执行次数,在方法的执行最后的finally块再进行减1操作;该值越大,说明并发越高、或者block在缓冲区的线程越多,比如可能由于内存没有空间分配,缓冲区的消息没能及时发送到kafka等多种原因;
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
// 调用方法getOrCreateDeque获取或者创建topicPartition对应的ProducerBatch队列;
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 锁住dq,保证只有一个线程执行tryAppend方法来追加当前消息到数据缓冲区,如果tryAppend成功返回,就直接返回append的结果的封装对象RecordAppendResult;
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 如果tryAppend失败,比如在当前ProducerBatch空间不足或者队列中还没有可用的ProducerBatch时,如果标示abortOnNewBatch=true,标示放弃创建新的ProducerBatch,直接返回一个“空”的RecordAppendResult;
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// 调用estimateSizeInBytesUpperBound方法是对追加的Record大小进行一个预估,根据上面Record格式的不同,采取的预估值不同,最终取的还是batch.size和预估值的最大值进行内存分配;
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
// 如果tryAppend成功返回,就直接返回append的结果的封装对象RecordAppendResult,如果tryAppend失败,那么就新创建ProducerBatch,这里就用到了上面的MemoryRecordsBuilder机制来实现RecordBatch缓冲区数据的追加及消息相关元数据的管理;
将新的ProducerBatch添加的batches队列,这样后续的消息就可以使用该ProducerBatch来追加消息了;
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
// 没在finally执行了buffer = null,是因为在finally块如果在buffer不为空的时候会进行释放ByteBuffer内存,正常情况下因为消息成功的加入了缓冲区,不能进行释放,但是如果在执行的过程中发生了异常,没能成功加入缓冲区的情况下,要进行已分配ByteBuffer内存的释放;
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
RecordAccumulater().append包含:RecordAccumulater().tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque, long nowMs)
1)从指定TopicPartition对应的Deque取最后一个ProducerBatch,因为每次append消息都是向队列最后一个缓冲区添加;如果队列中没有ProducerBatch,就直接返回null;然后在上面的append方法中会新创建ProducerBatch添加到队列;
2)调用ProducerBatch.tryAppend方法试着将当前消息追加到最后一个缓冲数据流(ByteBuffer)中;如果返回了null,说明当前缓冲区可能空间不足,那么就关闭当前ProducerBatch的数据写入,等待sender线程的发送;如果追加成功,就返回追加结果封装对象RecordAppendResult;
/**
* Try to append to a ProducerBatch.
*
* If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
* resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
}
return null;
}
RecordAccumulater().tryAppend包含:ProducerBatch().tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now)
1)判断当前RecordBatch是否有足够的空间,没有泽直接返回null;
2)在空间足够的情况下,调用MemoryRecordsBuilder的append方法,最终调用就是appendWithOffset方法来追加当前记录到RecoredBatch中;
3)FutureRecordMetadata是作为保存消息发送的Future元数据,在sender线程发送ProducerBatch时记录请求结果相关元数据的;
4)每个Thunk保存着一个Record消息发送的callback和FutureRecordMetadata,所以每次追加消息记录的时候就会创建一个新的Thunk封装用户自定义的callback和FutureRecordMetadata,在sender线程处理结果中进行回调用户的callback和请求的结果处理
/**
* Append the record to the current record set and return the relative offset within that record set
*
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
ProducerBatch().tryAppend调用:MemoryRecordsBuilder().hasRoomFor(long timestamp, byte[] key, byte[] value, Header[] headers)
1)isFull()方法判断当前RecordBatch是否满了;如果满了则不会再追加记录;
2)numRecords是已有的Records数量;
3)然后就是针对V2和非V2格式的消息大小的计算处理;
4)在已有预估大小加上当前记录大小之后小于等于可用剩余空间才会返回true,表示空间足够;
/**
* Check if we have room for a new record containing the given key/value pair. If no records have been
* appended, then this returns true.
*
* Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
* accurate if compression is used. When this happens, the following append may cause dynamic buffer
* re-allocation in the underlying byte buffer stream.
*/
public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
if (isFull())
return false;
// We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)
if (numRecords == 0)
return true;
final int recordSize;
if (magic < RecordBatch.MAGIC_VALUE_V2) {
recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
} else {
int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
}
// Be conservative and not take compression of the new record into consideration.
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}
MemoryRecordsBuilder().hasRoomFor调用:MemoryRecordsBuilder().isFull()
1)appendStream上面讲过是ByteBuffer的写入流DataOutputStream;
2)numRecords:当前已写入的Record记录数;
3)writeLimit:RecordBatch的可用剩余空间带下,初始值就是batch.size;
4)estimatedBytesWritten()是预估当前已写入的RecordBatch的总大小:日志头大小与Records大小之和;
public boolean isFull() {
// note that the write limit is respected only after the first record is added which ensures we can always
// create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
}
MemoryRecordsBuilder().isFull()调用:MemoryRecordsBuilder().estimatedBytesWritten()
1)batchHeaderSizeInBytes在V2消息格式下就是RecordBatch结构下非Record(日志头部分)的字节大小,总长度是61(B);
2)uncompressedRecordsSizeInBytes:在每次追加Record之后都会累加的值,表示已写入Record的累加大小;
3)estimatedCompressionRatio:预估的压缩率,目前4种压缩算法的值默认都是1.0;
4)COMPRESSION_RATE_ESTIMATION_FACTOR:预估空间大小时用的一个值,默认1.05,应该是用于将预估值放大已确保追加记录时判断空间是否足够的准确性;
/**
* Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}.
* @return The estimated number of bytes written
*/
private int estimatedBytesWritten() {
if (compressionType == CompressionType.NONE) {
return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
} else {
// estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
}
}
ProducerBatch().tryAppend包含:MemoryRecordsBuilder().append(long timestamp, byte[] key, byte[] value, Header[] headers)
MemoryRecordsBuilder().append包含:...
MemoryRecordsBuilder().append包含:MemoryRecordsBuilder().appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers)
/**
* Append a new record at the given offset.
*/
private void appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
ByteBuffer value, Header[] headers) {
try {
if (isControlRecord != isControlBatch)
throw new IllegalArgumentException("Control records can only be appended to control batches");
if (lastOffset != null && offset <= lastOffset)
throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
"(Offsets must increase monotonically).", offset, lastOffset));
if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
if (baseTimestamp == null)
baseTimestamp = timestamp;
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
} else {
appendLegacyRecord(offset, timestamp, key, value, magic);
}
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
MemoryRecordsBuilder().appendWithOffset包含(V2版本):MemoryRecordsBuilder().appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers)
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
高并发和线程安全保证
1 读写分离的设计CopyOnWriteMap
RecordAccumulator类中batches的实例类型是Kafka自定义的类CopyOnWriteMap,实现了接口ConcurrentMap,用读写分离来实现线程安全。
- 线程要修改map内容,就复制一份map,在修改之后,把新的指针赋给map,且put方法是加了 synchronized修饰的,因此同一时间只能有一个线程修改内容。
- 在修改的时候别的线程依然可以读取老的Map。
源码如下:
这个非常适合读多写少的场景。这里扩展两个细节:
1、为什么读多写少呢?
因为每次append消息都会走一次getOrCreateDeque方法来读batches,即使有新的TopicPartition来了,在修改batches时候也能访问,并且消息的数量远远多于TopicPartition数量,因此这个结构很划算。
2、在getOrCreateDeque方法中,将新的TopicPartition放入的时候,为什么使用的是putIfAbsent?
因为this.batches.get(tp)方法调用时,如果多个线程同时拿到了null,那么都会创建ArrayDeque并且放入batches中,用putIfAbsent的话,只要有一个线程创建,就返回已有的Deque;
2 消息追加方法RecordAccumulator.append的线程安全和高并发效率的保证
在append方中,使用了synchronized同步锁来锁住Deque,且使用了两次,这样设计的有何巧妙之处吗?
1)第一个地方:调用getOrCreateDeque(tp)获取或者创建Deque后,对dq进行加锁。
2)第二个地方:在分配了ByteBuffer内存空间之后,再次对dp加锁
分析:这么做既保证了只有一个线程调用tryAppend方法追加消息到缓冲队列,同时由防止了锁住整个batches的性能影响,且又能保证写batches的安全性,这里只锁住了dp,同时在上面的CopyOnWriteMap中提到了该方法不会锁住读,所以也不会影响到其他线程读batches,所以也保证了高并发的效率;还有一点需要提到,这里锁住dp,然后追加消息记录到Deque的最后一个批次缓冲中,除非批次没有足够的空间会再创建ProducerBatch,这样就能高效的利用每一个批次的缓冲区,防止了内存的支离破碎现象;
如何触发sender线程发送
具备发送条件的分区节点
缓冲区的消息在发送给Kafka之前,sender线程会先调用RecordAccumulator.ready方法来获取缓冲区中消息对应的分区中已经准备好的分区Leader节点,以便将消息发送的分区对应的Leader节点上。
那些情况属于准备好了呢?
- RecordBatch 满了
- 消息在RecordBatch中停留的时间超过了linger.ms;
- 消息缓冲区内存不足存在线程等待分配空间;
- RecordBatch写入流关闭了
- 手动执行了KafkaProducer的flush方法:这会触发所有分区的ready来发送消息;
- 当满了上面说的5个条件之一时,设置sendable=true
- 在sendable=true且当前batch不属于重试时,就认为该分区Leader节点已经准备就绪了:
- 否则可能就是sendable条件不满足或者是发送重试的batch,计算下次准备检查的时间nextReadyCheckDelayMs
- 封装ready计算结果对象ReadyCheckResult
ProducerBatch过期处理
如果Sender线程在发送缓冲区的消息时,发现缓冲区的消息停留的时间太长,这些消息的发送也许是没有意义的,也可能是因为Kafka集群问题导致发送超时等等问题引起。消息缓冲区ProducerBatch过期后,对应的appendStream就会关闭不再允许写入数据了,且该ProducerBatch的状态就是Abort了。
这里逻辑比较简单,主要就是过期的条件:linger.ms+request.timeout.ms之和,或者用户自定义的更小的投递超时时间;
ProducerBatch重入队列
ProducerBatch重入队列有两种情况:
1 在Sender线程发送ProducerBatch到kafka后,发生了异常,在可以重试的时候,就将ProducerBatch重新加入队列,等待下次重试的时候再从队列drain;
batch.reenqueued方法更新重入队列的相关时间:
2 Kafka返回“MESSAGE_TOO_LARGE”时,进行batch的拆分和冲入队列。
1)在将bigBatch拆分只是将batches中的Record拆分成多个ProducerBatch,但不会拆具体的Record;
2)bigBatch拆分后,bigBatch自身也就不再支持写数据了;
3)需要注意一点:拆分的batches的内存不由缓冲池来管理,所以这部分内存的释放是在缓冲池外释放的;
总结:Kafka 源码解析 - 消息收集器机制 - 知乎 (zhihu.com)
server篇
Kafka源码分析(三) - Server端 - 消息存储 - 知乎 (zhihu.com)
消费篇
kafka——消费者原理解析 - 简书 (jianshu.com)
Spring kafka源码分析——消息是如何消费的_kafka messaging.handler.annotation.support.message-CSDN博客