Идея контроля переполненности очереди сообщений на основе исходящих каналов

Author: Евгений Охотников
Contact: eao197 at intervale dot ru; eao197 at yahoo dot com
Version: 0.1
Date: 2009.01.04

Оглавление

Мотивация

В SObjectizer одной из нерешенных проблем является отсутствие контроля за переполненностью очередей сообщений. Сейчас агенту-получателю можно сгенерировать столько сообщений, сколько он не в состоянии будет обработать. В ряде случаев такие проблемы имели место быть и тогда наблюдалась деградация приложения: агенты в нем генерировали друг для друга больше сообщений, чем в состоянии были обработать. В результате чего приложение неудержимо потребляло память и на 100% загружало процессор, но не выполняло полезной работы.

В некоторых системах, построенных на обмене сообщениями, возможно временное приостановление работы нити, осуществляющей отправку сообщения, если получатель сообщений не успевает разгрести уже имеющиеся у него сообщения. Например, при использовании библиотеки ACE методы ACE_Message_Queue::enqueue* приостанавливают работу вызвавшей их нити, если очередь сообщений слишком сильно заполнена.

Применить такой подход один в один в SObjectizer не представляется возможным. Ведь вызов send_msg обычно происходит на контексте какой-нибудь рабочей нити диспетчера. Поэтому, если внутри send_msg произойдет приостановка этой рабочей нити, то не смогут продолжать работать другие агенты, попавшие на эту рабочую нить. В том числе, возможно, и агент-получатель сообщения.

Из-за этой особенности SObjectizer ранее приходилось рассматривать несколько других подходов к решению данной проблемы:

  1. Выбрасывание "лишних" сообщений. Т.е., если агента перегрузили, то лучше выбросить часть нагрузки, чтобы помочь агенту. При этом программисту предоставляются возможности выбора сообщений для выбрасывания. Например, либо выбрасываются новые сообщения, либо наоборот, выбрасываются старые сообщения.
  2. Перенаправление "лишних" сообщений на других агентов. Т.е., если основной агент выполняет реальную, возможно, длительную обработку сообщений, то в случае увеличения нагрузки можно передать сообщение агенту, который будет делать совсем другую, менее затратную обработку. Например, просто фиксировать данную ситуацию в журнале работы и отправлять в ответ сообщение об ошибке.

Ни один из данных подходов не является в ряде случаев таким удобным, как приостановка нити отправителя сообщения. Например, в задачах, в которых агенты образуют цепочку (конвейер) для выполнения последовательности действий. К примеру, первый агент считывает фрагмент csv-файл и передает его второму агенту. Второй агент производит разбор csv-файла и выбирает оттуда транзакции, подлежащие импорту в БД. Эти транзакции передаются третьему агенту, который выполняет импорт. Каждый из агентов может быть активным агентом, работать на контексте собственной рабочей нити, что позволяет им выполнять часть своей работы параллельно. Но нужно следить за тем, чтобы никакой из агентов этой цепочки не сгенерировал для следующего агента слишком много работы. Очень просто это решается, если для каждого агента устанавливается некоторый размер очереди (скажем, пять сообщений). Тогда при попытке отправить агенту "лишнее" сообщение, отправитель приостанавливается до тех пор, пока в очереди получателя не появится свободное место.

Идея

Идея состоит в том, чтобы для каждого агента (может быть даже для каждого события агента) было известно, кому он может отсылать сообщения. Тогда перед диспетчеризацией очередного события агента диспетчер может проверить, есть ли свободное место в очередях потенциальных получателей. Если место есть, то событие агента диспетчируется. Если же очереди потенциальных получателей переполнены, то событие агента "откладывается" -- диспетчер не выполняет его до тех пор, пока очереди не освободятся.

Отсылку сообщений агентом можно рассматривать как передачу сообщения в какой-то исходящий канал (channel, pipeline). Поскольку агенты, обычно, точно знают какие сообщения они будут отсылать (хотя не всегда заранее известны имена получателей сообщений), то есть возможность в описании агента указать его исходящие каналы. Например, что-то вроде:

# Класс агента, считывающего фрагменты csv-файла.
agent_class :a_csv_reader do |c|
  c.msg_slot :msg_start
  c.msg_slot :msg_read_next_fragment

  # Канал для отсылки сообщений с прочитанными фрагментами.
  c.ochannel :read_fragments, :msg_next_fragment

  c.state :st_normal do |st|
    st.on :msg_start => :evt_open
    st.on :msg_read_next_fragment => :evt_read_next_fragment
  end
end

# Класс агента, выполняющего разбор фрагментов csv-файла.
agent_class :a_csv_parser do |c|
  c.msg_slot :msg_next_fragment

  # Канал для отсылки сообщений с разобранными строками.
  c.ochannel :parsed_lines, :msg_parsed_lines

  c.state :st_normal do |st|
    s.on :msg_next_fragment => :evt_next_fragment
  end
end

# Класс агента, выполняющего импорт строк в БД.
agent_class :a_db_importer do |c|
  c.msg_slot :msg_start
  c.msg_slot :msg_parsed_lines

  c.state :st_initial do |st|
    st.on :msg_start => :evt_connect_to_db
  end

  c.state :st_normal do |st|
    st.on :msg_parsed_lines => :evt_import_lines
  end

  c.state :st_failure do |st|
    st.on_enter :on_enter_st_failure
  end
end

Здесь у агентов типа a_csv_reader и a_csv_parser есть по одному исходящему каналу. SObjectizer будет знать о них. Когда агенты выполнят привязку каналов к реальным получателям, SObjectizer получит возможность проверять уровень заполненности каждого из каналов. Что позволит диспетчеру оставлять в очереди заявок события агентов, исходящие каналы которых переполнены.

Белые пятна

Данная идея является очень сырой. Она была зафиксирована в виде RFC для того, чтобы не потеряться -- может быть через какое-то время эта идея найдет свое продолжение в каких-то других воплощениях. Пока же она оставляет большое количество "белых пятен".

Сообщения, которые отсылают не агенты

В SObjectizer сообщения могут быть отосланны не только агентами, но и той частью приложения, которая не построена на основе SObjectizer. Например, GUI-приложения, вроде Globe, в которых объекты-окна, не являющиеся агентами, могут отсылать сообщения SObjectizer-агентам.

В принципе, send_msg может проверять, вызывают ли ее на контексте одной из рабочих нитей SObjectizer. И, если это не так, то send_msg может приостанавливать работу вызвавшей ее нити.

Сообщения, которые агент отсылает сам себе

Допустим, агент владеет сообщением msg_do_something, которое он может отсылать самому себе. Так же это сообщение ему могут отсылать и другие агенты. В один прекрасный момент сообщений msg_do_something может скопиться слишком много и любой агент, который попытается его отослать, будет приостановлен. В том числе и сам агент-владелец.

В такой ситуации SObjectizer не должен откладывать выполнение событий агента-владельца -- они всегда должны исполняться. Даже если в результате количество msg_do_something будет увеличиваться.

Накладные расходы на проверку заполненности каналов

Одним из ключевых факторов обеспечения высокой скорости работы SObjectizer являтся скорость, с которой диспетчер выбирает и обслуживает заявки на запуск событий агентов. Естественно, что при необходимости проверять уровень заполненности каналов агента эта скорость может снизиться. Вопрос: на сколько?

В принципе, можно не проверять каналы агента в диспетчере. Вместо этого, для каждого канала будет известно, какой агент в него пишет (т.е. отсылает сообщения). Когда канал переполняется (что определяется в методе send_msg) для этого агента будет выставляться признак того, что он должен быть приостановлен. А диспетчер при извлечении очередной заявки просто проверит этот признак. Что может быть гораздо эффективнее, чем опрос состояния каналов агента.

Hosted by uCoz