多线程数据读写顺序处理
一个典型的生产者-消费者模型,在这个模型中,多个工作线程并行处理从共享队列中获取的数据,并将处理结果以保持原始顺序的方式放入另一个队列。
多线程处理模型,具体细节如下:
1.数据:数据里必须有个递增的标识符
和一个结束标识(ending)
2. 读队列(安全队列):用于存放待处理的数据。
-
处理线程:每个线程都是一个死循环
读数据-处理数据-写数据
,它们被编号为1、2、3、4等。这些线程负责从读队列
中取出数据进行处理。线程的结束:判断数据里的ending为true
. -
结果聚合:处理完成后,判断数据的
递增的标识符
,是否为全局的递增的标识符
,如果相等 继续执行。以保持数据的一致性。 -
写队列(安全队列):用于处理好的数据按照读的顺序写入,写入数据到输出队列的顺序是保持一致的。
自定义设计多线程模版:
#include "queuestable.h"
#ifndef QUEUESTABLE_H
#define QUEUESTABLE_H
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
//定义一个线程安全的队列
template <typename T>
class QueueStable
{
public:
QueueStable() = default;
QueueStable(const QueueStable<T>&) = delete;
QueueStable& operator=(const QueueStable<T>&) = delete;
QueueStable(unsigned int max_size)
{
m_max_size = max_size;
}
//设置存放数据的最大容量
void set_max_size(unsigned int max_size)
{
m_max_size = max_size;
}
void push(T value)
{
std::unique_lock<std::mutex> lock(m_mutex);
//队列大于20就阻塞
m_cv.wait(lock, [&] { return m_queue.size() < m_max_size; }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁
//入队
m_queue.push(std::move(value));
//m_queue.push(value);
//解锁
lock.unlock();
m_cv.notify_one(); //唤醒另一个
}
T pop()
{
//加锁
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [&] { return !m_queue.empty(); }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁
//出队
T data = std::move(m_queue.front());
//T data = m_queue.front();
m_queue.pop();
//解锁
lock.unlock();
m_cv.notify_one(); //唤醒另一个
return data;
}
T front() const
{
std::lock_guard<std::mutex> lock(m_mutex);
return m_queue.front();
}
// bool empty() const
// {
// std::lock_guard<std::mutex> lock(m_mutex);
// return m_queue.empty();
// }
// int size()
// {
// //加锁
// std::lock_guard<std::mutex> lock(m_mutex);
// return m_queue.size();
// }
private:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
unsigned int m_max_size = 1;
};
//包数据
struct BaseData
{
//`递增的标识符` sequence_number
int64_t sequence_number ; //记录当前位置,保证数据顺序一致
bool ending = false; //true 代表结尾
};
//多线程处理
template <typename T,typename P>
class MultiThreadProcessing
{
static_assert(std::is_base_of<BaseData, T>::value, "T must be derived from BaseData!");
public:
enum class ThreadMode
{
Detach, // 分离
Join // 阻塞
};
MultiThreadProcessing()
{
m_ending = false;
m_thread_mode = ThreadMode::Join;
}
//设置上下文结构使用的数据 - 必须设置
void set_contexts(const std::vector<std::shared_ptr<P>>& contexts)
{
//生成用于线程的结构
for(int i=0; i<contexts.size(); ++i)
{
auto& context = m_contexts.emplace_back();
context.id = i;
context.ext_context = contexts[i];
}
}
//设置运行的处理函数 - 必须设置
void set_execute_function(const std::function<void(const std::shared_ptr<T>&,const std::shared_ptr<P>&)>& function)
{
m_execute_function = function;
}
//设置线程模式
void set_thread_mode(MultiThreadProcessing::ThreadMode thread_mode = ThreadMode::Join)
{
m_thread_mode = thread_mode;
}
//多线程同步检测
void start(const std::shared_ptr<QueueStable<std::shared_ptr<T>>>& read_queue,const std::shared_ptr<QueueStable<std::shared_ptr<T>>>& write_queue)
{
//检查处理函数是否可调用
if (!m_execute_function)
{
// 打印异常信息
std::cerr << "MultiThreadProcessing: Invalid to execute function." << std::endl;
throw std::runtime_error("Invalid to execute function.");
return;
}
std::shared_ptr<DataPacket> data_packer = read_queue->pop();//取出第一个包,查看序号 不处理
m_sequence_number = data_packer->sequence_number;//先找到第一个包的序号
m_ckeck_sequence_number = data_packer->sequence_number;
write_queue->push(data_packer);
m_sequence_number++;
m_ckeck_sequence_number++;
//循环开启线程
for(auto& context : m_contexts)
{
//开启线程
context.thread = std::make_shared<std::thread>([this,&context,&read_queue,&write_queue]()
{
std::cout << "context.id: " << context.id<<" Started." << std::endl;
for(;;)
{
// 读数据 逻辑
//加锁
m_read_mutex.lock();
//为true,末尾标志,不在继续,直接结束
if(m_ending)
{
m_read_mutex.unlock();
break;
}
context.temp_data = read_queue->pop();
//检测是否为递增1
if(m_ckeck_sequence_number.load() == context.temp_data->sequence_number)
{
m_ckeck_sequence_number++;
}
else
{
m_ending = true;
// 打印异常信息
std::cerr << "MultiThreadProcessing: The sequence number must be incremented by one." << std::endl;
throw std::runtime_error("The sequence number must be incremented by one.");
break;
//异常
}
//true 包的末尾,結束
if(context.temp_data->ending)
{
m_ending = true;
std::unique_lock<std::mutex> lock(m_write_mutex);
m_write_cond_var.wait(lock, [&] { return m_sequence_number.load() == context.temp_data->sequence_number; }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁
//写入包
write_queue->push(context.temp_data);
//解锁
lock.unlock();
m_write_cond_var.notify_all(); //唤醒所有
break;
}
m_read_mutex.unlock();
//自定义函数处理数据
m_execute_function(context.temp_data,context.ext_context);
//写数据逻辑
std::unique_lock<std::mutex> lock(m_write_mutex);
m_write_cond_var.wait(lock, [&] { return m_sequence_number.load() == context.temp_data->sequence_number; }); //为 true 继续执行,否则,解锁-》等待。唤醒后,加锁
//写入包
write_queue->push(context.temp_data);
m_sequence_number++;
//解锁
lock.unlock();
m_write_cond_var.notify_all(); //唤醒所有
}
std::cout << "context.id: " << context.id<<" Finished." << std::endl;
});
}
//阻塞线程
for(auto& context : m_contexts)
{
if (context.thread->joinable())
{
if(m_thread_mode == ThreadMode::Join)
{
context.thread->join();
}
else if (m_thread_mode == ThreadMode::Detach)
{
context.thread->detach();
}
}
}
}
private:
struct S
{
std::shared_ptr<T> temp_data;
std::shared_ptr<std::thread> thread;
uint32_t id;
std::shared_ptr<P> ext_context; //外部的自定义数据
};
std::vector<S> m_contexts;//每个线程的临时数据,用于多线程临时
//结束标志
std::atomic<bool> m_ending = false;
std::mutex m_read_mutex;//读锁
std::mutex m_write_mutex;//写锁
std::condition_variable m_write_cond_var;//写条件变量 同步数据顺序
std::atomic<int64_t> m_sequence_number;//保证数据顺序
std::atomic<int64_t> m_ckeck_sequence_number;//检查数据序号,若数据序号不是递增为1,抛出异常
std::function<void(const std::shared_ptr<T>&,const std::shared_ptr<P>&)> m_execute_function; //每个数据单次处理函数
MultiThreadProcessing::ThreadMode m_thread_mode = ThreadMode::Join;
};
#endif // QUEUESTABLE_H
QueueStable
安全队列类
BaseData
基本数据
MultiThreadProcessing
多线程模版类 ,处理继承基本数据BaseData
的结构体
使用 QueueStable
类确保数据的读写正常,MultiThreadProcessing
多线程处理数据,保证数据先读的先写。
举例:下面实现openVINO + yolov8
推理代码,使用上面的多线程模版:
yolov8_2.h
#ifndef YOLOV8_2_H
#define YOLOV8_2_H
#include "filterbase.h"
#include "queuestable.h"
//包数据
struct DataPacket : BaseData
{
AVMediaType av_media_type = AVMEDIA_TYPE_UNKNOWN;//记录当前的索引,分辨是音频还是视频,字幕,等
std::shared_ptr<AVPacketPtr> packet; //存放包
//记录av_media_type类型数据,如果数据没解码,数据在packet中
std::vector<std::shared_ptr<AVFramePtr>> frame_vector;
};
class YOLOV8_2 : public FilterBase
{
public:
struct Config
{
float nms_threshold;
float score_threshold;
std::string model_path;
std::string bin_path = {};
std::string properties = "GPU.0";
uint32_t image_interval = 2; // 处理图像的间隔,每 image_interval处理一次
};
struct Detection
{
int class_id;
std::string class_name; //类型名
float confidence;//置信度
cv::Rect box; // 矩形框位置
};
struct InferContext
{
ov::InferRequest request;
uint32_t image_interval; // 处理图像的间隔,每 image_interval处理一次
};
YOLOV8_2(){}
YOLOV8_2(const Config& config);
~YOLOV8_2(){}
//实现基类的纯虚函数 start 是个接口,用于实现多态,基类不做实现
void start(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,
const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue) override;
//多线程同步检测
void detect(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,
const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue);
protected:
void initial();
//预处理
ov::Tensor preprocess(const cv::Mat& frame, cv::Mat& pre_frame);
//后处理
void postprocess(cv::Mat& frame, const ov::Tensor& output_tensor);
cv::Mat letterbox(const cv::Mat& input_image, const cv::Size& target_size, const cv::Scalar& fill_color = cv::Scalar(0, 0, 0), float* m_ratio = nullptr,int* m_top_offset = nullptr,int* m_left_offset = nullptr);
private:
Config m_config; // 参数
float m_ratio; // 原图与模型输入图 缩放比例
int m_top_offset;
int m_left_offset;
ov::CompiledModel m_compiled_model;
MultiThreadProcessing<DataPacket,InferContext> m_multi_thread;
std::vector<std::shared_ptr<InferContext>> m_infer_request_vector; //推理列表,用于多路推理
uint32_t m_infer_request_size;
};
#endif // YOLOV8_2_H
#include "yolov8_2.h"
const std::vector<std::string> coconame = { "person",
"bicycle",
"car",
"motorcycle",
"airplane",
"bus",
"train",
"truck",
"boat",
"traffic light",
"fire hydrant",
"stop sign",
"parking meter",
"bench",
"bird",
"cat",
"dog",
"horse",
"sheep",
"cow",
"elephant",
"bear",
"zebra",
"giraffe",
"backpack",
"umbrella",
"handbag",
"tie",
"suitcase",
"frisbee",
"skis",
"snowboard",
"sports ball",
"kite",
"baseball bat",
"baseball glove",
"skateboard",
"surfboard",
"tennis racket",
"bottle",
"wine glass",
"cup",
"fork",
"knife",
"spoon",
"bowl",
"banana",
"apple",
"sandwich",
"orange",
"broccoli",
"carrot",
"hot dog",
"pizza",
"donut",
"cake",
"chair",
"couch",
"potted plant",
"bed",
"dining table",
"toilet",
"tv",
"laptop",
"mouse",
"remote",
"keyboard",
"cell phone",
"microwave",
"oven",
"toaster",
"sink",
"refrigerator",
"book",
"clock",
"vase",
"scissors",
"teddy bear",
"hair drier",
"toothbrush" };
YOLOV8_2::YOLOV8_2(const YOLOV8_2::Config& config)
{
this->m_infer_request_size = 12;
m_config = config;
initial();
}
//多线程同步检测
void YOLOV8_2::detect(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,
const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue)
{
std::function<void(const std::shared_ptr<DataPacket>&,const std::shared_ptr<InferContext>&)> lambda = [this](const std::shared_ptr<DataPacket>& data_packer,const std::shared_ptr<InferContext>& context)
{
if(data_packer->av_media_type == AVMEDIA_TYPE_VIDEO && !data_packer->frame_vector.empty())
{
for(const auto& frame : data_packer->frame_vector)
{
if(frame->get_number() % context->image_interval != 0)
{
continue;
}
// 创建 cv::Mat 对象,注意这里直接使用 AVFrame 的数据
cv::Mat image = cv::Mat(frame->get()->height, frame->get()->width, CV_8UC3, frame->get()->data[0], frame->get()->linesize[0]);
if (!image.empty())
{
cv::Mat pre_image;//临时存储
//预处理
ov::Tensor input_tensor = preprocess(image,pre_image);
//开始推理
context->request.set_input_tensor(input_tensor);
context->request.infer();
//等待完成处理结果
const ov::Tensor& output_tensor = context->request.get_output_tensor();
this->postprocess(image, output_tensor);
}
}
}
};
m_multi_thread.set_contexts(m_infer_request_vector);
m_multi_thread.set_execute_function(lambda);
m_multi_thread.start(read_queue,write_queue);
}
void YOLOV8_2::start(const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& read_queue,const std::shared_ptr<QueueStable<std::shared_ptr<DataPacket>>>& write_queue)
{
detect(read_queue,write_queue);
}
void YOLOV8_2::initial()
{
//创建推理引擎 ie
ov::Core core;
//读取模型
/*
* std::shared_ptr<ov::Model> model = core.read_model(this->onnx_path);
* @brief从IR / ONNX / PDPD / TF / TFLite文件格式读取模型。
* @param model_path模型的路径。
* @param bin_path数据文件的路径。
* 对于IR格式(*.bin) :
* 如果`bin_path`为空,将尝试读取与XML同名的bin文件
* *如果没有找到同名的bin文件,将加载无权重的IR。
* 对于以下文件格式,不使用`bin_path`参数:
* ONNX格式(*.onnx)
* *PDPD(*.pdmodel)
* *TF(*.pb)
* *TFLite(*.tflite)
* @返回一个模型。
*/
std::shared_ptr<ov::Model> model = core.read_model(m_config.model_path,m_config.bin_path);
ov::preprocess::PrePostProcessor ppp = ov::preprocess::PrePostProcessor(model);
ppp.input().tensor().set_element_type(ov::element::u8).set_layout("NHWC").set_color_format(ov::preprocess::ColorFormat::BGR)/*.set_spatial_static_shape(640, 640) //640*640 yolov8输入大小*/;
//ppp.input().tensor().set_shape(ov::PartialShape({ 1,640,640,3 }));//自定义输入大小,确保和模型大小同样
ppp.input().preprocess().convert_layout("NCHW").convert_element_type(ov::element::f32).convert_color(ov::preprocess::ColorFormat::RGB).scale({ 255, 255, 255 });// .scale({ 112, 112, 112 });
//ppp.input().preprocess().resize(ov::preprocess::ResizeAlgorithm::RESIZE_NEAREST, 640, 640);
//ppp.input().model().set_layout("NCHW");
ppp.output().postprocess().convert_element_type(ov::element::f32);
//ppp.output().tensor().set_element_type(ov::element::f32);
model = ppp.build();
this->m_compiled_model = core.compile_model(model,m_config.properties);
//创建推理请求
for(size_t i=0;i<m_infer_request_size;++i)
{
std::shared_ptr<InferContext> sh = std::make_shared<InferContext>();
sh->request = m_compiled_model.create_infer_request();
sh->image_interval = m_config.image_interval;
m_infer_request_vector.push_back(sh);
}
}
// Letterbox 缩放函数
/*
* input_image 输入原图像
* target_size 目标图像大小
* fill_color 填充颜色
* m_ratio 缩放比例
* m_top_offset 缩放的图像 在目标图像中的 y 位置
* m_left_offset 缩放的图像 在目标图像中的 x 位置
*/
cv::Mat YOLOV8_2::letterbox(const cv::Mat& input_image, const cv::Size& target_size, const cv::Scalar& fill_color, float* m_ratio,int* m_top_offset,int* m_left_offset)
{
//输出图像
cv::Mat output_image(target_size, input_image.type(), fill_color);
//输入图像和输出图像 高度和宽度 都相等,直接复制返回
if(input_image.cols == output_image.cols && input_image.rows == output_image.rows)
{
input_image.copyTo(output_image);
//获取比例
if (m_ratio)
{
*m_ratio = 1.0;
}
if (m_top_offset)
{
*m_top_offset = 0;
}
if (m_left_offset)
{
*m_left_offset = 0;
}
return output_image;
}
float r = 0.0;
cv::Rect dest_rect;
//输入图像宽 > 图像高,宽对齐,高至中
if (input_image.cols > input_image.rows)
{
// 宽缩放 m_ratio ,那么高也要缩放 m_ratio
r = static_cast<float>(input_image.cols) / output_image.cols;
int new_rows = static_cast<int>(input_image.rows / r);
dest_rect = cv::Rect(0, (output_image.rows - new_rows) / 2, output_image.cols, new_rows);
//dest_rect = cv::Rect(0, 0, output_image.cols, new_rows);
}
else
{
// 高缩放 m_ratio ,那么宽也要缩放 m_ratio
r = static_cast<float>(input_image.rows) / output_image.rows;
int new_cols = static_cast<int>(input_image.cols / r);
dest_rect = cv::Rect((output_image.cols - new_cols) / 2, 0, new_cols, output_image.rows);
}
//获取比例
if (m_ratio)
{
*m_ratio = r;
}
if (m_top_offset)
{
*m_top_offset = dest_rect.y;
}
if (m_left_offset)
{
*m_left_offset = dest_rect.x;
}
cv::resize(input_image, output_image(dest_rect), dest_rect.size(), cv::INTER_LINEAR);
return output_image;
}
//预处理
ov::Tensor YOLOV8_2::preprocess(const cv::Mat& frame, cv::Mat& pre_frame)
{
//预处理
const ov::Shape& shape = m_compiled_model.input().get_shape();
//shape 对应 ppp.input().tensor().set_element_type(ov::element::u8).set_layout("NHWC") 中 NHWC
pre_frame = letterbox(frame, cv::Size(shape.at(2), shape.at(1)), cv::Scalar(100, 100, 100), &m_ratio, &m_top_offset, &m_left_offset);
uchar* input_data = pre_frame.data;
return ov::Tensor(m_compiled_model.input().get_element_type(), m_compiled_model.input().get_shape(), input_data);
}
//后处理
void YOLOV8_2::postprocess(cv::Mat& frame, const ov::Tensor& output_tensor)
{
std::vector<cv::Rect> boxes;
std::vector<int> class_ids;
std::vector<float> confidences;
const ov::Shape& output_shape = output_tensor.get_shape();
const int& out_rows = output_shape.at(1);
const int& out_cols = output_shape.at(2);
const cv::Mat det_output(out_rows, out_cols, CV_32F, (float*)output_tensor.data<float>());
CHECK(det_output.cols == 8400)
CHECK(det_output.rows == 84)
//找到所有符合的 类别 矩形,置信度,
for (int i = 0; i < det_output.cols; ++i)
{
const cv::Mat& classes_scores = det_output.col(i).rowRange(4, 84);
cv::Point class_id_point;
double score;
cv::minMaxLoc(classes_scores, nullptr, &score, nullptr, &class_id_point);
//阈值大于0.25 认为检测出结果
//if (score > 0.3)
{
//坐标
const float& x = det_output.at<float>(0, i);
const float& y = det_output.at<float>(1, i);
const float& w = det_output.at<float>(2, i);
const float& h = det_output.at<float>(3, i);
cv::Rect box;
box.x = static_cast<int>(x);
box.y = static_cast<int>(y);
box.width = static_cast<int>(w);
box.height = static_cast<int>(h);
boxes.push_back(box);
class_ids.push_back(class_id_point.y);
confidences.push_back(score);
}
}
std::vector<int> nms_result;
//nms 去重,找到最优数据
cv::dnn::NMSBoxes(boxes, confidences, m_config.score_threshold, m_config.nms_threshold, nms_result);
std::vector<Detection> output;
for (int i = 0; i < nms_result.size(); ++i)
{
Detection result;
int idx = nms_result.at(i);
result.class_id = class_ids.at(idx);
result.confidence = confidences.at(idx);
result.class_name = coconame.at(result.class_id) + ' ' + std::to_string(result.confidence).substr(0, 4);
result.box.width = boxes.at(idx).width * this->m_ratio;
result.box.height = boxes.at(idx).height * this->m_ratio;
result.box.x = (boxes.at(idx).x - 0.5 * boxes.at(idx).width - this->m_left_offset) * this->m_ratio ;
result.box.y = (boxes.at(idx).y - 0.5 * boxes.at(idx).height - this->m_top_offset) * this->m_ratio ;
output.push_back(result);
}
//绘制
for (int i = 0; i < output.size(); ++i)
{
auto detection = output.at(i);
auto box = detection.box;
auto class_string = detection.class_name;
float xmax = box.x + box.width;
float ymax = box.y + box.height;
//生成随机颜色
// 获取当前系统时间作为种子
auto current_time = std::chrono::system_clock::now().time_since_epoch().count();
// 使用随机种子创建 RNG 对象
cv::RNG rng(current_time);
cv::Scalar color= cv::Scalar(rng.uniform(100, 256),rng.uniform(100, 256),rng.uniform(100, 256));
// Detection box
cv::rectangle(frame, cv::Point(box.x, box.y), cv::Point(xmax, ymax), color, 2);
// Detection box text
cv::Size textSize = cv::getTextSize(class_string, cv::FONT_HERSHEY_DUPLEX, 1, 2, 0);
cv::Rect textBox(box.x, box.y - 40, textSize.width + 10, textSize.height + 20);
cv::rectangle(frame, textBox, color, cv::FILLED);
cv::putText(frame, class_string, cv::Point(box.x + 5, box.y - 10), cv::FONT_HERSHEY_DUPLEX, 1, cv::Scalar(0, 0, 0));
}
}
这个例子是ffmpeg读取的数据,用openVINO推理实现对每张图片进行实时推理,推理后的数据,按照顺序写入显示,保证数据顺序一致性。