并发编程(19)——引用计数型无锁栈

文章目录

  • 十九、day19
  • 1. 引用计数
  • 2. 代码实现
    • 2.1 单引用计数器无锁栈
    • 2.2 双引用计数器无锁栈
  • 3. 本节的一些理解

十九、day19

上一节我们学习通过侯删链表以及风险指针与侯删链表的组合两种方式实现了并发无锁栈,但是这两种方式有以下缺点:

第一种方式:如果 pop 操作频繁被多个线程调用,待删除的节点将不断累积,导致待删除列表无法及时回收,进而占用大量内存。此时我们需要一种机制,每次调用 pop 函数结束后都需要判断侯删链表中的节点是否可以被 delete,即风险指针。

第二种方式

  1. 如何正确且高效地分配这块内存是一个难点。

  2. 为了删除一个节点,需要遍历风险指针数组,检查该节点是否被某个风险指针引用。同时,在处理待删除节点时,也需要从风险数组中选取合适的节点记录其地址,这进一步增加了开销。

  3. 而且因为该方法受专利保护,所以在一些场合可能无法使用

在这一节中,我们学习如何通过引用计数来实现无锁栈以及安全回收机制。

参考:

恋恋风辰官方博客

《c++并发编程》中无锁栈的实现为什么要用双引用计数器_无锁栈 双重引用技术-CSDN博客

C++实现“无锁”双buffer - 知乎


1. 引用计数

本节的引用计数通过两个计数器实现:一个是存储在节点内部的 内部计数器,另一个是与指针绑定在一起的 外部计数器。外部计数器和指针被封装在一个结构体 counted_node_ptr 中,因此栈顶指针的类型从原生指针(void⭐、int⭐、char⭐、node⭐…)修改为 counted_node_ptr

struct counted_node_ptr {
    int external_count; // 外部计数器
    node* ptr;          // 原始指针,指向节点
};

具体机制如下:

  • 当线程加载 head 指针时,外部计数器会加 1,表示有新的线程在引用这个节点。
  • 当线程不再引用某个节点时,内部计数器会减 1。

通过将 内部计数器外部计数器 相加,就可以得到该节点的实际引用计数。当引用计数归零时,说明没有线程再访问这个节点,此时可以安全地删除它。这种设计确保了节点的引用计数在并发环境下能够准确管理,同时避免了资源泄露的问题。数据结构如下图所示

在这里插入图片描述

图片来源:https://blog.csdn.net/ld_long/article/details/137692475

当线程尝试从栈中弹出数据时,会按照以下步骤操作:

  1. 读取栈顶指针
    线程首先读取 head 指针(栈顶指针),将其值存入线程的局部变量 local_head 中,同时将 head 指针的外部计数器加 1。这个过程是原子的,因此能够确保线程安全。
  2. 同步计数器
    local_head 的外部计数器更新为与 head 的外部计数器相同。随后,线程可以通过 local_head 访问节点。
  3. 判断能否弹出节点
    • 如果在 local_head 载入后,发现 local_head 的指针值与当前 head 不一致,说明有其他线程在此期间修改了 head。此时,当前线程无法弹出节点,因为它载入的 local_head 已作废。
    • 线程会将节点的内部计数器减 1(因为当前线程不再指向该节点)。如果内部计数器变为 0(说明没有其他线程再引用这个节点),当前线程负责删除节点;否则,什么也不做。
  4. 成功弹出节点
    • 如果 local_head 的指针值仍与 head 相同,说明没有其他线程修改 head,当前线程可以成功弹出节点。
    • 线程将 head 指针更新为当前节点的下一个节点(即 head.ptr->next),这个更新操作是原子的。
  5. 更新计数器
    • 弹出节点后,将 local_head 的外部计数器减 1,因为 head 指针已经不再指向这个节点。
    • 再次将 local_head 的外部计数器减 1,因为 local_head 也准备不再指向这个节点(实际上,可以一次性减 2,这里分开解释是为了说明两个来源)。
    • 然后,将 local_head 的外部计数器值加到节点的内部计数器上。
  6. 判断节点是否需要删除
    • 如果内部计数器因此变为 0(即它原本是外部计数器的相反数),说明没有线程再引用该节点,当前线程负责删除节点。
    • 如果内部计数器不为 0,则无需删除节点。

通过这样的流程,可以确保节点在无人引用时才被安全回收,同时减少不必要的内存操作,提升了效率和线程安全性。

这段文字光看很难看懂,我们通过代码进行解释:

2. 代码实现

2.1 单引用计数器无锁栈

在此之前,我们需要知道:该方式必须通过两个计数器才能实现,如果只用一个,会有一些问题。我们先通过只用一个计数器实现引用计数,进而说明为什么只能使用两个计数器实现。

template<typename T>
class single_ref_stack {
public:
    single_ref_stack():head(nullptr) {
    }
    ~single_ref_stack() {
        //循环出栈
        while (pop());
    }
    
