2. Основные требования к исходным файлам

Загрузка данных в систему производится в виде файлов по HTTP, каждый из которых имеет структуру, представленную на Структура загружаемых сообщений.

Структура загружаемых сообщений

Рисунок - 2.46 Структура загружаемых сообщений

Для успешной загрузки данные должны соответствовать следующим условиям:

  1. Тело сообщения представляет собой файл Avro (Object Container File), который состоит из заголовка и блоков данных.

  2. Заголовок файла содержит схему данных Avro.

  3. Схема данных содержит следующие элементы:

    • имя;

    • тип “record”;

    • перечень полей.

  4. Последним полем схемы должно быть указано служебное поле sys_op с типом данных avro.int.

  5. Каждый блок данных содержит запись, представленную в бинарной кодировке.

  6. Каждая запись содержит перечень полей и их значений.

  7. Состав и порядок полей должны совпадать во всех следующих объектах:

    • в схеме данных заголовка файла Avro;

    • в наборе загружаемых записей;

    • во внешней таблице загрузки (поле sys_op должно отсутствовать);

    • в таблице-приемнике данных (поле sys_op должно отсутствовать).

Более подробно про формат данных Avro описано в источнике: https://avro.apache.org/docs/1.10.2/spec.html#Object+Container+Files

Примечание

В загружаемой схеме данных Avro и записях Avro важны порядок и тип полей. Имена полей не сравниваются с именами полей внешней таблицы и таблицы-приемника.

Пример ниже содержит схему данных Avro, используемую для загрузки данных о сотрудниках в таблицу staff.

Для поля date_of_birth указан логический тип Avro, для поля middle_name — элемент union (поле является не обязательным для заполнения, поэтому маркер null выведен в отдельный параметр).

Пример схемы данных Avro:

{
  "name": "staff",
  "type": "record",
  "fields": [
      {
      "name": "id",
      "type": "long"
      },
      {
      "name": "firstname",
      "type": "string"
      },
      {
      "name": "lastname",
      "type": "string"
      },
      {
      "name": "middle_name",
      "type": [
          "null",
          "string"
      ]
      },
      {
      "name": "date_of_birth",
      "type": {
      "logicalType": "timestamp-micros",
      "type": "long"
      }
      },
      {
      "name": "employee_position",
      "type": "string"
      },
      {
      "name": "department_category",
      "type": "long"
      },
      {
      "name": "sys_op",
      "type": "int"
      }
  ]
}

Пример ниже содержит набор записей о сотрудниках, загружаемых в таблицу staff.

Для наглядности примера бинарные данные представлены в JSON-формате.

[
  {
      "id": 1000111,
      "firstname": "Елена",
      "lastname": "Фролова",
      "middle_name": "Андреевна",
      "date_of_birth": 4641084000000000,
      "employee_position": "Менеджер по подбору персонала",
      "department_category": 1,
      "description": "Сотрудники отдела кадров",
      "sys_op": 0
  },
  {
      "id": 1000005,
      "firstname": "Пётр",
      "lastname": "Платонов",
      "middle_name": "",
      "date_of_birth": 5639904000000000,
      "employee_position": "Руководитель отдела кадров",
      "department_category": 1,
      "description": "Сотрудники отдела кадров",
      "sys_op": 0
  }
]

3. Особенности реализации ETL

Функциональные особенности реализованного ETL включают в себя следующие пункты:

  1. Генерация первичных ключей записей, передаваемых для загрузки, производится на стороне источника.

  2. Каждая Avro-структура должна содержать данные только для одной таблицы Витрины.

  3. В Avro-структурах данных источник заполняет тип операции sys_op:

  4. ETL не выполняет преобразования данных, предназначенных для загрузки на Витрину данных.

  5. При выполнении операций, требующих консистентности данных, в рамках одной дельты могут быть только операции одного типа: либо добавления/обновления, либо удаления. При этом в рамках одной дельты первичные ключи всех записей должны быть уникальны.

  6. Не должно быть двух версий одной записи в рамках одной дельты.

  7. Нельзя менять порядок атрибутов в avro-схеме, поскольку данные при загрузке в БД распределяются в соответствии с тем перечнем, который был указан в avro-схеме.

4. Получение токена Рroxy API

Этапы настройки Proxy API включают в себя следующие шаги:

  1. Для начала необходимо пройти авторизацию в Datamart Studio. Сделать это можно, направив запрос ниже:

curl -X POST \
'http://<ip-studio>:8088/api/v1/auth_system' \
-d "username=<username>" \
-d "password=<password>" \
-d "organization_ogrn=<organization_ogrn>" \
-d "datamart_mnemonic=<datamart_mnemonic>"

где:

  • ip-studio — ip-адрес Datamart Studio;

  • username — имя пользователя IAM;

  • password — пароль пользователя IAM;

  • organization_ogrn — ОГРН организации, в рамках которой развернута Витрина данных;

  • datamart_mnemonic — мнемоника Витрины (пример: eduejd##, где ## – номер региона).

Примечание

Безопасность передачи данных по протоколу HTTP обеспечивается защищенной сетью. Требуется обращать внимание на протокол запроса. Если он будет некорректным (например, HTTPS вместо HTTP), то ответ сервера будет содержать ошибку с кодом 500 - InternalServerError.

Пример успешного ответа Datamart Studio на запрос токена представлен ниже:

{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICI5NGRuNzhOeVF2TjBhRG3NMcUdkc0tTOTNlTWxjNkRwRjZ5V1NXSUo4In0.eyJleHAiOjE2OTMzMTg0NzIsImlhdCI6MTY5MzMxNDg3MiwianRpIjoiYTJkNTQ5NDktZWYwZC00MzA1LWI5OTAtMDIxMTAyZDkzODU2IiwiaXNzIjoiaHR0cHM6Ly9rYy5kYXRhbWFydC5ydS9yZWFsbXMvc3R1ZGlvLWRldiIsInN1YiI6IjE1NjdkYWI3LTc1OTAtNGM0Zi1iNWNhLWYzMmFkOTU1NThjYyIsInR5cCI6IkJlYXJlciIsImF6cCI6InN0dWRpbyIsInNlc3Npb25fc3RhdGUiOiJlMGE0MjJlZC1hNDQxLTQzY2MtYTAxMS01MzNiY2RiNTc5OGQiLCJhY3IiOiIxIiwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbInN1cGVyYWRtaW4iLCJhZG1pbiJdfSwic2NvcGUiOiJvcGVuaWQgZW1haWwiLCJzaWQiOiJlMGE0MjJlZC1hNDQxLTQzY2MtYTAxMS01MzNiY2RiNTc5OGQiLCJ1cG4iOiJhZG1pbiIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJhZGRyZXNzIjp7fSwibmFtZSI6ImFkbWluIGFkbWluIiwiZ3JvdXBzIjpbInN1cGVyYWRtaW4iLCJhZG1pbiJdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJhZG1pbiIsImdpdmVuX25hbWUiOiJhZG1pbiIsIm9yZ2FuaXphdGlvbl9vZ3JuIjpbIjExMTIyMjMzMzQ0NCIsIjExMTEiXSwiZmFtaWx5X25hbWUiOiJhZG1pbiIsImVtYWlsIjoiYWRtaW5AYWRtaW4uYWRtaW4ifQ.BC3sREXC3nf2LNvBX8SiHKouVGqJVfUBokVJa-B-9YW0zLhnNTs7mGZVOnC-kM-5mWE6bz8du0lvxQqiGpi3HRlAv1eedcGMTf_2TmjhohAaz--zSCdLC5NSmI79r54XYTLORiWKXj5T_AY8efFwWnWgUJ1LEkd5BTQyGSTvaoJkMv7xextA_isx_WoReHC5_-3GznNtcf_hOd2J1CfMHUFjhqMRSxMkIQDPHnspgU6WUz9IeVA1VWKh1GcggqYDtrruigQcl4_f7XeJQKJ49NNVdhjHtywUVbTpEDKYh4FsgAbf3vIPYUVwGWFW0Qm7LgUCpB8UvMQfb4UYZMF4UA",
  "expires_in": 3600,
  "refresh_expires_in": 3600,
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmYmMzOWNmNi1kZTRkLTQ2YWEtYTAwZi1iZGU3ZmFkNTJmNTgifQ.eyJleHAiOjE2OTMzMTg0NzIsImlhdCI6MTY5MzMxNDg3MiwianRpIjoiYWIxN2M0MzQtZGI4Yi00N2QwLTkyM2YtMTFiYmM3NzBiNmE4IiwiaXNzIjoiaHR0cHM6Ly9rYy5kYXRhbWFydC5ydS9yZWFsbXMvc3R1ZGlvLWRldiIsImF1ZCI6Imh0dHBzOi8va2MuZGF0YW1hcnQucnUvcmVhbG1zL3N0dWRpby1kZXYiLCJzdWIiOiIxNTY3ZGFiNy03NTkwLTRjNGYtYjVjYS1mMzJhZDk1NTU4Y2MiLCJ0eXAiOiJSZWZyZXNoIiwiYXpwIjoic3R1ZGlvIiwic2Vzc2lvbl9zdGF0ZSI6ImUwYTQyMmVkLWE0NDEtNDNjYy1hMDExLTUzM2JjZGI1Nzk4ZCIsInNjb3BlIjoib3BlbmlkIGVtYWlsIiwic2lkIjoiZTBhNDIyZWQtYTQ0MS00M2NjLWEwMTEtNTMzYmNkYjU3OThkIn0.SI6Tb6CJ5HIHwzETOyJZ-2nTLibGNq7JEQwW07fY-2M",
  "token_type": "Bearer",
  "id_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICI5NGRuNzhOeVF2TjBhRGtvX3NMcUdkc0tTOTNlTWxjNkRwRjZ5V1NXSUo4In0.eyJleHAiOjE2OTMzMTg0NzIsImlhdCI6MTY5MzMxNDg3MiwiYXV0aF90aW1lIjowLCJqdGkiOiI5NDM1ZDE3NS00MzE2LTQyN2QtODlkYi1lYTJkOWJlZjJmNWIiLCJpc3MiOiJodHRwczovL2tjLmRhdGFtYXJ0LnJ1L3JlYWxtcy9zdHVkaW8tZGV2IiwiYXVkIjoic3R1ZGlvIiwic3ViIjoiMTU2N2RhYjctNzU5MC00YzRmLWI1Y2EtZjMyYWQ5NTU1OGNjIiwidHlwIjoiSUQiLCJhenAiOiJzdHVkaW8iLCJzZXNzaW9uX3N0YXRlIjoiZTBhNDIyZWQtYTQ0MS00M2NjLWEwMTEtNTMzYmNkYjU3OThkIiwiYXRfaGFzaCI6Ik9UZzZoUFQtWjhhWHR1Yjl1VV91LWciLCJhY3IiOiIxIiwic2lkIjoiZTBhNDIyZWQtYTQ0MS00M2NjLWEwMTEtNTMzYmNkYjU3OThkIiwidXBuIjoiYWRtaW4iLCJlbWFpbF92ZXJpZmllZCI6dHJ1ZSwiYWRkcmVzcyI6e30sIm5hbWUiOiJhZG1pbiBhZG1pbiIsImdyb3VwcyI6WyJzdXBlcmFkbWluIiwiYWRtaW4iXSwicHJlZmVycmVkX3VzZXJuYW1lIjoiYWRtaW4iLCJnaXZlbl9uYW1lIjoiYWRtaW4iLCJvcmdhbml6YXRpb25fb2dybiI6WyIxMTEyMjIzMzM0NDQiLCIxMTExIl0sImZhbWlseV9uYW1lIjoiYWRtaW4iLCJlbWFpbCI6ImFkbWluQGFkbWluLmFkbWluIn0.K372NBffA3xtsxyM3hixr5GF1ouHNdr8DFMBYnGQ-t-REbYcwOymvs-D-HYEsmaUhkCWjKSeLM9taLmAPloSt2hb8xN_VG4s-gc_yvGs_aHkUehTqddjGMislPyAlydzCaDVxQ5Px-TplsDzIAwm5P0V23LDU3qwnVxVR7P3Dbxi5YB84_38zjClNrDWt9YOxnPMCzDT4fnBjGXkDcQZoHo5jsbFP_K5ymugsYEumKIZyekbY_l_A-XkRcTM6SMTRhKLAvQ3lq2YguLm2LFF3e-PrGsaEymeS-Peuff5qJw5Vf9r3_nD3APCivVc0Kunl8miRpr3lsZgSAp-xi3Jow",
  "not-before-policy": 0,
  "session_state": "e0a422ed-a441-43cc-a011-533bcdb5798d",
  "scope": "openidemail"
}
  1. С полученным токеном послать запрос для подключения к API инсталляции Компонента «Витрина данных» для выбранной организации:

curl -X <method>
'http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/<request_path>' \
-H "Authorization: Bearer <access_token>" \
-H "<headers>" \
-d "<data>"

где:

  • ip-studio — ip-адрес Datamart Studio;

  • organization_ogrn — ОГРН организации, в рамках которой развернута Витрина данных;

  • datamart_mnemonic — мнемоника Витрины (пример: eduejd##, где ## – номер региона);

  • installation_name — имя инсталляции в целевой Витрине;

  • installation_id — идентификатор инсталляции (присутствует в ее названии);

  • request_path — URI оригинального API инсталляции;

  • access_token — токен Proxy API;

  • headers — заголовки запроса;

  • data — данные запроса.

5. Загрузка и удаление данных через DTM-Uploader

Сначала источник данных формирует и передает файлы по REST API, которые накапливаются Компонентом ETL. Далее данные загружаются и вставляются на Витрину данных через внешние таблицы, или удаляются. Статусы обработки каждой операции необходимо отслеживать через Endpoint /status.

Информация о каждом шаге процесса содержится в подразделах ниже.

5.1. Начало операции загрузки / удаления согласованных данных (Endpoint – newDelta)

Под Endpoint’ом /newDelta регистрируется новая порция данных. Для того чтобы начать работу с данными, источнику данных необходимо сгенерировать UUID (идентификатор для новой порции данных) и вставить его в запрос c Endpoint’ом /newDelta.

Согласованные данные – это данные для нескольких таблиц, которые должны попасть на Витрину данных за одну операцию вставки (то есть в одной дельте), а значит будут доступны для потребителя одномоментно (такой способ загрузки актуален, когда необходимо обновить данные).

Пример набора данных, который будет загружен или удален в рамках одной дельты представлен ниже:

{
  "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
  "dataSetName": ["product", "stock"]
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • dataSetName — массив имен набора данных (product и stock - имена таблиц).

Запрос с Endpoint’ом /newDelta будет иметь вид:

curl -X POST "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/newDelta" -H "Authorization: Bearer <access_token>" -H 'Content-Type: application/json' -d '{"requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSetName": ["product", "stock"]}'

где:

  • requestId — идентификатор порции изменений (дельты), который был ранее сгенерирован;

  • dataSetName — имя набора данных (имена таблиц).

Примечание

Данные для обновления/вставки и для удаления должны быть отдельно зарегистрированы в newDelta с разными requestId. Если требуется обновить/вставить данные, то сначала нужно зарегистрировать порцию данных через newDelta с requestId, затем прислать данные с зарегистрированным requestId через endpoint /partOfDelta (описан в Загрузка / удаление согласованных данных (Endpoint – partOfDelta)). Допускается передача как в одном файле одним запросом, так и в двух файлах двумя запросами - это не имеет значения. Главное условие - нужно отправлять только данные на обновление и вставку, и они должны быть уникальны по первичному ключу. Также в них не должно быть одинаковых записей с одним первичным ключом. Пример: отправлены два обновления одной записи (с одинаковым первичным ключом). В этом случае в хранилище Prostore попадет только одна запись, и неизвестно какая, поэтому дублей по первичному ключу отправленных с одним requestId быть не должно!

Чтобы проверить статус выполнения запроса, необходимо направить запрос с Endpoint’ом /status (пример запроса описан в Проверка статусной информации по загрузке / удалению данных (Endpoint – status))

Если обработка запроса завершится успешно, то Витрина данных вернет JSON-ответ, содержащий статус-сообщение об успешной операции:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "dataSets": [
    "product",
        "stock"
    ],
    "inDeltaFlag": true,
    "statusCode": "SUCCESS",
    "statusMessage": "Загрузка порции данных успешно завершена"
    "errors": []
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — код возвращаемого статуса (SUCCESS – запрос выполнен успешно).

Пример JSON-ответа на проверку статуса, завершившегося ошибкой, приведен ниже:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "statusCode": "REQUESTID_ALREADY_EXIST",
    "statusMessage": "Дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA уже существует. Статус загрузки дельты: SUCCESS"
}

где:

  • requestId — идентификатор порции изменений (дельты) который необходимо проверить;

  • statusCode — код возвращаемого статуса (REQUESTID_ALREADY_EXIST – идентификатор уже существует в БД);

  • statusMessage — сообщение с описанием кода ошибки.

Примечание

Для удаления записей необходимо зарегистрировать новую дельту во Endpoint’е /newDelta с новым requestId, и по зарегистрированному requestId должны быть присланы только данные на удаление.

5.2. Загрузка / удаление согласованных данных (Endpoint – partOfDelta)

На данном шаге выполняется накапливание порции данных, создается вставка на Витрину данных по Endpoint’у isLastChunk.

Примечание

Если в процессе загрузки вызван метод newDelta, то текущая загрузка будет прервана и порция не попадет на Витрину данных.

Чтобы отправить последнюю порцию данных для таблицы product, необходимо направить запрос с Endpoint’ом /partOfDelta с указанием dataSetName=product и isLastChunk=true (что означает что данная порция данных - последняя). Обработка и загрузка данных не начнется, пока не будет направлен такой же запрос, но уже по таблице stock: dataSetName=stock, isLastChunk=true.

Пример запроса на загрузку данных под ранее созданный набор данных:

curl -X POST "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/partOfDelta" -H "Authorization: Bearer <access_token>" -F upload=@"./product.avro" -F dataSetName=product -F chunkNumber=0 -F isLastChunk=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d

где:

  • upload — загружаемый avro-файл (пример avro-файла с данными представлен в разделе 2);

  • dataSetName — имя набора данных (имя таблицы);

  • chunkNumber — номер порции dataSet в рамках дельты;

  • isLastChunk — флаг последней порции dataSet;

  • requestId — идентификатор порции изменений (дельты).

В результате успешной загрузки при проверке статуса requestId (пример запроса по Endpoint’у /status представлен в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) на запрос Витрина данных вернет JSON-ответ, содержащий статус-сообщение об успешной операции.

Пример успешной загрузки:

{
    "requestId": "f3947645-88c8-4044-bd8b-de273f8a8461",
    "statusCode": "SUCCESS",
    "statusMessage": "Порция получена."
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — статус код результата запроса (SUCCESS - запрос выполнен успешно);

  • statusMessage — описание статусного сообщения.

Если загрузка прервалась ошибкой, то при проверке статуса requestId (пример запроса по Endpoint’у /status представлен в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) на запрос Витрина данных вернет JSON-ответ с описанием ошибки.

Пример загрузки, прерванной ошибкой:

{
    "requestId": "aef2f195-b037-4aaa-b171-f2746511e7e2",
    "dataSets": [
        "stock"
    ],
    "inDeltaFlag": true,
    "statusCode": "ERROR",
    "statusMessage": "Произошла ошибка"
    "errors": [
        {
            "dataSet": "stock",
            "errorType": "INSERT",
            "message": "Ошибка вставки в таблицы: stock"
        }
    ]
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • dataSets — массив имен набора данных (имен таблиц где была допущена ошибка);

  • inDeltaFlag = true — загрузка согласованных данных производилась через endpoint /partOfDetla;

  • status — статус код результата запроса (ERROR – внутренняя ошибка);

  • statusMessage — описание статусного сообщения;

  • errors — массив, ошибки загрузки или парсинга входящих данных;

  • dataSet — название таблицы где допущена ошибка;

  • errorType — тип ошибки;

  • message — описание ошибки.

Примечание

Возможна ситуация, когда после падения ETL приходит запрос с requestId, который был до падения, в данном случае Витрина данных возвращает ошибку со статусом NOT_FOUND. Необходимо снова направить запрос по Endpoint’у /newDelta с новым requestId и начать процесс загрузки заново.

5.3. Загрузка / удаление несогласованных данных (Endpoint – data)

Для загрузки несогласованных данных поддерживается возможность накапливания данных, аналогично загрузке согласованных данных, описанной в Загрузка / удаление согласованных данных (Endpoint – partOfDelta).

Несогласованные данные – могут быть вставлены в разных дельтах и будут доступны потребителю постепенно по мере загрузки. Этот способ подходит для первоначальной загрузки, когда еще нет потребителей.

Вставка на Витрину данных выполнится после накопления порции или по флагу isLast, который используется для последней порции данных. Флаг isLast подает сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию.

Пример запроса:

curl -X POST "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/data" -H "Authorization: Bearer <access_token>" -F upload=@"./product.avro" -F dataSetName=product -F isLast=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d

где:

  • upload — загружаемый avro-файл (пример avro-файла с данными представлен в Основные требования к исходным файлам);

  • dataSetName — имя набора данных (имя таблицы);

  • isLast — флаг последней порции данных (сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию.);

  • requestId — идентификатор порции изменений (дельты).

В результате успешной операции при проверке статуса запроса по Endpoint’у /status (пример запроса описан в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) Витрина данных вернет JSON-ответ, содержащий статус-сообщение:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "statusCode": "SUCCESS",
    "statusMessage": "Порция получена."
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — статус код результата запроса (SUCCESS - запрос выполнен успешно);

  • statusMessage — описание статусного сообщения.

Если загрузка прервалась ошибкой, то при проверке requestId (пример запроса по Endpoint’у /status представлен в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) Витрина данных вернет JSON-ответ с описанием ошибки.

Пример JSON-ответа:

{
    "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA",
    "statusCode": "NOT_FOUND",
    "statusMessage": "Не найдена дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA"
}

где:

  • requestId — идентификатор порции изменений (дельты);

  • statusCode — статус код результата запроса (NOT_FOUND - данные по requestId были утеряны в результате остановки сервиса, необходимо зарегистрировать новую дельту и снова загрузить данные);

  • statusMessage — описание статусного сообщения.

5.4. Описание возвращаемых кодов

Таблица 5.6 Описание возвращаемых кодов

Наименование

Код

Описание

EMPTY_ATTACHMENT

400

Нет файла вложения

ERROR

500

Внутренняя ошибка

NOT_FOUND

400

Данные не найдены, либо были утеряны в результате остановки сервиса

PROCESSING

400

Идет обработка данных

REQUESTID_ALREADY_EXIST

400

requestId уже зарегистрирован

SUCCESS

200

Успешное выполнение

UNREGISTERED_DATASETNAME

400

Незарегистрированный набор данных

WRONG_ENDPOINT

400

requestId зарегистрирован для другого Endpoint’а

6. Проверка статусной информации по загрузке / удалению данных (Endpoint – status)

В данном разделе производится проверка статусной информации из сервисных таблиц по requestId.

Пример запроса:

Curl -X GET "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/status/<requestId>" -H "Authorization: Bearer <access_token>"

где:

  • requestId — UUID идентификатор порции изменений (дельты).

Пример ответа на такой запрос представлен ниже.

{
    "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1",
    "inDeltaFlag": false,
    "dataSets": [
        "stock"
    ],
    "status": "ERROR",
    "statusMessage": "Произошла ошибка",
    "errors": [
        {
            "dataSet": "stock",
            "errorType": "PARCING",
            "message": "Неверно указан тип поля count_pieces: LONG. Ожидается: INTEGER"
        },
        {
            "dataSet": "stock",
            "errorType": "PARCING",
            "message": "Неверно указан тип поля product_id: LONG. Ожидается: INTEGER"
        }
    ]
}

где:

  • requestId — UUID идентификатор порции изменений (дельты);

  • inDeltaFlag = false — загрузка несогласованных данных производилась через endpoint /data;

  • dataSets — массив имен набора данных (имен таблиц где была допущена ошибка);

  • status — статус код результата запроса (NOT_FOUND, PROCESSING, ERROR, SUCCESS);

  • statusMessage — описание статусного сообщения;

  • errors — массив, ошибки загрузки или парсинга входящих данных;

  • dataSet — название таблицы где допущена ошибка;

  • errorType — тип ошибки;

  • message — описание ошибки.

7. Работа с вложениями через S3

7.1. Загрузка данных в хранилище (Endpoint – uploadAttachment)

Перед загрузкой источнику данных необходимо сгенерировать UUID (идентификатор для новой порции данных) и вставить его в запрос с Endpoint’ом /uploadAttachment. При совпадении имен вложений в хранилище, вложение перезаписывается. Пример запроса на загрузку вложения в хранилище представлен ниже:

curl -X POST "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/uploadAttachment" -H "Authorization: Bearer <access_token>" -F upload=@"document.pdf" -F requestId=13f2475e-f3dc-4c9e-b2f6-3a98320261f1 -F name=Doc_1

где:

  • upload — путь до загружаемого файла-вложения;

  • requestId — UUID идентификатор запроса;

  • name — уникальное имя вложения.

После успешной загрузки при проверке по Endpoint’у /status (пример запроса описан в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) Витрина данных вернет JSON-ответ, содержащий статус-сообщение:

{
    "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1",
    "statusCode": "UPDATED",
    "statusMessage": "Файл Doc_1 успешно обновлен."
}

где:

  • requestId — UUID идентификатор запроса;

  • statusCode — статус код результата запроса (SUCCESS - запрос выполнен успешно; UPDATED – данные обновлены);

  • statusMessage — описание статусного сообщения.

Пример неуспешной загрузки после проверки по endpoint’у /status (пример запроса описан в Проверка статусной информации по загрузке / удалению данных (Endpoint – status)) представлен ниже:

{
    "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1",
    "statusCode": "ERROR",
    "statusMessage": "Произошла ошибка"
}

где:

  • requestId — UUID идентификатор запроса;

  • statusCode — статус код результата запроса (ERROR – запрос завершился ошибкой);

  • statusMessage — описание статусного сообщения.

7.2. Удаление данных из хранилища (Endpoint – deleteAttachment)

Для того чтобы удалить вложения из хранилища S3 необходимо направить следующий запрос:

curl -X DELETE "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/deleteAttachment/Doc_1/requestId/<requestId>" -H "Authorization: Bearer <access_token>"

где:

  • requestId — UUID идентификатор запроса.

В результате успешного удаления Витрина данных вернет JSON-ответ, содержащий статус-сообщение:

{
    "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1",
    "statusCode": "SUCCESS",
    "statusMessage": "Файл Doc_1 успешно удален."
}

где:

  • requestId — UUID идентификатор запроса;

  • statusCode — статус код результата запроса (SUCCESS - запрос выполнен успешно);

  • statusMessage — описание статусного сообщения.

Если удаление завершилось ошибкой, то Витрина данных вернет JSON-ответ c кодом ошибки:

{
    "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1",
    "statusCode": "NOT_FOUND",
    "statusMessage": "Файл Doc_1 не найден."
}

где:

  • requestId — UUID идентификатор запроса;

  • statusCode — статус код результата запроса (NOT_FOUND);

  • statusMessage — описание статусного сообщения.

7.3. Описание возвращаемых кодов

Таблица 7.2 Описание возвращаемых кодов

Наименование

Код

Описание

EMPTY_ATTACHMENT

400

Нет файла вложения

ERROR

500

Внутренняя ошибка

SUCCESS

200

Успешное выполнение

UPDATED

200

Данные обновлены

8. Маппинг данных (Endpoint – generateMapping)

В данном разделе описывается генерация файла маппинга. Endpoint предназначен для первичной настройки, а также перенастройки сервиса в случае изменения модели данных Витрины.

Файл маппинга генерируется источником данных в формате Kotlin-script. В файле описана модель данных Витрины данных в виде структуры объектов Kotlin (table, column). Объекты table описывают таблицы, каждый из них содержит имя таблицы и список колонок в том порядке, в котором они созданы на Витрине. Объекты column описывают колонки, каждый из них содержит имя колонки, тип данных, признак обязательности (nullable), признак первичного ключа.

Файл используется сервисом для описания модели данных и валидации входящих данных. Выполняются следующие проверки:

  • проверяется соответствие состава полей входящей avro-структуры составу полей, описанных в файле маппинга;

  • проверяется соответствие порядка полей входящей avro-структуры порядку полей, описанных в файле маппинга;

  • проверяется соответствие типов данных полей входящей avro-структуры типам полей, описанных в файле маппинга. Для полей с установленным признаком обязательности (nullable = false) выполняется проверка на null.

При вызове Endpoint’а /generateMapping сервис генерирует файл на основе информации о модели, полученной из развернутой Витрины данных. Файл складывается сервисом на диск, а также возвращается в ответе на вызов.

Пример запроса на генерацию маппинга представлен ниже:

curl -X GET "http://<ip-studio>:8088/api/v1/secure/<organization_ogrn>/<datamart_mnemonic>/<installation_name>/<installation_id>/generateMapping/aef2f195-0001-4aaa-b171-f2746511e889" -H "Authorization: Bearer <access_token>"

Результат Витрина данных вернет в формате Kotlin-script:

import ru.supercode.mapping.common.ColumnType.*
import ru.supercode.mapping.mapper.dsl.mappingAvro
mappingAvro {
    table("product") {
        column("id", INTEGER) { nullable = false; primary = true; }
        column("name", STRING) { nullable = false; }
    }
    table("stock") {
        column("product_id", INTEGER) { nullable = false; primary = true; }
        column("count_pieces", INTEGER) { nullable = false; }
    }
}

9. Валидация данных

Валидация порции данных производится в момент обработки и вставки.

Примечание

Помимо валидации данных осуществляется валидация параметров запроса. Во всех Endpoint’ах requestId должен быть в формате UUID.

В случае ошибок валидации результат будет возвращен при вызове Endpoint’а /status.

Ошибки, возникающие в процессе обработки Endpoint’а /newDelta:

  • отклоняются запросы, которые получены в момент обработки порции данных (statusCode: PROCESSED);

  • если пустой параметр dataSetName;

  • прислан запрос с уже зарегистрированным requestId и statusCode данного requestId не равен NOT_FOUND или WAIT_DATA.

Ошибки, возникающие в процессе обработки Endpoint’а /partOfDelta:

  • прислан запрос с незарегистрированным requestId;

  • прислан запрос с уже зарегистрированным requestId и statusCode данного requestId не равен NOT_FOUND или WAIT_DATA;

  • прислан запрос с requestId зарегистрированным для Endpoint’а /data;

  • прислан запрос с параметром dataSetName, который не был зарегистрирован в Endpoint’е /newDelta;

  • нет файла вложения в параметре upload.

Ошибки, возникающие в процессе обработки Endpoint’а /data:

  • отклоняются запросы, которые получены в момент обработки порции данных (statusCode: PROCESSED);

  • прислан запрос с незарегистрированным requestId;

  • прислан запрос с уже зарегистрированным requestId и statusCode данного requestId не равен NOT_FOUND или WAIT_DATA;

  • прислан запрос с requestId зарегистрированным для Endpoint’а /partOfDelta;

  • нет файла вложения в параметре upload.

Ошибки, возникающие в процессе обработки Endpoint’а /uploadAttachment:

  • нет файла вложения в параметре upload.

Ошибки, возникающие в процессе обработки Endpoint’а /generateMapping:

  • не созданы логические таблицы в схеме.