2. Конфигурация модуля DATA-Uploader (application.yml)
Файл application.yml – основной конфигурационный файл модуля, в котором задана его логика и порядок работы модуля:
обеспечение обработки очереди файлов, настройка подключения к ядру витрины (секция: prostore), а также другие
настройки необходимые для корректной работы модуля.
2.1. Пример файла application.yml
В конфигурационном файле следует задавать только те настройки, которые необходимы для решения текущих бизнес-задач.
http-server:
port: ${SERVER_PORT:8082}
prostore-rest-client:
host: ${PS_HOST:localhost}
port: ${PS_PORT:9195}
http:
max-pool-size: ${PS_MAX_POOL_SIZE:8}
redis:
type: ${REDIS_TYPE:STANDALONE}
connection-string: ${REDIS_CONNECTION_STRING:redis://localhost:6379}
password: ${REDIS_PASSWORD:eYVX7EwVmmxKPCDmwMtyKVge8oLd2t81}
max-pool-size: ${REDIS_MAX_POOL_SIZE:6}
max-pool-waiting: ${REDIS_MAX_POOL_WAITING:24}
max-waiting-handlers: ${REDIS_MAX_WAITING_HANDLERS:32}
upload:
concurrency: ${UPLOAD_CONCURRENCY:100} # Общее количество параллельных задач загрузки
send-concurrency: ${UPLOAD_SEND_CONCURRENCY:10} # Количество параллельных отправок данных в кафку для каждого файла
poll-interval: ${UPLOAD_POLL_INTERVAL:500ms} # Интервал опроса очереди сообщений на загрузку
mode: ${UPLOAD_MODE:mppw} # mppw|llw
llw-batch: ${UPLOAD_LLW_BATCH:1000} # Число записей в одной команде upsert/delete. Используется только при mode=llw
creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable} # enable|disable|period Используется только при mode=llw
period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s # Используется только при mode=llw и creating-delta-on-upload-request=period
check-uniqueness: true # выполнять ли проверку уникальности первичных ключей
pool:
size: ${UPLOAD_POOL_SIZE:16}
retry:
attempts: 5
delay: 1m
data-storage:
type: ${DATA_STORAGE_TYPE:dir} # redis|dir|s3
# Директория хранения файлов для типа dir
dir: ${DATA_STORAGE_DIR:/tmp}
s3:
endpoint: ${S3_ENDPOINT:http://127.0.0.1:9000/}
bucket: ${S3_BUCKET:data} # Имя бакета
region: ${S3_REGION:}
access-key: ${S3_USER:minioadmin} # Пользователь, под которым происходит взаимодействие с s3
secret-key: ${S3_PASSWORD:minioadmin} # Пароль пользователя для взаимодействия с s3
environment:
name: ${ENVIRONMENT_NAME:test}
zookeeper:
connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:8640000}
chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}
csv-parser:
separator: ${CSV_PARSER_SEPARATOR:;}
quote-char: ${CSV_PARSER_QUOTE_CHAR:"}
escape-char: ${CSV_PARSER_ESCAPE_CHAR:'}
# Настройка интерпретации значений как null. Допустимые значения:
# - EMPTY_SEPARATORS - пустое значение между двумя разделителями, например ;;
# - EMPTY_QUOTES - пустые кавычки, например ;"";
# - BOTH - оба варианта
# - NEITHER - никогда. Пустая строка всегда определяется как пустая строка
field-as-null: ${CSV_PARSER_FIELD_AS_NULL:EMPTY_SEPARATORS}
# опция позволяет парсить мультистроковые значения
multiline-limit: 0
active:
timeout: ${ACTIVE_TIMEOUT:60}
time-to-live: ${ACTIVE_TTL:180}
delta:
timeout: ${DELTA_TIMEOUT:300}
row-max-count: ${DELTA_MAX_ROWS:500000}
chunk-row-max-count: ${DELTA_CHUNK_ROWS:10000}
chunk-data-max-size: ${DELTA_CHUNK_DATA_SIZE:1048576}
# параметр ожидания перед повторной попыткой открытия дельты в случае ошибки
open-delay: ${DELTA_OPEN_DELAY:60}s
# количество попыток открытия дельты в случае ошибки
open-attempts: ${DELTA_OPEN_ATTEMPTS:5}
# период проверки открытых дельт
open-check: ${DELTA_OPEN_CHECK:60}s
# количество попыток фиксации дельты в случае ошибки
commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5}
# период ожидания перед повторной попыткой фиксации дельты
commit-error-delay: ${DELTA_COMMIT_DELAY:1}s
# количество попыток отката дельты в случае ошибки
rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3}
# период ожидания перед повторной попыткой отката дельты
rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s
response:
time-to-live: ${RESPONSE_TTL:36000}
kafka:
agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
internal:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
mppw-consumer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: data.uploader.group
auto.offset.reset: earliest
enable.auto.commit: true
mppw-producer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
data-producer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
data-topic-prefix: ${kafka.internal.topic.prefix}mppw.upload.data
mppw-upload-rq-topic: ${kafka.internal.topic.prefix}mppw.upload.rq
mppw-upload-rs-topic: ${kafka.internal.topic.prefix}mppw.upload.rs
metrics:
port: ${METRICS_PORT:9837}
3. Параметры конфигурации
Настройка конфигурации DATA-Uploader осуществляется путем редактирования параметров настроек в файле application.yml,
где настраиваются секции:
http-server- указывается порт сервера;prostore-rest-client- блок параметров конфигурирования взаимодействия с ProStore.redis- настройка подключения к Redis;upload- указываются настройки параллельной загрузки данных;data-storage- директория хранения файлов для типаdir;environment- определяет значение среды разработки;zookeeper- определяет настройки подключения к Zookeeper;csv-parser- настройка парсера CSV-файлов;active- настройки интервалов между попытками перехода в активное состояние и времени жизни ключа флага активности;delta- настройка обработок загрузок и отправок заданий на загрузку дельт;response- настройка времени хранения ответов по заданию в Redis;kafka- настройка распределений отправки топиков;metrics- настройка получения метрик.
3.1. Секция http-server
В секции http-server указывается порт веб-сервера.
Например:
server:
port: ${SERVER_PORT:8081}
Параметры настроек
port- порт веб-сервера, например:SERVER_PORT:8081.
3.2. Секция prostore-rest-client
В секции prostore-rest-client реализован блок параметров конфигурирования взаимодействия с Prostore.
Например:
prostore-rest-client:
host: ${PS_HOST:t5-prostore-01.ru-central1.internal}
port: ${PS_PORT:9195}
http:
max-pool-size: ${PS_MAX_POOL_SIZE:8}
Параметры настроек
host- адрес Prostore, напримерPS_HOST:t5-prostore-01.ru-central1.internal;port- порт Prostore, напримерPS_PORT:9195;max-pool-size- максимальное число подключений к Prostore, напримерPS_MAX_POOL_SIZE:8.
3.3. Секция redis
Секция redis определяет настройки подключения к Redis.
Например:
redis:
type: ${REDIS_TYPE:STANDALONE}
connection-string: ${REDIS_CONNECTION_STRING:redis://localhost:6379}
password: ${REDIS_PASSWORD:eYVX7EwVmmxKPCDmwMtyKVge8oLd2t81}
max-pool-size: ${REDIS_MAX_POOL_SIZE:6}
max-pool-waiting: ${REDIS_MAX_POOL_WAITING:24}
max-waiting-handlers: ${REDIS_MAX_WAITING_HANDLERS:32}
Параметры настроек
type- тип подключения к Redis (STANDALONE/CLUSTER), напримерREDIS_TYPE:STANDALONE;connection-string- указывается список серверов для подключения (перечисление через запятую), напримерREDIS_CONNECTION_STRING:redis://localhost:6379;password- пароль для подключения, напримерREDIS_PASSWORD:eYVX7EwVmmxKPCDmwMtyKVge8oLd2t81;max-pool-size- устанавливается максимальный размер пула, напримерREDIS_MAX_POOL_SIZE:6;max-pool-waiting- устанавливается максимальный размер пула ожидающих команд, напримерREDIS_MAX_POOL_WAITING:24;max-waiting-handlers- устанавливается максимальный размер ожидающих обработчиков, напримерREDIS_MAX_WAITING_HANDLERS:32;
3.4. Секция upload
Секция upload определяет настройки параллельной загрузки данных.
Например:
upload:
concurrency: ${UPLOAD_CONCURRENCY:100} # Общее количество параллельных задач загрузки
send-concurrency: ${UPLOAD_SEND_CONCURRENCY:10} # Количество параллельных отправок данных в кафку для каждого файла
poll-interval: ${UPLOAD_POLL_INTERVAL:500ms} # Интервал опроса очереди сообщений на загрузку
mode: ${UPLOAD_MODE:mppw} # mppw|llw
llw-batch: ${UPLOAD_LLW_BATCH:1000} # Число записей в одной команде upsert/delete. Используется только при mode=llw
creating-delta-on-upload-request: ${DELTA_CREATING_MODE:enable} # enable|disable|period Используется только при mode=llw
period: ${CREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300}s # Используется только при mode=llw и creating-delta-on-upload-request=period
check-uniqueness: true # выполнять ли проверку уникальности первичных ключей
pool:
size: ${UPLOAD_POOL_SIZE:16}
retry:
attempts: 5
delay: 1m
Параметры настроек
concurrency- общее количество параллельных задач загрузки, например100;send-concurrency- количество параллельных отправок данных в кафку для каждого файла, например10;poll-interval- интервал опроса очереди сообщений на загрузку, например500ms;mode- режим загрузки, напримерUPLOAD_MODE:mppw;возможные значения:
mppw/llw;llw-batch- число записей в одной команде upsert/delete, напримерUPLOAD_LLW_BATCH:1000, используется только при mode=llw;creating-delta-on-upload-request- создание дельты при запросе загрузки, напримерDELTA_CREATING_MODE:enable, используется только при mode=llw;возможные значения:
enable/disable/period;period- период создания дельты при запросе загрузки (в секундах), напримерCREATING_DELTA_ON_UPLOAD_REQUEST_PERIOD:300, используется только при mode=llw и creating-delta-on-upload-request=period;check-uniqueness- флаг выполнения проверки уникальности первичных ключей;pool/size- размер пула загрузки, напримерUPLOAD_POOL_SIZE:16;retry- повтор попытки загрузки;attempts- количество попыток;delay- период задержки (в минутах).
3.5. Секция data-storage
В секции data-storage указывается директория хранения файлов для типа dir.
Например:
data-storage:
type: ${DATA_STORAGE_TYPE:dir} # redis|dir|s3
# Директория хранения файлов для типа dir
dir: ${DATA_STORAGE_DIR:/tmp}
s3:
endpoint: ${S3_ENDPOINT:http://127.0.0.1:9000/}
bucket: ${S3_BUCKET:data} # Имя бакета
region: ${S3_REGION:}
access-key: ${S3_USER:minioadmin} # Пользователь, под которым происходит взаимодействие с s3
secret-key: ${S3_PASSWORD:minioadmin} # Пароль пользователя для взаимодействия с s3
Параметры настроек
type- тип файлов, напримерDATA_STORAGE_TYPE:dir;возможные значения: redis / dir / s3;
dir- директория хранения файлов для типа dir, напримерDATA_STORAGE_DIR:/tmp;s3- настройки облачного хранилища S3;endpoint- адрес конечной точки, напримерS3_ENDPOINT:http://127.0.0.1:9000/;bucket- имя бакета, напримерS3_BUCKET:dataregion- регион хранилища;access-key- пользователь, под которым происходит взаимодействие с s3, напримерS3_USER:minioadmin;secret-key- пароль пользователя для взаимодействия с s3, напримерS3_PASSWORD:minioadmin.
3.6. Секция environment
В секции environment выбирается среда разработки (например, значение test, prod и т.д).
Например:
environment:
name: ${ENVIRONMENT_NAME:test}
Параметры конфигурации
name- название окружения, напримерENVIRONMENT_NAME:test.
3.7. Секция zookeeper
В секции zookeeper настраиваются параметры подключения к Zookeeper.
Например:
zookeeper:
connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:8640000}
chroot: ${ZOOKEEPER_DS_CHROOT:/adapter}
Параметры настроек
connection-string- Подключение к Zookeeper DS, напримерZZOOKEEPER_DS_ADDRESS:localhost;connection-timeout-ms- Zookeeper DS таймаут подключения, напримерZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000;session-timeout-ms- Zookeeper DS таймаут сессии, напримерZOOKEEPER_DS_SESSION_TIMEOUT_MS:40000;chroot- Zookeeper DS chroot path, напримерZOOKEEPER_DS_CHROOT:/adapter.
3.8. Секция csv-parser
Внимание
При загрузке файлов с форматно-логическим контролем, важно, чтобы настройки секции csv-parser были одинаковы в модулях CSV-Uploader(если используется его UI),REST-Uploader и DATA-Uploader.
Секция csv-parser - настройка парсинга CSV.
Например:
csv-parser:
separator: ${CSV_PARSER_SEPARATOR:;}
quote-char: ${CSV_PARSER_QUOTE_CHAR:"}
escape-char: ${CSV_PARSER_ESCAPE_CHAR:'}
field-as-null: ${CSV_PARSER_FIELD_AS_NULL:EMPTY_SEPARATORS}
multiline-limit: 0
Параметры конфигурации
separator- Символ разделителя значений, напримерCSV_PARSER_SEPARATOR:;;quote-char- символ кавычки, напримерCSV_PARSER_QUOTE_CHAR:";escape-char- Символ экранирования значений, напримерCSV_PARSER_ESCAPE_CHAR:';Настройка интерпретации значений как null. Допустимые значения:
EMPTY_SEPARATORS - пустое значение между двумя разделителями, например ;;
EMPTY_QUOTES - пустые кавычки, например ;»»;
BOTH - оба варианта
NEITHER - никогда. Пустая строка всегда определяется как пустая строка
field-as-null- способ определения null поля, напримерCSV_PARSER_FIELD_AS_NULL:EMPTY_SEPARATORS.multiline-limit- параметры парсинга мультистроковых значений.
Внимание
Парсер может зависать если в данных встречаются вложенные неэкранированные кавычки..
3.8.1. Дополнительное описание параметров
Параметр
CSV_PARSER_ESCAPE_CHARработает следующим образом: если символ экранирования и символ кавычки равны", то будет использован RFC4180Parser, который считывает все символы между двумя двойными кавычками, при этом двойная кавычка в тексте поля должна быть экранирована двойной кавычкой (Например"поле, ""содержащее двойную кавычку"""будет считано какполе, "содержащее двойную кавычку"). В противном случае будет использован CSVParser, использующий символ экранирования для обозначения «непечатаемых символов».Параметр
CSV_PARSER_FIELD_AS_NULLможет принимать следующие значения:EMPTY_SEPARATORS- два разделителя полей (см. csv-parser/separator) подряд считаются null. Например: строка [aaa,,ccc] содержит значения [«aaa», null, «bbb»], а строка [aaa,»»,ccc] содержит значения [«aaa», «», «bbb»].EMPTY_QUOTES- два «ограничителя строки» (см. csv-parser/escape-char) подряд считаются null. Например: строка [aaa,»»,ccc] содержит значения [«aaa», null, «bbb»], а строка [aaa,,ccc] содержит значения [«aaa», «», «bbb»].BOTH- оба варианта (см.EMPTY_SEPARATORSиEMPTY_QUOTES) считаются null. Например: обе строки [aaa,»»,ccc] и [aaa,,bbb] содержат одинаковое значение [«aaa», null, «bbb»].NEITHER- ни один из вариантов (см.EMPTY_SEPARATORSиEMPTY_QUOTES) не считается null. Например: обе строки [aaa,»»,ccc] и [aaa,,bbb] содержат одинаковое значение [«aaa», «», «bbb»].
3.9. Секция active
В секции active настраиваются интервалы между попытками перехода в активное состояние и времени жизни ключа флага
активности.
Например:
active:
timeout: ${ACTIVE_TIMEOUT:60}
time-to-live: ${ACTIVE_TTL:180}
Параметры настроек
timeout- интервал между попытками перехода в активное состояние;time-to-live- время жизни ключа флага активности.
3.10. Секция delta
Секция delta предназначена для указания настройки обработок загрузок и отправок заданий на загрузку дельт.
Например:
delta:
timeout: ${DELTA_TIMEOUT:300}
row-max-count: ${DELTA_MAX_ROWS:500000}
chunk-row-max-count: ${DELTA_CHUNK_ROWS:10000}
chunk-data-max-size: ${DELTA_CHUNK_DATA_SIZE:1048576}
# параметр ожидания перед повторной попыткой открытия дельты в случае ошибки
open-delay: ${DELTA_OPEN_DELAY:60}s
# количество попыток открытия дельты в случае ошибки
open-attempts: ${DELTA_OPEN_ATTEMPTS:5}
# период проверки открытых дельт
open-check: ${DELTA_OPEN_CHECK:60}s
# количество попыток фиксации дельты в случае ошибки
commit-attempts: ${DELTA_COMMIT_ATTEMPTS:5}
# период ожидания перед повторной попыткой фиксации дельты
commit-error-delay: ${DELTA_COMMIT_DELAY:1}s
# количество попыток отката дельты в случае ошибки
rollback-attempts: ${DELTA_COMMIT_ATTEMPTS:3}
# период ожидания перед повторной попыткой отката дельты
rollback-error-delay: ${DELTA_COMMIT_DELAY:5}s
Параметры настроек
timeout- максимальный интервал времени между первой считанной записью цикла загрузки и отправкой заданий на загрузку дельт, напримерDELTA_TIMEOUT:300;row-max-count- максимальное количество обработанных записей для старта отправки заданий на загрузку дельт, напримерDELTA_CHUNK_ROWS:10000;chunk-row-max-count- количество сообщений в одном чанке, напримерDELTA_CHUNK_ROWS:10000;chunk-data-max-size- максимальный размер чанка, напримерDELTA_CHUNK_DATA_SIZE:1048576;open-delay- параметр ожидания (в секундах) перед повторной попыткой открытия дельты в случае ошибки, напримерDELTA_OPEN_DELAY:60;open-attempts- количество попыток открытия дельты в случае ошибки, напримерDELTA_OPEN_ATTEMPTS:5;open-check- период проверки открытых дельт (в секундах), напримерDELTA_OPEN_CHECK:60;commit-attempts- количество попыток фиксации дельты в случае ошибки, напримерDELTA_COMMIT_ATTEMPTS:5;commit-error-delay- период ожидания перед повторной попыткой фиксации дельты (в секундах), напримерDELTA_COMMIT_DELAY:1;rollback-attempts- количество попыток отката дельты в случае ошибки, напримерDELTA_COMMIT_ATTEMPTS:3;rollback-error-delay- период ожидания перед повторной попыткой отката дельты (в секундах), напримерDELTA_COMMIT_DELAY:5.
3.11. Секция response
Секция response определяет настройка времени хранения ответов по заданию в Redis.
Например:
response:
time-to-live: ${RESPONSE_TTL:36000}
Параметры настроек
time-to-live- Сколько времени Redis будет хранить ответ по заданию (ключ status.<uuid>).
3.12. Секция kafka
Секция kafka позволяет настраивать отправку топиков в заданные модули.
Например:
kafka:
agent.topic.prefix: ${AGENT_TOPIC_PREFIX:}
internal:
bootstrap.servers: ${PS_KAFKA:localhost:9092}
topic.prefix: ${INTERNAL_TOPIC_PREFIX:${kafka.agent.topic.prefix}}
mppw-consumer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
group.id: data.uploader.group
auto.offset.reset: earliest
enable.auto.commit: true
mppw-producer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
data-producer:
property:
bootstrap.servers: ${kafka.internal.bootstrap.servers}
data-topic-prefix: ${kafka.internal.topic.prefix}mppw.upload.data
mppw-upload-rq-topic: ${kafka.internal.topic.prefix}mppw.upload.rq
mppw-upload-rs-topic: ${kafka.internal.topic.prefix}mppw.upload.rs
Параметры настроек
data-topic-prefix- префикс топика, куда будут отправляться данные;mppw-upload-rq-topic- топик для отправки заданий на MPPW модуль;mppw-upload-rq-topic- топик, в который будут приходить результаты исполнения заданий MPPW модуля.
3.13. Секция metrics
Секция metrics предназначена для настройки параметров метрик.
Например:
metrics:
port: ${METRICS_PORT:9837}
Параметры настроек
port- порт для метрик, напримерMETRICS_PORT:9837.