C++项目 -- 高并发内存池(七)性能瓶颈与优化

C++项目 – 高并发内存池(七)性能瓶颈与优化

文章目录

  • C++项目 -- 高并发内存池(七)性能瓶颈与优化
  • 一、检测性能瓶颈
  • 二、使用基数树来优化项目
    • 1.基数树
    • 2.不加锁的原理
    • 3.性能对比
  • 三、最终代码实现


一、检测性能瓶颈

  • DeBug下运行代码,使用VS 2019中的性能探查器,选中检测
    在这里插入图片描述
int main()
{
	size_t n = 1000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	//BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}
  • 检测结果显示:多线程竞争锁占用的执行时间占比很多,性能瓶颈还是在锁的竞争上;
    是在MapObjectToSpan函数访问_idSpanMap的时候需要加锁;
    在这里插入图片描述

二、使用基数树来优化项目

1.基数树

  • 基数树类似于哈希数组,存储整数到整数的映射,并且可以在读取的时候不加锁

一层基数树

  • 结构:
    就是一个数组;
    在这里插入图片描述

  • BITS:非类型模板参数,代表存储页号需要多少位;
    在这里插入图片描述
    32位系统下,BITS就是32 - PAGE_SHIFT,原理就是计算(2^32 / 2 ^ 13)

  • 因此在32位系统下,共有2 ^ 19页,共需要2 ^ 19长度的数组;

  • LENGTH就是数组的长度,array_是数组头指针,数组的下标就是页号,数组中存储的是span*

// Single-level array
//直接定值法映射
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
	void** array_;

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};

两层的基数树

  • 结构:
    是分层的数组;
    在这里插入图片描述
  • 分层哈希映射:第一层数组一共2^5 = 32个槽位,第二层是一个个指针数组,数组长2^14
    在这里插入图片描述
  • 页号的前5位决定了该页在第一层的哪个位置,从而找到第二层,后14位决定其在第二层的具体位置;
  • 一层的树和两层的树在空间上没有变化,但是一层的树开辟空间要一次开辟出来,但是两层的可以用到的时候再开空间;
  • Ensure函数就是在访问到了第一层的某个位置时,开好该起始页号对应的第二层的空间
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

2.不加锁的原理

  • 使用unordered_map来做哈希映射的时候,多线程环境下如果不加锁,在对unordered_map进行写入的时候,红黑树会发生旋转,树的结构变了,节点之间的指针在变化,其他线程读取的时候可能会发生错误,因此使用unordered_map的时候一定要加锁;
  • 使用基数树做哈希映射的时候,基数树写之前都是开好了空间的,写的时候不会改变结构;
  • 读写是分离的,写入_idSpanMap的函数都是在PageCache层的NewSpan函数和ReleaseSpanToPageCache函数,这两个函数在调用的时候都是加了全局锁的,其他线程不可能访问到_idSpanMap;

因此用MapObjectToSpan函数访问_idSpanMap的时候就不需要加锁了

Span* PageCache::MapObjectToSpan(void* obj) {
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

	//使用基数树改进后不需要加锁
	auto ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

3.性能对比

性能有所提升;
在这里插入图片描述

三、最终代码实现

Common.h

#pragma once
//公共头文件

#include <iostream>
#include <vector>
#include <assert.h>
#include <thread>
#include <mutex>
#include <algorithm>
#include <unordered_map>
using std::cout;
using std::endl;
using std::vector;

static const size_t MAX_BYTES = 256 * 1024; //ThreadCache能分配对象的最大字节数
static const size_t NFREELIST = 208; //central cache 最大的哈希桶数量
static const size_t NPAGES = 129; //page cache 哈希桶的数量
static const size_t PAGE_SHIFT = 13; //页与字节的转换

#ifdef _WIN32
	#include<windows.h>
#else
//linux
#endif

#ifdef _WIN64
	typedef unsigned long long PAGE_ID;
#elif _WIN32
	typedef size_t PAGE_ID;
#elif
	//linux

#endif


//直接去堆上申请空间
inline static void* SystemAlloc(size_t kpage) {
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else

#endif

	if (ptr == nullptr) {
		throw std::bad_alloc();
	}

	return ptr;
}

//直接释放空间给堆
inline static void SystemFree(void* ptr) {
#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else

#endif
}


// 访问obj的前4 / 8字节地址空间
static void*& NextObj(void* obj) {
	return *(void**)obj;
}

//自由链表类,用于管理切分好的小内存块
class FreeList {
public:
	void Push(void* obj) {
		assert(obj);
		//头插
		NextObj(obj) = _freeList;
		_freeList = obj;
		_size++;
	}

	//范围插入
	void PushRange(void* start, void* end, size_t size) {
		assert(start);
		assert(end);

		NextObj(end) = _freeList;
		_freeList = start;
		_size += size;
	}

	void* Pop() {
		assert(_freeList);

		//头删
		void* obj = _freeList;
		_freeList = NextObj(obj);
		_size--;
		return obj;
	}

	//批量取走对象
	void PopRange(void*& start, void* end, size_t size) {
		assert(_size >= size); /// ????????  _size >= size
		start = _freeList;
		end = start;

		for (size_t i = 0; i < size - 1; i++) {
			end = NextObj(end);
		}

		_freeList = NextObj(end);
		NextObj(end) = nullptr;
		_size -= size;
	}

	bool Empty() {
		return _freeList == nullptr;
	}

	//用于实现thread cache从central cache获取内存的慢开始算法
	size_t& MaxSize() {
		return _maxSize;
	}

	size_t Size() {
		return _size;
	}
private:
	void* _freeList = nullptr;
	size_t _maxSize = 1;
	size_t _size = 0;
};


// 管理对齐和哈希映射规则的类
class SizeClass {
public:
	//对齐规则
	// 整体控制在最多10%左右的内碎片浪费
	// [1,128]				8byte对齐			freelist[0,16)
	// [128+1,1024]			16byte对齐			freelist[16,72)
	// [1024+1,8*1024]		128byte对齐			freelist[72,128)
	// [8*1024+1,64*1024]	1024byte对齐			freelist[128,184)
	// [64*1024+1,256*1024] 8*1024byte对齐		freelist[184,208)

	//RoundUp的子函数,根据对象大小和对齐数,返回对象对齐后的大小
	static inline size_t _RoundUp(size_t size, size_t align) {
		//if (size % align == 0) {
		//	return size;
		//}
		//else {
		//	return (size / align + 1) * align;
		//}

		//使用位运算能够得到一样的结果,但是位运算的效率很高
		return ((size + align - 1) & ~(align - 1));
	}

	//计算当前对象size字节对齐之后对应的size
	static inline size_t RoundUp(size_t size) {
		if (size <= 128) {
			//8字节对齐
			return _RoundUp(size, 8);
		}
		else if (size <= 1024) {
			//16字节对齐
			return _RoundUp(size, 16);
		}
		else if (size <= 8 * 1024) {
			//128字节对齐
			return _RoundUp(size, 128);
		}
		else if (size <= 64 * 1024) {
			//1024字节对齐
			return _RoundUp(size, 1024);
		}
		else if (size <= 256 * 1024) {
			//8KB字节对齐
			return _RoundUp(size, 8 * 1024);
		}
		else {
			//大于256KB就以页为单位对齐
			return _RoundUp(size, 1 << PAGE_SHIFT);
		}
		return -1;
	}

	//Index的子函数,用于计算映射的哈希桶下标
	static inline size_t _Index(size_t size, size_t alignShift) {
		//if (size % align == 0) {
		//	return size / align - 1;
		//}
		//else {
		//	return size / align;
		//}

		//使用位运算能够得到一样的结果,但是位运算的效率很高
		//使用位运算需要将输入参数由对齐数改为对齐数是2的几次幂、

		return ((size + (1 << alignShift) - 1) >> alignShift) - 1;
	}

	//计算对象size映射到哪一个哈希桶(freelist)
	static inline size_t Index(size_t size) {
		assert(size <= MAX_BYTES);

		//每个区间有多少个哈希桶
		static int groupArray[4] = { 16, 56, 56, 56 };
		if (size <= 128) {
			return _Index(size, 3);
		}
		else if (size <= 1024) {
			//由于前128字节不是16字节对齐,因此需要减去该部分,单独计算16字节对齐的下标
			//再在最终结果加上全部的8字节对齐哈希桶个数
			return _Index(size - 128, 4) + groupArray[0];
		}
		else if (size <= 8 * 1024) {
			return _Index(size - 1024, 7) + groupArray[0] + groupArray[1];
		}
		else if (size <= 64 * 1024) {
			return _Index(size - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
		}
		else if (size <= 256 * 1024) {
			return _Index(size - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];
		}
		else {
			assert(false);
		}
		return -1;
	}

	//thread cache一次从central cache中获取多少内存块
	static size_t NumMoveSize(size_t size) {
		//一次获取的内存块由对象的大小来决定
		assert(size > 0);

		//将获取的数量控制在[2, 512]
		size_t num = MAX_BYTES / size;
		if (num < 2) {
			num = 2;
		}
		
		if (num > 512) {
			num = 512;
		}

		return num;
	}

	//计算central cache一次向page cache获取多少页的span
	static size_t NumMovePage(size_t size) {
		assert(size > 0);
		//先计算该对象一次申请内存块的上限值
		size_t num = NumMoveSize(size);
		//计算上限的空间大小
		size_t npage = num * size;
		//转换成page单位
		npage >>= PAGE_SHIFT;

		if (npage == 0) {
			npage = 1;
		}
		return npage;
	}
};

struct Span
{
	PAGE_ID _pageID = 0; // 大块内存起始页的页号
	size_t _n = 0; // 页的数量
	Span* _next = nullptr; // 双向链表的结构
	Span* _prev = nullptr;
	size_t _objSize = 0; // 切好的小对象的大小
	size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数
	void* _freeList = nullptr; // 切好的小块内存的自由链表
	bool _isUse = false; // 是否在被使用
};

class SpanList {
public:
	SpanList() {
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}

	void Insert(Span* pos, Span* newSapn) {
		assert(pos);
		assert(newSapn);

		Span* prev = pos->_prev;
		prev->_next = newSapn;
		newSapn->_prev = prev;
		newSapn->_next = pos;
		pos->_prev = newSapn;
	}

	void Erase(Span* pos) {
		assert(pos);
		assert(pos != _head);

		//不用释放空间
		Span* prev = pos->_prev;
		Span* next = pos->_next;
		prev->_next = next;
		next->_prev = prev;
	}

	Span* Begin() {
		return _head->_next;
	}

	Span* End() {
		return _head;
	}

	bool Empty() {
		return _head->_next == _head;
	}

	void PushFront(Span* newSapn) {
		Insert(Begin(), newSapn);
	}

	Span* PopFront() {
		Span* front = _head->_next;
		Erase(front);
		return front;
	}

private:
	Span* _head;		//头节点
public:
	std::mutex _mtx;	//桶锁
};

ThreadCache.h

#pragma once
#include "Common.h"

class ThreadCache {
public:
	//申请和释放对象内存
	void* Allocate(size_t size);
	void Deallocate(void* obj, size_t size);

	//从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t alignSize);

	//自由链表过长时,回收一段链表到中心缓存
	void ListTooLong(FreeList& list, size_t size);

private:
	FreeList _freeLists[NFREELIST];
};

//声明_declspec(thread)后,会为每一个线程创建一个单独的拷贝
//使用_declspec(thread)声明了ThreadCache*指针变量,则该指针在该线程中会创建一份单独的拷贝
//pTLSThreadCache指向的对象在本线程内是能够全局访问的,但是无法被其他线程访问到,这就做到了多线程情景下的无锁访问
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

ThreadCache.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "ThreadCache.h"
#include "CentralCache.h"

void* ThreadCache::Allocate(size_t size) {
	assert(size <= MAX_BYTES);
	//获取对齐后的大小及对应的哈希桶下标
	size_t alignSize = SizeClass::RoundUp(size);
	size_t index = SizeClass::Index(size);

	if (!_freeLists[index].Empty()) {
		//若对应的freeList桶不为空,直接pop一个内存块给该对象
		return _freeLists[index].Pop();
	}
	else {
		//否则需要从CentralCache获取内存空间
		return ThreadCache::FetchFromCentralCache(index, alignSize);
	}
}

void ThreadCache::Deallocate(void* obj, size_t size) {
	assert(obj);
	assert(size <= MAX_BYTES);

	//找该对象对应的freeList的桶,直接插入
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(obj);
	
	//当链表的长度大于一次批量申请的内存块的数量时,就归还一段list给central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize()) {
		ListTooLong(_freeLists[index], size);
	}
}

void ThreadCache::ListTooLong(FreeList& list, size_t size) {
	void* start = nullptr;
	void* end = nullptr;
	//从list中取出MaxSize长度的链表
	list.PopRange(start, end, list.MaxSize());

	//归还给CentralCache的对应span
	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}


void* ThreadCache::FetchFromCentralCache(size_t index, size_t alignSize) {
	//慢开始算法
	//计算当前从Central Cache中获取内存块的最大数量
	size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(alignSize));
	//如果MaxSize未达上限,就将MaxSize + 1
	if (batchNum == _freeLists[index].MaxSize()) {
		_freeLists[index].MaxSize() += 1;
	}
	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, alignSize);
	assert(actualNum >= 1);

	if (actualNum == 1) {
		//如果最终获取的数量为1,直接返回给对象
		assert(start == end);
		return start;
	}
	else {
		//如果最终获取的数量多于一个,则返回第一个给对象,剩下的插入freeList里
		_freeLists[index].PushRange(NextObj(start), end, actualNum - 1); // 批量插入
		//NextObj(start) = nullptr;
		return start;
	}
}

