目录
1 线程安全队列封装一
2 线程安全队列封装二
3 线程安全队列封装三
1 线程安全队列封装一
/**
* ============================================================================
*
* Copyright (c) Huawei Technologies Co., Ltd. 2020-2022. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1 Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2 Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3 Neither the names of the copyright holders nor the names of the
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
* ============================================================================
*/
#ifndef THREAD_SAFE_QUEUE_H
#define THREAD_SAFE_QUEUE_H
#include <cstdint>
#include <mutex>
#include <queue>
#include <utility>
namespace acllite {
/**
 * @brief Bounded FIFO queue; every member function locks an internal mutex
 *        before it touches the underlying std::queue.
 *
 * @tparam T element type. Pop() returns nullptr when the queue is empty, so T
 *           must be constructible from nullptr (for example a pointer type).
 */
template<typename T>
class ThreadSafeQueue {
public:
    /**
     * @brief ThreadSafeQueue constructor
     * @param [in] capacity: the queue capacity; a value outside
     *        [kMinQueueCapacity, kMaxQueueCapacity] is replaced by
     *        kDefaultQueueCapacity
     */
    ThreadSafeQueue(uint32_t capacity)
    {
        const bool inRange = (capacity >= kMinQueueCapacity) && (capacity <= kMaxQueueCapacity);
        if (inRange) {
            queueCapacity = capacity;
        }
        // otherwise queueCapacity keeps its in-class default (kDefaultQueueCapacity)
    }

    /**
     * @brief ThreadSafeQueue constructor, the capacity is kDefaultQueueCapacity
     */
    ThreadSafeQueue() = default;

    /**
     * @brief ThreadSafeQueue destructor
     */
    ~ThreadSafeQueue() = default;

    /**
     * @brief push data to queue
     * @param [in] input_value: the value will push to the queue
     * @return true: success to push data; false: the queue already holds
     *         as many elements as its capacity, nothing was stored
     */
    bool Push(T input_value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.size() >= queueCapacity) {
            return false;
        }
        // input_value is this function's own copy, so it can be moved into the queue
        queue_.push(std::move(input_value));
        return true;
    }

    /**
     * @brief pop data from queue
     * @return the oldest element in the queue; nullptr when the queue is empty
     */
    T Pop()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return nullptr;
        }
        // the front element is removed right below, so move it out instead of copying
        T front = std::move(queue_.front());
        queue_.pop();
        return front;
    }

    /**
     * @brief check the queue is empty
     * @return true: the queue is empty; false: the queue is not empty
     */
    bool Empty() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    /**
     * @brief get the queue size
     * @return the number of elements currently stored
     */
    uint32_t Size() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return static_cast<uint32_t>(queue_.size());
    }

    /**
     * @brief replace the queue capacity
     * @param [in] newSize: the new capacity. It is stored as given and is not
     *        checked against the range the constructor enforces; elements
     *        already in the queue are left untouched.
     */
    void ExtendCapacity(uint32_t newSize)
    {
        // Push() reads queueCapacity under mutex_, so the write takes the same lock
        std::lock_guard<std::mutex> lock(mutex_);
        queueCapacity = newSize;
    }

private:
    static constexpr uint32_t kMinQueueCapacity = 1; // the minimum capacity accepted by the constructor
    static constexpr uint32_t kMaxQueueCapacity = 10000; // the maximum capacity accepted by the constructor
    static constexpr uint32_t kDefaultQueueCapacity = 10; // default queue capacity

    std::queue<T> queue_; // the queue
    uint32_t queueCapacity = kDefaultQueueCapacity; // queue capacity
    mutable std::mutex mutex_; // guards queue_ and queueCapacity
};
}
#endif /* THREAD_SAFE_QUEUE_H */
注意:使用时,队列的元素类型必须是指针类型(因为队列为空时 Pop() 会返回 nullptr),例如:
acllite::ThreadSafeQueue<frame_info_user_data *> videoFrameQueue_;
如果像下面这样把元素类型写成非指针类型,编译就会报错:
acllite::ThreadSafeQueue<frame_info_user_data> videoFrameQueue_; //error
编译报下面的错误
src/ThreadSafeQueue.h: In instantiation of ‘T acllite::ThreadSafeQueue<T>::Pop() [with T = acllite::frame_info_user_data]’:
src/VdecHelperV2.cpp:213:51: required from here
src/ThreadSafeQueue.h:96:24: error: could not convert ‘nullptr’ from ‘std::nullptr_t’ to ‘acllite::frame_info_user_data’
96 | return nullptr;
| ^~~~~~~
| |
| std::nullptr_t
2 线程安全队列封装二
/*************************************************************************
* Copyright (C) [2019] by Cambricon, Inc. All rights reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*************************************************************************/
#ifndef CNSTREAM_THREADSAFE_QUEUE_HPP_
#define CNSTREAM_THREADSAFE_QUEUE_HPP_
#include <condition_variable>
#include <mutex>
#include <queue>
namespace cnstream {

/**
 * @brief FIFO queue guarded by a mutex, with a condition variable that lets
 *        consumers block until an element is available.
 *
 * The queue has no capacity limit of its own and cannot be copied.
 *
 * @tparam T Type of stored elements. Popped elements are copy-assigned into the
 *           object supplied by the caller.
 */
template <typename T>
class ThreadSafeQueue {
 public:
  ThreadSafeQueue() = default;
  ThreadSafeQueue(const ThreadSafeQueue& other) = delete;
  ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;

