2. Конфигурация модуля СМЭВ4-адаптер - Модуль подписок (application.yml)

Файл application.yml – основной конфигурационный файл модуля, в котором задана его логика и порядок работы модуля:

  • настройка подключения к Prostore (секция: prostore);

  • подключение к Брокеру сообщений Kafka, Zookeeper;

  • порядок обработки запросов между Получателем и Поставщиком данных (секция: kafka);

  • настройка метрик (секция: metrics) и другие настройки необходимые для корректной работы адаптера.

2.1. Пример файла application.yml

В конфигурационном файле следует задавать только те настройки, которые необходимы для решения текущих бизнес-задач.

environment:
  name: ${ENVIRONMENT_NAME:test}

http-server:
  port: ${HTTP_PORT:8085}

executor:
  reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}

zookeeper:
  connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
  connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
  session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
  chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}

migration:
  enabled: ${MIGRATION_ENABLE:false}
  old-connection-string: ${OLD_ZOOKEEPER_DS_ADDRESS:localhost}

table-metadata:
  cache:
    enabled: false

prostore-rest-client:
  host: ${PS_HOST:localhost}
  port: ${PS_PORT:9195}
  http:
    max-pool-size: ${PS_MAX_POOL_SIZE:8}

prostore:
  status-event-topic:
    topic: ${PS_STATUS_EVENT_TOPIC:status.event}
    property:
      bootstrap.servers: ${kafka.internal.bootstrap.servers}
      group.id: ${kafka.external.topic.prefix}replicator-status-event
      auto.offset.reset: earliest
      enable.auto.commit: false

subscription:
  consumer:
    # режим отмены подписки на потребителе. Возможные значения rename, drop, none
    cancel-mode: none
    # Продолжительность исключения подписки в случае ошибки применения
    exclusion-duration: 24h
    # Периодичность срабатывания механизма очистки для ошибочных подписок
    cleanup-interval: 5s

replication:
  max-deltas-in-batch: 10 # максимальное число дельт в выгружаемой пачке

# Массив описания standalone таблиц, участвующих в репликации
#standalone-tables: []
# Пример описания
#standalone-tables:
#  - table: "misdm05.readable_book"
#    anchor: "update_at"
#    soft-delete: "delete_at"

storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw
  type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka
  postgres: # параметры подключения к базе. Используется только при type=postgres
    connection:
      database: ${STORAGE_DATABASE:replication}
      schema: ${STORAGE_SCHEMA:public}
      host: ${STORAGE_HOST:localhost}
      port: ${STORAGE_PORT:5432}
      user: ${STORAGE_USER:postgres}
      password: ${STORAGE_PASSWORD:postgres}
    pool:
      max-size: ${STORAGE_POOL_SIZE:30}


kafka:
  agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
  max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
  commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
  external:
    bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    topic.prefix: ${EXTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
  internal:
    bootstrap.servers: ${PS_KAFKA:localhost:9092}
    topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
  consumer:
    subscription-request:
      topic: ${kafka.external.topic.prefix}replication.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-subscription-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    subscription-cancel-request:
      topic: ${kafka.external.topic.prefix}replication.cancel.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-subscription-cancel-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    subscription-consumer-cancel-request:
      topic: ${kafka.external.topic.prefix}replication.cancel.in.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}}replicator-subscription-consumer-cancel-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    subscription-storage-request:
      topic: ${kafka.external.topic.prefix}replication.in.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-subscription-storage-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    delta-request:
      topic: ${kafka.external.topic.prefix}delta.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-delta-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    delta-apply-notification:
      topic: ${kafka.internal.topic.prefix}subscription.in
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-notification
        auto.offset.reset: earliest
        enable.auto.commit: false
    mppw-delta-apply-result:
      topic: ${kafka.internal.topic.prefix}mppw.delta.in.rs
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-result
        auto.offset.reset: earliest
        enable.auto.commit: false

  producer:
    subscription-result: ${kafka.external.topic.prefix}replication.rs
    subscription-error: ${kafka.external.topic.prefix}replication.err
    subscription-cancel-error: ${kafka.external.topic.prefix}replication.cancel.rs
    subscription-cancel-result: ${kafka.external.topic.prefix}replication.cancel.rs
    subscription-consumer-cancel-result: ${kafka.external.topic.prefix}replication.cancel.in.rs
    subscription-storage-result: ${kafka.external.topic.prefix}replication.in.rs
    subscription-storage-error: ${kafka.external.topic.prefix}replication.in.err
    delta-error: ${kafka.external.topic.prefix}delta.err
    delta-apply-error: ${kafka.external.topic.prefix}delta.in.err
    delta-apply-result: ${kafka.external.topic.prefix}delta.in.rs
    delta-notification: ${kafka.external.topic.prefix}delta.notification
    property:
      bootstrap.servers: ${kafka.external.bootstrap.servers}
    internal:
      mppr-delta-request: ${kafka.internal.topic.prefix}mppr.delta.rq
      mppw-delta-apply-request: ${kafka.internal.topic.prefix}mppw.delta.in.rq
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}

