Перехватчики сообщений в SObjectizer

Author: Евгений Охотников
Contact: eao197 at intervale dot ru; eao197 at yahoo dot com
Version: 1.0
Date: 2007.09.03

Оглавление

Мотивация

Идея о том, что в SObjectizer желательно было бы иметь какой-то механизм перехвата сообщений, существует достаточно давно. Ее реализации препятствовало два момента: во-первых, не часто возникали задачи, в которых подобный перехват был бы критически важен (поскольку такого механизма в SObjectizer до сих пор нет, то можно сказать, что таких задач не было) [1]; во-вторых не хотелось усложнять SObjectizer дополнительными механизмами.

Тем не менее, временами появляются задачи, которые могли бы быть решены с помощью перехвата сообщений проще, чем без перехвата. В качестве примера такой задачи можно рассмотреть агента, обслуживающего серверный TCP/IP сокет. Этот агент должен обрабатывать не один сокет, а несколько -- сам серверный сокет, на котором выполняются операции accept, и по одному сокету на каждого подключившегося клиента.

Подобный серверный агент мог бы для каждого клиентского подключения создавать отдельного клиента, обслуживающего только одно соединение. Серверный агент создавал бы дочерних агентов тайно, т.е. про их существование не нужно никому знать, снаружи должен быть виден только серверный агент.

Но для таких неявных агентов возникает проблема: пользователь серверного агента отсылает сообщения msg_send_package, msg_unblock_channel, msg_close_channel серверному агенту. Причем пользователь может отсылать эти сообщения целенаправлено. В результате серверному агенту пришлось бы получать эти сообщения только для того, чтобы переслать их нужному дочернему агенту. Но, например, для msg_send_package это слишком расточительное решение.

При наличии перехвата сообщений эта задача решается проще: имена дочерних агентов могут строится по какому-то правилу на основании идентификатора канала. Серверный агент регистрирует для своих сообщений msg_send_package, msg_unblock_channel и др. перехватчик, который извлекает из сообщения идентификатор канала, строит на его основе имя дочернего агента и адресует сообщение этому агенту.

[1]Если говорить более точно, то задачи, в которых был критически важен перехват и перемаршрутизация сообщений, существовали и успешно решались с помощью библиотеки MBAPI. Но MBAPI в дополнение к перехвату и перемаршрутизации сообщений добавляла еще некоторые специфические возможности. А вот критичных к перехвату на уровне именно SObjectizer-сообщений задач действительно не было.

Как может выглядеть перехватчик

Перехватчиком может служить объект, реализующий следующий интерфейс:

class message_interceptor_t
  {
  public :
    virtual ~message_interceptor_t();

    virtual so_4::ret_code_t
    handle(
      const message_routing_info_t & source_info,
      const message_routing_info_t & current_info,
      message_interception_info_t & result ) const = 0;
  };

Метод message_interceptor_t::handle получает в качестве аргумента source_info структуру с полным описанием начального экземпляра сообщения (в которое входит имя агента-владельца, имя сообщения, имя агента-получателя, указатель на тело сообщения, идентификатор коммуникационного канала, в который отсылается сообщение и т.д.). Аргумент current_info содержит полное описание текущего экземпляра сообщения. Начальное и текущее описание может не совпадать при переходе от одного перехватчика в списке к другому.

Метод message_interceptor_t::handle должен возвратить в параметре result те значения, которые заменяют параметры исходного сообщения. Так, если перехватчик заменяет имя получателя, то это имя должно быть задано в result.

Что может изменять перехватчик

Перехватчик может указать:

  • что проход по списку перехватчиков нужно остановить, а экземпляр сообщения выбросить и не выполнять диспетчеризации сообщения;
  • что проход по списку перехватчиков нужно остановить и использовать значения current_info и result для формирования измененного экземпляра сообщения. Этот экземпляр уходит на диспетчеризацию;
  • что проход по списку перехватчиков нужно продолжить и использовать значения current_info и result для формирования нового current_info перед обращением к следующему перехватчику.

Что перехватчик может изменить:

  • имя получателя сообщения;
  • идентификатор канала в который отосланно сообщение.

Как может выглядеть процедура установки перехватчиков

Для установки перехватчиков можно использовать следующие функции:

/*
 * Добавление перехватчика в конец списка перехватчиков.
 * Функция предназначена для случая, когда перехватчику не важно,
 * кто будет в списке перехватчиков до и после него.
 */
so_4::ret_code_t
setup_interceptor(
  // Владелец перехватываемого сообщения.
  const std::string & agent,
  // Имя перехватываемого сообщения.
  const std::string & msg,
  // Уникальное имя перехватчика.
  const std::string & interceptor_name,
  // Сам перехватчик.
  std::auto_ptr< message_interceptor_t > interceptor );

/*
 * Добавление перехватчика в список перехватчиков перед указанным
 * перехватчиком.
 */
