2. Конфигурация модуля ПОДД-адаптер - Модуль MPPW (application.yml)

Файл application.yml – основной конфигурационный файл модуля, в котором задана его логика и порядок работы модуля: подключение к Агенту ПОДД, Брокеру сообщений Kafka, Zookeeper, а также настройка подключения к Prostore (секция: prostore), настройка метрик (секция: metrics) и другие настройки необходимые для корректной работы адаптера.

2.1. Пример файла application.yml

В конфигурационном файле следует задавать только те настройки, которые необходимы для решения текущих бизнес-задач.

tp:
  data-topic-prefix: ${DATA_TOPIC_PREFIX:tp.data}
  upload-topic-prefix: ${UPLOAD_TOPIC_PREFIX:tmp.w}

upload:
  data-topic-prefix: ${AGENT_TOPIC_PREFIX:}
  retry:
    attempts: 5
    delay: 1m


http-server:
  port: ${HTTP_PORT:8090}

environment:
  name: ${ENVIRONMENT_NAME:test}

executor:
  reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}
  max-execute-time: ${EXECUTOR_MAX_EXECUTE_TIME:600}

scheduler:
  delta-apply-request-timeout: ${DELTA_APPLY_REQUEST_TIMEOUT:60}

spring:
  liquibase:
    enabled: ${LIQUIBASE_ENABLED:true} # Для replication.storage.type=kafka необходимо выключить
    change-log: classpath:/liquibase-changes/changelog.xml
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://${replication.storage.postgres.connection.host}:${replication.storage.postgres.connection.port}/${replication.storage.postgres.connection.database}?currentSchema=${replication.storage.postgres.connection.schema}
    user: ${replication.storage.postgres.connection.user}
    password: ${replication.storage.postgres.connection.password}

replication:
  should-check-source-topic: false
  should-delete-prostore-topic: false
  prostore-topic-template: "mppw.{datamart}.{table}"
  storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw
    type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka
    postgres: # параметры подключения к базе. Используется только при type=postgres
      connection:
        database: ${STORAGE_DATABASE:replication}
        schema: ${STORAGE_SCHEMA:public}
        host: ${STORAGE_HOST:localhost}
        port: ${STORAGE_PORT:5432}
        user: ${STORAGE_USER:postgres}
        password: ${STORAGE_PASSWORD:postgres}
      pool:
        max-size: ${STORAGE_POOL_SIZE:30}

delta:
  # параметр ожидания перед повторной попыткой открытия дельты в случае ошибки
  open-delay: ${DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout}}s
  # количество попыток открытия дельты в случае ошибки
  open-attempts: ${DELTA_OPEN_ATTEMPTS:5}
  # период проверки открытых дельт
  open-check: ${DELTA_OPEN_CHECK:60}s
  # количество попыток фиксации дельты в случае ошибки
  commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5}
  # период ожидания перед повторной попыткой фиксации дельты
  commit-error-delay: ${DELTA_COMMIT_DELAY:1}s
  # количество попыток отката дельты в случае ошибки
  rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3}
  # период ожидания перед повторной попыткой отката дельты
  rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s
  #enable/disable/period
  creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable}
  period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s



send:
  channel-size: ${SEND_CHANNEL_SIZE:10}
  timeout: ${SEND_TIMEOUT:60}

zookeeper:
  connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
  connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
  session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
  chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}

prostore-rest-client:
  host: ${PS_HOST:localhost}
  port: ${PS_PORT:9195}
  http:
    max-pool-size: ${PS_MAX_POOL_SIZE:8}

prostore:
  key:
    primary: ${PRIMARY_KEY_NAME:tmp_id}
    type: ${PRIMARY_KEY_TYPE:BIGINT}
  kafka:
    message-limit: ${PS_MESSAGE_LIMIT:1000}
    zk-url: ${PS_ZK_KAFKA_URL:localhost:2181}
    properties:
      bootstrap.servers: ${PS_KAFKA:localhost:9092}

kafka:
  agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
  max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
  commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
  internal:
    bootstrap.servers: ${PS_KAFKA:localhost:9092}
    topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
  consumer:
    tp-request:
      topic: ${kafka.internal.topic.prefix}mppw.tp
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-tp
        auto.offset.reset: earliest
        enable.auto.commit: false
    upload-request:
      topic: ${kafka.internal.topic.prefix}mppw.upload.rq
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-upload
        auto.offset.reset: earliest
        enable.auto.commit: false
    delta-apply-request:
      topic: ${kafka.internal.topic.prefix}mppw.delta.in.rq
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-delta
        auto.offset.reset: earliest
        enable.auto.commit: false
    upload-data:
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-data
        auto.offset.reset: earliest
        enable.auto.commit: false
  producer:
    tp-result: ${kafka.internal.topic.prefix}mppw.rs
    upload-result: ${kafka.internal.topic.prefix}mppw.upload.rs
    delta-apply-result: ${kafka.internal.topic.prefix}mppw.delta.in.rs
    property:
      bootstrap.servers: ${kafka.internal.bootstrap.servers}

