Идея о том, как сделать адресацию сообщений в SObjectizer через mbox-ы

Author: Евгений Охотников
Contact: eao197 at intervale dot ru; eao197 at yahoo dot com
Version: 0.1
Date: 2009.10.06

Оглавление

Предисловие

Когда-то, давным-давно, над SObjectizer был написан дополнительный слой для передачи сообщений, который получил название MBAPI. Идея MBAPI заключалась в том, что сообщения пересылаются с одного mbox-а (почтового ящика) на другой. В процессе пересылки возможны и доставки сообщения до нескольких "слушателей" почтового ящика, и перехват, и перепосылка сообщения.

MBAPI оказалася настолько удобным, что все крупные проекты на SObjectizer, разработкой которых я занимался, используют MBAPI в качестве основного механизма обмена сообщениями. Что заставило меня задуматься о том, как в SObjectizer-5 сделать механизм MBAPI штатным механизмом самого SObjectizer, а не одной из его надстроек. Описываемая ниже идея является первым осязаемым результатом этих размышлений.

Идея

Суть предложения заключается в том, что сообщения в SObjectizer-5 отсылаются не между агентами, а между почтовыми ящиками -- mbox-ами. Для того, чтобы отослать сообщение, нужно получить доступ к mbox-у (либо владеть им, либо найти по имени). Для того, чтобы получить сообщение, необходимо добавить себя в список "слушателей" mbox-а.

В самом простом случае каждый агент будет владеть собственным mbox-ом (т.е., если проводить аналогию с SObjectizer-4, то в SObjectizer-4 имя владельца было mbox-ом). Для того, чтобы отослать сообщение агенту нужно отправить сообщение на его mbox. Например:

class a_master_t : public so_5::agent_t {
  private :
    // Ссылка на mbox подчиненного агента.
    so_5::mbox_t m_slave_mbox;
  public :
    // Собственный mbox.
    so_5::local_mbox_t self_mbox;
    ...
    void set_slave( so_5::mbox_t mbox ) { m_slave_mbox = mbox; }
    ...
    void send_command_to_slave(...) {
      so_5::msg_t< msg_command > message(...);
      // Вот так сообщение маршрутизируется на mbox
      // подчиненного агента.
      so_5::route( m_slave_mbox, message, self_mbox );
    }
  };
  class a_slave_t : public so_5::agent_t {
    private :
      // Ссылка на mbox агента-владельца.
      so_5::mbox_t m_master_mbox;
    public :
      // Собственный mbox.
      so_5::local_mbox_t self_mbox;
      ...
      void set_master( so_5::mbox_t mbox ) { m_master_mbox = mbox; }
      ...
      void send_notify_to_master(...) {
        so_5::msg_t< msg_notify > message(...);
        so_5::route( m_master_mbox, message, self_mbox );
      }
  };

  // Так может выглядеть взаимная подписка агентов.
  so_5::agent_t< a_master_t > master( new a_master_t(...) );
  so_5::agent_t< a_slave_t > slave( new a_slave_t(...) );
  master.set_slave( slave.self_mbox );
  slave.set_master( master.self_mbox );
  // ... После чего агенты регистрируются.

Смысл здесь в том, что агенты, как форма получения сообщений, вообще выводются из фокуса. Т.е. mbox может существовать и отдельно от агентов. А агент становиться просто основной формой обработки сообщений, которые отсылаются на mbox-ы.

Виды mbox-ов

Mbox-ы могут быть трех видов:

  • локальные, которые существуют только в рамках процесса и не видны другим процессам, связанным с текущим процессам через SOP;
  • глобальные, информация о которых рассылается по SOP-каналам во все подключившиеся процессы;
  • прокси, которые являются заглушками для глобальных mbox-ов на стороне клиента.

Локальные mbox-ы играют такую же роль, как сейчас имена обычных агентов в SObjectizer-4. С помощью локальных mbox-ов производится взаимодействие только в рамках одного SObjectizer-процесса. Поэтому локальные mbox-ы вполне могут быть анонимными (без имени). Либо же они могут иметь имя для того, чтобы их можно было найти через lockup_mbox.