    void push(T const& data);
    std::shared_ptr<T> pop();   
private:
    struct ref_node {
        //1 数据域智能指针
        std::shared_ptr<T>  _data;
        //2 引用计数
        std::atomic<int> _ref_count;
        //3  下一个节点
        ref_node* _next;
        ref_node(T const& data_) : _data(std::make_shared<T>(data_)),
            _ref_count(1), _next(nullptr) {}
    };
    //头部节点
    std::atomic<ref_node*> head;
};

上段代码是一个单引用的无锁栈实现,内容很简单不多于解释,我们只需要注意一点:计数器引用计数必须要使用原子类型,为了保证多线程修改下仍是线程安全的。

接下来实现 pop() 和 push():

void push(T const& data) {
    auto new_node = new ref_node(data);
    new_node->next = head.load();
    while (!head.compare_exchange_weak(new_node->next, new_node));
}

push() 函数很简单,就是将数据插入至栈链表的表头,并且调用原子操作 compare_exchange_weakhead 修改为 new_node,即将 head 指针指向新插入的表头节点 new_node

std::shared_ptr<T> pop() {
    ref_node* old_head = head.load();
    for (;;) {
        if (!old_head) { // 空栈检查
            return std::shared_ptr<T>();
        }
        //1 只要执行pop就对引用计数+1
        ++(old_head->_ref_count);
        //2 比较head和old_head想等则交换否则说明head已经被其他线程更新
        if (head.compare_exchange_strong(old_head, old_head->_next)) {
            auto cur_count = old_head->_ref_count.load();
            auto new_count;
            //3  循环重试保证引用计数安全更新
            do {
                //4 减去本线程增加的1次和初始的1次
                new_count = cur_count - 2;
            } while (!old_head->_ref_count.compare_exchange_weak(cur_count, new_count));
            //返回头部数据
            std::shared_ptr<T> res;
            //5  交换数据
            res.swap(old_head->_data);
            //6
            if (old_head->_ref_count == 0) {
                delete old_head;
            }
            return res;
        }
        else {
            //7 
            if (old_head->_ref_count.fetch_sub(1) == 1) {
                delete old_head;
            }
        }
    }
}

该函数仅有两种返回值:空栈返回一个空的智能指针;非空栈返回一个指向弹出数据的智能指针。执行流程如下:

  1. 进行空栈检查,如果是空栈,直接返回空的 std::shared_ptr<T>(),否则继续执行
  2. 因为节点结构体的成员增加了一个原子类型的引用计数,这里增加当前线程对节点(old_head)的引用计数(因为计数器是 std::atomic<int> 类型的整型原子变量,所以 ++ 是原子操作),避免该节点(old_head)在操作过程中被其他线程删除
  3. 使用 CAS(Compare-And-Swap) 操作尝试将 headold_head 更新为 old_head->_next。如果成功说明当前线程成功弹出头节点,否则说明其他线程在此期间修改了 head,需要重新尝试。
  4. 如果判断成功,则 head 被修改为 old_head->_next。按照上一节的做法,这里需要将 old_head 加入至侯删链表,但是在本节中我们需要在这里更新引用计数
    1. 首先,读取 old_head 的引用计数(我们在 pop 函数最开始将其 +1,保证其他线程不会删除它),并将其赋值给 cur_count
    2. 然后将引用计数减去 2(pop 函数一开始加了1,每个节点初始化的时候默认构造函数将引用计数设置为 1,一共增加了两次 1)
    3. 使用 CAS(Weak) 重试,保证引用计数能被安全更新到 new_count
    4. 引用计数修改成功之后,将弹出节点 old_head 的数据存储至 std::shared_ptr<T> 类型的智能指针中,并判断 old_head 的引用计数是否为 0,如果为 0 则直接将 old_head 删除,否则保留
    5. 最后返回存有 old_head->_data 的智能指针 res
  5. 如果判断失败,说明其他线程在此期间修改了 head。判断失败的线程可以理解为抢占失败的线程,因为 pop 函数的一开始会将栈表头节点的引用计数 +1,所以如果抢占失败,需要在 pop 函数结束前将引用计数 -1。如果引用 -1 之后的值为 1,则表示其他对该节点 pop 的线程已经结束,当前线程是最后引用此指针的线程,我们需要在该线程中将 old_head 删除。

但是该函数存在隐含的风险

  • 风险1:假设线程 1 和线程 2 同时调用pop函数,并已经读取了head 的值,现在准备将 old_head 的引用计数 +1,但线程 2 率先完成了pop函数操作,将引用计数 _ref_count 更新为 0 并删除了 old_head。此时,线程 1 恰好继续执行第 1 步代码,却尝试访问已经被删除的 old_head,因此导致程序崩溃。
  • 风险2:假设线程 1 和线程 2 同时调用 pop函数,并先后将 old_head 的引用计数 +1(此时 old_head 的引用计数总计为 3),假如线程 1 率先通过 if 的判断,将 head 成功更新为 old_head->next;而线程 2 会因为 CAS 会读取到 head 最新修改的值,即 old_head->next,因为 head 最新的值和 old_head 不同,所以会将 old_head 修改为 old_head->next,线程 2 走进 else 分支之后判断的 old_head->ref_count 其实是 old_head 修改后的引用计数,即 old_head->next->ref_count,为1(只有初始化的时候才 +1,其他线程没有对它的引用计数进行操作),从而会将其删除。其他线程如果要弹出 old_head->next 的时候会发现节点已经不存在,报错。流程如下图所示:

在这里插入图片描述

  • 假设线程 1 和线程 2 同时调用 pop函数,并先后将 old_headnode1)的引用计数 +1(此时 old_head 的引用计数总计为 3),如上图中的流程1
  • 假如线程 1 率先通过 if 的判断,将 head 成功更新为 old_head->next,即node2,如上图的流程2
  • 线程 2 会因为 CAS 会读取到 head 最新修改的值,即 old_head->nextnode2),因为 head 最新的值和 old_head 不同,所以会将 old_head 修改为和 head 相同的值 old_head->next,如上图的流程3
  • 因为线程 2 没通过 if 的判断条件,从而走进了 else 分支,else分支中会执行 old_head->_ref_count.fetch_sub(1) == 1操作,此时 old_head 指向节点的引用计数是 1。注意,fetch_sub_ref_count 减 1 后返回的是减之前的值,即1,而不是减之后的值 0,所以这里虽然将 _ref_count 从 1 减为 0 ,但是 if 条件仍然会判断成功,从而将 old_head 删除
  • 假如线程 3 在线程 2 执行完之后调用了 pop 函数想要将 node2 弹出,按正常流程来走的话,会读取 head 指向的节点,即 node2,但因为 node2 已经被线程 2删除,所以这里在读取 head.load() 的时候会报错,此时 head 指向空指针 nullptr,如下图所示:

在这里插入图片描述

解决方法就是将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合

将原本数据结构体 ref_node 分解为 ref_nodenode 两个结构体,前者用于存储引用计数,后者用于存储数据。

struct node {
    //1 数据域智能指针
    std::shared_ptr<T>  _data;
    //2  下一个节点
    ref_node _next;
    node(T const& data_) : _data(std::make_shared<T>(data_)) {}
};
struct ref_node {
    // 引用计数
    std::atomic<int> _ref_count;
    node* _node_ptr;
    ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
    ref_node():_node_ptr(nullptr),_ref_count(0){}
};

如果将 head 存储为指针类型而不是副本类型

std::atomic<ref_node*> head; // 头节点

注意:这里的头节点存储的类型是 ref_node* 而不是 node*

那么 pop() 函数需修改如下:

std::shared_ptr<T> pop() {
    //0 处
    ref_node* old_head = head.load();
    for (;;) {
        //1 只要执行pop就对引用计数+1并更新到head中
        ref_node* new_head;
        do {
            new_head = old_head;
            new_head->_ref_count += 1; // 7
        } while (!head.compare_exchange_weak(old_head, new_head));
        //4 确保 head == old_head == new_head 
        old_head = new_head;
        
        auto* node_ptr = old_head->_node_ptr; // 获取当前节点的 node(存储数据的结构体)
        if (node_ptr == nullptr) { // 判断空栈
            return  std::shared_ptr<T>();
        }
        
        //2 比较head和old_head想等则交换否则说明head已经被其他线程更新
        if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
            // 要返回的值
            std::shared_ptr<T> res;
            //5 交换智能指针
            res.swap(node_ptr->_data);
            //6 增加的数量
            int increase_count = old_head->_ref_count.fetch_sub(2);
            //3 判断仅有当前线程持有指针则删除
            if (increase_count == 2) {
                delete node_ptr;
            }
            return res;
        }else {
            if (old_head->_ref_count.fetch_sub(1) == 1) {
                delete node_ptr;
            }
        }
    }
}
  • 第一个 while 是为了在多线程下安全地更新头节点的引用计数
  • 因为 headold_headnew_head 都是 ref_node 类型的结构体,这里必须使用 auto* node_ptr = old_head->node_ptr 获取和当前引用计数匹配的数据节点,使用该节点判断是否存在内容;而 head 的下一个节点并不是 head ->node_ptr,而是 head ->node_ptr->next
  • 第二个 while 是为了更新 head 的值,将表头节点从链表中删除(从链表中删除而不是从内存中删除)。
  • 如果 head 更新成功,则将 old_head ->node_ptr 弹出,而不是将 old_head 弹出,old_head 表示的仅仅只是引用计数而不是数据。
  • int increase_count = old_head->_ref_count.fetch_sub(2)是为了获取引用计数增加的数量(ref_count.fetch_sub(2) 会将 ref_count 的值减去 2 并返回 _ref_count 的旧值,而不是减去 2 后的新值),如果确实只增加了 2,那么表示当前线程是弹出节点的唯一持有者,可以直接删除。

如果我们将 head存储为指针类型而不是副本类型,那么上面的代码有以下风险:

  • 风险1:和上面引用计数和数据耦合为一个结构体时的风险2相同,会造成误删:

    假设线程 1 比线程 2 先执行,线程 1 在步骤 2 进行比较并交换操作(CAS)后,会将 head 更新为新的值。而此时线程 2 也尝试执行同样的 CAS 操作,但由于 head 已被线程 1 修改,线程 2 的 CAS 操作会失败。

    此时,线程 2 会进入 else 分支进行处理,并更新 old_head 为当前的 head 值。然而,此时线程 2 中的 old_head 的引用计数为 1,而 old_head 指向的数据实际上是新的 head 所指向的数据节点。如果线程 2 随后误将 old_head 指向的数据节点删除,就会导致问题。因为线程 2 删除的实际上是当前 head 所指向的节点,而它并未真正弹出节点或修改 head 的值。这会导致其他线程在尝试弹出栈顶元素时访问到已被删除的节点,从而引发程序崩溃。

  • 风险2

    假设线程 1 和线程 2 都执行完了第 0 步的代码,此时它们读取到的 old_head 值是相同的。线程 1 抢先一步执行,完成了第 5 步操作,并准备执行第 3 步判断逻辑时,线程 2 抢占了 CPU 开始执行第 7 步的代码。

    虽然线程 2 的 while 循环会检测到 old_head 和当前 head 不同,并因此进行重试,更新了 old_head,但在 do 循环的第一次迭代中,线程 2 的 old_head 和线程 1 的 old_head 仍然指向同一个节点。此时,线程 2 修改了 old_head 的引用计数。这会导致线程 1 在执行第 3 步时,引用计数不满足删除条件(因为线程 2 将old_head的计数加一,因为head、old_head、new_head的类型都是指针类型,指向的是同一块内存(指针),所以会反映到线程1中。将此时为3,线程 1 加一,线程 2 加一,初始化加一),因此不会进入 if 逻辑删除节点。

    与此同时,因为线程2在2处之后while会不断重试,线程2的head已经和old_head指向不同了,导致线程2也不会回收old_head内部节点指向的数据。最终,old_head 节点中的数据既没有被线程 1 删除,也没有被线程 2 回收,导致内存泄漏。

    问题根源在于,多个线程对 old_head 的引用计数存在竞争,但删除逻辑对引用计数的操作未能同步,导致某些节点被遗漏回收。

  • 一种和风险2相似但是正常的情况

    假设线程 1 和线程 2 都执行完了第 0 步的代码,此时两者读取的 old_head 值是相同的。接下来,线程 1 比线程 2 先获得 CPU 时间片并继续执行,而线程 2 因未抢占到 CPU 时间片停在了第 1 步。

    线程 1 按照顺序继续执行,最终到达第 3 步并将 node_ptr 所指向的节点删除。同时,head 已更新为新的栈顶元素,也就是 old_head 的下一个节点。

    这时,线程 2 抢占到了 CPU 时间片,继续执行第 1 步的代码,并在第一个 while 循环中因为 CAS 失败将 old_head 更新为当前 head 的值。此时,因为 new_head 是指针,所以对 new_head 的修改能反映到old_head 本身,old_head的引用计数也增加了 1,变为 2,但它实际指向的是下一个节点(即新的栈顶)。

    在这种情况下,线程 2 仍会进入 if 条件(不是进入 else 分支),对新的 old_head 节点执行删除操作。由于此时的 old_head 和引用计数逻辑匹配,这种情况是正常的,不会引发错误。

我们总结如下:

  1. 在实现引用计数内存回收机制时,应尽量避免直接存储指针,因为存储指针会带来多个线程同时操作同一块内存的风险。我们这里需要将 head 的类型从 std::atomic<ref_node*> 修改为 std::atomic<ref_node>,使用副本类型而不是指针类型。

  2. 引入一个专门用于记录减少引用计数的计数器。为了确保在多个线程中修改该计数器的安全性,可以将其设计为 node 类的成员变量。由于 node 类的实例是通过指针存储在栈的节点中的,多个线程能够安全地访问和修改它。

通过将引用计数分为“增加”和“减少”两部分,并将减少的计数器放入 node 类中,我们能够更清晰地追踪各线程对节点的引用情况,从而避免因引用计数不准确导致的内存泄漏或竞争问题。即双引用计数器版本。

  1. 一个节点是否可以被回收,取决于其整体引用计数(增加的引用计数加减少的引用计数)是否为 0。只有当引用计数为 0 时,节点才可以被安全地删除。

2.2 双引用计数器无锁栈

有两种双引用计数器无锁栈的实现方式,一种是博主恋恋风辰的实现方式:将增加计数器放在 ref_node 结构体中,将减少计数器放在 node 结构体中;另一种是C++并发编程书中的实现方式,使用了内存序进行了优化,具体实现相同,只不过后者将一些功能分离成了单独的函数,并通过内存序增加了效率。后者我们在下一节学习,本节只学习第一种。

#pragma once
#include <atomic>

template<typename T>
class single_ref_stack {
public:
	single_ref_stack(){}

	~single_ref_stack() {
		//循环出栈
		while (pop());
	}

	void push(T const& data) {
		auto new_node =  ref_node(data);
		new_node._node_ptr->_next = head.load();
		while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
	}

	std::shared_ptr<T> pop() {
		ref_node old_head = head.load();
		for (;;) {
			//1 只要执行pop就对引用计数+1并更新到head中
			ref_node new_head;
			do {
				new_head = old_head;
				new_head._ref_count += 1;
			} while (!head.compare_exchange_weak(old_head, new_head));

			old_head = new_head;

			auto* node_ptr = old_head._node_ptr;
			if (node_ptr == nullptr) {
				return  std::shared_ptr<T>();
			}

			//2 比较head和old_head想等则交换否则说明head已经被其他线程更新
			if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
				
				//要返回的值
				std::shared_ptr<T> res;
				//交换智能指针
				res.swap(node_ptr->_data);

				//增加的数量
				int increase_count = old_head._ref_count - 2;
				
				if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {
					delete node_ptr;
				}

				return res;
			}else {
				if (node_ptr->_dec_count.fetch_sub(1) == 1) {
					delete node_ptr;
				}
			}
		}
	}

private:
	struct ref_node;
	struct node {
		//1 数据域智能指针
		std::shared_ptr<T>  _data;
		//2  下一个节点
		ref_node _next;
		node(T const& data_) : _data(std::make_shared<T>(data_)) {}
		// 减少引用计数器
		std::atomic<int>  _dec_count;
	};

	struct ref_node {
		// 增加引用计数
		int  _ref_count;
		node* _node_ptr;
		ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
		ref_node():_node_ptr(nullptr),_ref_count(0){}
	};

	//头部节点
	std::atomic<ref_node> head;
};

针对单引用计数器提出的结论,我们做如下修改:

  1. 增加一个减少计数器,用于记录引用计数减少的次数,存储至 node 结构体中。因为 head 存储的是 std::atomic<ref_node> 类型,所以 _ref_count 的类型是 int 而不用是 std::atomic<int>ref_node 的数据只需每个线程自己看到就行,不用分享给其他线程。

    因为 _ref_count 的类型是 int,所以其他线程没办法看到,为了让其他线程看到 ref_count 的值,我们在 pop 内部将 _ref_count - 2 的值加到了 _dec_count 上面,dec_count原子变量,其他线程可以看到。而且我们通过CAS可以修改head的值,因为head是原子类型,所以head的引用计数也会被其他线程看到。一共有两种方式。

    struct ref_node;
    struct node {
        //1 数据域智能指针
        std::shared_ptr<T>  _data;
        //2  下一个节点
        ref_node _next;
        node(T const& data_) : _data(std::make_shared<T>(data_)) {}
        //减少的数量
        std::atomic<int>  _dec_count;
    };
    struct ref_node {
        // 引用计数
        int  _ref_count;
        node* _node_ptr;
        ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
        ref_node():_node_ptr(nullptr),_ref_count(0){}
    };
  1. head 的类型从 std::atomic<ref_node*> 修改为 std::atomic<ref_node>
std::atomic<ref_node> head;

push() 函数的实现如下:

void push(T const& data) {
    auto new_node =  ref_node(data);
    new_node._node_ptr->_next = head.load();
    while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
}

pop() 函数的实现如下:

std::shared_ptr<T> pop() {
    ref_node old_head = head.load();
    for (;;) {
        //1 只要执行pop就对引用计数+1并更新到head中
        ref_node new_head;
        //2
        do {
            new_head = old_head;
            new_head._ref_count += 1;
        } while (!head.compare_exchange_weak(old_head, new_head));
        old_head = new_head;
        //3
        auto* node_ptr = old_head._node_ptr;
        if (node_ptr == nullptr) {
            return  std::shared_ptr<T>();
        }
        //4 比较head和old_head相等则交换否则说明head已经被其他线程更新
        if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
            //要返回的值
            std::shared_ptr<T> res;
            //交换智能指针
            res.swap(node_ptr->_data);
            //5  增加的数量
            int increase_count = old_head._ref_count - 2;
            //6  
            if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {
                delete node_ptr;
            }
            return res;
        }else {
            //7
            if (node_ptr->_dec_count.fetch_sub(1) == 1) {
                delete node_ptr;
            }
        }
    }
}
  1. 当多个线程并发调用 pop 时,可能会有线程在代码的第 2 处不断重试。这种重试的原因是 headold_head 之间的某些字段(如引用计数或节点node地址)可能已经被其他线程更新。但无论如何,head 使用的是副本存储(std::atomic<ref_node>),即使重试增加了引用计数,也不会影响到其他线程,因为每个线程操作的都是自己的 old_head 副本
  2. 在代码的第 3 处,我们从 old_head 中取出 node_ptr,用来保存指向节点的地址。这样,后续操作中可以安全地减少该节点的引用计数(比如 6 和 7 处的 node_ptr->_dec_count.fetch_add(x))。因为多个线程可能同时操作同一个 node_ptr 指向的节点,所以引用计数使用了原子变量,以确保多个线程对其修改时相互可见且线程安全。
  3. 在第 4 处,我们通过比较交换(compare_exchange_strong)操作来判断 old_headhead 是否一致。因为 head 、new_head、old_head 存储的是副本类型,所以多个线程看到的 old_head 的值可能不一样(因为每个线程的 old_head 都是独立的副本,互不干涉),但我们能保证仅有一个线程进入if逻辑,进入的线程就是old_head和head匹配的那个。 如果一致,说明当前线程成功抢占操作权,进入 if 逻辑;如果不一致,说明 head 已被其他线程更新,当前线程无法进入 if,需要进行后续的 else 逻辑处理。值得注意的是,即使多个线程同时看到不同的 old_head,也只有一个线程能进入 if 条件,其它线程都会失败。
  4. 在第 5 处,我们已经确定当前线程需要处理的 node_ptr 节点。此时,我们定义了 res,用来保存弹出的数据。在调整引用计数时,先减去 2,其中 1 表示当前线程自身的操作,另 1 表示 初始化的 1。接着计算其他并发线程操作该节点的数量,存储在 increase_count 中。最终,是否删除节点取决于增加和减少的引用计数之和是否为 0。
  5. 在第 6 处,我们使用 fetch_add 操作来调整 _dec_count,并返回操作前的值。如果返回值为负的 increase_count,说明当前线程是唯一操作该节点的线程(因为没有其他线程增加引用计数),此时可以安全删除该节点。如果返回值不是负的 increase_count,则说明还有其他线程正在操作此节点,不能删除。

