Обработка большого объема данных на службе обработки событий

19 0

Ранее уже публиковалась статья, в которой описывался один из возможных методов обработки большого объема данных. Суть метода — перекладывать обработку большого объема данных с клиента на службу Workflow. В версии 5.4 появилась служба обработки событий, позволяющая выносить вычисления с клиента в асинхронную обработку на сервере. При обработке большого объема данных службу обработки событий можно использовать в качестве альтернативы Workflow.

Реализация

С реализацией все достаточно просто: 

  1. Создаем серверное событие.
  2. Выносим вычисления, в которых происходит обработка большого объема данных, в сценарий-обработчик серверного события.
  3. В месте разработки, где ранее использовались трудоемкие вычисления:
  • получаем серверное событие по имени;
  • заполняем параметры серверного события;
  • запускаем серверное событие.
ServerEvent = ServerEvents.GetObjectByName('ServerEvent')
Index = ServerEvent.Params.IndexOfName('Param') 
if Index > -1
  ServerEvent.Params.Values(Index) = <значение параметра>
endif
ServerEvent.Start  

При работе со службой обработки событий нужно учитывать такие особенности, как таймаут, рекурсивный вызов, режим транзакций и приоритет выполнения сценариев-обработчиков.

Таймаут работы службы обработки серверных событий

Здесь ситуация аналогична службе Workflow: у службы обработки событий также есть параметр Таймаут (TimeOut), в котором хранится значение времени (в минутах), по истечении которого служба обработки событий прекращает выполнение сценария-обработчика. Разница только в том, что в компоненте "Серверные события" таймаут можно указать конкретно для каждого сценария-обработчика. В том случае, если таймаут не заполнен в серверном событии, будет использоваться значение настройки TimeOut из конфигурационного файла SBEventProcessingSrvSettings.xml.

Поскольку принцип работы по таймауту принципиально не отличается от Workflow, получаем такое же ограничение по количеству данных, которые можно обработать за определенный промежуток времени. А значит и варианты решения, описанные для обработки данных на Workflow, должны работать и на службе обработки событий:

  1. Обработка заранее определенного количества данных, обработка которого заведомо и гарантированно укладывается в таймаут обработчика.
  2. Отслеживание времени, оставшегося до истечения таймаута, в процессе обработки.

И вроде бы все хорошо, но возникает вопрос: а как заставить сценарий, обрабатывающий данные, вызываться циклично, до тех пор, пока все данные не обработаются? На Workflow эту функцию выполнял блок с типом Мониторинг. Очевидно, нужно логику работы блока "Мониторинг" перенести в наш сценарий. Так же как в блоке "Мониторинг", обрабатываем данные, а затем проверяем, остались ли еще необработанные данные. Если в текущей итерации обработаны не все данные, то нужно просто сгенерировать некритичное исключение, которое приведет к повторному добавлению обработчика серверного события в очередь, либо рекурсивно стартуем текущее серверное событие (в чем разница, расскажу далее).

Рассмотрим более подробно второй вариант реализации — обработка динамически изменяемого количества данных для службы серверных событий, так как в нем будут небольшие отличия от обработки на Workflow:

Получение таймаута серверного события

