INSERT SELECT FROM external_table
Содержание раздела
- Требования к загружаемым данным
- Способы загрузки данных
- Особенности загрузки данных в standalone-таблицы
- Признак успешной загрузки
- Перезапуск и отмена операций
- Как работает запрос
- Синтаксис
- Варианты ответа
- Ограничения
- Примеры
- Загрузка данных в логическую таблицу в дельте
- Загрузка данных в снапшот-таблицу
- Загрузка данных в логическую таблицу вне дельты
- Загрузка данных в прокси-таблицу
- Загрузка данных в standalone-таблицу
- Загрузка данных в партиционированную таблицу
- Загрузка данных в партицию
- Перезапуск операций по загрузке данных
Поддерживается в версиях: 7.7 / 7.6 / 7.5 / 7.4 / 7.3 / 7.2 / 7.1 / 7.0 / 6.12 / 6.11 / 6.10 / 6.9 / 6.8 / 6.7 / 6.6 / 6.5 / 6.4 / 6.3 / 6.2 / 6.1 / 6.0 / 5.8 / 5.7 / 5.6 / 5.5 / 5.4 / 5.3 / 5.2 / 5.1 / 5.0.
Запрос загружает данные из топика Kafka в указанную таблицу:
Путь к топику задается с помощью внешней таблицы, указываемой в запросе.
При загрузке данных в прокси-таблицы и standalone-таблицы учитывайте ограничения таблиц целевой СУБД.
Прямое перемещение данных между топиками Kafka недоступно. Но можно использовать таблицу как промежуточное звено: загрузить данные из топика в таблицу, затем выгрузить данные из таблицы в другой топик.
Требования к загружаемым данным
Данные в топике должны иметь формат, подходящий для загрузки. Валидация и, если нужно, корректировка данных должна проводиться внешней системой до загрузки данных в топик.
Способы загрузки данных
Загрузить данные с помощью запроса можно с использованием:
- внешней таблицы загрузки,
- внешней readable-таблицы (только в ADP; требуется коннектор Kafka Jet writer).
Сравнение способов см. в разделе Загрузка данных.
Особенности загрузки данных в standalone-таблицы
Синтаксис загрузки в standalone-таблицу подразумевает использование внешней writable-таблицы, которая указывает на нужную standalone-таблицу.
Признак успешной загрузки
Об успешности загрузки данных можно судить по успешному ответу на запрос загрузки данных. Другие факторы, например смещения (офсеты) в топике Kafka или данные в физической таблице *_staging, не означают успешность загрузки.
Перезапуск и отмена операций
Кластер с кворумом нод автоматически отменяет сбойные операции записи при типовых сбоях, исключая необходимость ручного вмешательства.
Возможности по ручному управлению операциями описаны в разделе Управление операциями записи.
Как работает запрос
Консьюмер-группы и офсеты Kafka
Данные загружаются из Kafka в датасорсы с помощью коннекторов, каждый из которых использует свою консьюмер-группу.
Каждый коннектор читает данные с последнего смещения (офсета) своей группы и обновляет его по ходу загрузки. При сбое и откате операций записи смещение не откатывается — оно остается на значении, прочитанном к моменту сбоя.
Для коннекторов одного типа используйте разные имена консьюмер-групп, чтобы избежать конкуренции за топик и нестабильной загрузки.
Обработка загружаемых записей
Данные выбираются из топика Kafka в соответствии с указанным SELECT-подзапросом и загружаются в указанную таблицу.
Обработка каждой загружаемой записи зависит от наличия/отсутствия в таблице записи с таким же первичным ключом (PK), а также значений опций set.on.conflict.do и set.delete.tracking.enable таблицы (для снапшот-таблиц), как показано ниже.
| Таблица | PK новый или таблица без PK | PK совпадает и sys_op=0 | PK совпадает и sys_op=1 |
|---|---|---|---|
| Логическая таблица | Добавление новой записи | Обновление записи | Удаление записи |
| Снапшот-таблица | Добавление новой записи |
|
|
| Прокси-, standalone-таблица | Добавление новой записи |
| |
Обновление записей в снапшот-таблице без первичного ключа не поддерживается: вставляемая запись всегда добавляется как новая.
Особенности загрузки в партиции
Загрузка в партиции имеет особенности:
- [через партиционированную таблицу] записи распределяются по партициям; записи вне диапазонов партиций игнорируются;
- [напрямую в партиции] записи вне диапазонов партиции игнорируются.
Заполнение столбцов таблицы
Столбцы добавляемых и обновляемых записей:
- [указанные в запросе] заполняются значениями из загружаемых данных;
- [пропущенные* в запросе] заполняются значениями, используемыми по умолчанию в СУБД.
Столбцы удаляемых записей:
- [sys_op] заполняется значением 1;
- [остальные столбцы] сохраняют свои текущие значения, игнорируя загруженные данные.
* Для добавляемых и обновляемых записей можно пропустить только nullable-столбцы: пропуск обязательных столбцов (NOT NULL) вызывает ошибку. Для удаляемых записей можно пропустить любые столбцы, кроме столбцов первичного ключа.
Выполнение при отключенном датасорсе
Отключенный датасорс пропускается при исполнении запроса. Загрузка данных считается успешной, если данные сохранены в необходимые датасорсы.
Статистика
Запросы INSERT SELECT FROM external_table учитываются в категории UPLOAD статистики. Подробнее о категориях запросов в статистике см. в разделе GET_ENTITY_STATISTICS, о способах просмотра статистики — в разделе Управление статистикой.
Синтаксис
Загрузка с помощью внешней таблицы загрузки:
[RETRY] INSERT INTO [db_name.]table_name SELECT { * | column_name, ... }
FROM [db_name.]upload_ext_table_name
Загрузка с помощью внешней readable-таблицы:
[RETRY] INSERT INTO [db_name.]table_name SELECT { * | column_name, ... }
FROM [db_name.]readable_ext_table_name
[WHERE condition]
[LIMIT count] [OFFSET start]
Параметры:
db_name-
Имя логической базы данных. Опционально, если выбрана логическая БД, используемая по умолчанию.
table_name-
Имя таблицы-приемника данных. Возможные значения:
- имя логической таблицы,
- имя снапшот-таблицы,
- имя прокси-таблицы,
- имя внешней writable-таблицы, указывающей на нужную standalone-таблицу.
Логическая таблица может быть любого вида.
Ключевое слово SELECT { * | column_name, ... }
Задает имена всех столбцов внешней таблицы, определяющей параметры загрузки. Имена, типы и порядок указанных столбцов должны соответствовать именам, типам и порядку столбцов (полей) в таблице-приемнике данных, а также типам и порядку полей в топике Kafka, из которого загружаются данные.
Ключевое слово FROM [db_name.]{upload_ext_table_name | readable_ext_table_name}
Задает имя внешней таблицы загрузки или внешней readable-таблицы, определяющей параметры загрузки.
Ключевое слово WHERE condition
Задает условия выбора данных из топика при их загрузке в таблицу. Поддерживается только для загрузки через внешнюю readable-таблицу.
Условие может содержать операторы:
>,>=,<,<=,=,<>,IS NULL,IS NOT NULL,AND,OR.
Если ключевое слово не указано при загрузке через readable-таблицу, из топика загружаются все непрочитанные ранее данные.
Ключевое слово LIMIT count
Задает количество строк, загружаемых из топика в таблицу. Поддерживается только для загрузки через внешнюю readable-таблицу.
Значение ключевого слово не влияет на количество сообщений, считываемых из топика, — при обработке каждого запроса коннектор считывает все непрочитанные ранее сообщения из топика.
Поддерживаемые сочетания LIMIT и OFFSET:
LIMIT,LIMIT count OFFSET start.
Строки загружаются без гарантий сохранения порядка их следования, поэтому рекомендуется использовать загрузку с LIMIT и OFFSET только в тестовых целях.
Если ключевое слово не указано при загрузке через readable-таблицу, из топика загружаются все непрочитанные ранее данные.
Ключевое слово OFFSET start
Задает количество строк, пропускаемых при загрузке данных из топика в таблицу. Поддерживается только для загрузки через внешнюю readable-таблицу.
Значение ключевого слово не влияет на количество сообщений, считываемых из топика, — при обработке каждого запроса коннектор считывает все непрочитанные ранее сообщения из топика.
Поддерживаемые сочетания LIMIT и OFFSET:
LIMIT,LIMIT count OFFSET start.
Строки загружаются без гарантий сохранения порядка их следования, поэтому рекомендуется использовать загрузку с LIMIT и OFFSET только в тестовых целях.
Если ключевое слово не указано при загрузке через readable-таблицу, из топика загружаются все непрочитанные ранее данные.
Ключевое слово RETRY
При типовых сбоях (перебои в сети, сбой ноды и т.п.) операции отменяются автоматически и не требуют ручного управления с помощью RETRY или других команд.
Перезапускает обработку операции записи со статусом 0 (Выполняется) в логической таблице. Если указано, после него должна следовать копия исходного запроса, создавшего перезапускаемую операцию.
Не поддерживается для других таблиц:
- игнорируется для прокси- и standalone-таблиц;
- приводит к ошибке для снапшот-таблиц.
Если ключевое слово не указано, система обрабатывает запрос как новый, а не перезапущенный.
Независимо от наличия/отсутствия RETRY запрос загружает данные из топика Kafka с последнего прочитанного офсета.
Варианты ответа
В ответе возвращается:
- объект ResultSet c одной записью при успешном выполнении запроса;
- исключение при неуспешном выполнении запроса.
Успешный ответ содержит столбцы:
sysCn:- номер выполненной операции записи — при загрузке в логическую или снапшот-таблицу;
- пустое значение — при загрузке в прокси-таблицу или во внешнюю writable-таблицу;
ts:- дата и время завершения операции записи в формате
YYYY-MM-DD hh:mm:ss.SSSSSS— при выполнении операции вне дельты; - пустое значение — при выполнении операции в дельте;
- дата и время завершения операции записи в формате
rowsAffected— количество затронутых (добавленных, измененных и удаленных) строк. Расчет значения поддерживается для СУБД ADB и ADP.
Запрос с RETRY возвращает в rowsAffected количество строк, затронутых перезапущенной операцией записи. Строки, обработанные до перезапуска операции, не учитываются.
Ограничения
Ограничения выполнения
- Выполнение запроса к логической или снапшот-таблице недоступно, если она участвует в незавершенной операции по изменению схемы.
- Выполнение запроса вне дельты недоступно, если последняя дельта закрыта с меткой времени, превышающей серверное время.
Ограничения сущностей
- Запрос недоступен для логических и материализованных представлений.
- Обновление существующих записей не поддерживается в следующих снапшот-таблицах:
- без первичного ключа — загружаемые записи с
sys_op=0добавляются как новые; - с первичным ключом и опцией
set.on.conflict.do=error— все загружаемые записи отбрасываются, возвращается ошибка; - с первичным ключом и опцией
set.on.conflict.do=nothing— отбрасываются записи с дубликатами первичного ключа, остальные — применяются.
- без первичного ключа — загружаемые записи с
- Удаление записей не поддерживается из следующих снапшот-таблиц:
- без первичного ключа — удаляемые записи вставляются как новые с
sys_op=1; - с первичным ключом и опциями
set.delete.tracking.enable=true+set.on.conflict.do=error— все загружаемые записи отбрасываются, возвращается ошибка; - с первичным ключом и опциями
set.delete.tracking.enable=true+set.on.conflict.do=nothing— отбрасываются записи с дубликатами первичного ключа, остальные — применяются.
- без первичного ключа — удаляемые записи вставляются как новые с
- Обновление существующих записей не поддерживается в прокси- и standalone-таблицах в ADB, ADQM и ADG — возвращается ошибка или добавляются записи-дубликаты по первичному ключу (зависит от СУБД).
Ограничения ключевых слов
- Частичная загрузка данных с указанием
WHERE,LIMITиOFFSETдоступна только при загрузке с помощью внешних readable-таблиц. - Ключевое слово
RETRYдоступно только для логических таблиц; для других таблиц игнорируется или приводит к ошибке.
Ограничения партиционирования
- При загрузке данных в партиционированную таблицу все записи, для которых нет подходящей партиции, игнорируются.
- Одновременная запись данных (загрузка, вставка и удаление) в партиционированную таблицу и ее партиции недоступна.
Ограничения смещений (офсетов) Kafka
- При сбое и откате операций записи смещение не откатывается — оно остается на значении, прочитанном к моменту сбоя.
- Для равенства данных, загружаемых в разнотипные датасорсы, необходимо загружать данные из общего топика и контролировать, что начальные смещения в топике одинаковы для всех участвующих консьюмер-групп. Это актуально для любых коннекторов, кроме Kafka Jet writer, который обычно имеет общую группу для всех датасорсов.
Другие ограничения
- Пропуск в запросе обязательных столбцов (
NOT NULL) таблицы не поддерживается: возвращается ошибка. - Недоступна одновременная загрузка данных из одного топика в разные таблицы ADG.
- Загрузка данных с помощью readable-таблиц доступна только в ADP и только при наличии установленного коннектора Kafka Jet writer.
- При обработке запроса отключенные (отключенный датасорс: Датасорс, отключенный системой из-за сбоя или администратором
) датасорсы (датасорс: СУБД или кластер СУБД хранилища
) пропускаются без возврата ошибки. Ошибка возвращается, если не осталось включенных (включенный датасорс: Датасорс, работающий в штатном режиме
) датасорсов, необходимых для записи данных.
Примеры
Загрузка данных в логическую таблицу в дельте
Загрузка с помощью внешней таблицы загрузки marketing.sales_ext_upload в дельте:
-- выбор логической базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Загрузка с помощью внешней readable-таблицы marketing.sales_ext_upload_through_ret вне дельты:
INSERT INTO marketing.sales SELECT * FROM marketing.sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00'
Загрузка данных в снапшот-таблицу
-- создание внешней readable-таблицы для загрузки данных
CREATE READABLE EXTERNAL TABLE marketing.ret_for_sales_snapshot (
id BIGINT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units BIGINT,
store_id BIGINT,
description VARCHAR(256),
sys_op INTEGER,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'kafka://$kafka/sales_snapshot_in'
FORMAT 'AVRO';
-- загрузка данных в cнапшот-таблицу sales_snapshot
INSERT INTO marketing.sales_snapshot SELECT * FROM marketing.ret_for_sales_snapshot;
Загрузка данных в логическую таблицу вне дельты
-- загрузка данных в логическую таблицу sales вне дельты
INSERT INTO marketing.sales SELECT * FROM marketing.sales_ext_upload;
Загрузка данных в прокси-таблицу
-- создание внешней readable-таблицы для загрузки данных
CREATE READABLE EXTERNAL TABLE marketing.payments_ext_upload_to_proxy (
id BIGINT,
agreement_id BIGINT,
type_code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'kafka://$kafka/payments_adp_in'
FORMAT 'AVRO';
-- загрузка данных в прокси-таблицу payments_proxy
INSERT INTO marketing.payments_proxy SELECT * FROM marketing.payments_ext_upload_to_proxy;
Загрузка данных в standalone-таблицу
Загрузка в standalone-таблицу датасорса adp, на которую указывает внешняя writable-таблица agreements_ext_write_adp:
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;
Загрузка в standalone-таблицу датасорса adqm, на которую указывает внешняя writable-таблица payments_ext_write_adqm:
INSERT INTO marketing.payments_ext_write_adqm SELECT * FROM marketing.payments_ext_upload;
Загрузка данных в партиционированную таблицу
-- выбор логической базы данных по умолчанию
USE marketing;
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_part_ext_upload (
id BIGINT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units BIGINT,
store_id BIGINT,
description VARCHAR(256)
)
LOCATION 'kafka://localhost:2181/sales_part_in'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в партиционированную таблицу sales_partitioned
INSERT INTO sales_partitioned SELECT * FROM sales_part_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Загрузка данных в партицию
-- выбор логической базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в партицию sales_jan_2023
---- можно использовать ту же внешнюю таблицу загрузки, что и для партиционированной таблицы,
---- или создать отдельную внешнюю таблицу
INSERT INTO sales_feb_2023 SELECT * FROM sales_part_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Перезапуск операций по загрузке данных
-- выбор логической базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales с помощью внешней таблицы загрузки
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- перезапуск обработки операции по загрузке данных в логическую таблицу sales с помощью внешней таблицы загрузки
RETRY INSERT INTO sales SELECT * FROM sales_ext_upload;
-- запуск загрузки данных в логическую таблицу sales с помощью внешней readable-таблицы
INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';
-- перезапуск обработки операции по загрузке данных в логическую таблицу sales с помощью внешней readable-таблицы
RETRY INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';
-- закрытие дельты
COMMIT DELTA;