C++的线程安全队列模板类封装

目录

1  线程安全队列封装一

2  线程安全队列封装二

3  线程安全队列封装三


1  线程安全队列封装一

/**
 * ============================================================================
 *
 * Copyright (c) Huawei Technologies Co., Ltd. 2020-2022. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1 Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   2 Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   3 Neither the names of the copyright holders nor the names of the
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * ============================================================================
 */

#ifndef THREAD_SAFE_QUEUE_H
#define THREAD_SAFE_QUEUE_H

#include <mutex>
#include <queue>

namespace acllite {

    template<typename T>
    class ThreadSafeQueue {
    public:

        /**
         * @brief ThreadSafeQueue constructor
         * @param [in] capacity: the queue capacity
         */
        ThreadSafeQueue(uint32_t capacity) {
            // check the input value: capacity is valid
            if (capacity >= kMinQueueCapacity && capacity <= kMaxQueueCapacity) {
                queueCapacity = capacity;
            }
            else { // the input value: capacity is invalid, set the default value
                queueCapacity = kDefaultQueueCapacity;
            }
        }

        /**
         * @brief ThreadSafeQueue constructor
         */
        ThreadSafeQueue() {
            queueCapacity = kDefaultQueueCapacity;
        }

        /**
         * @brief ThreadSafeQueue destructor
         */
        ~ThreadSafeQueue() = default;

        /**
         * @brief push data to queue
         * @param [in] input_value: the value will push to the queue
         * @return true: success to push data; false: fail to push data
         */
        bool Push(T input_value) {
            std::lock_guard<std::mutex> lock(mutex_);

            // check current size is less than capacity
            if (queue_.size() < queueCapacity) {
                queue_.push(input_value);
                return true;
            }

            return false;
        }

        /**
         * @brief pop data from queue
         * @return true: success to pop data; false: fail to pop data
         */
        T Pop() {
            std::lock_guard<std::mutex> lock(mutex_);
            if (queue_.empty()) { // check the queue is empty
                return nullptr;
            }

            T tmp_ptr = queue_.front();
            queue_.pop();
            return tmp_ptr;
        }

        /**
         * @brief check the queue is empty
         * @return true: the queue is empty; false: the queue is not empty
         */
        bool Empty() {
            std::lock_guard<std::mutex> lock(mutex_);
            return queue_.empty();
        }

        /**
         * @brief get the queue size
         * @return the queue size
         */
        uint32_t Size() {
            std::lock_guard<std::mutex> lock(mutex_);
            return queue_.size();
        }

        void ExtendCapacity(uint32_t newSize) {
            queueCapacity = newSize;
            kMaxQueueCapacity = newSize > kMaxQueueCapacity ? newSize : kMaxQueueCapacity;
        }

    private:
        std::queue<T> queue_; // the queue
        uint32_t queueCapacity; // queue capacity
        mutable std::mutex mutex_; // the mutex value
        const uint32_t kMinQueueCapacity = 1; // the minimum queue capacity
        const uint32_t kMaxQueueCapacity = 10000; // the maximum queue capacity
        const uint32_t kDefaultQueueCapacity = 10; // default queue capacity
    };
}
#endif /* THREAD_SAFE_QUEUE_H */

注意使用的时候,队列中要保存的是指针形式,例如

acllite::ThreadSafeQueue<frame_info_user_data *> videoFrameQueue_;

假如你这样写代码,那么会报错

acllite::ThreadSafeQueue<frame_info_user_data *> videoFrameQueue_;  //error

 编译报下面的错误

 src/ThreadSafeQueue.h: In instantiation of ‘T acllite::ThreadSafeQueue<T>::Pop() [with T = acllite::frame_info_user_data]’:
src/VdecHelperV2.cpp:213:51:   required from here
src/ThreadSafeQueue.h:96:24: error: could not convert ‘nullptr’ from ‘std::nullptr_t’ to ‘acllite::frame_info_user_data’
   96 |                 return nullptr;
      |                        ^~~~~~~
      |                        |
      |                        std::nullptr_t

2  线程安全队列封装二

/*************************************************************************
 * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *************************************************************************/

#ifndef CNSTREAM_THREADSAFE_QUEUE_HPP_
#define CNSTREAM_THREADSAFE_QUEUE_HPP_

#include <condition_variable>
#include <mutex>
#include <queue>

namespace cnstream {

template <typename T>
class ThreadSafeQueue {
 public:
  ThreadSafeQueue() = default;
  ThreadSafeQueue(const ThreadSafeQueue& other) = delete;
  ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;

  bool TryPop(T& value);  // NOLINT

  void WaitAndPop(T& value);  // NOLINT

  bool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time);  // NOLINT

  void Push(const T& new_value);  // NOLINT

  bool Empty() {
    std::lock_guard<std::mutex> lk(data_m_);
    return q_.empty();
  }

  uint32_t Size() {
    std::lock_guard<std::mutex> lk(data_m_);
    return q_.size();
  }

 private:
  std::mutex data_m_;
  std::queue<T> q_;
  std::condition_variable notempty_cond_;
};

template <typename T>
bool ThreadSafeQueue<T>::TryPop(T& value) {  // NOLINT
  std::lock_guard<std::mutex> lk(data_m_);
  if (q_.empty()) {
    return false;
  } else {
    value = q_.front();
    q_.pop();
    return true;
  }
}

template <typename T>
void ThreadSafeQueue<T>::WaitAndPop(T& value) {  // NOLINT
  std::unique_lock<std::mutex> lk(data_m_);
  notempty_cond_.wait(lk, [&] { return !q_.empty(); });
  value = q_.front();
  q_.pop();
}

template <typename T>
bool ThreadSafeQueue<T>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) {  // NOLINT
  std::unique_lock<std::mutex> lk(data_m_);
  if (notempty_cond_.wait_for(lk, rel_time, [&] { return !q_.empty(); })) {
    value = q_.front();
    q_.pop();
    return true;
  } else {
    return false;
  }
}

template <typename T>
void ThreadSafeQueue<T>::Push(const T& new_value) {
  std::unique_lock<std::mutex> lk(data_m_);
  q_.push(new_value);
  lk.unlock();
  notempty_cond_.notify_one();
}

}  // namespace cnstream

#endif  // CNSTREAM_THREADSAFE_QUEUE_HPP_

3  线程安全队列封装三

/*************************************************************************
 * Copyright (C) [2020] by Cambricon, Inc. All rights reserved
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *************************************************************************/

#ifndef INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_
#define INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

namespace infer_server {

/**
 * @brief Thread-safe queue
 *
 * @tparam T Type of stored elements
 * @tparam Q Type of underlying container to store the elements, which acts as queue,
 *           `std::queue` and `std::priority_queue` satisfy the requirements
 */
template <typename T, typename Container = std::queue<T>>
class ThreadSafeQueue {
 public:
  /// type of container
  using queue_type = typename std::enable_if<std::is_same<typename Container::value_type, T>::value, Container>::type;
  /// type of elements
  using value_type = T;
  /// Container::size_type
  using size_type = typename Container::size_type;

  /**
   * @brief Construct a new Thread Safe Queue object
   */
  ThreadSafeQueue() = default;

  /**
   * @brief Try to pop an element from queue
   *
   * @param value An element
   * @retval true Succeed
   * @retval false Fail, no element stored in queue
   */
  bool TryPop(T& value);  // NOLINT

  /**
   * @brief Try to pop an element from queue, wait for `rel_time` if queue is empty
   *
   * @param value An element
   * @param rel_time Maximum duration to block for
   * @retval true Succeed
   * @retval false Timeout
   */
  bool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time);  // NOLINT

  /**
   * @brief Pushes the given element value to the end of the queue
   *
   * @param new_value the value of the element to push
   */
  void Push(const T& new_value) {
    std::lock_guard<std::mutex> lk(data_m_);
    q_.push(new_value);
    notempty_cond_.notify_one();
  }

  /**
   * @brief Pushes the given element value to the end of the queue
   *
   * @param new_value the value of the element to push
   */
  void Push(T&& new_value) {
    std::lock_guard<std::mutex> lk(data_m_);
    q_.push(std::move(new_value));
    notempty_cond_.notify_one();
  }

  /**
   * @brief Pushes a new element to the end of the queue. The element is constructed in-place.
   *
   * @tparam Arguments Type of arguments to forward to the constructor of the element
   * @param args Arguments to forward to the constructor of the element
   */
  template <typename... Arguments>
  void Emplace(Arguments&&... args) {
    std::lock_guard<std::mutex> lk(data_m_);
    q_.emplace(std::forward<Arguments>(args)...);
    notempty_cond_.notify_one();
  }

  /**
   * @brief Checks if the underlying container has no elements
   *
   * @retval true If the underlying container is empty
   * @retval false Otherwise
   */
  bool Empty() {
    std::lock_guard<std::mutex> lk(data_m_);
    return q_.empty();
  }

  /**
   * @brief Returns the number of elements in the underlying container
   *
   * @return size_type The number of elements in the container
   */
  size_type Size() {
    std::lock_guard<std::mutex> lk(data_m_);
    return q_.size();
  }

 private:
  ThreadSafeQueue(const ThreadSafeQueue& other) = delete;
  ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;

  std::mutex data_m_;
  queue_type q_;
  std::condition_variable notempty_cond_;
};  // class ThreadSafeQueue