А вот глобальные и прокси mbox-ы служат для огранизации распределенных SObjectizer-приложений. На стороне сервера (т.е. поставщика услуги) объявляется именованный глобальный mbox. Имя этого mbox-а сохраняется в специальном репозитории внутри SObjectizer. И, когда SObjectizer обнаруживает подключение нового приложения, он обменивается с удаленным приложением содержимым этого репозитория. Благодаря чему приложения узнают об удаленных mbox-ах.

На стороне клиента (т.е. потребителя услуги) объявляется именованный прокси mbox с тем же самым именем, как у глобального mbox-а. Это позволяет клиенту отсылать сообщения на удаленный mbox. За всем этим будет стоять внутренняя кухня ядра SObjectizer -- оно будет знать, какие есть прокси и в каких глобальных mbox-ах они заинтересованы. И как только появляется удаленный глобальный mbox, для которого есть прокси, ядро SObjectizer автоматически устанавливает связи между ними.

Т.е. пара из глобального mbox-а и прокси mbox-а будет играть такую же роль в SObjectizer-5, как и глобальные агенты в SObjectizer-4.

Вот как все это может выглядеть в коде:

// Поставщик услуги.
class a_service_t : public so_5::agent_t {
  private :
    so_5::global_mbox_t m_self_mbox;
  public :
    a_service_t( const std::string & service_name )
      : ... // передача параметров в конструктор базового типа.
      // Глобальный mbox должен быть проинициализирован.
      // Его именем будет имя предоставляемого агентом сервиса.
      , m_self_mbox( service_name )
      {}
  ...
};

// Потребитель услуги.
class a_client_t : public so_5::agent_t {
  private :
    so_5::proxy_mbox_t m_service_mbox;
  public :
    a_client_t( const std::string & service_name )
      : ... // передача параметров в конструктор базового типа.
      // Прокси mbox должен быть проинициализирован.
      , m_service_mbox( service_name )
      {}
  ...
};

Для ядра SObjectizer обычным делом будет существование глобального и прокси mbox-а в рамках одного процесса. В этом случае SObjectizer просто сразу свяжет их и все. Что позволит легко переконфигурировать SObjectizer-приложение -- для поставщика и потребителя услуги не будет разницы, работают ли они вместе или распределенно.

Для метода so_5::route() не будет разницы, какой именно mbox ему передают -- локальный, глобальный или прокси. Предположительно, все типы proxy-объектов будут либо производными от общего базового типа, либо же будет класс универсальной ссылки (so_5::mbox_t), с помощью которого будут представляться ссылки на любой тип mbox-а.

Широковещательная отсылка сообщений

При использовании mbox-ов отсылка сообщений становится, по сути, целенаправленной. Т.к., в большинстве случаев, за mbox-ом будет скрываться конкретный агент.

Но как выразить через конкретные mbox-ы широковещательную отсылку? Например, есть агент a_communicator, который рассылает уведомления о состояниях коммуникационных каналов. Как он будет извещать широковещательно всех желающих? Очень просто -- он будет владеть локальным mbox-ом. И отсылать сообщения будет на этот mbox. А все желающие будут на этот mbox подписываться.

Некоторые технические соображения

Mbox-ы -- это C++ объекты. В которых будет сохраняться информация о подписчиках. Т.е. когда агент выполняет подписку на сообщение, например, вот так:

void a_my_agent_t::so_define_agent() {
  so_subscribe( self_mbox )
      .in( st_normal )
          .event( &a_my_agent_t::evt_some_message );
  ...
}
void a_my_agent_t::evt_some_message(
  const so_5::event_t< msg_some_message > & ) { ... }

то SObjectizer будет сохранять информацию о потребителе сообщения во внетренних структурах mbox-а.

А при отсылке сообщения на mbox SObjectizer сразу получает от mbox-а список подписчиков, заинтересованных в получении данного типа сообщения. Что позволит, я надеюсь, не блокировать приложение на каком-то общем mutex-е (как это сейчас происходит в SObjectizer-4 при отсылке сообщения).

Другие вкусности

Перехват сообщений