CentralCache.h

#pragma once
#include "Common.h"
#include "PageCache.h"


//饿汉单例模式
class CentralCache {
public:
	static CentralCache* GetInstance() {
		return &_sInstance;
	}

	//从CentralCache获取一定数量的内存对象给ThreadCache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

	//获取一个非空的Sapn
	Span* GetOneSpan(SpanList& spanList, size_t size);

	//归还一段list到对应的span
	void ReleaseListToSpans(void* start, size_t byte_size);

private:
	SpanList _spanLists[NFREELIST];

	//构造函数私有化
	CentralCache() 
	{}
	//不生成默认拷贝构造
	CentralCache(const CentralCache&) = delete;

	static CentralCache _sInstance;
};

CentralCache.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "CentralCache.h"

//单例模式静态成员的定义
CentralCache CentralCache::_sInstance;

//从CentralCache获取一定数量的内存对象给ThreadCache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) {
	//先根据对象size获取对应的spanList下标
	size_t index = SizeClass::Index(size);
	//每个线程访问spanList时需要加锁
	_spanLists[index]._mtx.lock();

	//获取非空的span
	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span);
	assert(span->_freeList);

	//从span中获取batchNum个对象,若不够,就有多少拿多少
	start = span->_freeList;
	end = start;
	size_t i = 0;
	size_t actualNum = 1; // 实际拿到的对象数量
	while (i < batchNum - 1 && NextObj(end) != nullptr) {
		end = NextObj(end);
		actualNum++;
		i++;
	}
	//在span中去掉这一段对象
	span->_freeList = NextObj(end);
	NextObj(end) = nullptr;
	//更新span->_useCount参数
	span->_useCount += actualNum;

	_spanLists[index]._mtx.unlock();

	return actualNum;
}

Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size) {
	//先检查该SpanList有没有未分配的Span
	Span* it = spanList.Begin();
	while (it != spanList.End()) {
		if (it->_freeList != nullptr) {
			return it;
		}
		else {
			it = it->_next; 
		}
	}

	//先把central cache 的桶锁解掉,这样如果其他线程释放对象回来,就不会被阻塞
	spanList._mtx.unlock();

	//SpanList中没有空闲的Span,需要向page cache申请
	//在此处加上page cache的全局锁,NewSpan的所有操作都是加锁进行的
	PageCache::GetInstance()->_pageMtx.lock();
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));

	//更新_isUse属性
	span->_isUse = true;
	//存储该span切分对象的size
	span->_objSize = size;
	PageCache::GetInstance()->_pageMtx.unlock();

	//从page cache获取到了新的span,需要进行切分
	//无需在此加上桶锁,因为该span还没有放到spanList中,其他线程访问不到
	
	//计算span大块内存的起始地址和大块内存的大小(字节数)
	char* start = (char*)(span->_pageID << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;
	char* end = start + bytes;

	//把大块内存切成自由链表链接起来
	//先切一块下来做头,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;

	while (start < end) {
		NextObj(tail) = start;
		tail = NextObj(tail);
		start += size;
	}

	NextObj(tail) = nullptr;

	//在span挂载到spanList之前加上桶锁
	spanList._mtx.lock();
	spanList.PushFront(span);
	return span;
}