so_4::ret_code_t
setup_interceptor_before(
  // Владелец перехватываемого сообщения.
  const std::string & agent,
  // Имя перехватываемого сообщения.
  const std::string & msg,
  // Уникальное имя перехватчика.
  const std::string & interceptor_name,
  // Сам перехватчик.
  std::auto_ptr< message_interceptor_t > interceptor,
  // Имя перехватчика, перед которым нужно вставлять
  // новый перехватчик.
  const std::string & interceptor_before );

/*
 * Добавление перехватчика в список перехватчиков после указанного
 * перехватчика.
 */
so_4::ret_code_t
setup_interceptor_after(
  // Владелец перехватываемого сообщения.
  const std::string & agent,
  // Имя перехватываемого сообщения.
  const std::string & msg,
  // Уникальное имя перехватчика.
  const std::string & interceptor_name,
  // Сам перехватчик.
  std::auto_ptr< message_interceptor_t > interceptor,
  // Имя перехватчика, перед которым нужно вставлять
  // новый перехватчик.
  const std::string & interceptor_after );

Для изъятия перехватчика может использоваться функция:

so_4::ret_code_t
remove_interceptor(
  // Владелец перехватываемого сообщения.
  const std::string & agent,
  // Имя перехватываемого сообщения.
  const std::string & msg,
  // Уникальное имя перехватчика.
  const std::string & interceptor_name,
  // Приемник изымаемого перехватчика.
  std::auto_ptr< message_interceptor_t > & interceptor );

Примечание. Открытым остается вопрос о том, что делать при изъятии перехватчика, для добавления которого использовалась функция setup_interceptor_before или setup_interceptor_after -- ведь тогда разрывается специально выстроенная пользователем цепочка.

Момент обращения к перехватчикам в SObjectizer Run-Time

Предполагается задействовать перехватчики после того, как для экземпляра сообщения будет проведена проверка корректности (т.е. задействован метод-checker) и перед помещением заявок в очередь диспетчера.

Это означает, что отложенные и периодические сообщения будут проходить через список перехватчиков только после того, как нить таймера отправит их на диспетчеризацию.

Поскольку проверка корректности сообщения выполняется при разблокированном ядре SObjectizer, то и вызов перехватчиков сообщений имеет смысл делать при разблокированном ядре. Это означает, что перехватчики могут осуществлять вызов функций SObjectizer Run-Time, например, для отсылки новых сообщений.

Процедура обращения к перехватчикам

Перед обращением к перехватчикам создается исходный объект message_routing_info_t с начальным описанием сообщения. Это будет объект для параметра source_info. Из него же строится объект для параметра current_info.

После этого начинается движение по списку перехватчиков. После обращения к очередному перехватчику на основании result и current_info строится новое значение current_info. Если перехватчик запретил обращение к остальным перехватчикам (т.е. перехватил сообщение), то движение по списку перехватчиков прекращается. В противном случае выполняется вызов следующего перехватчика в списке.

После завершения движения по списку перехватчиков:

Пример перехватчика-перемаршрутизатора сообщения

Ниже приведен пример перехватчика сообщения msg_send_package для агента, обслуживающего серверный сокет. Этот агент создает по одному дочернему агенту на каждое клиентское подключение. У дочерних агентов имена формируются по принципу <parent>:<client>, где parent -- это имя родительского агента, а client -- это внутренний идентификатор клиента, совпадающий со значением comm_channel_t::client().

Перехват заключается в том, чтобы назначить сообщению имя дочернего агента в качестве имени агента-получателя (т.е. произвести целенаправленную отсылку сообщения дочернему агенту).

/*
 * Код перехватчика сообщения.
 */
class send_package_rerouter_t
  : public so_4::rt::message_interceptor_t
  {
  public :
    send_package_rerouter_t(
      const std::string & parent )
      : m_parent( parent )
      {}
    virtual ~send_package_rerouter_t()
      {}

    virtual so_4::ret_code_t
    handle(
      const message_routing_info_t & /* исходная информация не важна */,
      const message_routing_info_t & current_info,
      message_interception_info_t & result )
      {
        msg_send_package * msg = reinterpret_cast< msg_send_package * >(
            current_info.message_data() );
        result.set_receiver( m_parent + ":" + msg->m_client );

        return so_4::ret_code_t();
      }

  private :
    const std::string m_parent;
  };

/*
 * Так выглядит установка перехватчика в агенте серверного сокета.
 */
void
a_raw_incoming_channel_t::so_on_subscription()
  {
    ...
    so_4::api::setup_interceptor(
        so_query_name(),
        "msg_send_message",
        "msg_send_message::rerouter",
        std::auto_ptr< so_4::rt::message_interceptor_t >(
            new send_package_rerouter_t( so_query_name() ) ) );
  }
Hosted by uCoz