Отправка сообщений о событиях по HTTP

Содержание раздела
  1. Требования к подписчикам
  2. Включение публикации событий
  3. Как отправляются сообщения
    1. Накопление сообщений
    2. Отправка сообщений
    3. Буфер неотправленных сообщений
  4. Формат сообщений
    1. Параметры сообщения
    2. Параметры массива eventParams
  5. Примеры сообщений
    1. Сообщение о выполнении DDL-запроса
    2. Сообщение о завершении операции записи
    3. Сообщение о закрытии дельты
    4. Сообщение о синхронизации материализованного представления

Система поддерживает отправку HTTP-сообщений о событиях внешним подписчикам.

Если публикация событий включена, сообщения в заданном формате отправляются всем сконфигурированным подписчикам.

Требования к подписчикам

Для получения HTTP-сообщений о событиях подписчики должны уметь принимать сообщения в заданном формате, обрабатывать их и возвращать ответ. Каждый подписчик также должен выделить URL-адрес, на который Prostore будет отправлять сообщения.

Спецификацию OpenAPI для подписчиков см. ниже или по ссылке.

Спецификация HTTP-метода обработки сообщений о событиях
openapi: 3.0.0
info:
  title: StatusEventAPI
  version: 1.0.2
  description: Status Event Rest API
  contact:
    name: Alexander Senko
servers:
  - url: 'http://{host}:{port}'
    variables:
      host:
        default: localhost
      port:
        default: '8080'
paths:
  /custom-path/custom-endpointS:
    post:
      summary: Status event
      operationId: post-status-event
      responses:
        '200':
          description: OK
        '500':
          description: Internal Server Error
      description: Custom status event endpoint
      tags:
        - Status event
      requestBody:
        content:
          application/json:
            schema:
              type: array
              items:
                allOf:
                  - $ref: '#/components/schemas/status-event-common'
                  - $ref: '#/components/schemas/status-event-params'
            examples:
              DELTA_OPEN:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: DELTA_OPEN
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
              DELTA_CLOSE:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: DELTA_CLOSE
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
                      deltaDate: '2019-08-24 14:15:22'
              DELTA_CANCEL:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: DELTA_CANCEL
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 6
              DATAMART_SCHEMA_CHANGED:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: DATAMART_SCHEMA_CHANGED
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      datamart: testdb
                      entityName: tbl
                      entityDefinition: TABLE.DEFAULT
                      changeDateTime: '2019-08-24 14:15:22.123'
              WRITE_OK:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: WRITE_OK
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      entity: tbl
                      cn: 21
                      ts: 1566656122000000
                      rowsAffected: 10
              WRITE_CANCEL:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: WRITE_CANCEL
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      entity: tbl
                      cn: 22
              ARRAY:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: DELTA_OPEN
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: DELTA_CLOSE
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
                      deltaDate: '2019-08-24 14:15:22'
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: WRITE_OK
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      entity: tbl
                      cn: 21
                      ts: 1566656122000000
                      rowsAffected: 10
components:
  schemas:
    status-event-common:
      title: status-event-common
      type: object
      description: Model for status event common information
      properties:
        datamart:
          type: string
        datetime:
          type: string
          format: date-time
        event:
          type: string
          enum:
            - DELTA_OPEN
            - DELTA_CLOSE
            - DELTA_CANCEL
            - DATAMART_SCHEMA_CHANGED
            - WRITE_OK
            - WRITE_CANCEL
        eventLogId:
          type: string
          format: uuid
      required:
        - datamart
        - datetime
        - event
        - eventLogId
    status-event-params:
      title: status-event-params
      type: object
      description: Model for status event params
      properties:
        eventParams:
          anyOf:
            - $ref: '#/components/schemas/status-event-delta-open'
            - $ref: '#/components/schemas/status-event-delta-close'
            - $ref: '#/components/schemas/status-event-delta-cancel'
            - $ref: '#/components/schemas/status-event-datamart-schema-changed'
            - $ref: '#/components/schemas/status-event-write-ok'
            - $ref: '#/components/schemas/status-event-write-cancel'
          type: object
      required:
        - eventParams
    status-event-delta-open:
      title: status-event-delta-open
      type: object
      description: Model for DELTA_OPEN status event specific params
      properties:
        deltaNum:
          type: integer
      required:
        - deltaNum
    status-event-delta-close:
      title: status-event-delta-close
      type: object
      description: Model for DELTA_CLOSE status event specific params
      properties:
        deltaNum:
          type: integer
        deltaDate:
          type: string
          format: date-time
      required:
        - deltaNum
        - deltaDate
    status-event-delta-cancel:
      title: status-event-delta-cancel
      type: object
      description: Model for DELTA_CANCEL status event specific params
      properties:
        deltaNum:
          type: integer
      required:
        - deltaNum
    status-event-datamart-schema-changed:
      title: status-event-datamart-schema-changed
      type: object
      description: Model for DATAMART_SCHEMA_CHANGED status event specific params
      properties:
        datamart:
          type: string
        entityName:
          type: string
        entityDefinition:
          type: string
        changeDateTime:
          type: string
          format: date-time
      required:
        - datamart
        - entityName
        - entityDefinition
        - changeDateTime
    status-event-write-ok:
      title: status-event-write-ok
      type: object
      description: Model for WRITE_OK status event specific params
      properties:
        entity:
          type: string
        cn:
          type: integer
        ts:
          type: integer
        rowsAffected:
          type: integer
      required:
        - entity
        - cn
        - ts
        - rowsAffected
    status-event-write-cancel:
      title: status-event-write-cancel
      type: object
      description: Model for WRITE_CANCEL status event specific params
      properties:
        entity:
          type: string
        cn:
          type: integer
      required:
        - entity
        - cn
