2. Конфигурация модуля СМЭВ4-адаптер - Модуль подписок (application.yml)
Файл application.yml – основной конфигурационный файл модуля, в котором задана его логика и порядок работы модуля:
настройка подключения к Prostore (секция:
prostore);подключение к Брокеру сообщений Kafka, Zookeeper;
порядок обработки запросов между Получателем и Поставщиком данных (секция:
kafka);настройка метрик (секция:
metrics) и другие настройки необходимые для корректной работы адаптера.
2.1. Пример файла application.yml
В конфигурационном файле следует задавать только те настройки, которые необходимы для решения текущих бизнес-задач.
environment:
name: ${ENVIRONMENT_NAME:test}
http-server:
port: ${HTTP_PORT:8085}
executor:
reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}
zookeeper:
connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}
migration:
enabled: ${MIGRATION_ENABLE:false}
old-connection-string: ${OLD_ZOOKEEPER_DS_ADDRESS:localhost}
table-metadata:
cache:
enabled: false
prostore-rest-client:
host: ${PS_HOST:localhost}
port: ${PS_PORT:9195}
http:
max-pool-size: ${PS_MAX_POOL_SIZE:8}
prostore:
status-event-topic:
topic: ${PS_STATUS_EVENT_TOPIC:status.event}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-status-event
auto.offset.reset: earliest
enable.auto.commit: false
subscription:
consumer:
# режим отмены подписки на потребителе. Возможные значения rename, drop, none
cancel-mode: none
# Продолжительность исключения подписки в случае ошибки применения
exclusion-duration: 24h
# Периодичность срабатывания механизма очистки для ошибочных подписок
cleanup-interval: 5s
replication:
max-deltas-in-batch: 10 # максимальное число дельт в выгружаемой пачке
# Массив описания standalone таблиц, участвующих в репликации
#standalone-tables: []
# Пример описания
#standalone-tables:
# - table: "misdm05.readable_book"
# anchor: "update_at"
# soft-delete: "delete_at"
storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw
type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka
postgres: # параметры подключения к базе. Используется только при type=postgres
connection:
database: ${STORAGE_DATABASE:replication}
schema: ${STORAGE_SCHEMA:public}
host: ${STORAGE_HOST:localhost}
port: ${STORAGE_PORT:5432}
user: ${STORAGE_USER:postgres}
password: ${STORAGE_PASSWORD:postgres}
pool:
max-size: ${STORAGE_POOL_SIZE:30}
kafka:
agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
external:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
topic.prefix: ${EXTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
internal:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
consumer:
subscription-request:
topic: ${kafka.external.topic.prefix}replication.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-subscription-request
auto.offset.reset: earliest
enable.auto.commit: false
subscription-cancel-request:
topic: ${kafka.external.topic.prefix}replication.cancel.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-subscription-cancel-request
auto.offset.reset: earliest
enable.auto.commit: false
subscription-consumer-cancel-request:
topic: ${kafka.external.topic.prefix}replication.cancel.in.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}}replicator-subscription-consumer-cancel-request
auto.offset.reset: earliest
enable.auto.commit: false
subscription-storage-request:
topic: ${kafka.external.topic.prefix}replication.in.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-subscription-storage-request
auto.offset.reset: earliest
enable.auto.commit: false
delta-request:
topic: ${kafka.external.topic.prefix}delta.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-delta-request
auto.offset.reset: earliest
enable.auto.commit: false
delta-apply-notification:
topic: ${kafka.internal.topic.prefix}subscription.in
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-notification
auto.offset.reset: earliest
enable.auto.commit: false
mppw-delta-apply-result:
topic: ${kafka.internal.topic.prefix}mppw.delta.in.rs
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-result
auto.offset.reset: earliest
enable.auto.commit: false
producer:
subscription-result: ${kafka.external.topic.prefix}replication.rs
subscription-error: ${kafka.external.topic.prefix}replication.err
subscription-cancel-error: ${kafka.external.topic.prefix}replication.cancel.rs
subscription-cancel-result: ${kafka.external.topic.prefix}replication.cancel.rs
subscription-consumer-cancel-result: ${kafka.external.topic.prefix}replication.cancel.in.rs
subscription-storage-result: ${kafka.external.topic.prefix}replication.in.rs
subscription-storage-error: ${kafka.external.topic.prefix}replication.in.err
delta-error: ${kafka.external.topic.prefix}delta.err
delta-apply-error: ${kafka.external.topic.prefix}delta.in.err
delta-apply-result: ${kafka.external.topic.prefix}delta.in.rs
delta-notification: ${kafka.external.topic.prefix}delta.notification
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
internal:
mppr-delta-request: ${kafka.internal.topic.prefix}mppr.delta.rq
mppw-delta-apply-request: ${kafka.internal.topic.prefix}mppw.delta.in.rq
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
metrics:
port: ${METRICS_PORT:9837}
log:
replRequest: ${REPL_REQUEST_LOG_ENABLED:false}
replResponse: ${REPL_RESPONSE_LOG_ENABLED:false}
backup:
zk-path: ${REPLICATOR_BACKUP_ZK_PATH:/${environment.name}/podd-adapter-replicator}
commandTopic: ${BACKUP_COMMAND_TOPIC:adapter.command}
adapterCommandBroadcast: ${REPLICATOR_COMMAND_BROADCAST_TOPIC:adapter.command.broadcast}
backupTopic: ${BACKUP_TOPIC:adapter.backup}
statusTopic: ${STATUS_TOPIC:adapter.status}
timeout: ${BACKUP_TIMEOUT:PT180s}
idleDelay: ${BACKUP_DELAY:500}
kafka:
consumer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${REPLICATOR_BACKUP_GROUP_ID:podd_adapter_replicator_adapter_command}
auto.offset.reset: latest
producer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
3. Параметры конфигурации
Настройка конфигурации СМЭВ4-адаптера - Модуль подписок осуществляется путем редактирования параметров настроек в
файле application.yml, где настраиваются секции:
environment- указывается название окружения (test,prodи т.д.);http-server- настройки порта подключения;executor- масштабирования нагрузки на модуль;zookeeper– параметры подключения к Zookeeper;migration- настройки миграции;prostore-api-client- блок параметров конфигурирования взаимодействия с Prostore;subscription- настройки подписки;replication- настройки репликации;storage- блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе MPPW.kafka- настройки параметров подключения к шине данных Apache Kafka;log- настройка сохранения лог-файла;metrics- настройка получения метрик;backup- настройки бекапирования.
3.1. Секция environment
В секции environment указывается среда разработки (dev, test, stable, prod).
Например:
environment:
name: ${ENVIRONMENT_NAME:test}
Параметры настроек
name- среда разработки, напримерENVIRONMENT_NAME:test.
3.2. Секция http-server
Секция http-server предназначена для настройки порта и протокола передачи данных (одно из значений http или https).
Например:
http-server:
port: ${HTTP_PORT:8085}
Параметры настроек
port- порт веб-сервера, например:HTTP_PORT:8085.
3.3. Секция executor
Секция executor предназначена для масштабирования нагрузки (увеличения / уменьшения) на модуль.
Например:
executor:
reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}
Параметры настроек
reader-pool-size- размер пула для чтения Kafka, напримерEXECUTOR_READER_POOL_SIZE:20.
3.4. Секция zookeeper
Секция zookeeper предназначена для настройки параметров подключения к Zookeeper.
Например:
zookeeper:
connection-string: ${ZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal}
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}
Параметры настроек
ZOOKEEPER_DS_ADDRESS- адрес сервера Zookeeper DS;ZOOKEEPER_DS_SESSION_TIMEOUT_MS- таймаут подключения к Zookeeper DS,максимальное время ожидания для выявления сбоев потребителей, указывается в миллисекундах (MS.);ZOOKEEPER_DS_SESSION_TIMEOUT_MS- таймаут сессии, максимальное время ожидания подключения к Zookeeper. Если ответ не получен до истечения установленного значения, клиент повторно отправляет запрос при необходимости, указывается в миллисекундах (MS.);ZOOKEEPER_DS_CHROOT- Zookeeper DS chroot path.
3.5. Секция migration
Секция migration реализована настройка миграции Zookeeper для задачи бекапирования.
Например:
migration:
enabled: ${MIGRATION_ENABLE:false}
old-connection-string: ${OLD_ZOOKEEPER_DS_ADDRESS:localhost}
Параметры настроек
enabled- подключение миграции, например{MIGRATION_ENABLE:false};old-connection-string- адрес Zookeeper, например{OLD_ZOOKEEPER_DS_ADDRESS:localhost}.
3.6. Секция prostore-rest-client
В секции prostore-rest-client настраиваются параметры конфигурирования взаимодействия с Prostore.
Например:
prostore-rest-client:
host: ${PS_HOST:t5-prostore-01.ru-central1.internal}
port: ${PS_PORT:9195}
http:
max-pool-size: ${PS_MAX_POOL_SIZE:8}
Параметры настроек
host- адрес Prostore, напримерPS_HOST:t5-prostore-01.ru-central1.internal;port- порт Prostore, напримерPS_PORT:9195;max-pool-size- максимальное число подключений к Prostore, напримерPS_MAX_POOL_SIZE:8.
3.7. Секция subscription
В секции subscription настраиваются подписки на потребителе.
Например
subscription:
consumer:
# режим отмены подписки на потребителе. Возможные значения rename, drop, none
cancel-mode: none
# Продолжительность исключения подписки в случае ошибки применения
exclusion-duration: 24h
# Периодичность срабатывания механизма очистки для ошибочных подписок
cleanup-interval: 5s
Параметры конфигурации
cancel-mode- режим отмены подписки на потребителе. Возможные значения:rename,drop,none;exclusion-duration- продолжительность исключения подписки в случае ошибки применения (в часах), напримерexclusion-duration: 24h;cleanup-interval- периодичность срабатывания механизма очистки для ошибочных подписок (в секундах), напримерcleanup-interval: 5s.
3.8. Секция replication
В секции subscription указываются настройки репликации.
Например
replication:
max-deltas-in-batch: 10
Параметры конфигурации
max-deltas-in-batch- максимальное число дельт в выгружаемой пачке, напримерmax-deltas-in-batch: 10.
3.9. Секция storage
В секции storage указываются настройки хранения чанков данных репликации.
ри изменении параметров необходимо синхронизировать аналогичные параметры в сервисе MPPW.
Например:
storage:
type: ${STORAGE_TYPE:postgres}
postgres:
connection:
database: ${STORAGE_DATABASE:replication}
schema: ${STORAGE_SCHEMA:public}
host: ${STORAGE_HOST:localhost}
port: ${STORAGE_PORT:5432}
user: ${STORAGE_USER:postgres}
password: ${STORAGE_PASSWORD:postgres}
pool:
max-size: ${STORAGE_POOL_SIZE:30}
Параметры настроек
type- тип, postgres|kafka, например:STORAGE_TYPE:postgres;connection- параметры подключения к базе.
3.10. Секция kafka
В секции kafka настраиваются параметры подключения к шине данных Apache Kafka (используется для взаимодействия с СМЭВ4-адаптером)
и настройки взаимодействия через топики модуля СМЭВ4-адаптер - Модуль исполнения запросов.
Модуль взаимодействует через топики:
replication.rq/rs/err- запрос создания подписки (Поставщик данных);replication.cancel.rq/rs/err- запрос отмены подписки (Поставщик данных);delta.rq/mppr.delta.rq- запрос дельты (Поставщик данных);replication.in.rq/rs/err- запрос создания структуры по подписке (Получатель данных);subscription.in/delta.in.rs/delta.in.err- запрос применения дельты (Получатель данных);status.event/delta.notification- статусы с Prostore (Поставщик данных).
Например:
kafka:
agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
external:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
topic.prefix: ${EXTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
internal:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
consumer:
subscription-request:
topic: ${kafka.external.topic.prefix}replication.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-subscription-request
auto.offset.reset: earliest
enable.auto.commit: false
subscription-cancel-request:
topic: ${kafka.external.topic.prefix}replication.cancel.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-subscription-cancel-request
auto.offset.reset: earliest
enable.auto.commit: false
subscription-consumer-cancel-request:
topic: ${kafka.external.topic.prefix}replication.cancel.in.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}}replicator-subscription-consumer-cancel-request
auto.offset.reset: earliest
enable.auto.commit: false
subscription-storage-request:
topic: ${kafka.external.topic.prefix}replication.in.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-subscription-storage-request
auto.offset.reset: earliest
enable.auto.commit: false
delta-request:
topic: ${kafka.external.topic.prefix}delta.rq
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
group.id: ${kafka.external.topic.prefix}replicator-delta-request
auto.offset.reset: earliest
enable.auto.commit: false
delta-apply-notification:
topic: ${kafka.internal.topic.prefix}subscription.in
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-notification
auto.offset.reset: earliest
enable.auto.commit: false
mppw-delta-apply-result:
topic: ${kafka.internal.topic.prefix}mppw.delta.in.rs
max-concurrent-handle: ${kafka.max-concurrent-handle}
commit-interval: ${kafka.commit-interval}
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-result
auto.offset.reset: earliest
enable.auto.commit: false
producer:
subscription-result: ${kafka.external.topic.prefix}replication.rs
subscription-error: ${kafka.external.topic.prefix}replication.err
subscription-cancel-error: ${kafka.external.topic.prefix}replication.cancel.rs
subscription-cancel-result: ${kafka.external.topic.prefix}replication.cancel.rs
subscription-consumer-cancel-result: ${kafka.external.topic.prefix}replication.cancel.in.rs
subscription-storage-result: ${kafka.external.topic.prefix}replication.in.rs
subscription-storage-error: ${kafka.external.topic.prefix}replication.in.err
delta-error: ${kafka.external.topic.prefix}delta.err
delta-apply-error: ${kafka.external.topic.prefix}delta.in.err
delta-apply-result: ${kafka.external.topic.prefix}delta.in.rs
delta-notification: ${kafka.external.topic.prefix}delta.notification
property:
bootstrap.servers: ${kafka.external.bootstrap.servers}
internal:
mppr-delta-request: ${kafka.internal.topic.prefix}mppr.delta.rq
mppw-delta-apply-request: ${kafka.internal.topic.prefix}mppw.delta.in.rq
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
Параметры конфигурации
AGENT_TOPIC_PREFIX- значение префикса для топиков. Топики взаимодействия с СМЭВ4-адаптером - Модуль исполнения запросов
3.11. Секция metrics
В секции metrics настраиваются параметры метрик.
Например:
metrics:
port: ${METRICS_PORT:9837
Параметры конфигурации
port- порт для получения метрик, например9837.
3.12. Секция log
В секции log настраиваются параметры логирования.
Например:
log:
replRequest: ${REPL_REQUEST_LOG_ENABLED:false}
replResponse: ${REPL_RESPONSE_LOG_ENABLED:false}
Параметры конфигурации
repl-request- журналировать запросы к модулю подписок, напримерREPL_REQUEST_LOG_ENABLED:false;repl-response- журналировать ответы модуля подписок, напримерREPL_RESPONSE_LOG_ENABLED:false.
3.13. Секция backup
В секции backup настраиваются параметры бекапирования модуля.
Например:
backup:
zk-path: ${REPLICATOR_BACKUP_ZK_PATH:/${environment.name}/podd-adapter-replicator}
commandTopic: ${BACKUP_COMMAND_TOPIC:adapter.command}
adapterCommandBroadcast: ${REPLICATOR_COMMAND_BROADCAST_TOPIC:adapter.command.broadcast}
backupTopic: ${BACKUP_TOPIC:adapter.backup}
statusTopic: ${STATUS_TOPIC:adapter.status}
timeout: ${BACKUP_TIMEOUT:PT180s}
idleDelay: ${BACKUP_DELAY:500}
kafka:
consumer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: ${REPLICATOR_BACKUP_GROUP_ID:podd_adapter_replicator_adapter_command}
auto.offset.reset: latest
producer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
Параметры настроек
zk-path- путь к корневой ноде Zookeeper для бэкапирования, например{COUNTER_BACKUP_ZK_PATH:/${environment.name}/counter-provider/counters};commandTopic- топик команд бэкапирования, например:{BACKUP_COMMAND_TOPIC:adapter.command};backupTopic- топик для отправки сохраненных данных, например:{BACKUP_TOPIC:adapter.backup};statusTopic- топик для отправки статусов бэкапирования, например:{STATUS_TOPIC:adapter.status}.