Обработка большого объема данных при помощи Workflow

15 13

Назначение

В данной статье описывается один из возможных методов обработки большого объема данных.

Анализ

Рассмотрим данную задачу на двух примерах: интеграция со сторонними системами и процесс, подразумевающий запуск большого количества параллельных задач.

Интеграция со сторонней системой средствами веб-сервисов

Объем данных, передаваемый из интегрируемой системы, может быть различным, от нескольких записей, до нескольких тысяч записей. Для обработки принятых данных, веб-сервис запускает прикладной сценарий, реализующий прикладную логику. Существует особенность, пока веб-сервис обрабатывает прикладной сценарий, веб-сервис будет недоступен для обращения. Таким образом, для передачи нескольких пакетов данных в DIRECTUM может потребоваться очень много времени.

Запуск неограниченного количества задач

Допустим, существует определенное действие пользователя, по которому в системе должно создаваться N задач по исполнению поручения по документу, при этом должны происходить какие то действия с объектами системы (выдача прав на документ, изменение реквизитов и т.д.). N – может быть очень большим числом, например вышел приказ по рабочим дням на предприятии и нужно с данным приказом ознакомить всех автоматизированных руководителей или их делопроизводителей. В данном случае количество исполнителей по задачам может быть очень большим (в зависимости от масштаба предприятия) до нескольких сотен. Если реализовывать создание задач на клиенте у пользователя, то просто создание задач будет занимать минуты. Если же дополнительно реализуется сложная прикладная логика по выдаче прав, заполнению реквизитов и т.д., время создания задач может увеличится до десятков минут. Подобное время выполнения операции будет неприемлемым для пользователя.

Реализация

Один из вариантов реализации таких задач – перекладывание непосредственной обработки на Workflow. При этом потребуется только запустить задачу по жесткому типовому маршруту, что является достаточно быстрой операцией.

Обработка большого объема данных на Workflow имеет ряд особенностей:

  1. Таймаут работы Workflow
  2. Режим транзакций обработки данных на Workflow
  3. Неравномерная загрузка процессов Workflow

Таймаут работы Workflow

Таймаут работы workflow - Максимальное время работы процесса над одной задачей. По истечении этого времени процесс считается «зависшим» и будет автоматически перезапущен.

Таймаут работы workflow накладывает существенное ограничение на количество данных, которые мы можем обработать. Максимальное время, которое может работать блок сценарий, равно 60 минутам (соответствует установке в конфиге WF таймаута 3600). В случае обработки нескольких тысяч записей, этого может быть мало (в среднем обработка одной записи 5 сек, обработка 1000 записей при этом будет 80 минут.). К тому же крайне не рекомендуется устанавливать максимальное значение таймаута.

Данное ограничение можно обойти, организовав циклическую обработку блока, обрабатывающего данные. Самый простой способ организовать данный цикл – использовать блок с типом мониторинг. Саму обработку данных в блоке можно организовать следующими вариантами:

  1. Обработка заранее определенного количества данных
  2. Обработка динамически изменяемого количества данных

Обработка заранее определенного количества данных

Данный способ подразумевает, что за одну итерацию работы прикладного блока, будет обрабатываться то количество данных, которое задано (например, за одну итерацию обрабатывается только 100 записей). Количество обрабатываемых данных можно задавать при помощи константы.

Минусы решения:

  1. Количество данных, которые могут обработаться за итерацию, сложно получить. Время обработки данных сильно зависит от загрузки сервера, в один момент времени на обработку одной записи может требоваться 5 секунд, в следующий уже 7 секунд, при этом есть вероятность что блок просто не успеет выполнить обработку, заложенного количества данных.
  2. Служба Workflow может быть распараллелена. При этом несколько служб Workflow будут располагаться на нескольких ЭВМ и у каждой службы будет своя настройка. В данном случае при расчете максимально возможного количества обрабатываемых данных, нужно закладывать минимальный таймаут из всех настроек.

Плюсы решения:

  1. Простота реализации

Обработка динамически изменяемого количества данных

При данном варианте необходимо динамически вычислять какое количество данных мы еще можем "успеть" обработать. Ключевые моменты при этом:

  1. Получение значения таймаута workflow непосредственно из файла конфигурации. Реализуется это следующим образом:
  try
    // Получаем из константы минимальное значение таймаута. Оно зарание устанавливается
    ConstWFTimeOut = GetConstant('WFTimeOut')
    if not Assigned(ConstWFTimeOut)
      // Если константа не установлена, берем зашитое в коде значение
      // в данном месте необходимо руками указать то значение, которое будет использоваться у заказчика
      ConstWFTimeOut = 600
    endif
    // получаем значение таймаута из настройки
    CorePath = Application.Connection.SystemInfo.CorePath
    WFXML = CorePath & 'SBWorkflowSrvSettings.xml'
    if FileExists(WFXML)
      XmlDoc = CreateObject("MSXml2.DomDocument")
      XmlDoc.Load(WFXML)
      SettingsNode = XmlDoc.GetElementsByTagName("Settings")
      SettingsItem = SettingsNode.item(0)
      if Assigned(SettingsItem)
        WFTime = SettingsItem.getAttribute("Timeout")
        if Assigned(WFTime)
          WFTime = Replace(WFTime; ' '; '')
          if Assigned(WFTime)
            Result = WFTime
          endif
        endif
      endif 
    endif
    if not Assigned(Result)
      Result = ConstWFTimeOut
    endif
  except
    // в случае если где то выше произошли какие то ошибки по любым причинам, берем значение зашитое в коде
    Result = 600
  endexcept

Данный код имеет большое количество "перестраховок", т.к. он будет основополагающим в решении и код должен быть максимально стабильным.

2.    Организация цикла обработки внутри блока.

  // Получаем время начала работы блока
  StartTime = Time()
  Result = TRUE
  // получаем значение таймаута
  WFTime = GetWFTimeOut()
  // константа DELAY_TIME содержит в себе страховочное время. Данное время необходимо для операций, которые будут выполняться после основной обработки. Очистка переменных, запись в лог и т.д.
  if ConstantExists('DELAY_TIME')
    DelayTime = GetConstant('DELAY_TIME')
    if not Assigned(DelayTime)
      Exception = CreateException('Error'; 'не заполнена константа DELAY_TIME'; ecWarning)
      Raise(Exception)
    endif
  else
    Exception = CreateException('Error'; 'не найдена константа DELAY_TIME'; ecWarning)
    Raise(Exception)
  endif
  <получаем еще не обработанные данные>
  // проверяем что у нас еще есть данные для обработки
  if RecordCount > 0
    foreach Record in Reference
      // перед началом обработки каждой записи, проверяем сколько времени у нас еще есть и успеваем ли мы обработать запись
      NowTime = Time()
      // получаем разницу между временем старта работы блока и текущим временем
      WorkTimeDiff = TimeDiff('S'; StartTime; NowTime)
      // вычитаем из таймаута WF время, необходимое на сопутствующие операции
      TimeDiff = WFTime - DelayTime
      // проверяем есть ли у нас еще время на обработку записи
      if WorkTimeDiff < TimeDiff
        <обработка одной записи данных>
      else
        // если времени нет, завершаем работу итерации блока и передаем в результат работы блока мониторинг FALSE, что бы начать новую итерацию блока.
        Result = FALSE
        exitfor
      endif
    endforeach
  endif

Минусы решения:

  1. Сложно точно подобрать значение константы 'DELAY_TIME'. Но данная константа не требует большой точности.

Плюсы решения:

  1. Решение универсально и будет работать независимо от настроек Workflow. Для каждой службы автоматически будет определяться то количество данных, которое данная служба успеет обработаться, не зависимо от мощности сервера.
  2. За каждую итерацию обработки будет охватываться максимальное количество данных, т.е. все данные обработаются максимально быстро за счет того, что будет происходить минимальное количество итераций обработки задачи.

Режим транзакций Workflow

