2. Конфигурация модуля СМЭВ4-адаптер - Модуль MPPW (application.yml)
Файл application.yml – основной конфигурационный файл модуля, в котором задана его логика и порядок работы модуля:
подключение к Агенту СМЭВ4, Брокеру сообщений Kafka, Zookeeper, а также настройка подключения к Prostore
(секция: prostore), настройка метрик (секция: metrics) и другие настройки необходимые для корректной работы адаптера.
2.1. Пример файла application.yml
В конфигурационном файле следует задавать только те настройки, которые необходимы для решения текущих бизнес-задач.
tp:
data-topic-prefix: ${DATA_TOPIC_PREFIX:tp.data}
upload-topic-prefix: ${UPLOAD_TOPIC_PREFIX:tmp.w}
upload:
data-topic-prefix: ${AGENT_TOPIC_PREFIX:}
retry:
attempts: 5
delay: 1m
http-server:
port: ${HTTP_PORT:8090}
environment:
name: ${ENVIRONMENT_NAME:test}
executor:
reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}
max-execute-time: ${EXECUTOR_MAX_EXECUTE_TIME:600}
scheduler:
delta-apply-request-timeout: ${DELTA_APPLY_REQUEST_TIMEOUT:60}
spring:
liquibase:
enabled: ${LIQUIBASE_ENABLED:true} # Для replication.storage.type=kafka необходимо выключить
change-log: classpath:/liquibase-changes/changelog.xml
driver-class-name: org.postgresql.Driver
url: jdbc:postgresql://${replication.storage.postgres.connection.host}:${replication.storage.postgres.connection.port}/${replication.storage.postgres.connection.database}?currentSchema=${replication.storage.postgres.connection.schema}
user: ${replication.storage.postgres.connection.user}
password: ${replication.storage.postgres.connection.password}
replication:
should-check-source-topic: false
should-delete-prostore-topic: false
prostore-topic-template: "mppw.{datamart}.{table}"
storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw
type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka
postgres: # параметры подключения к базе. Используется только при type=postgres
connection:
database: ${STORAGE_DATABASE:replication}
schema: ${STORAGE_SCHEMA:public}
host: ${STORAGE_HOST:localhost}
port: ${STORAGE_PORT:5432}
user: ${STORAGE_USER:postgres}
password: ${STORAGE_PASSWORD:postgres}
pool:
max-size: ${STORAGE_POOL_SIZE:30}
delta:
# параметр ожидания перед повторной попыткой открытия дельты в случае ошибки
open-delay: ${DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout}}s
# количество попыток открытия дельты в случае ошибки
open-attempts: ${DELTA_OPEN_ATTEMPTS:5}
# период проверки открытых дельт
open-check: ${DELTA_OPEN_CHECK:60}s
# количество попыток фиксации дельты в случае ошибки
commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5}
# период ожидания перед повторной попыткой фиксации дельты
commit-error-delay: ${DELTA_COMMIT_DELAY:1}s
# количество попыток отката дельты в случае ошибки
rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3}
# период ожидания перед повторной попыткой отката дельты
rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s
#enable/disable/period
creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable}
period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s
send:
channel-size: ${SEND_CHANNEL_SIZE:10}
timeout: ${SEND_TIMEOUT:60}
zookeeper:
connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}
prostore-rest-client:
host: ${PS_HOST:localhost}
port: ${PS_PORT:9195}
http:
max-pool-size: ${PS_MAX_POOL_SIZE:8}
prostore:
key:
primary: ${PRIMARY_KEY_NAME:tmp_id}
type: ${PRIMARY_KEY_TYPE:BIGINT}
kafka:
message-limit: ${PS_MESSAGE_LIMIT:1000}
zk-url: ${PS_ZK_KAFKA_URL:localhost:2181}
properties:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
kafka:
agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
internal:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
consumer:
tp-request:
topic: ${kafka.internal.topic.prefix}mppw.tp
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-tp
auto.offset.reset: earliest
enable.auto.commit: false
upload-request:
topic: ${kafka.internal.topic.prefix}mppw.upload.rq
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-upload
auto.offset.reset: earliest
enable.auto.commit: false
delta-apply-request:
topic: ${kafka.internal.topic.prefix}mppw.delta.in.rq
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-delta
auto.offset.reset: earliest
enable.auto.commit: false
upload-data:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-data
auto.offset.reset: earliest
enable.auto.commit: false
producer:
tp-result: ${kafka.internal.topic.prefix}mppw.rs
upload-result: ${kafka.internal.topic.prefix}mppw.upload.rs
delta-apply-result: ${kafka.internal.topic.prefix}mppw.delta.in.rs
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
metrics:
port: ${METRICS_PORT:9843}
jet-connector:
use: ${JET_CONNECTOR_USE:true}
3. Параметры конфигурации
Настройка конфигурации СМЭВ4-адаптера - Модуль MPPW осуществляется путем редактирования параметров настроек в
файле application.yml, где настраиваются секции:
tp- указываются настройки топиков для табличных параметров;upload- настройки загрузки через DATA-Uploader;http-server- указывается порт для подключения;environment- указывается название окружения (test,prodи т.д.);executor- настраивается размер пула для запросов;scheduler- настройки планировщика отложенных заданий;replication- блок настроек хранения чанков данных репликации;send- настраиваются ограничения на размер загружаемого файла;zookeeper- указываются настройки подключения к Zookeeper;prostore-rest-client- блок параметров конфигурирования взаимодействия с ProStore;prostore- указываются настройки генерации на создание и удаление writable table;kafka- настройки параметров подключения к шине данных Apache Kafka;metrics- настройка получения метрик;jet-connector- подготовлен для оптимизации задержек записи.
3.1. Секция tp
Секция tp предназначена для настройки префиксов топиков, откуда будут вычитываться и куда будут загружаться данные.
Например:
tp:
data-topic-prefix: ${DATA_TOPIC_PREFIX:tp.data}
upload-topic-prefix: ${UPLOAD_TOPIC_PREFIX:tmp.w}
Параметры конфигурации
data-topic-prefix- префикс топика, откуда будут вычитываться данные, напримерDATA_TOPIC_PREFIX:tp.data;upload-topic-prefix- префикс топика, куда будут загружаться данные для их последующей загрузки в витрину, напримерUPLOAD_TOPIC_PREFIX:tmp.w.
3.2. Секция upload
В секции upload - указываются настройки загрузки через DATA-Uploader.
Например:
upload:
data-topic-prefix: ${AGENT_TOPIC_PREFIX:}
retry:
attempts: 5
delay: 1m
Параметры конфигурации
AGENT_TOPIC_PREFIX:- значение префикса для топиков. Топики взаимодействия с СМЭВ4-адаптером - Модуль исполнения запросов (см. Спецификация Модуля исполнения запросов);retry- повтор попытки загрузки;attempts- количество попыток;delay- период задержки (в минутах).
3.3. Секция http-server
В секции http-server указывается порт веб-сервера.
Например:
http-server:
port: ${HTTP_PORT:8090}
Параметры настроек
port- указывается порт веб-сервера, например:HTTP_PORT:8090.
3.4. Секция environment
В секции environment указывается среда разработки (dev, test, stable, prod).
Например:
environment:
name: ${ENVIRONMENT_NAME:test}
Параметры настроек
name- среда разработки, напримерENVIRONMENT_NAME:test.
3.5. Секция executor
Секция executor предназначена для указания размера пула для чтения Kafka и времени выполнения задач.
Например:
executor:
reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}
max-execute-time: ${EXECUTOR_MAX_EXECUTE_TIME:600}
Параметры настроек
reader-pool-size- размер пула для чтения Kafka, напримерEXECUTOR_READER_POOL_SIZE:20;max-execute-time- максимальное время выполнения задачи (в секундах), напримерEXECUTOR_MAX_EXECUTE_TIME:600.
3.6. Секция scheduler
Секция scheduler предназначена для настроек планировщика отложенных заданий.
Например:
scheduler:
delta-apply-request-timeout: ${DELTA_APPLY_REQUEST_TIMEOUT:60}
Параметры настроек
delta-apply-request-timeout- таймаут применения отложенной дельты, напримерDELTA_APPLY_REQUEST_TIMEOUT:60.
3.7. Секция replication
секции
replicationуказываются настройки хранения чанков данных репликации.
Например:
replication:
should-check-source-topic: false
should-delete-prostore-topic: false
prostore-topic-template: "mppw.{datamart}.{table}"
storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw
type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka
postgres: # параметры подключения к базе. Используется только при type=postgres
connection:
database: ${STORAGE_DATABASE:replication}
schema: ${STORAGE_SCHEMA:public}
host: ${STORAGE_HOST:localhost}
port: ${STORAGE_PORT:5432}
user: ${STORAGE_USER:postgres}
password: ${STORAGE_PASSWORD:postgres}
pool:
max-size: ${STORAGE_POOL_SIZE:30}
Параметры настроек
type- тип, postgres|kafka, например:STORAGE_TYPE:postgres;connection- параметры подключения к базе.
3.8. Секция delta
В секции delta настраивается параметр ожидания перед повторной попыткой открытия дельты при ошибке.
Например:
delta:
# параметр ожидания перед повторной попыткой открытия дельты в случае ошибки
open-delay: ${DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout}}s
# количество попыток открытия дельты в случае ошибки
open-attempts: ${DELTA_OPEN_ATTEMPTS:5}
# период проверки открытых дельт
open-check: ${DELTA_OPEN_CHECK:60}s
# количество попыток фиксации дельты в случае ошибки
commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5}
# период ожидания перед повторной попыткой фиксации дельты
commit-error-delay: ${DELTA_COMMIT_DELAY:1}s
# количество попыток отката дельты в случае ошибки
rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3}
# период ожидания перед повторной попыткой отката дельты
rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s
#enable/disable/period
creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable}
period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s
Параметры настроек
open-delay- параметр ожидания (в секундах) перед повторной попыткой открытия дельты в случае ошибки, напримерDELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout};open-attempts- количество попыток открытия дельты в случае ошибки, напримерDELTA_OPEN_ATTEMPTS:5;open-check- период проверки открытых дельт (в секундах), напримерDELTA_OPEN_CHECK:60;commit-attempts- количество попыток фиксации дельты в случае ошибки, напримерDELTA_COMMIT_ATTEMPTS:5;commit-error-delay- период ожидания перед повторной попыткой фиксации дельты (в секундах), напримерDELTA_COMMIT_DELAY:1;rollback-attempts- количество попыток отката дельты в случае ошибки, напримерDELTA_COMMIT_ATTEMPTS:3;rollback-error-delay- период ожидания перед повторной попыткой отката дельты (в секундах), напримерDELTA_COMMIT_DELAY:5;creating-delta-on-upload-request- создание дельты при запросе загрузки, напримерDELTA_CREATING_MODE:enable;period- период создания дельты при запросе загрузки (в секундах), напримерCREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300, используется только при mode=llw и creating-delta-on-upload-request=period;
Для параметра creating-delta-on-upload-request доступны значения:
enable- формируются дельты для загружаемых порций данных, данное значение используется для работы подписок;disable- формирование дельт отключено;period- исключается формирование дельт в процессе исполнения команд загрузки данных, вместо этого дельты периодически открываются и закрываются с периодом, указанным в данном параметре. При этом приложение запоминает датамарты по которым завершены инсерты за истекший период. Дельты открываются и закрываются только для датамартов, в которые происходила вставка данных.
Внимание
При потере связи коннектора с Prostore отмены операции не происходит автоматически и модуль MPPW получает ошибку.
После восстановления связи коннектора с Prostore и попытке загрузки данных в эту же таблицу возвращается ошибка о блокировке таблицы предыдущей операции вставки.
Для исправления ошибки Администратор должен вручную выполнить команду resume или erase в Prostore.
3.9. Секция send
В секции send настраиваются ограничения на размер загружаемого файла.
Например:
send:
channel-size: ${SEND_CHANNEL_SIZE:10}
timeout: ${SEND_TIMEOUT:60}
Параметры настроек
channel-size- размер канала на отправку сообщения, напримерSEND_CHANNEL_SIZE:10;timeout- таймаут вычитывания данных из топика (сек), напримерSEND_TIMEOUT:60.
3.10. Секция zookeeper
Секция zookeeper предназначена для настройки параметров подключения к серверу Zookeeper.
Например:
zookeeper:
connection-string: ${ZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal}
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}
Параметры конфигурации
connection-string- подключение к Zookeeper DS, напримерZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal;connection-timeout-ms- Zookeeper DS таймаут подключения, напримерZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000;session-timeout-ms- Zookeeper DS таймаут сессии, напримерZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000;chroot- Zookeeper DS chroot path, напримерZOOKEEPER_DS_CHROOT:/adapter.
3.11. Секция prostore-rest-client
В секции prostore-rest-client реализован блок параметров конфигурирования взаимодействия с Prostore.
Например:
prostore-rest-client:
host: ${PS_HOST:localhost}
port: ${PS_PORT:9195}
http:
max-pool-size: ${PS_MAX_POOL_SIZE:8}
Параметры настроек
host- адрес Prostore, напримерPS_HOST:localhost;port- порт Prostore, напримерPS_PORT:9195;max-pool-size- максимальное число подключений к Prostore, напримерPS_MAX_POOL_SIZE:8.
3.12. Секция prostore
Секция prostore
Например:
prostore:
key:
primary: ${PRIMARY_KEY_NAME:tmp_id}
type: ${PRIMARY_KEY_TYPE:BIGINT}
kafka:
message-limit: ${PS_MESSAGE_LIMIT:1000}
zk-url: ${PS_ZK_KAFKA_URL:localhost:2181}
properties:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
Параметры настроек
primary- название первичного ключа, напримерtmp_id;type- тип первичного ключа, напримерBIGINT;message-limit- лимит сообщений, напримерPS_MESSAGE_LIMIT:1000;zk-url- адрес сервера Zookeeper для загрузки данных в Prostore, напримерPS_ZK_KAFKA_URL:localhost:2181.
3.13. Секция kafka
В секции kafka собраны настройки параметров подключения к шине данных Apache Kafka.
Например:
kafka:
agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
internal:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
topic.prefix: ${INTERNAL_TOPIC_PREFIX:${agent.topic.prefix}}
consumer:
tp-request:
topic: ${kafka.internal.topic.prefix}mppw.tp
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-tp
auto.offset.reset: earliest
enable.auto.commit: false
upload-request:
topic: ${kafka.internal.topic.prefix}mppw.upload.rq
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-upload
auto.offset.reset: earliest
enable.auto.commit: false
delta-apply-request:
topic: ${kafka.internal.topic.prefix}mppw.delta.in.rq
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-delta
auto.offset.reset: earliest
enable.auto.commit: false
upload-data:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-data
auto.offset.reset: earliest
enable.auto.commit: false
producer:
tp-result: ${kafka.internal.topic.prefix}mppw.rs
upload-result: ${kafka.internal.topic.prefix}mppw.upload.rs
delta-apply-result: ${kafka.internal.topic.prefix}mppw.delta.in.rs
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
Параметры конфигурации
AGENT_TOPIC_PREFIX- префикс для топиков Агента СМЭВ4, например.
3.14. Секция metrics
Секция metrics предназначена для настройки параметров метрик.
Например:
metrics:
port: ${METRICS_PORT:9843}
Параметры конфигурации
port- порт для метрик, напримерMETRICS_PORT:9843.
3.15. Секция jet-connector
Секция jet-connector предназначена для оптимизации задержек записи данных.
Например:
jet-connector:
use: ${JET_CONNECTOR_USE:false}
Параметры настроек
use- флаг активации jet-connector, например{JET_CONNECTOR_USE:false};
Режим/СУБД |
ADB |
ADP |
ADQM |
ADG |
|---|---|---|---|---|
Чтение |
Нет |
Нет |
Нет |
Нет |
Запись |
В перспективе |
Да |
Нет |
Нет |
Чтобы воспользоваться jet-connector требуется вместо upload external table создавать readable external table, указывающую на топик,
как отражено в документации Prostore
В модуль MPPW не передается информация о том, в какую БД физически будут загружаться данные, синтаксис Prostore един для всех поддерживаемых СУБД. Конкретная СУБД указывается в настройках Prostore.
Даже если загрузка данных выполняется в более чем одну базу, на работе адаптеров это не сказывается.
При формировании запросов в табличными параметрами, в том числе при регистрации РЗ, необходимо явно перечислять поля таблиц, по которым будет выполняться фильтрация записей или объединение таблиц.
Переключение с jet-connector на kafka postgres writer и наоборот допускается при завершенных операциях загрузки данных.
Предупреждение
jet-connector в настоящее время применим для ADP, что делает его применимым, только для иснталляций одной единственной СУБД ADP. Это ограничение остается на уровне документации при использовании jet-connector с другими базами должна появиться ошибка.