RtORBのイベントループ
An Open Software Platform for Robotic Technologies
Home RtORB
RtORBの開発 >> 開発メモ >> RtORBの開発メモ >> RtORBのイベントループ

CORBAイベントループ

CORBAイベントループは、COBRAにおけるServantのサービスにおけるメッセージの送受信イベントをハンドリングするための無限ループである。RtORBでは、基本的にThreadを用いないため、selectシステムコールを用いて、他のプロセスからのメッセージ入力を一定時間(最大待ち時間は一秒)待ち、その後、すべてのリクエストに対する応答処理(PortableServer_execute_request関数の実行)を行うようになっている。

CORBAイベントループの流れ

CORBAイベントループは、COBRA_ORB_run関数を実行することで、起動される。この関数では、
 POA_main_loop( (PortableServer_POA)PtrArray_get(orb->_adapters, 0) );
を実行しているのみである。POA_main_loop関数へは、orbの一番目のアダプタすなわちRootPOAが渡されることになる。POA_main_loop関数では、
  server_loop(1000.0, PortableServer_execute_request, rootPOA);
を実行している。一番目の引数は、selectシステムコールのインターバルタイムであり、単位はmsecである。二番目の引数は、selectシステムコール後、socketの受信処理後に呼び出す関数であり、前述のように関数呼び出しの実体である。三番目の引数は、二番目のPortableServer_execute_request関数への引数となる。

server_loop関数

server_loop関数は、server_loop.cに下記のように記述されている。
 void server_loop(float time_out_float, void (*idle)(void*), void *arg)
 {
   struct timeval time_out;
 
   init_socket_servers();
 
 #ifdef USE_SHMC
   init_shmc_servers();
 #endif
 
   time_out.tv_sec = (unsigned long)(time_out_float / 1000);
   time_out.tv_usec = (time_out_float - time_out.tv_sec * 1000) * 1000 ;
 #if DEBUG
   fprintf(stderr, "timeout = %d, %d\n", (int)time_out.tv_sec, (int)time_out.tv_usec);
 #endif
 
  while (1) {
 #if _CYCLIC_TASK
      gettimeofday(&time1, &tz); 
 #endif
 
 #ifdef USE_SHMC
       select_shmc_servers(time_out);
 #else
       select_socket_servers(time_out);
 #endif
 
       if (idle) (*idle)(arg); 
 #if _CYCLIC_TASK
      
       gettimeofday(&time2, &tz); 
 
       time_cost = (time2.tv_sec - time1.tv_sec) * 1000000 + time2.tv_usec
 	      - time1.tv_usec - time_out_float * 1000 ;
 
       if (time_cost > 0) usleep(time_cost);
       
 #endif 
   }
 }
本来、この関数は、無限ループにすべきではない。それは、CORBA_ORB_shutdown関数コールによって、無限ループから抜けるようにすべきである。そのためには、while(1) でななくて global_loop 等のグローバル変数を使って、while(global_loop) と書き換えるべきである。
<
では、この関数をみてみよう。この関数が実行されえると init_socket_servers関数が実行される。これは、GLOBAL変数で定義されている SocketProfile[FD_SETSIZE]からサーバーとして動作するファイルディスクリプタ(Socket fd)を抽出し、selectシステムコールの入力待ち fd_set を作成している。このfd_setは、main_socket_bitsというGLOBAL変数で定義されている。
次に、whileループの中では、select_socket_servers関数で外部からのMessage待ちを行い、タイムアウトまたはMessage到着後に、第二引数で渡された関数を三引数をその引数として呼出を行っている。
select_socket_servers関数は、select_socket_ports関数を呼び出しているだけであり、このselect_socket_ports関数は、以下の処理を行う。
  • selectシステムコールでmain_socket_bitsに設定されたsocketの受信待ちを行う。タイムアウトであれば、特に何もしない。
  • 受信すべきsocketポートが発見された場合には、そのすべてに対して以下の処理を行う。
    • 受信socket(active_portとする)がSOCK_SERVERであった場合。
      すなわちSockProfile[active_port].type == SOCK_SERVER であった場合は、新たなリクエストであるので、accept_connection関数を呼び出して、通信用のsocket(newfdとする) を作成する。
      そしてSockProfile[newfd].type = SOCK_SERVICE に設定し、さらにSockProfile[socket_fd].connection_procに関数が設定されていれば、その関数を呼び出す。RtORBでは、この関数は設定されていないはずである。
    • 受信socketがSOCK_SERVICEまたはSOCK_CLIENTの場合に(実際には、SOCK_CLIENTではありえない)、SockProfile[active_port].command_procに設定された関数を実行する。
      RtORBでは、PortableServer_enqueue_request関数になっているハズである。この関数からの返値が -1 だった場合には、通信エラーの発生を意味するので、SockProfile[active_port].disconnect_procで設定された関数(RtORBでは、NULLのハズなので実行しない)を実行し、このsocketを閉し、SockProfile[active_port].type = SOCK_CLOSED とする。
上記のselect_socket_ports関数の処理が終われば、PortableServer_execution_request関数を実行し、リクエストに応じた関数呼出を行う。

PortableServer_execution_request関数の動作