Вся обработка данных в блоках типа сценарий и мониторинг происходят в рамках одной транзакции, т.е. если произойдет ошибка, выполниться откат всей транзакции (всех измененных данных в рамках транзакции). Например, мы обрабатываем за одну итерацию блока 100 записей, при изменении 99 записи происходит ошибка, при этом все предыдущие 98 записей так же откатятся. Если в блоке есть подавление исключений и вывод ошибок в прикладной лог файл, это будет означать зацикливание обработки. Блок будет пытаться обработать одну и туже группу данных до тех пор, пока обработка не будет остановлена вручную и будет устранена причина ошибки. В условиях эксплуатации системы и больших объемов данных это недопустимо.

Для того что бы вычисления в блоке происходили не в рамках одной транзакций достаточно все изменения данных, все запросы получения данных, всю работу с объектами системы выполнять в новом соединении (клоне приложения). При работе с клоном приложения все новые запросы к БД будут выполняться в новом соединении вне работы неявной транзакции.

Неравномерная загрузка процессов Workflow

При обработке большого объема данных на Workflow нужно понимать, что процесс WF, обрабатывающий данные, будет занят обработкой длительное время. Если будет запущено столько задач по обработке, сколько процессов workflow, workflow длительное время будет недоступна для обработки других задач. Это нужно учитывать при проектировании обработки. Например, реализована интеграция с HR системой. Произошла реструктуризация предприятия. Из HR системы пришли данные по всем сотрудникам предприятия. При этом данные загружали в DIRECTUM группами, в соответствии с подразделениями. Это означает, что в DIRECTUM запустится столько задач, сколько пакетов (подразделений) было загружено. Если в каждом пакете было по 200 сотрудников, мы получим что WF будет недоступна для других задач на протяжении часа.

Один из вариантов решения данной ситуации - программно регулировать количество одновременно обрабатываемых задач с обработкой данных. Регулировать количество одновременно работающих задач можно при помощи двух блоков в начале схемы задачи:

  1. Блок сценарий, который вычисляет сколько и какие задачи по определенным ТМ сейчас находятся в обработке и записывает их в параметр задачи
  2. Блок мониторинг с типом мониоринга "Список зависимостей" и списком зависимостей из параметра в п.1.
Михаил Сергеев

Задам вопрос моего коллеги.

Почему просто не подготовить данные на клиенте, а обработку их выполнять сценарием по расписанию?

Степан Мурашов
Вся обработка данных в блоках типа сценарий и мониторинг происходят в режиме неявных транзакций.

На самом деле не совсем так. Транзакция там вполне себе явная. Служба workflow стартует транзакцию перед началом обработки блока, и подтверждает при успешном завершении обработки блока. Если же при обработке блока возникла ошибка - транзакция откатывается.

Степан Мурашов
except // в случае если где то выше произошли какие то ошибки по любым причинам, берем значение зашитое в коде Result = 600 endexcept

Молчаливое подавление исключений - это очень опасный прием. Особенно в коде, который "должен быть максимально стабильным", как было заявлено.

Код может перестать работать по той или иной причине - и никто никак не сможет узнать об этом.

Я думаю, тут надо как минимум добавить запись ошибки в лог, чтобы администратор мог узнать о наличии проблем, как только они появятся.

Андрей Дозоров
Задам вопрос моего коллеги. Почему просто не подготовить данные на клиенте, а обработку их выполнять сценарием по расписанию?

Такой вариант тоже возможен, но он подойдет не для всех задач. При такой реализации не будет "онлайн" обработки, не получится настроить расписание так, что бы сценарий сразу после подготовки данных начинал их обработку. А если запускать назначенное задание очень часто, сценарий может просто не справиться с объемом.

Андрей Дозоров
Степан Мурашов

Спасибо за уточнения, поправил в статье.

По поводу лога - согласен, но это пример кода, в реальном решении логирование подобного кода это отдельный разговор.

Иван Чурбаков