void CentralCache::ReleaseListToSpans(void* start, size_t byte_size) {
	size_t index = SizeClass::Index(byte_size);
	_spanLists[index]._mtx.lock();

	//该段list的尾部指针已经置空,遍历到空指针就停止
	while (start) {
		//将内存块对象挂载到对应的span上
		void* next = NextObj(start);
		//获取该对象对应的span
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		NextObj(start) = span->_freeList;
		span->_freeList = start;
		//更新_useCount
		span->_useCount--;

		//说明该span的小块内存都回收了
		//这个span就可以回收给page cache,由page cache去做前后页的合并
		if (span->_useCount == 0) {
			_spanLists[index].Erase(span);
			span->_prev = nullptr;
			span->_next = nullptr;
			span->_freeList = nullptr;

			//释放span给page cache的时候,使用page cache的锁就可以了
			//将桶锁先解除,方便其他线程在该桶上申请和释放内存
			_spanLists[index]._mtx.unlock();

			PageCache::GetInstance()->_pageMtx.lock();
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock();

			_spanLists[index]._mtx.lock();
		}
		start = next;
	}

	_spanLists[index]._mtx.unlock();
}

PageCache.h

#pragma once
#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"

//单例模式
class PageCache {
public:
	static PageCache* GetInstance() {
		return &_sInstance;
	}

