一.PageCache基本结构
1.PageCache任务
PageCache负责使用系统调用向系统申请页的内存,给CentralCache分配大块儿的内存,以及合并前后页空闲的内存,整体也是一个单例,需要加锁.
PageCache桶的下标按照页号进行映射,每个桶里span的页数即为下标大小.
2.基本结构
当每个线程的ThreadCache没有内存时都会向central cache申请,此时多个线程的ThreadCache如果访问的不是CentralCache的同一个桶,那么这些线程是可以同时进行访问的。
这时CentralCache的多个桶就可能同时向PageCache申请内存的,所以PageCache也是存在线程安全问题的,因此在访问PageCache时也必须要加锁。
在PageCache这里我们不能使用桶锁,因为当CentralCache向PageCache申请内存时,PageCache可能会将其他桶当中大页的span切小后再给CentralCache。此外,当CentralCache将某个span归还给PageCache时,PageCache也会尝试将该span与其他桶当中的span进行合并。
即PageCache内部存在多个桶之间的交互,所以要么所有桶都加锁,要么给PageCache加一把大锁
也就是说,在访问PageCache时,我们可能需要访问PageCache中的多个桶,如果PageCache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此我们在访问PageCache时使用没有使用桶锁,而是用一个大锁将整个PageCache给锁住。
此外,page cache在整个进程中也是只能存在一个的,因此我们也需要将其设置为单例模式。
二.class PageCache
三.系统调用接口封装
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
size_t bytes = kpage << 13; // kpage 是页数,每页大小为 8 KB(1 << 13 字节)
ptr = mmap(0, bytes, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (ptr == MAP_FAILED) {
ptr = nullptr; // mmap 失败时返回 MAP_FAILED,我们需要将其转换为 nullptr
}
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
三.NewSpan获取一个k页的span
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128 page的直接向堆申请
if (k >= NPAGES)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
///1._spanLists中有k个page的span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
//2._spanLists[k]为空,从更大的span中寻找
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
// 在nSpan的头部切一个k页下来,k页span返回,nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
// 存储nSpan的首位页号跟nSpan映射,方便PageCache回收内存时向前向后合并查找
//_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
// 建立id和span的映射,方便CentralCache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
}
//3.整个_spanLists都为空,向堆申请一个NPAGES(128)大小的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//4.递归调用NewSpan重新切分
return NewSpan(k);
}
- 因为PageCache是直接按照页数进行映射的,因此我们要从PageCache获取一个k页的span,就应该直接先去找PageCache的第k号桶,如果第k号桶中有span,那我们直接头删一个span返回给CentralCache就行了.
- 如果PageCache的第k号桶中没有span,我们就应该继续找后面的桶,只要后面任意一个桶中有一个n页span,我们就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页的span返回给CentralCache,再将n-k页的span挂到PageCache的第n-k号桶即可。
- 但如果后面的桶中都没有span,此时我们就需要向堆申请一个128页的span了,在向堆申请内存时,直接调用我们封装的SystemAlloc函数即可。
需要注意的是,向堆申请内存后得到的是这块内存的起始地址,此时我们需要将该地址转换为页号。由于我们向堆申请内存时都是按页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。
递归调用:将申请到的128页的span挂到PageCache对应的哈希桶128号中,然后递归调用该函数,此时在往后找span时就一定会在第128号桶中找到该span,然后进行切分。
这里可以使用递归锁,或者在NewSpan外部加锁,防止递归时产生死锁.
这里其实有一个问题:当CentralCache向PageCache申请内存时,CentralCache对应的哈希桶是处于加锁的状态的,那在访问PageCache之前我们应不应该把CentralCache对应的桶锁解掉呢?
这里建议在访问PageCache前,先把CentralCache对应的桶锁解掉。
虽然此时CentralCache的这个桶当中是没有内存供其他ThreadCache申请的,但ThreadCache除了申请内存还会释放内存,如果在访问PageCache前将CentralCache对应的桶锁解掉,那么此时当其他ThreadCache想要归还内存到CentralCache的这个桶时就不会阻塞
因此在调用NewSpan函数之前,我们需要先将CentralCache对应的桶锁解掉,然后再将PageCache的大锁加上,当申请到k页的span后,我们需要将PageCache的大锁解掉,CentralCache拿到k页的span后对其进行切分操作(该过程不需要加锁),在span切好后需要将其挂到CentralCache对应的桶上时,再获取对应的桶锁。
四.MapObjectToSpan哈希
PageCache在合并span时,是需要通过页号获取到对应的span的,因此要华北库页号与span之间的映射关系,用MapObjectToSpan存储.
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
Span* ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
//std::unique_lock<std::mutex> lock(_pageMtx);
//auto ret = _idSpanMap.find(id);
//if (ret != _idSpanMap.end())
//{
// return ret->second;
//}
//else
//{
// assert(false);
// return nullptr;
//}
}
五.ReleaseSpanToPageCache归还Span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接还给堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
// 对span前后的页,尝试进行合并,缓解内存碎片问题
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
//1.前一个page还没有被申请
auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr)break;
//2.前面相邻页的span在使用,不合并了
Span* prevSpan = ret;
if (prevSpan->_isUse == true)break;
//3.合并出超过128页的span无需合并
if (prevSpan->_n + span->_n > NPAGES - 1)break;
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)break;
Span* nextSpan = ret;
if (nextSpan->_isUse == true)break;
if (nextSpan->_n + span->_n > NPAGES - 1)break;
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
//_idSpanMap[span->_pageId] = span;
//_idSpanMap[span->_pageId+span->_n-1] = span;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
如果CentralCache中有某个span的_useCount减到0了,CentralCache就需要将这个span还给PageCache.
这个过程看似是非常简单的,PageCache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,PageCache还需要尝试将还回来的span与其他空闲的span进行合并。
合并的过程可以分为向前合并和向后合并.
如果还回来的span的起始页号是num,该span所管理的页数是n.
那么在向前合并时,就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止.
而在向后合并时,就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止.
因此PageCache在合并span时,是需要通过页号获取到对应的span的,这就是我们要把页号与span之间的映射关系存储到PageCache的原因.
但需要注意的是,当我们通过页号找到其对应的span时,这个span此时可能挂在PageCache,也可能挂在CentralCache。而在合并时我们只能合并挂在PageCache的span,因为挂在CentralCache的span当中的对象正在被其他线程使用。
可是我们不能通过span结构当中的_useCount成员,来判断某个span到底是在CentralCache还是在PageCache.因为当CentralCache刚向PageCache申请到一个span时,这个span的_useCount就是等于0的,这时可能当我们正在对该span进行切分的时候,PageCache就把这个span拿去进行合并了,这显然是不合理的。
因此,我们可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时我们默认该span是没有被使用的。
即在PageCache中,_isUse=false,在CentralCache中为true
由于在合并PageCache当中的span时,需要通过页号找到其对应的span,而一个span是在被分配给CentralCache时,才建立的各个页号与span之间的映射关系,因此PageCache当中的span也需要建立页号与span之间的映射关系。
与CentralCache中的span不同的是,在PageCache中,只需建立一个span的首尾页号与该span之间的映射关系。因为当一个span在尝试进行合并时,如果是往前合并,那么只需要通过一个span的尾页找到这个span,如果是向后合并,那么只需要通过一个span的首页找到这个span。也就是说,在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了。
因此当我们申请k页的span时,如果是将n页的span切成了一个k页的span和一个n-k页的span,我们除了需要建立k页span中每个页与该span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系。
PageCache.cpp/.h
PageCache.cpp
#include "PageCache.h"
PageCache PageCache::_sInst;
std::mutex PageCache::_pageMtx;
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128 page的直接向堆申请
if (k >= NPAGES)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
///1._spanLists中有k个page的span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
//2._spanLists[k]为空,从更大的span中寻找
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
// 在nSpan的头部切一个k页下来,k页span返回,nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
// 存储nSpan的首位页号跟nSpan映射,方便PageCache回收内存时向前向后合并查找
//_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
// 建立id和span的映射,方便CentralCache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
}
//3.整个_spanLists都为空,向堆申请一个NPAGES(128)大小的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//4.递归调用NewSpan重新切分
return NewSpan(k);
}
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
Span* ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
//std::unique_lock<std::mutex> lock(_pageMtx);
//auto ret = _idSpanMap.find(id);
//if (ret != _idSpanMap.end())
//{
// return ret->second;
//}
//else
//{
// assert(false);
// return nullptr;
//}
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接还给堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
// 对span前后的页,尝试进行合并,缓解内存碎片问题
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
//1.前一个page还没有被申请
auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr)break;
//2.前面相邻页的span在使用,不合并了
Span* prevSpan = ret;
if (prevSpan->_isUse == true)break;
//3.合并出超过128页的span无需合并
if (prevSpan->_n + span->_n > NPAGES - 1)break;
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)break;
Span* nextSpan = ret;
if (nextSpan->_isUse == true)break;
if (nextSpan->_n + span->_n > NPAGES - 1)break;
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
//_idSpanMap[span->_pageId] = span;
//_idSpanMap[span->_pageId+span->_n-1] = span;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
PageCache.h
#pragma once
#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 获取从内存对象到span的映射
Span* MapObjectToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
// 获取一个K页的span
Span* NewSpan(size_t k);
static std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
ObjectPool<Span> _spanPool;
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
#ifdef _WIN64
TCMalloc_PageMap3<64 - PAGE_SHIFT> _idSpanMap;
#else
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
#endif
private:
PageCache(){}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};