Рекомендации по разработке асинхронных обработчиков

40 1

Почти каждый разработчик, работавший с Directum RX, сталкивался с асинхронными обработчиками. Несмотря на индивидуальность задач обработчиков, у большинства есть общие черты и проблемы.

Цель статьи - собрать лучшие практики разработки асинхронных обработчиков, чтобы минимизировать ошибки и упростить их анализ. Так же рассмотрим способ поиска зацикленных обработчиков.

Что нужно учесть при разработке асинхронного обработчика:

  • валидацию входных параметров;
  • период повтора и ограничение количества итераций;
  • обработку блокировок и прочих ошибок;
  • логирование работы обработчика.

Валидация входных параметров

Перед выполнением основной логики обработчика необходимо проверить данные, с которыми он будет работать. Обязательно проверить, что:

  1. Заполнены обязательные параметры.
  2. Если в параметре передан ИД сущности, то проверить, что она существует.

Зачем проверять входные параметры, если мы запускаем обработчик из этой же сущности и уверены на момент запуска, что они валидны?

  • обработчик может уйти на переповтор (например, из-за блокировки), а за это время сущность могли удалить или изменить;
  • в будущем другие разработчики могут переиспользовать обработчик в других местах.

Важно. Через параметры обработчика нельзя передавать большие объекты в строковых параметрах (например, файлы или большие строки), т.к. это приведет к большому потреблению ресурсов на сервисе Worker, в RabbitMQ, и сообщения будут занимать много места в таблице отложенных операций.

Период повтора и максимальное количество повторений

Наиболее частые причины отправки асинхронных обработчиков на повтор:

  1. Блокировка сущности, которую обработчик должен изменить.
  2. Логика должна выполниться только после наступления определенного события (например, дождаться параллельного создания сущностей, которые нужно использовать в текущем обработчике).
  3. Недоступность или ошибка внешнего сервиса, если она не является критичной и можно отправить обработчик на повтор (например, при рассылке писем по эл. почте почтовый сервер вернул ошибку из-за превышения лимита отправляемых писем в единицу времени).

Способы задания периода повторов:

  1. В Редакторе обработчика для свойства «Повторять» выбрать значение «С увеличением интервала до часа». В данном случае время каждого повторного запуска будет увеличиваться по экспоненте от пары секунд до часа.
  2. В Редакторе обработчика для свойства «Повторять» выбрать значение «Через равные промежутки времени». В данном случае время каждого повторного запуска будет фиксировано – через столько минут, сколько будет указано в свойстве «Период (в минутах)».
  3. В самом коде обработчика использовать аргумент args.NextRetryTime. В нем программно задавать необходимую дату и время следующего запуска. Имеет приоритет над настроенным периодом повтора по умолчанию.

Период повтора обработчика выбирается в зависимости от критичности вычислений и вероятной частоты появления причин, из-за которых он может уйти на повтор. Если критично, чтобы он выполнился как можно раньше, то лучше использовать 2-й и 3-й способ с небольшим периодом. Если понимаем, что обработчик может отправить на повтор только блокировка, которая будет снята в момент запуска, то можно использовать 1-й способ, т.к. он будет быстрее. Если к времени выполнения нет жестких требований, то лучше использовать 1-й способ.

Важно. Для любого асинхронного обработчика необходимо ограничивать количество повторений, чтобы избежать бесконечный цикл перезапусков при ошибках. Начиная с версии 4.8, в редакторе асинхронных обработчиков появилась возможность настраивать это количество через свойство «Максимальное количество повторов». Если версия ниже 4.8 или при превышении максимального количества повторов необходимо отправить уведомление ответственному или администратору, то ограничение вводим в самом коде обработчика, проверяя свойство args.RetryIteration:

if (args.RetryIteration > Constants.Module.MaxAsyncHandlersRetryIteration)
{
  PublicFunctions.Module.SendNoticeToResponsible(subject, text, responsible);
  return;
}

Обработка блокировок и прочих ошибок

Для чего нужна обработка блокировок?

  1. Если не поставить блокировку, сущность может быть заблокирована другим процессом или измениться в другой транзакции, что приведет к ошибке при сохранении.
  2. Блокировка -- некритичная ошибка, на которую не всегда надо обращать внимание. Без обработки блокировки будут в топе ошибок Мониторинга и отвлекать от других, более серьезных, ошибок.