metrics:
  port: ${METRICS_PORT:9837}

log:
  replRequest: ${REPL_REQUEST_LOG_ENABLED:false}
  replResponse: ${REPL_RESPONSE_LOG_ENABLED:false}

backup:
  zk-path: ${REPLICATOR_BACKUP_ZK_PATH:/${environment.name}/podd-adapter-replicator}
  commandTopic: ${BACKUP_COMMAND_TOPIC:adapter.command}
  adapterCommandBroadcast: ${REPLICATOR_COMMAND_BROADCAST_TOPIC:adapter.command.broadcast}
  backupTopic: ${BACKUP_TOPIC:adapter.backup}
  statusTopic: ${STATUS_TOPIC:adapter.status}
  timeout: ${BACKUP_TIMEOUT:PT180s}
  idleDelay: ${BACKUP_DELAY:500}
  kafka:
    consumer:
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${REPLICATOR_BACKUP_GROUP_ID:podd_adapter_replicator_adapter_command}
        auto.offset.reset: latest
    producer:
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}

3. Параметры конфигурации

Настройка конфигурации СМЭВ4-адаптера - Модуль подписок осуществляется путем редактирования параметров настроек в файле application.yml, где настраиваются секции:

  • environment - указывается название окружения (test, prod и т.д.);

  • http-server - настройки порта подключения;

  • executor - масштабирования нагрузки на модуль;

  • zookeeper – параметры подключения к Zookeeper;

  • migration - настройки миграции;

  • prostore-api-client - блок параметров конфигурирования взаимодействия с Prostore;

  • subscription - настройки подписки;

  • replication - настройки репликации;

  • storage - блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе MPPW.

  • kafka - настройки параметров подключения к шине данных Apache Kafka;

  • log - настройка сохранения лог-файла;

  • metrics - настройка получения метрик;

  • backup - настройки бекапирования.

3.1. Секция environment

В секции environment указывается среда разработки (dev, test, stable, prod).

Например:

environment:
  name: ${ENVIRONMENT_NAME:test}

Параметры настроек

  • name - среда разработки, например ENVIRONMENT_NAME:test.

3.2. Секция http-server

Секция http-server предназначена для настройки порта и протокола передачи данных (одно из значений http или https).

Например:

http-server:
  port: ${HTTP_PORT:8085}

Параметры настроек

  • port - порт веб-сервера, например: HTTP_PORT:8085.

3.3. Секция executor

Секция executor предназначена для масштабирования нагрузки (увеличения / уменьшения) на модуль.

Например:

executor:
  reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}

Параметры настроек

  • reader-pool-size - размер пула для чтения Kafka, например EXECUTOR_READER_POOL_SIZE:20.

3.4. Секция zookeeper

Секция zookeeper предназначена для настройки параметров подключения к Zookeeper.

Например:

zookeeper:
  connection-string: ${ZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal}
  connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
  session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
  chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}

Параметры настроек

  • ZOOKEEPER_DS_ADDRESS - адрес сервера Zookeeper DS;

  • ZOOKEEPER_DS_SESSION_TIMEOUT_MS - таймаут подключения к Zookeeper DS,максимальное время ожидания для выявления сбоев потребителей, указывается в миллисекундах (MS.);

  • ZOOKEEPER_DS_SESSION_TIMEOUT_MS - таймаут сессии, максимальное время ожидания подключения к Zookeeper. Если ответ не получен до истечения установленного значения, клиент повторно отправляет запрос при необходимости, указывается в миллисекундах (MS.);

  • ZOOKEEPER_DS_CHROOT - Zookeeper DS chroot path.

