往期教程
如果觉得写的可以,请给一个点赞+关注支持一下
观看之前请先看,往期的博客教程,否则这篇博客没办法看懂
-
workFlow c++异步网络库编译教程与简介
-
C++异步网络库workflow入门教程(1)HTTP任务
-
C++异步网络库workflow系列教程(2)redis任务
-
workflow系列教程(3)Series串联任务流
简介
上一篇博客中讲了串行任务流,有了串行,那必然也有并行,本篇博客讲解任务流并行执行
创建一个任务流序列
SubTask
是所有任务的基类first
参数是这个任务流序列执行的首个任务
callback
是这个序列执行完毕后执行的回调函数
using series_callback_t = std::function<void (const SeriesWork *)>;
inline SeriesWork *
Workflow::create_series_work(SubTask *first, series_callback_t callback)
{
return new SeriesWork(first, std::move(callback));
}
创建并行任务流序列
- 参数
callback
是设置并行任务流中所有任务执行完毕之后调用的回调函数 - 函数返回
ParallelWork *
并行任务指针
using parallel_callback_t = std::function<void (const ParallelWork *)>;
inline ParallelWork *
Workflow::create_parallel_work(parallel_callback_t callback)
{
return new ParallelWork(std::move(callback));
}
向并行任务流中添加一个任物流序列
class ParallelWork{
public:
void add_series(SeriesWork *series);
}
代码示例
图示流程
- 首先创建一个并行任务流序列,随后创建n个任务流序列,每个任务流序列添加一个http任务,
- 先并行执行每个序列的http任务基本工作,随后调用设置的
httpCallback
异步回调函数,httpCallback
执行完毕后调用所在序列的序列回调函数,当所有的序列回调函数执行完毕之后在执行并行任务流的parallelCallback
回调函数
#include <vector>
#include <workflow/WFFacilities.h>
#include <workflow/Workflow.h>
#include <workflow/HttpUtil.h>
struct SeriesContext{
std::string url;
int state;
int error;
protocol::HttpResponse resp;//响应报文的完整内容
};
void parallelCallback(const ParallelWork *pwork){
fprintf(stderr,"pwork callback!\n");
SeriesContext *context;
for(size_t i = 0; i != pwork->size(); ++i){
context = static_cast<SeriesContext *>(pwork->series_at(i)->get_context());
fprintf(stderr,"url = %s\n", context->url.c_str());
if(context->state == WFT_STATE_SUCCESS){
const void *body;
size_t size;
context->resp.get_parsed_body(&body,&size);
fwrite(body,1,size,stderr);
fprintf(stderr,"\n");
}
else{
fprintf(stderr,"Error, state = %d, error = %d\n", context->state, context->error);
}
delete context;
}
}
void httpCallback(WFHttpTask *httpTask){
SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
fprintf(stderr,"httpTask callback, url = %s\n", context->url.c_str());
context->state = httpTask->get_state();
context->error = httpTask->get_error();
context->resp = std::move(*httpTask->get_resp());
}
int main(){
//使用工厂函数,创建一个并行任务
ParallelWork *pwork = Workflow::create_parallel_work(parallelCallback);//Workflow::create_parallel_work
std::vector<std::string> urlVec ={"http://192.168.135.129:81", "http://192.168.135.129","http://47.94.147.94"};
for(size_t i = 0; i != urlVec.size() ; ++i){
//创建若干个任务
// WFTaskFactory::create_http_task
std::string url = urlVec[i];
auto httpTask = WFTaskFactory::create_http_task(url,0,5,httpCallback);
// 修改任务的属性
auto req = httpTask->get_req();
req->add_header_pair("Accept","*/*");
req->add_header_pair("User-Agent","myHttpTask");
req->set_header_pair("Connection", "Close");
//为响应的内容申请一片堆空间
SeriesContext *context = new SeriesContext;
context->url = std::move(url);
// 为每个任务创建一个序列
auto series = Workflow::create_series_work(httpTask,nullptr);
// 把存储响应内容的指针 拷贝到序列的context当中。
series->set_context(context);
//把序列加入到并行任务中
// add_series
pwork->add_series(series);
}
pwork->start();//启动并行任务
}