.. _etl_config: Основные требования к исходным файлам ####################################### Загрузка данных в систему производится в виде файлов по HTTP, каждый из которых имеет структуру, представленную на :ref:`message_structure`. .. _message_structure: .. figure:: /_static/img/modules/etl/message_structure.png :align: center :alt: Структура загружаемых сообщений Структура загружаемых сообщений Для успешной загрузки данные должны соответствовать следующим условиям: 1. Тело сообщения представляет собой файл Avro (Object Container File), который состоит из заголовка и блоков данных. 2. Заголовок файла содержит схему данных Avro. 3. Схема данных содержит следующие элементы: - имя; - тип “record”; - перечень полей. 4. Последним полем схемы должно быть указано служебное поле ``sys_op`` с типом данных ``avro.int``. 5. Каждый блок данных содержит запись, представленную в бинарной кодировке. 6. Каждая запись содержит перечень полей и их значений. 7. Состав и порядок полей должны совпадать во всех следующих объектах: - в схеме данных заголовка файла Avro; - в наборе загружаемых записей; - во внешней таблице загрузки (поле ``sys_op`` должно отсутствовать); - в таблице-приемнике данных (поле ``sys_op`` должно отсутствовать). Более подробно про формат данных Avro описано в источнике: https://avro.apache.org/docs/1.10.2/spec.html#Object+Container+Files .. note:: В загружаемой схеме данных Avro и записях Avro важны порядок и тип полей. Имена полей не сравниваются с именами полей внешней таблицы и таблицы-приемника. Пример ниже содержит схему данных Avro, используемую для загрузки данных о сотрудниках в таблицу ``staff``. Для поля ``date_of_birth`` указан логический тип Avro, для поля ``middle_name`` — элемент union (поле является не обязательным для заполнения, поэтому маркер ``null`` выведен в отдельный параметр). Пример схемы данных Avro: .. code-block:: json { "name": "staff", "type": "record", "fields": [ { "name": "id", "type": "long" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "middle_name", "type": [ "null", "string" ] }, { "name": "date_of_birth", "type": { "logicalType": "timestamp-micros", "type": "long" } }, { "name": "employee_position", "type": "string" }, { "name": "department_category", "type": "long" }, { "name": "sys_op", "type": "int" } ] } Пример ниже содержит набор записей о сотрудниках, загружаемых в таблицу ``staff``. Для наглядности примера бинарные данные представлены в JSON-формате. .. code-block:: json [ { "id": 1000111, "firstname": "Елена", "lastname": "Фролова", "middle_name": "Андреевна", "date_of_birth": 4641084000000000, "employee_position": "Менеджер по подбору персонала", "department_category": 1, "description": "Сотрудники отдела кадров", "sys_op": 0 }, { "id": 1000005, "firstname": "Пётр", "lastname": "Платонов", "middle_name": "", "date_of_birth": 5639904000000000, "employee_position": "Руководитель отдела кадров", "department_category": 1, "description": "Сотрудники отдела кадров", "sys_op": 0 } ] .. _etl_implementation: Особенности реализации ETL ############################# Функциональные особенности реализованного ETL включают в себя следующие пункты: 1. Генерация первичных ключей записей, передаваемых для загрузки, производится на стороне источника. 2. Каждая Avro-структура должна содержать данные только для одной таблицы Витрины. 3. В Avro-структурах данных источник заполняет тип операции ``sys_op``: - 0 – для добавления новой или обновления существующей записи; - 1 – для удаления существующей записи (см. пример записей Avro в :ref:`etl_config`). 4. ETL не выполняет преобразования данных, предназначенных для загрузки на Витрину данных. 5. При выполнении операций, требующих консистентности данных, в рамках одной дельты могут быть только операции одного типа: либо добавления/обновления, либо удаления. При этом в рамках одной дельты первичные ключи всех записей должны быть уникальны. 6. Не должно быть двух версий одной записи в рамках одной дельты. 7. Нельзя менять порядок атрибутов в avro-схеме, поскольку данные при загрузке в БД распределяются в соответствии с тем перечнем, который был указан в avro-схеме. .. _token_get: Получение токена Рroxy API ############################# Этапы настройки Proxy API включают в себя следующие шаги: 1. Для начала необходимо пройти авторизацию в Datamart Studio. Сделать это можно, направив запрос ниже: .. code-block:: curl -X POST \ 'http://:8088/api/v1/auth_system' \ -d "username=" \ -d "password=" \ -d "organization_ogrn=" \ -d "datamart_mnemonic=" где: - ``ip-studio`` — ip-адрес Datamart Studio; - ``username`` — имя пользователя IAM; - ``password`` — пароль пользователя IAM; - ``organization_ogrn`` — ОГРН организации, в рамках которой развернута Витрина данных; - ``datamart_mnemonic`` — мнемоника Витрины (пример: eduejd##, где ## – номер региона). .. note:: Безопасность передачи данных по протоколу HTTP обеспечивается защищенной сетью. Требуется обращать внимание на протокол запроса. Если он будет некорректным (например, HTTPS вместо HTTP), то ответ сервера будет содержать ошибку с кодом 500 - InternalServerError. Пример успешного ответа Datamart Studio на запрос токена представлен ниже: .. code-block:: { "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICI5NGRuNzhOeVF2TjBhRG3NMcUdkc0tTOTNlTWxjNkRwRjZ5V1NXSUo4In0.eyJleHAiOjE2OTMzMTg0NzIsImlhdCI6MTY5MzMxNDg3MiwianRpIjoiYTJkNTQ5NDktZWYwZC00MzA1LWI5OTAtMDIxMTAyZDkzODU2IiwiaXNzIjoiaHR0cHM6Ly9rYy5kYXRhbWFydC5ydS9yZWFsbXMvc3R1ZGlvLWRldiIsInN1YiI6IjE1NjdkYWI3LTc1OTAtNGM0Zi1iNWNhLWYzMmFkOTU1NThjYyIsInR5cCI6IkJlYXJlciIsImF6cCI6InN0dWRpbyIsInNlc3Npb25fc3RhdGUiOiJlMGE0MjJlZC1hNDQxLTQzY2MtYTAxMS01MzNiY2RiNTc5OGQiLCJhY3IiOiIxIiwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbInN1cGVyYWRtaW4iLCJhZG1pbiJdfSwic2NvcGUiOiJvcGVuaWQgZW1haWwiLCJzaWQiOiJlMGE0MjJlZC1hNDQxLTQzY2MtYTAxMS01MzNiY2RiNTc5OGQiLCJ1cG4iOiJhZG1pbiIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJhZGRyZXNzIjp7fSwibmFtZSI6ImFkbWluIGFkbWluIiwiZ3JvdXBzIjpbInN1cGVyYWRtaW4iLCJhZG1pbiJdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJhZG1pbiIsImdpdmVuX25hbWUiOiJhZG1pbiIsIm9yZ2FuaXphdGlvbl9vZ3JuIjpbIjExMTIyMjMzMzQ0NCIsIjExMTEiXSwiZmFtaWx5X25hbWUiOiJhZG1pbiIsImVtYWlsIjoiYWRtaW5AYWRtaW4uYWRtaW4ifQ.BC3sREXC3nf2LNvBX8SiHKouVGqJVfUBokVJa-B-9YW0zLhnNTs7mGZVOnC-kM-5mWE6bz8du0lvxQqiGpi3HRlAv1eedcGMTf_2TmjhohAaz--zSCdLC5NSmI79r54XYTLORiWKXj5T_AY8efFwWnWgUJ1LEkd5BTQyGSTvaoJkMv7xextA_isx_WoReHC5_-3GznNtcf_hOd2J1CfMHUFjhqMRSxMkIQDPHnspgU6WUz9IeVA1VWKh1GcggqYDtrruigQcl4_f7XeJQKJ49NNVdhjHtywUVbTpEDKYh4FsgAbf3vIPYUVwGWFW0Qm7LgUCpB8UvMQfb4UYZMF4UA", "expires_in": 3600, "refresh_expires_in": 3600, "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmYmMzOWNmNi1kZTRkLTQ2YWEtYTAwZi1iZGU3ZmFkNTJmNTgifQ.eyJleHAiOjE2OTMzMTg0NzIsImlhdCI6MTY5MzMxNDg3MiwianRpIjoiYWIxN2M0MzQtZGI4Yi00N2QwLTkyM2YtMTFiYmM3NzBiNmE4IiwiaXNzIjoiaHR0cHM6Ly9rYy5kYXRhbWFydC5ydS9yZWFsbXMvc3R1ZGlvLWRldiIsImF1ZCI6Imh0dHBzOi8va2MuZGF0YW1hcnQucnUvcmVhbG1zL3N0dWRpby1kZXYiLCJzdWIiOiIxNTY3ZGFiNy03NTkwLTRjNGYtYjVjYS1mMzJhZDk1NTU4Y2MiLCJ0eXAiOiJSZWZyZXNoIiwiYXpwIjoic3R1ZGlvIiwic2Vzc2lvbl9zdGF0ZSI6ImUwYTQyMmVkLWE0NDEtNDNjYy1hMDExLTUzM2JjZGI1Nzk4ZCIsInNjb3BlIjoib3BlbmlkIGVtYWlsIiwic2lkIjoiZTBhNDIyZWQtYTQ0MS00M2NjLWEwMTEtNTMzYmNkYjU3OThkIn0.SI6Tb6CJ5HIHwzETOyJZ-2nTLibGNq7JEQwW07fY-2M", "token_type": "Bearer", "id_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICI5NGRuNzhOeVF2TjBhRGtvX3NMcUdkc0tTOTNlTWxjNkRwRjZ5V1NXSUo4In0.eyJleHAiOjE2OTMzMTg0NzIsImlhdCI6MTY5MzMxNDg3MiwiYXV0aF90aW1lIjowLCJqdGkiOiI5NDM1ZDE3NS00MzE2LTQyN2QtODlkYi1lYTJkOWJlZjJmNWIiLCJpc3MiOiJodHRwczovL2tjLmRhdGFtYXJ0LnJ1L3JlYWxtcy9zdHVkaW8tZGV2IiwiYXVkIjoic3R1ZGlvIiwic3ViIjoiMTU2N2RhYjctNzU5MC00YzRmLWI1Y2EtZjMyYWQ5NTU1OGNjIiwidHlwIjoiSUQiLCJhenAiOiJzdHVkaW8iLCJzZXNzaW9uX3N0YXRlIjoiZTBhNDIyZWQtYTQ0MS00M2NjLWEwMTEtNTMzYmNkYjU3OThkIiwiYXRfaGFzaCI6Ik9UZzZoUFQtWjhhWHR1Yjl1VV91LWciLCJhY3IiOiIxIiwic2lkIjoiZTBhNDIyZWQtYTQ0MS00M2NjLWEwMTEtNTMzYmNkYjU3OThkIiwidXBuIjoiYWRtaW4iLCJlbWFpbF92ZXJpZmllZCI6dHJ1ZSwiYWRkcmVzcyI6e30sIm5hbWUiOiJhZG1pbiBhZG1pbiIsImdyb3VwcyI6WyJzdXBlcmFkbWluIiwiYWRtaW4iXSwicHJlZmVycmVkX3VzZXJuYW1lIjoiYWRtaW4iLCJnaXZlbl9uYW1lIjoiYWRtaW4iLCJvcmdhbml6YXRpb25fb2dybiI6WyIxMTEyMjIzMzM0NDQiLCIxMTExIl0sImZhbWlseV9uYW1lIjoiYWRtaW4iLCJlbWFpbCI6ImFkbWluQGFkbWluLmFkbWluIn0.K372NBffA3xtsxyM3hixr5GF1ouHNdr8DFMBYnGQ-t-REbYcwOymvs-D-HYEsmaUhkCWjKSeLM9taLmAPloSt2hb8xN_VG4s-gc_yvGs_aHkUehTqddjGMislPyAlydzCaDVxQ5Px-TplsDzIAwm5P0V23LDU3qwnVxVR7P3Dbxi5YB84_38zjClNrDWt9YOxnPMCzDT4fnBjGXkDcQZoHo5jsbFP_K5ymugsYEumKIZyekbY_l_A-XkRcTM6SMTRhKLAvQ3lq2YguLm2LFF3e-PrGsaEymeS-Peuff5qJw5Vf9r3_nD3APCivVc0Kunl8miRpr3lsZgSAp-xi3Jow", "not-before-policy": 0, "session_state": "e0a422ed-a441-43cc-a011-533bcdb5798d", "scope": "openidemail" } 2. С полученным токеном послать запрос для подключения к API инсталляции Компонента «Витрина данных» для выбранной организации: .. code-block:: curl -X 'http://:8088/api/v1/secure/////' \ -H "Authorization: Bearer " \ -H "" \ -d "" где: - ``ip-studio`` — ip-адрес Datamart Studio; - ``organization_ogrn`` — ОГРН организации, в рамках которой развернута Витрина данных; - ``datamart_mnemonic`` — мнемоника Витрины (пример: eduejd##, где ## – номер региона); - ``installation_name`` — имя инсталляции в целевой Витрине; - ``installation_id`` — идентификатор инсталляции (присутствует в ее названии); - ``request_path`` — URI оригинального API инсталляции; - ``access_token`` — токен Proxy API; - ``headers`` — заголовки запроса; - ``data`` — данные запроса. .. _etl_load: Загрузка и удаление данных через DTM-Uploader ################################################# Сначала источник данных формирует и передает файлы по REST API, которые накапливаются Компонентом ETL. Далее данные загружаются и вставляются на Витрину данных через внешние таблицы, или удаляются. Статусы обработки каждой операции необходимо отслеживать через Endpoint ``/status``. Информация о каждом шаге процесса содержится в подразделах ниже. .. _start_load_agreed_data: Начало операции загрузки / удаления согласованных данных (Endpoint – newDelta) *********************************************************************************** Под Endpoint’ом ``/newDelta`` регистрируется новая порция данных. Для того чтобы начать работу с данными, источнику данных необходимо сгенерировать UUID (идентификатор для новой порции данных) и вставить его в запрос c Endpoint’ом ``/newDelta``. Согласованные данные – это данные для нескольких таблиц, которые должны попасть на Витрину данных за одну операцию вставки (то есть в одной дельте), а значит будут доступны для потребителя одномоментно (такой способ загрузки актуален, когда необходимо обновить данные). Пример набора данных, который будет загружен или удален в рамках одной дельты представлен ниже: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSetName": ["product", "stock"] } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``dataSetName`` — массив имен набора данных (product и stock - имена таблиц). Запрос с Endpoint’ом ``/newDelta`` будет иметь вид: .. code-block:: curl -X POST "http://:8088/api/v1/secure/////newDelta" -H "Authorization: Bearer " -H 'Content-Type: application/json' -d '{"requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSetName": ["product", "stock"]}' где: - ``requestId`` — идентификатор порции изменений (дельты), который был ранее сгенерирован; - ``dataSetName`` — имя набора данных (имена таблиц). .. note:: Данные для обновления/вставки и для удаления должны быть отдельно зарегистрированы в newDelta с разными requestId. Если требуется обновить/вставить данные, то сначала нужно зарегистрировать порцию данных через newDelta с requestId, затем прислать данные с зарегистрированным requestId через endpoint /partOfDelta (описан в :ref:`load_agreed_data`). Допускается передача как в одном файле одним запросом, так и в двух файлах двумя запросами - это не имеет значения. Главное условие - нужно отправлять только данные на обновление и вставку, и они должны быть уникальны по первичному ключу. Также в них не должно быть одинаковых записей с одним первичным ключом. Пример: отправлены два обновления одной записи (с одинаковым первичным ключом). В этом случае в хранилище Prostore попадет только одна запись, и неизвестно какая, поэтому дублей по первичному ключу отправленных с одним requestId быть не должно! Чтобы проверить статус выполнения запроса, необходимо направить запрос с Endpoint’ом ``/status`` (пример запроса описан в :ref:`etl_check`) Если обработка запроса завершится успешно, то Витрина данных вернет JSON-ответ, содержащий статус-сообщение об успешной операции: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "dataSets": [ "product", "stock" ], "inDeltaFlag": true, "statusCode": "SUCCESS", "statusMessage": "Загрузка порции данных успешно завершена" "errors": [] } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — код возвращаемого статуса (SUCCESS – запрос выполнен успешно). Пример JSON-ответа на проверку статуса, завершившегося ошибкой, приведен ниже: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "statusCode": "REQUESTID_ALREADY_EXIST", "statusMessage": "Дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA уже существует. Статус загрузки дельты: SUCCESS" } где: - ``requestId`` — идентификатор порции изменений (дельты) который необходимо проверить; - ``statusCode`` — код возвращаемого статуса (REQUESTID_ALREADY_EXIST – идентификатор уже существует в БД); - ``statusMessage`` — сообщение с описанием кода ошибки. .. note:: Для удаления записей необходимо зарегистрировать новую дельту во Endpoint’е ``/newDelta`` с новым ``requestId``, и по зарегистрированному ``requestId`` должны быть присланы только данные на удаление. .. _load_agreed_data: Загрузка / удаление согласованных данных (Endpoint – partOfDelta) ****************************************************************** На данном шаге выполняется накапливание порции данных, создается вставка на Витрину данных по Endpoint’у ``isLastChunk``. .. note:: Если в процессе загрузки вызван метод ``newDelta``, то текущая загрузка будет прервана и порция не попадет на Витрину данных. Чтобы отправить последнюю порцию данных для таблицы product, необходимо направить запрос с Endpoint’ом ``/partOfDelta`` с указанием ``dataSetName=product`` и ``isLastChunk=true`` (что означает что данная порция данных - последняя). Обработка и загрузка данных не начнется, пока не будет направлен такой же запрос, но уже по таблице ``stock: dataSetName=stock, isLastChunk=true``. Пример запроса на загрузку данных под ранее созданный набор данных: .. code-block:: curl -X POST "http://:8088/api/v1/secure/////partOfDelta" -H "Authorization: Bearer " -F upload=@"./product.avro" -F dataSetName=product -F chunkNumber=0 -F isLastChunk=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d где: - ``upload`` — загружаемый avro-файл (пример avro-файла с данными представлен в разделе 2); - ``dataSetName`` — имя набора данных (имя таблицы); - ``chunkNumber`` — номер порции dataSet в рамках дельты; - ``isLastChunk`` — флаг последней порции dataSet; - ``requestId`` — идентификатор порции изменений (дельты). В результате успешной загрузки при проверке статуса ``requestId`` (пример запроса по Endpoint’у ``/status`` представлен в :ref:`etl_check`) на запрос Витрина данных вернет JSON-ответ, содержащий статус-сообщение об успешной операции. Пример успешной загрузки: .. code-block:: { "requestId": "f3947645-88c8-4044-bd8b-de273f8a8461", "statusCode": "SUCCESS", "statusMessage": "Порция получена." } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — статус код результата запроса (SUCCESS - запрос выполнен успешно); - ``statusMessage`` — описание статусного сообщения. Если загрузка прервалась ошибкой, то при проверке статуса ``requestId`` (пример запроса по Endpoint’у ``/status`` представлен в :ref:`etl_check`) на запрос Витрина данных вернет JSON-ответ с описанием ошибки. Пример загрузки, прерванной ошибкой: .. code-block:: json { "requestId": "aef2f195-b037-4aaa-b171-f2746511e7e2", "dataSets": [ "stock" ], "inDeltaFlag": true, "statusCode": "ERROR", "statusMessage": "Произошла ошибка" "errors": [ { "dataSet": "stock", "errorType": "INSERT", "message": "Ошибка вставки в таблицы: stock" } ] } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``dataSets`` — массив имен набора данных (имен таблиц где была допущена ошибка); - ``inDeltaFlag`` = true — загрузка согласованных данных производилась через endpoint /partOfDetla; - ``status`` — статус код результата запроса (ERROR – внутренняя ошибка); - ``statusMessage`` — описание статусного сообщения; - ``errors`` — массив, ошибки загрузки или парсинга входящих данных; - ``dataSet`` — название таблицы где допущена ошибка; - ``errorType`` — тип ошибки; - ``message`` — описание ошибки. .. note:: Возможна ситуация, когда после падения ETL приходит запрос с ``requestId``, который был до падения, в данном случае Витрина данных возвращает ошибку со статусом NOT_FOUND. Необходимо снова направить запрос по Endpoint’у ``/newDelta`` с новым ``requestId`` и начать процесс загрузки заново. .. _load_unagreed_data: Загрузка / удаление несогласованных данных (Endpoint – data) ****************************************************************** Для загрузки несогласованных данных поддерживается возможность накапливания данных, аналогично загрузке согласованных данных, описанной в :ref:`load_agreed_data`. Несогласованные данные – могут быть вставлены в разных дельтах и будут доступны потребителю постепенно по мере загрузки. Этот способ подходит для первоначальной загрузки, когда еще нет потребителей. Вставка на Витрину данных выполнится после накопления порции или по флагу ``isLast``, который используется для последней порции данных. Флаг ``isLast`` подает сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию. Пример запроса: .. code-block:: curl -X POST "http://:8088/api/v1/secure/////data" -H "Authorization: Bearer " -F upload=@"./product.avro" -F dataSetName=product -F isLast=false -F requestId=a6212a7d-4526-4e2d-89a7-9828f380c91d где: - ``upload`` — загружаемый avro-файл (пример avro-файла с данными представлен в :ref:`etl_config`); - ``dataSetName`` — имя набора данных (имя таблицы); - ``isLast`` — флаг последней порции данных (сигнал для завершения формирования дельты, для того чтобы выполнить вставку накопленных данных и закрыть транзакцию.); - ``requestId`` — идентификатор порции изменений (дельты). В результате успешной операции при проверке статуса запроса по Endpoint’у ``/status`` (пример запроса описан в :ref:`etl_check`) Витрина данных вернет JSON-ответ, содержащий статус-сообщение: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "statusCode": "SUCCESS", "statusMessage": "Порция получена." } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — статус код результата запроса (SUCCESS - запрос выполнен успешно); - ``statusMessage`` — описание статусного сообщения. Если загрузка прервалась ошибкой, то при проверке ``requestId`` (пример запроса по Endpoint’у ``/status`` представлен в :ref:`etl_check`) Витрина данных вернет JSON-ответ с описанием ошибки. Пример JSON-ответа: .. code-block:: json { "requestId": "6B29FC40-CA47-1067-B31D-00DD010662DA", "statusCode": "NOT_FOUND", "statusMessage": "Не найдена дельта с requestId = 6B29FC40-CA47-1067-B31D-00DD010662DA" } где: - ``requestId`` — идентификатор порции изменений (дельты); - ``statusCode`` — статус код результата запроса (NOT_FOUND - данные по requestId были утеряны в результате остановки сервиса, необходимо зарегистрировать новую дельту и снова загрузить данные); - ``statusMessage`` — описание статусного сообщения. .. _responce_code_load: Описание возвращаемых кодов ********************************* .. table:: Описание возвращаемых кодов +--------------------------+---------+---------------------------------------------------------------------+ | **Наименование** | **Код** | **Описание** | +==========================+=========+=====================================================================+ | EMPTY_ATTACHMENT | 400 | Нет файла вложения | +--------------------------+---------+---------------------------------------------------------------------+ | ERROR | 500 | Внутренняя ошибка | +--------------------------+---------+---------------------------------------------------------------------+ | NOT_FOUND | 400 | Данные не найдены, либо были утеряны в результате остановки сервиса | +--------------------------+---------+---------------------------------------------------------------------+ | PROCESSING | 400 | Идет обработка данных | +--------------------------+---------+---------------------------------------------------------------------+ | REQUESTID_ALREADY_EXIST | 400 | ``requestId`` уже зарегистрирован | +--------------------------+---------+---------------------------------------------------------------------+ | SUCCESS | 200 | Успешное выполнение | +--------------------------+---------+---------------------------------------------------------------------+ | UNREGISTERED_DATASETNAME | 400 | Незарегистрированный набор данных | +--------------------------+---------+---------------------------------------------------------------------+ | WRONG_ENDPOINT | 400 | ``requestId`` зарегистрирован для другого Endpoint’а | +--------------------------+---------+---------------------------------------------------------------------+ .. _etl_check: Проверка статусной информации по загрузке / удалению данных (Endpoint – status) ################################################################################## В данном разделе производится проверка статусной информации из сервисных таблиц по ``requestId``. Пример запроса: .. code-block:: bash Curl -X GET "http://:8088/api/v1/secure/////status/" -H "Authorization: Bearer " где: - ``requestId`` — UUID идентификатор порции изменений (дельты). Пример ответа на такой запрос представлен ниже. .. code-block:: json { "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1", "inDeltaFlag": false, "dataSets": [ "stock" ], "status": "ERROR", "statusMessage": "Произошла ошибка", "errors": [ { "dataSet": "stock", "errorType": "PARCING", "message": "Неверно указан тип поля count_pieces: LONG. Ожидается: INTEGER" }, { "dataSet": "stock", "errorType": "PARCING", "message": "Неверно указан тип поля product_id: LONG. Ожидается: INTEGER" } ] } где: - ``requestId`` — UUID идентификатор порции изменений (дельты); - ``inDeltaFlag = false`` — загрузка несогласованных данных производилась через endpoint /data; - ``dataSets`` — массив имен набора данных (имен таблиц где была допущена ошибка); - ``status`` — статус код результата запроса (NOT_FOUND, PROCESSING, ERROR, SUCCESS); - ``statusMessage`` — описание статусного сообщения; - ``errors`` — массив, ошибки загрузки или парсинга входящих данных; - ``dataSet`` — название таблицы где допущена ошибка; - ``errorType`` — тип ошибки; - ``message`` — описание ошибки. .. _etl_attach: Работа с вложениями через S3 ################################ Загрузка данных в хранилище (Endpoint – uploadAttachment) ************************************************************** Перед загрузкой источнику данных необходимо сгенерировать UUID (идентификатор для новой порции данных) и вставить его в запрос с Endpoint’ом ``/uploadAttachment``. При совпадении имен вложений в хранилище, вложение перезаписывается. Пример запроса на загрузку вложения в хранилище представлен ниже: .. code-block:: curl -X POST "http://:8088/api/v1/secure/////uploadAttachment" -H "Authorization: Bearer " -F upload=@"document.pdf" -F requestId=13f2475e-f3dc-4c9e-b2f6-3a98320261f1 -F name=Doc_1 где: - ``upload`` — путь до загружаемого файла-вложения; - ``requestId`` — UUID идентификатор запроса; - ``name`` — уникальное имя вложения. После успешной загрузки при проверке по Endpoint’у ``/status`` (пример запроса описан в :ref:`etl_check`) Витрина данных вернет JSON-ответ, содержащий статус-сообщение: .. code-block:: json { "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1", "statusCode": "UPDATED", "statusMessage": "Файл Doc_1 успешно обновлен." } где: - ``requestId`` — UUID идентификатор запроса; - ``statusCode`` — статус код результата запроса (SUCCESS - запрос выполнен успешно; UPDATED – данные обновлены); - ``statusMessage`` — описание статусного сообщения. Пример неуспешной загрузки после проверки по endpoint’у /status (пример запроса описан в :ref:`etl_check`) представлен ниже: .. code-block:: json { "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1", "statusCode": "ERROR", "statusMessage": "Произошла ошибка" } где: - ``requestId`` — UUID идентификатор запроса; - ``statusCode`` — статус код результата запроса (ERROR – запрос завершился ошибкой); - ``statusMessage`` — описание статусного сообщения. Удаление данных из хранилища (Endpoint – deleteAttachment) ****************************************************************** Для того чтобы удалить вложения из хранилища S3 необходимо направить следующий запрос: .. code-block:: curl -X DELETE "http://:8088/api/v1/secure/////deleteAttachment/Doc_1/requestId/" -H "Authorization: Bearer " где: - ``requestId`` — UUID идентификатор запроса. В результате успешного удаления Витрина данных вернет JSON-ответ, содержащий статус-сообщение: .. code-block:: json { "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1", "statusCode": "SUCCESS", "statusMessage": "Файл Doc_1 успешно удален." } где: - ``requestId`` — UUID идентификатор запроса; - ``statusCode`` — статус код результата запроса (SUCCESS - запрос выполнен успешно); - ``statusMessage`` — описание статусного сообщения. Если удаление завершилось ошибкой, то Витрина данных вернет JSON-ответ c кодом ошибки: .. code-block:: json { "requestId": "13f2475e-f3dc-4c9e-b2f6-3a98320261f1", "statusCode": "NOT_FOUND", "statusMessage": "Файл Doc_1 не найден." } где: - ``requestId`` — UUID идентификатор запроса; - ``statusCode`` — статус код результата запроса (NOT_FOUND); - ``statusMessage`` — описание статусного сообщения. .. _responce_code_attach: Описание возвращаемых кодов ********************************* .. table:: Описание возвращаемых кодов +------------------+---------+---------------------+ | **Наименование** | **Код** | **Описание** | +==================+=========+=====================+ | EMPTY_ATTACHMENT | 400 | Нет файла вложения | +------------------+---------+---------------------+ | ERROR | 500 | Внутренняя ошибка | +------------------+---------+---------------------+ | SUCCESS | 200 | Успешное выполнение | +------------------+---------+---------------------+ | UPDATED | 200 | Данные обновлены | +------------------+---------+---------------------+ .. _etl_mapping: Маппинг данных (Endpoint – generateMapping) ############################################# В данном разделе описывается генерация файла маппинга. Endpoint предназначен для первичной настройки, а также перенастройки сервиса в случае изменения модели данных Витрины. Файл маппинга генерируется источником данных в формате ``Kotlin-script``. В файле описана модель данных Витрины данных в виде структуры объектов Kotlin (``table``, ``column``). Объекты ``table`` описывают таблицы, каждый из них содержит имя таблицы и список колонок в том порядке, в котором они созданы на Витрине. Объекты ``column`` описывают колонки, каждый из них содержит имя колонки, тип данных, признак обязательности (nullable), признак первичного ключа. Файл используется сервисом для описания модели данных и валидации входящих данных. Выполняются следующие проверки: - проверяется соответствие состава полей входящей avro-структуры составу полей, описанных в файле маппинга; - проверяется соответствие порядка полей входящей avro-структуры порядку полей, описанных в файле маппинга; - проверяется соответствие типов данных полей входящей avro-структуры типам полей, описанных в файле маппинга. Для полей с установленным признаком обязательности (nullable = false) выполняется проверка на null. При вызове Endpoint’а ``/generateMapping`` сервис генерирует файл на основе информации о модели, полученной из развернутой Витрины данных. Файл складывается сервисом на диск, а также возвращается в ответе на вызов. Пример запроса на генерацию маппинга представлен ниже: .. code-block:: curl -X GET "http://:8088/api/v1/secure/////generateMapping/aef2f195-0001-4aaa-b171-f2746511e889" -H "Authorization: Bearer " Результат Витрина данных вернет в формате Kotlin-script: .. code-block:: kotlin import ru.supercode.mapping.common.ColumnType.* import ru.supercode.mapping.mapper.dsl.mappingAvro mappingAvro { table("product") { column("id", INTEGER) { nullable = false; primary = true; } column("name", STRING) { nullable = false; } } table("stock") { column("product_id", INTEGER) { nullable = false; primary = true; } column("count_pieces", INTEGER) { nullable = false; } } } .. _etl_validation: Валидация данных ##################### Валидация порции данных производится в момент обработки и вставки. .. note:: Помимо валидации данных осуществляется валидация параметров запроса. Во всех Endpoint’ах ``requestId`` должен быть в формате UUID. В случае ошибок валидации результат будет возвращен при вызове Endpoint’а ``/status``. **Ошибки, возникающие в процессе обработки Endpoint’а /newDelta**: - отклоняются запросы, которые получены в момент обработки порции данных (``statusCode: PROCESSED``); - если пустой параметр ``dataSetName``; - прислан запрос с уже зарегистрированным ``requestId`` и ``statusCode`` данного ``requestId`` не равен ``NOT_FOUND`` или ``WAIT_DATA``. **Ошибки, возникающие в процессе обработки Endpoint’а /partOfDelta**: - прислан запрос с незарегистрированным ``requestId``; - прислан запрос с уже зарегистрированным ``requestId`` и ``statusCode`` данного ``requestId`` не равен ``NOT_FOUND`` или ``WAIT_DATA``; - прислан запрос с ``requestId`` зарегистрированным для Endpoint'а ``/data``; - прислан запрос с параметром ``dataSetName``, который не был зарегистрирован в Endpoint’е ``/newDelta``; - нет файла вложения в параметре ``upload``. Ошибки, возникающие в процессе обработки Endpoint’а ``/data``: - отклоняются запросы, которые получены в момент обработки порции данных (``statusCode: PROCESSED``); - прислан запрос с незарегистрированным ``requestId``; - прислан запрос с уже зарегистрированным ``requestId`` и ``statusCode`` данного ``requestId`` не равен ``NOT_FOUND`` или ``WAIT_DATA``; - прислан запрос с ``requestId`` зарегистрированным для Endpoint’а ``/partOfDelta``; - нет файла вложения в параметре ``upload``. **Ошибки, возникающие в процессе обработки Endpoint’а /uploadAttachment**: - нет файла вложения в параметре ``upload``. **Ошибки, возникающие в процессе обработки Endpoint’а /generateMapping**: - не созданы логические таблицы в схеме.