sample/parent_insend/main.cpp

00001 /*
00002   Демонстрация insend-событий и взаимоотношений коопераций.
00003 
00004   Пример работает в интерактивном режиме. Оператор указывает
00005   момент регистрации и дерегистрации кооперации маршрутизатора.
00006   Агент-маршрутизатор в своем событии evt_start создает
00007   дочернюю кооперацию с агентом серверного сокета.
00008 
00009   Далее агент-маршрутизатор отслеживает сообщения о подключениях
00010   новых клиентов. Для каждого нового клиента создается агент-обработчик.
00011   Все приходящие от клиента данные пересылаются маршрутизатором
00012   агентам-обработчикам.
00013 
00014   Агент серверного сокета и агенты-обработчики являются активными
00015   агентами. Агент-маршрутизатор является пассивным агентом, но
00016   он использует insend-события.
00017 */
00018 
00019 #include <iostream>
00020 #include <map>
00021 
00022 #include <cpp_util_2/h/lexcast.hpp>
00023 
00024 #include <threads_1/h/threads.hpp>
00025 
00026 #include <so_4/api/h/api.hpp>
00027 #include <so_4/rt/h/rt.hpp>
00028 
00029 #include <so_4/rt/comm/h/a_raw_srv_channel.hpp>
00030 #include <so_4/socket/channels/h/channels.hpp>
00031 
00032 #include <so_4/timer_thread/simple/h/pub.hpp>
00033 #include <so_4/disp/active_obj/h/pub.hpp>
00034 
00035 // Имя агента-маршрутизатора и его кооперации.
00036 const std::string router_agent_name( "a_router" );
00037 // Имя агента-серверного сокета.
00038 const std::string server_agent_name( "a_server" );
00039 
00040 // Класс агента, который обслуживает конкретный канал.
00041 class a_listener_t
00042 : public so_4::rt::agent_t
00043 {
00044   typedef so_4::rt::agent_t base_type_t;
00045   public :
00046     a_listener_t( const std::string & agent_name )
00047     : base_type_t( agent_name )
00048     {
00049       so_4::disp::active_obj::make_active( *this );
00050 
00051       std::cout << so_query_name() << " created" << std::endl;
00052     }
00053     virtual ~a_listener_t()
00054     {
00055       std::cout << so_query_name() << " destroyed" << std::endl;
00056     }
00057 
00058     // Владеет сообщением msg_data, тип которого уже определен.
00059     typedef so_4::rt::comm::msg_raw_package msg_data;
00060 
00061     virtual const char *
00062     so_query_type() const;
00063 
00064     void
00065     so_on_subscription()
00066     {
00067       so_subscribe( "evt_data", "msg_data" );
00068     }
00069 
00070     void
00071     evt_data( const msg_data & cmd )
00072     {
00073       std::cout << "agent: " << so_query_name()
00074         << ", channel: " << cmd.m_channel
00075         << ", received: " << cmd.m_package.size()
00076         << " byte(s)" << std::endl;
00077 
00078       cmd.unblock_channel();
00079     }
00080 };
00081 
00082 SOL4_CLASS_START( a_listener_t )
00083 
00084   SOL4_MSG_START( msg_data, a_listener_t::msg_data )
00085     SOL4_MSG_CHECKER( a_listener_t::msg_data::check )
00086   SOL4_MSG_FINISH()
00087 
00088   SOL4_EVENT_STC( evt_data, a_listener_t::msg_data )
00089 
00090   SOL4_STATE_START( st_normal )
00091     SOL4_STATE_EVENT( evt_data )
00092   SOL4_STATE_FINISH()
00093 
00094 SOL4_CLASS_FINISH()
00095 
00096 // Класс агента-маршрутизатора.
00097 class a_router_t
00098 : public so_4::rt::agent_t
00099 {
00100   typedef so_4::rt::agent_t base_type_t;
00101   private :
00102     // Кооперация с агентом серверного сокета является
00103     // атрибутом агента-маршрутизатора. При этом кооперация
00104     // регистрируется как статическая, а не динамическая. Т.е. она не
00105     // будет уничтожена через delete при дерегистрации.
00106     so_4::rt::comm::a_raw_srv_channel_t m_srv_channel;
00107     so_4::rt::agent_coop_t  m_srv_channel_coop;
00108 
00109     // Тип карты подключенных клиентов.
00110     typedef std::map< so_4::rt::comm_channel_t, std::string >
00111       client_map_t;
00112 
00113     // Карта подключенных клиентов.
00114     // Доступ к ней не синхронизируется, т.к. маршрутизатор работает
00115     // только на контексте нити агента серверного сокета.
00116     client_map_t  m_clients;
00117 
00118     // Счетчик для создания уникальных имен дочерних агентов.
00119     int m_child_counter;
00120 
00121     // Вспомогательный метод для упрощения hook-а подписки
00122     // на сообщения агента серверного сокета.
00123     void
00124     setup_subscr_hook(
00125       const std::string & event,
00126       const std::string & msg )
00127     {
00128       so_4::rt::def_subscr_hook( m_srv_channel_coop,
00129         *this, event, m_srv_channel, msg, 0, &std::cerr,
00130         so_4::rt::evt_subscr_t::insend_dispatching );
00131     }
00132 
00133   public :
00134     a_router_t(
00135       const std::string & ip )
00136     : base_type_t( router_agent_name )
00137     , m_srv_channel( server_agent_name,
00138         so_4::socket::channels::create_server_channel( ip ) )
00139     , m_srv_channel_coop( m_srv_channel )
00140     , m_child_counter( 0 )
00141     {
00142       // Серверный агент должен быть активным агентом.
00143       so_4::disp::active_obj::make_active( m_srv_channel );
00144 
00145       std::cout << so_query_name() << " created" << std::endl;
00146     }
00147     virtual ~a_router_t()
00148     {
00149       std::cout << so_query_name() << " destroyed" << std::endl;
00150     }
00151 
00152     virtual const char *
00153     so_query_type() const;
00154 
00155     virtual void
00156     so_on_subscription()
00157     {
00158       so_subscribe( "evt_start",
00159         so_4::rt::sobjectizer_agent_name(), "msg_start" );
00160     }
00161 
00162     void
00163     evt_start()
00164     {
00165       // Регистрируем кооперацию с серверным сокетом.
00166       // И подписываемся на сообщения агента серверного сокета.
00167       setup_subscr_hook( "evt_srv_channel_success", "msg_success" );
00168       setup_subscr_hook( "evt_srv_channel_fail", "msg_fail" );
00169       setup_subscr_hook( "evt_client_connected",
00170         "msg_client_connected" );
00171       setup_subscr_hook( "evt_client_disconnected",
00172         "msg_client_disconnected" );
00173       setup_subscr_hook( "evt_channel_data",
00174         "msg_raw_package" );
00175 
00176       // Регистрируем кооперацию.
00177       // Мы должны быть родителями для кооперации.
00178       m_srv_channel_coop.set_parent_coop_name(
00179         so_query_coop()->query_name() );
00180       so_4::api::register_coop( m_srv_channel_coop );
00181     }
00182 
00183     void
00184     evt_srv_channel_success()
00185     {
00186       std::cout << "server channel created" << std::endl;
00187     }
00188 
00189     void
00190     evt_srv_channel_fail(
00191       const so_4::rt::comm::a_srv_channel_base_t::msg_fail & cmd )
00192     {
00193       std::cout << "server channel not created: "
00194         << cmd.m_ret_code << std::endl;
00195     }
00196 
00197     void
00198     evt_client_connected(
00199       const so_4::rt::comm::msg_client_connected & cmd )
00200     {
00201       std::cout << "client connected: "
00202         << cmd.m_channel << std::endl;
00203 
00204       // Создаем для этого канала новую кооперацию.
00205       std::string agent_name( "a_listener_" +
00206         cpp_util_2::slexcast( ++m_child_counter ) );
00207       so_4::rt::dyn_agent_coop_t * coop(
00208         new so_4::rt::dyn_agent_coop_t(
00209           new a_listener_t( agent_name ) ) );
00210       // Указываем, что мы являемся родителями этой кооперации.
00211       coop->set_parent_coop_name(
00212         so_query_coop()->query_name() );
00213 
00214       // Регистрируем новую кооперацию.
00215       so_4::rt::dyn_agent_coop_helper_t h( coop );
00216       if( !h.result() )
00217       {
00218         m_clients[ cmd.m_channel ] = agent_name;
00219       }
00220     }
00221 
00222     void
00223     evt_client_disconnected(
00224       const so_4::rt::comm::msg_client_disconnected & cmd )
00225     {
00226       std::cout << "client disconnected: "
00227         << cmd.m_channel << std::endl;
00228 
00229       client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
00230       if( it != m_clients.end() )
00231       {
00232         // Такой клиент нам известен. Нужно уничтожить прикладного
00233         // агента для этого клиента.
00234         so_4::api::deregister_coop( it->second );
00235         m_clients.erase( it );
00236       }
00237     }
00238 
00239     void
00240     evt_channel_data(
00241       const so_4::rt::comm::msg_raw_package & cmd )
00242     {
00243       // Определяем, какому агенту нужно переслать эти данные.
00244       client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
00245       if( it != m_clients.end() )
00246       {
00247         so_4::api::send_msg_safely( it->second, "msg_data",
00248           new a_listener_t::msg_data( cmd ) );
00249       }
00250     }
00251 };
00252 
00253 SOL4_CLASS_START( a_router_t )
00254 
00255   SOL4_EVENT( evt_start )
00256   SOL4_EVENT( evt_srv_channel_success )
00257   SOL4_EVENT_STC( evt_srv_channel_fail,
00258     so_4::rt::comm::a_srv_channel_base_t::msg_fail )
00259   SOL4_EVENT_STC( evt_client_connected,
00260     so_4::rt::comm::msg_client_connected )
00261   SOL4_EVENT_STC( evt_client_disconnected,
00262     so_4::rt::comm::msg_client_disconnected )
00263   SOL4_EVENT_STC( evt_channel_data,
00264     so_4::rt::comm::msg_raw_package )
00265 
00266   SOL4_STATE_START( st_normal )
00267     SOL4_STATE_EVENT( evt_start )
00268     SOL4_STATE_EVENT( evt_srv_channel_success )
00269     SOL4_STATE_EVENT( evt_srv_channel_fail )
00270     SOL4_STATE_EVENT( evt_client_connected )
00271     SOL4_STATE_EVENT( evt_client_disconnected )
00272     SOL4_STATE_EVENT( evt_channel_data )
00273   SOL4_STATE_FINISH()
00274 
00275 SOL4_CLASS_FINISH()
00276 
00277 //
00278 // Нить, на которой будет происходить запуск SObjectizer-а
00279 //
00280 class sobj_thread_t :
00281   public threads_1::thread_t
00282 {
00283   public :
00284     sobj_thread_t();
00285     virtual ~sobj_thread_t();
00286 
00287   protected :
00288     virtual void
00289     body();
00290 };
00291 
00292 sobj_thread_t::sobj_thread_t()
00293 {
00294 }
00295 
00296 sobj_thread_t::~sobj_thread_t()
00297 {
00298 }
00299 
00300 void
00301 sobj_thread_t::body()
00302 {
00303   so_4::ret_code_t rc = so_4::api::start(
00304     // Диспетчер будет уничтожен при выходе из start().
00305     so_4::disp::active_obj::create_disp(
00306       // Таймер будет уничтожен диспетчером.
00307       so_4::timer_thread::simple::create_timer_thread(),
00308       so_4::auto_destroy_timer ),
00309     so_4::auto_destroy_disp, 0 );
00310   if( rc )
00311   {
00312     std::cerr << "start: " << rc << std::endl;
00313   }
00314 }
00315 
00316 void
00317 create_coop( const std::string & ip )
00318 {
00319   so_4::rt::dyn_agent_coop_helper_t helper(
00320     new so_4::rt::dyn_agent_coop_t(
00321       new a_router_t( ip ) ) );
00322 }
00323 
00324 void
00325 destroy_coop()
00326 {
00327   so_4::api::deregister_coop( router_agent_name );
00328 }
00329 
00330 int
00331 main( int argc, char ** argv )
00332 {
00333   if( 2 == argc )
00334   {
00335     sobj_thread_t thread;
00336     thread.start();
00337 
00338     // Засыпаем, чтобы дать стартовать SObjectizer.
00339     // Это самый простой способ синхронизации с sobj_thread_t.
00340     threads_1::sleep_thread( 1000 );
00341 
00342     bool is_continue = true;
00343     while( is_continue )
00344     {
00345       std::string choice;
00346 
00347       std::cout << "Choose action:\n"
00348         "\t0 - quit\n"
00349         "\t1 - create router coop\n"
00350         "\t2 - destroy router coop\n> "
00351         << std::flush;
00352 
00353       std::cin >> choice;
00354 
00355       if( choice == "0" )
00356       {
00357         // Завершаем работу.
00358         so_4::api::send_msg(
00359           so_4::rt::sobjectizer_agent_name(),
00360           "msg_normal_shutdown", 0,
00361           so_4::rt::sobjectizer_agent_name() );
00362         is_continue = false;
00363       }
00364       else if( choice == "1" )
00365         // Создаем кооперацию клиента.
00366         create_coop( argv[ 1 ] );
00367       else if( choice == "2" )
00368         // Уничтожаем кооперацию клиента.
00369         destroy_coop();
00370     }
00371 
00372     // Ожидаем завершения SObjectizer.
00373     thread.wait();
00374 
00375     return 0;
00376   }
00377   else
00378   {
00379     std::cerr << "sample_parent_insend <server_sock_addr>"
00380       << std::endl;
00381 
00382     return -1;
00383   }
00384 }

Документация по SObjectizer. Последние изменения: Thu Jan 12 10:52:50 2006. Создано системой  doxygen 1.4.6-NO
Hosted by uCoz