示例

假设有两个线程 t1t2 执行 pop() 函数,假如线程 t1 先于线程 t2 执行 2 处代码:

  • head 的类型是std::atomic<ref_node>,而 new_headold_head 的类型是 ref_node,所以在线程 t1t2 中的 old_headnew_head 互相独立互不影响。
  • 线程 t1 将属于自身线程的 old_headnew_head 副本的引用计数 +1,并将 head 中的数据修改为和 old_head 相同的数据内容,head 的增计数器 _ref_count 值此时为 2。
  • 当线程 t1 执行完 2 处代码后,线程 t2 开始执行,此时因为 CAS 机制使得线程 t2 专属的 old_head 中的增引用计数器被修改为和 head 相同的内容,即 2。因为 head 的原子类型,所以线程 t1head 的修改会被线程 t2 看到,此时线程 t2 看到的 head 的增引用计数器的值为 2,在线程 t2 第二次 CAS 重试之后,会将 old_head 的增计数器 +1,从 2 增到 3。(所以虽然 _ref_count 的类型是 int,但是通过这一种方式可以间接的让其他线程看到)。

假如线程 t1 先于线程 t2 执行 4 处代码:

  • 因为 head.load() == old_head,所以线程 t1 会进入 if 分支并将 head 更新到 node_ptr->_next

  • increase_count 表示除本线程对目标节点的引用外,其他线程对该目标节点的引用数量,这里 -2 是将线程 t1 对目标节点的引用减去,再将初始化时对引用的初始化减一,一共减去二。increase_count 此时应该为 1,表示除了线程 t1 在引用目标节点外,还有线程 t2 在引用目标节点(除了将 head 更新这种方式,也可以通过这种方式让其他线程看到引用计数的修改increase_count 表示其他线程对目标节点的引用数量)

线程 t1 在执行 6 处代码之前,一共有两种可能:

  • 第一种可能:

    线程 t2 还未执行 7 处代码,此时 _dec_count 的默认初始值为 0,线程 t1_dec_countincrease_count ,即加 1,并返回 _dec_count 原本的值 0;此时不满足 if 的判断,即 0 != -1,所以不会删除数据节点。

    线程 t2 在线程 t1 执行完 6 处代码之后执行 7 处代码,此时 _dec_count 被线程 t1 增加到了 1,然后在 if 判断中对 _dec_count - 1 并返回 _dec_count 初值,即 1;因为 1 == 1,表示当前仅有线程 t2 引用目标节点,可以直接删除。

  • 第二种可能:

    线程 t2 已经执行了 7 处代码,此时 _dec_count 的默认初始值为 0,线程 t2_dec_count 减 1 ,并返回 0;因为 0 != 1,所以线程 t2 不会将数据内容删除。

    线程 t1 在线程 t2 执行完 7 处代码后执行 6 处代码,此时 _dec_count 的值被线程 t2 修改为 -1,然后在 if 判断中将 _dec_countincrease_count,并返回 -1;因为 -1 == -1,此时仅有线程 t1 引用了目标节点,线程 t1 负责将节点删除。

headstd::atomic<ref_node> 类型,old_headnew_headref_node 类型。head 的修改能及时反映到不同线程上(在局一致性内存序下)

3. 本节的一些理解

为什么需要两个计数器,一个是否足够?

如果只用一个计数器,我们首先要考虑把它放在哪里。直觉上,可能会想到将计数器放在节点内部,但这样是行不通的。因为计数器的目的是避免空悬指针解引用(访问已删除的节点),但如果计数器在节点内部,访问计数器本身就需要先访问节点。如果节点已经被删除,这就会导致问题(所以我们将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合,将一个结构体拆解成了两个)。

举个场景:线程 A 读取了 head 指针,并存到 local_head 中。这时线程 A 还没来得及通过 local_head 访问节点内部的计数器并增加计数,而线程 B 率先操作了节点的计数器,加了 1,然后弹出了节点,并在操作完成后让计数器减 1,发现计数变成了 0,于是删除了节点。此时,线程 A 再试图通过 local_head 访问节点时,节点已经被删除了,出现了空悬指针解引用。

因此,如果只使用一个计数器,它就不能放在节点内部,而需要放在节点外部。计数器需要和 head 指针处于同一“层级”(即数据结构体node和ref_node相互关联,我们通过ref_node->node_ptr即可访问数据内容,通过node->next即可访问下一个节点的计数器),成为所有线程都能直接操作的数据。

为了实现这一点,我们可以把计数器和 head 指针合二为一。这样,读取 head 指针和增加计数器的操作可以原子地进行,避免在这两步之间出现时间差(如果计数器和 head 是两个独立变量,操作之间无法保证原子性)。理想情况下,每当有一个指针指向 head 节点,计数器加 1;当指针不再指向该节点时,计数器减 1。但这种设计在实现时会遇到复杂问题。使用一个计数器无法实现,我们需要使用两个计数器来实现。

为什么一个计数器无法实现?

设想以下过程:

  1. 线程 A 加载 headnew_head(类型为 ref_node),如果 head 没有变化,就增加 new_head 的计数器,并更新 head 的计数器。最后将 new_head 的计数器赋值给 old_head,此时,三者的计数器都变为 2。注意,这些操作发生在线程 A 访问节点之前,因此确保节点不会被删除。
  2. 在线程 A 还没来得及访问节点时,线程 B 也加载了 head 到它的 local_head。线程 B 增加了计数器,此时计数器值变为 3(表明两个线程正在引用该节点,在此之前线程B会经历一次CAS,因为head已经被线程A修改,所以线程B的old_head会读取head最新的值,此时线程B即可读取到线程A对引用计数修改的最新值,再+1会变为3)。接着,线程 B 将 head 修改为指向下一个节点,并让 head 的计数器减 2,计数器变回 1。
  3. 线程 B 完成操作后退出,此时节点只被线程A引用,计数器保持为 1。线程 A 继续操作,但发现 old_head(3) 和当前的 head(1) 已不一致,放弃弹出该节点,并进入 else 分支让减计数器减 1,计数器变为 0。

问题是,上述过程仅仅是在两个计数器的共同配合下才会实现的。如果仅有一个计数器,那么计数器永远不会减到 0,导致节点永远不会被删除。这是因为不同线程对计数器的操作是独立的,无法相互感知。

为什么需要两个计数器?

为了解决这个问题,计数器必须与节点绑定,且每个节点只能有一个全局计数器,就像 shared_ptr 的设计一样。然而,shared_ptr 无法实现对 head 的无锁、原子性操作,因此必须引入第二个计数器。

两个计数器的设计允许:

  • 一个计数器跟随 head,用于线程安全地读取和操作(增计数器)。
  • 另一个计数器绑定到节点,用于判断节点是否可以安全删除(减计数器)。

这样的设计虽然复杂,但能确保计数的正确性和节点的安全性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/942031.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大恒相机开发(2)—Python软触发调用采集图像

大恒相机开发&#xff08;2&#xff09;—Python软触发调用采集图像 完整代码详细解读和功能说明扩展学习 这段代码是一个Python程序&#xff0c;用于从大恒相机采集图像&#xff0c;通过软件触发来采集图像。 完整代码 咱们直接上python的完整代码&#xff1a; # version:…

步进电机直线插补

基础原理 代码部分

数据结构经典算法总复习(上卷)

第一章&#xff1a;数据结构导论 无重要考点&#xff0c;仅需了解时间复杂度。 第二章&#xff1a;线性表 1.获得线性表第i个元素 void GetElem_sq(SqList L, int i, ElemType &e) {if (i<1 || i>L.length) ErrorMsg("Invalid i value"); //注意错误监…

Windows11 安装 Ubuntu-20.04,同时安装配置 zsh shell,配置 git 别名(alias),大大提高开发效率

背景&#xff1a;家里配置了一台 Windows 电脑&#xff0c;有时候需要用到 vscode 开发测试一些代码&#xff0c;在使用过程中发现原生 windows 敲代码不是很友好&#xff0c;于是想到配置 wsl&#xff0c;安装 Ubuntu&#xff0c;并安装配置 zsh shell&#xff0c;同时配置 gi…

PE文件结构

PE文件是Windows系统下可执行文件的总称&#xff0c;英文全称 Portable Executable 可移植的可执行文件&#xff0c;常见的有exe、dll、sys、com、ocx 对于学习反&#xff08;木马、免杀、病毒、外挂、内核&#xff09;&#xff0c;了解PE文件结构是非常有必要且非常非常重要的…

Helm 官方脚本

Helm 官方脚本 #!/usr/bin/env bash# Copyright The Helm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # …

JSON 系列之1:将 JSON 数据存储在 Oracle 数据库中

本文为Oracle数据库JSON学习系列的第一篇&#xff0c;讲述如何将JSON文档存储到数据库中&#xff0c;包括了版本为19c和23ai的情形。 19c中的JSON 先来看一下数据库版本为19c时的情形。 创建表colortab&#xff0c;其中color列的长度设为4000。若color的长度需要设为32767&a…

C语言-结构体内存大小

#include <stdio.h> #include <string.h> struct S1 { char a;//1 int b;//4 char c;//1 }; //分析 默认对齐数 成员对齐数 对齐数(前两个最小值) 最大对齐数 // 8 1 …

