CORBAイベントループ
CORBAイベントループは、COBRAにおけるServantのサービスにおけるメッセージの送受信イベントをハンドリングするための無限ループである。RtORBでは、基本的にThreadを用いないため、selectシステムコールを用いて、他のプロセスからのメッセージ入力を一定時間(最大待ち時間は一秒)待ち、その後、すべてのリクエストに対する応答処理(PortableServer_execute_request関数の実行)を行うようになっている。
CORBAイベントループの流れ
CORBAイベントループは、COBRA_ORB_run関数を実行することで、起動される。この関数では、
POA_main_loop( (PortableServer_POA)PtrArray_get(orb->_adapters, 0) );
を実行しているのみである。POA_main_loop関数へは、orbの一番目のアダプタすなわちRootPOAが渡されることになる。POA_main_loop関数では、
server_loop(1000.0, PortableServer_execute_request, rootPOA);
を実行している。一番目の引数は、selectシステムコールのインターバルタイムであり、単位はmsecである。二番目の引数は、selectシステムコール後、socketの受信処理後に呼び出す関数であり、前述のように関数呼び出しの実体である。三番目の引数は、二番目のPortableServer_execute_request関数への引数となる。
server_loop関数
server_loop関数は、server_loop.cに下記のように記述されている。
void server_loop(float time_out_float, void (*idle)(void*), void *arg)
{
struct timeval time_out;
init_socket_servers();
#ifdef USE_SHMC
init_shmc_servers();
#endif
time_out.tv_sec = (unsigned long)(time_out_float / 1000);
time_out.tv_usec = (time_out_float - time_out.tv_sec * 1000) * 1000 ;
#if DEBUG
fprintf(stderr, "timeout = %d, %d\n", (int)time_out.tv_sec, (int)time_out.tv_usec);
#endif
while (1) {
#if _CYCLIC_TASK
gettimeofday(&time1, &tz);
#endif
#ifdef USE_SHMC
select_shmc_servers(time_out);
#else
select_socket_servers(time_out);
#endif
if (idle) (*idle)(arg);
#if _CYCLIC_TASK
gettimeofday(&time2, &tz);
time_cost = (time2.tv_sec - time1.tv_sec) * 1000000 + time2.tv_usec
- time1.tv_usec - time_out_float * 1000 ;
if (time_cost > 0) usleep(time_cost);
#endif
}
}
本来、この関数は、無限ループにすべきではない。それは、CORBA_ORB_shutdown関数コールによって、無限ループから抜けるようにすべきである。そのためには、while(1) でななくて global_loop 等のグローバル変数を使って、while(global_loop) と書き換えるべきである。<
では、この関数をみてみよう。この関数が実行されえると init_socket_servers関数が実行される。これは、GLOBAL変数で定義されている SocketProfile[FD_SETSIZE]からサーバーとして動作するファイルディスクリプタ(Socket fd)を抽出し、selectシステムコールの入力待ち fd_set を作成している。このfd_setは、main_socket_bitsというGLOBAL変数で定義されている。
次に、whileループの中では、select_socket_servers関数で外部からのMessage待ちを行い、タイムアウトまたはMessage到着後に、第二引数で渡された関数を三引数をその引数として呼出を行っている。
select_socket_servers関数は、select_socket_ports関数を呼び出しているだけであり、このselect_socket_ports関数は、以下の処理を行う。
- selectシステムコールでmain_socket_bitsに設定されたsocketの受信待ちを行う。タイムアウトであれば、特に何もしない。
- 受信すべきsocketポートが発見された場合には、そのすべてに対して以下の処理を行う。
- 受信socket(active_portとする)がSOCK_SERVERであった場合。すなわちSockProfile[active_port].type == SOCK_SERVER であった場合は、新たなリクエストであるので、accept_connection関数を呼び出して、通信用のsocket(newfdとする) を作成する。そしてSockProfile[newfd].type = SOCK_SERVICE に設定し、さらにSockProfile[socket_fd].connection_procに関数が設定されていれば、その関数を呼び出す。RtORBでは、この関数は設定されていないはずである。
- 受信socketがSOCK_SERVICEまたはSOCK_CLIENTの場合に(実際には、SOCK_CLIENTではありえない)、SockProfile[active_port].command_procに設定された関数を実行する。RtORBでは、PortableServer_enqueue_request関数になっているハズである。この関数からの返値が -1 だった場合には、通信エラーの発生を意味するので、SockProfile[active_port].disconnect_procで設定された関数(RtORBでは、NULLのハズなので実行しない)を実行し、このsocketを閉し、SockProfile[active_port].type = SOCK_CLOSED とする。
- 受信socket(active_portとする)がSOCK_SERVERであった場合。
上記のselect_socket_ports関数の処理が終われば、PortableServer_execution_request関数を実行し、リクエストに応じた関数呼出を行う。
PortableServer_execution_request関数の動作
この関数は、RtORBのCORBAイベントループでメッセージ受信後の処理を呼び出す。すなわち、この関数をコールすることで、リモートオブジェクトのメッソドが実行される。この関数は、poa.cで、下記のように記述されている。
void PortableServer_execute_request(void *arg){
int i;
PortableServer_POA poa = (PortableServer_POA)arg;
poa->requests = (PtrList *)GIOP_execute_request(poa, poa->requests);
if(poa->children){
for(i=0;i<poa->children->length;i++){
PortableServer_POA cpoa = (PortableServer_POA)PtrArray_get(poa->children, i);
PortableServer_execute_request(cpoa);
}
}
}
この関数は、引数としてPOAオブジェクトととり、poa->requestsに格納されたリクエスト(このリクエストは、GIOP_enqueue_request関数内で生成されている)のそれぞれにに対して、GIOP_execute_request(poa, poa->requests)関数を実行することで、実際のリモート呼出を行っている。さらに、poaの子poaに対してもこの関数を再帰呼び出しすることで、argで引渡されたpoa以下のすべてのpoaに対するリクエストの処理を実現している。POA_main_loopでは、この引数としてRootPOAを渡しており、これによりORBの内のすべてのリクエストに対する処理を行っている。
GIOP_execute_request関数の動作
GIOP_execute_request関数で実際の関数呼出を行っている。この関数は、各POAごとに格納されたリクエストに対する処理を行う。この関数は、giop.cの中で下記のように記述されている。
PtrList *GIOP_execute_request(PortableServer_POA poa, PtrList *lst){
GIOP_ConnectionHandler *h;
int version ;
int byte_order = 0 ;
int fragment = -1 ;
char *buf;
GIOP_Request_Item request;
PtrList *current_request, *retval;
GIOP_MessageHeader *header;
CORBA_Sequence_Octet *body;
CORBA_Environment env;
CORBA_MUTEX_LOCK();
body = (CORBA_Sequence_Octet *)new_CORBA_Sequence_Octet2(0);
current_request = lst;
CORBA_MUTEX_UNLOCK();
while(current_request){
CORBA_MUTEX_LOCK();
request = (GIOP_Request_Item)current_request->item;
header = (GIOP_MessageHeader *)request->buf;
buf = request->buf;
h = request->connh;
poa = (PortableServer_POA) GIOP_ConnectionHandler_get_arg(h);
version = header->version.minor;
if((header->flags & 0x01) == 0 ) header->message_size=ntohl(header->message_size);
else byte_order = 1;
if(header->version.minor > 0) fragment = header->flags & 0x02;
memset(&env, 0, sizeof(CORBA_Environment));
CORBA_MUTEX_UNLOCK();
switch(header->message_type){
case GIOP_Request: /// Sample Implementation only
CORBA_MUTEX_LOCK();
GIOP_Request_perform(h, (octet *)buf, poa, header, body, &env);
CORBA_MUTEX_UNLOCK();
break;
case GIOP_Reply: // This is for a client, not implemented
CORBA_MUTEX_LOCK();
GIOP_Reply_perform(h, (octet *)buf, poa, header, body, &env);
CORBA_MUTEX_UNLOCK();
break;
case GIOP_CancelRequest:
CORBA_MUTEX_LOCK();
GIOP_CancelRequest_perform(h, (octet *)buf, poa, header, body, &env);
CORBA_MUTEX_UNLOCK();
break;
case GIOP_LocateRequest:
CORBA_MUTEX_LOCK();
GIOP_LocationRequest_perform(h, (octet *)buf, poa, header, body, &env);
CORBA_MUTEX_UNLOCK();
break;
case GIOP_LocateReply: // This is for a client, not complete
CORBA_MUTEX_LOCK();
GIOP_LocationReply_perform(h, (octet *)buf, poa, header, body, &env);
CORBA_MUTEX_UNLOCK();
break;
case GIOP_CloseConnection:
# ifndef USE_SHMC
close(h->sock);
clear_socket_profile(h->sock, 0);
#endif
break;
case GIOP_MessageError:
fprintf(stderr, "Message Error occured\n");
break;
case GIOP_Fragment: // Not implemented
fprintf(stderr, "FragmentMessage have not implemented.....\n");
break;
default:
fprintf(stderr, "Invalid message type (%d)\n", header->message_type);
break;
}
CORBA_MUTEX_LOCK();
current_request->released = 1;
current_request = current_request->next;
CORBA_MUTEX_UNLOCK();
}
CORBA_MUTEX_LOCK();
delete_CORBA_Sequence_Octet2(body, 1);
retval = (PtrList *)PtrList_remove_released_items(lst);
CORBA_MUTEX_UNLOCK();
return retval;
}
この実装では、MUTEX_LOCKが度々呼ばれているが、Threadに対して正式に対応していないので、意味がない。この関数では、GIOPのメセージにしたがった動作を規定している。この動作は、呼出側、実装側の区別なく記述されており、GIOPのメッセージ処理の本体になる。