	std::mutex _pageMtx; //全局锁

	//获取一个k页的Span
	Span* NewSpan(size_t k);

	//获取对象到span的映射
	Span* MapObjectToSpan(void* obj);

	//释放空闲span回到page cache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);

private:
	SpanList _spanLists[NPAGES];
	// 用于存储页号到Span的映射关系
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;

	//用基数树代替unordered_map
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

	//定长内存池用于代替new
	ObjectPool<Span> _spanPool;

	PageCache() {}

	PageCache(const PageCache&) = delete;

	static PageCache _sInstance;
};

PageCache.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "PageCache.h"

PageCache PageCache::_sInstance;

Span* PageCache::NewSpan(size_t k) {
	assert(k > 0);
	//大于128页的span直接从堆申请
	if (k > NPAGES - 1) {
		void* ptr = SystemAlloc(k);
		//Span* span = new Span;
		Span* span = _spanPool.New();
		span->_n = k;
		span->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;

		//存储页号与span的映射关系
		//_idSpanMap[span->_pageID] = span;
		_idSpanMap.set(span->_pageID, span);

		return span;
	}

	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty()) {
		有就返回
		Span* kSpan = _spanLists[k].PopFront();

		// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			//_idSpanMap[kSpan->_pageID + i] = kSpan;
			_idSpanMap.set(kSpan->_pageID + i, kSpan);
		}

		return kSpan;
	}

	//没有就需要检查后面的桶有没有更大的span,如果有可以拆分
	for (size_t i = k + 1; i < NPAGES; i++) {
		if (!_spanLists[i].Empty()) {
			Span* nspan = _spanLists[i].PopFront();
			//Span* kspan = new Span;
			Span* kspan = _spanPool.New();

			//在nspan头部且下一个k页的span
			//kspan返回
			//nspan剩下的部分挂载到相应的桶上
			kspan->_pageID = nspan->_pageID;
			kspan->_n = k;
			nspan->_pageID += k;
			nspan->_n -= k;

			_spanLists[nspan->_n].PushFront(nspan);

			//存储nspan的首尾页号与Span的关系,方便page cache回收内存时进行合并查找
			//_idSpanMap[nspan->_pageID] = nspan;
			//_idSpanMap[nspan->_pageID + nspan->_n - 1] = nspan;

			_idSpanMap.set(nspan->_pageID, nspan);
			_idSpanMap.set(nspan->_pageID + nspan->_n - 1, nspan);

			//存储kspan每一页的页号与span的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kspan->_n; i++) {
				//_idSpanMap[kspan->_pageID + i] = kspan;
				_idSpanMap.set(kspan->_pageID + i, kspan);
			}

			return kspan;
		}
	}

	//走到这里说明没有更大的span了,需要向堆申请一个128页的大块内存
	//Span* bigSpan = new Span;
	Span* bigSpan = _spanPool.New();
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);
	//此时需要将_spanLists中的128页的内存切分,递归调用一下
	return NewSpan(k);
}