Обрабатываем, пытаясь поставить блокировку с Locks.TryLock(entity). Если не удалось, не продолжаем логику и отправляем обработчик на переповтор. Не рекомендую Locks.GetLockInfo(entity) – он получает информацию о текущем состоянии блокировки и не гарантирует, что дальше никто не успеет заблокировать.

Важно. При установке блокировки через TryLock или Lock блокировка не снимается автоматически, ее нужно снимать программно через Locks.Unlock. Делать это нужно в любом случае, произошла ошибка или нет. Поэтому всю логику надо обернуть в try, обработать ошибки в catch, а снимать блокировку в finally.

Пример логики с обработкой ошибок.

// Пытаемся установить блокировку на сущность, которую необходимо изменить.
if (!Locks.TryLock(entity))
{
  Logger.DebugFormat("AsyncHandlerName. SomeEntity with id = {0} is locked. Sent to retry", entity.Id);
  args.Retry = true;
  return;
}
      
try
{  
  // Логика изменения сущностей.
}
catch (Exception ex)
{
  Logger.ErrorFormat("AsyncHandlerName. <Полезная информация для анализа ошибки>", ex, <полезная информация>);
  // Отправляем обработчик на переповтор, если есть такая необходимость.
  args.Retry = true;
}
finally
{
  // Снимаем блокировку.
  Locks.Unlock(entity);
}

Логирование

Для чего нужно логировать?

Для упрощения процесса анализа ошибок или некорректного выполнения логики. Логирование позволяет ответить на вопросы, что происходило, когда и при каких обстоятельствах. Без логов сложно понять, из-за чего возникает ошибка, если она возникает периодически и только при определенных условиях.

Минимальный набор информации, который необходимо логировать:

  • факты запуска и завершения обработчика и его основных этапов;
  • с какими аргументами был запущен обработчик. Фиксируем только значимые аргументы, которые влияют на логику вычислений – ИД сущностей, даты, значения перечислений. Не фиксируем конфиденциальную информацию;
  • ИД записей, документов или пользователей, связанных с логируемым событием;
  • результат работы процесса и его основных этапов;
  • имя обработчика вместе с каждым сообщением в лог (не обязательно при указании имени через метод WithLogger).

Универсальный шаблон асинхронного обработчика

И теперь, исходя из всех описанных выше моментов, можно составить некий «эталонный» АО:

public virtual void AsyncHandlerName(MyCode.MyModule.Server.AsyncHandlerInvokeArgs.AsyncHandlerNameInvokeArgs args)
{
  // В лог добавляем все параметры, которые могут помочь в анализе при возникновении ошибок.
  var logPostfix = string.Format("SomeId = '{0}', SomeValue - '{1}'", args.SomeId, args.SomeValue);
  Logger.DebugFormat("AsyncHandlerName. Start. {0}" logPostfix);
  
  // Если обработчик превысил максимальное число итераций запуска, то отправляем уведомление ответственному и завершаем выполнение.
  if (args.RetryIteration > Constants.Module.MaxAsyncHandlersRetryIteration)
  {
    PublicFunctions.Module.SendNoticeToResponsible(subject, text, responsible);
    return;
  }
  
  // Получаем все необходимые сущности и валидируем их.
  var entity = SomeEntities.GetAll(x => x.Id == args.SomeId).FirstOrDefault();
  
  if (entity == null)
  {
    Logger.ErrorFormat("AsyncHandlerName. SomeEntity with id = {0} not found.", args.SomeId);
    return;
  }
  
  // Пытаемся заблокировать необходимую сущность.
  if (!Locks.TryLock(entity))
  {
    // При неудачной попытке делаем запись в лог и отправляем обработчик на повтор.
    Logger.DebugFormat("AsyncHandlerName. SomeEntity with id = {0} is locked. Sent to retry", args.SomeId);
    args.Retry = true;
    return;
  }
  
  try
  {
    // Логика изменения сущности.
    Logger.DebugFormat("AsyncHandlerName. SomeEntity with id = {0} updated", args.SomeId);
  }
  catch (Exception ex)
  {
    // В случае возникновения ошибки пишем сообщение в лог с уровнем Error и логируем полный стек ошибки.
    Logger.ErrorFormat("AsyncHandlerName. An error occured. SomeEntityId – {0}, <прочая полезная для анализа информация> – {1}", ex, <ИД сущности, на которой произошла ошибка>, <полезная информация>);
  
    // Отправляем уведомление ответственному, если есть такая необходимость.
    PublicFunctions.Module.SendNoticeToResponsible(subject, text, responsible);
    
    // Отправляем АО на переповтор, если есть такая необходимость.
    args.Retry = true;
  
  }
  finally
  {
    // Снимаем блокировку с сущности.
    Locks.Unlock(entity);
  }
      
  // Логируем завершение работы обработчика.
  Logger.DebugFormat("AsyncHandlerName. Finish. {0}" logPostfix);
}