tags:
  - name: Status event
    description: defines status event endpoints

Включение публикации событий

По умолчанию публикация всех событий отключена.

Чтобы включить публикацию событий по HTTP:

  1. В конфигурации каждой ноды задайте параметры подписчика в секции poststatusevent.subscriberGroup:
    1. Укажите узлы подписчика с помощью параметра POST_STATUS_EVENT_SUBSCRIBER_GROUP_ENDPOINT_URL в формате http://host_1:port_1,host_2:port_2,.../endpoint. Рекомендуется указать несколько узлов для подписчика на случай недоступности некоторых из них.
    2. (Опционально) Включите публикацию дополнительных событий, установив для соответствующих параметров значение true:
      • POST_STATUS_EVENT_SUBSCRIBER_GROUP_WRITE_OPERATIONS_ENABLED — завершение и отмена операций записи;
      • POST_STATUS_EVENT_SUBSCRIBER_GROUP_EXTERNAL_DDL_ENABLED — создание и удаление внешних таблиц;
      • POST_STATUS_EVENT_SUBSCRIBER_GROUP_SYNC_ENABLED — синхронизация материализованных представлений.
    3. (Опционально) Внесите другие изменения в конфигурацию подписчика.
  2. (Опционально) Повторите шаг 1 для других подписчиков.

Как отправляются сообщения

Накопление сообщений

По умолчанию сообщения отправляются сразу после их формирования. Но, если значение параметра POST_STATUS_EVENT_SUBSCRIBER_GROUP_BUFFERING_PERIOD_MS больше 0, сообщения накапливаются заданное время перед отправкой.

При немедленной отправке система может объединить нескольких событий, если они произошли во время обработки первого.

Отправка сообщений

Сообщения о событиях отправляются случайному узлу каждого подписчика. Если узел недоступен, система пробует другие узлы подписчика по очереди.

Сообщения (или пачки сообщений, если включено накопление сообщений) отправляются последовательно — следующее отправляется после успешной отправки предыдущего.

Буфер неотправленных сообщений

Если подписчик недоступен и сообщение не доставлено, оно попадает в буфер и пытается отправиться повторно. Попытки делаются с интервалом POST_STATUS_EVENT_SUBSCRIBER_GROUP_ENDPOINT_RETRY_TIMEOUT_MS (по умолчанию — раз в 5 секунд).

Размер буфера ограничен параметром POST_STATUS_EVENT_SUBSCRIBER_GROUP_BUFFER_SIZE и по умолчанию равен 1000 сообщений. При переполнении буфера старые сообщения заменяются новыми.

Формат сообщений

Сообщения о событиях имеют следующий формат:

{
  "datamart": "<logical_db>",
  "datetime": "<event_timestamp>",
  "event": "<event_code>",
  "eventLogId": "<event_UUID>",
  "eventParams": {
    ...
  }
}

Параметры сообщения

logical_db

Имя логической базы данных, в которой произошло событие.

event_timestamp

Дата и время события в формате YYYY-MM-DD hh:mm:ss[.microseconds].

event_code

Код события. Возможные значения:

  • DATAMART_SCHEMA_CHANGED — выполнение DDL-запроса (кроме DDL для временных прокси-таблиц);
  • DELTA_OPEN — открытие дельты;
  • DELTA_CLOSE — закрытие дельты;
  • DELTA_CANCEL — откат дельты;
  • WRITE_OK — завершение операции записи;
  • WRITE_CANCEL — отмена операции записи;
  • SYNC_OK — синхронизация материализованного представления.