Span* PageCache::MapObjectToSpan(void* obj) {
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

	//加入RAII锁,出了函数作用域自动解锁
	//std::unique_lock<std::mutex> lock(_pageMtx);

	//auto ret = _idSpanMap.find(id);
	//if (ret != _idSpanMap.end()) {
	//	return ret->second;
	//}
	//else {
	//	//应该是一定能够获取到的
	//	//如果获取不到就是出现了问题
	//	assert(false);
	//	return nullptr;
	//}

	//使用基数树改进后不需要加锁
	auto ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

void PageCache::ReleaseSpanToPageCache(Span* span) {
	//大于128页的span,直接归还给堆
	if (span->_n > NPAGES - 1) {
		void* ptr = (void*)(span->_pageID << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);
		return;
	}

	//对span前后的相邻页进行合并,缓解内存碎片的问题
	//向前合并
	while (1) {
		//前一页的id
		PAGE_ID prevId = span->_pageID - 1;
		//从map中寻找页号与span的映射
		//auto ret = _idSpanMap.find(prevId);

		前面的页号没有,不合并
		//if (ret == _idSpanMap.end()) {
		//	break;
		//}

		auto ret = (Span*)_idSpanMap.get(prevId);
		if (ret == nullptr) {
			break;
		}

		//前面相邻页的span在使用,不合并
		//Span* prevSpan = ret->second;
		Span* prevSpan = ret;
		if (prevSpan->_isUse == true) {
			break;
		}

		//合并超出128页的span没法管理,不合并
		if (prevSpan->_n + span->_n > NPAGES - 1) {
			break;
		}

		//合并前面的span
		span->_pageID = prevSpan->_pageID;

		span->_n += prevSpan->_n;
		_spanLists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);
	}

	//向后合并
	while (1) {
		PAGE_ID nextId = span->_pageID + span->_n;
		//auto ret = _idSpanMap.find(nextId);

		//if (ret == _idSpanMap.end()) {
		//	break;
		//}

		auto ret = (Span*)_idSpanMap.get(nextId);
		if (ret == nullptr) {
			break;
		}

		//Span* nextSpan = ret->second;
		Span* nextSpan = ret;
		if (nextSpan->_isUse == true) {
			break;
		}

		if (nextSpan->_n + span->_n > NPAGES - 1) {
			break;
		}

		span->_n += nextSpan->_n;

		_spanLists[nextSpan->_n].Erase(nextSpan);
		//delete nextSpan;
		_spanPool.Delete(nextSpan);
	}

	//将合并好的span挂载到对应的哈希桶,更新isUse
	_spanLists[span->_n].PushFront(span);

	span->_isUse = false;
	//_idSpanMap[span->_pageID] = span;
	//_idSpanMap[span->_pageID + span->_n - 1] = span;

	_idSpanMap.set(span->_pageID, span);
	_idSpanMap.set(span->_pageID + span->_n - 1, span);
}

ObjectPool.h

#pragma once
#include "Common.h"

template<class T>
class ObjectPool {
public:
	T* New() {
		T* obj = nullptr;
		//若freeList不为空,先分配这里的空间
		if (_freeList) {
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		}
		else {
			if (_remainBytes < sizeof(T)) { //当剩余空间不足一个对象时,就需要重新申请空间
											//这其中也包括了首次申请空间
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes >> 13); // 直接在堆上申请空间
				if (_memory == nullptr) {
					throw std::bad_alloc();
				}
			}
			//为新对象分配内存空间
			obj = (T*)_memory;
			//一个内存块的大小不能小于当前系统指针的大小,因为freeList需要存指针
			int objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += objSize;
			_remainBytes -= objSize;
		}
		//自定义类型只开辟了空间,并没有初始化,定位new,显式调用类的构造函数
		new(obj)T;
		return obj;
	}

	void Delete(T* obj) {
		//显式调用类的析构函数
		obj->~T();

		//头插
		*((void**)obj) = _freeList;
		_freeList = obj;
	}


private:
	char* _memory = nullptr;
	void* _freeList = nullptr;
	int _remainBytes = 0;
};

ConcurrentAlloc.h

#pragma once
#include "Common.h"
#include "ThreadCache.h"
#include "PageCache.h"

static void* ConcurrentAlloc(size_t size) {
	if (size > MAX_BYTES) {
		//大于256KB的直接向page cache申请
		size_t alignSize = SizeClass::RoundUp(size);
		size_t kpage = alignSize >> PAGE_SHIFT;
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kpage);
		span->_isUse = true;
		span->_objSize = size;
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageID << PAGE_SHIFT);
		return ptr;
	}
	else {
		if (pTLSThreadCache == nullptr) {
			//如果pTLSThreadCache指针是空的,就构造一个ThreadCache对象,并指向它
			//则这个ThreadCache对象就是本线程专属的ThreadCache对象
			static ObjectPool<ThreadCache> tcPool;
			//pTLSThreadCache = new ThreadCache;
			pTLSThreadCache = tcPool.New();
		}

		//cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;

		//使用pTLSThreadCache访问本线程专属的ThreadCache对象来开辟空间
		return pTLSThreadCache->Allocate(size);
	}
}

static void ConcurrentFree(void* obj) {
	Span* span = PageCache::GetInstance()->MapObjectToSpan(obj);
	size_t size = span->_objSize;
	if (size > MAX_BYTES) {
		//大于256KB的内存直接释放给page cache

		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else {
		assert(pTLSThreadCache);

		pTLSThreadCache->Deallocate(obj, size);
	}
}

PageMap.h

#pragma once

//基数树

#include"Common.h"

// Single-level array
//直接定值法映射
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
	void** array_;

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

Benchmark.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "Common.h"
#include "ConcurrentAlloc.h"

//ntimes:一轮申请和释放内存的次数
//rounds:轮次
//统计申请内存和释放内存各花费多少时间

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(malloc(16));
					v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	//printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%d ms\n",
	//	nworks, rounds, ntimes, malloc_costtime);
	//printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%d ms\n",
	//	nworks, rounds, ntimes, free_costtime);
	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc"
		<< ntimes << "花费:" << malloc_costtime << "ms" << endl;
	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次free"
		<< ntimes << "花费:" << free_costtime << "ms" << endl;
	printf("%u个线程并发malloc&free %u次,总计花费:%d ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}

// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(ConcurrentAlloc(16));
					v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	//printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%d ms\n",
	//	nworks, rounds, ntimes, malloc_costtime);
	//printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%d ms\n",
	//	nworks, rounds, ntimes, free_costtime);
	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent alloc"
		<< ntimes << "花费:" << malloc_costtime << "ms" << endl;
	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent dealloc" 
		<< ntimes << "花费:" << free_costtime << "ms" << endl;
	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%d ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/399750.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

015—pandas 标记按月连续变化趋势

前言 在业务数据分析中&#xff0c;特别是和时间相关的数据&#xff0c;会经常要判断数据的变化情况&#xff0c;比如是否是增长还是降低&#xff0c;或是持平。 需求 以数据中最后的月份为基础&#xff0c;来看它最近的数据变化&#xff0c;并将变化情况标记在本行的最后一…

网络基础与通信原理:构建数字世界的框架

目录 初识计算机网络 网络介绍 按照拓扑分类 按地域分类 网络设备 交换机&#xff08;switch&#xff09; 路由器&#xff08;router&#xff09; 传输介质 双绞线 光纤 光纤速度 ISO ISO和OSI有什么关系呢&#xff1f; OSI七层模型 TCP/IP四层 TCP/IP协议族 …

Atmel ATSHA204应用总结

1 ACES软件安装 Atmel Crypto Evaluation Studio (ACES) https://www.microchip.com/DevelopmentTools/ProductDetails/PartNO/Atmel%20Crypto%20%20Studio%20(ACES) 2 基本概念 ACES CE&#xff1a;Atmel Crypto Evalution Studio Configuration Environment&#xff08;基于加…

美国纽约时代广场纳斯达克大屏投放-大舍传媒

美国纽约时代广场纳斯达克大屏投放-大舍传媒 引言 对于大舍传媒来说&#xff0c;能够在美国纽约时代广场纳斯达克大屏投放广告是一个里程碑式的时刻。这不仅仅代表着大舍传媒在全球范围内的知名度与实力&#xff0c;也标志着该公司在国际市场上取得了巨大的进展。纽约时代广场…

Prometheus+TDengine集群实现监控体系高可用

背景 为避免再次出现因Prometheus宕机导致业务无法查看历史数据受到影响&#xff0c;准备将Prometheus架构从单节点方式升级为高可用集群方式并将后端存储由本地存储改为远端分布式时序数据库存储。分布式时序数据库采用国产数据库TDengine。 架构 解释&#xff1a;虚线代表P…

【安卓基础3】Activity(一)

&#x1f3c6;作者简介&#xff1a;|康有为| &#xff0c;大四在读&#xff0c;目前在小米安卓实习&#xff0c;毕业入职 &#x1f3c6;安卓学习资料推荐&#xff1a; 视频&#xff1a;b站搜动脑学院 视频链接 &#xff08;他们的视频后面一部分没再更新&#xff0c;看看前面也…

C#之WPF学习之路(1)

目录 WPF的起源 C的qt和C#的wpf对比 winform 和 wpf有什么区别 安装 Visual Studio2022 创建 HelloWorld 程序 App.xaml与Application类 Application的生命周期 Window窗体的生命周期 WPF的起源 WPF&#xff08;Windows Presentation Foundation&#xff09;是一种用于…

《图解设计模式》笔记(二)交给子类

三、Template Method模式&#xff1a;将具体处理交给子类 示例程序类图 public static void main(String[] args) {// 生成一个持有H的CharDisplay类的实例AbstractDisplay d1 new CharDisplay(H);// 生成一个持有"Hello, world."的StringDisplay类的实例AbstractD…

【HarmonyOS应用开发】三方库(二十)

三方库的基本使用 一、如何获取三方库 目前提供了两种途径获取开源三方库&#xff1a; 通过访问Gitee网站开源社区获取 在Gitee中&#xff0c;搜索OpenHarmony-TPC仓库&#xff0c;在tpc_resource中对三方库进行了资源汇总&#xff0c;可以供开发者参考。 通过OpenHarmony三…

数字世界的探索者:计算机相关专业电影精选推荐

目录 推荐计算机专业必看的几部电影 《黑客帝国》 《社交网络》 《乔布斯传》 《心灵捕手》 《源代码》 《盗梦空间》 《头号玩家》 《我是谁&#xff1a;没有绝对安全的系统》 《战争游戏》(WarGames) 《模仿游戏》(The Imitation Game) 《硅谷》(Silicon Valley) …

SpringBoot+WebSocket实现即时通讯(四)

前言 紧接着上文《SpringBootWebSocket实现即时通讯&#xff08;三&#xff09;》 本博客姊妹篇 SpringBootWebSocket实现即时通讯&#xff08;一&#xff09;SpringBootWebSocket实现即时通讯&#xff08;二&#xff09;SpringBootWebSocket实现即时通讯&#xff08;三&…

如何在Shopee 上选择热销商品?shopee应该在哪选品

在如今激烈竞争的电商市场中&#xff0c;如何通过精准的选品策略提升在Shopee平台上的销售业绩成为卖家们关注的焦点。Shopee作为一个蓬勃发展的电商平台&#xff0c;提供了多种资源和工具来帮助卖家做出明智的选品决策。通过深入了解这些渠道和策略&#xff0c;卖家们可以更好…

第2.4章 StarRocks表设计——分区分桶与副本数

目录 一、数据分布 1.1 概述 1.2 数据分布方式 1.2.1 Round-Robin 1.2.2 Range 1.2.3 List 1.2.4 Hash 1.3 StarRocks的数据分布方式 1.3.1 不分区 Hash分桶 1.3.2 Range分区Hash分桶 三、分区 3.1 分区概述 3.2 创建分区 3.2.1 手动创建分区 3.2.2 批量创建分区…

微服务篇之负载均衡

一、Ribbon负载均衡流程 二、Ribbon负载均衡策略 1. RoundRobinRule&#xff1a;简单轮询服务列表来选择服务器。 2. WeightedResponseTimeRule&#xff1a;按照权重来选择服务器&#xff0c;响应时间越长&#xff0c;权重越小。 3. RandomRule&#xff1a;随机选择一个可用的服…

Java 那些诗一般的 数据类型 (1)

本篇会加入个人的所谓‘鱼式疯言’ ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人…

如何实现一个K8S DevicePlugin?

什么是device plugin k8s允许限制容器对资源的使用&#xff0c;比如CPU和内存&#xff0c;并以此作为调度的依据。 当其他非官方支持的设备类型需要参与到k8s的工作流程中时&#xff0c;就需要实现一个device plugin。 Kubernetes提供了一个设备插件框架&#xff0c;你可以用…

一文读懂函数式接口、Lambda表达式、Stream

文章目录 前言版本函数式接口定义特点使用 Lambda表达式主要场景用法无参写法有参写法 Lambda 表达式的基础&#xff1a;函数式接口 类型推断自定义函数接口使用 Lambda 表达式底层实现JDK7JDK8 this 的含义JDK7JDK8 Stream特点操作Stream 流创建中间操作和终端操中间操作无状…

vue.js el-tooltip根据文字长度控制是否提示toolTip

一、需求&#xff1a;如何判断当前文本文字是否超出文本长度&#xff0c;是否需要出现提示toolTip。效果图如下&#xff1a; 二、实现&#xff1a; 1、表格字段鼠标放置el-popover出现 “引用主题” 的具体内容&#xff1b; <!-- 表格字段&#xff1a;引用主题 --> <…

006 矢量数据属性表的使用和关联

1 空间属性关系 1.1 数据导入和图层组合 读取数据 除了使用关键项的连接之外&#xff0c;还有一种根据属性组合中的空间位置关系进行组合的方法。 在本节中&#xff0c;我们将介绍一种方法&#xff0c;用于将空间关系中的多边形数据属性与任意点数据属性相结合。 我们使用的是…

文件上传漏洞--Upload-labs--Pass11--(GET)00绕过

一、环境准备&#xff1a; php版本&#xff1a;推荐 php5.2.17&#xff08;官方推荐版本&#xff09;。小于php5.3.4也可以&#xff0c;但是要在 php.ini 配置文件中手动将 magic_quotes_gpc 的状态改为 Off。 magic_quotes_gpc的作用是对 get请求、post请求、cookie...传入的…