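1 Overview of PooledByteBufAllocator
We now analyze how pooled memory allocation works. Start with PooledByteBufAllocator, a subclass of AbstractByteBufAllocator, and the two methods it implements to allocate memory: newDirectBuffer and newHeapBuffer.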
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;
        ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            if (PlatformDependent.hasUnsafe()) {
                buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
        }
        return toLeakAwareBuffer(buf);
    }

    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<byte[]> heapArena = cache.heapArena;
        ByteBuf buf;
        if (heapArena != null) {
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }
        return toLeakAwareBuffer(buf);
    }
}
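The two methods share the same overall structure, so we take newDirectBuffer as the example and walk through it briefly.
First, threadCache.get() returns a cache object of type PoolThreadCache. Next, the directArena object is read from the cache. Finally, directArena.allocate() is called to allocate the ByteBuf. The threadCache field is actually of type PoolThreadLocalCache, whose code is as follows: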
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }

    @Override
    protected void onRemoval(PoolThreadCache threadCache) {
        threadCache.free();
    }

    private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        }
        PoolArena<T> minArena = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            PoolArena<T> arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;
            }
        }
        return minArena;
    }
}
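To see the effect of the least-used selection in isolation, here is a minimal sketch that uses only the JDK. The names LeastUsedArenaSketch, Arena, bindLeastUsed and bindThreads are hypothetical stand-ins, a plain ThreadLocal replaces FastThreadLocal, and the counter is incremented inside the selection method purely for simplicity. It is an illustration of the idea, not Netty's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class LeastUsedArenaSketch {
    /** Hypothetical stand-in for PoolArena: it only counts the thread caches bound to it. */
    static final class Arena {
        final int id;
        final AtomicInteger numThreadCaches = new AtomicInteger();

        Arena(int id) {
            this.id = id;
        }
    }

    private final Arena[] arenas;

    // Plays the role of PoolThreadLocalCache: each thread is bound to one arena on first use.
    private final ThreadLocal<Arena> threadArena = new ThreadLocal<Arena>() {
        @Override
        protected Arena initialValue() {
            return bindLeastUsed();
        }
    };

    LeastUsedArenaSketch(int nArenas) {
        if (nArenas <= 0) {
            throw new IllegalArgumentException("nArenas: " + nArenas + " (expected: > 0)");
        }
        arenas = new Arena[nArenas];
        for (int i = 0; i < nArenas; i++) {
            arenas[i] = new Arena(i);
        }
    }

    // synchronized so that choosing the arena and bumping its counter happen atomically.
    private synchronized Arena bindLeastUsed() {
        Arena min = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            if (arenas[i].numThreadCaches.get() < min.numThreadCaches.get()) {
                min = arenas[i];
            }
        }
        min.numThreadCaches.incrementAndGet(); // assumption: counted here to keep the sketch short
        return min;
    }

    Arena arenaForCurrentThread() {
        return threadArena.get();
    }

    /** Starts threads one after another and records the arena id each one is bound to. */
    static List<Integer> bindThreads(int nArenas, int nThreads) {
        final LeastUsedArenaSketch allocator = new LeastUsedArenaSketch(nArenas);
        final List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(() -> {
                synchronized (ids) {
                    ids.add(allocator.arenaForCurrentThread().id);
                }
            });
            t.start();
            try {
                t.join(); // run sequentially so the result is deterministic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ids;
    }
}
```

Calling LeastUsedArenaSketch.bindThreads(4, 6) binds the first four threads to arenas 0 through 3 and then wraps around to arenas 0 and 1. Unlike the real onRemoval/free path, this sketch never decrements the counter when a thread exits.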
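As the name suggests, the initialValue() method of PoolThreadLocalCache creates the initial PoolThreadCache for the current thread. It first calls leastUsedArena() to obtain the heapArena and directArena objects, both of type PoolArena, and then passes them as arguments to the PoolThreadCache constructor. So where are the arenas themselves initialized? A search shows that the PooledByteBufAllocator constructor calls newArenaArray() to create the heapArenas and directArenas arrays and then fills them in. The code is as follows: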
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
    super(preferDirect);
    threadCache = new PoolThreadLocalCache();
    this.tinyCacheSize = tinyCacheSize;
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;
    final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

    if (nHeapArena < 0) {
        throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
    }
    if (nDirectArena < 0) {
        throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
    }

    int pageShifts = validateAndCalculatePageShifts(pageSize);

    if (nHeapArena > 0) {
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }

    if (nDirectArena > 0) {
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, maxOrder, pageShifts, chunkSize);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }
}
The newArenaArray method is implemented as follows:
private static <T> PoolArena<T>[] newArenaArray(int size) {
    return new PoolArena[size];
}
It simply creates a fixed-size PoolArena array whose length is given by the nHeapArena or nDirectArena argument. Going back to the PooledByteBufAllocator constructors to see where nHeapArena and nDirectArena come from, we find this overloaded constructor:
public PooledByteBufAllocator(boolean preferDirect) {
    this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
So nHeapArena and nDirectArena default to the two constants DEFAULT_NUM_HEAP_ARENA and DEFAULT_NUM_DIRECT_ARENA, which are defined as follows:
final int defaultMinNumArena = runtime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numHeapArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numDirectArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
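So, unless the system properties override it, the default for nHeapArena and nDirectArena is the smaller of defaultMinNumArena (CPU cores * 2) and a memory-based limit (maximum memory / chunk size / 2 / 3). With enough memory available, this is simply CPU cores * 2. Why does Netty design it this way? By default Netty also creates CPU cores * 2 EventLoop threads, so the main goal is to let every Netty task thread have an Arena of its own, which means threads rarely contend for the same lock when allocating memory.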
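The default computation above can be worked through with concrete numbers. The sketch below is a simplified stand-in: the class and method names are hypothetical, the system property override is left out, and PAGE_SIZE = 8192 and MAX_ORDER = 11 are assumed values chosen to match the 8 KB page and 16 MB chunk described in section 3.

```java
final class ArenaCountSketch {
    static final int PAGE_SIZE = 8192; // assumed default: 8 KB page
    static final int MAX_ORDER = 11;   // assumed default: 8 KB << 11 = 16 MB chunk

    /**
     * Mirrors the min/max expression from the constants above, without the
     * system property lookup. maxMemoryBytes stands for runtime.maxMemory()
     * or PlatformDependent.maxDirectMemory().
     */
    static int defaultNumArenas(int cores, long maxMemoryBytes) {
        final int defaultMinNumArena = cores * 2;
        final int defaultChunkSize = PAGE_SIZE << MAX_ORDER;
        return Math.max(0,
                (int) Math.min(defaultMinNumArena, maxMemoryBytes / defaultChunkSize / 2 / 3));
    }

    public static void main(String[] args) {
        Runtime runtime = Runtime.getRuntime();
        System.out.println("arenas on this JVM (sketch): "
                + defaultNumArenas(runtime.availableProcessors(), runtime.maxMemory()));
    }
}
```

With 4 cores and 4 GB of memory the memory-based limit is 4096 MB / 16 MB / 2 / 3 = 42, so the result is the core-based value 8. With only 256 MB the limit drops to 2, and the memory bound wins.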
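From the analysis above we know that there are two kinds of arena, heapArena and directArena; both are simply called Arena here. Suppose there are four threads; four Arenas are then allocated. When a ByteBuf is created, the PoolThreadCache first obtains an Arena object and stores it in a member field; each thread then reaches its underlying Arena through the PoolThreadCache returned by get(). In other words, EventLoop1 obtains Arena1, EventLoop2 obtains Arena2, and so on, as shown in the figure below:
Besides allocating from the Arena, PoolThreadCache can also allocate from the ByteBuf cache lists it maintains internally. For example, suppose a 1024-byte ByteBuf is created through PooledByteBufAllocator and later released; another 1024-byte ByteBuf may well be requested somewhere else. In that case there is no need to allocate on the Arena again: the buffer is taken directly from the cache list maintained by PoolThreadCache and returned. PooledByteBufAllocator maintains cache lists for three size classes, configured by the values tinyCacheSize, smallCacheSize, and normalCacheSize. The relevant code is as follows: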
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
    private static final int DEFAULT_TINY_CACHE_SIZE;
    private static final int DEFAULT_SMALL_CACHE_SIZE;
    private static final int DEFAULT_NORMAL_CACHE_SIZE;

    static {
        // Cache sizes can be overridden through system properties.
        DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
        DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
        DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
    }

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
        this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
                DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
    }

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
        super(preferDirect);
        threadCache = new PoolThreadLocalCache();
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        // The rest of this constructor (argument validation and creation of heapArenas
        // and directArenas) is identical to the constructor listed earlier in this section.
    }
}
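As shown, the PooledByteBufAllocator constructor assigns the defaults tinyCacheSize = 512, smallCacheSize = 256, and normalCacheSize = 64. In this way Netty sets up caches for fixed size classes in advance, which greatly improves memory allocation performance.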
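The release-then-reuse scenario with the 1024-byte buffer can be illustrated with a small sketch. Everything here is hypothetical: ThreadCacheSketch, maxEntriesPerSize and arenaAllocations are invented names, byte[] stands in for pooled memory, and a single instance is assumed to be confined to one thread. The per-size limit only loosely mirrors the role of the cache size values; it is not how PoolThreadCache is written.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

final class ThreadCacheSketch {
    private final int maxEntriesPerSize;
    // One bounded queue of released buffers per exact size.
    private final Map<Integer, ArrayDeque<byte[]>> cache = new HashMap<>();
    int arenaAllocations; // how often the cache missed and "the arena" had to allocate

    ThreadCacheSketch(int maxEntriesPerSize) {
        this.maxEntriesPerSize = maxEntriesPerSize;
    }

    /** Serves the request from the cache when possible, otherwise allocates fresh memory. */
    byte[] allocate(int size) {
        ArrayDeque<byte[]> queue = cache.get(size);
        byte[] cached = (queue == null) ? null : queue.poll();
        if (cached != null) {
            return cached;
        }
        arenaAllocations++;
        return new byte[size]; // stands in for an allocation on the Arena
    }

    /** Puts a buffer back into the cache; returns false if the queue for that size is full. */
    boolean release(byte[] buf) {
        ArrayDeque<byte[]> queue = cache.computeIfAbsent(buf.length, k -> new ArrayDeque<>());
        if (queue.size() >= maxEntriesPerSize) {
            return false; // a real pool would hand the memory back to the arena here
        }
        queue.offer(buf);
        return true;
    }
}
```

Allocating 1024 bytes, releasing the buffer, and allocating 1024 bytes again returns the same array, and arenaAllocations stays at 1.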
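2 DirectArena Memory Allocation Flow
Allocating memory through an Arena involves three basic steps.
1. First, obtain a PooledByteBuf from the object pool for reuse.
2. Then, try to allocate memory from the cache.
3. Finally, fall back to allocating from the memory heap.
Taking directBuffer as the example, first look at how a PooledByteBuf is obtained from the object pool for reuse. Again start from the newDirectBuffer method of PooledByteBufAllocator: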
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}
Next, step into the allocate method of PoolArena:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    allocate(cache, buf, reqCapacity);
    return buf;
}
At this point the approach is clear. newByteBuf is called first to obtain a PooledByteBuf object; the allocate method then allocates a block of memory, trying the thread-private PoolThreadCache first, and initializes values in buf such as the memory address. Step into newByteBuf, choosing the DirectArena implementation:
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    if (HAS_UNSAFE) {
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}
It first checks whether Unsafe is supported, which is usually the case by default. Continue with the newInstance method of PooledUnsafeDirectByteBuf:
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }
}
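A buf is first obtained through the get method of the RECYCLER object (an object recycle bin). As the snippet above shows, RECYCLER implements a newObject method, which creates a new buf when the recycle bin has none available. Because the buf may have been taken out of the recycle bin, it has to be reset before reuse, so the code goes on to call the reuse method of buf: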
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}
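The get-then-reset pattern shown above can be reduced to a small object pool. The sketch below is a deliberately simplified, single-threaded stand-in: RecyclerSketch, Buf, recycle and created are hypothetical names, and it does not model the handles or per-thread storage that a real Recycler would need.

```java
import java.util.ArrayDeque;

final class RecyclerSketch {
    /** Hypothetical pooled object carrying the kind of state that reuse() has to reset. */
    static final class Buf {
        int maxCapacity;
        int refCnt;
        int readerIndex;
        int writerIndex;

        void reuse(int maxCapacity) {
            this.maxCapacity = maxCapacity;
            this.refCnt = 1;      // a freshly handed-out buffer has exactly one owner
            this.readerIndex = 0; // indices from the previous use must not leak through
            this.writerIndex = 0;
        }
    }

    private final ArrayDeque<Buf> pool = new ArrayDeque<>();
    int created; // number of objects built because the pool was empty

    /** Takes an object from the pool, or creates one when the pool is empty, then resets it. */
    Buf get(int maxCapacity) {
        Buf buf = pool.poll();
        if (buf == null) {
            buf = new Buf(); // corresponds to the newObject callback
            created++;
        }
        buf.reuse(maxCapacity);
        return buf;
    }

    /** Returns an object to the pool so that a later get() can hand it out again. */
    void recycle(Buf buf) {
        buf.refCnt = 0;
        pool.push(buf);
    }
}
```

A buffer that was written to, recycled, and fetched again comes back as the same instance with its indices at zero and its reference count at 1, which is why the reset step has to run on every get.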
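The reuse method simply resets all fields to their initial state. Next, return to the allocate method of PoolArena to see how the actual memory is allocated. There are two main cases for buf: allocating from the cache and allocating from the memory heap. The logic of the allocate method is as follows: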
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
         * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
         * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
         */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}
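This code looks complicated, but it mainly determines the size class of the request and tries the corresponding cache first. On a cache miss, tiny and small requests fall back to the subpage pools and then to allocateNormal(), while normal requests go to allocateNormal() directly. A request larger than chunkSize matches none of these cases and is never cached; it goes straight to allocateHuge() for a real memory allocation.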
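The size-based branching in allocate can be summarized in a short sketch. The thresholds (512 bytes, the page size, the chunk size) come from the listing and its comments. The rounding rules in normalizeCapacity (multiples of 16 below 512, powers of two from 512 upward) are an assumption about the pre-jemalloc4 4.1 line and should be checked against the Netty version in use. SizeClassSketch and classify are hypothetical names.

```java
final class SizeClassSketch {
    static final int PAGE_SIZE = 8192;             // 8 KB
    static final int CHUNK_SIZE = PAGE_SIZE << 11; // 16 MB

    enum SizeClass { TINY, SMALL, NORMAL, HUGE }

    /** Rounds a request up to the capacity that would actually be reserved (assumed rules). */
    static int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("reqCapacity: " + reqCapacity + " (expected: >= 0)");
        }
        if (reqCapacity >= CHUNK_SIZE) {
            return reqCapacity; // huge requests are not rounded
        }
        if (reqCapacity >= 512) {
            // Round up to the next power of two.
            int highest = Integer.highestOneBit(reqCapacity);
            return highest == reqCapacity ? reqCapacity : highest << 1;
        }
        // Below 512: round up to the next multiple of 16.
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }

    /** Follows the same order of checks as the allocate method above. */
    static SizeClass classify(int reqCapacity) {
        int normCapacity = normalizeCapacity(reqCapacity);
        if (normCapacity < 512) {
            return SizeClass.TINY;   // thread cache, then tinySubpagePools
        }
        if (normCapacity < PAGE_SIZE) {
            return SizeClass.SMALL;  // thread cache, then smallSubpagePools
        }
        if (normCapacity <= CHUNK_SIZE) {
            return SizeClass.NORMAL; // thread cache, then allocateNormal
        }
        return SizeClass.HUGE;       // never cached, allocateHuge
    }
}
```

Under these assumptions a 100-byte request is normalized to 112 bytes and is tiny, a 1000-byte request becomes 1024 bytes and is small, and a 500-byte request is rounded up to 512 bytes and therefore already counts as small.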
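3 Memory Size Classes in the Memory Pool
The Netty memory pool defines four size classes: tiny covers 0 to 512 bytes, small covers 512 bytes to 8 KB, normal covers 8 KB to 16 MB, and huge is anything above 16 MB. Why did Netty choose these values as the boundaries? Underneath, Netty wraps memory in its own units and subdivides each range further, in order to manage memory more efficiently and avoid waste. All memory is requested from the system in units of a Chunk; each Chunk is 16 MB, and every subsequent allocation operates inside a Chunk. A Chunk is split into Pages of 8 KB each, so one Chunk holds 2048 Pages. Anything smaller than 8 KB is a SubPage. For example, if a request needs only 1 KB but a whole Page is allocated for it, the other 7 KB is clearly wasted, so a Page is subdivided further to save space. The size classes are shown in the figure below: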
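As a quick arithmetic check of the numbers in this section, the following sketch recomputes them from the two sizes given in the text. The class and method names are hypothetical, and the even split of a Page into equally sized elements is a simplifying assumption about how a SubPage is used.

```java
final class PageMathSketch {
    static final int PAGE_SIZE = 8 * 1024;          // 8 KB per Page
    static final int CHUNK_SIZE = 16 * 1024 * 1024; // 16 MB per Chunk

    /** Number of Pages that fit into one Chunk. */
    static int pagesPerChunk() {
        return CHUNK_SIZE / PAGE_SIZE;
    }

    /** Number of equally sized elements one Page can hold when it is split up. */
    static int elementsPerPage(int elemSize) {
        return PAGE_SIZE / elemSize;
    }

    /** Bytes left unused if a whole Page is handed out for a smaller request. */
    static int wastedIfWholePage(int reqBytes) {
        return PAGE_SIZE - reqBytes;
    }

    public static void main(String[] args) {
        System.out.println("pages per chunk: " + pagesPerChunk());
        System.out.println("1 KB elements per page: " + elementsPerPage(1024));
        System.out.println("waste for a 1 KB request on a full page: " + wastedIfWholePage(1024));
    }
}
```

This gives 2048 Pages per Chunk, eight 1 KB elements per Page, and 7168 bytes (7 KB) of waste when a full Page serves a single 1 KB request, matching the figures in the text.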
This concludes the introduction to the cache management mechanism of Netty's memory pool.