目录
- 1 概述
- 2 实现
- 3 测试
- 4 运行
1 概述
最近研究了C++11的并发编程的线程/互斥/锁/条件变量,利用互斥/锁/条件变量实现一个支持多线程并发的环形队列,队列大小通过模板参数传递。
环形队列是一个模板类,有两个模块参数,参数1是元素类型,参数2是队列大小,默认是10。入队操作如果队列满阻塞,出队操作如果队列为空则阻塞。
其类图为:
2 实现
#ifndef RING_QUEUE_H
#define RING_QUEUE_H
#include <mutex>
#include <condition_variable>
template<typename T, std::size_t N = 10>
class ring_queue
{
public:
typedef T value_type;
typedef std::size_t size_type;
typedef std::size_t pos_type;
typedef typename std::unique_lock<std::mutex> lock_type;
ring_queue() { static_assert(N != 0); }
ring_queue(ring_queue const&) = delete;
ring_queue(ring_queue&& ) = delete;
ring_queue& operator = (ring_queue const&) = delete;
ring_queue& operator = (ring_queue &&) = delete;
size_type spaces() const { return N; }
bool empty() const
{
lock_type lock(mutex_);
return read_pos_ == write_pos_;
}
size_type size() const
{
lock_type lock(mutex_);
return N - space_size_;
}
void push(value_type const& value)
{
{
lock_type lock(mutex_);
while(!space_size_)
write_cv_.wait(lock);
queue_[write_pos_] = value;
--space_size_;
write_pos_ = next_pos(write_pos_);
}
read_cv_.notify_one();
}
void push(value_type && value)
{
{
lock_type lock(mutex_);
while(!space_size_)
write_cv_.wait(lock);
queue_[write_pos_] = std::move(value);
--space_size_;
write_pos_ = next_pos(write_pos_);
}
read_cv_.notify_one();
}
value_type pop()
{
value_type value;
{
lock_type lock(mutex_);
while(N == space_size_)
read_cv_.wait(lock);
value = std::move(queue_[read_pos_]);
++space_size_;
read_pos_ = next_pos(read_pos_);
}
write_cv_.notify_one();
return value;
}
private:
pos_type next_pos(pos_type pos) { return (pos + 1) % N; }
private:
value_type queue_[N];
pos_type read_pos_ = 0;
pos_type write_pos_ = 0;
size_type space_size_ = N;
std::mutex mutex_;
std::condition_variable write_cv_;
std::condition_variable read_cv_;
};
#endif
说明:
- 实现利用了一个固定大小数组/一个读位置/一个写位置/互斥/写条件变量/读条件变量/空间大小变量。
- 两个入队接口:
- push(T const&) 左值入队
- push(T &&) 左值入队
- 一个出队接口
- pop()
3 测试
基于cpptest的测试代码如下:
struct Function4RingQueue
{
ring_queue<std::string, 2> queue;
std::mutex mutex;
int counter = 0;
void consume1(size_t n)
{
std::cerr << "\n";
for(size_t i = 0; i < n; ++i)
{
std::cerr << "I get a " << queue.pop() << std::endl;
counter++;
}
}
void consume2(size_t id)
{
std::string fruit = queue.pop();
{
std::unique_lock<std::mutex> lock(mutex);
std::cerr << "\nI get a " << fruit << " in thread(" << id << ")" << std::endl;
counter++;
}
}
void product1(std::vector<std::string> & fruits)
{
for(auto const& fruit: fruits)
queue.push(fruit + std::string(" pie"));
}
void product2(std::vector<std::string> & fruits)
{
for(auto const& fruit: fruits)
queue.push(fruit);
}
};
void RingQueueSuite::one_to_one()
{
Function4RingQueue function;
std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};
std::thread threads[2];
threads[0] = std::thread(&Function4RingQueue::product1, std::ref(function), std::ref(fruits));
threads[1] = std::thread(&Function4RingQueue::consume1, std::ref(function), fruits.size());
for(auto &thread : threads)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
function.counter = 0;
threads[0] = std::thread(&Function4RingQueue::product2, std::ref(function), std::ref(fruits));
threads[1] = std::thread(&Function4RingQueue::consume1, std::ref(function), fruits.size());
for(auto &thread : threads)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
}
void RingQueueSuite::one_to_multi()
{
Function4RingQueue function;
std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};
std::thread product;
std::vector<std::thread> consumes(fruits.size());
for(size_t i = 0; i < consumes.size(); ++i)
consumes[i] = std::thread(&Function4RingQueue::consume2, std::ref(function), i);
product = std::thread(&Function4RingQueue::product1, std::ref(function), std::ref(fruits));
product.join();
for(auto &thread : consumes)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
function.counter = 0;
for(size_t i = 0; i < consumes.size(); ++i)
consumes[i] = std::thread(&Function4RingQueue::consume2, std::ref(function), i);
product = std::thread(&Function4RingQueue::product2, std::ref(function), std::ref(fruits));
product.join();
for(auto &thread : consumes)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
}
- 函数one_to_one测试一个生成者对应一个消费者。
- 函数one_to_multi测试一个生产者对应多个消费者。
4 运行
RingQueueSuite: 0/2
I get a Apple pie
I get a Banana pie
I get a Pear pie
I get a Plum pie
I get a Pineapple pie
I get a Apple
I get a Banana
I get a Pear
I get a Plum
I get a Pineapple
RingQueueSuite: 1/2
I get a Apple pie in thread(1)
I get a Banana pie in thread(0)
I get a Pear pie in thread(2)
I get a Plum pie in thread(4)
I get a Pineapple pie in thread(3)
I get a Apple in thread(0)
I get a Banana in thread(1)
I get a Plum in thread(3)
I get a Pear in thread(2)
I get a Pineapple in thread(4)
RingQueueSuite: 2/2, 100% correct in 0.007452 seconds
Total: 2 tests, 100% correct in 0.007452 seconds
分析:
- 从结果看入队顺序和出队顺序是一致的。