目录
ACE Service Configurator框架
ACE_Server_Object类
ACE_Server_Repository类
ACE_Server_Config类
ACE Task框架
ACE_Message_Queue类
ACE_TASK类
在开始之前,首先介绍一下模板类的实例化和使用。给出以下代码
//ACCEPTOR代表模板的方法
template <class ACCEPTOR>
class Reactor_Logging_Server_Adapter : public ACE_Service_Object {
public:
// Hook methods inherited from <ACE_Service_Object>.
virtual int init (int argc, ACE_TCHAR *argv[]);
virtual int fini ();
virtual int info (ACE_TCHAR **, size_t) const;
virtual int suspend ();
virtual int resume ();
private:
Reactor_Logging_Server<ACCEPTOR> *server_;
};
//cpp中同样需要声明ACCEPTOR
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::init (int argc,
ACE_TCHAR *argv[]) {
int i;
char **array = 0;
ACE_NEW_RETURN (array, char*[argc], -1);
std::unique_ptr<char *[]> char_argv (array);
for (i = 0; i < argc; ++i)
char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));
ACE_NEW_NORETURN (server_, Reactor_Logging_Server<ACCEPTOR>
(i, char_argv.get (),
ACE_Reactor::instance ()));
for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
return server_ == 0 ? -1 : 0;
}
下面代码中使用Logging_Acceptor来实例化Reactor_Logging_Server_Adapter模板,也就是替代了上面的ACCEPTOR对象。
Server_Logging_Daemon_Ex
是通过 typedef
定义的别名,它指向 Reactor_Logging_Server_Adapter<Logging_Acceptor_Ex>
。因此,Server_Logging_Daemon_Ex
的方法本质上是 Reactor_Logging_Server_Adapter<Logging_Acceptor_Ex>
的方法。
//通过Logging_Acceptor类来实例化Reactor_Logging_Server_Adapter模板
typedef Reactor_Logging_Server_Adapter<Logging_Acceptor>
Server_Logging_Daemon;
ACE Service Configurator框架
ACE Service Configurator也就是服务器配置框架,是适配器模式的一种实现。一个适配允许通常因为接口不兼容而不能在一起工作的类工作在一起,做法是将类自己的接口包裹在一个已存在的类中。能够提升应用的可扩展性和灵活性。该模式允许应用在运行时重新配置其服务,而无需修改、重新编译或重新链接程序自身,或是关闭和重启应用。
ACE Service Configurator框架类主要如下:
其角色如下:
ACE_Server_Object类
ACE_Server_Object主要负责初始化、执行控制、报告以及终止。ACE_Server_Object是一个统一的接口,各个服务必须是称为ACE_Server_Object的公共基类的后代。其主要能力和方法如下:
继承自ACE_Service_Object的应用服务可以有选择地重新定义其挂钩方法:这些方法会在适当的时间被 ACE Service Configurator 框架回调,以响应特定的事件。例如,服务对象的init()挂钩方法会在 Service Configurator 框架执行激活服务的指令时(dynamic 和 satic 指令都可以邀活服务)被回调。如果初始化成功,Init()挂钩方法必须返回0;如果失败,必须返回-1。如果(且仅在这样的情况下)init()成功的话,在ACEService Confgurator 框架执行 remove 指令移除服务或是关闭所有服务时,相应的 fini()方法会在服务对象上被回调。
ACE Service Configurator 框架属于动态配置,而不是静态地去配置服务器端口号以及ip地址等。ACE使用了c++方法重载和下面的描述的宏使用不同的字符类型,而没有改变API
为了实现这一目标,讲适配器创建在Reactor_Logging_Server_Adapter.h文件中
template <class ACCEPTOR>
//继承于ACE_Service_Object
class Reactor_Logging_Server_Adapter : public ACE_Service_Object {
public:
// Hook methods inherited from <ACE_Service_Object>.
virtual int init (int argc, ACE_TCHAR *argv[]);
virtual int fini ();
virtual int info (ACE_TCHAR **, size_t) const;
virtual int suspend ();
virtual int resume ();
private:
//采用上述已有的日志服务器
//如果不适用该类则直接继承ACE_Service_Object进行实现
Reactor_Logging_Server<ACCEPTOR> *server_;
};
使用ACE Service Configurator框架创建一个Reactor_Logging_Server_Adapter实体类,会调用以下init方法:
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::init (int argc,
ACE_TCHAR *argv[]) {
//使用ACE_TEXT_ALWAYS_CHAR在需要的地方转换为char
//创建临时对象,通过strnew 进行复制 使得整个Reactor_Logging_Server拥有它
int i;
char **array = 0;
ACE_NEW_RETURN (array, char*[argc], -1);
std::unique_ptr<char *[]> char_argv (array);
for (i = 0; i < argc; ++i)
char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));
//动态分配Reactor_Logging_Server
ACE_NEW_NORETURN (server_, Reactor_Logging_Server<ACCEPTOR>
(i, char_argv.get (),
ACE_Reactor::instance ()));
//释放argv资源
for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
return server_ == 0 ? -1 : 0;
}
在被指示移除动态配置的日志服务时,ACE Service Configurator 框架调用下面所示的Reactor Logging_Server_Adapter::fini()挂钩方法:
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::fini ()
{ ((ACCEPTOR*)server_)->handle_close (); server_ = 0; return 0; }
//调用了handle_close 删除Reactor_Logging_Server对象
info方法则用于格式化一个字符串,在其中含有它用于侦听的tcp端口
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::info
(ACE_TCHAR **bufferp, size_t length) const {
//从Reactor_Logging_Server中获取网络地址
ACE_INET_Addr local_addr;
server_->acceptor ().get_local_addr (local_addr);
//格式化消息 解释做的事情
ACE_TCHAR buf[BUFSIZ];
ACE_OS::sprintf (buf,
ACE_TEXT ("%hu"),
local_addr.get_port_number ());
ACE_OS::strcat
(buf, ACE_TEXT ("/tcp # Reactor-based logging server\n"));
//如果没有提供缓存区来存储数据 则通过strnew进行初始化
if (*bufferp == 0)
*bufferp = ACE::strnew (buf);
else
ACE_OS::strncpy (*bufferp, buf, length); //将已经格式化的数据复制到其中
//返回消息的长度
return ACE_Utils::truncate_cast<int> (ACE_OS::strlen (*bufferp));
}
ACE中提供了多种申请内存的方法,需要及时对申请空间的对象进行释放,函数如下所示:
Reactor_Logging_Server的suspend和resume方法使用了反应堆的方法:
//suspend 方法
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::suspend ()
{ return server_->reactor ()->suspend_handler (server_); }
//resume 方法
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::resume ()
{ return server_->reactor ()->resume_handler (server_); }
ACE_Server_Repository类
ACE Service Configurator 框架不仅支持但服务器配置,同样支持多服务器配置,通过使用ACE_Server_Repository类以及ACE_Server_Repository_Iterator类来进行实现。ACE_Server_Repository类实现了管理器模式,将对一个类的所有对象的管理封装到一个单独的管理器类中,而不是单独去操作类对象。通过该种设计模式来控制所配置服务对象的生命周期,以及对他们的访问。提供的功能主要如下:
接口如下:
ACE_Service_object_Type object()万法返回的是指问相关联的 ACE_Service_Object的指针。 ACE_Module_Type object()方法返川指向相关联的ACE_Module的指针. ACE_Module_Typeobject()方法返网指问相关联的ACE Stream的指针。
ACE_Server_Repository的实现类似于容器的实现,ACE_Server_Repository_Iterator类则实现了迭代器模式,顺序地访问ACE_Service_Repository中的ACE_Service_Type系目,而又无需暴露其内部表示。主要方法如下:
不可以在迭代器遍历的过程中删除条目,否则会发生错误。
下面将给出一个实例。将ACE_Server_Repository类以及ACE_Server_Repository_Iterator类来实现Service_Reporter类,Service_Reporter类提供了一种元服务,客户可将其获取ACE Service Configurator框架静态或动态配置某个应用的所有服务信息。
客户通过下述方法与Service_Reporter完成交互。
1、客户建立一个到 Service_Reporter 对象的 TCP 连接,
2、Service_Reporter返回服务器的所有服务的列表给客户。
3、Service_Reporter 关闭 TCP/IP 连接。
首先定义Service_Reporter.h头文件,完成对Service_Reporter的声明,需要继承ACE_Service_Object对象,才能使用ACE Service Configuration框架
class Service_Reporter : public ACE_Service_Object {
public:
Service_Reporter (ACE_Reactor *r = ACE_Reactor::instance ())
: ACE_Service_Object (r) {}
protected:
// Hook methods inherited from <ACE_Service_Object>.
virtual int init (int argc, ACE_TCHAR *argv[]);
virtual int fini ();
virtual int info (ACE_TCHAR **, size_t) const;
virtual int suspend ();
virtual int resume ();
// Reactor hook methods.
virtual int handle_input (ACE_HANDLE);
virtual ACE_HANDLE get_handle () const
{ return acceptor_.get_handle (); }
private:
ACE_SOCK_Acceptor acceptor_; // Acceptor instance.
enum { DEFAULT_PORT = 9411 };
};
Service_Reporter类同样继承于ACE_Service_Object,可以通过ACE Service Configuration框架进行管理及控制。同时ACE内部提供了ACE_Dynamic_Service类模块访问ACE_Server_Repository登记的服务。服务注册之后,可通过ACE_Dynamic_Service获取服务。
如果 Server_Logging_Daemon服务的某个实例已由 ACE Service Configurator 框架进行了动态链接和初始化,应用可以使用ACE_Dynamic_Service模板来在程序中访问服务。如下所示:
Service_Reporter方法挂钩实现在Service_Reporter.cpp中,init方法实现如下:
int Service_Reporter::init (int argc, ACE_TCHAR *argv[]) {
//将local_addr 初始化为Service_Reporter缺省的TCP端口
ACE_INET_Addr local_addr (Service_Reporter::DEFAULT_PORT);
//从argv[0]开始解析 解析到-p的指令
ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:"), 0);
get_opt.long_option (ACE_TEXT ("port"),
'p',
ACE_Get_Opt::ARG_REQUIRED);
for (int c; (c = get_opt ()) != -1; )
switch (c) {
case 'p':
local_addr.set_port_number
(ACE_OS::atoi (get_opt.opt_arg ()));
}
//初始化ACE_SOCK_Acceptor,让其在local_addr地址上监听
//当请求来的时候,反应器分发到下面的Service_Reporter::hand_input()方法中
acceptor_.open (local_addr);
return reactor ()->register_handler
(this,
ACE_Event_Handler::ACCEPT_MASK);
}
当请求来的时候,反应器分发到下面的Service_Reporter::hand_input()方法中
int Service_Reporter::handle_input (ACE_HANDLE) {
//接受新的客户连接 循环式服务 同时只处理一个客户
ACE_SOCK_Stream peer_stream;
acceptor_.accept (peer_stream);
//初始化ACE_Service_Repository_Iterator 0代表已挂起服务信息
ACE_Service_Repository_Iterator iterator
(*ACE_Service_Repository::instance (), 0);
//对于每一个服务 调用info获取信息
//通过 sendv_n 集中写发送数据
for (const ACE_Service_Type *st;
iterator.next (st) != 0;
iterator.advance ()) {
iovec iov[3];
iov[0].iov_base = const_cast<ACE_TCHAR *> (st->name ());
iov[0].iov_len =
ACE_Utils::truncate_cast<u_long> (
ACE_OS::strlen (st->name ()) * sizeof (ACE_TCHAR));
const ACE_TCHAR *state = st->active () ?
ACE_TEXT (" (active) ") : ACE_TEXT (" (paused) ");
iov[1].iov_base = const_cast<ACE_TCHAR *> (state);
iov[1].iov_len =
ACE_Utils::truncate_cast<u_long> (
ACE_OS::strlen (state) * sizeof (ACE_TCHAR));
ACE_TCHAR *report = 0; // Ask info() to allocate buffer
int len = st->type ()->info (&report, 0);
iov[2].iov_base = static_cast<ACE_TCHAR *> (report);
iov[2].iov_len = static_cast<u_long> (len);
iov[2].iov_len *= sizeof (ACE_TCHAR);
//同时发送三个对象
peer_stream.sendv_n (iov, 3);
ACE::strdelete (report);
}
//关闭客户端连接 释放socket
peer_stream.close ();
return 0;
}
Service_Reporter::info方法传回一个字符串,用于侦听TCP端口号所作的事情:
int Service_Reporter::info (ACE_TCHAR **bufferp,
size_t length) const {
//设置监听的端口号和地址
ACE_INET_Addr local_addr;
acceptor_.get_local_addr (local_addr);
//处理的事情
ACE_TCHAR buf[BUFSIZ];
ACE_OS::sprintf
(buf, ACE_TEXT ("%hu"), local_addr.get_port_number ());
ACE_OS::strcat
(buf, ACE_TEXT ("/tcp # lists services in daemon\n"));
if (*bufferp == 0)
*bufferp = ACE::strnew (buf); //strnew 需要使用strdelete进行释放
else
ACE_OS::strncpy (*bufferp, buf, length);
return ACE_Utils::truncate_cast<int> (ACE_OS::strlen (*bufferp));
}
Service_Reporter::suspend和Service_Reporter::resume同样调用反应堆的方法。
//调用反应堆suspend_handler方法
int Service_Reporter::suspend ()
{ return reactor ()->suspend_handler (this); }
//调用反应堆resume_handler方法
int Service_Reporter::resume ()
{ return reactor ()->resume_handler (this); }
Service_Reporter::fini方法如下所示:
//调用remove_handler 发送DONT_CALL防止重复调用回调
int Service_Reporter::fini () {
reactor ()->remove_handler
(this,
ACE_Event_Handler::ACCEPT_MASK
| ACE_Event_Handler::DONT_CALL);
return acceptor_.close ();
}
最后,我们必须给ACE Service Configurator 框架提供一些关于这个新服务的“薄记”信息。尽管这个服务的代码将被静态地链接进示例程序中,我们想要让框架实例化一个Service_Reporter 对象,以在服务被激活时执行服务。因此,我们将一些必需的ACE 服务宏增加到 Service_Reporter 实现文件中。这些宏创建一个Service_Reporter,并将其登记到ACE_Service_Repository.
// 用于实现服务定义工程 和 gobble函数。
ACE_FACTORY_DEFINE (ACE_Local_Service, Service_Reporter)
// 定义将服务信息传递给ACE_service_Config的ACE_Static_Svc_Descriptor。
// 静态服务信息宏 用于登记信息
ACE_STATIC_SVC_DEFINE (
Reporter_Descriptor,
ACE_TEXT ("Service_Reporter"),
ACE_SVC_OBJ_T,
&ACE_SVC_NAME (Service_Reporter),
ACE_Service_Type::DELETE_THIS //删除代表服务的ACE_Service_Type对象
| ACE_Service_Type::DELETE_OBJ, //使得gobble函数调用,清除Service_Reporter对象
0 // 此对象最初未处于活动状态
)
//ACE_STATIC_SVC_REQUIRE宏定义一个对象 它向ACE_Service_Repository 登记 Service_Reporter
//的ACE_Static_Svc_Descriptor对象的实例。在许多平台上,这个宏还确保对象被实例化。
//有的地方也会在main函数中直接定义该宏
ACE_STATIC_SVC_REQUIRE (Reporter_Descriptor)
ACE FACTORY _DEFINE, 宏这样来简化对 ACE Service Configurator 框架的使用:
1、它生成不依赖于编译器的工厂方法这个宏生成的_make_Service_Reporer()工厂函数具有exterm“C”linkage,允许框架在 DLL的符号表中定位该函数,而无需知道C++编译器的“名称搅乱”方案。
2、它一致地管理动态内存,为了确保正确的跨平台行为,保证在某个 DLL,中分配的内存也在同一 DLL, 中释放是十分重要的。因此,gobbler 函数被传递给make_Service_Reporter(),以使ACE Service Configuralor 框架能够确保内存在相同的堆中进行分配和释放。
ACE_Server_Config类
上述讲得Reactor_Logging_Server使用了静态配置,但会导致下面几种不好的情况:
在开发周期中过早地做出服务配置决策:过早做出决定,最佳配置可能会发生变化。
如果服务的实现与其初始配置紧密地耦合在一起,修改某个服务可能会给其他服务带来不良影响。
系统的伸缩性很糟糕:可能会浪费资源,或资源利用不充分
ACE_Service_Config 实现了外观模式来集成 ACE Service Configuralor 框架中的其他类并对管理应用中的各个服务所必需的活动进行协调。外观模式为子系统中的一组接口提供一个一致的界面,此模式定义了一个高层接口,这个接口使得这一子系统更加容易使用。这个类提供了以下能力:
ACE_Service_Config的接ㄩ如图5.6所示。这个类有着一个丰富的接口,因为它输出了 ACE Service Configurator 框架中的所有特性。因此,我们将对其方法的描述分组进在下面描述的三个范畴中。
1、服务配置生命周期管理方法:下列方法初始化和关闭ACE_Service_Config。
ACE_Service_Config类是单例模式的一个变种,所有的数据成员都是声明为静态的。确保各个实例状态是唯一的。
open()方法足最为常用的初始化 ACE_Service_Config的方式。它解析在argc和argv 中传入的参数,跳过第一个参数(argv[0]),因为那是程序名。下表概述了ACE_Service_Config 所能识别的选项
服务配置方法:在解析其所有的 argc/argv参数之后,ACE_Service_Confg:open()方法调用下面
这两个方法中的一个或全部来配置应用:
下表总结了可由这两个ACE_Service_Confg方法处理的服务配置指令
动态和初始化服务:
dynamic svc-name svc-type DLL-name:factory_func()["argc/argv options"]
dynamic 表示动态链接和初始化某个服务对象。svc-name是指派给服务的名称。svc-type指定服务的类型,可以足Service_Obiect*、Module*或是Stream*。DLL-name是factory_func()符号的动态链接库的名称。DLL-name 既可以是全路径名,也可以是没有后缀的文件名。如果它是全路径名,ACE_DLL::open()方法会被用于将指定的文件动态链接进应用进程中。但如果它是文件名,open()使用 ACE::ldfind()来定位 DLL,并将其动态链接进进程的地址空间中。
argc/argv options是可选参数列表,可以用来通过服务对象的init()挂钩方法对其进行初始化ACE Service Configurator 框架使用了 ACE_ARGV 类来将字符串分解进多个参数,并替换包含在字符串中的环境变量的值。
上述提到了ACE_DLL类,这里对其进行详细的介绍。显式链接DLL可能和解除可能会造成不统一编程接口,不安全类型等问题。ACE定义了ACE_DLL wrapper facade类来封装显式的链接,使用ACE::ldfind方法来定位DLL。
DLL文件名扩展:ACE::ldfind()通过增加适当的前缀和后缀来确定 DL, 的名称。例如,它在 Solaris 上增加 lib 前级和,so 后级,在 Windows 增加.dll 后缀。
DIL 搜索路径:ACE::ldfind()还会使用平台的 DLL 搜索路径环境变量来搜索指定的DLL,例如,它在许多 UNIX系统上使用LD_LIBRARY_PATH,在 Windows 上使用 PATH·来查找 DLL。
ACE_DLL关键方法如下所示。
初始化静态服务:
static svc-name ["argc/argv options"]
尽管ACE_Service_Confg常用于动态配置,但也可用于静态配置。static表示静态配置,svc-name是指派给服务的名称,argc/argv options与上述方法一样。语法更加简单,因为不需要动态链接到某个对象中。缺省情况下是禁用的,必须显示地启用才行。
完全移除服务:
revome svn-name
remove 指令让 ACE_Service_Confg 解释器询间 ACE_Service_Repository,查找指定的服务。如果找到了该服务,解释器调用它的fini()挂钩方法。如果服务原来是从DLL中动态链接的,它将通过 ACE_DLL::close()法被解除链接。
挂起服务,但不进行移除:
suspend svc-name
suspend 指令让 ACE_Service,Config解释器询问ACE_Service_Repository,查找指定的svc-name服务。如果找到该服务,它的suspend()挂钩方法会被调用。服务可以重新定义这个方法,以实现挂起其处理所需的适当动作。
恢复之前挂起服务:
resume svc-name
resume 指令让 ACE_Service_Confg 解释器询间 ACE_Service_Repository,查找指定的 svc-name服务。如果找到该服务,它的resume()挂钩方法会被调用。服务可以重新定义这个方法,以实现恢复其处理所需的适当动作--通常是反转suspend()方法的效果。
初始化一列有序的层次相关的模块:
stream svc-name '['model-list']'
stream指令让 ACE_Service_Config 解释器初始化一列层次有序相关的模块。每个模块由一对服务组成,这些模块互相连接,并通过传递ACE_Message_Block对象进行通信。stream 指令的实现使用了ACE Streanms 框架。
示例如下所示:
//静态启动Service_Reporter 服务
static Service_Reporter "-p $SERVICE_REPORTER_PORT"
//动态启动AIO_Client_Logging_Daemon 服务 的_make_AIO_Client_Logging_Daemon函数
dynamic AIO_Client_Logging_Daemon Service_Object *
AIO_CLD:_make_AIO_Client_Logging_Daemon()
"-p 7777"
//动态启动Server_Logging_Daemon 的_make_TPC_Logging_Server函数
dynamic Server_Logging_Daemon Service_Object *
TPCLS:_make_TPC_Logging_Server()
3、实用方法
ACE_Service_Config能够解析的完整BNF语法,主要包括常用方法指令 以及名字状态等信息,如下所示:
reconfigure()方法可被用于迫使 ACE_Service_Config 解释器重新处理服务配置文件。如果服务的名称已知的话,通过 suspend()和resume()万法可以进行服务的挂起和恢复。这是使用在 ACE_Service_Repository 单体中所定义的方法的捷径。
下面将通过一个示例来应用ACE Service Configurator 框架创建一个服务器。步骤如下:
1、静态配置Service_Reporter的一个实例
2、将 Reactor_Logging_Server_Adapler 模板动态地链接和配置进服务器的地址空间中。
3、初始化服务配置
我们从编写下衘这个位于Configurable_Logging_Server.cpp中的-般化的main()程序开始。这个程序将 Service_Reporter和 Reactor_Logging_Server_Adapter 服务配置进应用进程中,随即运行反应器的事件循环。
#include "ace/OS_main.h"
#include "ace/Service_Config.h"
#include "ace/Reactor.h"
int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) {
//向ACE Service Configurator 框架登记静态Reporter服务
ACE_STATIC_SVC_REGISTER (Reporter_Descriptor);
//调用ACE_Service_Config::open()配置应用 加载相关决策
//因为明确了是静态加载 则加了第四个参数0
//也可以直接在指令行上提供-y来控制是否禁用静态
ACE_Service_Config::open
(argc, argv, ACE_DEFAULT_LOGGER_KEY, 0);
ACE_Reactor::instance ()->run_reactor_event_loop ();
return 0;
}
在ACE_Service_Config:open调用时会解析下面这个svc.conf文件。
//将Service_Reporter 静态链接到可执行程序中
//让其调用Service_Reporter 中的init()方法进行初始化
static Service_Reporter "-p $SERVICE_REPORTER_PORT"
//将SLD DLL动态连接到地址空间
//提取_make_Server_Logging_Daemon工厂函数
//调用工厂模式分配一个Server_Logging_Daemon 对象
//调用Server_Logging_Daemon init() 将SERVICE_LOGGING_DAEMON_PORT作为argc/argv参数传递
//如果init成功。Server_Logging_Daemon指针存储在ACE_Service_Respository中
//在"Server_Logging_Daemon"名下
dynamic Server_Logging_Daemon Service_Object *
SLD:_make_Server_Logging_Daemon()
"$SERVICE_LOGGING_DAEMON_PORT"
上述svc.conf的xml表示则更为繁琐,如下所示:
SLD DLL是从下面的SLD.cpp中生成的
#include "Reactor_Logging_Server_Adapter.h"
#include "Logging_Acceptor.h"
#include "SLD_export.h"
//定义Logging_Acceptor来实例化Reactor_Logging_Server_Adapter模板
typedef Reactor_Logging_Server_Adapter<Logging_Acceptor>
Server_Logging_Daemon;
//ACE_FACTORY_DEFINE宏在含有_make_Server_Logging_Daemon()工厂函数中
//如果需要引用工厂函数 则通过ACE_FACTORY_DECLARE宏来进行引入操作
ACE_FACTORY_DEFINE (SLD, Server_Logging_Daemon)
ACE的输入/输出助手宏,能够确保在所有支持的平台上,如下所示。
日志服务器的执行步骤如图所示:
图5.8中的 UML序列图阐释了基于上面所示的 svc.conf 文件配置服务器口志 daemon 所涉及的各个步骤。在程序启动时,ACESTATIC_SVC_REQUIRE宏所生成的对把使用ACE_STATIC_SVC_DEFINE宏创建的Service_Reponer信息记进ACE_Service_Config。当ACE_Service_Config::open()方法被调用时,它使用指定的工厂函数来实例化一个 Service_Reponer 对象,但并不将其激活。open()方法随即调用process_directives(),解释svc.conf文件中的指令。第一条指令激活静态的 Service_Reporter 服务。第二条指令触发以下动作:
1、动态链接 SLD DLL:
2、调用_make_Server_Logging_Daemon 工厂函数来创建 Reactor_Logeing_Server_Adapter的个实例
3、调用新服务对象的init()方法来激活服务
当所有的配置活动完成时,main()程序调用 ACE_Reactor::run_reactor_event_loop()。在那里,服务运行,就像先前的示例中被静态配置的对象一样。
重配置服务器
ACE Service Configurator框架可以响应外部的事件(比如信号或命令),在运行时重配置服务器。这时,框架重新读取它的svc.conf文件,并执行指定的指令,比如将服务对象插入服务器或从中移除以及挂起或恢复已有的服务对象。我们现在闸释怎样使用这些特性来动态地重配置我们的服务器日志 daemon。通过定义一个新的svc.conf文件,并发送信息(SIGHUP或SIGINT)指令重新配置,可以增加以下能力,而不影已有的代码或进程中的Service_Reporter服务。
//移除日志daemon 从应用的地址空间解除其链接
remove Server_Logging_Daemon
//动态将Reactor_Logging_Server_Adapter模块的一个不同的实例重新配置到日志daemon中
//SERVER_LOGGING_DAEMON_PORT位于SLDex.cpp文件中 由ACE_FACTORY_DEFINE 生成
dynamic Server_Logging_Daemon Service_Object *
SLDex:_make_Server_Logging_Daemon_Ex()
"$SERVER_LOGGING_DAEMON_PORT"
//动态配置一个Server_Shutdown服务对象,该对象controller()函数和Quit_handler类
//来等待管理员通过其标准输入发送命令关闭服务器
dynamic Server_Shutdown Service_Object *
SLDex:_make_Server_Shutdown()
上述中的 SERVER_LOGGING_DAEMON_PORT位于SLDex.cpp文件中由ACE_FACTORY_DEFINE 生成,具体代码如下:
//通过Logging_Acceptor_Ex实例化Reactor_Logging_Server_Adapter模板
typedef Reactor_Logging_Server_Adapter<Logging_Acceptor_Ex>
Server_Logging_Daemon_Ex;
ACE_FACTORY_DEFINE (SLDEX, Server_Logging_Daemon_Ex)
上述dynamic Server_Shutdown xxx动态配置一个Server_Shutdown服务对象,该对象controller()函数和Quit_handler类,下面所示的Server_Shutdown继承自ACE_Server_Object对象。
class Server_Shutdown : public ACE_Service_Object {
public:
//初始化数据
//将THR_DETACHED传给spawn 控制器的线程标识符和其他资源会在线程终止后由OS自动回收
virtual int init (int, ACE_TCHAR *[]) {
reactor_ = ACE_Reactor::instance ();
return ACE_Thread_Manager::instance ()->spawn
(controller, reactor_, THR_DETACHED);
}
//通知反应器关闭
virtual int fini () {
Quit_Handler *quit_handler = 0;
ACE_NEW_RETURN (quit_handler,
Quit_Handler (reactor_), -1);
return reactor_->notify (quit_handler);
}
private:
ACE_Reactor *reactor_;
};
使用ACE_FACTORY_DEFINE来生成ACE Service Configurator框架所需的_make_Server_Shutdown()函数
ACE_FACTORY_DEFINE (SLDEX, Server_Shutdown)
基于上面所示的svc.conf文件来配置服务器日志daemon所涉及的各个步骤:
ACE Servicc Configurator框架中的动态重配置机制使得开发者能够修改服务器功能,或足调谐性能,而无需重新进行大量的开发和安装。例如,调试一个有问题的日志服务实现只是涉及到动态地承配置一个功能等价的服务,在此服务中包含有额外的装置来帮助标定错误的行为。不用修改、重编译、重链接或是重启正在执行的服务器日志 daemon,就可以完成这样的重配置过程。特别地,这样的重配置小会影响先前静态配置的Service_Reporer。
ACE Task框架
ACE Task框架提供了强大而可扩展的面向对象并发能力,比如在对象的上下文中派生线程,以及在执行在不同线程中的对象之间传送消息和对消息进行排队。该框架可被应用于实现一些关键的并发模式,比如:
Active Object模式:它解除了“调用某个方法的线程”与“执行该方法的线程”的耦合。该模式增强了并发性,并简化了对“执行在一个或多个线程的上下文中的对象”的同步化访问。它通过对方法的调用(Method Invocation)与方法的执行(Method Execution)进行解耦来提高并发性。c++实现Active Object模式代码如下:
#include <pthread.h>
#include <iostream>
#include <queue>
// 定义一个ActiveObject类
class ActiveObject {
public:
ActiveObject() {
pthread_mutex_init(&mutex_, NULL); // 初始化互斥锁
pthread_cond_init(&cond_, NULL); // 初始化条件变量
pthread_create(&worker_thread_, NULL, WorkerThread, this);// 创建一个工作线程,运行WorkerThread函数,传入this指针作为参数
}
// 析构函数
~ActiveObject() {
{
pthread_mutex_lock(&mutex_); // 加锁
stop_worker_ = true; // 设置停止工作线程的标志
pthread_cond_signal(&cond_); // 发送条件信号,通知等待在条件变量上的线程
pthread_mutex_unlock(&mutex_); // 解锁
}
pthread_join(worker_thread_, NULL); // 等待工作线程结束
pthread_mutex_destroy(&mutex_); // 销毁互斥锁
pthread_cond_destroy(&cond_); // 销毁条件变量
}
// 添加一个任务到队列
void Enqueue(void (*task)(void*), void* arg) {
pthread_mutex_lock(&mutex_); // 加锁
tasks_.push(std::make_pair(task, arg)); // 将任务及其参数添加到队列中
pthread_cond_signal(&cond_); // 发送条件信号
pthread_mutex_unlock(&mutex_); // 解锁
}
private:
// 工作线程函数,静态成员函数,可以通过传入的参数访问ActiveObject实例
static void* WorkerThread(void* arg) {
ActiveObject* self = static_cast<ActiveObject*>(arg);
self->ProcessQueue(); // 处理任务队列
return NULL;
}
// 处理任务队列的函数
void ProcessQueue() {
while (true) {
void (*task)(void*) = NULL;
void* arg = NULL;
pthread_mutex_lock(&mutex_); // 加锁
// 当队列为空且没有收到停止信号时,等待在条件变量上
while (tasks_.empty() && !stop_worker_) {
pthread_cond_wait(&cond_, &mutex_);
}
// 如果收到停止信号且队列为空,则退出循环
if (stop_worker_ && tasks_.empty()) {
pthread_mutex_unlock(&mutex_);
break;
}
// 取出队列中的任务及其参数
task = tasks_.front().first;
arg = tasks_.front().second;
tasks_.pop();
// 解锁
pthread_mutex_unlock(&mutex_);
// 执行任务
task(arg);
}
}
// 任务队列,存储任务和对应的参数
std::queue<std::pair<void (*)(void*), void*> > tasks_;
pthread_mutex_t mutex_; // 互斥锁,用于保护任务队列和停止标志的访问
pthread_cond_t cond_; // 条件变量,用于在任务队列为空时等待新的任务
bool stop_worker_ = false; // 停止工作线程的标志
pthread_t worker_thread_; // 工作线程的线程ID
};
// 使用示例中的任务函数,打印一个整数值
void PrintTask(void* arg) {
int value = *static_cast<int*>(arg);
std::cout << "Executing task with value " << value << std::endl;
}
// 主函数,创建一个ActiveObject实例,并添加一个任务到队列中
int main() {
ActiveObject active_object;
int value = 42;
active_object.Enqueue(PrintTask, &value);
// 等待1秒,仅用于示例,确保任务有足够的时间执行
sleep(1);
return 0;
}
Half-Sync/Half-Async 模式:它解除了并发系统中的异步与同步处理的耦合,从而能够简化编程而又不会带来严重的性能下降。该模式引入了了个层次: “层用于异步(或反应式)处理,层用于同步服务处理,还有一个排队层,负责协调异步/反应式和同步层之间的通信。c++多线程实现Half-Sync/Half-Async 模式代码如下:
#include <pthread.h>
#include <iostream>
#include <queue>
// HalfSyncHalfAsync 类实现了一个半同步半异步的任务队列处理器
class HalfSyncHalfAsync {
public:
// 构造函数,初始化互斥锁、条件变量,并启动工作线程
HalfSyncHalfAsync() {
pthread_mutex_init(&mutex_, NULL); // 初始化互斥锁
pthread_cond_init(&cond_, NULL); // 初始化条件变量
pthread_create(&worker_thread_, NULL, WorkerThread, this); // 创建并启动工作线程
}
// 析构函数,停止工作线程,并清理资源
~HalfSyncHalfAsync() {
{
pthread_mutex_lock(&mutex_); // 加锁
stop_worker_ = true; // 设置停止工作线程的标志
pthread_cond_signal(&cond_); // 发送条件信号,唤醒等待的工作线程
pthread_mutex_unlock(&mutex_); // 解锁
}
pthread_join(worker_thread_, NULL); // 等待工作线程结束
pthread_mutex_destroy(&mutex_); // 销毁互斥锁
pthread_cond_destroy(&cond_); // 销毁条件变量
}
// 向任务队列中添加一个异步任务
void EnqueueTask(void (*task)(void*), void* arg) {
pthread_mutex_lock(&mutex_); // 加锁
tasks_.push(std::make_pair(task, arg)); // 将任务添加到队列中
pthread_cond_signal(&cond_); // 发送条件信号,唤醒等待的工作线程
pthread_mutex_unlock(&mutex_); // 解锁
}
// 同步执行一个任务,即在当前线程立即执行
// 另外一种模式则全部采用异步的方式进行执行任务
void SyncExecute(void (*task)(void*), void* arg) {
task(arg); // 直接执行任务
}
private:
// 工作线程的入口函数
static void* WorkerThread(void* arg) {
HalfSyncHalfAsync* self = static_cast<HalfSyncHalfAsync*>(arg); // 转换参数为HalfSyncHalfAsync对象指针
self->ProcessQueue(); // 处理任务队列
return NULL;
}
// 处理任务队列中的任务
void ProcessQueue() {
while (true) {
void (*task)(void*) = NULL; // 任务函数指针
void* arg = NULL; // 任务参数
pthread_mutex_lock(&mutex_); // 加锁
// 等待任务队列非空或收到停止信号
while (tasks_.empty() && !stop_worker_) {
pthread_cond_wait(&cond_, &mutex_); // 等待条件信号
}
// 如果收到停止信号且任务队列为空,则退出循环
if (stop_worker_ && tasks_.empty()) {
pthread_mutex_unlock(&mutex_); // 解锁
break;
}
// 取出任务队列中的任务
task = tasks_.front().first;
arg = tasks_.front().second;
tasks_.pop(); // 弹出任务队列中的任务
pthread_mutex_unlock(&mutex_); // 解锁
task(arg); // 执行任务
}
}
std::queue<std::pair<void (*)(void*), void*> > tasks_; // 任务队列,存储任务函数指针和参数
pthread_mutex_t mutex_; // 互斥锁,用于保护任务队列的访问
pthread_cond_t cond_; // 条件变量,用于唤醒等待的工作线程
bool stop_worker_ = false; // 停止工作线程的标志
pthread_t worker_thread_; // 工作线程的线程ID
};
// 示例任务函数,打印参数值
void AsyncTask(void* arg) {
int value = *static_cast<int*>(arg);
std::cout << "Asynchronous task executed with value " << value << std::endl;
}
int main() {
HalfSyncHalfAsync hsha; // 创建HalfSyncHalfAsync对象
// 添加异步任务到任务队列中
int async_value = 10;
hsha.EnqueueTask(AsyncTask, &async_value);
// 同步执行任务
int sync_value = 20;
hsha.SyncExecute(AsyncTask, &sync_value);
// 等待异步任务完成,仅用于示例(实际使用中应该有更好的同步机制)
sleep(1);
return 0;
}
Active Object模式和Half-Sync/Half-Async 模式的主要区别如下:
Half-Sync/Half-Async(半同步/半异步)模式是一种处理并发I/O的模式,它结合了同步I/O和异步I/O的优点。在这种模式中,通常有一个单独的线程或核心来处理同步I/O操作(例如,读写文件或网络通信),而其他部分的程序则继续异步执行。当同步I/O操作完成时,通常会通过某种机制(如回调函数、事件、信号量等)通知异步部分。
Active Object模式则是一种将方法的执行进行异步处理的设计模式。在Active Object模式中,一个对象将其方法的执行委托给一个独立的线程,从而使这些方法可以异步执行。这种模式有助于将方法的执行与方法的调用解耦,提高了系统的并发性和响应性。
主要区别:
- 关注点不同:Half-Sync/Half-Async主要关注I/O操作的处理方式,而Active Object模式主要关注对象方法的异步执行。
- 实现方式不同:Half-Sync/Half-Async通常涉及一个单独的线程处理同步I/O,而其他部分异步执行。Active Object模式则通常涉及一个或多个工作线程来处理对象的方法调用。
- 适用场景不同:Half-Sync/Half-Async适用于需要处理大量I/O操作且希望保持程序其他部分响应性的场景。Active Object模式则适用于需要将对象方法的执行进行异步处理的场景,以提高系统的并发性和响应性。
ACE Task框架类如下:
ACE TASK框架的好处如下:
ACE_Message_Queue类
ACE定义了ACE_Message_Queue 类。这是一种可移植且又高效的进程内消总排队机制,其中利用了ACE_Message_Block 的各种高级能力。ACE_Message_Queue 是一种可移植的轻量级进程内消息排队机制,它提供了以下能力:
ACE_Message_Queue 类的主要接口如图所示:
主要分为以下4个范畴:
1、初始化和流控制方法:下面的方法可被用于在ACE_Message_Queue 中进行初始化和管理流控制,通过水位标来防止快速的发送者发送的数据超出较慢的接受者的缓冲和计算资源的限度。默认的高水位标志是16K,当高于高水位时,会进行阻塞处理,直到总字节数降到低水位。
2.入队/出队方法和消息缓冲:下面的方法执行ACE_Message_Queue 中的大量工作:
ACE_Message_Queue中的消息分为简单消息和复合消息,简单消息包括单个的ACE_Message_Block,而复合消息则包括多个ACE_Message_Block对象,使用复合模式链接在一起。队列中的消息通过一对指针双向地链接在一起,使用next和prev获取这对指针,按照先进先出规则。复合消息则通过一个continuation指针而单向得串联在一起,通过cont方法获取指针。
3、参数的同步策略:ACE_Message_Queue模块是通过ACE_SYNCH_DECL类及TIME_POLICY 策略类来参数化的
template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
class ACE_Message_Queue : public ACE_Message_Queue_Base
{
public:
//
protected:
// = 用于控制并发访问的同步.
// 保护队列免受并发访问
ACE_SYNCH_MUTEX_T lock_;
/// 用于使线程休眠,直到队列不再为空.
ACE_SYNCH_CONDITION_T not_empty_cond_;
/// 用于使线程休眠,直到队列不再满为止.
ACE_SYNCH_CONDITION_T not_full_cond_;
/// 返回当前时间的策略
TIME_POLICY time_policy_;
}
ACE提供了两种ACE_SYNCH_DECL traits类,预先包装了最为常用的同步traits。
ACE_NULL_SYNCH 这个类中的 traits 是根据“null”加锁机制来实现的,如下所示:
class ACE_Export ACE_NULL_SYNCH
{
public:
typedef ACE_Null_Mutex MUTEX;
typedef ACE_Null_Mutex NULL_MUTEX;
typedef ACE_Null_Mutex PROCESS_MUTEX;
typedef ACE_Null_Mutex RECURSIVE_MUTEX;
typedef ACE_Null_Mutex RW_MUTEX;
typedef ACE_Null_Condition CONDITION;
typedef ACE_Null_Condition RECURSIVE_CONDITION;
typedef ACE_Null_Semaphore SEMAPHORE;
typedef ACE_Null_Mutex NULL_SEMAPHORE;
};
ACE_NULL_SYNCH 类是 Null Object 模式的一个例子,这种模式通过定义“空操作”占位符、移除类实现中的条件语句而简化了应用。ACE_NULL_SYNCH常常被用于单线程化应用或是这样的应用中:在其中线程间同步或是已经通过仔细地设计而被消除,或是已经通过其他的某种机制而被实现。7.4节中的客户日志 daemon 小例(238 页)阐释了对 ACE NULL SYNCHtraits 类的使用。
ACE_MT_SYNCH 这个预定义类中的taits 是根据实际的加锁机制来实现的,如下所示:
class ACE_Export ACE_MT_SYNCH
{
public:
typedef ACE_Thread_Mutex MUTEX;
typedef ACE_Null_Mutex NULL_MUTEX;
typedef ACE_Process_Mutex PROCESS_MUTEX;
typedef ACE_Recursive_Thread_Mutex RECURSIVE_MUTEX;
typedef ACE_RW_Thread_Mutex RW_MUTEX;
typedef ACE_Condition_Thread_Mutex CONDITION;
typedef ACE_Condition_Recursive_Thread_Mutex RECURSIVE_CONDITION;
typedef ACE_Thread_Semaphore SEMAPHORE;
typedef ACE_Null_Semaphore NULL_SEMAPHORE;
};
通过使用上述trait类能够带来以下好处:
它既允许ACE_Message_Queue 工作在单线程化配置中,也允许它工作在多线程化配置中,并且
无需改变类实现。
它允许程序员通过 Strategized Locking模式,批量地改变 ACE_Message_Queue 的实例化的同步方面。
如果使用的是ACE_NULL_SYNCH 类,MUTEX和CONDITION会被解析为ACE_Null_Mutex和ACE_Null_Condition。如果使用的是ACE_MT_SYNCH 类,MUTEX和CONDITION会被解析为ACE_Thread_Mutex 和 ACE_Condition_Thread_Mutex。
如果 ACE_Message_Queue 是通过 ACE_NULL_SYNCH 进行参数化的,对其入队和出队方法的调用在到达队列的边界条件时,不会阻塞调用线程。它们会返回-1,并将errno设置为EWOULDBLOCK.与此相反,如果 ACE_Message_Qucue 是通过 ACE_MT_SYNCH 实例化的,其入队、出队方法支持阻寒、阻寒和定时操作。例如,当同步化队列为空时,在缺省情况下对其出队方法的调用将会阻塞,直至有消息入队,队列不再为空为止。同样,当问步化队列为满时,在缺省情况下对其入队方法的调用将会阻塞,直至有足够的消息出队、使队列中的字节数降到低水位标以下,队列不再为满为止。通过将下列类型的 ACE_Time_Value 值传给这些方法,可以改变缺省的阻塞行为。
4、关闭和消息释放:下列方法可用于关闭ACE_Message_Queue,使其停止活动,以及释放其中的消息
在内部,ACE_Message_Queue总是有以下3种状态之一:
ACTIVATED 在此状态中所有操作都会正常工作(队列启动时总是处在ACTIVATED状态)。DEACTIVATED 在此状态中所有入队和出队操作都立刻返回-1,并将error设为ESHUTDOWN,直至队列被再次激活。
PULSED 迁移到此状态会使等待中的入队和出队操作立刻返回,就好像队列被停用(Deactivaed)一样,但所有在PULSED状态中发起的操作的行为和在ACTIVATED状态中发起的一样。
在必须通知所有生产者和消费者线程,有重要事件发生的情况下,DEACTIVATED和PULSED状态十分有用。迁移到这两种状态的任何一种都会唤醒所有等待中的生产者和消费者线程。它们之间的区别是迁移之后队列的入队/出队行为。在DEACTIVATED状态中,所有的入队和出队操作都会失败,直至队列的状态变为ACTIVATED。而PULSED状态在行为上与ACETIVATED 状态是等价的,也就是,所有入队/出队操作都会正常进行。PULSED状态主要是用于提供信息--被唤醒的生产者/消费者可以通过检查state()方法的返回值来决定是否尝试进一步的队列操作。
下面将介绍一个实例:
将ACE_Meassage_Queue用于实现客户日志。客户日志 daemon 这行在所有参与网络化日志志服务的主机上,并执行以下任务:
它使用本地IPC机制(比如共字内存、管道或是loopback socket)从在客户日志 daemon 的主机
上的客户那里接收日志记录。
它使用远地IPC机制(比如TCP/IP)来将日志记录转发给运行在指定主机上的服务器口志daemon.
在我们的客户日志 daemon中,主线程使用一个事件处理器和ACE Reactor 框架来从通过网络loopback 设备连接到客户应用的socket那里读取日志记录。该事件处理器在同步化ACE_Message_Queue中对每个口志记录进行排队。另个转发者线程并发地运行着,持续地执行以下步骤:
1、 从消息队列中取出消息。
2、将消息缓冲进更大的chunk(大块)中
3、在TCP连接上将这些chunk转发给服务器日志 daemon。
定义了下列类对其进行实现:
如图 6.4 所示,这个客户日志 daemon 实现中的各个类是依照 Acceptor-Connector模式来设计的,CLD_Acceptor扮演接受器角色,CLD_Connector 扮演连接器角色,而 CLD_Handler扮演服务处理器角色。在图6.5(170页)中显示了主线程、转发者线、图6.4中的各个类以及将它们结合在一起的同步化 ACE_Message_Queue 之间的关系。
需要包含下述头文件:
#include "ace/os_include/os_netdb.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_string.h"
#include "ace/Event_Handler.h"
#include "ace/INET_Addr.h"
#include "ace/Get_Opt.h"
#include "ace/Log_Record.h"
#include "ace/Truncate.h"
#include "ace/Message_Block.h"
#include "ace/Message_Queue.h"
#include "ace/Reactor.h"
#include "ace/Service_Object.h"
#include "ace/Signal.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
#include "ace/Thread_Manager.h"
#include "Logging_Acceptor.h"
#include "CLD_export.h"
上述的各个类在Client_Logging_Daemon.cpp中进行实现:
CLD_Handlerz主要提供了以下功能:
它从客户那里接收日志记录。
它将日志记录转换为ACE_Message_Block。
它将消息块放入同步化的消息队列中
它运行一个单独的线程,由该线程负责从队列中取出消息块,并以大块的形式转发给日志服务器
class CLD_Handler : public ACE_Event_Handler {
public:
enum { QUEUE_MAX = sizeof (ACE_Log_Record) * ACE_IOV_MAX };
// FUZZ:禁用check_for_lack_ACE_OS
// 初始化挂钩方法
virtual int open (CLD_Connector *);
virtual int close (); // Shut down hook method.
//FUZZ: enable check_for_lack_ACE_OS
// 访问日志服务器连接的访问器
virtual ACE_SOCK_Stream &peer () { return peer_; }
// 反应堆挂钩方法
virtual int handle_input (ACE_HANDLE handle);
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = 0);
protected:
// 将日志记录转发到服务器日志守护进程
virtual ACE_THR_FUNC_RETURN forward ();
//FUZZ:禁用check_for_lack_ACE_OS
// 使用收集写入操作发送缓冲的日志记录
virtual int send (ACE_Message_Block *chunk[], size_t &count);
//FUZZ: enable check_for_lack_ACE_OS
// 转发器线程的入口点
static ACE_THR_FUNC_RETURN run_svc (void *arg);
// 对消息进行排队的同步<ACE_Message_Queue>
ACE_Message_Queue<ACE_SYNCH> msg_queue_;
// 管理转发器线程
ACE_Thread_Manager thr_mgr_;
// 指向<CLD_Connector>的指针
CLD_Connector *connector_;
// 连接到日志服务器.
ACE_SOCK_Stream peer_;
};
在 CLD Handler 中不需要构造器或析构器,因为它的open()和 close()挂钩会在被 CLD_Acceptor工厂类调用时执行初始化和析构活动。CLD Handler 完成两种角色:输入和输出,下面对这两种角色进行解释。
输入角色:因为CLD_Handler 继承自ACE_Event_Handler,它可以使用 ACE Reactor 框架来等待记录日志记录从客户应用到达:客户应用是通过loopback TCP socket连接到客户日志daemon的。当口志记录到达客户日志daemon时,单体ACEReactor分派下面的CLD_Handler:handle_input()挂钩方法:
// 输入挂钩方法
int CLD_Handler::handle_input (ACE_HANDLE handle)
{
//从socket句柄中读出一个日志记录存储在ACE_Message_Block 中
ACE_Message_Block *mblk = 0;
Logging_Handler logging_handler (handle);
if (logging_handler.recv_log_record (mblk) != -1)
{
//将消息插入到转发者线程中
if (msg_queue_.enqueue_tail (mblk->cont ()) != -1)
{
//只将日志记录数据插入队列中即可
mblk->cont (0);
mblk->release (); //只回收存储主机名的消息块
return 0; // Success return.
}
else
{
mblk->release ();
}
}
//tcp断开或是发生错误时,返回-1
return -1; // Error return.
}
//tcp断开或是发生错误时,返回-1,这个值会触发调用hand_close()函数:
//关闭释放资源函数
int CLD_Handler::handle_close (ACE_HANDLE handle,
ACE_Reactor_Mask)
{ return ACE_OS::closesocket (handle); }
不需要在handle_close()中删除this,因为是由Client_Logging_Daemon 类来进行管理的。
输出角色: CLD_Handler 对象是在 CLD_Connector 的connect()方法调用下面的 open()挂钩方法时被初始化的:
int CLD_Handler::open (CLD_Connector *connector) {
//存储指向CLD_Connector 指针
connector_ = connector;
int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
//将peer() socket发送缓冲区设置到最大,以使在长延迟和高速网络之上的吞吐量最大化
peer ().set_option (SOL_SOCKET, SO_SNDBUF,
&bufsiz, sizeof bufsiz);
//设置msg_queue_的最高水位为sizeof(ACE_Log_Record)*ACE_IOV_MAX
msg_queue_.high_water_mark (CLD_Handler::QUEUE_MAX);
//使用ACE_Thread_Manager派生一个系列范围的线程
//相对并发执行run_svc静态方法,run_svc将其void参数强转为CLD_Handler指针
//并随即将其处理委托给forward方法。
return thr_mgr_.spawn (&CLD_Handler::run_svc,
this, THR_SCOPE_SYSTEM);
}
最后两行代码:相对并发执行run_svc静态方法,run_svc将其void参数强转为CLD_Handler指针,并随即将其处理委托给forward方法。
ACE_THR_FUNC_RETURN CLD_Handler::run_svc (void *arg) {
CLD_Handler *handler = static_cast<CLD_Handler *> (arg);
return handler->forward ();
}
我们现在描述 CLD_Handler.:forward()方法,它运行在自己的线程中,并将日志记录转发给服务器日志 daemon。如下所示,通过缓冲日志记录,直至达到最大限度的数目或是最大限度的时间已流逝,这个方法优化了网络吞吐量。
ACE_THR_FUNC_RETURN CLD_Handler::forward () {
ACE_Message_Block *chunk[ACE_IOV_MAX]; //尽可能多地缓存消息块
size_t message_index = 0; /定义索引来跟踪被缓存的记录的数目
ACE_Time_Value time_of_last_send (ACE_OS::gettimeofday ());
ACE_Time_Value timeout; //记录最后发送日志时间和下一次刷出时间
//忽略SIGPIPE信号 能够正常执行路径中所有的send函数
ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
ACE_Sig_Action original_action;
no_sigpipe.register_action (SIGPIPE, &original_action);
//循环到消息队列被停用
for (;;) {
//在第一次循环,一次超时后,以及消息块被转发之后使刷出超时复位
if (message_index == 0) {
timeout = ACE_OS::gettimeofday ();
timeout += FLUSH_TIMEOUT;
}
//从队列中取到指向下一个ACE_Message_Block指针,最多等待到下次刷出超时
ACE_Message_Block *mblk = 0;
if (msg_queue_.dequeue_head (mblk, &timeout) == -1) {
//出队失败 且不是超时 直接返回
if (errno != EWOULDBLOCK) break;
else if (message_index == 0) continue; //超时且没有缓存任何消息记录
} else {
//取出消息块 判断大小及类型
if (mblk->size () == 0
&& mblk->msg_type () == ACE_Message_Block::MB_STOP)
{ mblk->release (); break; }
//将消息块存储到可用槽中,数量+1
chunk[message_index] = mblk;
++message_index;
}
//缓存器填满或FLUSH_TIMEOUT到期
//调用send方法在一次集中写时刷出所缓存的日志记录
if (message_index >= ACE_IOV_MAX ||
(ACE_OS::gettimeofday () - time_of_last_send
>= ACE_Time_Value(FLUSH_TIMEOUT))) {
if (this->send (chunk, message_index) == -1) break;
time_of_last_send = ACE_OS::gettimeofday ();
}
}
//如果dequeue_head()失败或是接收到关闭消息
//余下的所有缓存的日志记录都将通过CLD_Handler::send()调用而被刷出
if (message_index > 0)
this->send (chunk, message_index);
msg_queue_.close ();//关闭消息队列 释放资源
no_sigpipe.restore_action (SIGPIPE, original_action);//恢复SIGPIPE信号
return 0;//从CLD_Handler::forward()中返回,从而结束线程
}
CLD_Handler::send()方法将所缓冲的日志记录发送给日志服务器。它还负责在连接被关闭的情况下重新连接到服务器。
int CLD_Handler::send (ACE_Message_Block *chunk[], size_t &count) {
//集中写操作,从chunk获取的数据指针和长度 放入到iovec 数组中
iovec iov[ACE_IOV_MAX];
size_t iov_size;
int result = 0;
for (iov_size = 0; iov_size < count; ++iov_size) {
iov[iov_size].iov_base = chunk[iov_size]->rd_ptr ();
iov[iov_size].iov_len =
ACE_Utils::truncate_cast<u_long> (chunk[iov_size]->length ());
}
//sendv_n 方法在一次集中写操作中刷出所缓存的日志记录
while (peer ().sendv_n (iov, ACE_Utils::truncate_cast<int> (iov_size)) == -1)
//sendv_n 失败 会进行重连接操作,如果重链接成功 会再次调用sendv_n 函数
if (connector_->reconnect () == -1) {
result = -1;
break;
}
//释放所有的日志数据记录 将所有的ACE_Message_Block 指针置为0
while (iov_size > 0) {
chunk[--iov_size]->release (); chunk[iov_size] = 0;
}
count = iov_size; //将cout置为0
return result;
}
CLD_Handler:close()是一个 public 方法,CLD_Acceptor::handle_close()方法或Client_Logging_Daemon:fini()方法调用它来关闭处理器。它这样将一个人小为 0、类型为MB_STOP 的消息插入消息队列中:
int CLD_Handler::close () {
ACE_Message_Block *shutdown_message = 0;
ACE_NEW_RETURN
(shutdown_message,
ACE_Message_Block (0, ACE_Message_Block::MB_STOP), -1);
//在队尾插入一个MB_STOP关闭信号
msg_queue_.enqueue_tail (shutdown_message);
return thr_mgr_.wait ();
}
当转发者线程接收到 shutdown_message 时,它将余下的日志记录刷出给日志服务器,关闭消息队列,并退出线程。我们使用 ACE_Thread_Manager:wait()方法在返回之前阻塞至转发者线程退出为止。这个方法还收取转发者线程的退出状态,以防止内存泄漏。
接下来介绍CLD_Accept类,定义如下:
//同样继承于ACE_Event_Handler类
class CLD_Acceptor : public ACE_Event_Handler {
public:
//FUZZ: disable check_for_lack_ACE_OS
// Initialization hook method.
virtual int open (CLD_Handler *, const ACE_INET_Addr &,
ACE_Reactor * = ACE_Reactor::instance ());
//FUZZ: enable check_for_lack_ACE_OS
// Reactor hook methods.
virtual int handle_input (ACE_HANDLE handle);
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = 0);
virtual ACE_HANDLE get_handle () const;
protected:
// Factory that passively connects <ACE_SOCK_Stream>s.
ACE_SOCK_Acceptor acceptor_;
//日志处理器指针
CLD_Handler *handler_;
};
CLD_Accept同样继承于ACE_Event_Handler类,它可以向ACE Reactor登记自身来接收连接,如下所示:
int CLD_Acceptor::open (CLD_Handler *handler,
const ACE_INET_Addr &local_addr,
ACE_Reactor *r) {
reactor (r); // Store the reactor pointer.
handler_ = handler;
//指示acceptor_开始监听 向反应堆登记this对象 以接收新的连接
if (acceptor_.open (local_addr) == -1
|| reactor ()->register_handler
(this, ACE_Event_Handler::ACCEPT_MASK) == -1)
return -1;
return 0;
}
反应器双重分配到下面的CLD_Acceptor::get_handle()方法,从而获取acceptor的socket句柄
ACE_HANDLE CLD_Acceptor::get_handle () const
{ return acceptor_.get_handle (); }
当连接到达日志daemon时,单体反应器分派下面的CLD_Acceptor::handle_input挂钩方法:
int CLD_Acceptor::handle_input (ACE_HANDLE) {
//连接进入peer_stream后,后者只是接收连接并初始化登记到反应器的新socket句柄
//因此它无需在hand_input后存在
ACE_SOCK_Stream peer_stream;
if (acceptor_.accept (peer_stream) == -1) return -1;
//使用register_handler的参数变体向反应器,为read事件登记指向CLD_Handler的指针
//register_handler使得日志记录到达时,会进入CLD_Handler::handle_input 函数中
else if (reactor ()->register_handler
(peer_stream.get_handle (),
handler_,
ACE_Event_Handler::READ_MASK) == -1)
return -1;
else return 0;
}
如果在接受连接或登记句柄及事件处理器时发生错误,反应器自动会调用下面的handle_close()方法:
//关闭连接
int CLD_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask) {
acceptor_.close ();
handler_->close ();
return 0;
}
这个方法既关闭接受器工厂,也关闭CLD_Handler。CLD_Handler:close()触发对消息队列和转发备线程的关闭。
接下来介绍CLD_Connect类,这个类提供了以下的功能:
class CLD_Connector {
public:
//FUZZ: disable check_for_lack_ACE_OS
// 建立与日志服务器的连接
// at the <remote_addr>.
int connect (CLD_Handler *handler,
const ACE_INET_Addr &remote_addr);
//FUZZ: enable check_for_lack_ACE_OS
// 重新建立与日志服务器的连接
int reconnect ();
private:
// 指向我们正在连接的<CLD_Handler>的指针.
CLD_Handler *handler_;
//日志服务器正在侦听连接的地址.
ACE_INET_Addr remote_addr_;
};
CLD_Connect主要负责连接和重连,连接函数如下:
int CLD_Connector::connect
(CLD_Handler *handler,
const ACE_INET_Addr &remote_addr) {
//使用ACE socket与服务器日志建立连接
ACE_SOCK_Connector connector;
if (connector.connect (handler->peer (), remote_addr) == -1)
return -1;
//调用open激活CLD_Handler,c成功该方法派生一个线程来运行CLD::Handler::forward()
//失败则在handler上调用handler_close()函数关闭socket
else if (handler->open (this) == -1)
{ handler->handle_close (); return -1; }
//存储处理器和远地地址
//简化reconnect方法的实现
handler_ = handler;
remote_addr_ = remote_addr;
return 0;
}
reconnect该方法被用于在日志服务器关闭了客户连接时(无论是由于崩溃,还是由下Evictor模式,重新连接到日志服务器。如下所示,reconnect()方法使用了一种指数后退算法水避免发生日志服务器被连接请求“淹没”的情况:
int CLD_Connector::reconnect () {
// 重试连接的最大次数
const size_t MAX_RETRIES = 5;
ACE_SOCK_Connector connector;
ACE_Time_Value timeout (1); // Start with 1 second timeout.
size_t i;
for (i = 0; i < MAX_RETRIES; ++i) {
if (i > 0) ACE_OS::sleep (timeout);
if (connector.connect (handler_->peer (), remote_addr_,
&timeout) == -1)
timeout *= 2; // Exponential backoff.
else {
//同样将peer ()设置为最大的尺寸 以使长延迟或高速网络上的吞吐最大量
int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
handler_->peer ().set_option (SOL_SOCKET, SO_SNDBUF,
&bufsiz, sizeof bufsiz);
break;
}
}
return i == MAX_RETRIES ? -1 : 0;
}
Client_Logging_Daemon:这个类是facade,它集成了上面描述的三个类CLD_Handler 、CLD_Accept、CLD_Connect来实现日志服务器daemon,如下所示:
class Client_Logging_Daemon : public ACE_Service_Object {
public:
virtual ~Client_Logging_Daemon () {} // Turn off g++ warnings.
// Service Configurator hook methods.服务配置方法
virtual int init (int argc, ACE_TCHAR *argv[]);
virtual int fini ();
#if 0
// Implementing these methods is left as an exercise for the reader.
virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const;
virtual int suspend ();
virtual int resume ();
#endif
protected:
// Receives, processes, and forwards log records.
CLD_Handler handler_;
// Factory that passively connects the <CLD_Handler>.
CLD_Acceptor acceptor_;
// Factory that actively connects the <CLD_Handler>.
CLD_Connector connector_;
};
Client_Logging_Daemon 继承自 ACE_Service_Object。因此,可以通过一个svc.conf 文件来对它进行动态配置,svc.conf 文件是由上节所描述的ACE Service Confgurator 来进行处理的。当Client_Logging_Dacmon 的实例被动态链接时,ACE Service Configurator 框架会调用下而显示的Client Logging Daemon::init():
int Client_Logging_Daemon::init (int argc, ACE_TCHAR *argv[]) {
//指定缺省的客户日志daemon侦听端口cld_port、客户日志服务器daemon端口sld_port
//及主机名sld_host 通过传入的参数来进行改变 通常通过-s选项来设置服务器日志daemon的主机名
u_short cld_port = ACE_DEFAULT_SERVICE_PORT;
u_short sld_port = ACE_DEFAULT_LOGGING_SERVER_PORT;
ACE_TCHAR sld_host[MAXHOSTNAMELEN];
ACE_OS::strcpy (sld_host, ACE_LOCALHOST);
//解析svc.conf文件中的参数 可以是-p -r -s的任何选项
ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:r:s:"), 0);
get_opt.long_option (ACE_TEXT ("client_port"), 'p',
ACE_Get_Opt::ARG_REQUIRED);
get_opt.long_option (ACE_TEXT ("server_port"), 'r',
ACE_Get_Opt::ARG_REQUIRED);
get_opt.long_option (ACE_TEXT ("server_name"), 's',
ACE_Get_Opt::ARG_REQUIRED);
for (int c; (c = get_opt ()) != -1;)
switch (c) {
case 'p': // Client logging daemon acceptor port number.
cld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));
break;
case 'r': // Server logging daemon acceptor port number.
sld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));
break;
case 's': // Server logging daemon hostname.
ACE_OS::strsncpy
(sld_host, get_opt.opt_arg (), MAXHOSTNAMELEN);
break;
}
//得到地址后构造建立连接所需要的地址
ACE_INET_Addr cld_addr (cld_port);
ACE_INET_Addr sld_addr (sld_port, sld_host);
//初始化acceptor_和connector_
if (acceptor_.open (&handler_, cld_addr) == -1)
return -1;
else if (connector_.connect (&handler_, sld_addr) == -1)
{ acceptor_.handle_close (); return -1; }
return 0;
}
在日志服务器被移除时,ACE Service Confgurator框架会调用下面的Client_Logging_Daemon::fini()挂钩方法:
int Client_Logging_Daemon::fini () {
acceptor_.handle_close ();
handler_.close ();
return 0;
}
这个 fini()方法关闭ACE_SOCK_Acceptor工厂的socket和CLD_Handler,从而触发消息队列和转发者线程的火闭。ACE Service Configurator 框架将在fini()返间之后删除 Client_Logging_Daemon 实例。
现在我们已经实现了客户日志 daemon的所有类,我们将描述的ACE_FACTORY_DEFINE 宏放在实现文件中:一般放在实现的最下面
ACE_FACTORY_DEFINE (CLD, Client_Logging_Daemon)
这个宏定义_make_AIO_Client_Logging_Daemon()工厂函数,该函数被用于下面的svc.conf 文件中:
dynamic AIO_Client_Logging_Daemon Service_Object *
CLD:_make_Client_Logging_Daemon() "-p $CLINET_LOGGING_DAEMON_PORT"
这个文件指示 ACE Service Configurator 框架通过以下步骤来配置客户日志 daemon:
ACE_TASK类
上述的ACE_Meassage_Queue类可被用于:
1、解除信息流与其处埋的耦合
2、将并发地执行生产者/消费者服务的线程链接在一起
为了保持模块性和内聚性,并降低耦,系统引入了ACE_task,主要提供了以下能力:
ACE_task类提供了丰富的接口,主要将其分为以下三个领域:,现在的ACE_Task类设计继承了ACE_Task_Base类,ACE_Task_Base继承于ACE_Service_Object,ACE_Task_Base中主要提供了open、close、put、svc、activate、wait、thr_cond等方法。
1、任务初始化方法 初始化任务的方法
应用可以通过重新定义open()挂钩方法来定制ACE_Task的启动行为。该方法分配任务所用的资源,比如连接处理器、IO 句柄以及同步锁。通过调用ACE_Task::activaie(),open()还常常被用于将任务转换为主动对象。
thr_mgr()和 msg_queue()方法便得程序员有可能访问和改变某个任务所用的线程管理和消息排队机制。也可在类的实例被创建时将其他的线程管理和消息排队机制传给ACETask 构造器
activate()方法使用 r_mgr()访问器所返回的ACE_Thread_Manager指针来派生一个或多个运行在任务之中的线程。该方法将任务转换为主动对象,其线程导引自身的执行,对事件做出响应,而不是完全由“借用调用者的线程的坡动方法调用”来驱动。
2、任务通信、处理和同步方法 用于在任务间进行通以及在务中主动和被动地处理消息的方法:
通过重新定义 ACE_Task 的put()和 svc()挂钩方法,其子类可在传递给它的消息上执行应用定义的处理,从而实现下面的两种处理模型:
被动处理 :put()方法被用于传递消息给某个ACE_Task。在任务之间传递的是指向ACE_Message_Block的指针,以避免数据拷贝开销。任务处理可以完全在put()的上下文中执行:在那里,其处理期间,会借用调用者的线程。如果任务只在put()中被动地处理请求,就无需使用任务的 svc()挂钩方法。
主动处理 :任务的应用定义的处理也可以被主动地执行。在这种情况下,一个或多个线程执行任务的 svc()挂钩方法,以相对于应用中的其他活动并发地处理消息。如果任务的put()方法没有完成某个消息上的所有处理,它可以使用putq()将消息放入队列,并立刻返回其调用者。
任务的 svc()挂钩方法可以使用ACE_Task::getq()来取出放置在队列中的消息,并且并发地处理它们。getq()方法会阻塞到消息队列中有消息可用,或是规定的绝对超时到期。geq()的阻塞本性允许任务的线程阻塞起米,仅在消息队列中有工作可做时才苏醒过来。
不像 put(),svc()方法从米都不会被任务的客户直接调用。相反,它会在化务的 activate()方法被调用之后,任务变为主动对象时,被一个或多个线程调用。activate()方法使用与ACE_Task相关联的ACE_Thread_Manager来派生个或多个线程,如下所示:
int ACE_Task_Base::activate (long flags,
int n_threads,
int force_active,
long priority,
int grp_id,
ACE_Task_Base *task,
ACE_hthread_t thread_handles[],
void *stack[],
size_t stack_size[],
ACE_thread_t thread_ids[],
const char* thr_name[])
{
/---/
//flags主要负责传标识
grp_spawned =
this->thr_mgr_->spawn_n (thread_ids,
n_threads,
&ACE_Task_Base::svc_run,
(void *) this,
flags,
priority,
grp_id,
stack,
stack_size,
thread_handles,
task,
thr_name);
/---/
}
ACE_Task:svc_run()是一个静态方法,activate()将其用作适配器(Adapter)函数。它运行在新派生的控制线程中,这些线程为svc()挂钩方法提供了执行上下文。上图说明了与“使用 Windows _beginthreadx()函数派生线程,从而激活 ACE _Task”相关联的各个步骤。自然,ACE_Task 类将应用与OS特有的细节屏蔽开来。
在某个 ACE_Task 子类作为主动对象执行时,它的svc()方法运行一个事件循环,使用其 getq()方法来等待消息到达任务的消息队列。这个队列可以为任务的svc()方法所进行的后续处理缓冲系列数据消息和控制消息。如下图所示,当消息到达,并被任务的 put()方法放入队列中时,它的svc()方法运行在另外的线程中,从队列中取出消息,并且并发地执行应用所定义的处理。
3、任务解析 任务的全部或部分析构中所用的方法
如果任务激活了多个线程,close()方法在仍有其他线程在执行的情况下,不能放资源(或是删除任务对象自身)。thr_count()方法返回任务中仍然在活动的线程的数目。ACE_Task在调用 close()之前将线程计数减一,所以如果 thr_count()返回的值大于0,对象就仍然是活动的。wait()方法可被用于阻塞至运行在此任务中的所有线程退出,这时 tr_count()的返回值等于0
下面将通过一个案例将ACE_Task、ACE_Meassage_Queue以及ACE_Reator和ACE_Server_Config结合在一起,实现并发的日志服务器。其服务器设计基于Half-Sync/Half-Async模式,使用饥饿模式的线程池策略。
图中显示了可怎样在日志服务器被装入时预先派生一池工作者线程。多个日志记录可被并发地处理,直至同时发生的客户清求的数目超过工作者线程的数目。这时,主线程在一个同步化ACE_Message_Queue 中缓冲多出来的请求,直至有工作各线程可用或是队列变满为止。
ACE_Message_Queue在线程池日志中扮演的半同步/半异步并发设计中扮演了若干角色:
它解除了主反应器线程与线程池的耦合:该设计允许多个工作者线程同时活动。它还将维护日志记录数据队列的责任从内核空间移到了用户空间,在其中有比内核更多的虚拟内存来对日志记录进行排队。
它有助于在客户和服务器之间实施流控制 :当消息队列中的字节数到达其高水位标时,它的流控制协议会阻塞主线程。当底层的TCP socket缓冲区填满时,流控制会传播(Propagate)回服务器的客户。这将阻止客户建立新的连接或是发送日志记录,直至工作者线程有机会赶上,并解除主线程的阻塞。
线程池日志服务器所覆盖的类主要如下所示:
图6.10显示了这些类之间的关系。TP_Logging_Acceptor和TP_Logging_Handler 类扮演 Half-Sync/Half-Async 模式中的反应式角色,而TP_Logging_Task::svc()方法(它并发地运行在各个工厂者线程中)扮演该模式中的同步角色。包含的头文件主要如下:
#include "ace/Singleton.h"
#include "ace/Synch.h"
#include "ace/Task.h"
#include "Logging_Acceptor.h"
#include "Logging_Event_Handler.h"
#include "Reactor_Logging_Server_T.h"
#include "TPLS_export.h"
#include <memory>
首先介绍TP_Logging_Task类,该类主要提供以下能力:
它派生自 ACE Task,前者实例化后者来提供同步化的ACE_Message_Qucue。
它派生一池工作者线程,它们运行同一个svc()方法来处理和存储插入其同步化消息队列的日志记录。
class TP_Logging_Task : public ACE_Task<ACE_SYNCH> {
// Instantiated with an MT synchronization trait.
public:
enum { MAX_THREADS = 4 };
//调用ACE_Task::activate将任务转换为主动对象
//如果activate 返回成功,TP_Logging_Task::svc()会运行在MAX_THREADS分离的线程中
virtual int open (void * = 0)
{ return activate (THR_NEW_LWP, MAX_THREADS); }
//将一个含有日志记录的消息消息块插入到队列中
virtual int put (ACE_Message_Block *mblk,
ACE_Time_Value *timeout = 0)
{ return putq (mblk, timeout); }
virtual int svc ();
};
//因为是dll,所有使用ACE_Unmanaged_Singleton 而不是ACE_Singleton
//用于定义工厂 因为在主线程执行 所有使用ACE_Null_Mutex
typedef ACE_Unmanaged_Singleton<TP_Logging_Task, ACE_Null_Mutex>
TP_LOGGING_TASK;
我们只需要 TP_Logging_Task 的一个实例,所以我们使用了单体适配器模板将其转换为单体。但是,因为TP_Logging_Task将位于DLL中,我们必需使用ACE_Unmanaged_Singleton,而不是ACE_Singleton。这一设计要求我们在日志任务于TP_Logging_Server::fini()中关闭时显式地关闭单体。
因为 TP_LOGGING_TASK::instance()只在主线程中被访问,我们使用 ACE_Null_Mutex 来作为ACE_Unmanaged_Singleton 的同步类型参数。如果此单体要由其他线程并发访问,我们就需要通过ACE_Recursive_Thread Mutex来参数化它,以使访问序列化。
下面将介绍TP_Logging_Accept,主要功能如下:
它接受来自客户日志 daemon 的连接
它创建 TP_Logging_Handler,由后者接收来自相连客户的日志记录,TP_Logging Acceptor 类如下所示:
//继承于上述描述过的Logging_Acceptor
class TP_Logging_Acceptor : public Logging_Acceptor {
public:
TP_Logging_Acceptor (ACE_Reactor *r = ACE_Reactor::instance ())
: Logging_Acceptor (r) {}
//有数据发送时执行handler_input函数
//创建TP_Logging_Handler 实例
virtual int handle_input (ACE_HANDLE) {
TP_Logging_Handler *peer_handler = 0;
ACE_NEW_RETURN (peer_handler,
TP_Logging_Handler (reactor ()), -1);
if (acceptor_.accept (peer_handler->peer ()) == -1) {
delete peer_handler;
return -1;
} else if (peer_handler->open () == -1)//TP_Logging_Handler 调用open方法处理数据
peer_handler->handle_close (ACE_INVALID_HANDLE, 0);
return 0;
}
};
下一个将介绍TP_Logging_Handler 类,主要提供了以下能力:
它接收来自相连客户的日志记录。
它将日志记录放入 TP_LOGGING_TASK单体的同步化消息队列中。
class TP_Logging_Acceptor;
//派生自Logging_Event_Handler类
class TP_Logging_Handler : public Logging_Event_Handler {
friend class TP_Logging_Acceptor;
protected: //析构为protected是为了动态分配 handle_close会删除该对象
virtual ~TP_Logging_Handler () {} // No-op destructor.
// 指向当前位于<TP_LOGGING_TASK>单例消息队列中的此类实例的指针数.
int queued_count_;
// 指示是否必须调用<Logging_Event_Handler::handle_close()>来清理和删除此对象
int deferred_close_;
// 序列化对<queued_count_>和<deferred_close_>的访问
ACE_SYNCH_MUTEX lock_;
public:
TP_Logging_Handler (ACE_Reactor *reactor)
: Logging_Event_Handler (reactor),
queued_count_ (0),
deferred_close_ (0) {}
// 当输入事件发生时调用,例如连接或数据
// 扮演了Half-Sync/Half-Async模式中的反应式角色,它不会立刻处理而是插入到队列中进行并发处理
// 将日志记录一个含有处理器的指针的消息块结合到了一起,然后插入到末尾
virtual int handle_input (ACE_HANDLE);
// 当该对象被销毁时调用,例如,当它从反应器中移除时
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
};
对多个并发的TP_Logging_Handler 对象进行关闭也需要比较谨慎,方法如下:
当日志客户关闭连接时,TP_Iogging_Handler::hande_input()返回-1。反应器随即调用处理器的 handle_close()方法,该方法通常会清理资源,并删除处理器。使用了一种引用计数协议来确保在指向某个处理器的指针仍在使用中时,该处理器不会被销毁。
该协议对某个处理器驻留在 TP_LOGGING_TASK 单体的消息队列中的次数进行计数。如果在日志客户 socket 被关闭时计数大于0,TP_Logging_Handler;:handle_close()就还不能销毁处理器随后,随着 TP_LOGGING_TASK处理各个日志记录,处理器的引用计数会被减少。当计数到达0 时,处理器就可以完成对先前延缓的关闭请求的处理了。
TP_Logging_Handler::handle_input()扮演了Half-Sync/Half-Async模式中的反应式角色,它不会立刻处理,而是插入到队列中进行并发处理,将日志记录一个含有处理器的指针的消息块结合到了一起,然后插入到末尾。
int TP_Logging_Handler::handle_input (ACE_HANDLE) {
//从socket中读入消息 放入到ACE_Message_Block中
ACE_Message_Block *mblk = 0;
if (logging_handler_.recv_log_record (mblk) != -1) {
//创建一个log_blk消息块,其中包括this指针,内部设置DONT_DELETE标识
//确保当消息块在TP_Logging_Task::svc()方法中被释放,处理器自身不会被销毁
ACE_Message_Block *log_blk = 0;
ACE_NEW_RETURN
(log_blk, ACE_Message_Block
(reinterpret_cast<char *> (this)), -1);
log_blk->cont (mblk);
//使用ACE_GUARD获取lock_,序列化queued_count_的访问 在put前获取
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
//put成功,计数++
//release 会释放guard
if (TP_LOGGING_TASK::instance ()->put (log_blk) == -1)
{ log_blk->release (); return -1; }
++queued_count_;
return 0;
}
//客户端关闭连接或者发生错误,会调用handle_close函数
else return -1;
}
当客户端关闭连接或者发生错误,TP_Logging_Handler::handle_input会返回-1,之后会调用handle_close函数 :
int
TP_Logging_Handler::handle_close (ACE_HANDLE handle,
ACE_Reactor_Mask) {
//标识是否调用handle_close
int close_now = 0;
if (handle != ACE_INVALID_HANDLE) {
//ACE_GUARD保护临界区数据
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
if (queued_count_ == 0)//TP_Logging_TASK不再存在这个对象的引用
close_now = 1;
else
deferred_close_ = 1;//记住只要引用计数为0就销毁这个处理器
} else
//TP_Logging_TASK::svc()中调用handle == ACE_INVALID_HANDLE
//如果计数为0 则释放 同样使用ACE_GUARD保护临界区数据
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, lock_, -1);
queued_count_--;
if (queued_count_ == 0) close_now = deferred_close_;
}
if (close_now)//如果不存在该对象引用 则释放对象并随即删除自身
return Logging_Event_Handler::handle_close ();
return 0;
}
上面已经描述了TP_Logging_Handler类,下面将介绍TP_Logging_TASK::svc()方法。该方法并行地运行在各个工作者线程中,实现Half-Sync/Half-Async模式中的同步角色。它运行在自己的事件循环,这个事件循环阻寨在同步化的消息队列上。在消息被TP_Logging_Handler::handle_input()放入队列之后,它将由某个空闲的工作者线程从队列中取出,并写入与客户对应的适当的日志文件中。如下所示:
int TP_Logging_Task::svc () {
//调用getq()方法,阻塞到消息块可用
//每个消息块都是一个复合消息 通过继续指针连接起来
for (ACE_Message_Block *log_blk; getq (log_blk) != -1; ) {
TP_Logging_Handler *tp_handler = reinterpret_cast<TP_Logging_Handler *> (log_blk->rd_ptr ());
//通过log_file 初始化logging_handler 调用write_log_record 将日志记录写到日志文件中
Logging_Handler logging_handler (tp_handler->log_file ());
logging_handler.write_log_record (log_blk->cont ());
//回收已分配的资源 减少了TP_Logging_Handler的计数,并适当的清理对象
log_blk->release ();
tp_handler->handle_close (ACE_INVALID_HANDLE, 0);
}
return 0;
}
接下来介绍TP_Logging_Server。这个facade 类继承目ACE_Service_Object,包含一个Reactor_Logging_Server,并且使用了TP_LOGGING_TASK单体。如下所示:
class TP_Logging_Server : public ACE_Service_Object {
protected:
// Contains the reactor, acceptor, and handlers.
typedef Reactor_Logging_Server<TP_Logging_Acceptor>
LOGGING_DISPATCHER;
LOGGING_DISPATCHER *logging_dispatcher_;
public:
TP_Logging_Server (): logging_dispatcher_ (0) {}
//增强了反应式日志服务器的实现
//分配了TP_Logging_Server::LOGGING_DISPATCHER,并将其存储在logging_dispatcher_中
//调用来TP_Logging_Task::open 预先派生一池工作者线程
virtual int init (int argc, ACE_TCHAR *argv[]) {
int i;
char **array = 0;
ACE_NEW_RETURN (array, char*[argc], -1);
std::unique_ptr<char *[]> char_argv (array);
for (i = 0; i < argc; ++i)
char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));
ACE_NEW_NORETURN
(logging_dispatcher_,
TP_Logging_Server::LOGGING_DISPATCHER
(i, char_argv.get (), ACE_Reactor::instance ()));
for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
if (logging_dispatcher_ == 0) return -1;
else return TP_LOGGING_TASK::instance ()->open ();
}
//反初始化函数
virtual int fini () {
TP_LOGGING_TASK::instance ()->flush ();//关闭与任务相关联的消息队列
TP_LOGGING_TASK::instance ()->wait ();//等待线程池退出
TP_LOGGING_TASK::close ();//显式关闭单体
delete logging_dispatcher_;//删除init初始化的logging_dispatcher_
return 0;
}
};
最后将ACE_FACTORY_DEFINE 放进TP_Logging_Server.cpp中
ACE_FACTORY_DEFINE (TPLS, TP_Logging_Server)
这个宏自动定义_make_TP_Logging_Server()工厂函数,该函数被用在svc.conf文件中:
dynamic TP_Logging_ServerService_Object *
TPLS:_make_TP_Logging_Server() "&TP_LOGGING_SEVRER_PORT"
这个文件指示 ACE Service Confgurator 框架通过以下步骤来配置线程池日志服务器:
1、动态地将TPLS DLL链接到进程的地址空间中。
2、使用 ACE_DLL 类米从 TPLS DLL, 符号表中提取_make_TP_Logging_Server()工厂函数。
3、调用该函数,获取一个指问动态分配的TP_Logging_Server 的指针。
4、Service Configurator框架通过该指针调用TP_Logging_Server::init()挂钩方法,将TP_LOGGING SERVER_PORT环境变量的值作为其唯一的参数传给它。这个字符串指定日志服务器用于侦听客户连接谁求的端口号。
5、如果 init()成功的话,TP_Logging_Server 指针被存储在 ACF_Service_Repository 中、在TP_Logging_Server名下
ACE Service Configurator 框果赋予了我们复用来自 Configurable_Logging_Server.cpp 的main()程序的能力。也就是下面这段代码:
#include "ace/OS_main.h"
#include "ace/Service_Config.h"
#include "ace/Reactor.h"
int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) {
//向ACE Service Configurator 框架登记静态Reporter服务
ACE_STATIC_SVC_REGISTER (Reporter_Descriptor);
//调用ACE_Service_Config::open()配置应用 加载相关决策
//因为明确了是静态加载 则加了第四个参数0
//也可以直接在指令行上提供-y来控制是否禁用静态
ACE_Service_Config::open
(argc, argv, ACE_DEFAULT_LOGGER_KEY, 0);
ACE_Reactor::instance ()->run_reactor_event_loop ();
return 0;
}
ACE Task 框果允许开发者使用强大而可扩展的面向对象设计来创建和配置并发的网络化应用,该框架提供了 ACE Task类,将多线程与面向对象的编程和排队集成在一起。ACE Task中的排队机制使用了 ACE_Message_Queue 类在任务间高效地传输消息。因为 ACE_Task派生ACE_Service_Object,开发者很容易设计能被动态配置、以作为主动对象运行的服务,并且可以通过ACE Reactor 框架对这些服务进行分派。