Очень востребованной возможностью существующего MBAPI является возможность перехвата и перемаршрутизации сообщений. При переносе идеи MBAPI в ядро SObjectizer эту возможность хотелось бы сохранить.

Вот так эта возможность может выглядеть:

// Агент, который хочет перехватывать сообщение какого-то mbox-а.
class a_interceptor_sample_t : public so_5::agent_t {
  public :
    // Так выглядит обработчик события-перехватчика.
    void evt_sample(
      const so_5::intercepted_event_t< msg_some > & event )
      { ... }

    // А вот так будет выполняться определение перехвата.
    void so_define_agent() {
      // Перехват сообщения msg_some, которое идет на some_mbox
      // и у перехватчика 9-й приоритет.
      so_intercept( some_mbox, 9 )
        .in( st_normal )
          .event( &a_interceptor_sample_t::evt_sample );
    }
};

Синхронное взаимодействие агентов

Синхронное взаимодействие -- это штука, которой в SObjectizer-4 нет, но о потребности которой разговоры возникают регулярно. Думаю, что в SObjectizer-5 эту возможность нужно добавить. И выглядеть она может следующим образом:

// Агент, который обслуживает простые сихронные запросы.
// Т.е. возврат результата выполняется сразу при обработке запроса.
class a_simple_request_handler_t : public so_5::agent_t {
  public :
    // Так выглядит обработчик запроса.
    void evt_request_handler(
      const so_5::sync_request_t< some_request_t, some_reply_t > & request )
    {
      ... // какая-то обработка.
      // Возврат ответа.
      so_5::reply_t< some_reply_t > reply( ... );
      request.reply( reply );
    }
    ...
    // А так выглядит подписка на синхронный запрос.
    void so_define_agent() {
      so_define_request( some_mbox )
        .in( st_normal )
          .event( &a_simple_request_handler_t::evt_request_handler );
    }
};

// А так будет выглядеть отсылка синхронного запроса.
// Вместо метода route() используется метод request().
so_5::request_t< some_request_t > request( ... );
so_5::reply_t< some_replay_t > reply;
so_5::request( some_mbox, request, reply, my_mbox );

Если поставщик услуги не может сразу сгенерировать ответ на синхронный запрос, он может сохранить у себя структуру sync_request_t, и воспользоваться ей, когда такая возможность у него появится:

// Агент, который выполняет сложные синхронные запросы.
// Информация о запросе сохраняется в агенте, а когда появляется
// возможность ответить -- используется вновь.
class a_complex_request_handler_t : public so_5::agent_t {
  public :
    void evt_request_registrator(
      const so_5::sync_request_t< some_request_t, some_reply_t > & request )
    {
      store_request( request );
      ... // иницирование каких-то действий для обслуживания запроса.
    }
    // А ответ на запрос отсылается в обработчике какого-то сообщения.
    void evt_distant_response(
      const so_5::event_t< msg_response > & event )
    {
      so_5::reply_t< some_reply_t > reply( ... );
      extract_request( ... ).reply( reply );
    }
};

При выполнении метода so_5::request() SObjectizer будет проверять, не работает ли исполнитель запроса на той же самой нити. Если работает, то выполнение запроса отвергается.

В отличии от подписки на сообщение, у mbox-а может быть только один обработчик синхронных запросов конкретного типа.

Лямбда-функции из C++0x

В приведенных выше примерах выполнялась подписка на сообщения в виде методов агентов. Ничто, в общем-то, не запрещает подписывать на сообщения лямбда-функции. Что-то вроде такого:

int main() {
  so_5::runtime_t sobjectizer;

  sobjectizer.subscribe(
      sobjectizer.sobjectizer_agent_mbox(),
      [&sobjectizer]( const so_5::event_t< so_5::msg_start > & ) {
          std::cout << "Hello, world!";
          sobjectizer.initiate_normal_shutdown();
      }
  );

  sobjectizer.start(...);
}

Сравнение данной нотации с нотацией SObjectizer-4

Здесь приведено сравнение описания одного и того же агента из реального проекта в нотации SObjectizer-4 и с использованием идеи об mbox-ах.

Hosted by uCoz