5. Загрузка/ удаление данных

Сначала источник данных формирует и передаёт файлы по REST API, которые накапливаются Компонентом ETL. Далее данные загружаются и вставляются в Витрину данных через внешние таблицы, или удаляются. Статусы обработки каждой операции необходимо отслеживать через Endpoint /status.

Информация о каждом шаге процесса содержится в подразделах ниже.

5.1. Начало операции загрузки / удаления согласованных данных (Endpoint – newDelta)

Под Endpoint’ом /newDelta регистрируется новая порция данных. Для того чтобы начать работу с данными, источнику данных необходимо сгенерировать UUID (идентификатор для новой порции данных) и вставить его в запрос c Endpoint’ом /newDelta.

Согласованные данные – это данные для нескольких таблиц, которые должны попасть в Витрину данных за одну операцию вставки (то есть в одной дельте), а значит будут доступны для потребителя одномоментно (такой способ загрузки актуален, когда необходимо обновить данные).

Пример набора данных, который будет загружен или удален в рамках одной дельты представлен ниже:

{
  "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
  "dataSetName": ["product", "stock"]
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • dataSetName — массив имен набора данных (product и stock - имена таблиц).

Запрос с Endpoint’ом /newDelta будет иметь вид:

curl -X POST "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/newDelta" -H "Authorization: Bearer <access_token>" -H 'Content-Type: application/json' -d '{"requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSetName": ["product", "stock"]}'

где:

  • requestId — идентификатор порции изменений (дельты), который был ранее сгенерирован;

  • dataSetName — имя набора данных (имена таблиц).

Примечание

Данные для обновления/вставки и для удаления должны быть отдельно зарегистрированы в newDelta с разными requestId. Если требуется обновить/вставить данные, то сначала нужно зарегистрировать порцию данных через newDelta с requestId, затем прислать данные с зарегистрированным requestId через endpoint /partOfDelta (описан в Загрузка / удаление согласованных данных (Endpoint – partOfDelta)). Допускается передача как в одном файле одним запросом, так и в двух файлах двумя запросами - это не имеет значения. Главное условие - нужно отправлять только данные на обновление и вставку, и они должны быть уникальны по первичному ключу. Также в них не должно быть одинаковых записей с одним первичным ключом. Пример: отправлены два обновления одной записи (с одинаковым первичным ключом). В этом случае в хранилище Prostore попадет только одна запись, и неизвестно какая, поэтому дублей по первичному ключу отправленных с одним requestId быть не должно!

Чтобы проверить статус выполнения запроса, необходимо направить запрос с Endpoint’ом /status (пример запроса описан в Проверка статусной информации по загрузке / удалению данных (Endpoint – status))

Если обработка запроса завершится успешно, то Витрина данных вернёт JSON-ответ, содержащий статус-сообщение об успешной операции:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "dataSets": [
    "product",
        "stock"
    ],
    "inDeltaFlag": true,
    "statusCode": "SUCCESS",
    "statusMessage": "Загрузка порции данных успешно завершена"
    "errors": []
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — код возвращаемого статуса (SUCCESS – запрос выполнен успешно).

Пример JSON-ответа на проверку статуса, завершившегося ошибкой, приведен ниже:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "statusCode": "REQUESTID_ALREADY_EXIST",
    "statusMessage": "Дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA уже существует. Статус загрузки дельты: SUCCESS"
}

где:

  • requestId — идентификатор порции изменений (дельты) который необходимо проверить;

  • statusCode — код возвращаемого статуса (REQUESTID_ALREADY_EXIST – идентификатор уже существует в БД);

  • statusMessage — сообщение с описанием кода ошибки.

Примечание

Для удаления записей необходимо зарегистрировать новую дельту во Endpoint’е /newDelta с новым requestId, и по зарегистрированному requestId должны быть присланы только данные на удаление.

5.2. Загрузка / удаление согласованных данных (Endpoint – partOfDelta)

На данном шаге выполняется накапливание порции данных, создаётся вставка в Витрину данных по Endpoint’у isLastChunk.

Примечание

Если в процессе загрузки вызван метод newDelta, то текущая загрузка будет прервана и порция не попадет в Витрину данных.

Чтобы отправить последнюю порцию данных для таблицы product, необходимо направить запрос с Endpoint’ом /partOfDelta с указанием dataSetName=product и isLastChunk=true (что означает что данная порция данных - последняя). Обработка и загрузка данных не начнётся, пока не будет направлен такой же запрос, но уже по таблице stock: dataSetName=stock, isLastChunk=true.

Пример запроса на загрузку данных под ранее созданный набор данных:

curl -X POST "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/partOfDelta" -H "Authorization: Bearer <access_token>" -F upload=@"./product.avro" -F dataSetName=product -F chunkNumber=0 -F isLastChunk=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d