Как я уже писала ранее, служба сначала будет искать параметр Timeout сценария-обработчика, указанный в компоненте "Серверные события", и только в случае, если там значение не указано, брать его из конфигурационного файла службы серверных событий. Поэтому получать значение таймаута будем получать следующим образом:

  try
    // получить из константы минимальное значение таймаута, можно брать константу для хранения таймаута WF
    ConstWFTimeOut = GetConstant('WFTimeOut')
    // если константа не установлена, взять зашитое в коде значение
    if not Assigned(ConstWFTimeOut)
      ConstWFTimeOut = 600
    endif
    // получить таймаут сценария в секундах
    ScriptTimeOut = SQL(Format("  select 
                                    ServerEventTimeOut
                                  from 
                                    dbo.MBReports 
                                  where 
                                    NameRpt = '%s'"; ScriptName))
    if ScriptTimeOut <<>> ''
      Result = ScriptTimeOut * 60
    else
      // получить таймаут из конфигурационного файла
      CorePath = Application.Connection.SystemInfo.CorePath
      XML = CorePath & 'SBEventProcessingSrvSettings.xml'
      if FileExists(XML)
        XmlDoc = CreateObject("MSXml2.DomDocument")
        XmlDoc.Load(XML)
        SettingsNode = XmlDoc.GetElementsByTagName("Settings")
        SettingsItem = SettingsNode.item(0)
        if Assigned(SettingsItem)
          SETime = SettingsItem.getAttribute("Timeout")
          if Assigned(SETime)
            SETime = Replace(SETime; ' '; '')
            if Assigned(SETime)
              Result = SETime
            endif
          endif
        endif  
      endif
    endif
    
    if not Assigned(Result)
      Result = ConstWFTimeOut
    endif
  except
   // в случае если где-то выше произошли ошибки, взять значение зашитое в коде
    Result = 600
  endexcept

ScriptName в данном случае — это имя сценария-обработчика.

Организация цикла обработки внутри блока

  // получить время начала работы блока
  StartTime = Time()
  Result = TRUE
  // получить значение таймаута серверного события функцией, описанной выше
  SETime = GetSETimeOut()
  // константа DELAY_TIME содержит в себе страховочное время. Данное время необходимо для операций, которые будут выполняться после основной обработки. Очистка переменных, запись в лог и т.д.
  if ConstantExists('DELAY_TIME')
    DelayTime = GetConstant('DELAY_TIME')
    if not Assigned(DelayTime)
      Exception = CreateException('Error'; 'не заполнена константа DELAY_TIME'; ecWarning)
      Raise(Exception)
    endif
  else
    Exception = CreateException('Error'; 'не найдена константа DELAY_TIME'; ecWarning)
    Raise(Exception)
  endif
  <получить еще не обработанные данные>
  // проверить что у нас еще есть данные для обработки
  if RecordCount > 0
    foreach Record in Reference
      // перед началом обработки каждой записи, проверить сколько времени у нас еще есть и успеваем ли мы обработать запись
      NowTime = Time()
      // получить разницу между временем старта работы блока и текущим временем
      ServerEventDiff = TimeDiff('S'; StartTime; NowTime)
      // вычесть из таймаута серверного события время, необходимое на сопутствующие операции
      TimeDiff = SETime - DelayTime
      // проверяем есть ли у нас еще время на обработку записи
      if ServerEventDiff < TimeDiff
        <обработка одной записи данных>
      else
        // если времени нет, завершить работу итерации блока и означить переменную Result как FALSE, что бы заново поставить в очередь серверное событие.
        Result = FALSE
        exitfor
      endif
    endforeach
  endif

  if not Result
    // сгенерировать некритичное исключение, которое перевставит событие в очередь
    Raise(CreateException(REPEAT_PROCESS_CURRENT_OBJECT_EXCEPTION_NAME; ExceptionMessage; ecWarning))
  endif

Рекурсивный вызов серверного события

Итак, мы уже знаем, что в случае возникновения некритичной ошибки в ходе обработки сценария, он будет переставлен повторно в очередь. Неприятность Особенность состоит в том, что сценарий будет перевставлен в очередь с теми же значениями параметров, что и при старте. Мы не можем из сценария управлять параметрами текущего серверного события. Но у нас есть потребность в циклической обработке сценария, в котором, скорее всего, какие-то параметры будут менять свое значение, и при следующей итерации обработки нам нужно передать в сценарий уже новое значение этих параметров. Для того, чтобы на службе обработки событий поставить в очередь сценарий с новыми значениями параметров, нужно из самого сценария вместо генерации некритичного исключения рекурсивно стартовать текущее серверное событие с новыми значениями параметров.

Здесь следует понимать разницу: при рекурсивном старте серверного события в очередь будут поставлены все добавленные в него сценарии-обработчики, а в случае возникновения некритичных ошибок повторно в очередь будет переставлен только тот обработчик, при выполнении которого произошло некритичное исключение.

Режим транзакций службы обработки серверных событий

Как мы помним, при обработке на Workflow всё вычисление блока-мониторинга выполняется в общей транзакции, которая и откатывается целиком в случае ошибки. В отличие от службы Workflow при выполнении обработчика серверного события общей транзакции нет, каждая команда, изменяющая данные в базе, самостоятельно начинает и фиксирует свою транзакцию так, как это происходит при работе вычислений на клиенте. Если в вычислениях обработчика происходит изменение объекта (например, записи справочника или документа), необходимо учитывать, что при возникновении критичной или некритичной ошибки во время обработки сценария, отката изменений не произойдет.

Если объединение вычислений в транзакцию с возможностью ее отката все-таки необходимо, то реализовать это можно, используя методы управления транзакциями объекта IConnection: CommitTransaction, RollbackTransaction, StartTransaction.

Приоритет выполнения сценариев-обработчиков

Для сценария можно задать приоритет его выполнения. Служба обработки событий берет сценарии в обработку в порядке приоритетов. Чем больше значение приоритета, тем фактический приоритет выше. Используя приоритеты, можно сделать так, чтобы критичный для бизнеса функционал обрабатывался в первую очередь. Но следует учитывать, что каждый обрабатывающий процесс независимо от других выполняет по одному сценарию серверного события. Может возникнуть ситуация, при которой сценарии одного серверного события, несмотря на разные приоритеты, будут обрабатываться одновременно разными процессами. Поэтому в одно серверное событие нужно добавлять независящие друг от друга сценарии. Если требуется последовательное выполнение сценариев, то следует вынести сценарии в разные серверные события, вызывая следующее серверное событие непосредственно в сценарии после необходимых вычислений, либо реализовать последовательную логику в одном обработчике, если предполагаемая длительность обработки не будет слишком большой.

Преимущества использования службы обработки событий

Главное преимущество использования службы обработки событий для обработки большого количества данных, по сравнению с использованием службы Workflow - это отказ от старта служебных задач. По опыту, количество служебных задач может достигать 30% от общего количества задач в системе. Совершенно очевидно, что такое количество служебных задач не может не влиять на оперативность обработки бизнес-логики. Перенос служебных задач на службу обработки событий дает нам:

  1. Высвобождение ресурсов службы Workflow для обработки задач бизнес-процессов по согласованию документов, исполнению поручений и так далее.
  2. Уменьшение скорости роста размера базы данных за счет уменьшения количества задач и их схем, хранящихся в системе.
Пока комментариев нет.

Авторизуйтесь, чтобы написать комментарий