目录
一.CentralCache基本结构
1.CentralCache任务
2.基本结构
二.函数调用层次结构/.h文件
三.Span和SpanList的封装
Span:大块内存跨度
PAGE_ID _pageId
size_t _objSize
_useCount
SpanList:管理Span的双链表(桶锁)
四.获取大块内存GetOneSpan
五.FetchRangeObj输出内存对象
六.ReleaseListToSpans
MapObjectToSpan
为什么读取基数树映射关系时不需要加锁?
七.基数树代码
单层基数树
双层基数树
八.CentralCache.cpp/.h
一.CentralCache基本结构
1.CentralCache任务
有与ThreadCache相同数量的哈希桶,分别管理一个SpanList,负责从PageCache获取大块内存Span,完成切分后挂到freeList,当ThreadCache对应的桶无内存时,再从对应的freeList切分一段给ThreadCache.
2.基本结构
CentralCache与ThreadCache有两个明显不同的地方.
首先,ThreadCache是每个线程独享的,而CentralCache是所有线程共享的一个单例,因为每个线程的CentralCache没有内存了都会去找CentralCache,因此在访问CentralCache时是需要加锁的。
但CentralCache在加锁时并不是将整个CentralCache全部锁上了,CentralCache在加锁时用的是桶锁,也就是说每个桶都有一把锁。
此时只有当多个线程同时访问CentralCache的同一个桶时才会存在锁竞争,如果是多个线程同时访问CentralCache的不同桶就不会存在锁竞争。
CentralCache与ThreadCache的第二个不同之处就是,ThreadCache的每个桶中挂的是一个个切好的内存块,而CentralCache的每个桶中挂的由SpanList管理的Span.
二.函数调用层次结构/.h文件
三.Span和SpanList的封装
Span:大块内存跨度
// 管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; // 大块内存起始页的页号
size_t _n = 0; // 页的数量
Span* _next = nullptr; // 双向链表的结构
Span* _prev = nullptr;
size_t _objSize = 0; // 切好的小对象的大小
size_t _useCount = 0; // 切好小块内存,被分配给ThreadCache的计数
void* _freeList = nullptr; // 切好的小块内存的自由链表
bool _isUse = false; // 是否在被使用
};
PAGE_ID _pageId
表示大块内存起始页的页号,由于大块内存可能由好多page组成,因此Span中记录起始页号,方便后续进行内存管理._n为该Span中page的个数
size_t _objSize
Span中_freeList管理的每个小块内存对象的实际大小
_useCount
该Span中的小块内存被分配给ThreadCache使用的个数.初始状态为0
当use_Count再次为0时,代表这个Span中所有被分配出去的小块儿内存都被ThreadCache还回来了,此时可直接将这个Span从还给PageCache
而_isUse就是区分_useCount为0时的2个不同状态的
SpanList:管理Span的双链表(桶锁)
// 带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
// prev newspan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; // 桶锁
};
从双链表删除的Span会还给下一层的PageCache,相当于只是把这个Span从双链表中移除,因此不需要对删除的Span进行delete.
四.获取大块内存GetOneSpan
从SpanList获取一个非空的Span,如果没有,就从PageCache获取一个NewSpan并切分好再返回.
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//1.sList中有span就返回
Span* span = list.Begin();
while (span != list.End())
{
if (span->_freeList)
return span;
else
span = span->_next;
}
// 先把CentralCache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
//2.list中没有span,向PageCache申请
PageCache::GetInstance()->_pageMtx.lock();
span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
PageCache::GetInstance()->_pageMtx.unlock();
span->_isUse = true;
span->_objSize = size;
// 对获取到的Span进行切分,不需要加锁,因为此时其他线程访问不到这个Span
// 计算Span的大块内存的起始地址和大块内存的大小(字节数)
//这个页的起始地址是 PageId*8*1024,第0页的地址是0,以此类推
//计算这个span总共有多少个字节,用_n(页数)*8*1024
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
//3.对span管理的大块内存进行切分,尾插链接到_freeList
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail); // tail = start;
start += size;
}
NextObj(tail) = nullptr;
//4.将申请到的新的span头插到list,访问共享资源,需要加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
操作单个Span时不用加锁,此时该Span一定没有被使用,但操作SpanList时需要加锁.
五.FetchRangeObj输出内存对象
被ThreadCache中的FetchFromCentralCache所调用,用start和end来标识被ThreadCache取走的一批量内存的首尾2个.
// 从中心缓存获取一定数量的对象给ThreadCache, 输出型参数
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span && span->_freeList);
//1.span中对象充足,则取batchNum个
//2.span不足,则end指向最后一个
//从前向后取
start = end = span->_freeList;
size_t actualNum = 1;
for (size_t i = 0; i < batchNum - 1; ++i)
{
if (NextObj(end) == nullptr)break;
++actualNum;
end = NextObj(end);
}
span->_freeList = NextObj(end);
span->_useCount += actualNum;
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return actualNum;
}
由于CentralCache是所有线程共享的,所以在访问CentralCache中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后再将桶锁解掉。
在从CentralCache获取对象时,先是在CentralCache对应的哈希桶中获取到一个非空Span,然后从这个Span的自由链表中取出个对象即可,但可能这个非空的span的自由链表当中对象的个数不足batchNum个,这时该自由链表当中有多少个对象就给多少就行了。
也就是说,ThreadCache实际从CentralCache获得的对象的个数可能与我们传入的batchNum是不一样的,因此我们需要统计本次申请过程中,ThreadCache实际获取到的对象个数,然后根据该值及时更新这个Span中的小对象被分配给ThreadCache的计数_useCount。
事实上,ThreadCache只要求我们取出1个对象,我们期望取出batchNum是为了整体效率的优化,但是若实际上取出的个数小于batchNum个也是没有问题的.
六.ReleaseListToSpans
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
SpanList& sList = _spanLists[index];
sList._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
// 说明span的切分出去的所有小块内存都回来了
// 这个span就可以再回收给page cache
if (span->_useCount == 0)
{
sList.Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
//将span还给PageCache
sList._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
sList._mtx.lock();
}
start = next;
}
sList._mtx.unlock();
}
当ThreadCache中某个自由链表太长时,会将自由链表当中的这些对象还给CentralCache中的对应的Span
但是需要注意的是,还给CentralCache的这些对象不一定都是属于同一个Span的CentralCache中的每个哈希桶当中可能都不止一个Span,因此当我们计算出还回来的对象应该还给CentralCache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个Span
因为通过对象的地址除以一个page的大小即可得到页号,因此需要再建立PAGE_ID到Span的映射关系方便还内存给Span.
这时当ThreadCache还对象给CentralCache时,就可以依次遍历这些对象,将这些对象插入到其对应span的自由链表当中,并且及时更新该span的_useCount计数即可。
在ThreadCache还对象给CentralCache的过程中,如果CentralCache中某个span的_useCount减到0时,说明这个span分配出去的对象全部都还回来了,那么此时就可以将这个span再进一步还给PageCache
需要注意,如果要把某个span还给PageCache,我们需要先将这个span从CentralCache对应的双链表中移除,然后再将该span的自由链表置空,因为PageCache中的span是不需要切分成一个个的小对象的,以及该span的前后指针也都应该置空,因为之后要将其插入到PageCache对应的双链表中。但span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了.(从SpanList中移除)
并且在CentralCache还span给PageCache时也存在锁的问题,此时需要先将CentralCache中对应的桶锁解掉,然后再加上PageCache的大锁之后才能进入PageCache进行相关操作,当处理完毕回到CentralCache时,除了将PageCache的大锁解掉,还需要立刻获得CentralCache对应的桶锁,然后将还未还完对象继续还给CentralCache中对应的span(相关加锁解锁操作)
MapObjectToSpan
用unordered_map即可,为了效率优化,这里优化成了基数树.
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
Span* ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
//std::unique_lock<std::mutex> lock(_pageMtx);
//auto ret = _idSpanMap.find(id);
//if (ret != _idSpanMap.end())
//{
// return ret->second;
//}
//else
//{
// assert(false);
// return nullptr;
//}
}
为什么读取基数树映射关系时不需要加锁?
当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁(读写锁)的。
因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的结构都有可能会发生变化。
比如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。
而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。
并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在PageCache中进行的。
也就是说,读取映射时读取的都是对应Span的_useCount !=0的页,而建立映射时建立的都是对应Span的_useCount == 0的页,所以说我们不会同时对同一个页进行读取映射和建立映射的操作。
七.基数树代码
使用基数树时间效率(不用加锁)和空间上都能进行有效优化
#pragma once
#include"Common.h"
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
//对于1层基数树,提前用SystemAlloc开好空间
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
//构造函数提前开好空间
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
单层基数树
单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储数组中在以该页号为下标的位置。
双层基数树
以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个比特位。而二层基数树实际上就是把这19个比特位分为两次进行映射。
比如用前5个比特位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的比特位在基数树的第二层进行映射,映射后最终得到该页号对应的span指针。
二层基数树相比一层基数树的好处就是,一层基数树必须一开始就把整个完整空间的数组开辟出来,而二层基数树一开始时只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组就行了。
八.CentralCache.cpp/.h
#include "CentralCache.h"
#include "PageCache.h"
CentralCache CentralCache::_sInst;
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//1.sList中有span就返回
Span* span = list.Begin();
while (span != list.End())
{
if (span->_freeList)
return span;
else
span = span->_next;
}
// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
//2.list中没有span,向PageCache申请
PageCache::GetInstance()->_pageMtx.lock();
span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
PageCache::GetInstance()->_pageMtx.unlock();
span->_isUse = true;
span->_objSize = size;
// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
// 计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
//3.对span管理的大块内存进行切分,尾插链接到_freeList
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail); // tail = start;
start += size;
}
NextObj(tail) = nullptr;
//4.将申请到的新的span头插到list,访问共享资源,需要加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span && span->_freeList);
//1.span中对象充足,则取batchNum个
//2.span不足,则end指向最后一个
start = end = span->_freeList;
size_t actualNum = 1;
for (size_t i = 0; i < batchNum - 1; ++i)
{
if (NextObj(end) == nullptr)break;
++actualNum;
end = NextObj(end);
}
span->_freeList = NextObj(end);
span->_useCount += actualNum;
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return actualNum;
}
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
SpanList& sList = _spanLists[index];
sList._mtx.lock();
//_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
// 说明span的切分出去的所有小块内存都回来了
// 这个span就可以再回收给page cache
if (span->_useCount == 0)
{
sList.Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
//将span还给PageCache
sList._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
sList._mtx.lock();
}
start = next;
}
sList._mtx.unlock();
}
#pragma once
#include "Common.h"
// 单例模式
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache(){}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};