00001
00002
00003
00004
00005
00006
00007
00008 #include <iostream>
00009 #include <set>
00010
00011 #include <stdio.h>
00012
00013 #include <so_4/api/h/api.hpp>
00014 #include <so_4/rt/h/rt.hpp>
00015
00016 #include <so_4/rt/comm/h/a_cln_channel.hpp>
00017 #include <so_4/socket/channels/h/channels.hpp>
00018
00019 #include <so_4/timer_thread/simple/h/pub.hpp>
00020 #include <so_4/disp/active_obj/h/pub.hpp>
00021
00022 #include "common.cpp"
00023
00024
00025 class a_sender_t
00026 : public so_4::rt::agent_t
00027 {
00028 typedef so_4::rt::agent_t base_type_t;
00029 private :
00030
00031 unsigned int m_data_size;
00032
00033
00034 unsigned int m_request_count;
00035
00036 unsigned int m_reply_received;
00037
00038
00039 unsigned int m_group_size;
00040
00041 unsigned int m_timeout;
00042
00043
00044 unsigned int m_uid;
00045
00046
00047
00048 std::set< unsigned int > m_no_reply_uids;
00049
00050
00051 bool m_is_connected;
00052
00053 void
00054 show_stat() const
00055 {
00056 double percents = double( m_reply_received ) /
00057 double( m_request_count ) * 100.0;
00058 std::cout << "*** "
00059 << ( m_is_connected ? "connected" : "not connected" )
00060 << ", " << percents << "% ("
00061 << m_no_reply_uids.size() << ")\r"
00062 << std::flush;
00063 }
00064
00065 void
00066 send( unsigned int uid,
00067 bool insert_to_no_reply_uids = true )
00068 {
00069 so_4::api::send_msg_safely(
00070 a_common_t::agent_name(),
00071 "msg_request",
00072 new a_common_t::msg_request( uid, m_data_size ) );
00073 if( insert_to_no_reply_uids )
00074 m_no_reply_uids.insert( uid );
00075 }
00076
00077 public :
00078 a_sender_t(
00079 unsigned int data_size,
00080 unsigned int request_count,
00081 unsigned int group_size,
00082 unsigned int timeout )
00083 : base_type_t( "a_receiver" )
00084 , m_data_size( data_size )
00085 , m_request_count( request_count )
00086 , m_reply_received( 0 )
00087 , m_group_size( group_size )
00088 , m_timeout( timeout )
00089 , m_uid( 0 )
00090 , m_is_connected( false )
00091 {
00092 }
00093 virtual ~a_sender_t()
00094 {}
00095
00096 struct msg_timeout {};
00097
00098 virtual const char *
00099 so_query_type() const;
00100
00101 virtual void
00102 so_on_subscription()
00103 {
00104
00105 so_4::api::make_global_agent(
00106 a_common_t::agent_name(),
00107 a_common_t::agent_type() );
00108
00109 so_subscribe( "evt_start",
00110 so_4::rt::sobjectizer_agent_name(), "msg_start" );
00111
00112 so_subscribe( "evt_client_connected",
00113 so_4::rt::comm::communicator_agent_name(),
00114 "msg_client_connected" );
00115
00116 so_subscribe( "evt_client_disconnected",
00117 so_4::rt::comm::communicator_agent_name(),
00118 "msg_client_disconnected" );
00119
00120 so_subscribe( "evt_reply",
00121 a_common_t::agent_name(),
00122 "msg_reply" );
00123
00124 so_subscribe( "evt_timeout", "msg_timeout" );
00125 }
00126
00127 void
00128 evt_start(
00129 const so_4::rt::event_data_t & )
00130 {
00131 so_4::api::send_msg( so_query_name(), "msg_timeout", 0,
00132 so_query_name(), 1000, 1000 * m_timeout );
00133 }
00134
00135 void
00136 evt_client_connected(
00137 const so_4::rt::event_data_t & data,
00138 const so_4::rt::comm::msg_client_connected * cmd )
00139 {
00140 m_is_connected = true;
00141 show_stat();
00142 }
00143
00144 void
00145 evt_client_disconnected(
00146 const so_4::rt::event_data_t & data,
00147 const so_4::rt::comm::msg_client_disconnected * cmd )
00148 {
00149 m_is_connected = false;
00150 show_stat();
00151 }
00152
00153 void
00154 evt_reply(
00155 const so_4::rt::event_data_t &,
00156 const a_common_t::msg_reply * cmd )
00157 {
00158 if( m_no_reply_uids.find( cmd->m_uid ) !=
00159 m_no_reply_uids.end() )
00160 {
00161 m_no_reply_uids.erase( cmd->m_uid );
00162 ++m_reply_received;
00163 show_stat();
00164
00165 if( m_reply_received == m_request_count )
00166 {
00167 so_4::api::send_msg(
00168 so_4::rt::sobjectizer_agent_name(),
00169 "msg_normal_shutdown", 0 );
00170 }
00171 }
00172 }
00173
00174 void
00175 evt_timeout(
00176 const so_4::rt::event_data_t & )
00177 {
00178 unsigned int sent = 0;
00179
00180
00181
00182 for( std::set< unsigned int >::iterator
00183 it = m_no_reply_uids.begin(),
00184 it_end = m_no_reply_uids.end();
00185 it != it_end && sent != m_group_size;
00186 ++it,
00187 ++sent )
00188 {
00189 send( *it, false );
00190 }
00191
00192
00193 for( unsigned int count = 0;
00194 sent + count != m_group_size &&
00195 m_uid != m_request_count;
00196 ++count, ++m_uid )
00197 {
00198 send( m_uid );
00199 }
00200 }
00201 };
00202
00203 SOL4_CLASS_START( a_sender_t )
00204
00205 SOL4_MSG_START( msg_timeout, a_sender_t::msg_timeout )
00206 SOL4_MSG_FINISH()
00207
00208 SOL4_EVENT( evt_start )
00209 SOL4_EVENT_WITH_INCIDENT_TYPE(
00210 evt_client_connected,
00211 so_4::rt::comm::msg_client_connected )
00212 SOL4_EVENT_WITH_INCIDENT_TYPE(
00213 evt_client_disconnected,
00214 so_4::rt::comm::msg_client_disconnected )
00215 SOL4_EVENT_WITH_INCIDENT_TYPE(
00216 evt_reply,
00217 a_common_t::msg_reply )
00218 SOL4_EVENT( evt_timeout )
00219
00220 SOL4_STATE_START( st_normal )
00221 SOL4_STATE_EVENT( evt_start )
00222 SOL4_STATE_EVENT( evt_client_connected )
00223 SOL4_STATE_EVENT( evt_client_disconnected )
00224 SOL4_STATE_EVENT( evt_reply )
00225 SOL4_STATE_EVENT( evt_timeout )
00226 SOL4_STATE_FINISH()
00227
00228 SOL4_CLASS_FINISH()
00229
00230 int
00231 main( int argc, char ** argv )
00232 {
00233 if( 6 == argc )
00234 {
00235 unsigned int data_size = 0;
00236 sscanf( argv[ 2 ], "%u", &data_size );
00237
00238 unsigned int count = 0;
00239 sscanf( argv[ 3 ], "%u", &count );
00240
00241 unsigned int group_size = 0;
00242 sscanf( argv[ 4 ], "%u", &group_size );
00243
00244 unsigned int timeout = 0;
00245 sscanf( argv[ 5 ], "%u", &timeout );
00246
00247
00248 so_4::sop::std_filter_t * filter =
00249 so_4::sop::create_std_filter();
00250 filter->insert( a_common_t::agent_name() );
00251
00252 so_4::rt::comm::a_cln_channel_t a_channel(
00253 "a_channel",
00254 so_4::socket::channels::create_client_factory( argv[ 1 ] ),
00255 filter,
00256 so_4::rt::comm::a_cln_channel_t::
00257 create_def_disconnect_handler( 5000, 0 ) );
00258 a_channel.so_add_traits(
00259 so_4::disp::active_obj::query_active_obj_traits() );
00260 std::cout << "Client threshold: in " << a_channel.in_threshold()
00261 << ", out " << a_channel.out_threshold()
00262 << std::endl;
00263
00264 a_sender_t a_sender(
00265 data_size, count, group_size, timeout );
00266
00267 so_4::rt::agent_t * agents[] =
00268 {
00269 &a_channel, &a_sender
00270 };
00271 so_4::rt::agent_coop_t coop( "server",
00272 agents, sizeof( agents ) / sizeof( agents[ 0 ] ) );
00273
00274 so_4::ret_code_t rc = so_4::api::start(
00275
00276 so_4::disp::active_obj::create_disp(
00277
00278 so_4::timer_thread::simple::create_timer_thread(),
00279 so_4::auto_destroy_timer ),
00280 so_4::auto_destroy_disp,
00281 &coop );
00282
00283 if( rc )
00284 {
00285 std::cerr << "start: " << rc << std::endl;
00286 }
00287
00288 return int( rc );
00289 }
00290
00291 std::cerr << "sample_high_traffic_client <[ip]:port> "
00292 "data_size count group_size timeout" << std::endl;
00293 return 1;
00294 }