Работа с RabbitMQ в Directum RX

16 4

Интеграции между различными системами и DirectumRX возможны различными способами, наиболее популярными, с точки зрения простоты реализации, являются:

  • файлы;
  • HTTP API.

Также, в зависимости от ситуации, компетенций специалистов заказчика, и особенностей сторонних систем, могут применяться, например, прямые запросы в базы данных, или брокеры сообщений. В этой статье поговорим про последний вариант.

Преимущества использования брокеров сообщений:

  • Асинхронный канал связи (отправленное брокеру сообщение будет сохранено "до востребования", нет необходимости повторять запросы, как, например, с HTTP API);
  • Разделение потоков данных (для данных конкретного типа или назначения выделяется отдельная очередь, и мы точно знаем, что, например, в очереди simple_documents_queue будут только данные по простым документам, и т.д.

Это, пожалуй, основные критерии, по которым следует выбрать брокер сообщений в качестве механизма передачи данных при интеграции.

Особенностью работы объектной модели брокеров сообщений является событийно-ориентированная модель взаимодействия с сервером. Запуская "прослушиватель", нужно передать ему функцию-обработчик, вызываемую при поступлении нового сообщения. Собственно, в Directum RX такая модель работы нереализуема, т.к. при старте прослушивания, функция (фонового процесса, асинхронного или другого обработчика) будет завершена. Можно, конечно, экспериментировать с Thread.Sleep, но это слишком рискованный вариант.

В контексте использования RabbitMQ в обработчиках прикладного кода DirectumRX можно реализовать следующий пайплайн - обработчик подключается, забирает все сообщения из очереди, которые есть там на текущий момент, прослушивание и реакция на события не выполняются. 

Ниже приведен пример кода, позволяющего получить список сообщений (как строки), и отправить сообщение (строку) в очередь.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RMQClient
{
  public class RabbitClient
  {
    public string host, vhost, userName, password;
    public int port;

    public RabbitClient(string host = "localhost", int port = 5672, string vhost = "/", string userName = "guest", string password = "guest")
    {
      this.host = host;
      this.port = port;
      this.vhost = vhost;
      this.userName = userName;
      this.password = password;
    }
    public List<string> GetMessages(string queue)
    {
      var messages = new List<string>();
      var factory = new ConnectionFactory() { HostName = host, Port = port, VirtualHost = vhost, UserName = userName, Password = password };
      using (var connection = factory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.QueueDeclare(queue: queue,
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        var c = channel.MessageCount(queue);
        while (c != 0)
        {
          bool noAck = true;
          BasicGetResult result = channel.BasicGet(queue, noAck);
          if (result != null)
          {
            IBasicProperties props = result.BasicProperties;
            messages.Add(Encoding.UTF8.GetString(result.Body.ToArray()));
          }
          --c;
        }
      }
      return messages;
    }

    public void SendResponse(string message, string exchange, string queue)
    {
      var result = new List<string>();
      var factory = new ConnectionFactory() { HostName = host, Port = port, VirtualHost = vhost, UserName = userName, Password = password };
      using (var connection = factory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.QueueDeclare(queue: queue,
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: exchange,
                             routingKey: "",
                             basicProperties: null,
                             body: body);
      }
    }
  }
}

Для полноценной интеграции в процессы остается только доработать сериализацию\десериализацию сообщений в формат данных, используемый сторонней системой.

16
Авторизуйтесь, чтобы оценить материал.
5
Андрей Почупайло

Отличная идея, странно что все проходили мимо нее.. Это универсальное решение.

А вот это:

... прослушивание и реакция на события не выполняются. 

можно реализовать через чат-бот например.

Чат-бот сервер будет "прослушивателем" поступлений новых сообщений для клиента, при поступлении сообщения, оно передается чат-клиенту, который вызывает функцию обработчик.

Андрей Почупайло

В разделе "Идеи" запостил - ""Прослушиватель" каналов данных очереди RabbitMQ для обработчика Directum RX" развитие темы в честь этой статьи. Не знаю доступна.

Рецензент так и не появился, прошу у кого есть права кинуть рецензию, думается полезно будет для общего кругозора..

Дмитрий Панкрашов

Андрей, идеи рецензируются сотрудниками компании, нам в данный момент, увы, идея недоступна :(

Компания Directum

Дмитрий, Идея https://club.directum.ru/idea/361130 теперь доступна, можно плюсовать 

Авторизуйтесь, чтобы написать комментарий