一.项目介绍
本项目——高并发内存池,是通过学习并模仿简化 google 的一个开源项目 tcmalloc ,全称 Thread-Caching Malloc,即线程缓存的malloc,模拟实现了一个自己的高并发内存池,用于高效的多线程内存管理,替代系统的内存分配相关的函数(malloc、free)。
二.了解内存池
1.池化技术
所谓“池化技术”,就是在计算机中模仿蓄水池,通过蓄水池,我们不再需要一次一次的打水,而是一次性积攒大量的水来供我们使用。
无论是现实中一次一次的打水,还是计算机中一次又一次的申请资源,都是需要很大的开销的,所以我们提前申请好一大批资源,在使用时调用,使用后返还,这样就能大大提高程序运行效率。
2.内存池
内存池即程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
本项目所做的高并发内存池出了解决效率问题之外,还解决内存碎片的问题。
何为内存碎片?
在这块地址空间中,先是由四个部分分别占用并占满整个空间,随后 vector 和 list 释放了空间,此时该地址空间剩余了384Byte的空间,但是此时如果申请了大于256Byte的空间,能成功吗?
答案是不能,因为申请空间必须是连续的,很明显上述的两块空间并不连续,这时候,这两块空间就体现出了碎片化的问题。这种碎片化称为外碎片。
3.malloc
在 C/C++ 中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
三.整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。
所以高并发内存池除了解决性能问题和内存碎片问题之外,还解决多线程下锁竞争问题。
高并发内存池主要由以下三部分构成:
-
thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这就是这个并发线程池高效的地方。
-
central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache会回收thread cache中的对象,避免一个线程占用了太多的内存而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache没有内存对象时才会找central cache,所以这里竞争不会很激烈。
-
page cache:页缓存是在central cache缓存更下一层的缓存,存储的内存是以页为单位存储及分配的,当central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache 会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
也就是说,高并发内存池是由三部分缓存空间共同维护,从而达到高性能的内存分配。
1.thread cache整体设计
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
thread cache基础结构:
class ThreadCache
{
public:
private:
FreeList _freelists[NFREELISTS];//存储每个自由链表的哈希桶
};
// TLS thread local storage 线程自己的静态变量
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
由于 thread cache 未来要被线程调用,但是多个线程在一个进程中会共享资源,这与我们设计的每个线程拥有自己独立的 thread cache 不合,所以我们通过TLS,创建每个线程独立的 thread cache。
自由链表的基础结构:
//获取链表节点的下一个节点
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
//用于管理切分好的内存块的自由链表
class FreeList
{
public:
//获取内存块
void Push(void* obj)
{
assert(obj);
//头插
//*(void**)obj = _freelist;
NextObj(obj) = _freelist;
_freelist = obj;
}
//分发内存块
void* Pop()
{
assert(_freelist);
//头删
void* obj = _freelist;
_freelist = NextObj(obj);
return obj;
}
//判空
bool Empty()
{
return _freelist == nullptr;
}
private:
void* _freelist = nullptr;
};
链表中的每个节点的通过头几个字节构成 void* 指针连接的。
既然是哈希桶,那桶里必然会有很多的自由链表,那么这么多的自由链表,当内存空间被获取时,该选择从哪个自由链表中获取呢???
此处,我们引入“内碎片”的概念,所谓内碎片,就是我分给你了一段空间,但是这段空间你不一定能完全利用,会有部分空间剩余,这些剩余的空间,就称为“内碎片”。
相较于“外碎片”,前者是整个内存空间的剩余,而“内碎片”则是已经借出去的空间的剩余。
为了控制内碎片的浪费,我们需要根据需要的空间大小,来合理划分出哈希桶中自由链表的个数即其分别连接多大空间的块内存。
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
我们通过将内碎片的整体的浪费控制在10%左右,根据内存分配空间最大为256KB,将借用的内存大小分为5个范围,然后设定5个对齐规则,将哈希桶总共添加208个自由链表,并将这些自由链表分为5大类。
那么按照这种划分方式,该如何得到该要分配的内存空间在哈希桶中具体的链表下标呢???
所以根据要分配的内存空间的大小,经过下述运算,便能得出结果:
//自由链表桶映射下标计算
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链表
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
return _Index(bytes, 3);//传入2的次幂
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + group_array[0];//下标要加上所在的区间的前边的区间数
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1]
+ group_array[0];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + group_array[3] +
group_array[2] + group_array[1] + group_array[0];
else
{
assert(false);
return -1;
}
}
通过下述代码,计算要分配的内存空间的对齐大小:
//对齐大小计算
static inline size_t _RoundUp(size_t bytes, size_t align)
{
return ((bytes + align - 1) & ~(align - 1));
}
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128)
return _RoundUp(bytes, 8);
else if (bytes <= 1024)
return _RoundUp(bytes, 16);
else if (bytes <= 8 * 1024)
return _RoundUp(bytes, 128);
else if (bytes <= 64 * 1024)
return _RoundUp(bytes, 1024);
else if (bytes <= 256 * 1024)
return _RoundUp(bytes, 8 * 1024);
else
return _RoundUp(bytes, 1 << PAGE_SHIFT);
通过这两段代码,我们就可以实现内存对象的申请和释放,规则如下:
申请内存:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
- 当链表的长度过长,则回收一部分内存对象到central cache。
实际上一开始在 thread cache 中是没有任何空间的,它是通过一次次的向central cache索取内存,并保留在自由链表中,进而一步步壮大自己,经过漫长的空间积累之后,才可能使得空间分配仅在thread cache这部分进行。
但是如果thread cache中保存的空间过多,也应该向central cache归还部分内存。
2.central cache整体设计
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的,这样当thread cache 需要从central cache 中获取对象时,直接就可以更容易的从相同映射的自由链表中获取。
不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
申请内存:
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,即每个spanlist拥有单独的锁,尽可能提高效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从 span 中取对象给thread cache。
- central cache的中挂的span中存在变量use_count,记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count。
释放内存:
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时-- use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache, page cache中会对前后相邻的空闲页进行合并,以此来解决“外碎片”问题。
central cache基础结构:
//饿汉单例模式
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInet;
}
private:
SpanList _spanlists[NFREELISTS];//CentralCache哈希桶
private:
CentralCache() {}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInet;
};
由于central cache在我们的设计中只存在一个,所以我们将其设计为单例模式,只能创建出一个实例供多线程调用。
Span基础结构:
// 管理多个连续页大块内存跨度结构
struct Span
{
PageID _pageId = 0;//大块内存起始页的页号
size_t _n = 0;//页的数量
Span* _next = nullptr;//双向链表结构
Span* _prev = nullptr;
size_t _objSize = 0; //切好的小对象的大小
size_t _useCount = 0;//切好小块内存,被分配给thread cache的计数
void* _freelist = nullptr;//切好的小块内存的自由链表
bool _isUse = false;//该span是否被使用
};
SpanList基础结构:
class SpanList
{
public:
//初始化
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//插入
void Insert(Span* pos, Span* newspan)
{
assert(pos);
assert(newspan);
Span* prev = pos->_prev;
prev->_next = newspan;
newspan->_prev = prev;
newspan->_next = pos;
pos->_prev = newspan;
}
//删除
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
prev->_next = pos->_next;
pos->_next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx;//桶锁
};
3.page cache整体设计
page cache也是哈希桶结构,但是其映射关系不再是内存大小,而是页。每个页存放的仍然是spanlist链表,但是每个span节点不再拥有小对象,等上层central cache拿走之后自己进行切分。
从1页开始,一直到128页。 最大内存空间为512KB,即为了防止当上层要申请256KB的内存时,可以多给一个备用。
申请内存:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有 则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span,4页的span交给上层,6页的span则继续挂在6页链表中。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式 申请128页page span挂在自由链表中,再重复上述过程。
释放内存:
- 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
page cache基础结构:
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
private:
SpanList _spanlist[NPAGES];
std::unordered_map<PageID, Span*> _idSpanMap;//存放Pageid 和 span映射的map
PageCache(){}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
public:
std::mutex _mtx;
};
page cache 同样只有一个,所以也将其设计为单例模式。
四.申请内存完整逻辑
thread cache 作为第一资源空间,线程首先肯定是要向其申请空间,代码如下:
// 向thread cache申请内存对象
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);//获取偏移量
size_t index = SizeClass::Index(size);//获取链表下标
if (!_freelists[index].Empty())//不为空,直接从头部获取
{
return _freelists[index].Pop();
}
else//为空,从下一层central cache索取空间
{
return FetchFromCentralCache(index, alignSize);
}
}
得到所需空间大小的偏移量,及其索取的空间所在的桶的下标,判断当前桶是否有小对象,有则直接返回头部的小对象,没有则向下层 centeal cache 索取空间:
// 从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节
//1.最开始不会向CentralCache一次要太多的小对象,因为可能用不完
//2.如果你不要这个size的大小内存要求,那么batchNum就会不断增长,直到上限
//3.size越大,一次向CentralCache要的batchNum就越小
//4.size越小,一次向CentralCache要的batchNum就越大
size_t batchNum = min(_freelists[index].MaxSize(),SizeClass::NumMoveSize(size));
if (batchNum == _freelists[index].MaxSize())
{
_freelists[index].MaxSize() += 1;
}
void* start = nullptr;
void* end = nullptr;
//从CentralCache中获取 span,并返回其数量
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum >= 1);
if (actualNum == 1)//只获取到一个,直接返回
{
assert(start == end);
return start;
}
else//获取到多个,返回第一个,并将剩余的插入自由链表
{
_freelists[index].PushRange(NextObj(start), end);
return start;
}
}
这里存在一个问题:到底要向central cache 获取多少小对象呢???
引入一个新的函数:
// 一次从central cache获取多少个小对象
static size_t NumMoveSize(size_t size)
{
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 大对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
通过该函数,根据所需空间的大小,来计算要为其分配多少个小对象合适。所需空间越小,就为其分配越多的小对象,反之则分配的越少,而分配的数量限制在 [2,512] 之间。
但是实际上,在前几次为其分配空间时,也不能真的就为其分配512个小对象,这样很可能导致空间用不完,所以我们采用慢开始的方式,为其分配的空间数量根据该桶中内存的流动情况(即其最大内存数量)来更合理的为其分配空间。
计算出需要为其分配多少个小对象之后,我们就需要开始向 central cache 索取了:
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
//得到小对象所处的自由链表下表
size_t index = SizeClass::Index(size);
_spanlists[index]._mtx.lock();
//从自由链表中获取span
Span* span = GetOneSpan(_spanlists[index], size);
assert(span);
assert(span->_freelist);
//从span中获取batchNum个小对象
//如果数量够,直接返回对应数量,如果数量不够,则全部返回
start = span->_freelist;
end = start;
size_t actualNum = 1;
size_t i = 0;
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
i++;
actualNum++;
}
span->_freelist = NextObj(end);
NextObj(end) = nullptr;
span->_useCount += actualNum;
_spanlists[index]._mtx.unlock();
return actualNum;
}
在向central cache索取空间时,必须加锁,防止多线程争抢资源而引发错乱。
首先仍然要确定,你要索取的小对象所处的桶的下标index,随后从链表中获取一个span,(获取span的逻辑我们随后在分析),判断当前span中的小对象的数量是否满足,数量够,则给出对应的数量即可,数量不够,则全部给出,这些对象通过start和end两个指针管理,返回给上层函数,同时将span的_useCount += actualNum,记录小对象被使用的数量。最后返回实际给出的小对象的个数。
上层函数 ThreadCache::FetchFromCentralCache 得到小对象之后,将第一个小对象给到 thread cache ,剩余的对象则插入到对应的自由链表中备用。
下面来看如何从central cache桶中获取一个span:
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
//查看当前链表中是否有未分配的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freelist != nullptr)
return it;
else
it = it->_next;
}
//到这里说明没有可以使用的span,所以多个线程就算都进来也无法竞争
//解锁可以让其他释放内存的线程进来
list._mtx.unlock();
//当前链表中没有未分配的span,向page cache获取span,需要加锁
PageCache::GetInstance()->_mtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));
span->_isUse = true;
PageCache::GetInstance()->_mtx.unlock();
//到这里获取到新的span,但是由于还没有插入到自由链表中,其他线程无法获取该span,所以仍无需加锁
//计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
//切分span大块内存成小块用自由链表链接起来
//1.先切下来一块做头部,方便进行尾插
span->_freelist = start;
start += byte_size;
void* tail = span->_freelist;
//2.切块进行尾插
while (start < end)
{
NextObj(tail) = start;
tail = start;
start += byte_size;
}
NextObj(tail) = nullptr;
//切好span后,需要将span挂到桶里,此时需要进行加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
通过循环遍历该span链表中是否还有拥有小对象的span,有则直接返回,没有的话,则需要进一步从下层 page cache 获取span。(如何向下层获取span,我们下文再分析),如果已经向下文得到了一个span,因为page cache给出的是一整块页内存,所以要将这些内存进行切块。
先通过该块内存的页id,计算出其起始地址,通过start,end两个指针,将整个大块内存进行切分并链接成自由链表,随后将该span插入桶中,并返回。
下面来看,如何向 page cache获取span:
由于page cache的桶的映射关系是页,所以你必须告诉我你需要获取一个几页的span:
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
通过上述计算方法,根据所需数据的大小,计算出需要多少页, 并返回,当然就算是最小的数据,索取时也必须给出一页内存。
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶中是否存在span
if (!_spanlist[k].Empty())
{
Span* kSpan = _spanlist[k].PopFront();
//建立pageid 和 span 之间的映射关系,方便central cache回收小块内存时,查找对应的span
for (PageID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查后边的桶中有没有span,有就进行切分
for (int i = k + 1; i < NPAGES; i++)
{
if (!_spanlist[i].Empty())
{
Span* nSpan = _spanlist[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切下k页,给到kSpan
// 返回kSpan
//nSpan剩余的页数,插入对应的自由链表中
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
//nSpan头部被切去,其新页id和页数量要发生变化
nSpan->_pageId += k;
nSpan->_n -= k;
//剩余的nSpan的首尾也需要建立 pageid 和 span 的映射关系
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_spanlist[nSpan->_n].PushFront(nSpan);
//建立pageid 和 span 之间的映射关系,方便central cache回收小块内存时,查找对应的span
for (PageID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//整个page cache都没有Span,需要向堆中获取128page的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PageID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
//将获得的128page插入对应的桶中
_spanlist[bigSpan->_n].PushFront(bigSpan);
//递归调用函数获取k页的span
return NewSpan(k);
}
首先判断出,当前所需页对应的桶中是否存在span,不存在,则循环查询其后边的桶,查看其中有没有span,如果有,则将该span进行拆分,返回所需的页span,将剩余的span重新插入对应的桶中。切分之后,我们需要在map中建立页id和span的映射关系,方便后续释放内存进行。
如果后续的所有page cache桶都不存在span了,则就需要从系统的堆空间中获取一个128页的span,将该span直接插入桶中,随后采用递归的方式,重新获取span并返回。
向堆索取空间如下:
//向堆获取内存空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE,
PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
根据系统的不同,采用不同的函数获取。
以上,就是该高并发内存池的整个申请内存的逻辑,能够看出,程序一开始运行时,整个cache是没有任何内存空间的,必须通过上层逐步的向下层索取并积累,最终才能实现自给自足。
五.释放内存完整逻辑
当一个对象要被释放时,要将该对象重新插入到 thread cache 桶中:
// 向thread cache释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
//得到要返回的桶的自由链表,并插入进去
size_t index = SizeClass::Index(size);
_freelists[index].Push(ptr);
//当链表长度大于一次申请的内存时,就开始返还一段list给central cache
if (_freelists[index].Size() >= _freelists[index].MaxSize())
{
ListTooLong(_freelists[index], size);
}
}
我们规定,当该桶中对象的个数,大于等于该桶的最大对象数量时,就将这一段 list 返回给 central cache :
// 释放对象时,链表过长时,回收内存回到central cache
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
// 将一定数量的对象释放到central cache span跨度
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
首先要将该段链表从桶中删除,而后得到其起始地址start:
//删除一段链表
void PopRange(void*& start, void*& end, size_t n)
{
assert(n <= _size);
start = _freelist;
end = start;
for (size_t i = 0; i < n - 1; i++)
{
end = NextObj(end);
}
_freelist = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
通过起始地址及其每个小对象的大小,将该段内存释放为 central cache :
// 将一定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size)
{
//确定要归还到的是哪个桶
size_t index = SizeClass::Index(byte_size);
_spanlists[index]._mtx.lock();
//循环找到各个小对象对应的span,并将其归还
while (start)
{
void* next = NextObj(start);
//根据pageid找到要归还的是桶中的哪一个span
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//头插进span
NextObj(start) = span->_freelist;
span->_freelist = start;
span->_useCount--;
//如果该span所有的小对象都回来了,则将该span返还给page cache
if (span->_useCount == 0)
{
_spanlists[index].Erase(span);
span->_freelist = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
//释放该span给page cache 该span不会再被竞争,可以释放锁
_spanlists[index]._mtx.unlock();
//归还span,使用Page cache锁
PageCache::GetInstance()->_mtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_mtx.unlock();
//span归还之后,需要重新加锁
_spanlists[index]._mtx.lock();
}
start = next;
}
_spanlists[index]._mtx.unlock();
}
首先我们仍然要根据小对象的大小,计算出其所要返回的桶下标,随后,因为每个小对象都是由span切分而来的,所以我们希望让每个小对象返回时都能挂在其一开始所在的 span ,所以我们需要知道这些小对象到底来自哪个span:
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PageID id = (PageID)obj >> PAGE_SHIFT;
std::unique_lock<std::mutex> lock(_mtx);
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
return ret->second;
else
{
assert(false);
return nullptr;
}
}
通过小对象的起始地址,能够计算出其所在的页的id,通过map,找到其对应的span并返回。
在寻找span时,也需要对其进行加锁,防止发生前一个线程还没找到,后一个线程就将其释放的情况发生。
由于每个小对象都可能来自不同的span,所以需要一个一个的循环寻找, 找到对应的span之后,将小对象头插进该span,此时,当该 span的_useCount == 0 时,说明此时该 span 中的所有小对象都回来了,该 span 很有可能短期内不在被使用,所以需要将其整个释放给 page cache :
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//对span的前页尝试进行合并,缓解内存碎片问题
while (1)
{
PageID previd = span->_pageId - 1;
auto ret = _idSpanMap.find(previd);
//前边的页号没有,不合并
if (ret == _idSpanMap.end())
break;
//前边相邻页在使用,不能合并
Span* prevSpan = ret->second;
if (prevSpan->_isUse)
break;
//两块页数相加大于128,不合并
if (span->_n + prevSpan->_n > NPAGES - 1)
break;
//排除上述所有情况,可以进行合并
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
//将前页从桶中删除
_spanlist[prevSpan->_n].Erase(prevSpan);
delete prevSpan;
}
//对span的后页尝试进行合并,缓解内存碎片问题
while (1)
{
PageID nextid = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextid);
//后边的页号没有,不合并
if (ret == _idSpanMap.end())
break;
//后边相邻页在使用,不能合并
Span* nextSpan = ret->second;
if (nextSpan->_isUse)
break;
//两块页数相加大于128,不合并
if (span->_n + nextSpan->_n > NPAGES - 1)
break;
//排除上述所有情况,可以进行合并
span->_n += nextSpan->_n;
//将后页从桶中删除
_spanlist[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
//将合并好的span插入桶中,并将其首尾页在map中映射
_spanlist[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
span 回来之后, 由于其本身是一个页切分后的产物,所以为了解决内存碎片问题,自然是要将该页与其相邻的前后页进行合并。
首先根据 Pageid 与 span的映射关系,得到该 span 对应的pageid,该 pageid - 1,就得到了其前一页的id,在通过map映射,查看该页id是否存在,不存在就不能合并;
如果存在,则需要根据_isUse查看该页是否正在被使用,如果正在被使用,不能合并;
如果没有被使用,则还需计算这两个页的页数相加是否大于该哈希桶中所能存放的最大的页数量,超过数量,不能合并;
不超过,则所有条件满足,开始进行合并,合并也非常简单,因为这两个页如果能合并,说明它们在物理内存中本身就是一段连续内存,所以直接让 span 继承其相邻前页的起始页id,在加上其页数量,这样span就成为两个页合并后的新页,再将指向前页的指针删除即可。
向后合并和向前合并的逻辑基本一致,就不在详细分析,由于页的合并不仅仅只是进行一次,合并之后,仍然可以继续向前或向后寻找,所以设置为循环合并,直到不能合并才退出循环。
合并之后,需要将新页插入到其对应的新桶中,并记录 pageid 和 span 的映射。
以上就是高并发内存池释放内存的整个逻辑。当内存释放到page cache后,并不需要在将内存还给堆,因为这些内存需要在 page cache 中积攒,而且当进程退出时,也会自动释放这些内存。
六.获取更大内存
在一开始,我们就规定,该高并发内存池主要服务于所需内存小于等于256KB的用户,但是如果所需内存超过256KB,又该如何申请和释放内存呢???
1.申请内存
如果内存超过256KB,我们就无法通过thread cache和 central cache来获取内存,而应直接向page cache索要一个span,因为256KB是32页,而我们设计的page cache的最大页数为128。
static void* ConcurrentAlloc(size_t size)
{
//所需空间大于内存池规定的最大分配空间
if (size > MAX_BYTES)
{
size_t alignsize = SizeClass::RoundUp(size);
size_t kpage = alignsize >> PAGE_SHIFT;
PageCache::GetInstance()->_mtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
PageCache::GetInstance()->_mtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else//所需空间不大于内存池规定的最大分配空间
{
//...
}
}
大于256KB的内存,以页作为其偏移量,通过计算该内存的偏移量,就能够计算出其页数量,进而向page cache索取span。
此时,我们还需面临一个问题,那就是如果该内存大于128页,甚至超过了page cache的最大内存限制,该怎么办???
很简单,因为这种内存申请情况一般是很少很少的情况,所以直接向堆申请空间即可:
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
//如果所需页数大于128,则直接向堆获取
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
Span* span = new Span;
span->_pageId = (PageID)ptr >> PAGE_SHIFT;
span->_n = k;
_idSpanMap[span->_pageId] = span;
return span;
}
//...
}
申请出span之后,仍要将其 pageid 和 span 进行映射,来方便后续的内存释放。
2.释放内存
释放内存就不多说什么了,直接看代码:
static void Concurrentfree(void *ptr, size_t size)
{
//要释放的空间大于内存池的限定最大空间
if (size > MAX_BYTES)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
PageCache::GetInstance()->_mtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_mtx.unlock();
}
else
{
//...
}
}
先通过map映射获取到span,随后进行释放,当然,在释放内存时,如果该span的页数量大于128页,则直接调用系统函数将其释放回堆空间:
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//如果要释放的空间大于128页
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
delete span;
return;
}
//...
}
以上,就是当所需内存大于256KB时,如何实现其申请与释放。
七.进一步优化
1.脱离使用new
要知道,该高并发内存池设计的初衷是实现申请内存的效率比malloc更高效的tcmalloc,那么就不能在代码中出现任何有关malloc的痕迹,包括new对象,因为new的底层也是malloc。
那么该如何取代new呢???这里设计出一个定长内存池,用于取代new的使用:
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
//优先把还回来的内存对象,再次重复利用
if (_freelist)
{
void* next = *((void**)_freelist);
obj = (T*)_freelist;
_freelist = next;
}
else
{
//剩余内存不够一个对象,重新开辟大块空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
obj = (T*)_memory;
//如果调用的空间不够存储链表指针
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_remainBytes -= objSize;
}
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
//回收空间,将空间头插进链表
void Delete(T* obj)
{
//显示调用T的析构函数清理对象
obj->~T();
//头插
*(void**)obj = _freelist;
_freelist = obj;
}
private:
char* _memory = nullptr;//指向大内存块的指针
size_t _remainBytes = 0;//大内存块剩余的大小
void* _freelist = nullptr;//链接返还内存块的自由链表头指针
};
该定长内存池的设计,可以说是高并发内存池的一个前身,通过直接获取一个大块内存,然后分块进行申请释放,用链表来管理返还回来的块内存。
使用该定长内存池来替换new的使用,就可以进一步提高效率。
2.释放内存无需传内存大小
在前边分享释放内存的代码时,都需要传入要释放的内存的大小,显得有些繁琐且不舒适,获取内存时,传入要获取的内存大小这是必然的,但是释放内存,能否就不传入内存的大小呢???
很简单,因为从头到尾,大大小小的内存都和span有关,所以只需在span结构体中加入要分配的内存的大小这一字段,当需要得到内存大小时,直接从span中获取即可:
size_t _objSize = 0; //切好的小对象的大小
随后,修改部分代码,在获取一个新span时,记录其要分配的内存大小,释放内存时,也从span中获取内存大小。
static void Concurrentfree(void *ptr)
{
//要释放的空间大于内存池的限定最大空间
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
if (size > MAX_BYTES)
{
PageCache::GetInstance()->_mtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_mtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
八.性能分析
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(ConcurrentAlloc(16));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
int main()
{
size_t n = 10000;
BenchmarkConcurrentMalloc(n, 4, 10);
return 0;
}
上述代码是一个针对于高并发线程池申请和释放代码的性能的一个测试代码,其中用到了atomic类,用于原子性的计算时间,防止多个线程并发导致时间计算出错的情况。
将高并发内存池与malloc对比,能够发现性能确实提升了不少:
1.寻找性能瓶颈
上述代码测试的是4个线程并发执行10轮,每轮申请或释放内存10000次的性能情况。
对该测试代码进行性能分析,寻找性能瓶颈,得到如下结果:
能够看出整个代码运行过程中,释放内存所占用的时间最多,进一步深入查看释放内存的逻辑,能够得出如下结果:
在执行通过map隐射找到对应的span时,对该过程进行加锁解锁消耗了巨大的时间。
2.针对性能瓶颈进一步优化
在tcmalloc源码中,大佬们使用了基数树来存放pageid和span的映射关系,基数树与unordered_map的结构不同,基数树使用了一种类似于指针数组的树形存储结构,通过指针连接各个节点,每个节点存储一个键值对,它的结构是固定的,并不会像哈希表结构在插入和删除操作时可能会发生变化(如扩容、重新哈希等),因此基数树可以实现读写分离,在多个线程并发访问时,读操作就不用加锁了。
基数树也是一种分层结构,每一层划分一定的键值范围。当基数树为高并发内存池服务时,通常将其设计为三种结构,分别是一层,二层,三层三种结构,其中一层、二层结构适用于32位系统下的高并发内存池,三层则适用于64位系统下的高并发内存池。
下面仅解析一层、二层结构的基数树。
基数树代码来源于tcmalloc源码,我们将其部分进行修改以服务于高并发内存池。
(1)一层基数树
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t))
explicit TCMalloc_PageMap1()
{
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const
{
if ((k >> BITS) > 0)
{
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v)
{
array_[k] = v;
}
};
在32为系统下,根据页的大小,总共能存储2 ^ 32 / 2 ^ 13 = 2 ^ 19个页,因此BITS要传入19,在构造函数中,要为指针数组array_开辟空间,这里该用了直接从系统堆空间来获取。
随后,直接根据页号通过 get 和 set 函数来获取和写入映射。
(2)二层基数树
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf
{
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t))
explicit TCMalloc_PageMap2()
{
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL)
{
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v)
{
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
//开辟空间
bool Ensure(Number start, size_t n)
{
for (Number key = start; key <= start + n - 1;)
{
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL)
{
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> LeafPool;
Leaf* leaf = (Leaf*)LeafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory()
{
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
二层基数树不同于一层的点在于,它将2 ^ 19个节点分为两层来连接,第一层有2 ^ 5个节点,每个节点中再连接 2 ^ 14个节点,类似于二维数组的树形机构。
总体来说,一层基数树和二层基数树在空间占据上是相同的,而二层基数树的优势在于,可以不用直接开辟 2 ^ 19个节点,当你需要用到对于键值范围的节点时,再进行开辟。
但是上述代码在设计时,依然是直接将所有的空间都开辟好了。
同时在 get 和 set 的调用中,需要分别计算出对应键值所在的第一层和第二层的位置,也就是页号k 的高5位和低14位。
三层基数树的结构与二层类似,这里就不在过多分享。
使用基数树优化之后结果如下:
与优化前相比,性能提升了一倍,不同的计算机系统的优化力度也会不一样。
总结
以上就是高并发内存池项目的完整分享啦,第一次写一个大项目,出现了很多的问题,发现了很多知识层面的掌握不牢或者忘记的情况,但是完整写完整个项目之后,感觉自己的代码能力有了很大的提升,对于相关知识的掌握也有了质的飞跃。
最后完整代码放在Gitee仓库:ConcurrentMemoryPool · 楠朋友学编程/MyProjects - 码云 - 开源中国