Active Object 设计模式:
1) 根据对象被调用的方式,可以将对象分为两类:
Passive Object和Active Object。Passive 和 Object和调用者在同一个线程中,这就是我们通常所用的函数调用。而Active Object和调用在不同的线程中,它有自己的控制线程。Active Object设计模式是一种应用于并发编程的设计模式,它通过解耦对象的访问和对象的执行来增加并发性,从而简化对象的同步访问。
将方法的调用和方法的执行解耦。方法的调用在客户线程中,而方法的执行在另一个独立的线程中。另外,通过良性设计,使得客户端看起来就像是调用一个普通的方法。具体来说:我们用一个Proxy来描述Active Object的接口,由一个Servant来提供Active Object的实现。Proxy和Servant运行在两个不同的线程中,这样方法的调用和方法的执行就可以并发地运行了。其中Proxy运行在客户线程中,而Servant运行在另一个线程中。在运行期,Proxy将客户端的方法调用转化为方法请求,它们通过一个调度器保存在一个请求链表中,调度器运行在Servant线程中,它将方法请求从链表中取出,交由Servant执行。客户还可以获得运行结果。由于方法的调用和运行在不同的线程中,所以运行结果并不是立即返回的。客户只能通过阻塞等待或者循环查询的方式来获得运行结果,这个结果我们用Future来表示。
2) 在Active Object设计模式中,有6个关键的参与者:
Proxy:提供一个接口,客户通过该接口调用Active Object的方法。通过Proxy应用程序可以使用强类型的编程语言,而不是线程间松散的消息。Proxy位于客户线程中。
方法请求(Method Request):方法请求类定义用于执行Active Object方法的接口。客户每次调用Proxy定义的方法,就会构造一个方法请求对象。一个方法请求包含了方法参数等上下文信息,这些上下文信息用于方法的执行和执行结果的返回。
请求链表:Proxy将具体的方法请求对象插入到请求链表中。请求链表既用于保存方法请求,又用于判断方法请求的可执行性。请求链表将Proxy线程和Servant线程解耦,使得它们可以并发地运行。
调度器(Scheduler):调度器运行于Active Object线程,用于调度方法请求。调度器使用请求链表来管理方法请求,调度的策略可以根据应用程序的实际需要进行选择。
工作者(Servant):定义Active Object的行为和状态。工作者实现的方法需要与Proxy 接口、方法请求一致。当方法请求被调度器调度时,工作者的方法才会得到执行,因此,它总是运行在调度器线程中。
结果(Future):客户通过Proxy调用一个方法,就可以接收到Future。Future使得客户可以获得方法调用的执行结果。由于客户的请求和方法的执行在不同的线程中,所以Future只能通过阻塞等待或者循环查询的方式获取。
#include "ace/Log_Msg.h"
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Method_Request.h"
#include "ace/Future.h"
#include "ace/Activation_Queue.h"
#include <memory.h>
using namespace std;
#pragma comment(lib,"ace.lib")
// Servant 类对status_result_ 进行简单的累加工作
class Servant
{
public:
Servant()
{
status_result_ = 1;
}
int status_update(void)
{
ACE_DEBUG((LM_DEBUG, ACE_TEXT("Obtaining a status_update in %t ") ACE_TEXT("thread of control\n")));
ACE_OS::sleep(2);
return next_result_id();
}
private:
int next_result_id(void)
{
return status_result_++;
};
int status_result_;
};
class StatusUpdate : public ACE_Method_Request
{
public:
StatusUpdate(Servant & controller, ACE_Future<int>&returnval)
: controller_(controller), returnVal_(returnval)
{
ACE_DEBUG((LM_DEBUG, "StatusUpdate::StatusUpdate.\n"));
}
virtual int call(void)
{
ACE_DEBUG((LM_DEBUG, "StatusUpdate::call.\n "));
this->returnVal_.set(this->controller_.status_update());
return 0;
}
private:
Servant& controller_;
ACE_Future<int> returnVal_;
};
class ExitMethod : public ACE_Method_Request
{
public:
virtual int call(void)
{
return -1;
}
};
// 调度器类
class Scheduler : public ACE_Task_Base
{
public:
Scheduler()
{
ACE_DEBUG((LM_DEBUG, "Scheduler::Scheduler.\n"));
this->activate();
}
virtual int svc(void)
{
ACE_DEBUG((LM_DEBUG, "Scheduler::svc.\n"));
while(1)
{
auto_ptr<ACE_Method_Request> request
(this->activation_queue_.dequeue());
if (request->call() == -1)
break;
}
return 0;
}
int enqueue(ACE_Method_Request* request)
{
ACE_DEBUG((LM_DEBUG, "Proxy::status_update.\n"));
return this->activation_queue_.enqueue(request);
}
private:
ACE_Activation_Queue activation_queue_;
};
//既然在Active Object设计模式中使用了ACE_Activation_Queue作为消息队列,那么也就没有有必要使用ACE_Task类了,因为ACE_Task使用的消息队列是ACE_Message_Queue。
class Proxy
{
public:
ACE_Future<int> status_update(void)
{
ACE_DEBUG((LM_DEBUG, "Proxy::status_update.\n"));
ACE_Future<int> result;
this->scheduler_.enqueue(new StatusUpdate(this->controller_, result));
return result;
}
void exit(void)
{
ACE_DEBUG((LM_DEBUG, "Proxy::exit.\n"));
this->scheduler_.enqueue(new ExitMethod);
}
private:
Scheduler scheduler_;
Servant controller_;
};
int ACE_TMAIN(int, ACE_TCHAR* [])
{
Proxy controller;
ACE_Future<int> results[10];
//客户通过Proxy接口发起累加方法请求
for (int i = 0; i < 10; i++)
results[i] = controller.status_update();
ACE_OS::sleep(5);
//客户获取运行结果
for (int j = 0; j < 10; j++)
{
int result = 0;
results[j].get(result);
ACE_DEBUG((LM_DEBUG, "[%D(%t)] result %d\n", result));
}
controller.exit();
ACE_Thread_Manager::instance()->wait();
return 0;
}
运行结果如下: