sample/high_traffic/client.cpp

00001 /*
00002   Тестирование поведения коммуникационных агентов при
00003   большом трафике.
00004 
00005   Клиентская часть.
00006 */
00007 
00008 #include <iostream>
00009 #include <set>
00010 
00011 #include <stdio.h>
00012 
00013 #include <so_4/api/h/api.hpp>
00014 #include <so_4/rt/h/rt.hpp>
00015 
00016 #include <so_4/rt/comm/h/a_cln_channel.hpp>
00017 #include <so_4/socket/channels/h/channels.hpp>
00018 
00019 #include <so_4/timer_thread/simple/h/pub.hpp>
00020 #include <so_4/disp/active_obj/h/pub.hpp>
00021 
00022 #include "common.cpp"
00023 
00024 // Класс тестового агента, который отсылает сообщения.
00025 class a_sender_t
00026   : public so_4::rt::agent_t
00027 {
00028   typedef so_4::rt::agent_t base_type_t;
00029   private :
00030     // Размер одного запроса.
00031     unsigned int  m_data_size;
00032 
00033     // Количество запросов, которые нужно отослать на сервер.
00034     unsigned int  m_request_count;
00035     // Количество полученных от сервера ответов.
00036     unsigned int  m_reply_received;
00037 
00038     // Количество запросов в группе.
00039     unsigned int  m_group_size;
00040     // Тайм-аут между отсылкой групп.
00041     unsigned int  m_timeout;
00042 
00043     // Счетчик идентификаторов для отсылаемых запросов.
00044     unsigned int  m_uid;
00045 
00046     // Множество идентификаторов отосланных запросов,
00047     // на которые еще не были полученны ответы.
00048     std::set< unsigned int >  m_no_reply_uids;
00049 
00050     // Признак того, что есть соединение с сервером.
00051     bool  m_is_connected;
00052 
00053     void
00054     show_stat() const
00055     {
00056       double percents = double( m_reply_received ) /
00057         double( m_request_count ) * 100.0;
00058       std::cout << "*** "
00059         << ( m_is_connected ? "connected" : "not connected" )
00060         << ", " << percents << "% ("
00061         << m_no_reply_uids.size() << ")\r"
00062         << std::flush;
00063     }
00064 
00065     void
00066     send( unsigned int uid,
00067       bool insert_to_no_reply_uids = true )
00068     {
00069       so_4::api::send_msg_safely(
00070         a_common_t::agent_name(),
00071         "msg_request",
00072         new a_common_t::msg_request( uid, m_data_size ) );
00073       if( insert_to_no_reply_uids )
00074         m_no_reply_uids.insert( uid );
00075     }
00076 
00077   public :
00078     a_sender_t(
00079       unsigned int data_size,
00080       unsigned int request_count,
00081       unsigned int group_size,
00082       unsigned int timeout )
00083     : base_type_t( "a_receiver" )
00084     , m_data_size( data_size )
00085     , m_request_count( request_count )
00086     , m_reply_received( 0 )
00087     , m_group_size( group_size )
00088     , m_timeout( timeout )
00089     , m_uid( 0 )
00090     , m_is_connected( false )
00091     {
00092     }
00093     virtual ~a_sender_t()
00094     {}
00095 
00096     struct  msg_timeout {};
00097 
00098     virtual const char *
00099     so_query_type() const;
00100 
00101     virtual void
00102     so_on_subscription()
00103     {
00104       // Агент a_common должен быть глобальным.
00105       so_4::api::make_global_agent(
00106         a_common_t::agent_name(),
00107         a_common_t::agent_type() );
00108 
00109       so_subscribe( "evt_start",
00110         so_4::rt::sobjectizer_agent_name(), "msg_start" );
00111 
00112       so_subscribe( "evt_client_connected",
00113         so_4::rt::comm::communicator_agent_name(),
00114         "msg_client_connected" );
00115 
00116       so_subscribe( "evt_client_disconnected",
00117         so_4::rt::comm::communicator_agent_name(),
00118         "msg_client_disconnected" );
00119 
00120       so_subscribe( "evt_reply",
00121         a_common_t::agent_name(),
00122         "msg_reply" );
00123 
00124       so_subscribe( "evt_timeout", "msg_timeout" );
00125     }
00126 
00127     void
00128     evt_start(
00129       const so_4::rt::event_data_t & )
00130     {
00131       so_4::api::send_msg( so_query_name(), "msg_timeout", 0,
00132         so_query_name(), 1000, 1000 * m_timeout );
00133     }
00134 
00135     void
00136     evt_client_connected(
00137       const so_4::rt::event_data_t & data,
00138       const so_4::rt::comm::msg_client_connected * cmd )
00139     {
00140       m_is_connected = true;
00141       show_stat();
00142     }
00143 
00144     void
00145     evt_client_disconnected(
00146       const so_4::rt::event_data_t & data,
00147       const so_4::rt::comm::msg_client_disconnected * cmd )
00148     {
00149       m_is_connected = false;
00150       show_stat();
00151     }
00152 
00153     void
00154     evt_reply(
00155       const so_4::rt::event_data_t &,
00156       const a_common_t::msg_reply * cmd )
00157     {
00158       if( m_no_reply_uids.find( cmd->m_uid ) !=
00159         m_no_reply_uids.end() )
00160       {
00161         m_no_reply_uids.erase( cmd->m_uid );
00162         ++m_reply_received;
00163         show_stat();
00164 
00165         if( m_reply_received == m_request_count )
00166         {
00167           so_4::api::send_msg(
00168             so_4::rt::sobjectizer_agent_name(),
00169             "msg_normal_shutdown", 0 );
00170         }
00171       }
00172     }
00173 
00174     void
00175     evt_timeout(
00176       const so_4::rt::event_data_t & )
00177     {
00178       unsigned int sent = 0;
00179 
00180       // Сначала отсылаем повторно те запросы, на которые не получено
00181       // ответов.
00182       for( std::set< unsigned int >::iterator
00183           it = m_no_reply_uids.begin(),
00184           it_end = m_no_reply_uids.end();
00185         it != it_end && sent != m_group_size;
00186         ++it,
00187         ++sent )
00188       {
00189         send( *it, false );
00190       }
00191 
00192       // Затем доведем группу до нужного размера новыми запросами.
00193       for( unsigned int count = 0;
00194         sent + count != m_group_size &&
00195         m_uid != m_request_count;
00196         ++count, ++m_uid )
00197       {
00198         send( m_uid );
00199       }
00200     }
00201 };
00202 
00203 SOL4_CLASS_START( a_sender_t )
00204 
00205   SOL4_MSG_START( msg_timeout, a_sender_t::msg_timeout )
00206   SOL4_MSG_FINISH()
00207 
00208   SOL4_EVENT( evt_start )
00209   SOL4_EVENT_WITH_INCIDENT_TYPE(
00210     evt_client_connected,
00211     so_4::rt::comm::msg_client_connected )
00212   SOL4_EVENT_WITH_INCIDENT_TYPE(
00213     evt_client_disconnected,
00214     so_4::rt::comm::msg_client_disconnected )
00215   SOL4_EVENT_WITH_INCIDENT_TYPE(
00216     evt_reply,
00217     a_common_t::msg_reply )
00218   SOL4_EVENT( evt_timeout )
00219 
00220   SOL4_STATE_START( st_normal )
00221     SOL4_STATE_EVENT( evt_start )
00222     SOL4_STATE_EVENT( evt_client_connected )
00223     SOL4_STATE_EVENT( evt_client_disconnected )
00224     SOL4_STATE_EVENT( evt_reply )
00225     SOL4_STATE_EVENT( evt_timeout )
00226   SOL4_STATE_FINISH()
00227 
00228 SOL4_CLASS_FINISH()
00229 
00230 int
00231 main( int argc, char ** argv )
00232 {
00233   if( 6 == argc )
00234   {
00235     unsigned int data_size = 0;
00236     sscanf( argv[ 2 ], "%u", &data_size );
00237 
00238     unsigned int count = 0;
00239     sscanf( argv[ 3 ], "%u", &count );
00240 
00241     unsigned int group_size = 0;
00242     sscanf( argv[ 4 ], "%u", &group_size );
00243 
00244     unsigned int timeout = 0;
00245     sscanf( argv[ 5 ], "%u", &timeout );
00246 
00247 
00248     so_4::sop::std_filter_t * filter =
00249       so_4::sop::create_std_filter();
00250     filter->insert( a_common_t::agent_name() );
00251 
00252     so_4::rt::comm::a_cln_channel_t a_channel(
00253       "a_channel",
00254       so_4::socket::channels::create_client_factory( argv[ 1 ] ),
00255       filter,
00256       so_4::rt::comm::a_cln_channel_t::
00257         create_def_disconnect_handler( 5000, 0 ) );
00258     a_channel.so_add_traits(
00259       so_4::disp::active_obj::query_active_obj_traits() );
00260     std::cout << "Client threshold: in " << a_channel.in_threshold()
00261       << ", out " << a_channel.out_threshold()
00262       << std::endl;
00263 
00264     a_sender_t a_sender(
00265       data_size, count, group_size, timeout );
00266 
00267     so_4::rt::agent_t * agents[] =
00268     {
00269       &a_channel, &a_sender
00270     };
00271     so_4::rt::agent_coop_t coop( "server",
00272       agents, sizeof( agents ) / sizeof( agents[ 0 ] ) );
00273 
00274     so_4::ret_code_t rc = so_4::api::start(
00275       // Диспетчер будет уничтожен при выходе из start().
00276       so_4::disp::active_obj::create_disp(
00277         // Таймер будет уничтожен диспетчером.
00278         so_4::timer_thread::simple::create_timer_thread(),
00279         so_4::auto_destroy_timer ),
00280       so_4::auto_destroy_disp,
00281       &coop );
00282 
00283     if( rc )
00284     {
00285       std::cerr << "start: " << rc << std::endl;
00286     }
00287 
00288     return int( rc );
00289   }
00290 
00291   std::cerr << "sample_high_traffic_client <[ip]:port> "
00292     "data_size count group_size timeout" << std::endl;
00293   return 1;
00294 }

Документация по SObjectizer. Последние изменения: Thu Jan 12 10:52:50 2006. Создано системой  doxygen 1.4.6-NO
Hosted by uCoz