.. _podd_adapter_mppw_config: Конфигурация модуля СМЭВ4-адаптер - Модуль MPPW (application.yml) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Файл ``application.yml`` – основной конфигурационный файл модуля, в котором задана его логика и порядок работы модуля: подключение к Агенту СМЭВ4, Брокеру сообщений **Kafka**, **Zookeeper**, а также настройка подключения к **Prostore** (секция: ``prostore``), настройка метрик (секция: ``metrics``) и другие настройки необходимые для корректной работы адаптера. .. _podd_mppw_application_yml: Пример файла application.yml ############################### В конфигурационном файле следует задавать только те настройки, которые необходимы для решения текущих бизнес-задач. .. code-block:: yaml tp: data-topic-prefix: ${DATA_TOPIC_PREFIX:tp.data} upload-topic-prefix: ${UPLOAD_TOPIC_PREFIX:tmp.w} upload: data-topic-prefix: ${AGENT_TOPIC_PREFIX:} retry: attempts: 5 delay: 1m http-server: port: ${HTTP_PORT:8090} environment: name: ${ENVIRONMENT_NAME:test} executor: reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20} max-execute-time: ${EXECUTOR_MAX_EXECUTE_TIME:600} scheduler: delta-apply-request-timeout: ${DELTA_APPLY_REQUEST_TIMEOUT:60} spring: liquibase: enabled: ${LIQUIBASE_ENABLED:true} # Для replication.storage.type=kafka необходимо выключить change-log: classpath:/liquibase-changes/changelog.xml driver-class-name: org.postgresql.Driver url: jdbc:postgresql://${replication.storage.postgres.connection.host}:${replication.storage.postgres.connection.port}/${replication.storage.postgres.connection.database}?currentSchema=${replication.storage.postgres.connection.schema} user: ${replication.storage.postgres.connection.user} password: ${replication.storage.postgres.connection.password} replication: should-check-source-topic: false should-delete-prostore-topic: false prostore-topic-template: "mppw.{datamart}.{table}" storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka postgres: # параметры подключения к базе. Используется только при type=postgres connection: database: ${STORAGE_DATABASE:replication} schema: ${STORAGE_SCHEMA:public} host: ${STORAGE_HOST:localhost} port: ${STORAGE_PORT:5432} user: ${STORAGE_USER:postgres} password: ${STORAGE_PASSWORD:postgres} pool: max-size: ${STORAGE_POOL_SIZE:30} delta: # параметр ожидания перед повторной попыткой открытия дельты в случае ошибки open-delay: ${DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout}}s # количество попыток открытия дельты в случае ошибки open-attempts: ${DELTA_OPEN_ATTEMPTS:5} # период проверки открытых дельт open-check: ${DELTA_OPEN_CHECK:60}s # количество попыток фиксации дельты в случае ошибки commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5} # период ожидания перед повторной попыткой фиксации дельты commit-error-delay: ${DELTA_COMMIT_DELAY:1}s # количество попыток отката дельты в случае ошибки rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3} # период ожидания перед повторной попыткой отката дельты rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s #enable/disable/period creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable} period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s send: channel-size: ${SEND_CHANNEL_SIZE:10} timeout: ${SEND_TIMEOUT:60} zookeeper: connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost} connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000} session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000} chroot: ${ZOOKEEPER_DS_CHROOT:/adapter} prostore-rest-client: host: ${PS_HOST:localhost} port: ${PS_PORT:9195} http: max-pool-size: ${PS_MAX_POOL_SIZE:8} prostore: key: primary: ${PRIMARY_KEY_NAME:tmp_id} type: ${PRIMARY_KEY_TYPE:BIGINT} kafka: message-limit: ${PS_MESSAGE_LIMIT:1000} zk-url: ${PS_ZK_KAFKA_URL:localhost:2181} properties: bootstrap.servers: ${PS_KAFKA:localhost:9092} kafka: agent.topic.prefix: ${AGENT_TOPIC_PREFIX:} max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10} commit-interval: ${KAFKA_COMMIT_INTERVAL:5s} internal: bootstrap.servers: ${PS_KAFKA:localhost:9092} topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}} consumer: tp-request: topic: ${kafka.internal.topic.prefix}mppw.tp max-concurrent-handle: ${kafka.max-concurrent-handle} commit-interval: ${kafka.commit-interval} property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-tp auto.offset.reset: earliest enable.auto.commit: false upload-request: topic: ${kafka.internal.topic.prefix}mppw.upload.rq commit-interval: ${kafka.commit-interval} property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-upload auto.offset.reset: earliest enable.auto.commit: false delta-apply-request: topic: ${kafka.internal.topic.prefix}mppw.delta.in.rq commit-interval: ${kafka.commit-interval} property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-delta auto.offset.reset: earliest enable.auto.commit: false upload-data: property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-data auto.offset.reset: earliest enable.auto.commit: false producer: tp-result: ${kafka.internal.topic.prefix}mppw.rs upload-result: ${kafka.internal.topic.prefix}mppw.upload.rs delta-apply-result: ${kafka.internal.topic.prefix}mppw.delta.in.rs property: bootstrap.servers: ${kafka.internal.bootstrap.servers} metrics: port: ${METRICS_PORT:9843} jet-connector: use: ${JET_CONNECTOR_USE:true} Параметры конфигурации ^^^^^^^^^^^^^^^^^^^^^^ Настройка конфигурации **СМЭВ4-адаптера - Модуль MPPW** осуществляется путем редактирования параметров настроек в файле ``application.yml``, где настраиваются секции: - ``tp`` - указываются настройки топиков для табличных параметров; - ``upload`` - настройки загрузки через DATA-Uploader; - ``http-server`` - указывается порт для подключения; - ``environment`` - указывается название окружения (``test``, ``prod`` и т.д.); - ``executor`` - настраивается размер пула для запросов; - ``scheduler`` - настройки планировщика отложенных заданий; - ``replication`` - блок настроек хранения чанков данных репликации; - ``send`` - настраиваются ограничения на размер загружаемого файла; - ``zookeeper`` - указываются настройки подключения к Zookeeper; - ``prostore-rest-client`` - блок параметров конфигурирования взаимодействия с **ProStore**; - ``prostore`` - указываются настройки генерации на создание и удаление **writable table**; - ``kafka`` - настройки параметров подключения к шине данных Apache Kafka; - ``metrics`` - настройка получения метрик; - ``jet-connector`` - подготовлен для оптимизации задержек записи. Секция tp ######### Секция ``tp`` предназначена для настройки префиксов топиков, откуда будут вычитываться и куда будут загружаться данные. Например: .. code-block:: yaml tp: data-topic-prefix: ${DATA_TOPIC_PREFIX:tp.data} upload-topic-prefix: ${UPLOAD_TOPIC_PREFIX:tmp.w} **Параметры конфигурации** - ``data-topic-prefix`` - префикс топика, откуда будут вычитываться данные, например ``DATA_TOPIC_PREFIX:tp.data``; - ``upload-topic-prefix`` - префикс топика, куда будут загружаться данные для их последующей загрузки в витрину, например ``UPLOAD_TOPIC_PREFIX:tmp.w``. Секция upload ############# В секции ``upload`` - указываются настройки загрузки через **DATA-Uploader**. Например: .. code-block:: yaml upload: data-topic-prefix: ${AGENT_TOPIC_PREFIX:} retry: attempts: 5 delay: 1m **Параметры конфигурации** - ``AGENT_TOPIC_PREFIX:`` - значение префикса для топиков. Топики взаимодействия с **СМЭВ4-адаптером - Модуль исполнения запросов** (см. :ref:`podd-specification`); - ``retry`` - повтор попытки загрузки; - ``attempts`` - количество попыток; - ``delay`` - период задержки (в минутах). Секция http-server ##################### В секции ``http-server`` указывается порт веб-сервера. Например: .. code-block:: yaml http-server: port: ${HTTP_PORT:8090} **Параметры настроек** - ``port`` - указывается порт веб-сервера, например: ``HTTP_PORT:8090``. Секция environment ################## В секции ``environment`` указывается среда разработки (dev, test, stable, prod). Например: .. code-block:: yaml environment: name: ${ENVIRONMENT_NAME:test} **Параметры настроек** - ``name`` - среда разработки, например ``ENVIRONMENT_NAME:test``. Секция executor ############### Секция ``executor`` предназначена для указания размера пула для чтения **Kafka** и времени выполнения задач. Например: .. code-block:: yaml executor: reader-pool-size: ${EXECUTOR_READER_POOL_SIZE:20} max-execute-time: ${EXECUTOR_MAX_EXECUTE_TIME:600} **Параметры настроек** - ``reader-pool-size`` - размер пула для чтения **Kafka**, например ``EXECUTOR_READER_POOL_SIZE:20``; - ``max-execute-time`` - максимальное время выполнения задачи (в секундах), например ``EXECUTOR_MAX_EXECUTE_TIME:600``. Секция scheduler ################ Секция ``scheduler`` предназначена для настроек планировщика отложенных заданий. Например: .. code-block:: yaml scheduler: delta-apply-request-timeout: ${DELTA_APPLY_REQUEST_TIMEOUT:60} **Параметры настроек** - ``delta-apply-request-timeout`` - таймаут применения отложенной дельты, например ``DELTA_APPLY_REQUEST_TIMEOUT:60``. Секция replication #################### секции ``replication`` указываются настройки хранения чанков данных репликации. Например: .. code-block:: yaml replication: should-check-source-topic: false should-delete-prostore-topic: false prostore-topic-template: "mppw.{datamart}.{table}" storage: # блок настроек хранения чанков данных репликации. При изменении параметров необходимо синхронизировать аналогичные параметры в сервисе mppw type: ${STORAGE_TYPE:postgres} # тип, postgres|kafka postgres: # параметры подключения к базе. Используется только при type=postgres connection: database: ${STORAGE_DATABASE:replication} schema: ${STORAGE_SCHEMA:public} host: ${STORAGE_HOST:localhost} port: ${STORAGE_PORT:5432} user: ${STORAGE_USER:postgres} password: ${STORAGE_PASSWORD:postgres} pool: max-size: ${STORAGE_POOL_SIZE:30} **Параметры настроек** - ``type`` - тип, postgres|kafka, например: ``STORAGE_TYPE:postgres``; - ``connection`` - параметры подключения к базе. Секция delta ############# В секции ``delta`` настраивается параметр ожидания перед повторной попыткой открытия дельты при ошибке. Например: .. code-block:: yaml delta: # параметр ожидания перед повторной попыткой открытия дельты в случае ошибки open-delay: ${DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout}}s # количество попыток открытия дельты в случае ошибки open-attempts: ${DELTA_OPEN_ATTEMPTS:5} # период проверки открытых дельт open-check: ${DELTA_OPEN_CHECK:60}s # количество попыток фиксации дельты в случае ошибки commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5} # период ожидания перед повторной попыткой фиксации дельты commit-error-delay: ${DELTA_COMMIT_DELAY:1}s # количество попыток отката дельты в случае ошибки rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3} # период ожидания перед повторной попыткой отката дельты rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s #enable/disable/period creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable} period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s **Параметры настроек** - ``open-delay`` - параметр ожидания (в секундах) перед повторной попыткой открытия дельты в случае ошибки, например ``DELTA_OPEN_DELAY:${scheduler.delta-apply-request-timeout}``; - ``open-attempts`` - количество попыток открытия дельты в случае ошибки, например ``DELTA_OPEN_ATTEMPTS:5``; - ``open-check`` - период проверки открытых дельт (в секундах), например ``DELTA_OPEN_CHECK:60``; - ``commit-attempts`` - количество попыток фиксации дельты в случае ошибки, например ``DELTA_COMMIT_ATTEMPTS:5``; - ``commit-error-delay`` - период ожидания перед повторной попыткой фиксации дельты (в секундах), например ``DELTA_COMMIT_DELAY:1``; - ``rollback-attempts`` - количество попыток отката дельты в случае ошибки, например ``DELTA_COMMIT_ATTEMPTS:3``; - ``rollback-error-delay`` - период ожидания перед повторной попыткой отката дельты (в секундах), например ``DELTA_COMMIT_DELAY:5``; - ``creating-delta-on-upload-request`` - создание дельты при запросе загрузки, например ``DELTA_CREATING_MODE:enable``; - ``period`` - период создания дельты при запросе загрузки (в секундах), например ``CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300``, используется только при mode=llw и creating-delta-on-upload-request=period; Для параметра ``creating-delta-on-upload-request`` доступны значения: - ``enable`` - формируются дельты для загружаемых порций данных, данное значение используется для работы подписок; - ``disable`` - формирование дельт отключено; - ``period`` - исключается формирование дельт в процессе исполнения команд загрузки данных, вместо этого дельты периодически открываются и закрываются с периодом, указанным в данном параметре. При этом приложение запоминает датамарты по которым завершены инсерты за истекший период. Дельты открываются и закрываются только для датамартов, в которые происходила вставка данных. .. attention:: При потере связи коннектора с Prostore отмены операции не происходит автоматически и модуль MPPW получает ошибку. После восстановления связи коннектора с Prostore и попытке загрузки данных в эту же таблицу возвращается ошибка о блокировке таблицы предыдущей операции вставки. Для исправления ошибки Администратор должен вручную выполнить команду ``resume`` или ``erase`` в Prostore. Секция send ########### В секции ``send`` настраиваются ограничения на размер загружаемого файла. Например: .. code-block:: yaml send: channel-size: ${SEND_CHANNEL_SIZE:10} timeout: ${SEND_TIMEOUT:60} **Параметры настроек** - ``channel-size`` - размер канала на отправку сообщения, например ``SEND_CHANNEL_SIZE:10``; - ``timeout`` - таймаут вычитывания данных из топика (сек), например ``SEND_TIMEOUT:60``. Секция zookeeper ################ Секция ``zookeeper`` предназначена для настройки параметров подключения к серверу **Zookeeper**. Например: .. code-block:: yaml zookeeper: connection-string: ${ZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal} connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000} session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000} chroot: ${ZOOKEEPER_DS_CHROOT:/adapter} **Параметры конфигурации** - ``connection-string`` - подключение к Zookeeper DS, например ``ZOOKEEPER_DS_ADDRESS:t5-adsp-01.ru-central1.internal``; - ``connection-timeout-ms`` - Zookeeper DS таймаут подключения, например ``ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000``; - ``session-timeout-ms`` - Zookeeper DS таймаут сессии, например ``ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000``; - ``chroot`` - Zookeeper DS chroot path, например ``ZOOKEEPER_DS_CHROOT:/adapter``. Секция prostore-rest-client ########################### В секции ``prostore-rest-client`` реализован блок параметров конфигурирования взаимодействия с **Prostore**. Например: .. code-block:: yaml prostore-rest-client: host: ${PS_HOST:localhost} port: ${PS_PORT:9195} http: max-pool-size: ${PS_MAX_POOL_SIZE:8} **Параметры настроек** - ``host`` - адрес **Prostore**, например ``PS_HOST:localhost``; - ``port`` - порт **Prostore**, например ``PS_PORT:9195``; - ``max-pool-size`` - максимальное число подключений к **Prostore**, например ``PS_MAX_POOL_SIZE:8``. Секция prostore ################ Секция ``prostore`` Например: .. code-block:: yaml prostore: key: primary: ${PRIMARY_KEY_NAME:tmp_id} type: ${PRIMARY_KEY_TYPE:BIGINT} kafka: message-limit: ${PS_MESSAGE_LIMIT:1000} zk-url: ${PS_ZK_KAFKA_URL:localhost:2181} properties: bootstrap.servers: ${PS_KAFKA:localhost:9092} **Параметры настроек** - ``primary`` - название первичного ключа, например ``tmp_id``; - ``type`` - тип первичного ключа, например ``BIGINT``; - ``message-limit`` - лимит сообщений, например ``PS_MESSAGE_LIMIT:1000``; - ``zk-url`` - адрес сервера **Zookeeper** для загрузки данных в **Prostore**, например ``PS_ZK_KAFKA_URL:localhost:2181``. Секция kafka ############ В секции ``kafka`` собраны настройки параметров подключения к шине данных **Apache Kafka**. Например: .. code-block:: yaml kafka: agent.topic.prefix: ${AGENT_TOPIC_PREFIX:} max-concurrent-handle: ${KAFKA_MAX_CONCURRENT_HANDLE:10} commit-interval: ${KAFKA_COMMIT_INTERVAL:5s} internal: bootstrap.servers: ${PS_KAFKA:localhost:9092} topic.prefix: ${INTERNAL_TOPIC_PREFIX:${agent.topic.prefix}} consumer: tp-request: topic: ${kafka.internal.topic.prefix}mppw.tp max-concurrent-handle: ${kafka.max-concurrent-handle} commit-interval: ${kafka.commit-interval} property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-tp auto.offset.reset: earliest enable.auto.commit: false upload-request: topic: ${kafka.internal.topic.prefix}mppw.upload.rq commit-interval: ${kafka.commit-interval} property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-upload auto.offset.reset: earliest enable.auto.commit: false delta-apply-request: topic: ${kafka.internal.topic.prefix}mppw.delta.in.rq commit-interval: ${kafka.commit-interval} property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-delta auto.offset.reset: earliest enable.auto.commit: false upload-data: property: bootstrap.servers: ${kafka.internal.bootstrap.servers} group.id: ${kafka.internal.topic.prefix}podd-adapter-mppw-data auto.offset.reset: earliest enable.auto.commit: false producer: tp-result: ${kafka.internal.topic.prefix}mppw.rs upload-result: ${kafka.internal.topic.prefix}mppw.upload.rs delta-apply-result: ${kafka.internal.topic.prefix}mppw.delta.in.rs property: bootstrap.servers: ${kafka.internal.bootstrap.servers} **Параметры конфигурации** - ``AGENT_TOPIC_PREFIX`` - префикс для топиков Агента СМЭВ4, например. Секция metrics ############## Секция ``metrics`` предназначена для настройки параметров метрик. Например: .. code-block:: yaml metrics: port: ${METRICS_PORT:9843} **Параметры конфигурации** - ``port`` - порт для метрик, например ``METRICS_PORT:9843``. Секция jet-connector ############################ Секция ``jet-connector`` предназначена для оптимизации задержек записи данных. Например: .. code-block:: yaml jet-connector: use: ${JET_CONNECTOR_USE:false} **Параметры настроек** - ``use`` - флаг активации **jet-connector**, например ``{JET_CONNECTOR_USE:false}``; .. table:: Область применения +------------+---------------+-----------+------------+-----------+ | Режим/СУБД | ADB | ADP | ADQM | ADG | +============+===============+===========+============+===========+ | Чтение | Нет | Нет | Нет | Нет | +------------+---------------+-----------+------------+-----------+ | Запись | В перспективе | Да | Нет | Нет | +------------+---------------+-----------+------------+-----------+ Чтобы воспользоваться **jet-connector** требуется вместо ``upload external table`` создавать ``readable external table``, указывающую на топик, как отражено в `документации Prostore `_ В модуль MPPW не передается информация о том, в какую БД физически будут загружаться данные, синтаксис **Prostore** един для всех поддерживаемых СУБД. Конкретная СУБД указывается в настройках **Prostore**. Даже если загрузка данных выполняется в более чем одну базу, на работе адаптеров это не сказывается. При формировании запросов в табличными параметрами, в том числе при регистрации РЗ, необходимо явно перечислять поля таблиц, по которым будет выполняться фильтрация записей или объединение таблиц. Переключение с **jet-connector** на **kafka postgres writer** и наоборот допускается при завершенных операциях загрузки данных. .. warning:: jet-connector в настоящее время применим для ADP, что делает его применимым, только для иснталляций одной единственной СУБД ADP. Это ограничение остается на уровне документации при использовании jet-connector с другими базами должна появиться ошибка.