Подробнее о том, когда публикуются события, см. в разделе Уведомление о событиях.

event_UUID

Уникальный идентификатор события.

eventParams

Массив параметров события. Набор параметров в массиве зависит от вида события, как показано в таблице ниже.

Событие Массив параметров eventParams
Выполнение DDL-запроса {"datamart": "<logical_db>",
"entityName": "<entity_name>",
"entityDefinition": "<entity_definition>",
"changeDateTime": "<ddl_timestamp>"}
Открытие дельты {"deltaNum": <delta_number>}
Закрытие дельты {"deltaNum": <delta_number>,
"deltaDate": "<delta_commit_timestamp>"}
Откат дельты {"deltaNum": <delta_number>}
Завершение операции записи {"entity": "<entity_name>",
"cn": <sys_cn>,
"ts": <ts_unix_time>,
"rowsAffected": <rows_affected_quantity>}
Отмена операции записи {"entity": "<entity_name>",
"cn": <sys_cn>}
Синхронизация материализованного
представления
{"entity": "<entity_name>",
"deltaNum": <delta_number>,
"deltaDate": "<delta_commit_timestamp>",
"cnList": [
{"schema": "<source_logical_db>",
"name": "<source_entity_name>",
"cn": <source_sys_cn>},
{ ... }
]}

Параметры массива eventParams

logical_db

Имя логической базы данных, в которой выполнен DDL-запрос.

entity_name

Имя задействованной логической сущности.

entity_definition

Определение логической сущности. Возможные значения:

ddl_timestamp

Дата и время выполнения DDL-запроса в формате YYYY-MM-DD hh:mm:ss[.microseconds].

delta_number

Номер дельты.

Для материализованных представлений — это номер автоматической дельты, которая объединяет все изменения данных, загруженные в материализованные представления за один цикл синхронизации.

delta_commit_timestamp

Дата и время закрытия дельты в формате YYYY-MM-DD hh:mm:ss[.microseconds].

Для материализованных представлений — это дата и время закрытия автоматической дельты, которая объединяет все изменения данных, загруженные в материализованные представления за один цикл синхронизации.

sys_cn

Номер операции записи. Для прокси-таблиц и standalone-таблиц указывается значение null.

ts_unix_time

Дата и время выполнения или отмены операции записи в Unix-формате, равное целому числу микросекунд с 00:00:00 UTC 1 января 1970 года. Для прокси-таблиц и standalone-таблиц указывается значение null.

rows_affected_quantity

Количество добавленных, измененных и удаленных операцией строк.

source_logical_db

Имя логической базы данных в источнике, содержащей сущность source_entity_name.

source_entity_name

Имя логической сущности в источнике, с которой было синхронизировано материализованное представление.

source_sys_cn

Номер операции записи в сущности source_entity_name, по состоянию на которую было синхронизировано материализованное представление.

Примеры сообщений

Сообщение о выполнении DDL-запроса

[
  {
    "datamart": "marketing",
    "datetime": "2024-05-10 11:10:46.104",
    "event": "DATAMART_SCHEMA_CHANGED",
    "eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
    "eventParams": {
      "datamart": "marketing",
      "entityName": "sales",
      "entityDefinition": "TABLE.DEFAULT",
      "changeDateTime": "2024-05-10 11:10:46.085"
    }
  }
]

Сообщение о завершении операции записи

[
  {
    "datamart": "marketing",
    "datetime": "2024-08-24 14:15:23.278",
    "event": "WRITE_OK",
    "eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
    "eventParams": {
      "entity": "sales",
      "cn": 1724508923111000,
      "ts": 1724508923278000,
      "rowsAffected": 10
    }
  }
]

Сообщение о закрытии дельты

[
  {
    "datamart": "marketing",
    "datetime": "2024-08-24 14:20:06.755",
    "event": "DELTA_CLOSE",
    "eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
    "eventParams": {
      "deltaNum": 5,
      "deltaDate": "2024-08-24 14:20:06"
    }
  }
]

Сообщение о синхронизации материализованного представления

{
  "datamart": "matview_db",
  "datetime": "2025-05-14 10:23:53.105474",
  "event": "SYNC_OK",
  "eventLogId": "38824069-04f3-4d58-9e5a-d2e1c8ac80a3",
  "eventParams": {
    "entity": "sales_and_stores",
    "deltaNum": 3,
    "deltaDate": "2025-05-14 10:23:53.101363",
    "cnList": [
      {
        "schema": "marketing",
        "name": "sales",
        "cn": 1747193032105474
      },
      {
        "schema": "marketing",
        "name": "stores",
        "cn": 1747193031105474
      }
    ]
  }
}