=========================================================================== Идея контроля переполненности очереди сообщений на основе исходящих каналов =========================================================================== :Author: Евгений Охотников :Contact: eao197 at intervale dot ru; eao197 at yahoo dot com :Version: 0.1 :Date: 2009.01.04 .. _SObjectizer: http://sobjectizer.sourceforge.net .. _ACE: http://www.cs.wustl.edu/~schmidt/ACE.html .. contents:: **Оглавление** Мотивация ========== В SObjectizer_ одной из нерешенных проблем является отсутствие контроля за переполненностью очередей сообщений. Сейчас агенту-получателю можно сгенерировать столько сообщений, сколько он не в состоянии будет обработать. В ряде случаев такие проблемы имели место быть и тогда наблюдалась деградация приложения: агенты в нем генерировали друг для друга больше сообщений, чем в состоянии были обработать. В результате чего приложение неудержимо потребляло память и на 100% загружало процессор, но не выполняло полезной работы. В некоторых системах, построенных на обмене сообщениями, возможно временное приостановление работы нити, осуществляющей отправку сообщения, если получатель сообщений не успевает разгрести уже имеющиеся у него сообщения. Например, при использовании библиотеки ACE_ методы ``ACE_Message_Queue::enqueue*`` приостанавливают работу вызвавшей их нити, если очередь сообщений переполнена. Применить такой подход один в один в SObjectizer не представляется возможным. Ведь вызов ``send_msg`` обычно происходит на контексте какой-нибудь рабочей нити диспетчера. Поэтому, если внутри ``send_msg`` произойдет приостановка этой рабочей нити, то не смогут продолжать работать другие агенты, попавшие на эту рабочую нить. В том числе, возможно, и агент-получатель сообщения. Из-за этого ранее рассматривалось несколько других подходов к решению данной проблемы: 1. Выбрасывание "лишних" сообщений. Т.е., если агента перегрузили, то следует выбросить часть нагрузки, чтобы помочь агенту. При этом программисту предоставляются возможности выбора сообщений для выбрасывания. Например, либо игнорируются новые сообщения, либо удаляются самые старые сообщения. 2. Перенаправление "лишних" сообщений на других агентов. Т.е., если основной агент выполняет реальную, возможно, длительную обработку сообщений, то в случае увеличения нагрузки можно передать сообщение агенту, который будет делать совсем другую, менее затратную обработку. Например, просто фиксировать данную ситуацию в журнале работы. В ряде случаев ни один из данных подходов не является таким удобным, как приостановка нити отправителя сообщения. Например, в задачах, в которых агенты образуют цепочку (конвейер) для выполнения последовательности действий. К примеру, первый агент считывает фрагмент csv-файл и передает его второму агенту. Второй агент производит разбор csv-файла и выбирает оттуда транзакции, подлежащие импорту в БД. Эти транзакции передаются третьему агенту, который выполняет импорт. Каждый из агентов может быть активным агентом, работать на контексте собственной рабочей нити, т.е. все стадии будут работать параллельно. Но нужно следить за тем, чтобы никакой из агентов этой цепочки не сгенерировал для следующего агента слишком много работы. Очень просто это решается, если для каждого агента устанавливается некоторый размер очереди. Тогда при попытке отправить агенту "лишнее" сообщение, отправитель приостанавливается до тех пор, пока в очереди получателя не появится свободное место. Идея ==== Идея состоит в том, чтобы для каждого агента (может быть даже для каждого события агента) было известно, кому он может отсылать сообщения. Тогда перед диспетчеризацией очередного события агента диспетчер может проверить, есть ли свободное место в очередях потенциальных получателей. Если место есть, то событие агента запускается на обработку. Если же очереди потенциальных получателей переполнены, то событие агента "откладывается" -- диспетчер не выполняет его до тех пор, пока очереди не освободятся. Отсылку сообщений агентом можно рассматривать как передачу сообщения в какой-то исходящий канал (channel, pipeline). Поскольку агенты, обычно, точно знают какие сообщения они будут отсылать (хотя не всегда заранее известны имена получателей сообщений), то существует возможность в описании агента указать его исходящие каналы. Например, что-то вроде: :: # Класс агента, считывающего фрагменты csv-файла. agent_class :a_csv_reader do |c| c.msg_slot :msg_start c.msg_slot :msg_read_next_fragment # Канал для отсылки сообщений с прочитанными фрагментами. c.ochannel :read_fragments, :msg_next_fragment c.state :st_normal do |st| st.on :msg_start => :evt_open st.on :msg_read_next_fragment => :evt_read_next_fragment end end # Класс агента, выполняющего разбор фрагментов csv-файла. agent_class :a_csv_parser do |c| c.msg_slot :msg_next_fragment # Канал для отсылки сообщений с разобранными строками. c.ochannel :parsed_lines, :msg_parsed_lines c.state :st_normal do |st| s.on :msg_next_fragment => :evt_next_fragment end end # Класс агента, выполняющего импорт строк в БД. agent_class :a_db_importer do |c| c.msg_slot :msg_start c.msg_slot :msg_parsed_lines c.state :st_initial do |st| st.on :msg_start => :evt_connect_to_db end c.state :st_normal do |st| st.on :msg_parsed_lines => :evt_import_lines end c.state :st_failure do |st| st.on_enter :on_enter_st_failure end end Здесь у агентов типа ``a_csv_reader`` и ``a_csv_parser`` есть по одному исходящему каналу и SObjectizer будет знать о них. Когда агенты выполнят привязку каналов к реальным получателям, SObjectizer получит возможность проверять уровень заполненности каждого из каналов. Что позволит диспетчеру приостанавливать события агентов, исходящие каналы которых переполнены. Белые пятна =========== Данная идея является очень сырой. Она была зафиксирована в виде RFC для того, чтобы не потеряться -- может быть через какое-то время эта идея найдет свое продолжение в каких-то других воплощениях. Пока же она оставляет большое количество "белых пятен". Сообщения, которые отсылают не агенты ------------------------------------- В SObjectizer сообщения могут быть отосланны не только агентами, но и той частью приложения, которая не построена на основе SObjectizer. Например, GUI-приложения, вроде Globe, в которых объекты-окна, не являющиеся агентами, могут отсылать сообщения SObjectizer-агентам. В принципе, ``send_msg`` может проверять, вызывают ли ее на контексте одной из рабочих нитей SObjectizer. И, если это не так, то ``send_msg`` может приостанавливать работу вызвавшей ее нити. Сообщения, которые агент отсылает сам себе ------------------------------------------ Допустим, агент владеет сообщением ``msg_do_something``, которое он может отсылать самому себе. Так же это сообщение ему могут отсылать и другие агенты. В один прекрасный момент сообщений ``msg_do_something`` может скопиться слишком много и любой агент, который попытается его отослать, будет приостановлен. В том числе и сам агент-владелец. В такой ситуации SObjectizer не должен откладывать выполнение событий агента-владельца -- они всегда должны исполняться. Даже если в результате количество ``msg_do_something`` будет увеличиваться. Накладные расходы на проверку заполненности каналов --------------------------------------------------- Одним из ключевых факторов обеспечения высокой скорости работы SObjectizer являтся скорость, с которой диспетчер выбирает и обслуживает заявки на запуск событий агентов. Естественно, что при необходимости проверять уровень заполненности каналов агента эта скорость может снизиться. Вопрос: на сколько? В принципе, можно не проверять каналы агента в диспетчере. Вместо этого, для каждого канала будет известно, какой агент в него пишет (т.е. отсылает сообщения). Когда канал переполняется (что определяется в методе ``send_msg``) для этого агента будет выставляться признак того, что он должен быть приостановлен. А диспетчер при извлечении очередной заявки просто проверит этот признак. Что может быть гораздо эффективнее, чем опрос состояния каналов агента. .. vim:ts=2:sw=2:sts=2:expandtab:tw=78:ft=rst: