Active Object 设计模式:
1) 根据对象被调用的方式,可以将对象分为两类:
Passive Object和Active Object。Passive 和 Object和调用者在同一个线程中,这就是我们通常所用的函数调用。而Active Object和调用在不同的线程中,它有自己的控制线程。Active Object设计模式是一种应用于并发编程的设计模式,它通过解耦对象的访问和对象的执行来增加并发性,从而简化对象的同步访问。
将方法的调用和方法的执行解耦。方法的调用在客户线程中,而方法的执行在另一个独立的线程中。另外,通过良性设计,使得客户端看起来就像是调用一个普通的方法。具体来说:我们用一个Proxy来描述Active Object的接口,由一个Servant来提供Active Object的实现。Proxy和Servant运行在两个不同的线程中,这样方法的调用和方法的执行就可以并发地运行了。其中Proxy运行在客户线程中,而Servant运行在另一个线程中。在运行期,Proxy将客户端的方法调用转化为方法请求,它们通过一个调度器保存在一个请求链表中,调度器运行在Servant线程中,它将方法请求从链表中取出,交由Servant执行。客户还可以获得运行结果。由于方法的调用和运行在不同的线程中,所以运行结果并不是立即返回的。客户只能通过阻塞等待或者循环查询的方式来获得运行结果,这个结果我们用Future来表示。
2) 在Active Object设计模式中,有6个关键的参与者:
Proxy:提供一个接口,客户通过该接口调用Active Object的方法。通过Proxy应用程序可以使用强类型的编程语言,而不是线程间松散的消息。Proxy位于客户线程中。
方法请求(Method Request):方法请求类定义用于执行Active Object方法的接口。客户每次调用Proxy定义的方法,就会构造一个方法请求对象。一个方法请求包含了方法参数等上下文信息,这些上下文信息用于方法的执行和执行结果的返回。
请求链表:Proxy将具体的方法请求对象插入到请求链表中。请求链表既用于保存方法请求,又用于判断方法请求的可执行性。请求链表将Proxy线程和Servant线程解耦,使得它们可以并发地运行。
调度器(Scheduler):调度器运行于Active Object线程,用于调度方法请求。调度器使用请求链表来管理方法请求,调度的策略可以根据应用程序的实际需要进行选择。
工作者(Servant):定义Active Object的行为和状态。工作者实现的方法需要与Proxy 接口、方法请求一致。当方法请求被调度器调度时,工作者的方法才会得到执行,因此,它总是运行在调度器线程中。
结果(Future):客户通过Proxy调用一个方法,就可以接收到Future。Future使得客户可以获得方法调用的执行结果。由于客户的请求和方法的执行在不同的线程中,所以Future只能通过阻塞等待或者循环查询的方式获取。
       
#include "ace/Log_Msg.h"
 #include "ace/OS.h"
 #include "ace/Task.h"
 #include "ace/Method_Request.h"
 #include "ace/Future.h"
 #include "ace/Activation_Queue.h"
 #include <memory.h>
 using namespace std;
#pragma comment(lib,"ace.lib")
// Servant 类对status_result_ 进行简单的累加工作
class Servant
 {
 public:
     Servant()
     {
         status_result_ = 1;
     }
     int status_update(void) 
     {
         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Obtaining a status_update in %t ") ACE_TEXT("thread of control\n")));
         ACE_OS::sleep(2);
         return next_result_id();
     }
 private:
     int next_result_id(void)
     {
         return status_result_++;  
     };
     int status_result_;
 };
class StatusUpdate : public ACE_Method_Request
 {
 public:
     StatusUpdate(Servant & controller, ACE_Future<int>&returnval)
         : controller_(controller), returnVal_(returnval)
     {
         ACE_DEBUG((LM_DEBUG, "StatusUpdate::StatusUpdate.\n"));
     }
     virtual int call(void)
     {
         ACE_DEBUG((LM_DEBUG, "StatusUpdate::call.\n "));
         this->returnVal_.set(this->controller_.status_update());
         return 0;
     }
     
     private:
     Servant& controller_;
     ACE_Future<int> returnVal_;
};
class ExitMethod : public ACE_Method_Request
 {
 public:
     virtual int call(void)
     {
         return -1;
     }
 };
// 调度器类
 class Scheduler : public ACE_Task_Base
 {
 public:
     Scheduler()
     {
         ACE_DEBUG((LM_DEBUG, "Scheduler::Scheduler.\n"));
         this->activate();
     }
    virtual int svc(void)
     {
         ACE_DEBUG((LM_DEBUG, "Scheduler::svc.\n"));
         while(1)
         {
             auto_ptr<ACE_Method_Request>  request
             (this->activation_queue_.dequeue());
             if (request->call() == -1)
                 break;
         }
         return 0;
     }
    int enqueue(ACE_Method_Request* request) 
     {
         ACE_DEBUG((LM_DEBUG, "Proxy::status_update.\n"));
         return this->activation_queue_.enqueue(request);
     }
     private:
     ACE_Activation_Queue activation_queue_;
 };
//既然在Active Object设计模式中使用了ACE_Activation_Queue作为消息队列,那么也就没有有必要使用ACE_Task类了,因为ACE_Task使用的消息队列是ACE_Message_Queue。
class Proxy
 {
 public:
     ACE_Future<int> status_update(void)
     {
         ACE_DEBUG((LM_DEBUG, "Proxy::status_update.\n"));
         ACE_Future<int> result;
         this->scheduler_.enqueue(new StatusUpdate(this->controller_, result));
         return result;
     }
     void exit(void)
     {
          ACE_DEBUG((LM_DEBUG, "Proxy::exit.\n"));
          this->scheduler_.enqueue(new ExitMethod);
     }
     private:
         Scheduler scheduler_;
         Servant controller_;
 };
int ACE_TMAIN(int, ACE_TCHAR* [])
 {
     Proxy controller;
     ACE_Future<int> results[10];
 //客户通过Proxy接口发起累加方法请求
     for (int i = 0; i < 10; i++)
     results[i] = controller.status_update();
     ACE_OS::sleep(5);
 //客户获取运行结果
     for (int j = 0; j < 10; j++)
     {
          int result = 0;
          results[j].get(result);
          ACE_DEBUG((LM_DEBUG, "[%D(%t)] result %d\n", result));
     }
     controller.exit();
     ACE_Thread_Manager::instance()->wait();
     return 0;
 }  
运行结果如下:

 



