PyTorch 神经网络回归(Regression)任务:关系拟合与优化过程

PyTorch 神经网络回归&#xff08;Regression&#xff09;任务&#xff1a;关系拟合与优化过程 本教程介绍了如何使用 PyTorch 构建一个简单的神经网络来实现关系拟合&#xff0c;具体演示了从数据准备到模型训练和可视化的完整过程。首先&#xff0c;利用一维线性空间生成带噪…

渐开线齿轮和摆线齿轮有什么区别?

摆线齿形与渐开线齿形的区别 虽然在比对这两种齿形&#xff0c;但有一个事情希望大家注意&#xff1a;渐开线齿轮只是摆线齿轮的一个特例。 &#xff08;1&#xff09;摆线齿形的压力角在啮合开始时最大&#xff0c;在齿节点减小到零&#xff0c;在啮合结束时再次增大到最大…

Debian 12 安装配置 fail2ban 保护 SSH 访问

背景介绍 双十一的时候薅羊毛租了台腾讯云的虚机, 是真便宜, 只是没想到才跑了一个月, 系统里面就收集到了巨多的 SSH 恶意登录失败记录. 只能说, 互联网真的是太不安全了. 之前有用过 fail2ban 在 CentOS 7 上面做过防护, 不过那已经是好久好久之前的故事了, 好多方法已经不…

Vulhub靶场Apache解析漏洞

一.apache_parsing 原理&#xff1a;Apache HTTPD ⽀持⼀个⽂件拥有多个后缀&#xff0c;并为不同后缀执⾏不同的指令。在Apache1.x/2.x中Apache 解析⽂件的规则是从右到左开始判断解析,如果后缀名为不可识别⽂件解析,就再往左判断。如 1.php.xxxxx 打开靶场 创建一个名为1.p…

MATLAB 抛物线拟合(Quadratic,二维)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 这里仍然是最小二乘法的应用,其推导过程如下所述: 1.二次函数模型: 其中,a、b 和 c 是需要确定的参数。 2.最小二乘法 假设我们有一组数据点 ( x 1 ​ , y 1

重温设计模式--原型模式

文章目录 原型模式定义原型模式UML图优点缺点使用场景C 代码示例深拷贝、浅拷贝 原型模式定义 用原型实例指定创建对象的种类&#xff0c;并且通过拷贝这些原型创建新的对象&#xff1b; 核心中的核心就是 克隆clone ,后面讲 原型模式是一种创建型设计模式&#xff0c;它的主要…

mac iterm2 使用 lrzsz

前言 mac os 终端不支持使用 rz sz 上传下载文件&#xff0c;本文提供解决方法。 mac 上安装 brew install lrzsz两个脚本 注意&#xff1a;/usr/local/bin/iterm2-send-zmodem.sh 中的 sz命令路径要和你mac 上 sz 命令路径一致。 /usr/local/bin/iterm2-recv-zmodem.sh 中…

【基础篇】1. JasperSoft Studio编辑器与报表属性介绍

编辑器介绍 Jaspersoft Studio有一个多选项卡编辑器&#xff0c;其中包括三个标签&#xff1a;设计&#xff0c;源代码和预览。 Design&#xff1a;报表设计页面&#xff0c;可以图形化拖拉组件设计报表&#xff0c;打开报表文件的主页面Source&#xff1a;源代码页码&#xff…

【magic-dash】01:magic-dash创建单页面应用及二次开发

文章目录 一、magic-dash是什么1.1 安装1.2 使用1.2.1 查看内置项目模板1.2.2 生成指定项目模板1.2.3 查看当前magic-dash版本1.2.4 查看命令说明1.2.5 内置模板列表二、创建虚拟环境并安装magic-dash三、magic-dash单页工具应用开发3.1 创建单页面项目3.1.1 使用命令行创建单页…

《Pytorch框架CV开发-从入门到实战》

目录 1.环境部署2.自动梯度计算张量 tensor3.线性回归4.逻辑回归6.人工神经网络的基本概念6.1 感知器6.2 激活函数6.3多层感知器6.4 反向传播算法——前向传播6.5 反向传播算法——反向传播6.6 反向传播算法——训练方法7.Pytorch基础数据集8.手写数字识别人工神经网络训练8.1 …

WebRTC学习二:WebRTC音视频数据采集

系列文章目录 第一篇 基于SRS 的 WebRTC 环境搭建 第二篇 基于SRS 实现RTSP接入与WebRTC播放 第三篇 centos下基于ZLMediaKit 的WebRTC 环境搭建 第四篇 WebRTC 学习一&#xff1a;获取音频和视频设备 第五篇 WebRTC学习二&#xff1a;WebRTC音视频数据采集 文章目录 系列文章…

国自然联合项目|影像组学智能分析理论与关键技术|基金申请·24-12-25

小罗碎碎念 该项目为国自然联合基金项目&#xff0c;执行年限为2019年1月至2022年12月&#xff0c;直接费用为204万元。 项目研究内容包括影像组学分析、智能计算、医疗风险评估等&#xff0c;旨在通过模拟医生诊断过程&#xff0c;推动人工智能在医疗领域的创新。 项目取得了…