metrics:
  port: ${METRICS_PORT:9843}

jet-connector:
  use: ${JET_CONNECTOR_USE:true}

3. Параметры конфигурации

Настройка конфигурации ПОДД-адаптера - Модуль MPPW осуществляется путем редактирования параметров настроек в файле application.yml, где настраиваются секции:

  • tp - указываются настройки топиков для табличных параметров;

  • upload - настройки загрузки через DATA-Uploader;

  • http-server - указывается порт для подключения;

  • environment - указывается название окружения (test, prod и т.д.);

  • executor - настраивается размер пула для запросов;

  • scheduler - настройки планировщика отложенных заданий;

  • replication - блок настроек хранения чанков данных репликации;

  • send - настраиваются ограничения на размер загружаемого файла;

  • zookeeper - указываются настройки подключения к Zookeeper;

  • prostore-rest-client - блок параметров конфигурирования взаимодействия с ProStore;

  • prostore - указываются настройки генерации на создание и удаление writable table;

  • kafka - настройки параметров подключения к шине данных Apache Kafka;

  • metrics - настройка получения метрик;

  • jet-connector - подготовлен для оптимизации задержек записи.

3.1. Секция tp

Секция tp предназначена для настройки префиксов топиков, откуда будут вычитываться и куда будут загружаться данные.

Например:

tp:
  data-topic-prefix: ${DATA_TOPIC_PREFIX:tp.data}
  upload-topic-prefix: ${UPLOAD_TOPIC_PREFIX:tmp.w}

Параметры конфигурации

  • data-topic-prefix - префикс топика, откуда будут вычитываться данные, например DATA_TOPIC_PREFIX:tp.data;

  • upload-topic-prefix - префикс топика, куда будут загружаться данные для их последующей загрузки в витрину, например UPLOAD_TOPIC_PREFIX:tmp.w.

3.2. Секция upload

В секции upload - указываются настройки загрузки через DATA-Uploader.

Например:

upload:
  data-topic-prefix: ${AGENT_TOPIC_PREFIX:}
  retry:
    attempts: 5
    delay: 1m

Параметры конфигурации

  • AGENT_TOPIC_PREFIX: - значение префикса для топиков. Топики взаимодействия с ПОДД-адаптером - Модуль исполнения запросов (см. Спецификация Модуля исполнения запросов);

  • retry - повтор попытки загрузки;

  • attempts - количество попыток;

  • delay - период задержки (в минутах).

3.3. Секция http-server

В секции http-server указывается порт веб-сервера.

Например:

http-server:
  port: ${HTTP_PORT:8090}

Параметры настроек

  • port - указывается порт веб-сервера, например: HTTP_PORT:8090.

3.4. Секция environment

В секции environment указывается среда разработки (dev, test, stable, prod).

Например:

environment:
  name: ${ENVIRONMENT_NAME:test}

Параметры настроек

  • name - среда разработки, например ENVIRONMENT_NAME:test.

3.5. Секция executor

Секция executor предназначена для указания размера пула для чтения Kafka и времени выполнения задач.

Например:

executor:
  reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20}
  max-execute-time: ${EXECUTOR_MAX_EXECUTE_TIME:600}

Параметры настроек

  • reader-pool-size - размер пула для чтения Kafka, например EXECUTOR_READER_POOL_SIZE:20;

  • max-execute-time - максимальное время выполнения задачи (в секундах), например EXECUTOR_MAX_EXECUTE_TIME:600.

3.6. Секция scheduler

Секция scheduler предназначена для настроек планировщика отложенных заданий.

Например:

scheduler:
  delta-apply-request-timeout: ${DELTA_APPLY_REQUEST_TIMEOUT:60}

Параметры настроек

  • delta-apply-request-timeout - таймаут применения отложенной дельты, например DELTA_APPLY_REQUEST_TIMEOUT:60.

3.7. Секция replication

секции replication указываются настройки хранения чанков данных репликации.

Например:

