.. _etl_load: Загрузка/ удаление данных ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Сначала источник данных формирует и передаёт файлы по REST API, которые накапливаются Компонентом ETL. Далее данные загружаются и вставляются в Витрину данных через внешние таблицы, или удаляются. Статусы обработки каждой операции необходимо отслеживать через Endpoint ``/status``. Информация о каждом шаге процесса содержится в подразделах ниже. .. _start_load_agreed_data: Начало операции загрузки / удаления согласованных данных (Endpoint – newDelta) ################################################################################## Под Endpoint’ом ``/newDelta`` регистрируется новая порция данных. Для того чтобы начать работу с данными, источнику данных необходимо сгенерировать UUID (идентификатор для новой порции данных) и вставить его в запрос c Endpoint’ом ``/newDelta``. **Согласованные данные** – это данные для нескольких таблиц, которые должны попасть в Витрину данных за одну операцию вставки (то есть в одной дельте), а значит будут доступны для потребителя одномоментно (такой способ загрузки актуален, когда необходимо обновить данные). Пример набора данных, который будет загружен или удален в рамках одной дельты представлен ниже: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSetName": ["product", "stock"] } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``dataSetName`` — массив имен набора данных (product и stock - имена таблиц). Запрос с Endpoint’ом ``/newDelta`` будет иметь вид: .. code-block:: curl -X POST "http://:8088/api/v1/secure/////newDelta" -H "Authorization: Bearer " -H 'Content-Type: application/json' -d '{"requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSetName": ["product", "stock"]}' где: - ``requestId`` — идентификатор порции изменений (дельты), который был ранее сгенерирован; - ``dataSetName`` — имя набора данных (имена таблиц). .. note:: Данные для обновления/вставки и для удаления должны быть отдельно зарегистрированы в newDelta с разными requestId. Если требуется обновить/вставить данные, то сначала нужно зарегистрировать порцию данных через newDelta с requestId, затем прислать данные с зарегистрированным requestId через endpoint /partOfDelta (описан в :numref:`load_agreed_data`). Допускается передача как в одном файле одним запросом, так и в двух файлах двумя запросами - это не имеет значения. Главное условие - нужно отправлять только данные на обновление и вставку, и они должны быть уникальны по первичному ключу. Также в них не должно быть одинаковых записей с одним первичным ключом. Пример: отправлены два обновления одной записи (с одинаковым первичным ключом). В этом случае в хранилище Prostore попадет только одна запись, и неизвестно какая, поэтому дублей по первичному ключу отправленных с одним requestId быть не должно! Чтобы проверить статус выполнения запроса, необходимо направить запрос с Endpoint’ом ``/status`` (пример запроса описан в :numref:`etl_check`) Если обработка запроса завершится успешно, то Витрина данных вернёт JSON-ответ, содержащий статус-сообщение об успешной операции: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSets": [ "product", "stock" ], "inDeltaFlag": true, "statusCode": "SUCCESS", "statusMessage": "Загрузка порции данных успешно завершена" "errors": [] } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — код возвращаемого статуса (SUCCESS – запрос выполнен успешно). Пример JSON-ответа на проверку статуса, завершившегося ошибкой, приведен ниже: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "statusCode": "REQUESTID_ALREADY_EXIST", "statusMessage": "Дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA уже существует. Статус загрузки дельты: SUCCESS" } где: - ``requestId`` — идентификатор порции изменений (дельты) который необходимо проверить; - ``statusCode`` — код возвращаемого статуса (REQUESTID_ALREADY_EXIST – идентификатор уже существует в БД); - ``statusMessage`` — сообщение с описанием кода ошибки. .. note:: Для удаления записей необходимо зарегистрировать новую дельту во Endpoint’е ``/newDelta`` с новым ``requestId``, и по зарегистрированному ``requestId`` должны быть присланы только данные на удаление. .. _load_agreed_data: Загрузка / удаление согласованных данных (Endpoint – partOfDelta) ################################################################### На данном шаге выполняется накапливание порции данных, создаётся вставка в Витрину данных по Endpoint’у ``isLastChunk``. .. note:: Если в процессе загрузки вызван метод ``newDelta``, то текущая загрузка будет прервана и порция не попадет в Витрину данных. Чтобы отправить последнюю порцию данных для таблицы product, необходимо направить запрос с Endpoint’ом ``/partOfDelta`` с указанием ``dataSetName=product`` и ``isLastChunk=true`` (что означает что данная порция данных - последняя). Обработка и загрузка данных не начнётся, пока не будет направлен такой же запрос, но уже по таблице ``stock: dataSetName=stock, isLastChunk=true``. Пример запроса на загрузку данных под ранее созданный набор данных: .. code-block:: curl -X POST "http://:8088/api/v1/secure/////partOfDelta" -H "Authorization: Bearer " -F upload=@"./product.avro" -F dataSetName=product -F chunkNumber=0 -F isLastChunk=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d где: - ``upload`` — загружаемый avro-файл (пример avro-файла с данными представлен в разделе 2); - ``dataSetName`` — имя набора данных (имя таблицы); - ``chunkNumber`` — номер порции dataSet в рамках дельты; - ``isLastChunk`` — флаг последней порции dataSet; - ``requestId`` — идентификатор порции изменений (дельты). В результате успешной загрузки при проверке статуса ``requestId`` (пример запроса по Endpoint’у ``/status`` представлен в :numref:`etl_check`) на запрос Витрина данных вернёт JSON-ответ, содержащий статус-сообщение об успешной операции. Пример успешной загрузки: .. code-block:: { "requestId": "f3947645-88c8-4044-bd8b-de273f8a8461", "statusCode": "SUCCESS", "statusMessage": "Порция получена." } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — статус код результата запроса (SUCCESS - запрос выполнен успешно); - ``statusMessage`` — описание статусного сообщения. Если загрузка прервалась ошибкой, то при проверке статуса ``requestId`` (пример запроса по Endpoint’у ``/status`` представлен в :numref:`etl_check`) на запрос Витрина данных вернёт JSON-ответ с описанием ошибки. Пример загрузки, прерванной ошибкой: .. code-block:: json { "requestId": "aef2f195-b037-4aaa-b171-f2746511e7e2", "dataSets": [ "stock" ], "inDeltaFlag": true, "statusCode": "ERROR", "statusMessage": "Произошла ошибка" "errors": [ { "dataSet": "stock", "errorType": "INSERT", "message": "Ошибка вставки в таблицы: stock" } ] } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``dataSets`` — массив имен набора данных (имен таблиц где была допущена ошибка); - ``inDeltaFlag`` = true — загрузка согласованных данных производилась через endpoint /partOfDetla; - ``status`` — статус код результата запроса (ERROR – внутренняя ошибка); - ``statusMessage`` — описание статусного сообщения; - ``errors`` — массив, ошибки загрузки или парсинга входящих данных; - ``dataSet`` — название таблицы где допущена ошибка; - ``errorType`` — тип ошибки; - ``message`` — описание ошибки. .. note:: Возможна ситуация, когда после падения ETL приходит запрос с ``requestId``, который был до падения, в данном случае Витрина данных возвращает ошибку со статусом NOT_FOUND. Необходимо снова направить запрос по Endpoint’у ``/newDelta`` с новым ``requestId`` и начать процесс загрузки заново. .. _load_unagreed_data: Загрузка / удаление несогласованных данных (Endpoint – data) ################################################################### Для загрузки несогласованных данных поддерживается возможность накапливания данных, аналогично загрузке согласованных данных, описанной в :numref:`load_agreed_data`. **Несогласованные данные** – могут быть вставлены в разных дельтах и будут доступны потребителю постепенно по мере загрузки. Этот способ подходит для первоначальной загрузки, когда еще нет потребителей. Вставка в Витрину данных выполнится после накопления порции или по флагу ``isLast``, который используется для последней порции данных. Флаг ``isLast`` подаёт сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию. Пример запроса: .. code-block:: curl -X POST "http://:8088/api/v1/secure/////data" -H "Authorization: Bearer " -F upload=@"./product.avro" -F dataSetName=product -F isLast=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d где: - ``upload`` — загружаемый avro-файл (пример avro-файла с данными представлен в :numref:`etl_config`); - ``dataSetName`` — имя набора данных (имя таблицы); - ``isLast`` — флаг последней порции данных (сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию.); - ``requestId`` — идентификатор порции изменений (дельты). В результате успешной операции при проверке статуса запроса по Endpoint’у ``/status`` (пример запроса описан в :numref:`etl_check`) Витрина данных вернёт JSON-ответ, содержащий статус-сообщение: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "statusCode": "SUCCESS", "statusMessage": "Порция получена." } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — статус код результата запроса (SUCCESS - запрос выполнен успешно); - ``statusMessage`` — описание статусного сообщения. Если загрузка прервалась ошибкой, то при проверке ``requestId`` (пример запроса по Endpoint’у ``/status`` представлен в :numref:`etl_check`) Витрина данных вернёт JSON-ответ с описанием ошибки. Пример JSON-ответа: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "statusCode": "NOT_FOUND", "statusMessage": "Не найдена дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA" } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — статус код результата запроса (NOT_FOUND - данные по requestId были утеряны в результате остановки сервиса, необходимо зарегистрировать новую дельту и снова загрузить данные); - ``statusMessage`` — описание статусного сообщения. .. _responce_code_load: Описание возвращаемых кодов ############################# .. table:: Описание возвращаемых кодов +--------------------------+-----+---------------------------------------------------------------------+ | Наименование | Код | Описание | +==========================+=====+=====================================================================+ | EMPTY_ATTACHMENT | 400 | Нет файла вложения | +--------------------------+-----+---------------------------------------------------------------------+ | ERROR | 500 | Внутренняя ошибка | +--------------------------+-----+---------------------------------------------------------------------+ | NOT_FOUND | 400 | Данные не найдены, либо были утеряны в результате остановки сервиса | +--------------------------+-----+---------------------------------------------------------------------+ | PROCESSING | 400 | Идет обработка данных | +--------------------------+-----+---------------------------------------------------------------------+ | REQUESTID_ALREADY_EXIST | 400 | ``requestId`` уже зарегистрирован | +--------------------------+-----+---------------------------------------------------------------------+ | SUCCESS | 200 | Успешное выполнение | +--------------------------+-----+---------------------------------------------------------------------+ | UNREGISTERED_DATASETNAME | 400 | Незарегистрированный набор данных | +--------------------------+-----+---------------------------------------------------------------------+ | WRONG_ENDPOINT | 400 | ``requestId`` зарегистрирован для другого Endpoint’а | +--------------------------+-----+---------------------------------------------------------------------+