00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include <iostream>
00020 #include <map>
00021
00022 #include <cpp_util_2/h/lexcast.hpp>
00023
00024 #include <threads_1/h/threads.hpp>
00025
00026 #include <so_4/api/h/api.hpp>
00027 #include <so_4/rt/h/rt.hpp>
00028
00029 #include <so_4/rt/comm/h/a_raw_srv_channel.hpp>
00030 #include <so_4/socket/channels/h/channels.hpp>
00031
00032 #include <so_4/timer_thread/simple/h/pub.hpp>
00033 #include <so_4/disp/active_obj/h/pub.hpp>
00034
00035
//! Name under which the router agent (a_router_t) is created.
/*! The same name is passed to deregister_coop() in destroy_coop(). */
const std::string router_agent_name = "a_router";

//! Name given to the raw server channel agent owned by the router.
const std::string server_agent_name = "a_server";
00039
00040
00041 class a_listener_t
00042 : public so_4::rt::agent_t
00043 {
00044 typedef so_4::rt::agent_t base_type_t;
00045 public :
00046 a_listener_t( const std::string & agent_name )
00047 : base_type_t( agent_name )
00048 {
00049 so_4::disp::active_obj::make_active( *this );
00050
00051 std::cout << so_query_name() << " created" << std::endl;
00052 }
00053 virtual ~a_listener_t()
00054 {
00055 std::cout << so_query_name() << " destroyed" << std::endl;
00056 }
00057
00058
00059 typedef so_4::rt::comm::msg_raw_package msg_data;
00060
00061 virtual const char *
00062 so_query_type() const;
00063
00064 void
00065 so_on_subscription()
00066 {
00067 so_subscribe( "evt_data", "msg_data" );
00068 }
00069
00070 void
00071 evt_data( const msg_data & cmd )
00072 {
00073 std::cout << "agent: " << so_query_name()
00074 << ", channel: " << cmd.m_channel
00075 << ", received: " << cmd.m_package.size()
00076 << " byte(s)" << std::endl;
00077
00078 cmd.unblock_channel();
00079 }
00080 };
00081
00082 SOL4_CLASS_START( a_listener_t )
00083
00084 SOL4_MSG_START( msg_data, a_listener_t::msg_data )
00085 SOL4_MSG_CHECKER( a_listener_t::msg_data::check )
00086 SOL4_MSG_FINISH()
00087
00088 SOL4_EVENT_STC( evt_data, a_listener_t::msg_data )
00089
00090 SOL4_STATE_START( st_normal )
00091 SOL4_STATE_EVENT( evt_data )
00092 SOL4_STATE_FINISH()
00093
00094 SOL4_CLASS_FINISH()
00095
00096
00097 class a_router_t
00098 : public so_4::rt::agent_t
00099 {
00100 typedef so_4::rt::agent_t base_type_t;
00101 private :
00102
00103
00104
00105
00106 so_4::rt::comm::a_raw_srv_channel_t m_srv_channel;
00107 so_4::rt::agent_coop_t m_srv_channel_coop;
00108
00109
00110 typedef std::map< so_4::rt::comm_channel_t, std::string >
00111 client_map_t;
00112
00113
00114
00115
00116 client_map_t m_clients;
00117
00118
00119 int m_child_counter;
00120
00121
00122
00123 void
00124 setup_subscr_hook(
00125 const std::string & event,
00126 const std::string & msg )
00127 {
00128 so_4::rt::def_subscr_hook( m_srv_channel_coop,
00129 *this, event, m_srv_channel, msg, 0, &std::cerr,
00130 so_4::rt::evt_subscr_t::insend_dispatching );
00131 }
00132
00133 public :
00134 a_router_t(
00135 const std::string & ip )
00136 : base_type_t( router_agent_name )
00137 , m_srv_channel( server_agent_name,
00138 so_4::socket::channels::create_server_channel( ip ) )
00139 , m_srv_channel_coop( m_srv_channel )
00140 , m_child_counter( 0 )
00141 {
00142
00143 so_4::disp::active_obj::make_active( m_srv_channel );
00144
00145 std::cout << so_query_name() << " created" << std::endl;
00146 }
00147 virtual ~a_router_t()
00148 {
00149 std::cout << so_query_name() << " destroyed" << std::endl;
00150 }
00151
00152 virtual const char *
00153 so_query_type() const;
00154
00155 virtual void
00156 so_on_subscription()
00157 {
00158 so_subscribe( "evt_start",
00159 so_4::rt::sobjectizer_agent_name(), "msg_start" );
00160 }
00161
00162 void
00163 evt_start()
00164 {
00165
00166
00167 setup_subscr_hook( "evt_srv_channel_success", "msg_success" );
00168 setup_subscr_hook( "evt_srv_channel_fail", "msg_fail" );
00169 setup_subscr_hook( "evt_client_connected",
00170 "msg_client_connected" );
00171 setup_subscr_hook( "evt_client_disconnected",
00172 "msg_client_disconnected" );
00173 setup_subscr_hook( "evt_channel_data",
00174 "msg_raw_package" );
00175
00176
00177
00178 m_srv_channel_coop.set_parent_coop_name(
00179 so_query_coop()->query_name() );
00180 so_4::api::register_coop( m_srv_channel_coop );
00181 }
00182
00183 void
00184 evt_srv_channel_success()
00185 {
00186 std::cout << "server channel created" << std::endl;
00187 }
00188
00189 void
00190 evt_srv_channel_fail(
00191 const so_4::rt::comm::a_srv_channel_base_t::msg_fail & cmd )
00192 {
00193 std::cout << "server channel not created: "
00194 << cmd.m_ret_code << std::endl;
00195 }
00196
00197 void
00198 evt_client_connected(
00199 const so_4::rt::comm::msg_client_connected & cmd )
00200 {
00201 std::cout << "client connected: "
00202 << cmd.m_channel << std::endl;
00203
00204
00205 std::string agent_name( "a_listener_" +
00206 cpp_util_2::slexcast( ++m_child_counter ) );
00207 so_4::rt::dyn_agent_coop_t * coop(
00208 new so_4::rt::dyn_agent_coop_t(
00209 new a_listener_t( agent_name ) ) );
00210
00211 coop->set_parent_coop_name(
00212 so_query_coop()->query_name() );
00213
00214
00215 so_4::rt::dyn_agent_coop_helper_t h( coop );
00216 if( !h.result() )
00217 {
00218 m_clients[ cmd.m_channel ] = agent_name;
00219 }
00220 }
00221
00222 void
00223 evt_client_disconnected(
00224 const so_4::rt::comm::msg_client_disconnected & cmd )
00225 {
00226 std::cout << "client disconnected: "
00227 << cmd.m_channel << std::endl;
00228
00229 client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
00230 if( it != m_clients.end() )
00231 {
00232
00233
00234 so_4::api::deregister_coop( it->second );
00235 m_clients.erase( it );
00236 }
00237 }
00238
00239 void
00240 evt_channel_data(
00241 const so_4::rt::comm::msg_raw_package & cmd )
00242 {
00243
00244 client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
00245 if( it != m_clients.end() )
00246 {
00247 so_4::api::send_msg_safely( it->second, "msg_data",
00248 new a_listener_t::msg_data( cmd ) );
00249 }
00250 }
00251 };
00252
00253 SOL4_CLASS_START( a_router_t )
00254
00255 SOL4_EVENT( evt_start )
00256 SOL4_EVENT( evt_srv_channel_success )
00257 SOL4_EVENT_STC( evt_srv_channel_fail,
00258 so_4::rt::comm::a_srv_channel_base_t::msg_fail )
00259 SOL4_EVENT_STC( evt_client_connected,
00260 so_4::rt::comm::msg_client_connected )
00261 SOL4_EVENT_STC( evt_client_disconnected,
00262 so_4::rt::comm::msg_client_disconnected )
00263 SOL4_EVENT_STC( evt_channel_data,
00264 so_4::rt::comm::msg_raw_package )
00265
00266 SOL4_STATE_START( st_normal )
00267 SOL4_STATE_EVENT( evt_start )
00268 SOL4_STATE_EVENT( evt_srv_channel_success )
00269 SOL4_STATE_EVENT( evt_srv_channel_fail )
00270 SOL4_STATE_EVENT( evt_client_connected )
00271 SOL4_STATE_EVENT( evt_client_disconnected )
00272 SOL4_STATE_EVENT( evt_channel_data )
00273 SOL4_STATE_FINISH()
00274
00275 SOL4_CLASS_FINISH()
00276
00277
00278
00279
00280 class sobj_thread_t :
00281 public threads_1::thread_t
00282 {
00283 public :
00284 sobj_thread_t();
00285 virtual ~sobj_thread_t();
00286
00287 protected :
00288 virtual void
00289 body();
00290 };
00291
00292 sobj_thread_t::sobj_thread_t()
00293 {
00294 }
00295
00296 sobj_thread_t::~sobj_thread_t()
00297 {
00298 }
00299
00300 void
00301 sobj_thread_t::body()
00302 {
00303 so_4::ret_code_t rc = so_4::api::start(
00304
00305 so_4::disp::active_obj::create_disp(
00306
00307 so_4::timer_thread::simple::create_timer_thread(),
00308 so_4::auto_destroy_timer ),
00309 so_4::auto_destroy_disp, 0 );
00310 if( rc )
00311 {
00312 std::cerr << "start: " << rc << std::endl;
00313 }
00314 }
00315
00316 void
00317 create_coop( const std::string & ip )
00318 {
00319 so_4::rt::dyn_agent_coop_helper_t helper(
00320 new so_4::rt::dyn_agent_coop_t(
00321 new a_router_t( ip ) ) );
00322 }
00323
00324 void
00325 destroy_coop()
00326 {
00327 so_4::api::deregister_coop( router_agent_name );
00328 }
00329
00330 int
00331 main( int argc, char ** argv )
00332 {
00333 if( 2 == argc )
00334 {
00335 sobj_thread_t thread;
00336 thread.start();
00337
00338
00339
00340 threads_1::sleep_thread( 1000 );
00341
00342 bool is_continue = true;
00343 while( is_continue )
00344 {
00345 std::string choice;
00346
00347 std::cout << "Choose action:\n"
00348 "\t0 - quit\n"
00349 "\t1 - create router coop\n"
00350 "\t2 - destroy router coop\n> "
00351 << std::flush;
00352
00353 std::cin >> choice;
00354
00355 if( choice == "0" )
00356 {
00357
00358 so_4::api::send_msg(
00359 so_4::rt::sobjectizer_agent_name(),
00360 "msg_normal_shutdown", 0,
00361 so_4::rt::sobjectizer_agent_name() );
00362 is_continue = false;
00363 }
00364 else if( choice == "1" )
00365
00366 create_coop( argv[ 1 ] );
00367 else if( choice == "2" )
00368
00369 destroy_coop();
00370 }
00371
00372
00373 thread.wait();
00374
00375 return 0;
00376 }
00377 else
00378 {
00379 std::cerr << "sample_parent_insend <server_sock_addr>"
00380 << std::endl;
00381
00382 return -1;
00383 }
00384 }