文章目录
- 十九、day19
- 1. 引用计数
- 2. 代码实现
- 2.1 单引用计数器无锁栈
- 2.2 双引用计数器无锁栈
- 3. 本节的一些理解
十九、day19
上一节我们学习通过侯删链表以及风险指针与侯删链表的组合两种方式实现了并发无锁栈,但是这两种方式有以下缺点:
第一种方式:如果 pop
操作频繁被多个线程调用,待删除的节点将不断累积,导致待删除列表无法及时回收,进而占用大量内存。此时我们需要一种机制,每次调用 pop 函数结束后都需要判断侯删链表中的节点是否可以被 delete,即风险指针。
第二种方式:
-
如何正确且高效地分配这块内存是一个难点。
-
为了删除一个节点,需要遍历风险指针数组,检查该节点是否被某个风险指针引用。同时,在处理待删除节点时,也需要从风险数组中选取合适的节点记录其地址,这进一步增加了开销。
-
而且因为该方法受专利保护,所以在一些场合可能无法使用
在这一节中,我们学习如何通过引用计数来实现无锁栈以及安全回收机制。
参考:
恋恋风辰官方博客
《c++并发编程》中无锁栈的实现为什么要用双引用计数器_无锁栈 双重引用技术-CSDN博客
C++实现“无锁”双buffer - 知乎
1. 引用计数
本节的引用计数通过两个计数器实现:一个是存储在节点内部的 内部计数器,另一个是与指针绑定在一起的 外部计数器。外部计数器和指针被封装在一个结构体 counted_node_ptr
中,因此栈顶指针的类型从原生指针(void⭐、int⭐、char⭐、node⭐…)修改为 counted_node_ptr
。
struct counted_node_ptr {
int external_count; // 外部计数器
node* ptr; // 原始指针,指向节点
};
具体机制如下:
- 当线程加载
head
指针时,外部计数器会加 1,表示有新的线程在引用这个节点。 - 当线程不再引用某个节点时,内部计数器会减 1。
通过将 内部计数器 和 外部计数器 相加,就可以得到该节点的实际引用计数。当引用计数归零时,说明没有线程再访问这个节点,此时可以安全地删除它。这种设计确保了节点的引用计数在并发环境下能够准确管理,同时避免了资源泄露的问题。数据结构如下图所示
当线程尝试从栈中弹出数据时,会按照以下步骤操作:
- 读取栈顶指针
线程首先读取head
指针(栈顶指针),将其值存入线程的局部变量local_head
中,同时将head
指针的外部计数器加 1。这个过程是原子的,因此能够确保线程安全。 - 同步计数器
将local_head
的外部计数器更新为与head
的外部计数器相同。随后,线程可以通过local_head
访问节点。 - 判断能否弹出节点
- 如果在
local_head
载入后,发现local_head
的指针值与当前head
不一致,说明有其他线程在此期间修改了head
。此时,当前线程无法弹出节点,因为它载入的local_head
已作废。 - 线程会将节点的内部计数器减 1(因为当前线程不再指向该节点)。如果内部计数器变为 0(说明没有其他线程再引用这个节点),当前线程负责删除节点;否则,什么也不做。
- 如果在
- 成功弹出节点
- 如果
local_head
的指针值仍与head
相同,说明没有其他线程修改head
,当前线程可以成功弹出节点。 - 线程将
head
指针更新为当前节点的下一个节点(即head.ptr->next
),这个更新操作是原子的。
- 如果
- 更新计数器
- 弹出节点后,将
local_head
的外部计数器减 1,因为head
指针已经不再指向这个节点。 - 再次将
local_head
的外部计数器减 1,因为local_head
也准备不再指向这个节点(实际上,可以一次性减 2,这里分开解释是为了说明两个来源)。 - 然后,将
local_head
的外部计数器值加到节点的内部计数器上。
- 弹出节点后,将
- 判断节点是否需要删除
- 如果内部计数器因此变为 0(即它原本是外部计数器的相反数),说明没有线程再引用该节点,当前线程负责删除节点。
- 如果内部计数器不为 0,则无需删除节点。
通过这样的流程,可以确保节点在无人引用时才被安全回收,同时减少不必要的内存操作,提升了效率和线程安全性。
这段文字光看很难看懂,我们通过代码进行解释:
2. 代码实现
2.1 单引用计数器无锁栈
在此之前,我们需要知道:该方式必须通过两个计数器才能实现,如果只用一个,会有一些问题。我们先通过只用一个计数器实现引用计数,进而说明为什么只能使用两个计数器实现。
template<typename T>
class single_ref_stack {
public:
single_ref_stack():head(nullptr) {
}
~single_ref_stack() {
//循环出栈
while (pop());
}
void push(T const& data);
std::shared_ptr<T> pop();
private:
struct ref_node {
//1 数据域智能指针
std::shared_ptr<T> _data;
//2 引用计数
std::atomic<int> _ref_count;
//3 下一个节点
ref_node* _next;
ref_node(T const& data_) : _data(std::make_shared<T>(data_)),
_ref_count(1), _next(nullptr) {}
};
//头部节点
std::atomic<ref_node*> head;
};
上段代码是一个单引用的无锁栈实现,内容很简单不多于解释,我们只需要注意一点:计数器引用计数必须要使用原子类型,为了保证多线程修改下仍是线程安全的。
接下来实现 pop() 和 push():
void push(T const& data) {
auto new_node = new ref_node(data);
new_node->next = head.load();
while (!head.compare_exchange_weak(new_node->next, new_node));
}
push()
函数很简单,就是将数据插入至栈链表的表头,并且调用原子操作 compare_exchange_weak
将 head
修改为 new_node
,即将 head
指针指向新插入的表头节点 new_node
。
std::shared_ptr<T> pop() {
ref_node* old_head = head.load();
for (;;) {
if (!old_head) { // 空栈检查
return std::shared_ptr<T>();
}
//1 只要执行pop就对引用计数+1
++(old_head->_ref_count);
//2 比较head和old_head想等则交换否则说明head已经被其他线程更新
if (head.compare_exchange_strong(old_head, old_head->_next)) {
auto cur_count = old_head->_ref_count.load();
auto new_count;
//3 循环重试保证引用计数安全更新
do {
//4 减去本线程增加的1次和初始的1次
new_count = cur_count - 2;
} while (!old_head->_ref_count.compare_exchange_weak(cur_count, new_count));
//返回头部数据
std::shared_ptr<T> res;
//5 交换数据
res.swap(old_head->_data);
//6
if (old_head->_ref_count == 0) {
delete old_head;
}
return res;
}
else {
//7
if (old_head->_ref_count.fetch_sub(1) == 1) {
delete old_head;
}
}
}
}
该函数仅有两种返回值:空栈返回一个空的智能指针;非空栈返回一个指向弹出数据的智能指针。执行流程如下:
- 进行空栈检查,如果是空栈,直接返回空的
std::shared_ptr<T>()
,否则继续执行 - 因为节点结构体的成员增加了一个原子类型的引用计数,这里增加当前线程对节点(old_head)的引用计数(因为计数器是
std::atomic<int>
类型的整型原子变量,所以++
是原子操作),避免该节点(old_head)在操作过程中被其他线程删除 - 使用 CAS(Compare-And-Swap) 操作尝试将
head
从old_head
更新为old_head->_next
。如果成功说明当前线程成功弹出头节点,否则说明其他线程在此期间修改了head
,需要重新尝试。 - 如果判断成功,则
head
被修改为old_head->_next
。按照上一节的做法,这里需要将old_head
加入至侯删链表,但是在本节中我们需要在这里更新引用计数- 首先,读取
old_head
的引用计数(我们在 pop 函数最开始将其 +1,保证其他线程不会删除它),并将其赋值给cur_count
- 然后将引用计数减去 2(pop 函数一开始加了1,每个节点初始化的时候默认构造函数将引用计数设置为 1,一共增加了两次 1)
- 使用 CAS(Weak) 重试,保证引用计数能被安全更新到
new_count
- 引用计数修改成功之后,将弹出节点 old_head 的数据存储至
std::shared_ptr<T>
类型的智能指针中,并判断old_head
的引用计数是否为 0,如果为 0 则直接将old_head
删除,否则保留 - 最后返回存有
old_head->_data
的智能指针 res
- 首先,读取
- 如果判断失败,说明其他线程在此期间修改了 head。判断失败的线程可以理解为抢占失败的线程,因为 pop 函数的一开始会将栈表头节点的引用计数 +1,所以如果抢占失败,需要在 pop 函数结束前将引用计数 -1。如果引用 -1 之后的值为 1,则表示其他对该节点 pop 的线程已经结束,当前线程是最后引用此指针的线程,我们需要在该线程中将 old_head 删除。
但是该函数存在隐含的风险:
- 风险1:假设线程 1 和线程 2 同时调用pop函数,并已经读取了head 的值,现在准备将 old_head 的引用计数 +1,但线程 2 率先完成了pop函数操作,将引用计数
_ref_count
更新为 0 并删除了old_head
。此时,线程 1 恰好继续执行第 1 步代码,却尝试访问已经被删除的old_head
,因此导致程序崩溃。 - 风险2:假设线程 1 和线程 2 同时调用 pop函数,并先后将
old_head
的引用计数 +1(此时old_head
的引用计数总计为 3),假如线程 1 率先通过 if 的判断,将head
成功更新为old_head->next
;而线程 2 会因为 CAS 会读取到head
最新修改的值,即old_head->next
,因为head
最新的值和old_head
不同,所以会将old_head
修改为old_head->next
,线程 2 走进 else 分支之后判断的old_head->ref_count
其实是old_head
修改后的引用计数,即old_head->next->ref_count
,为1(只有初始化的时候才 +1,其他线程没有对它的引用计数进行操作),从而会将其删除。其他线程如果要弹出 old_head->next 的时候会发现节点已经不存在,报错。流程如下图所示:
- 假设线程 1 和线程 2 同时调用 pop函数,并先后将
old_head
(node1
)的引用计数 +1(此时old_head
的引用计数总计为 3),如上图中的流程1 - 假如线程 1 率先通过 if 的判断,将
head
成功更新为old_head->next
,即node2
,如上图的流程2 - 线程 2 会因为 CAS 会读取到
head
最新修改的值,即old_head->next
(node2
),因为head
最新的值和old_head
不同,所以会将old_head
修改为和 head 相同的值old_head->next
,如上图的流程3 - 因为线程 2 没通过 if 的判断条件,从而走进了 else 分支,else分支中会执行
old_head->_ref_count.fetch_sub(1) == 1
操作,此时old_head
指向节点的引用计数是 1。注意,fetch_sub
将_ref_count
减 1 后返回的是减之前的值,即1,而不是减之后的值 0,所以这里虽然将 _ref_count 从 1 减为 0 ,但是 if 条件仍然会判断成功,从而将 old_head 删除 - 假如线程 3 在线程 2 执行完之后调用了 pop 函数想要将
node2
弹出,按正常流程来走的话,会读取head
指向的节点,即node2
,但因为node2
已经被线程 2删除,所以这里在读取head.load()
的时候会报错,此时head
指向空指针nullptr
,如下图所示:
解决方法就是将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合。
将原本数据结构体 ref_node
分解为 ref_node
和 node
两个结构体,前者用于存储引用计数,后者用于存储数据。
struct node {
//1 数据域智能指针
std::shared_ptr<T> _data;
//2 下一个节点
ref_node _next;
node(T const& data_) : _data(std::make_shared<T>(data_)) {}
};
struct ref_node {
// 引用计数
std::atomic<int> _ref_count;
node* _node_ptr;
ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
ref_node():_node_ptr(nullptr),_ref_count(0){}
};
如果将 head
存储为指针类型而不是副本类型:
std::atomic<ref_node*> head; // 头节点
注意:这里的头节点存储的类型是
ref_node*
而不是node*
。
那么 pop()
函数需修改如下:
std::shared_ptr<T> pop() {
//0 处
ref_node* old_head = head.load();
for (;;) {
//1 只要执行pop就对引用计数+1并更新到head中
ref_node* new_head;
do {
new_head = old_head;
new_head->_ref_count += 1; // 7
} while (!head.compare_exchange_weak(old_head, new_head));
//4 确保 head == old_head == new_head
old_head = new_head;
auto* node_ptr = old_head->_node_ptr; // 获取当前节点的 node(存储数据的结构体)
if (node_ptr == nullptr) { // 判断空栈
return std::shared_ptr<T>();
}
//2 比较head和old_head想等则交换否则说明head已经被其他线程更新
if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
// 要返回的值
std::shared_ptr<T> res;
//5 交换智能指针
res.swap(node_ptr->_data);
//6 增加的数量
int increase_count = old_head->_ref_count.fetch_sub(2);
//3 判断仅有当前线程持有指针则删除
if (increase_count == 2) {
delete node_ptr;
}
return res;
}else {
if (old_head->_ref_count.fetch_sub(1) == 1) {
delete node_ptr;
}
}
}
}
- 第一个 while 是为了在多线程下安全地更新头节点的引用计数
- 因为
head
、old_head
、new_head
都是ref_node
类型的结构体,这里必须使用auto* node_ptr = old_head->node_ptr
获取和当前引用计数匹配的数据节点,使用该节点判断是否存在内容;而head
的下一个节点并不是head ->node_ptr
,而是head ->node_ptr->next
- 第二个 while 是为了更新 head 的值,将表头节点从链表中删除(从链表中删除而不是从内存中删除)。
- 如果
head
更新成功,则将old_head ->node_ptr
弹出,而不是将old_head
弹出,old_head
表示的仅仅只是引用计数而不是数据。 int increase_count = old_head->_ref_count.fetch_sub(2)
是为了获取引用计数增加的数量(ref_count.fetch_sub(2)
会将ref_count
的值减去 2 并返回_ref_count
的旧值,而不是减去 2 后的新值),如果确实只增加了 2,那么表示当前线程是弹出节点的唯一持有者,可以直接删除。
如果我们将
head
存储为指针类型而不是副本类型,那么上面的代码有以下风险:
-
风险1:和上面引用计数和数据耦合为一个结构体时的风险2相同,会造成误删:
假设线程 1 比线程 2 先执行,线程 1 在步骤 2 进行比较并交换操作(CAS)后,会将
head
更新为新的值。而此时线程 2 也尝试执行同样的 CAS 操作,但由于head
已被线程 1 修改,线程 2 的 CAS 操作会失败。此时,线程 2 会进入
else
分支进行处理,并更新old_head
为当前的head
值。然而,此时线程 2 中的old_head
的引用计数为 1,而old_head
指向的数据实际上是新的head
所指向的数据节点。如果线程 2 随后误将old_head
指向的数据节点删除,就会导致问题。因为线程 2 删除的实际上是当前head
所指向的节点,而它并未真正弹出节点或修改head
的值。这会导致其他线程在尝试弹出栈顶元素时访问到已被删除的节点,从而引发程序崩溃。 -
风险2:
假设线程 1 和线程 2 都执行完了第 0 步的代码,此时它们读取到的
old_head
值是相同的。线程 1 抢先一步执行,完成了第 5 步操作,并准备执行第 3 步判断逻辑时,线程 2 抢占了 CPU 开始执行第 7 步的代码。虽然线程 2 的
while
循环会检测到old_head
和当前head
不同,并因此进行重试,更新了old_head
,但在do
循环的第一次迭代中,线程 2 的old_head
和线程 1 的old_head
仍然指向同一个节点。此时,线程 2 修改了old_head
的引用计数。这会导致线程 1 在执行第 3 步时,引用计数不满足删除条件(因为线程 2 将old_head
的计数加一,因为head、old_head、new_head的类型都是指针类型,指向的是同一块内存(指针),所以会反映到线程1中。将此时为3,线程 1 加一,线程 2 加一,初始化加一),因此不会进入if
逻辑删除节点。与此同时,因为线程2在2处之后while会不断重试,线程2的head已经和old_head指向不同了,导致线程2也不会回收old_head内部节点指向的数据。最终,
old_head
节点中的数据既没有被线程 1 删除,也没有被线程 2 回收,导致内存泄漏。问题根源在于,多个线程对
old_head
的引用计数存在竞争,但删除逻辑对引用计数的操作未能同步,导致某些节点被遗漏回收。 -
一种和风险2相似但是正常的情况:
假设线程 1 和线程 2 都执行完了第 0 步的代码,此时两者读取的
old_head
值是相同的。接下来,线程 1 比线程 2 先获得 CPU 时间片并继续执行,而线程 2 因未抢占到 CPU 时间片停在了第 1 步。线程 1 按照顺序继续执行,最终到达第 3 步并将
node_ptr
所指向的节点删除。同时,head
已更新为新的栈顶元素,也就是old_head
的下一个节点。这时,线程 2 抢占到了 CPU 时间片,继续执行第 1 步的代码,并在第一个 while 循环中因为 CAS 失败将
old_head
更新为当前head
的值。此时,因为new_head
是指针,所以对new_head
的修改能反映到old_head
本身,old_head
的引用计数也增加了 1,变为 2,但它实际指向的是下一个节点(即新的栈顶)。在这种情况下,线程 2 仍会进入
if
条件(不是进入else
分支),对新的old_head
节点执行删除操作。由于此时的old_head
和引用计数逻辑匹配,这种情况是正常的,不会引发错误。
我们总结如下:
-
在实现引用计数内存回收机制时,应尽量避免直接存储指针,因为存储指针会带来多个线程同时操作同一块内存的风险。我们这里需要将 head 的类型从
std::atomic<ref_node*>
修改为std::atomic<ref_node>
,使用副本类型而不是指针类型。 -
引入一个专门用于记录减少引用计数的计数器。为了确保在多个线程中修改该计数器的安全性,可以将其设计为
node
类的成员变量。由于node
类的实例是通过指针存储在栈的节点中的,多个线程能够安全地访问和修改它。
通过将引用计数分为“增加”和“减少”两部分,并将减少的计数器放入 node
类中,我们能够更清晰地追踪各线程对节点的引用情况,从而避免因引用计数不准确导致的内存泄漏或竞争问题。即双引用计数器版本。
- 一个节点是否可以被回收,取决于其整体引用计数(增加的引用计数加减少的引用计数)是否为 0。只有当引用计数为 0 时,节点才可以被安全地删除。
2.2 双引用计数器无锁栈
有两种双引用计数器无锁栈的实现方式,一种是博主恋恋风辰的实现方式:将增加计数器放在 ref_node 结构体中,将减少计数器放在 node 结构体中;另一种是C++并发编程书中的实现方式,使用了内存序进行了优化,具体实现相同,只不过后者将一些功能分离成了单独的函数,并通过内存序增加了效率。后者我们在下一节学习,本节只学习第一种。
#pragma once
#include <atomic>
template<typename T>
class single_ref_stack {
public:
single_ref_stack(){}
~single_ref_stack() {
//循环出栈
while (pop());
}
void push(T const& data) {
auto new_node = ref_node(data);
new_node._node_ptr->_next = head.load();
while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
}
std::shared_ptr<T> pop() {
ref_node old_head = head.load();
for (;;) {
//1 只要执行pop就对引用计数+1并更新到head中
ref_node new_head;
do {
new_head = old_head;
new_head._ref_count += 1;
} while (!head.compare_exchange_weak(old_head, new_head));
old_head = new_head;
auto* node_ptr = old_head._node_ptr;
if (node_ptr == nullptr) {
return std::shared_ptr<T>();
}
//2 比较head和old_head想等则交换否则说明head已经被其他线程更新
if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
//要返回的值
std::shared_ptr<T> res;
//交换智能指针
res.swap(node_ptr->_data);
//增加的数量
int increase_count = old_head._ref_count - 2;
if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {
delete node_ptr;
}
return res;
}else {
if (node_ptr->_dec_count.fetch_sub(1) == 1) {
delete node_ptr;
}
}
}
}
private:
struct ref_node;
struct node {
//1 数据域智能指针
std::shared_ptr<T> _data;
//2 下一个节点
ref_node _next;
node(T const& data_) : _data(std::make_shared<T>(data_)) {}
// 减少引用计数器
std::atomic<int> _dec_count;
};
struct ref_node {
// 增加引用计数
int _ref_count;
node* _node_ptr;
ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
ref_node():_node_ptr(nullptr),_ref_count(0){}
};
//头部节点
std::atomic<ref_node> head;
};
针对单引用计数器提出的结论,我们做如下修改:
-
增加一个减少计数器,用于记录引用计数减少的次数,存储至 node 结构体中。因为 head 存储的是
std::atomic<ref_node>
类型,所以_ref_count
的类型是int
而不用是std::atomic<int>
,ref_node
的数据只需每个线程自己看到就行,不用分享给其他线程。因为
_ref_count
的类型是 int,所以其他线程没办法看到,为了让其他线程看到ref_count
的值,我们在 pop 内部将 _ref_count - 2 的值加到了_dec_count
上面,dec_count
是原子变量,其他线程可以看到。而且我们通过CAS可以修改head的值,因为head是原子类型,所以head的引用计数也会被其他线程看到。一共有两种方式。
struct ref_node;
struct node {
//1 数据域智能指针
std::shared_ptr<T> _data;
//2 下一个节点
ref_node _next;
node(T const& data_) : _data(std::make_shared<T>(data_)) {}
//减少的数量
std::atomic<int> _dec_count;
};
struct ref_node {
// 引用计数
int _ref_count;
node* _node_ptr;
ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
ref_node():_node_ptr(nullptr),_ref_count(0){}
};
- 将
head
的类型从std::atomic<ref_node*>
修改为std::atomic<ref_node>
:
std::atomic<ref_node> head;
push()
函数的实现如下:
void push(T const& data) {
auto new_node = ref_node(data);
new_node._node_ptr->_next = head.load();
while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
}
pop()
函数的实现如下:
std::shared_ptr<T> pop() {
ref_node old_head = head.load();
for (;;) {
//1 只要执行pop就对引用计数+1并更新到head中
ref_node new_head;
//2
do {
new_head = old_head;
new_head._ref_count += 1;
} while (!head.compare_exchange_weak(old_head, new_head));
old_head = new_head;
//3
auto* node_ptr = old_head._node_ptr;
if (node_ptr == nullptr) {
return std::shared_ptr<T>();
}
//4 比较head和old_head相等则交换否则说明head已经被其他线程更新
if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
//要返回的值
std::shared_ptr<T> res;
//交换智能指针
res.swap(node_ptr->_data);
//5 增加的数量
int increase_count = old_head._ref_count - 2;
//6
if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {
delete node_ptr;
}
return res;
}else {
//7
if (node_ptr->_dec_count.fetch_sub(1) == 1) {
delete node_ptr;
}
}
}
}
- 当多个线程并发调用
pop
时,可能会有线程在代码的第 2 处不断重试。这种重试的原因是head
和old_head
之间的某些字段(如引用计数或节点node地址)可能已经被其他线程更新。但无论如何,head
使用的是副本存储(std::atomic<ref_node>
),即使重试增加了引用计数,也不会影响到其他线程,因为每个线程操作的都是自己的old_head
副本。 - 在代码的第 3 处,我们从
old_head
中取出node_ptr
,用来保存指向节点的地址。这样,后续操作中可以安全地减少该节点的引用计数(比如 6 和 7 处的node_ptr->_dec_count.fetch_add(x)
)。因为多个线程可能同时操作同一个node_ptr
指向的节点,所以引用计数使用了原子变量,以确保多个线程对其修改时相互可见且线程安全。 - 在第 4 处,我们通过比较交换(
compare_exchange_strong
)操作来判断old_head
和head
是否一致。因为 head 、new_head、old_head 存储的是副本类型,所以多个线程看到的old_head
的值可能不一样(因为每个线程的old_head
都是独立的副本,互不干涉),但我们能保证仅有一个线程进入if逻辑,进入的线程就是old_head和head匹配的那个。 如果一致,说明当前线程成功抢占操作权,进入if
逻辑;如果不一致,说明head
已被其他线程更新,当前线程无法进入if
,需要进行后续的else
逻辑处理。值得注意的是,即使多个线程同时看到不同的old_head
,也只有一个线程能进入if
条件,其它线程都会失败。 - 在第 5 处,我们已经确定当前线程需要处理的
node_ptr
节点。此时,我们定义了res
,用来保存弹出的数据。在调整引用计数时,先减去 2,其中 1 表示当前线程自身的操作,另 1 表示 初始化的 1。接着计算其他并发线程操作该节点的数量,存储在increase_count
中。最终,是否删除节点取决于增加和减少的引用计数之和是否为 0。 - 在第 6 处,我们使用
fetch_add
操作来调整_dec_count
,并返回操作前的值。如果返回值为负的increase_count
,说明当前线程是唯一操作该节点的线程(因为没有其他线程增加引用计数),此时可以安全删除该节点。如果返回值不是负的increase_count
,则说明还有其他线程正在操作此节点,不能删除。
示例:
假设有两个线程 t1
、t2
执行 pop()
函数,假如线程 t1
先于线程 t2
执行 2 处代码:
head
的类型是std::atomic<ref_node>
,而new_head
和old_head
的类型是ref_node
,所以在线程t1
和t2
中的old_head
和new_head
互相独立互不影响。- 线程
t1
将属于自身线程的old_head
和new_head
副本的引用计数 +1,并将head
中的数据修改为和old_head
相同的数据内容,head
的增计数器_ref_count
值此时为 2。 - 当线程
t1
执行完 2 处代码后,线程t2
开始执行,此时因为 CAS 机制使得线程t2
专属的old_head
中的增引用计数器被修改为和head
相同的内容,即 2。因为head
的原子类型,所以线程t1
对head
的修改会被线程t2
看到,此时线程t2
看到的head
的增引用计数器的值为 2,在线程t2
第二次 CAS 重试之后,会将old_head
的增计数器 +1,从 2 增到 3。(所以虽然_ref_count
的类型是 int,但是通过这一种方式可以间接的让其他线程看到)。
假如线程 t1
先于线程 t2
执行 4 处代码:
-
因为
head.load() == old_head
,所以线程t1
会进入 if 分支并将head
更新到node_ptr->_next
-
increase_count
表示除本线程对目标节点的引用外,其他线程对该目标节点的引用数量,这里 -2 是将线程t1
对目标节点的引用减去,再将初始化时对引用的初始化减一,一共减去二。increase_count
此时应该为 1,表示除了线程t1
在引用目标节点外,还有线程t2
在引用目标节点(除了将head
更新这种方式,也可以通过这种方式让其他线程看到引用计数的修改,increase_count
表示其他线程对目标节点的引用数量)
线程 t1
在执行 6 处代码之前,一共有两种可能:
-
第一种可能:
线程
t2
还未执行 7 处代码,此时_dec_count
的默认初始值为 0,线程t1
将_dec_count
加increase_count
,即加 1,并返回_dec_count
原本的值 0;此时不满足 if 的判断,即0 != -1
,所以不会删除数据节点。线程
t2
在线程t1
执行完 6 处代码之后执行 7 处代码,此时_dec_count
被线程t1
增加到了 1,然后在 if 判断中对_dec_count - 1
并返回_dec_count
初值,即 1;因为1 == 1
,表示当前仅有线程t2
引用目标节点,可以直接删除。 -
第二种可能:
线程
t2
已经执行了 7 处代码,此时_dec_count
的默认初始值为 0,线程t2
将_dec_count
减 1 ,并返回 0;因为0 != 1
,所以线程t2
不会将数据内容删除。线程
t1
在线程t2
执行完 7 处代码后执行 6 处代码,此时_dec_count
的值被线程t2
修改为 -1,然后在 if 判断中将_dec_count
加increase_count
,并返回 -1;因为 -1 == -1,此时仅有线程t1
引用了目标节点,线程t1
负责将节点删除。
head
是std::atomic<ref_node>
类型,old_head
和new_head
是ref_node
类型。head
的修改能及时反映到不同线程上(在局一致性内存序下)
3. 本节的一些理解
为什么需要两个计数器,一个是否足够?
如果只用一个计数器,我们首先要考虑把它放在哪里。直觉上,可能会想到将计数器放在节点内部,但这样是行不通的。因为计数器的目的是避免空悬指针解引用(访问已删除的节点),但如果计数器在节点内部,访问计数器本身就需要先访问节点。如果节点已经被删除,这就会导致问题(所以我们将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合,将一个结构体拆解成了两个)。
举个场景:线程 A 读取了 head
指针,并存到 local_head
中。这时线程 A 还没来得及通过 local_head
访问节点内部的计数器并增加计数,而线程 B 率先操作了节点的计数器,加了 1,然后弹出了节点,并在操作完成后让计数器减 1,发现计数变成了 0,于是删除了节点。此时,线程 A 再试图通过 local_head
访问节点时,节点已经被删除了,出现了空悬指针解引用。
因此,如果只使用一个计数器,它就不能放在节点内部,而需要放在节点外部。计数器需要和 head 指针处于同一“层级”(即数据结构体node和ref_node相互关联,我们通过ref_node->node_ptr即可访问数据内容,通过node->next即可访问下一个节点的计数器),成为所有线程都能直接操作的数据。
为了实现这一点,我们可以把计数器和 head
指针合二为一。这样,读取 head
指针和增加计数器的操作可以原子地进行,避免在这两步之间出现时间差(如果计数器和 head
是两个独立变量,操作之间无法保证原子性)。理想情况下,每当有一个指针指向 head
节点,计数器加 1;当指针不再指向该节点时,计数器减 1。但这种设计在实现时会遇到复杂问题。使用一个计数器无法实现,我们需要使用两个计数器来实现。
为什么一个计数器无法实现?
设想以下过程:
- 线程 A 加载
head
到new_head
(类型为ref_node
),如果head
没有变化,就增加new_head
的计数器,并更新head
的计数器。最后将new_head
的计数器赋值给old_head
,此时,三者的计数器都变为 2。注意,这些操作发生在线程 A 访问节点之前,因此确保节点不会被删除。 - 在线程 A 还没来得及访问节点时,线程 B 也加载了
head
到它的local_head
。线程 B 增加了计数器,此时计数器值变为 3(表明两个线程正在引用该节点,在此之前线程B会经历一次CAS,因为head
已经被线程A修改,所以线程B的old_head
会读取head
最新的值,此时线程B即可读取到线程A对引用计数修改的最新值,再+1会变为3)。接着,线程 B 将head
修改为指向下一个节点,并让head
的计数器减 2,计数器变回 1。 - 线程 B 完成操作后退出,此时节点只被线程A引用,计数器保持为 1。线程 A 继续操作,但发现
old_head
(3) 和当前的head
(1) 已不一致,放弃弹出该节点,并进入 else 分支让减计数器减 1,计数器变为 0。
问题是,上述过程仅仅是在两个计数器的共同配合下才会实现的。如果仅有一个计数器,那么计数器永远不会减到 0,导致节点永远不会被删除。这是因为不同线程对计数器的操作是独立的,无法相互感知。
为什么需要两个计数器?
为了解决这个问题,计数器必须与节点绑定,且每个节点只能有一个全局计数器,就像 shared_ptr
的设计一样。然而,shared_ptr
无法实现对 head
的无锁、原子性操作,因此必须引入第二个计数器。
两个计数器的设计允许:
- 一个计数器跟随
head
,用于线程安全地读取和操作(增计数器)。 - 另一个计数器绑定到节点,用于判断节点是否可以安全删除(减计数器)。
这样的设计虽然复杂,但能确保计数的正确性和节点的安全性。