この関数は、RtORBのCORBAイベントループでメッセージ受信後の処理を呼び出す。すなわち、この関数をコールすることで、リモートオブジェクトのメッソドが実行される。この関数は、poa.cで、下記のように記述されている。
 void PortableServer_execute_request(void *arg){
    int i;
   PortableServer_POA poa = (PortableServer_POA)arg;
 
   poa->requests = (PtrList *)GIOP_execute_request(poa, poa->requests);
 
   if(poa->children){
    for(i=0;i<poa->children->length;i++){
       PortableServer_POA cpoa = (PortableServer_POA)PtrArray_get(poa->children, i);
       PortableServer_execute_request(cpoa);
     }
   } 
 }
この関数は、引数としてPOAオブジェクトととり、poa->requestsに格納されたリクエスト(このリクエストは、GIOP_enqueue_request関数内で生成されている)のそれぞれにに対して、GIOP_execute_request(poa, poa->requests)関数を実行することで、実際のリモート呼出を行っている。さらに、poaの子poaに対してもこの関数を再帰呼び出しすることで、argで引渡されたpoa以下のすべてのpoaに対するリクエストの処理を実現している。POA_main_loopでは、この引数としてRootPOAを渡しており、これによりORBの内のすべてのリクエストに対する処理を行っている。

GIOP_execute_request関数の動作

GIOP_execute_request関数で実際の関数呼出を行っている。この関数は、各POAごとに格納されたリクエストに対する処理を行う。この関数は、giop.cの中で下記のように記述されている。
 PtrList *GIOP_execute_request(PortableServer_POA poa, PtrList *lst){
   GIOP_ConnectionHandler *h;
   int version ;
   int byte_order = 0 ;
   int fragment = -1 ;
   char *buf;
   GIOP_Request_Item request;
   PtrList *current_request, *retval;
   GIOP_MessageHeader *header;
   CORBA_Sequence_Octet *body;
   CORBA_Environment env;
 
   CORBA_MUTEX_LOCK(); 
   body = (CORBA_Sequence_Octet *)new_CORBA_Sequence_Octet2(0);
   current_request = lst;
   CORBA_MUTEX_UNLOCK();
 
   while(current_request){
     CORBA_MUTEX_LOCK();
     request = (GIOP_Request_Item)current_request->item;
     header = (GIOP_MessageHeader *)request->buf;
     buf = request->buf;
     h = request->connh;
     poa = (PortableServer_POA) GIOP_ConnectionHandler_get_arg(h); 
     version = header->version.minor;
     if((header->flags & 0x01) == 0 ) header->message_size=ntohl(header->message_size);
     else byte_order = 1;
     if(header->version.minor > 0) fragment = header->flags & 0x02;
     memset(&env, 0, sizeof(CORBA_Environment));
     CORBA_MUTEX_UNLOCK();
 
     switch(header->message_type){
       case GIOP_Request: /// Sample Implementation only
          CORBA_MUTEX_LOCK();
          GIOP_Request_perform(h, (octet *)buf, poa, header, body, &env);
 	 CORBA_MUTEX_UNLOCK();
          break;
 
       case GIOP_Reply:  // This is for a client, not implemented
 	 CORBA_MUTEX_LOCK();
          GIOP_Reply_perform(h, (octet *)buf, poa, header, body, &env);
 	 CORBA_MUTEX_UNLOCK();
          break;
 
       case GIOP_CancelRequest:
 	 CORBA_MUTEX_LOCK();
          GIOP_CancelRequest_perform(h, (octet *)buf, poa, header, body, &env);
 	 CORBA_MUTEX_UNLOCK();
         break;
 
       case GIOP_LocateRequest: 
 	 CORBA_MUTEX_LOCK();
 	 GIOP_LocationRequest_perform(h, (octet *)buf, poa, header, body, &env);
 	 CORBA_MUTEX_UNLOCK();
          break;
 
       case GIOP_LocateReply:  // This is for a client, not complete
 	 CORBA_MUTEX_LOCK();
 	 GIOP_LocationReply_perform(h, (octet *)buf, poa, header, body, &env);
 	 CORBA_MUTEX_UNLOCK();
          break;
 
       case GIOP_CloseConnection:
 # ifndef USE_SHMC
  	 close(h->sock);
          clear_socket_profile(h->sock, 0);
 #endif
 	 break;
       case GIOP_MessageError: 
 	 fprintf(stderr, "Message Error occured\n");
          break;
       case GIOP_Fragment: // Not implemented
 	 fprintf(stderr, "FragmentMessage have not implemented.....\n");
          break;
       default:
 	 fprintf(stderr, "Invalid message type (%d)\n", header->message_type);
          break;
     } 
      CORBA_MUTEX_LOCK();
      current_request->released = 1;  
      current_request = current_request->next;
      CORBA_MUTEX_UNLOCK();
   }
   CORBA_MUTEX_LOCK();
   delete_CORBA_Sequence_Octet2(body, 1);
   retval =  (PtrList *)PtrList_remove_released_items(lst); 
   CORBA_MUTEX_UNLOCK();
 
   return retval;
 }
この実装では、MUTEX_LOCKが度々呼ばれているが、Threadに対して正式に対応していないので、意味がない。この関数では、GIOPのメセージにしたがった動作を規定している。この動作は、呼出側、実装側の区別なく記述されており、GIOPのメッセージ処理の本体になる。

GIOPメッセージについて
RtORBの開発メモ
NamingServiceについて