У нас для некоторых длительных процессов в рамках задач запускается отдельный сценарий (просто sbrte с параметрами запускается), на который не распространяются ограничения службы. А в рамках задачи просто ведется мониторинг его работы, если сценарий упал - запускается снова. В данном случае еще удобно, что изменения данных выполняются не в одной транзакции, а значит блокировки таблиц гораздо более точечные.

Михаил Тарасов
Такой вариант тоже возможен, но он подойдет не для всех задач. При такой реализации не будет "онлайн" обработки, не получится настроить расписание так, что бы сценарий сразу после подготовки данных начинал их обработку. А если запускать назначенное задание очень часто, сценарий может просто не справиться с объемом.

Задача вполне себе решаемая. Один из сценариев у нас запускается каждые 5 минут. Для того, чтобы единовременно не запускалось 2 процесса обработки данных, реализовали простую блокировку на основе файла. В файл написали 1, значит второй процесс, едва начавшись, завершат работу... А в свете вышедшей недавно статьи про мьютексы и WinAPI, реализация становиться ещё проще и надежнее...

Михаил Тарасов

Ещё вариант есть такой: Для тог, что бы сценарий начинал обработку данных сразу, нужно, что бы этот сценарий на сервере кто-то пнул. В вашем варианте, насколько я понял, в этой роли выступает служба WorkFolw. А может выступать, например, простенький HTTP сервер, публикующий одну единственную страницу, в которой производился бы вызов SBRte.... с теми же самыми мьютексами...

Михаил Тарасов

В этом случае, сервером могла бы быть любая машина с "нужным" доменным именем, а не "и так загруженная" машина с WorkFlow

Денис Архипов
Транзакция там вполне себе явная. Служба workflow стартует транзакцию перед началом обработки блока, и подтверждает при успешном завершении обработки блока.

Расположите 2 блока сценария друг за другом. Параметр ТМ типа коллекция записей с несколькими значениями. В первом блоке сценария очищаем коллекцию. Во втором блоке проверяем количество записей в ней. Оно не меняется. А теперь расскажите нам о том в каких таких транзакция выполняется КАЖДЫЙ блок сценария.

В случае если блоки типа Задание, то все работает как-то совсем иначе.

Денис Архипов
Расположите 2 блока сценария друг за другом. Параметр ТМ типа коллекция записей с несколькими значениями. В первом блоке сценария очищаем коллекцию. Во втором блоке проверяем количество записей в ней. Оно не меняется. А теперь расскажите нам о том в каких таких транзакция выполняется КАЖДЫЙ блок сценария. В случае если блоки типа Задание, то все работает как-то совсем иначе.

Этот вопрос таки некому прокомментировать?

Денис Мурашов
Этот вопрос таки некому прокомментировать?

Параметры-коллекции нельзя очистить, нет такого метода в платформе. Есть возможность присвоить значение NULL элементу коллекции, в этом случае оно не сохраняется в схему. При следующем получении задачи из схемы этого значения в коллекции не будет. Но при обработке блоков служба Workflow не всегда выполняет переполучение задачи, она обрабатывает блоки до тех пор, пока еще есть возможность их обработать. Например, последовательно идущие блоки "сценарий" будут обработаны за один сеанс обработки, без переполучения задачи с сервера. Это никак не отменяет того факта, что каждый блок обрабатывается в отдельной транзакции. После обработки первого блока сценарий в транзакции на сервер уйдет схема задачи с пустым параметром-коллекцией. Но служба Workflow продолжит работать с текущим объектом задачи, в котором параметр-коллекция не пуст, а заполнен значениями NULL. Если же встретится блок, который потребует прекратить обработку блоков (например, блок задание, который требует совершения действий пользователем перед началом дальнейшей обработки на службе Workflow), то при последующей обработке (после выполнения задания) параметр-коллекция будет уже пустым.

Денис Архипов

а с какой целью эта информация замалчивается на курсах разработки? что бы каждый сам открыл для себя чудесный внутренний мир службы воркфлоу?

Авторизуйтесь, чтобы написать комментарий