где:

  • upload — загружаемый avro-файл (пример avro-файла с данными представлен в разделе 2);

  • dataSetName — имя набора данных (имя таблицы);

  • chunkNumber — номер порции dataSet в рамках дельты;

  • isLastChunk — флаг последней порции dataSet;

  • requestId — идентификатор порции изменений (дельты).

В результате успешной загрузки при проверке статуса requestId (пример запроса по Endpoint’у /status представлен в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) на запрос Витрина данных вернёт JSON-ответ, содержащий статус-сообщение об успешной операции.

Пример успешной загрузки:

{
    "requestId": "f3947645-88c8-4044-bd8b-de273f8a8461",
    "statusCode": "SUCCESS",
    "statusMessage": "Порция получена."
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — статус код результата запроса (SUCCESS - запрос выполнен успешно);

  • statusMessage — описание статусного сообщения.

Если загрузка прервалась ошибкой, то при проверке статуса requestId (пример запроса по Endpoint’у /status представлен в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) на запрос Витрина данных вернёт JSON-ответ с описанием ошибки.

Пример загрузки, прерванной ошибкой:

{
    "requestId": "aef2f195-b037-4aaa-b171-f2746511e7e2",
    "dataSets": [
        "stock"
    ],
    "inDeltaFlag": true,
    "statusCode": "ERROR",
    "statusMessage": "Произошла ошибка"
    "errors": [
        {
            "dataSet": "stock",
            "errorType": "INSERT",
            "message": "Ошибка вставки в таблицы: stock"
        }
    ]
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • dataSets — массив имен набора данных (имен таблиц где была допущена ошибка);

  • inDeltaFlag = true — загрузка согласованных данных производилась через endpoint /partOfDetla;

  • status — статус код результата запроса (ERROR – внутренняя ошибка);

  • statusMessage — описание статусного сообщения;

  • errors — массив, ошибки загрузки или парсинга входящих данных;

  • dataSet — название таблицы где допущена ошибка;

  • errorType — тип ошибки;

  • message — описание ошибки.

Примечание

Возможна ситуация, когда после падения ETL приходит запрос с requestId, который был до падения, в данном случае Витрина данных возвращает ошибку со статусом NOT_FOUND. Необходимо снова направить запрос по Endpoint’у /newDelta с новым requestId и начать процесс загрузки заново.

5.3. Загрузка / удаление несогласованных данных (Endpoint – data)

Для загрузки несогласованных данных поддерживается возможность накапливания данных, аналогично загрузке согласованных данных, описанной в Загрузка / удаление согласованных данных (Endpoint – partOfDelta).

Несогласованные данные – могут быть вставлены в разных дельтах и будут доступны потребителю постепенно по мере загрузки. Этот способ подходит для первоначальной загрузки, когда еще нет потребителей.

Вставка в Витрину данных выполнится после накопления порции или по флагу isLast, который используется для последней порции данных. Флаг isLast подаёт сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию.

Пример запроса:

curl -X POST "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/data" -H "Authorization: Bearer <access_token>" -F upload=@"./product.avro" -F dataSetName=product -F isLast=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d

где:

  • upload — загружаемый avro-файл (пример avro-файла с данными представлен в Основные требования к исходным файлам);

  • dataSetName — имя набора данных (имя таблицы);

  • isLast — флаг последней порции данных (сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию.);

  • requestId — идентификатор порции изменений (дельты).

В результате успешной операции при проверке статуса запроса по Endpoint’у /status (пример запроса описан в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) Витрина данных вернёт JSON-ответ, содержащий статус-сообщение:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "statusCode": "SUCCESS",
    "statusMessage": "Порция получена."
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — статус код результата запроса (SUCCESS - запрос выполнен успешно);

  • statusMessage — описание статусного сообщения.

Если загрузка прервалась ошибкой, то при проверке requestId (пример запроса по Endpoint’у /status представлен в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) Витрина данных вернёт JSON-ответ с описанием ошибки.

Пример JSON-ответа:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "statusCode": "NOT_FOUND",
    "statusMessage": "Не найдена дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA"
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — статус код результата запроса (NOT_FOUND - данные по requestId были утеряны в результате остановки сервиса, необходимо зарегистрировать новую дельту и снова загрузить данные);

  • statusMessage — описание статусного сообщения.

5.4. Описание возвращаемых кодов

Таблица 5.4 Описание возвращаемых кодов

Наименование

Код

Описание

EMPTY_ATTACHMENT

400

Нет файла вложения

ERROR

500

Внутренняя ошибка

NOT_FOUND

400

Данные не найдены, либо были утеряны в результате остановки сервиса

PROCESSING

400

Идет обработка данных

REQUESTID_ALREADY_EXIST

400

requestId уже зарегистрирован

SUCCESS

200

Успешное выполнение

UNREGISTERED_DATASETNAME

400

Незарегистрированный набор данных

WRONG_ENDPOINT

400

requestId зарегистрирован для другого Endpoint’а