replication:
  should-check-source-topic: false
  should-delete-prostore-topic: false
  prostore-topic-template: "mppw.{datamart}.{table}"
  storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw
    type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka
    postgres: # параметры подключения к базе. Используется только при type=postgres
      connection:
        database: ${STORAGE_DATABASE:replication}
        schema: ${STORAGE_SCHEMA:public}
        host: ${STORAGE_HOST:localhost}
        port: ${STORAGE_PORT:5432}
        user: ${STORAGE_USER:postgres}
        password: ${STORAGE_PASSWORD:postgres}
      pool:
        max-size: ${STORAGE_POOL_SIZE:30}

Параметры настроек

  • type - тип, postgres|kafka, например: STORAGE_TYPE:postgres;

  • connection - параметры подключения к базе.

3.8. Секция delta

В секции delta настраивается параметр ожидания перед повторной попыткой открытия дельты при ошибке.

Например:

delta:
  # параметр ожидания перед повторной попыткой открытия дельты в случае ошибки
  open-delay: ${DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout}}s
  # количество попыток открытия дельты в случае ошибки
  open-attempts: ${DELTA_OPEN_ATTEMPTS:5}
  # период проверки открытых дельт
  open-check: ${DELTA_OPEN_CHECK:60}s
  # количество попыток фиксации дельты в случае ошибки
  commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5}
  # период ожидания перед повторной попыткой фиксации дельты
  commit-error-delay: ${DELTA_COMMIT_DELAY:1}s
  # количество попыток отката дельты в случае ошибки
  rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3}
  # период ожидания перед повторной попыткой отката дельты
  rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s
  #enable/disable/period
  creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable}
  period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s

Параметры настроек

  • open-delay - параметр ожидания (в секундах) перед повторной попыткой открытия дельты в случае ошибки, например DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout};

  • open-attempts - количество попыток открытия дельты в случае ошибки, например DELTA_OPEN_ATTEMPTS:5;

  • open-check - период проверки открытых дельт (в секундах), например DELTA_OPEN_CHECK:60;

  • commit-attempts - количество попыток фиксации дельты в случае ошибки, например DELTA_COMMIT_ATTEMPTS:5;

  • commit-error-delay - период ожидания перед повторной попыткой фиксации дельты (в секундах), например DELTA_COMMIT_DELAY:1;

  • rollback-attempts - количество попыток отката дельты в случае ошибки, например DELTA_COMMIT_ATTEMPTS:3;

  • rollback-error-delay - период ожидания перед повторной попыткой отката дельты (в секундах), например DELTA_COMMIT_DELAY:5;

  • creating-delta-on-upload-request - создание дельты при запросе загрузки, например DELTA_CREATING_MODE:enable;

  • period - период создания дельты при запросе загрузки (в секундах), например CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300, используется только при mode=llw и creating-delta-on-upload-request=period;

Для параметра creating-delta-on-upload-request доступны значения:

  • enable - формируются дельты для загружаемых порций данных, данное значение используется для работы подписок;

  • disable - формирование дельт отключено;

  • period - исключается формирование дельт в процессе исполнения команд загрузки данных, вместо этого дельты периодически открываются и закрываются с периодом, указанным в данном параметре. При этом приложение запоминает датамарты по которым завершены инсерты за истекший период. Дельты открываются и закрываются только для датамартов, в которые происходила вставка данных.

Внимание

При потере связи коннектора с Prostore отмены операции не происходит автоматически и модуль MPPW получает ошибку. После восстановления связи коннектора с Prostore и попытке загрузки данных в эту же таблицу возвращается ошибка о блокировке таблицы предыдущей операции вставки. Для исправления ошибки Администратор должен вручную выполнить команду resume или erase в Prostore.

3.9. Секция send

В секции send настраиваются ограничения на размер загружаемого файла.

Например:

send:
  channel-size: ${SEND_CHANNEL_SIZE:10}
  timeout: ${SEND_TIMEOUT:60}

Параметры настроек

  • channel-size - размер канала на отправку сообщения, например SEND_CHANNEL_SIZE:10;

  • timeout - таймаут вычитывания данных из топика (сек), например SEND_TIMEOUT:60.

3.10. Секция zookeeper

Секция zookeeper предназначена для настройки параметров подключения к серверу Zookeeper.

Например:

zookeeper:
  connection-string: ${ZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal}
  connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
  session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
  chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}

Параметры конфигурации

  • connection-string - подключение к Zookeeper DS, например ZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal;

  • connection-timeout-ms - Zookeeper DS таймаут подключения, например ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000;

  • session-timeout-ms - Zookeeper DS таймаут сессии, например ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000;

  • chroot - Zookeeper DS chroot path, например ZOOKEEPER_DS_CHROOT:/adapter.

3.11. Секция prostore-rest-client

В секции prostore-rest-client реализован блок параметров конфигурирования взаимодействия с Prostore.

Например:

prostore-rest-client:
  host: ${PS_HOST:localhost}
  port: ${PS_PORT:9195}
  http:
    max-pool-size: ${PS_MAX_POOL_SIZE:8}

Параметры настроек

  • host - адрес Prostore, например PS_HOST:localhost;

  • port - порт Prostore, например PS_PORT:9195;

  • max-pool-size - максимальное число подключений к Prostore, например PS_MAX_POOL_SIZE:8.

3.12. Секция prostore

Секция prostore

Например:

prostore:
  key:
    primary: ${PRIMARY_KEY_NAME:tmp_id}
    type: ${PRIMARY_KEY_TYPE:BIGINT}
  kafka:
    message-limit: ${PS_MESSAGE_LIMIT:1000}
    zk-url: ${PS_ZK_KAFKA_URL:localhost:2181}
    properties:
      bootstrap.servers: ${PS_KAFKA:localhost:9092}

Параметры настроек

  • primary - название первичного ключа, например tmp_id;

  • type - тип первичного ключа, например BIGINT;

  • message-limit - лимит сообщений, например PS_MESSAGE_LIMIT:1000;

  • zk-url - адрес сервера Zookeeper для загрузки данных в Prostore, например PS_ZK_KAFKA_URL:localhost:2181.

3.13. Секция kafka

В секции kafka собраны настройки параметров подключения к шине данных Apache Kafka.

Например:

kafka:
  agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
  max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10}
  commit-interval: ${KAFKA_COMMIT_INTERVAL:5s}
  internal:
    bootstrap.servers: ${PS_KAFKA:localhost:9092}
    topic.prefix: ${INTERNAL_TOPIC_PREFIX:${agent.topic.prefix}}
  consumer:
    tp-request:
      topic: ${kafka.internal.topic.prefix}mppw.tp
      max-concurrent-handle: ${kafka.max-concurrent-handle}
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-tp
        auto.offset.reset: earliest
        enable.auto.commit: false
    upload-request:
      topic: ${kafka.internal.topic.prefix}mppw.upload.rq
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-upload
        auto.offset.reset: earliest
        enable.auto.commit: false
    delta-apply-request:
      topic: ${kafka.internal.topic.prefix}mppw.delta.in.rq
      commit-interval: ${kafka.commit-interval}
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-delta
        auto.offset.reset: earliest
        enable.auto.commit: false
    upload-data:
      property:
        bootstrap.servers: ${kafka.internal.bootstrap.servers}
        group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-data
        auto.offset.reset: earliest
        enable.auto.commit: false
  producer:
    tp-result: ${kafka.internal.topic.prefix}mppw.rs
    upload-result: ${kafka.internal.topic.prefix}mppw.upload.rs
    delta-apply-result: ${kafka.internal.topic.prefix}mppw.delta.in.rs
    property:
      bootstrap.servers: ${kafka.internal.bootstrap.servers}

Параметры конфигурации

  • AGENT_TOPIC_PREFIX - префикс для топиков Агента СМЭВ4, например.

3.14. Секция metrics

Секция metrics предназначена для настройки параметров метрик.

Например:

metrics:
  port: ${METRICS_PORT:9843}

Параметры конфигурации

  • port - порт для метрик, например METRICS_PORT:9843.

3.15. Секция jet-connector

Секция jet-connector предназначена для оптимизации задержек записи данных.

Например:

jet-connector:
use: ${JET_CONNECTOR_USE:false}

Параметры настроек

  • use - флаг активации jet-connector, например {JET_CONNECTOR_USE:false};

Таблица 3.4 Область применения

Режим/СУБД

ADB

ADP

ADQM

ADG

Чтение

Нет

Нет

Нет

Нет

Запись

В перспективе

Да

Нет

Нет

Чтобы воспользоваться jet-connector требуется вместо upload external table создавать readable external table, указывающую на топик, как отражено в документации Prostore

В модуль MPPW не передается информация о том, в какую БД физически будут загружаться данные, синтаксис Prostore един для всех поддерживаемых СУБД. Конкретная СУБД указывается в настройках Prostore.

Даже если загрузка данных выполняется в более чем одну базу, на работе адаптеров это не сказывается.

При формировании запросов в табличными параметрами, в том числе при регистрации РЗ, необходимо явно перечислять поля таблиц, по которым будет выполняться фильтрация записей или объединение таблиц.

Переключение с jet-connector на kafka postgres writer и наоборот допускается при завершенных операциях загрузки данных.

Предупреждение

jet-connector в настоящее время применим для ADP, что делает его применимым, только для иснталляций одной единственной СУБД ADP. Это ограничение остается на уровне документации при использовании jet-connector с другими базами должна появиться ошибка.