3.5. Секция migration

Секция migration реализована настройка миграции Zookeeper для задачи бекапирования.

Например:

migration:
  enabled: ${MIGRATION_ENABLE:false}
  old-connection-string: ${OLD_ZOOKEEPER_DS_ADDRESS:localhost}

Параметры настроек

  • enabled - подключение миграции, например {MIGRATION_ENABLE:false};

  • old-connection-string - адрес Zookeeper, например {OLD_ZOOKEEPER_DS_ADDRESS:localhost}.

3.6. Секция prostore-rest-client

В секции prostore-rest-client настраиваются параметры конфигурирования взаимодействия с Prostore.

Например:

prostore-rest-client:
  host: ${PS_HOST:t5-prostore-01.ru-central1.internal}
  port: ${PS_PORT:9195}
  http:
    max-pool-size: ${PS_MAX_POOL_SIZE:8}

Параметры настроек

  • host - адрес Prostore, например PS_HOST:t5-prostore-01.ru-central1.internal;

  • port - порт Prostore, например PS_PORT:9195;

  • max-pool-size - максимальное число подключений к Prostore, например PS_MAX_POOL_SIZE:8.

3.7. Секция subscription

В секции subscription настраиваются подписки на потребителе.

Например

subscription:
  consumer:
    # режим отмены подписки на потребителе. Возможные значения rename, drop, none
    cancel-mode: none
    # Продолжительность исключения подписки в случае ошибки применения
    exclusion-duration: 24h
    # Периодичность срабатывания механизма очистки для ошибочных подписок
    cleanup-interval: 5s

Параметры конфигурации

  • cancel-mode - режим отмены подписки на потребителе. Возможные значения: rename, drop, none;

  • exclusion-duration - продолжительность исключения подписки в случае ошибки применения (в часах), например exclusion-duration: 24h;

  • cleanup-interval - периодичность срабатывания механизма очистки для ошибочных подписок (в секундах), например cleanup-interval: 5s.

3.8. Секция replication

В секции subscription указываются настройки репликации.

Например

replication:
  max-deltas-in-batch: 10

Параметры конфигурации

  • max-deltas-in-batch - максимальное число дельт в выгружаемой пачке, например max-deltas-in-batch: 10.

3.9. Секция storage

В секции storage указываются настройки хранения чанков данных репликации.

ри изменении параметров необходимо синхронизировать аналогичные параметры в сервисе MPPW.

Например:

storage:
  type: ${STORAGE_TYPE:postgres}
  postgres:
    connection:
      database: ${STORAGE_DATABASE:replication}
      schema: ${STORAGE_SCHEMA:public}
      host: ${STORAGE_HOST:localhost}
      port: ${STORAGE_PORT:5432}
      user: ${STORAGE_USER:postgres}
      password: ${STORAGE_PASSWORD:postgres}
    pool:
      max-size: ${STORAGE_POOL_SIZE:30}

Параметры настроек

  • type - тип, postgres|kafka, например: STORAGE_TYPE:postgres;

  • connection - параметры подключения к базе.

3.10. Секция kafka

В секции kafka настраиваются параметры подключения к шине данных Apache Kafka (используется для взаимодействия с СМЭВ4-адаптером) и настройки взаимодействия через топики модуля СМЭВ4-адаптер - Модуль исполнения запросов.

Модуль взаимодействует через топики:

  • replication.rq/rs/err - запрос создания подписки (Поставщик данных);

  • replication.cancel.rq/rs/err - запрос отмены подписки (Поставщик данных);

  • delta.rq/mppr.delta.rq - запрос дельты (Поставщик данных);

  • replication.in.rq/rs/err - запрос создания структуры по подписке (Получатель данных);

  • subscription.in/delta.in.rs/delta.in.err - запрос применения дельты (Получатель данных);

  • status.event/delta.notification - статусы с Prostore (Поставщик данных).

Например:

kafka:
  agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
  max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
  commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
  external:
    bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    topic.prefix: ${EXTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
  internal:
    bootstrap.servers: ${PS_KAFKA:localhost:9092}
    topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
  consumer:
    subscription-request:
      topic: ${kafka.external.topic.prefix}replication.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-subscription-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    subscription-cancel-request:
      topic: ${kafka.external.topic.prefix}replication.cancel.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-subscription-cancel-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    subscription-consumer-cancel-request:
      topic: ${kafka.external.topic.prefix}replication.cancel.in.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}}replicator-subscription-consumer-cancel-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    subscription-storage-request:
      topic: ${kafka.external.topic.prefix}replication.in.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-subscription-storage-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    delta-request:
      topic: ${kafka.external.topic.prefix}delta.rq
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.external.bootstrap.servers}
        group.id: ${kafka.external.topic.prefix}replicator-delta-request
        auto.offset.reset: earliest
        enable.auto.commit: false
    delta-apply-notification:
      topic: ${kafka.internal.topic.prefix}subscription.in
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-notification
        auto.offset.reset: earliest
        enable.auto.commit: false
    mppw-delta-apply-result:
      topic: ${kafka.internal.topic.prefix}mppw.delta.in.rs
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}replicator-delta-apply-result
        auto.offset.reset: earliest
        enable.auto.commit: false

  producer:
    subscription-result: ${kafka.external.topic.prefix}replication.rs
    subscription-error: ${kafka.external.topic.prefix}replication.err
    subscription-cancel-error: ${kafka.external.topic.prefix}replication.cancel.rs
    subscription-cancel-result: ${kafka.external.topic.prefix}replication.cancel.rs
    subscription-consumer-cancel-result: ${kafka.external.topic.prefix}replication.cancel.in.rs
    subscription-storage-result: ${kafka.external.topic.prefix}replication.in.rs
    subscription-storage-error: ${kafka.external.topic.prefix}replication.in.err
    delta-error: ${kafka.external.topic.prefix}delta.err
    delta-apply-error: ${kafka.external.topic.prefix}delta.in.err
    delta-apply-result: ${kafka.external.topic.prefix}delta.in.rs
    delta-notification: ${kafka.external.topic.prefix}delta.notification
    property:
      bootstrap.servers: ${kafka.external.bootstrap.servers}
    internal:
      mppr-delta-request: ${kafka.internal.topic.prefix}mppr.delta.rq
      mppw-delta-apply-request: ${kafka.internal.topic.prefix}mppw.delta.in.rq
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}

Параметры конфигурации

3.11. Секция metrics

В секции metrics настраиваются параметры метрик.

Например:

metrics:
  port: ${METRICS_PORT:9837

Параметры конфигурации

  • port - порт для получения метрик, например 9837.

3.12. Секция log

В секции log настраиваются параметры логирования.

Например:

log:
  replRequest: ${REPL_REQUEST_LOG_ENABLED:false}
  replResponse: ${REPL_RESPONSE_LOG_ENABLED:false}

Параметры конфигурации

  • repl-request - журналировать запросы к модулю подписок, например REPL_REQUEST_LOG_ENABLED:false;

  • repl-response - журналировать ответы модуля подписок, например REPL_RESPONSE_LOG_ENABLED:false.

3.13. Секция backup

В секции backup настраиваются параметры бекапирования модуля.

Например:

backup:
  zk-path: ${REPLICATOR_BACKUP_ZK_PATH:/${environment.name}/podd-adapter-replicator}
  commandTopic: ${BACKUP_COMMAND_TOPIC:adapter.command}
  adapterCommandBroadcast: ${REPLICATOR_COMMAND_BROADCAST_TOPIC:adapter.command.broadcast}
  backupTopic: ${BACKUP_TOPIC:adapter.backup}
  statusTopic: ${STATUS_TOPIC:adapter.status}
  timeout: ${BACKUP_TIMEOUT:PT180s}
  idleDelay: ${BACKUP_DELAY:500}
  kafka:
    consumer:
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${REPLICATOR_BACKUP_GROUP_ID:podd_adapter_replicator_adapter_command}
        auto.offset.reset: latest
    producer:
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}

Параметры настроек

  • zk-path - путь к корневой ноде Zookeeper для бэкапирования, например {COUNTER_BACKUP_ZK_PATH:/${environment.name}/counter-provider/counters};

  • commandTopic - топик команд бэкапирования, например: {BACKUP_COMMAND_TOPIC:adapter.command};

  • backupTopic - топик для отправки сохраненных данных, например: {BACKUP_TOPIC:adapter.backup};

  • statusTopic - топик для отправки статусов бэкапирования, например: {STATUS_TOPIC:adapter.status}.