namespace detail {
template <typename T, typename = typename std::enable_if<!std::is_move_assignable<T>::value>::type>
inline void GetFrontAndPop(std::queue<T>* q_, T* value) {
  *value = q_->front();
  q_->pop();
}

template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>,
          typename = typename std::enable_if<!std::is_move_assignable<T>::value>::type>
inline void GetFrontAndPop(std::priority_queue<T, Container, Compare>* q_, T* value) {
  *value = q_->top();
  q_->pop();
}

template <typename T>
inline void GetFrontAndPop(std::queue<T>* q_, T* value) {
  *value = std::move(q_->front());
  q_->pop();
}

template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>>
inline void GetFrontAndPop(std::priority_queue<T, Container, Compare>* q_, T* value) {
  // cut off const to enable move
  *value = std::move(const_cast<T&>(q_->top()));
  q_->pop();
}
}  // namespace detail

template <typename T, typename Q>
bool ThreadSafeQueue<T, Q>::TryPop(T& value) {  // NOLINT
  std::lock_guard<std::mutex> lk(data_m_);
  if (q_.empty()) {
    return false;
  }

  detail::GetFrontAndPop<T>(&q_, &value);
  return true;
}

template <typename T, typename Q>
bool ThreadSafeQueue<T, Q>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) {  // NOLINT
  std::unique_lock<std::mutex> lk(data_m_);
  if (notempty_cond_.wait_for(lk, rel_time, [&] { return !q_.empty(); })) {
    detail::GetFrontAndPop<T>(&q_, &value);
    return true;
  } else {
    return false;
  }
}

/**
 * @brief Alias of ThreadSafeQueue<T, std::queue<T>>
 *
 * @tparam T Type of stored elements
 */
template <typename T>
using TSQueue = ThreadSafeQueue<T, std::queue<T>>;

/**
 * @brief Alias of ThreadSafeQueue<T, std::priority_queue<T>>
 *
 * @tparam T Type of stored elements
 */
template <typename T>
using TSPriorityQueue = ThreadSafeQueue<T, std::priority_queue<T>>;

}  // namespace infer_server

#endif  // INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/636652.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

机器学习(五) -- 监督学习(3) -- 决策树

系列文章目录及链接 上篇&#xff1a;机器学习&#xff08;五&#xff09; -- 监督学习&#xff08;2&#xff09; -- 朴素贝叶斯 下篇&#xff1a;机器学习&#xff08;五&#xff09; -- 监督学习&#xff08;4&#xff09; -- 集成学习方法-随机森林 前言 tips&#xff1a…

JAVA面试题大全(九)

1、为什么要使用 spring&#xff1f; 方便解耦&#xff0c;便于开发支持aop编程声明式事务的支持方便程序的测试方便集成各种优秀的框架降低JavaEE API的使用难度 2、解释一下什么是 aop&#xff1f; AOP 是 Aspect-Oriented Programming 的缩写&#xff0c;中文翻译为“面向…

Java CRM客户关系管理系统源码:基于Spring Cloud Alibaba与Spring Boot,专为成长型企业设计

项目名称&#xff1a;CRM客户关系管理系统 功能模块及描述&#xff1a; 一、待办事项 今日需联系客户&#xff1a;显示当日需跟进的客户列表&#xff0c;支持查询和筛选。分配给我的线索&#xff1a;管理分配给用户的线索&#xff0c;包括线索列表和查询功能。分配给我的客户…

EDM图纸管理软件_图纸文档管理软件

图纸文档管理软件是一种用于管理和组织各种类型的图纸和文档的工具。它提供了一种集中存储、查找、共享和版本控制图纸和文档的方式&#xff0c;以便团队成员可以更有效地进行协作和管理。 以下是一些常见的图纸文档管理软件&#xff1a; 彩虹EDM系统&#xff1a;这是一款图纸文…

K8S认证|CKA题库+答案| 5. 创建 Ingress

5 . 创建 Ingress 您必须在以下Cluster/Node上完成此考题&#xff1a; Cluster Master node Worker node k8s master …

java项目之图书管理系统源码(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的图书管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 系统主要分为管理员角色和用…

六种常用设计模式

单例设计模式 单例模式指在整个系统生命周期里&#xff0c;保证一个类只能产生一个实例&#xff0c;确保该类的唯一性。 单例模式分类 单例模式可以分为懒汉式和饿汉式&#xff0c;两者之间的区别在于创建实例的时间不同&#xff1a; 懒汉式&#xff1a;指系统运行中&#…

基于Python实现 HR 分析(逻辑回归和基于树的机器学习)【500010104】

介绍 数据集说明 此数据集包含与员工有关的综合属性集合&#xff0c;从人口统计细节到与工作相关的因素。该分析的主要目的是预测员工流动率并辨别导致员工流失的潜在因素。 在这个数据集中&#xff0c;有14,999行&#xff0c;10列&#xff0c;以及这些变量&#xff1a;满意度…

【Python】 如何使用逗号作为千位分隔符打印数字

基本原理 在Python中&#xff0c;打印数字时自动添加千位分隔符可以提高数字的可读性&#xff0c;尤其是在处理大数字时。Python提供了多种方法来实现这一功能&#xff0c;包括使用内置的format()函数、f-string&#xff08;格式化字符串字面量&#xff09;以及locale模块。 …

数据量较小的表是否有必要添加索引问题分析

目录 前言一、分析前准备1.1、准备测试表和数据1.2、插入测试数据1.3、测试环境说明 二、具体业务分析2.1、单次查询耗时分析2.2、无索引并发查询服务器CPU占用率分析2.3、添加索引并发查询服务器CPU占用率分析 三、总结 前言 在一次节日活动我们系统访问量到达了平时的两倍&am…

50道题目!Python、SQL数据库、AB测试、业务分析、机器学习都在这里了!

介绍 每日一题系列已经更新了50道题目啦&#xff01; 题目难度为初级到中级&#xff0c;涵盖了Python、SQL数据库、AB测试、业务分析、机器学习五大主题&#xff0c;适合初学者和有一定基础的朋友。 原文链接: 50道题目&#xff01;Python、SQL数据库、AB测试、业务分析、机器…

达梦数据库详解

达梦认证是指针对中国数据库管理系统&#xff08;DBMS&#xff09;厂商达梦公司所推出的数据库产品&#xff0c;即达梦数据库&#xff08;DMDB&#xff09;&#xff0c;进行的一种官方认证体系。达梦认证旨在验证数据库管理人员对达梦数据库产品的掌握程度&#xff0c;及其在数…

LoRA:大型语言模型的低秩适应

LoRA 官网 LoRA(Low-Rank Adaptation)出自2021年的论文“LoRA: Low-Rank Adaptation of Large Language Models” 常见的大模型微调方法&#xff1a; Adapter-Tuning、Prefix-Tuning、Prompt-Tuning(P-Tuning)、P-Tuning v2、LoRA。 LoRA技术冻结预训练…

冬奥会|基于SprinBoot+vue的冬奥会科普平台(源码+数据库+文档)

目录 基于SprinBootvue的冬奥会科普平台 一、前言 二、系统设计 三、系统功能设计 1登录注册 2系统功能模块 3管理员功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍&#xff1a;✌️大厂码农|…

Discourse 使用 DiscourseConnect 调用接口 admin/users/sync_sso 404 错误

在对用户数据通过 SSO 同步的时候&#xff0c;调用提示 404 错误。 我们使用的是 Java 的代码。 2024-05-23_16-34-421340802 70.3 KB 如上图&#xff0c;返回显示的代码为 404。 问题原因 出现上面错误的原因是安装的 Discourse 实例的 discourse connect 没有启用。 2024-…

【C语言】明析部分C语言内存函数

目录 1.memcpy 2.memmove 3.memset 4.memcmp 以下都是内存函数&#xff0c;作用单位均是字节 1.memcpy memcpy是C/C语言中的一个内存拷贝函数&#xff0c;其原型为&#xff1a; void* memcpy(void* dest, const void* src, size_t n);目标空间&#xff08;字节&#xff09…

作家百度百科怎么做出来的 怎么创建作家百科词条才能通过

创建作家百度百科词条需要遵循一定的步骤&#xff0c;并注意一些关键点&#xff0c;以确保词条能够顺利通过审核。以下是伯乐网络传媒pouquan根据经验结果得出的详细指导&#xff1a; 准备工作 注册百度账号&#xff1a;在创建任何百度百科词条之前&#xff0c;您需要先注册一…

Milvus的内存索引

简介&#xff1a; 这篇文章主要介绍milvus支持的各种内存索引&#xff0c;以及它们最适用的场景&#xff0c;还有用户为了获得更好的搜索性能可以配置的参数。 索引是有效组织数据的过程&#xff0c;它的主要角色是在大的数据集中显著的加速耗时的查询从而有效的进行相似搜索…

常见的100个Shell命令,超级实用!

在大多数的Linux和Unix系统、及其他类Unix系统中&#xff0c;Shell是用户与操作系统内核交互的主要方式。作为一种强大的命令行解释器&#xff0c;它也支持编程功能&#xff0c;用户可以写脚本来处理各种任务。 熟悉shell脚本&#xff0c;首先要对shell指令熟悉&#xff0c;今…

Python图形界面(GUI)Tkinter笔记(八):用【Label()】方法制作九九乘数表

主要是使用"config()"方法来体现函数式、模块化的美好风景。把需随时要修改的控件参数定义在“config()”方法里且把它封装在一个函数中&#xff0c;这时只需对这函数内的“config()”方法作出相应的修改即可&#xff0c;无需对主代码或全部代码重新修一遍。这也是Py…