Отправка сообщений о событиях по HTTP
Содержание раздела
Система поддерживает отправку HTTP-сообщений о событиях внешним подписчикам.
Если публикация событий включена, сообщения в заданном формате отправляются всем сконфигурированным подписчикам.
Требования к подписчикам
Для получения HTTP-сообщений о событиях подписчики должны уметь принимать сообщения в заданном формате, обрабатывать их и возвращать ответ. Каждый подписчик также должен выделить URL-адрес, на который Prostore будет отправлять сообщения.
Спецификацию OpenAPI для подписчиков см. ниже или по ссылке.
Спецификация HTTP-метода обработки сообщений о событиях
openapi: 3.0.0
info:
title: StatusEventAPI
version: 1.0.2
description: Status Event Rest API
contact:
name: Alexander Senko
servers:
- url: 'http://{host}:{port}'
variables:
host:
default: localhost
port:
default: '8080'
paths:
/custom-path/custom-endpointS:
post:
summary: Status event
operationId: post-status-event
responses:
'200':
description: OK
'500':
description: Internal Server Error
description: Custom status event endpoint
tags:
- Status event
requestBody:
content:
application/json:
schema:
type: array
items:
allOf:
- $ref: '#/components/schemas/status-event-common'
- $ref: '#/components/schemas/status-event-params'
examples:
DELTA_OPEN:
value:
- datamart: testdb
datetime: '2019-08-24 14:15:22.123'
event: DELTA_OPEN
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
deltaNum: 5
DELTA_CLOSE:
value:
- datamart: testdb
datetime: '2019-08-24 14:15:23.123'
event: DELTA_CLOSE
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
deltaNum: 5
deltaDate: '2019-08-24 14:15:22'
DELTA_CANCEL:
value:
- datamart: testdb
datetime: '2019-08-24 14:15:22.123'
event: DELTA_CANCEL
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
deltaNum: 6
DATAMART_SCHEMA_CHANGED:
value:
- datamart: testdb
datetime: '2019-08-24 14:15:23.123'
event: DATAMART_SCHEMA_CHANGED
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
datamart: testdb
entityName: tbl
entityDefinition: TABLE.DEFAULT
changeDateTime: '2019-08-24 14:15:22.123'
WRITE_OK:
value:
- datamart: testdb
datetime: '2019-08-24 14:15:23.123'
event: WRITE_OK
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
entity: tbl
cn: 21
ts: 1566656122000000
rowsAffected: 10
WRITE_CANCEL:
value:
- datamart: testdb
datetime: '2019-08-24 14:15:22.123'
event: WRITE_CANCEL
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
entity: tbl
cn: 22
ARRAY:
value:
- datamart: testdb
datetime: '2019-08-24 14:15:22.123'
event: DELTA_OPEN
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
deltaNum: 5
- datamart: testdb
datetime: '2019-08-24 14:15:23.123'
event: DELTA_CLOSE
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
deltaNum: 5
deltaDate: '2019-08-24 14:15:22'
- datamart: testdb
datetime: '2019-08-24 14:15:23.123'
event: WRITE_OK
eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
eventParams:
entity: tbl
cn: 21
ts: 1566656122000000
rowsAffected: 10
components:
schemas:
status-event-common:
title: status-event-common
type: object
description: Model for status event common information
properties:
datamart:
type: string
datetime:
type: string
format: date-time
event:
type: string
enum:
- DELTA_OPEN
- DELTA_CLOSE
- DELTA_CANCEL
- DATAMART_SCHEMA_CHANGED
- WRITE_OK
- WRITE_CANCEL
eventLogId:
type: string
format: uuid
required:
- datamart
- datetime
- event
- eventLogId
status-event-params:
title: status-event-params
type: object
description: Model for status event params
properties:
eventParams:
anyOf:
- $ref: '#/components/schemas/status-event-delta-open'
- $ref: '#/components/schemas/status-event-delta-close'
- $ref: '#/components/schemas/status-event-delta-cancel'
- $ref: '#/components/schemas/status-event-datamart-schema-changed'
- $ref: '#/components/schemas/status-event-write-ok'
- $ref: '#/components/schemas/status-event-write-cancel'
type: object
required:
- eventParams
status-event-delta-open:
title: status-event-delta-open
type: object
description: Model for DELTA_OPEN status event specific params
properties:
deltaNum:
type: integer
required:
- deltaNum
status-event-delta-close:
title: status-event-delta-close
type: object
description: Model for DELTA_CLOSE status event specific params
properties:
deltaNum:
type: integer
deltaDate:
type: string
format: date-time
required:
- deltaNum
- deltaDate
status-event-delta-cancel:
title: status-event-delta-cancel
type: object
description: Model for DELTA_CANCEL status event specific params
properties:
deltaNum:
type: integer
required:
- deltaNum
status-event-datamart-schema-changed:
title: status-event-datamart-schema-changed
type: object
description: Model for DATAMART_SCHEMA_CHANGED status event specific params
properties:
datamart:
type: string
entityName:
type: string
entityDefinition:
type: string
changeDateTime:
type: string
format: date-time
required:
- datamart
- entityName
- entityDefinition
- changeDateTime
status-event-write-ok:
title: status-event-write-ok
type: object
description: Model for WRITE_OK status event specific params
properties:
entity:
type: string
cn:
type: integer
ts:
type: integer
rowsAffected:
type: integer
required:
- entity
- cn
- ts
- rowsAffected
status-event-write-cancel:
title: status-event-write-cancel
type: object
description: Model for WRITE_CANCEL status event specific params
properties:
entity:
type: string
cn:
type: integer
required:
- entity
- cn
tags:
- name: Status event
description: defines status event endpoints
Включение публикации событий
По умолчанию публикация всех событий отключена.
Чтобы включить публикацию событий по HTTP:
- В конфигурации каждой ноды задайте параметры подписчика в секции
poststatusevent.subscriberGroup:- Укажите узлы подписчика с помощью параметра
POST_STATUS_EVENT_SUBSCRIBER_GROUP_ENDPOINT_URLв форматеhttp://host_1:port_1,host_2:port_2,.../endpoint. Рекомендуется указать несколько узлов для подписчика на случай недоступности некоторых из них. - (Опционально) Включите публикацию дополнительных событий, установив для соответствующих параметров значение
true:POST_STATUS_EVENT_SUBSCRIBER_GROUP_WRITE_OPERATIONS_ENABLED— завершение и отмена операций записи;POST_STATUS_EVENT_SUBSCRIBER_GROUP_EXTERNAL_DDL_ENABLED— создание и удаление внешних таблиц;POST_STATUS_EVENT_SUBSCRIBER_GROUP_SYNC_ENABLED— синхронизация материализованных представлений.
- (Опционально) Внесите другие изменения в конфигурацию подписчика.
- Укажите узлы подписчика с помощью параметра
- (Опционально) Повторите шаг 1 для других подписчиков.
Как отправляются сообщения
Накопление сообщений
По умолчанию сообщения отправляются сразу после их формирования. Но, если значение параметра POST_STATUS_EVENT_SUBSCRIBER_GROUP_BUFFERING_PERIOD_MS больше 0, сообщения накапливаются заданное время перед отправкой.
При немедленной отправке система может объединить нескольких событий, если они произошли во время обработки первого.
Отправка сообщений
Сообщения о событиях отправляются случайному узлу каждого подписчика. Если узел недоступен, система пробует другие узлы подписчика по очереди.
Сообщения (или пачки сообщений, если включено накопление сообщений) отправляются последовательно — следующее отправляется после успешной отправки предыдущего.
Буфер неотправленных сообщений
Если подписчик недоступен и сообщение не доставлено, оно попадает в буфер и пытается отправиться повторно. Попытки делаются с интервалом POST_STATUS_EVENT_SUBSCRIBER_GROUP_ENDPOINT_RETRY_TIMEOUT_MS (по умолчанию — раз в 5 секунд).
Размер буфера ограничен параметром POST_STATUS_EVENT_SUBSCRIBER_GROUP_BUFFER_SIZE и по умолчанию равен 1000 сообщений. При переполнении буфера старые сообщения заменяются новыми.
Формат сообщений
Сообщения о событиях имеют следующий формат:
{
"datamart": "<logical_db>",
"datetime": "<event_timestamp>",
"event": "<event_code>",
"eventLogId": "<event_UUID>",
"eventParams": {
...
}
}
Параметры сообщения
logical_db-
Имя логической базы данных, в которой произошло событие.
event_timestamp-
Дата и время события в формате
YYYY-MM-DD hh:mm:ss[.microseconds]. event_code-
Код события. Возможные значения:
DATAMART_SCHEMA_CHANGED— выполнение DDL-запроса (кроме DDL для временных прокси-таблиц);DELTA_OPEN— открытие дельты;DELTA_CLOSE— закрытие дельты;DELTA_CANCEL— откат дельты;WRITE_OK— завершение операции записи;WRITE_CANCEL— отмена операции записи;SYNC_OK— синхронизация материализованного представления.
Подробнее о том, когда публикуются события, см. в разделе Уведомление о событиях.
event_UUID-
Уникальный идентификатор события.
eventParams-
Массив параметров события. Набор параметров в массиве зависит от вида события, как показано в таблице ниже.
| Событие | Массив параметров eventParams |
|---|---|
| Выполнение DDL-запроса | {"datamart": "<logical_db>","entityName": "<entity_name>","entityDefinition": "<entity_definition>","changeDateTime": "<ddl_timestamp>"} |
| Открытие дельты | {"deltaNum": <delta_number>} |
| Закрытие дельты | {"deltaNum": <delta_number>,"deltaDate": "<delta_commit_timestamp>"} |
| Откат дельты | {"deltaNum": <delta_number>} |
| Завершение операции записи | {"entity": "<entity_name>","cn": <sys_cn>,"ts": <ts_unix_time>,"rowsAffected": <rows_affected_quantity>} |
| Отмена операции записи | {"entity": "<entity_name>","cn": <sys_cn>} |
| Синхронизация материализованного представления | {"entity": "<entity_name>","deltaNum": <delta_number>,"deltaDate": "<delta_commit_timestamp>","cnList": [{"schema": "<source_logical_db>","name": "<source_entity_name>","cn": <source_sys_cn>},{ ... }]} |
Параметры массива eventParams
logical_db-
Имя логической базы данных, в которой выполнен DDL-запрос.
entity_name-
Имя задействованной логической сущности.
entity_definition-
Определение логической сущности. Возможные значения:
TABLE.DEFAULT— обычная логическая таблица;TABLE.PROXY— постоянная прокси-таблица;TABLE.PARTITION— партиция;TABLE.PARTITIONED— партиционированная таблица;TABLE.SNAPSHOT— снапшот-таблица;MATERIALIZED VIEW.DEFAULT— материализованное представление, построенное на данных логических БД;MATERIALIZED VIEW.EXEC.AGENT— материализованное представление, построенное на данных внешнего источника;VIEW.DEFAULT— обычное логическое представление (не относящее к категории простых);VIEW.PLAIN.FILTERED— простое представление с условием;VIEW.PLAIN.UNFILTERED— простое представление без условия;READABLE EXTERNAL TABLE.CORE:<datasource_name>— readable-таблица для работы со standalone-таблицей, расположенной в датасорсе<datasource_name>;READABLE EXTERNAL TABLE.KAFKA— readable-таблица для загрузки данных из брокера сообщений Kafka;WRITABLE EXTERNAL TABLE.CORE:<datasource_name>— writable-таблица для работы со standalone-таблицей, расположенной в датасорсе<datasource_name>;UPLOAD EXTERNAL TABLE.KAFKA— внешняя таблица загрузки;DOWNLOAD EXTERNAL TABLE.KAFKA— внешняя таблица выгрузки.
ddl_timestamp-
Дата и время выполнения DDL-запроса в формате
YYYY-MM-DD hh:mm:ss[.microseconds]. delta_number-
Номер дельты.
Для материализованных представлений — это номер автоматической дельты, которая объединяет все изменения данных, загруженные в материализованные представления за один цикл синхронизации.
delta_commit_timestamp-
Дата и время закрытия дельты в формате
YYYY-MM-DD hh:mm:ss[.microseconds].Для материализованных представлений — это дата и время закрытия автоматической дельты, которая объединяет все изменения данных, загруженные в материализованные представления за один цикл синхронизации.
sys_cn-
Номер операции записи. Для прокси-таблиц и standalone-таблиц указывается значение
null. ts_unix_time-
Дата и время выполнения или отмены операции записи в Unix-формате, равное целому числу микросекунд с 00:00:00 UTC 1 января 1970 года. Для прокси-таблиц и standalone-таблиц указывается значение
null. rows_affected_quantity-
Количество добавленных, измененных и удаленных операцией строк.
source_logical_db-
Имя логической базы данных в источнике, содержащей сущность
source_entity_name. source_entity_name-
Имя логической сущности в источнике, с которой было синхронизировано материализованное представление.
source_sys_cn-
Номер операции записи в сущности
source_entity_name, по состоянию на которую было синхронизировано материализованное представление.
Примеры сообщений
Сообщение о выполнении DDL-запроса
[
{
"datamart": "marketing",
"datetime": "2024-05-10 11:10:46.104",
"event": "DATAMART_SCHEMA_CHANGED",
"eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
"eventParams": {
"datamart": "marketing",
"entityName": "sales",
"entityDefinition": "TABLE.DEFAULT",
"changeDateTime": "2024-05-10 11:10:46.085"
}
}
]
Сообщение о завершении операции записи
[
{
"datamart": "marketing",
"datetime": "2024-08-24 14:15:23.278",
"event": "WRITE_OK",
"eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
"eventParams": {
"entity": "sales",
"cn": 1724508923111000,
"ts": 1724508923278000,
"rowsAffected": 10
}
}
]
Сообщение о закрытии дельты
[
{
"datamart": "marketing",
"datetime": "2024-08-24 14:20:06.755",
"event": "DELTA_CLOSE",
"eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
"eventParams": {
"deltaNum": 5,
"deltaDate": "2024-08-24 14:20:06"
}
}
]
Сообщение о синхронизации материализованного представления
{
"datamart": "matview_db",
"datetime": "2025-05-14 10:23:53.105474",
"event": "SYNC_OK",
"eventLogId": "38824069-04f3-4d58-9e5a-d2e1c8ac80a3",
"eventParams": {
"entity": "sales_and_stores",
"deltaNum": 3,
"deltaDate": "2025-05-14 10:23:53.101363",
"cnList": [
{
"schema": "marketing",
"name": "sales",
"cn": 1747193032105474
},
{
"schema": "marketing",
"name": "stores",
"cn": 1747193031105474
}
]
}
}