Публикация событий в сигнальный топик Kafka
Содержание раздела
Система поддерживает публикацию событий в сигнальный топик Kafka.
Если публикация включена, сообщения записываются в заданный топик (KAFKA_STATUS_EVENT_TOPIC) в JSON-формате. Каждое сообщение состоит из ключа и тела.
Включение публикации событий
По умолчанию публикация всех событий отключена.
Чтобы включить публикацию событий в топик Kafka:
- В конфигурации каждой ноды настройте общие параметры публикации в секции
status.event.publish:KAFKA_STATUS_EVENT_TOPIC— укажите имя топика для публикации событий;KAFKA_STATUS_EVENT_ENABLED— установите значениеtrue.
- (Опционально) Включите публикацию дополнительных событий, установив для соответствующих параметров значение
true:KAFKA_STATUS_EVENT_WRITE_OPERATIONS_ENABLED— завершение и отмена операций записи;KAFKA_STATUS_EVENT_EXTERNAL_DDL_ENABLED— создание и удаление внешних таблиц;KAFKA_STATUS_EVENT_SYNC_ENABLED— синхронизация материализованных представлений.
В конфигурации брокера Kafka рекомендуется настроить периодическую автоматическую очистку сигнального топика, заданного в KAFKA_STATUS_EVENT_TOPIC.
Формат ключа сообщения
Ключ сообщения в сигнальном топике Kafka имеет формат, показанный ниже. Описание параметров ключа сообщения см. в секции ниже.
{
"datamart": "<logical_db>",
"datetime": "<event_timestamp>",
"event": "<event_code>",
"eventLogId": "<event_UUID>"
}
Параметры ключа сообщения
logical_db-
Имя логической базы данных, в которой произошло событие.
event_timestamp-
Дата и время события в формате
YYYY-MM-DD hh:mm:ss[.microseconds]. event_code-
Код события. Возможные значения:
DATAMART_SCHEMA_CHANGED— выполнение DDL-запроса (кроме DDL для временных прокси-таблиц);DELTA_OPEN— открытие дельты;DELTA_CLOSE— закрытие дельты;DELTA_CANCEL— откат дельты;WRITE_OK— завершение операции записи;WRITE_CANCEL— отмена операции записи;SYNC_OK— синхронизация материализованного представления.
Подробнее о том, когда публикуются события, см. в разделе Уведомление о событиях.
event_UUID-
Уникальный идентификатор события.
Примеры ключей сообщения
Пример ключа в сообщении о закрытии дельты:
{
"datamart": "marketing",
"datetime": "2024-02-28 08:02:40.463",
"event": "DELTA_CLOSE",
"eventLogId": "6b80c392-3198-492f-a631-6a684e521b79"
}
Пример ключа в сообщении о завершении операции записи:
{
"datamart": "marketing",
"datetime": "2024-02-28 09:15:43.532",
"event": "WRITE_OK",
"eventLogId": "8b8dfa17-6605-4874-8ebf-1b8b47c201f3"
}
Пример ключа в сообщении об отмене операции записи:
{
"datamart": "marketing",
"datetime": "2024-02-28 09:51:11.148",
"event": "WRITE_CANCEL",
"eventLogId": "3d963cbc-8782-498e-923d-602d7ccf7c2d"
}
Формат тела сообщения
Тело сообщения в сигнальном топике Kafka имеет формат, показанный в таблице ниже. Описание параметров см. в секции ниже.
| Событие | Формат тела сообщения |
|---|---|
| Выполнение DDL-запроса | {"datamart": "<logical_db>","entityName": "<entity_name>","entityDefinition": "<entity_definition>","changeDateTime": "<ddl_timestamp>"} |
| Открытие дельты | {"deltaNum": <delta_number>} |
| Закрытие дельты | {"deltaNum": <delta_number>,"deltaDate": "<delta_commit_timestamp>"} |
| Откат дельты | {"deltaNum": <delta_number>} |
| Завершение операции записи | {"entity": "<entity_name>","cn": <sys_cn>,"ts": <ts_unix_time>,"rowsAffected": <rows_affected_quantity>} |
| Отмена операции записи | {"entity": "<entity_name>","cn": <sys_cn>} |
| Синхронизация материализованного представления | {"entity": "<entity_name>","deltaNum": <delta_number>,"deltaDate": "<delta_commit_timestamp>","cnList": [{"schema": "<source_logical_db>","name": "<source_entity_name>","cn": <source_sys_cn>},{ ... }]} |
Параметры тела сообщения
logical_db-
Имя логической базы данных, в которой выполнен DDL-запрос.
entity_name-
Имя задействованной логической сущности.
entity_definition-
Определение логической сущности. Возможные значения:
TABLE.DEFAULT— обычная логическая таблица;TABLE.PROXY— постоянная прокси-таблица;TABLE.PARTITION— партиция;TABLE.PARTITIONED— партиционированная таблица;TABLE.SNAPSHOT— снапшот-таблица;MATERIALIZED VIEW.DEFAULT— материализованное представление, построенное на данных логических БД;MATERIALIZED VIEW.EXEC.AGENT— материализованное представление, построенное на данных внешнего источника;VIEW.DEFAULT— обычное логическое представление (не относящее к категории простых);VIEW.PLAIN.FILTERED— простое представление с условием;VIEW.PLAIN.UNFILTERED— простое представление без условия;READABLE EXTERNAL TABLE.CORE:<datasource_name>— readable-таблица для работы со standalone-таблицей, расположенной в датасорсе<datasource_name>;READABLE EXTERNAL TABLE.KAFKA— readable-таблица для загрузки данных из брокера сообщений Kafka;WRITABLE EXTERNAL TABLE.CORE:<datasource_name>— writable-таблица для работы со standalone-таблицей, расположенной в датасорсе<datasource_name>;UPLOAD EXTERNAL TABLE.KAFKA— внешняя таблица загрузки;DOWNLOAD EXTERNAL TABLE.KAFKA— внешняя таблица выгрузки.
ddl_timestamp-
Дата и время выполнения DDL-запроса в формате
YYYY-MM-DD hh:mm:ss[.microseconds]. delta_number-
Номер дельты.
Для материализованных представлений — это номер автоматической дельты, которая объединяет все изменения данных, загруженные в материализованные представления за один цикл синхронизации.
delta_commit_timestamp-
Дата и время закрытия дельты в формате
YYYY-MM-DD hh:mm:ss[.microseconds].Для материализованных представлений — это дата и время закрытия автоматической дельты, которая объединяет все изменения данных, загруженные в материализованные представления за один цикл синхронизации.
sys_cn-
Номер операции записи. Для прокси-таблиц и standalone-таблиц указывается значение
null. ts_unix_time-
Дата и время выполнения или отмены операции записи в Unix-формате, равное целому числу микросекунд с 00:00:00 UTC 1 января 1970 года. Для прокси-таблиц и standalone-таблиц указывается значение
null. rows_affected_quantity-
Количество добавленных, измененных и удаленных операцией строк.
source_logical_db-
Имя логической базы данных в источнике, содержащей сущность
source_entity_name. source_entity_name-
Имя логической сущности в источнике, с которой было синхронизировано материализованное представление.
source_sys_cn-
Номер операции записи в сущности
source_entity_name, по состоянию на которую было синхронизировано материализованное представление.
Примеры тела сообщения
Тело сообщения о выполнении DDL-запроса
{
"changeDateTime": "2024-04-24 15:25:47.002",
"datamart": "marketing"
}
Тело сообщения о закрытии дельты
{
"deltaNum": 8,
"deltaDate": "2024-04-28 08:02:40"
}
Тело сообщения о синхронизации материализованного представления
{
"entity": "sales_and_stores",
"deltaNum": 3,
"deltaDate": "2025-05-14 10:23:53.101363",
"cnList": [
{
"schema": "marketing",
"name": "sales",
"cn": 1747193032105474
},
{
"schema": "marketing",
"name": "stores",
"cn": 1747193031105474
}
]
}