Поиск и очистка зациклившихся асинхронных обработчиков

Как найти обработчики, которые постоянно уходят на повтор?

Использовать для поиска таблицу sungero_system_delayedoperation. В ней хранится список отложенных операций, которые позднее попадут в RabbitMQ, а из него – на выполнение в нужный сервис. Нас интересует, что сюда попадают обработчики при возникновении прикладных ошибок.

В столбце Headers в формате JSON хранится основная информация об отложенной операции. Для обработчика нас интересуют поля:

  • QueueName - Имя очереди, для обработчика эквивалентно его идентификатору (можно взять из поля "Идентификатор" в редакторе обработчика в DDS);
  • Iteration - Номер итерации запуска обработчика.

Поиск зациклившихся обработчиков в PostgreSQL:

select *
from sungero_system_delayedoperation
where (headers::json->>'QueueName') = '<Идентификатор АО>'
and (headers::json->>'Iteration')::int > 100

Поиск зациклившихся обработчиков в MSSQL (начиная с версии SQL Server 2016):

select *
from Sungero_System_DelayedOperation
where JSON_VALUE(Headers, '$.QueueName') = '<Идентификатор АО>'
and JSON_VALUE(Headers, '$.Iteration') > 100

Данный запрос найдет все отложенные экземпляры искомого обработчика с количеством итераций повтора более 100.

Поиск зациклившихся обработчиков в MSSQL до SQL Server 2016 с помощью like-поиска (к сожалению, без проверки количества итераций):

select *
from Sungero_System_DelayedOperation
where Headers like '%<Идентификатор АО>%'

Если необходимо искать зациклившиеся обработчики только с определенными аргументами запуска, то нас интересует столбец Message. В нем в бинарном виде хранится JSON с аргументами запуска.

Пример хранимой информации:

{
  "$type": "Sungero.Agent.Api.AsyncHandlerMessage, Sungero.Agent.AsyncHandlers",
  "Id": "70c9e8cf-633e-4a77-8a31-0bcb803a4d14",
  "AsyncHandlerId": "767ae28b-d043-48b3-9e31-e54c288bde75",
  "Args": {
	"$type": "Sungero.Docflow.Server.AsyncHandlerInvokeArgs.GrantAccessRightsToDocumentInvokeArgs, Sungero.Domain.Interfaces",
	"GrantRightToChildDocuments": true,
	"RuleId": 167,
	"DocumentId": 14445190,
	"RetryIteration": 216
  }
}

Для получения или вставки json-строки необходимо соответственно раскодировать или закодировать данное поле.

Пример для PostgreSQL:

encode(message, 'escape') --раскодировать из binary в текст
decode('<json-строка>', 'escape')  --закодировать текст в binary

Пример для MSSQL:

CAST(Message as VARCHAR(max))  --раскодировать из binary в текст
CAST('<json-строка>' AS VARBINARY(MAX))  --закодировать текст в binary

Пример поиска зациклившихся обработчиков по выдаче прав на документ с ИД 5000 (PostgreSQL):

select * 
  from sungero_system_delayedoperation d 
  where (d.headers::json->>'QueueName') = '767ae28b-d043-48b3-9e31-e54c288bde75' 
  and (d.headers::json->>'Iteration')::int > 50
  and (encode(d.message, 'escape')::json ->'Args'->>'DocumentId')::int = 5000

У найденных проблемных асинхронных обработчиков необходимо:

  1. Исправить логику, которая приводит к бесконечному циклу перезапусков.
  2. Добавить ограничение на количество перезапусков.

Статья про то, что всегда должно быть под рукой. Особенно раздел "Поиск и очистка зациклившихся асинхронных обработчиков" прямо то, что нужно

Авторизуйтесь, чтобы написать комментарий