  /**
   * @brief Pops the front element without waiting
   *
   * @param value Receives the popped element; left untouched when the queue is empty
   * @retval true An element was popped
   * @retval false The queue was empty
   */
  bool TryPop(T& value);  // NOLINT

  /**
   * @brief Blocks until the queue is not empty, then pops the front element
   *
   * @param value Receives the popped element
   */
  void WaitAndPop(T& value);  // NOLINT

  /**
   * @brief Pops the front element, waiting up to `rel_time` for one to arrive
   *
   * @param value Receives the popped element; left untouched on timeout
   * @param rel_time Maximum duration to wait for
   * @retval true An element was popped
   * @retval false The queue was still empty when the wait ended
   */
  bool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time);  // NOLINT

  /**
   * @brief Appends a copy of `new_value` and wakes one waiting consumer
   *
   * @param new_value The element to append
   */
  void Push(const T& new_value);  // NOLINT

  /**
   * @retval true The queue holds no elements
   * @retval false Otherwise
   */
  bool Empty() const {
    std::lock_guard<std::mutex> guard(data_m_);
    return q_.empty();
  }

  /**
   * @return The number of elements currently stored
   */
  uint32_t Size() const {
    std::lock_guard<std::mutex> guard(data_m_);
    return static_cast<uint32_t>(q_.size());
  }

 private:
  mutable std::mutex data_m_;  // mutable so the const observers above can lock it
  std::queue<T> q_;
  std::condition_variable notempty_cond_;
};

template <typename T>
bool ThreadSafeQueue<T>::TryPop(T& value) {  // NOLINT
  std::lock_guard<std::mutex> guard(data_m_);
  if (q_.empty()) {
    return false;
  }
  value = q_.front();
  q_.pop();
  return true;
}

template <typename T>
void ThreadSafeQueue<T>::WaitAndPop(T& value) {  // NOLINT
  std::unique_lock<std::mutex> guard(data_m_);
  // the predicate form re-checks the queue after every wake-up
  notempty_cond_.wait(guard, [&] { return !q_.empty(); });
  value = q_.front();
  q_.pop();
}

template <typename T>
bool ThreadSafeQueue<T>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) {  // NOLINT
  std::unique_lock<std::mutex> guard(data_m_);
  // wait_for returns the predicate's final value: false means the queue is still empty
  const bool has_element = notempty_cond_.wait_for(guard, rel_time, [&] { return !q_.empty(); });
  if (!has_element) {
    return false;
  }
  value = q_.front();
  q_.pop();
  return true;
}

template <typename T>
void ThreadSafeQueue<T>::Push(const T& new_value) {
  std::unique_lock<std::mutex> guard(data_m_);
  q_.push(new_value);
  // the mutex is released before the consumer is notified
  guard.unlock();
  notempty_cond_.notify_one();
}

}  // namespace cnstream
#endif // CNSTREAM_THREADSAFE_QUEUE_HPP_
3 线程安全队列封装三
/*************************************************************************
* Copyright (C) [2020] by Cambricon, Inc. All rights reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*************************************************************************/
#ifndef INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_
#define INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
namespace infer_server {

/**
 * @brief Thread-safe queue
 *
 * Every member function locks an internal mutex before touching the underlying container.
 * The queue cannot be copied.
 *
 * @tparam T Type of stored elements
 * @tparam Container Type of underlying container to store the elements, which acts as queue,
 *                   `std::queue` and `std::priority_queue` satisfy the requirements
 */
template <typename T, typename Container = std::queue<T>>
class ThreadSafeQueue {
 public:
  /// type of container; ill-formed unless Container::value_type is T
  using queue_type = typename std::enable_if<std::is_same<typename Container::value_type, T>::value, Container>::type;
  /// type of elements
  using value_type = T;
  /// Container::size_type
  using size_type = typename Container::size_type;

  /**
   * @brief Construct a new Thread Safe Queue object
   */
  ThreadSafeQueue() = default;

  /**
   * @brief Try to pop an element from queue
   *
   * @param value An element
   * @retval true Succeed
   * @retval false Fail, no element stored in queue
   */
  bool TryPop(T& value);  // NOLINT

  /**
   * @brief Try to pop an element from queue, wait for `rel_time` if queue is empty
   *
   * @param value An element
   * @param rel_time Maximum duration to block for
   * @retval true Succeed
   * @retval false Timeout
   */
  bool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time);  // NOLINT

  /**
   * @brief Pushes a copy of the given element value to the queue and wakes one waiting consumer
   *
   * @param new_value the value of the element to push
   */
  void Push(const T& new_value) {
    std::lock_guard<std::mutex> guard(data_m_);
    q_.push(new_value);
    notempty_cond_.notify_one();
  }

  /**
   * @brief Moves the given element value into the queue and wakes one waiting consumer
   *
   * @param new_value the value of the element to push
   */
  void Push(T&& new_value) {
    std::lock_guard<std::mutex> guard(data_m_);
    q_.push(std::move(new_value));
    notempty_cond_.notify_one();
  }

  /**
   * @brief Pushes a new element to the queue. The element is constructed in-place.
   *
   * @tparam Arguments Type of arguments to forward to the constructor of the element
   * @param args Arguments to forward to the constructor of the element
   */
  template <typename... Arguments>
  void Emplace(Arguments&&... args) {
    std::lock_guard<std::mutex> guard(data_m_);
    q_.emplace(std::forward<Arguments>(args)...);
    notempty_cond_.notify_one();
  }

  /**
   * @brief Checks if the underlying container has no elements
   *
   * @retval true If the underlying container is empty
   * @retval false Otherwise
   */
  bool Empty() const {
    std::lock_guard<std::mutex> guard(data_m_);
    return q_.empty();
  }

  /**
   * @brief Returns the number of elements in the underlying container
   *
   * @return size_type The number of elements in the container
   */
  size_type Size() const {
    std::lock_guard<std::mutex> guard(data_m_);
    return q_.size();
  }

 private:
  ThreadSafeQueue(const ThreadSafeQueue& other) = delete;
  ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;

  mutable std::mutex data_m_;  // mutable so the const observers above can lock it
  queue_type q_;
  std::condition_variable notempty_cond_;
};  // class ThreadSafeQueue

namespace detail {

// The copy and move variants below carry complementary enable_if conditions on their
// return type, so exactly one of each pair is viable for any given T.

/// Copies the front element of a std::queue into *value, then pops it (T is not move-assignable).
template <typename T, typename Sequence>
inline typename std::enable_if<!std::is_move_assignable<T>::value>::type GetFrontAndPop(
    std::queue<T, Sequence>* q_, T* value) {
  *value = q_->front();
  q_->pop();
}

/// Copies the top element of a std::priority_queue into *value, then pops it (T is not move-assignable).
template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>>
inline typename std::enable_if<!std::is_move_assignable<T>::value>::type GetFrontAndPop(
    std::priority_queue<T, Container, Compare>* q_, T* value) {
  *value = q_->top();
  q_->pop();
}

/// Moves the front element of a std::queue into *value, then pops it (T is move-assignable).
template <typename T, typename Sequence>
inline typename std::enable_if<std::is_move_assignable<T>::value>::type GetFrontAndPop(
    std::queue<T, Sequence>* q_, T* value) {
  *value = std::move(q_->front());
  q_->pop();
}

/// Moves the top element of a std::priority_queue into *value, then pops it (T is move-assignable).
template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>>
inline typename std::enable_if<std::is_move_assignable<T>::value>::type GetFrontAndPop(
    std::priority_queue<T, Container, Compare>* q_, T* value) {
  // top() only hands out a const reference; the element is popped on the next line,
  // so const is cut off to enable move
  *value = std::move(const_cast<T&>(q_->top()));
  q_->pop();
}

}  // namespace detail

template <typename T, typename Container>
bool ThreadSafeQueue<T, Container>::TryPop(T& value) {  // NOLINT
  std::lock_guard<std::mutex> guard(data_m_);
  if (q_.empty()) {
    return false;
  }
  detail::GetFrontAndPop<T>(&q_, &value);
  return true;
}

template <typename T, typename Container>
bool ThreadSafeQueue<T, Container>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) {  // NOLINT
  std::unique_lock<std::mutex> guard(data_m_);
  // wait_for returns the predicate's final value: false means the queue is still empty
  const bool has_element = notempty_cond_.wait_for(guard, rel_time, [&] { return !q_.empty(); });
  if (!has_element) {
    return false;
  }
  detail::GetFrontAndPop<T>(&q_, &value);
  return true;
}

/**
 * @brief Alias of ThreadSafeQueue<T, std::queue<T>>
 *
 * @tparam T Type of stored elements
 */
template <typename T>
using TSQueue = ThreadSafeQueue<T, std::queue<T>>;

/**
 * @brief Alias of ThreadSafeQueue<T, std::priority_queue<T>>
 *
 * @tparam T Type of stored elements
 */
template <typename T>
using TSPriorityQueue = ThreadSafeQueue<T, std::priority_queue<T>>;

}  // namespace infer_server
#endif // INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_