Почти каждый разработчик, работавший с Directum RX, сталкивался с асинхронными обработчиками. Несмотря на индивидуальность задач обработчиков, у большинства есть общие черты и проблемы.
Цель статьи - собрать лучшие практики разработки асинхронных обработчиков, чтобы минимизировать ошибки и упростить их анализ. Так же рассмотрим способ поиска зацикленных обработчиков.
Что нужно учесть при разработке асинхронного обработчика:
Перед выполнением основной логики обработчика необходимо проверить данные, с которыми он будет работать. Обязательно проверить, что:
Зачем проверять входные параметры, если мы запускаем обработчик из этой же сущности и уверены на момент запуска, что они валидны?
Важно. Через параметры обработчика нельзя передавать большие объекты в строковых параметрах (например, файлы или большие строки), т.к. это приведет к большому потреблению ресурсов на сервисе Worker, в RabbitMQ, и сообщения будут занимать много места в таблице отложенных операций.
Наиболее частые причины отправки асинхронных обработчиков на повтор:
Способы задания периода повторов:
Период повтора обработчика выбирается в зависимости от критичности вычислений и вероятной частоты появления причин, из-за которых он может уйти на повтор. Если критично, чтобы он выполнился как можно раньше, то лучше использовать 2-й и 3-й способ с небольшим периодом. Если понимаем, что обработчик может отправить на повтор только блокировка, которая будет снята в момент запуска, то можно использовать 1-й способ, т.к. он будет быстрее. Если к времени выполнения нет жестких требований, то лучше использовать 1-й способ.
Важно. Для любого асинхронного обработчика необходимо ограничивать количество повторений, чтобы избежать бесконечный цикл перезапусков при ошибках. Начиная с версии 4.8, в редакторе асинхронных обработчиков появилась возможность настраивать это количество через свойство «Максимальное количество повторов». Если версия ниже 4.8 или при превышении максимального количества повторов необходимо отправить уведомление ответственному или администратору, то ограничение вводим в самом коде обработчика, проверяя свойство args.RetryIteration:
if (args.RetryIteration > Constants.Module.MaxAsyncHandlersRetryIteration)
{
PublicFunctions.Module.SendNoticeToResponsible(subject, text, responsible);
return;
}
Для чего нужна обработка блокировок?
Обрабатываем, пытаясь поставить блокировку с Locks.TryLock(entity). Если не удалось, не продолжаем логику и отправляем обработчик на переповтор. Не рекомендую Locks.GetLockInfo(entity) – он получает информацию о текущем состоянии блокировки и не гарантирует, что дальше никто не успеет заблокировать.
Важно. При установке блокировки через TryLock или Lock блокировка не снимается автоматически, ее нужно снимать программно через Locks.Unlock. Делать это нужно в любом случае, произошла ошибка или нет. Поэтому всю логику надо обернуть в try, обработать ошибки в catch, а снимать блокировку в finally.
Пример логики с обработкой ошибок.
// Пытаемся установить блокировку на сущность, которую необходимо изменить.
if (!Locks.TryLock(entity))
{
Logger.DebugFormat("AsyncHandlerName. SomeEntity with id = {0} is locked. Sent to retry", entity.Id);
args.Retry = true;
return;
}
try
{
// Логика изменения сущностей.
}
catch (Exception ex)
{
Logger.ErrorFormat("AsyncHandlerName. <Полезная информация для анализа ошибки>", ex, <полезная информация>);
// Отправляем обработчик на переповтор, если есть такая необходимость.
args.Retry = true;
}
finally
{
// Снимаем блокировку.
Locks.Unlock(entity);
}
Для чего нужно логировать?
Для упрощения процесса анализа ошибок или некорректного выполнения логики. Логирование позволяет ответить на вопросы, что происходило, когда и при каких обстоятельствах. Без логов сложно понять, из-за чего возникает ошибка, если она возникает периодически и только при определенных условиях.
Минимальный набор информации, который необходимо логировать:
И теперь, исходя из всех описанных выше моментов, можно составить некий «эталонный» АО:
public virtual void AsyncHandlerName(MyCode.MyModule.Server.AsyncHandlerInvokeArgs.AsyncHandlerNameInvokeArgs args)
{
// В лог добавляем все параметры, которые могут помочь в анализе при возникновении ошибок.
var logPostfix = string.Format("SomeId = '{0}', SomeValue - '{1}'", args.SomeId, args.SomeValue);
Logger.DebugFormat("AsyncHandlerName. Start. {0}" logPostfix);
// Если обработчик превысил максимальное число итераций запуска, то отправляем уведомление ответственному и завершаем выполнение.
if (args.RetryIteration > Constants.Module.MaxAsyncHandlersRetryIteration)
{
PublicFunctions.Module.SendNoticeToResponsible(subject, text, responsible);
return;
}
// Получаем все необходимые сущности и валидируем их.
var entity = SomeEntities.GetAll(x => x.Id == args.SomeId).FirstOrDefault();
if (entity == null)
{
Logger.ErrorFormat("AsyncHandlerName. SomeEntity with id = {0} not found.", args.SomeId);
return;
}
// Пытаемся заблокировать необходимую сущность.
if (!Locks.TryLock(entity))
{
// При неудачной попытке делаем запись в лог и отправляем обработчик на повтор.
Logger.DebugFormat("AsyncHandlerName. SomeEntity with id = {0} is locked. Sent to retry", args.SomeId);
args.Retry = true;
return;
}
try
{
// Логика изменения сущности.
Logger.DebugFormat("AsyncHandlerName. SomeEntity with id = {0} updated", args.SomeId);
}
catch (Exception ex)
{
// В случае возникновения ошибки пишем сообщение в лог с уровнем Error и логируем полный стек ошибки.
Logger.ErrorFormat("AsyncHandlerName. An error occured. SomeEntityId – {0}, <прочая полезная для анализа информация> – {1}", ex, <ИД сущности, на которой произошла ошибка>, <полезная информация>);
// Отправляем уведомление ответственному, если есть такая необходимость.
PublicFunctions.Module.SendNoticeToResponsible(subject, text, responsible);
// Отправляем АО на переповтор, если есть такая необходимость.
args.Retry = true;
}
finally
{
// Снимаем блокировку с сущности.
Locks.Unlock(entity);
}
// Логируем завершение работы обработчика.
Logger.DebugFormat("AsyncHandlerName. Finish. {0}" logPostfix);
}
Как найти обработчики, которые постоянно уходят на повтор?
Использовать для поиска таблицу sungero_system_delayedoperation. В ней хранится список отложенных операций, которые позднее попадут в RabbitMQ, а из него – на выполнение в нужный сервис. Нас интересует, что сюда попадают обработчики при возникновении прикладных ошибок.
В столбце Headers в формате JSON хранится основная информация об отложенной операции. Для обработчика нас интересуют поля:
Поиск зациклившихся обработчиков в PostgreSQL:
select *
from sungero_system_delayedoperation
where (headers::json->>'QueueName') = '<Идентификатор АО>'
and (headers::json->>'Iteration')::int > 100
Поиск зациклившихся обработчиков в MSSQL (начиная с версии SQL Server 2016):
select *
from Sungero_System_DelayedOperation
where JSON_VALUE(Headers, '$.QueueName') = '<Идентификатор АО>'
and JSON_VALUE(Headers, '$.Iteration') > 100
Данный запрос найдет все отложенные экземпляры искомого обработчика с количеством итераций повтора более 100.
Поиск зациклившихся обработчиков в MSSQL до SQL Server 2016 с помощью like-поиска (к сожалению, без проверки количества итераций):
select *
from Sungero_System_DelayedOperation
where Headers like '%<Идентификатор АО>%'
Если необходимо искать зациклившиеся обработчики только с определенными аргументами запуска, то нас интересует столбец Message. В нем в бинарном виде хранится JSON с аргументами запуска.
Пример хранимой информации:
{
"$type": "Sungero.Agent.Api.AsyncHandlerMessage, Sungero.Agent.AsyncHandlers",
"Id": "70c9e8cf-633e-4a77-8a31-0bcb803a4d14",
"AsyncHandlerId": "767ae28b-d043-48b3-9e31-e54c288bde75",
"Args": {
"$type": "Sungero.Docflow.Server.AsyncHandlerInvokeArgs.GrantAccessRightsToDocumentInvokeArgs, Sungero.Domain.Interfaces",
"GrantRightToChildDocuments": true,
"RuleId": 167,
"DocumentId": 14445190,
"RetryIteration": 216
}
}
Для получения или вставки json-строки необходимо соответственно раскодировать или закодировать данное поле.
Пример для PostgreSQL:
encode(message, 'escape') --раскодировать из binary в текст
decode('<json-строка>', 'escape') --закодировать текст в binary
Пример для MSSQL:
CAST(Message as VARCHAR(max)) --раскодировать из binary в текст
CAST('<json-строка>' AS VARBINARY(MAX)) --закодировать текст в binary
Пример поиска зациклившихся обработчиков по выдаче прав на документ с ИД 5000 (PostgreSQL):
select *
from sungero_system_delayedoperation d
where (d.headers::json->>'QueueName') = '767ae28b-d043-48b3-9e31-e54c288bde75'
and (d.headers::json->>'Iteration')::int > 50
and (encode(d.message, 'escape')::json ->'Args'->>'DocumentId')::int = 5000
У найденных проблемных асинхронных обработчиков необходимо:
Статья про то, что всегда должно быть под рукой. Особенно раздел "Поиск и очистка зациклившихся асинхронных обработчиков" прямо то, что нужно
Авторизуйтесь, чтобы написать комментарий