Как прочитать данные
Содержание раздела
Система позволяет запрашивать данные и получать выборку в ответе. Данные можно запрашивать из следующих сущностей и их соединений:
- логических таблиц,
- снапшот-таблиц,
- прокси-таблиц,
- standalone-таблиц,
- логических представлений,
- материализованных представлений.
Предварительный шаг для standalone-таблиц
Чтение данных из standalone-таблицы выполняется с помощью внешней readable-таблицы. Если она отсутствует, ее необходимо создать.
Чтобы создать внешнюю readable-таблицу, выполните CREATE READABLE EXTERNAL TABLE:
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово
OPTIONSсо значениемauto.create.table.enable=true; - иначе выполните запрос без ключевого слова
OPTIONS.
Чтение данных одной выборкой (HTTP, JDBC)
Способ подходит для чтения данных, уже существующих в системе и включающих до сотни записей. Для других сценариев см. Потоковое чтение данных (HTTP, JDBC) и Чтение данных с табличным параметром (HTTP).
Чтобы прочитать данные единой выборкой:
- [Вручную по HTTP или JDBC] Выполните SELECT-запрос в выбранном HTTP- или SQL-клиенте.
- [Программно по HTTP] Реализуйте клиент (см. пример ниже), который:
- [Программно по JDBC] Реализуйте клиент (см. пример ниже), который:
- устанавливает соединение с системой по JDBC;
- исполняет SELECT-запрос с помощью методов интерфейса
PreparedStatementилиStatementбезsetFetchSize; - обрабатывает результат запроса и закрывает соединение.
Пример запроса на чтение данных в SQL-клиенте
На рисунке ниже показан пример запроса SELECT * FROM marketing.sales WHERE product_code = 'ABC1835' с помощью SQL-клиента.

Запрос с помощью SQL-клиента
Пример класса для чтения данных одной выборкой по HTTP
Ниже показан пример реализации класса HTTP-клиента для чтения данных одной выборкой в JSON-формате. Код написан на Java с использованием Vertx.
// создание инстанса vertx
Vertx vertx = Vertx.vertx();
// создание HTTP-клиента с заданными параметрами
HttpClientOptions clientOptions = new HttpClientOptions();
// установка протокола HTTP/2
clientOptions.setProtocolVersion(HttpVersion.HTTP_2);
// отключение переключения с HTTP/1.1 на HTTP/2
clientOptions.setHttp2ClearTextUpgrade(false);
HttpClient httpClient = vertx.createHttpClient(clientOptions);
// формирование тела запроса: параметризованный SELECT без потокового чтения
// параметр fetchSize не указан, поэтому вся выборка возвращается одним ответом
// maxRowsToRead=100 ограничивает число возвращаемых записей
JsonObject requestBody = new JsonObject()
.put("query", "SELECT * FROM sales FOR SYSTEM_TIME AS OF ? WHERE store_id = ?")
.put("queryId", "72840")
.put("maxRowsToRead", 100)
.put("params", new JsonArray()
.add(new JsonObject().put("value", "2024-05-10 13:12:09").put("type", "TIMESTAMP"))
.add(new JsonObject().put("value", 123).put("type", "BIGINT")));
// отправка запроса на чтение из логической базы данных marketing
Future<Void> requestFuture = httpClient.request(HttpMethod.POST, 9090, "localhost", "/api/v1/datamarts/marketing/query?format=json")
.compose(request -> {
// добавление заголовков запроса
request.putHeader(HttpHeaders.CONTENT_TYPE, "application/json");
request.putHeader("x-request-id", "e89cc2fc-8fc1-415f-9dee-02deb30c6f34");
// отправка тела запроса и получение полного ответа одной порцией
return request.send(requestBody.toBuffer())
.compose(response -> {
// вывод кода ответа
System.out.println("Response status code: " + response.statusCode());
// получение всего тела ответа целиком
return response.body();
})
.compose(body -> {
// разбор единого JSON-ответа со всей выборкой
JsonObject result = body.toJsonObject();
System.out.println("Query id: " + result.getString("queryId"));
System.out.println("Rows: " + result.getLong("rows"));
System.out.println("Statistics: " + result.getJsonObject("statistics"));
System.out.println("Columns: " + result.getJsonArray("metadata"));
// обработка записей выборки
result.getJsonArray("result").forEach(row -> System.out.println("Row: " + row));
return Future.<Void>succeededFuture();
});
})
// завершение чтения: вывод успешного ответа или логирование ошибки
.onSuccess(v -> System.out.println("Read done"))
.onFailure(Throwable::printStackTrace);
requestFuture.toCompletionStage()
.toCompletableFuture()
// ожидание завершения процесса
.get();
// закрытие инстанса vertx
vertx.close();
Пример класса для чтения данных одной выборкой по JDBC
Ниже показан пример реализации класса JDBC-клиента для чтения данных одной выборкой в JSON-формате. Код написан на Java с использованием Vertx.
public class SingleResultSetReadExample {
public static void main(String[] args) {
// адрес подключения к ноде Prostore с логической базой данных marketing по умолчанию
String url = "jdbc:prostore://localhost:9090/marketing";
// параметризованный SELECT-запрос
String query = "SELECT * FROM sales " +
"FOR SYSTEM_TIME AS OF ? " +
"WHERE store_id = ?;";
try (Connection connection = DriverManager.getConnection(url)) {
System.out.println("Connected");
try (PreparedStatement statement = connection.prepareStatement(query)) {
// параметр fetchSize не задается, поэтому вся выборка возвращается одним ResultSet
// (потоковое чтение выключено)
// установка значений параметров запроса
statement.setTimestamp(1, Timestamp.valueOf("2024-05-10 13:12:09")); // метка времени для FOR SYSTEM_TIME AS OF
statement.setLong(2, 123); // значение store_id
// выполнение запроса и получение всей выборки
try (ResultSet resultSet = statement.executeQuery()) {
// перебор строк выборки
while (resultSet.next()) {
// добавьте логику обработки полученных строк, например:
long id = resultSet.getLong("id");
Timestamp transactionDate = resultSet.getTimestamp("transaction_date");
System.out.println(id + " " + transactionDate);
}
}
} catch (SQLException error) {
// добавьте логику обработки SQL-ошибок
error.printStackTrace();
}
} catch (SQLException error) {
error.printStackTrace();
}
}
}
Потоковое чтение данных (HTTP, JDBC)
Способ подходит для чтения данных, уже существующих в системе и включающих более сотни записей. Для других сценариев см. Чтение данных одной выборкой (HTTP, JDBC) и Чтение данных с табличным параметром (HTTP).
Чтобы прочитать данные потоком, состоящим из порций указанного размера:
- [Программно по HTTP] Реализуйте клиент (см. пример ниже), который:
- [Программно по JDBC] Реализуйте клиент (см. пример ниже), который:
- устанавливает соединение с системой по JDBC;
- исполняет SELECT-запрос с помощью методов интерфейса
PreparedStatementилиStatementсsetFetchSize> 0; - обрабатывает результат запроса и закрывает соединение.
Потоковое чтение по JDBC не поддерживает одновременный возврат результатов нескольких запросов. Если один вызов statement.executeQuery содержит несколько запросов, результаты каждого следующего возвращаются только после возврата результатов предыдущего.
Пример класса для потокового чтения данных по HTTP
Ниже показан пример реализации класса HTTP-клиента для потокового чтения данных в JSON-формате. Код написан на Java с использованием Vertx.
// создание инстанса vertx
Vertx vertx = Vertx.vertx();
// создание HTTP-клиента с заданными параметрами
HttpClientOptions clientOptions = new HttpClientOptions();
// установка протокола HTTP/2 (обязателен для потокового чтения)
clientOptions.setProtocolVersion(HttpVersion.HTTP_2);
// отключение переключения с HTTP/1.1 на HTTP/2
clientOptions.setHttp2ClearTextUpgrade(false);
HttpClient httpClient = vertx.createHttpClient(clientOptions);
// формирование тела запроса: параметризованный SELECT с потоковым чтением
// fetchSize=100 включает потоковое чтение (порции по 100 записей)
// maxRowsToRead=10000 ограничивает суммарное число записей
// fetchTimeoutMs=60000 ограничивает время исполнения запроса в системе
// params задают значения для индексированных параметров (?) в порядке следования
JsonObject requestBody = new JsonObject()
.put("query", "SELECT * FROM sales FOR SYSTEM_TIME AS OF ? WHERE store_id = ?")
.put("queryId", "72848562")
.put("fetchSize", 100)
.put("maxRowsToRead", 10000)
.put("fetchTimeoutMs", 60000)
.put("params", new JsonArray()
.add(new JsonObject().put("value", "2024-05-10 13:12:09").put("type", "TIMESTAMP"))
.add(new JsonObject().put("value", 123).put("type", "BIGINT")));
// буфер для сборки построчно разделенных JSON-ответов потока
// ответы потока разделены переводом строки (по одному JSON-объекту на строку);
// уточните формат разбиения по поведению ноды Prostore
StringBuilder pending = new StringBuilder();
// отправка запроса на потоковое чтение из логической базы данных marketing
Future<Void> requestFuture = httpClient.request(HttpMethod.POST, 9090, "localhost", "/api/v1/datamarts/marketing/query?format=json")
.compose(request -> {
// добавление заголовков запроса
request.putHeader(HttpHeaders.CONTENT_TYPE, "application/json");
request.putHeader("x-request-id", "84706c2d-ba4a-4bb5-85e3-a367090f5882");
// отправка тела запроса и обработка потока ответов
return request.send(requestBody.toBuffer())
.compose(response -> {
// вывод кода ответа
System.out.println("Response status code: " + response.statusCode());
Promise<Void> promise = Promise.promise();
// обработка ошибок
response.exceptionHandler(promise::tryFail);
// обработка порций данных по мере их поступления
response.handler(chunk -> {
pending.append(chunk.toString());
int nl;
// выделение завершенных строк-ответов из буфера
while ((nl = pending.indexOf("\n")) >= 0) {
String line = pending.substring(0, nl).trim();
pending.delete(0, nl + 1);
if (!line.isEmpty()) {
handleStreamResponse(new JsonObject(line));
}
}
});
// завершение формирования и обработки ответа
response.endHandler(v -> {
// обработка возможного остатка в буфере
String tail = pending.toString().trim();
if (!tail.isEmpty()) {
handleStreamResponse(new JsonObject(tail));
}
promise.tryComplete();
});
return promise.future();
})
.onComplete(ar -> {
if (ar.failed()) {
ar.cause().printStackTrace();
}
});
})
// завершение потокового чтения: вывод успешного ответа или логирование ошибки
.onSuccess(v -> System.out.println("Read done"))
.onFailure(Throwable::printStackTrace);
requestFuture.toCompletionStage()
.toCompletableFuture()
// ожидание завершения процесса
.get();
// закрытие инстанса vertx
vertx.close();
// обработка одного ответа из потока:
// - первый ответ содержит метаданные (metadata) — логическую схему выборки;
// - промежуточные и последний ответы содержат порции записей (result);
// - последний ответ дополнительно содержит rows и statistics
private static void handleStreamResponse(JsonObject response) {
JsonArray metadata = response.getJsonArray("metadata");
if (metadata != null) {
System.out.println("Columns: " + metadata);
return;
}
JsonArray result = response.getJsonArray("result");
if (result != null) {
result.forEach(row -> System.out.println("Row: " + row));
}
Long rows = response.getLong("rows");
if (rows != null) {
System.out.println("Total rows: " + rows + ", statistics: " + response.getJsonObject("statistics"));
}
}
Пример класса для потокового чтения данных по JDBC
Ниже показан пример реализации класса JDBC-клиента для потокового чтения данных в JSON-формате. Код написан на Java с использованием Vertx.
public class StreamingReadExample {
public static void main(String[] args) {
// Адрес подключения к ноде Prostore с логической базой данных marketing по умолчанию
String url = "jdbc:prostore://localhost:9090/marketing";
// Параметризованный SELECT-запрос
String query = "SELECT * FROM sales " +
"FOR SYSTEM_TIME AS OF ? " +
"WHERE store_id = ?;";
try (Connection connection = DriverManager.getConnection(url)) {
System.out.println("Connected");
try (PreparedStatement statement = connection.prepareStatement(query)) {
// Включение потокового чтения данных порциями по 100 записей
statement.setFetchSize(100);
// Максимальное количество записей по запросу. Опциональный параметр, по умолчанию количество не ограничено
statement.setMaxRows(5000);
// Максимальное время исполнения запроса и получения каждой порции данных в секундах.
// Опциональный параметр, по умолчанию время не ограничено
statement.setQueryTimeout(300);
// Установка значений параметров запроса
statement.setTimestamp(1, Timestamp.valueOf("2024-05-10 13:12:09")); // метка времени для FOR SYSTEM_TIME AS OF
statement.setLong(2, 123); // значение store_id
// Открытие курсора
try (ResultSet resultSet = statement.executeQuery()) {
// Сдвиг курсора: порции данных вычитываются по мере перебора строк
while (resultSet.next()) {
// Добавьте логику обработки полученных строк, например:
long id = resultSet.getLong("id");
Timestamp transactionDate = resultSet.getTimestamp("transaction_date");
System.out.println(id + " " + transactionDate);
}
}
} catch (SQLException error) {
// Добавьте логику обработки SQL-ошибок
error.printStackTrace();
}
} catch (SQLException error) {
error.printStackTrace();
}
}
}
Чтение данных с табличным параметром (HTTP)
Способ подходит для чтения комбинированных данных: существующих в системе + загруженных в запросе. Доступно чтение одной выборкой или потоком. Для других сценариев см. Чтение данных одной выборкой (HTTP, JDBC) и Потоковое чтение данных (HTTP, JDBC).
Чтобы прочитать данные с использованием табличного параметра:
- [Для чтения одной выборкой] Реализуйте клиент (см. пример ниже), который:
- устанавливает HTTP-соединение с системой по HTTP/2 или HTTP/1.1;
- исполняет SELECT-запрос через /query-upload без параметра
fetchSize; - обрабатывает результат запроса и закрывает соединение.
- [Для потокового чтения] Реализуйте клиент (см. пример ниже), который:
- устанавливает HTTP-соединение с системой по HTTP/2;
- исполняет SELECT-запрос через /query-upload с параметром
fetchSize> 0; - обрабатывает результат запроса и закрывает соединение.
Пример реализации класса для чтения данных с табличным параметром одной выборкой
Ниже показан пример реализации класса HTTP-клиента для чтения данных с табличным параметром одной выборкой в JSON-формате. Код написан на Java с использованием Vertx.
public class QueryUploadSingleResultExample {
// SELECT-запрос соединяет логическую таблицу с табличным параметром @store_and_product_filter
private static final String SQL =
"SELECT s.* FROM marketing.sales s " +
"JOIN @store_and_product_filter " +
" ON s.store_id = @store_and_product_filter.store_id " +
" AND s.product_code = @store_and_product_filter.product_code " +
"DATASOURCE_TYPE = 'adp'";
// Avro-схема табличного параметра.
// имена полей должны совпадать с именами столбцов в columns[].name в JSON-части,
// логические типы отображаются в Avro как LONG -> long, STRING -> string и т. д.
private static final Schema TABLE_PARAM_SCHEMA = SchemaBuilder
.record("store_and_product_filter").namespace("query.upload.param").fields()
.name("store_id").type().longType().noDefault()
.name("product_code").type().stringType().noDefault()
.endRecord();
public static void main(String[] args) throws Exception {
Vertx vertx = Vertx.vertx();
WebClient client = WebClient.create(vertx,
new WebClientOptions().setDefaultHost("localhost").setDefaultPort(9090));
String queryId = UUID.randomUUID().toString();
// --- формирование JSON-части тела запроса ---
JsonObject tableParam = new JsonObject()
.put("name", "store_and_product_filter")
.put("columns", new JsonArray()
.add(new JsonObject().put("name", "store_id").put("type", "LONG"))
.add(new JsonObject().put("name", "product_code").put("type", "STRING")));
// параметр fetchSize не задается, поэтому вся выборка возвращается одним ответом
JsonObject json = new JsonObject()
.put("query", SQL)
.put("params", new JsonArray().add(tableParam))
.put("queryId", queryId)
.put("maxRowsToRead", 100)
.put("fetchTimeoutMs", 30000);
// кодировка UTF-8; длина в байтах задается в заголовке x-json-part-size
Buffer jsonPart = Buffer.buffer(json.encode().getBytes(StandardCharsets.UTF_8));
// --- формирование Avro-части тела запроса (Avro-файл с данными табличного параметра) ---
Buffer avroPart;
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(TABLE_PARAM_SCHEMA);
try (DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(datumWriter);
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
dfw.create(TABLE_PARAM_SCHEMA, baos); // запись заголовка со схемой данных
for (Object[] row : new Object[][]{
{123L, "ABC0001"},
{123L, "ABC0002"},
{234L, "ABC0003"}}) {
GenericRecord rec = new GenericData.Record(TABLE_PARAM_SCHEMA);
rec.put("store_id", row[0]);
rec.put("product_code", row[1]);
dfw.append(rec);
}
dfw.flush();
avroPart = Buffer.buffer(baos.toByteArray());
}
// бинарное тело запроса: JSON-часть, затем Avro-часть
Buffer body = Buffer.buffer().appendBuffer(jsonPart).appendBuffer(avroPart);
client.post("/api/v1/datamarts/marketing/query-upload?format=json")
.putHeader("Content-Type", "application/octet-stream")
// граница между JSON- и Avro-частями — должна быть ТОЧНОЙ длиной JSON-части в байтах
.putHeader("x-json-part-size", String.valueOf(jsonPart.length()))
.putHeader("x-request-id", UUID.randomUUID().toString())
.sendBuffer(body)
.onSuccess(resp -> {
if (resp.statusCode() != 200) {
System.err.println("Ошибка запроса, HTTP " + resp.statusCode());
client.close(); vertx.close();
return;
}
// единый ответ: схема столбцов, полная выборка и итоговые значения вместе
JsonObject result = resp.bodyAsJsonObject();
System.out.println("Столбцы: " + result.getJsonArray("metadata"));
result.getJsonArray("result").forEach(row -> System.out.println(row));
System.out.printf("Готово: строк %s, статистика=%s%n",
result.getValue("rows"), result.getJsonObject("statistics"));
client.close(); vertx.close();
})
.onFailure(err -> { err.printStackTrace(); client.close(); vertx.close(); });
}
}
Пример класса для потокового чтения данных с табличным параметром
Ниже показан пример реализации класса HTTP-клиента для потокового чтения данных с табличным параметром в JSON-формате. Код написан на Java с использованием Vertx.
public class QueryUploadStreamingExample {
// SELECT-запрос соединяет логическую таблицу с табличным параметром @store_and_product_filter
private static final String SQL =
"SELECT s.* FROM marketing.sales s " +
"JOIN @store_and_product_filter " +
" ON s.store_id = @store_and_product_filter.store_id " +
" AND s.product_code = @store_and_product_filter.product_code " +
"DATASOURCE_TYPE = 'adp'";
// Avro-схема табличного параметра.
// имена полей должны совпадать с именами столбцов в columns[].name в JSON-части,
// логические типы отображаются в Avro как LONG -> long, STRING -> string и т. д.
private static final Schema TABLE_PARAM_SCHEMA = SchemaBuilder
.record("store_and_product_filter").namespace("query.upload.param").fields()
.name("store_id").type().longType().noDefault()
.name("product_code").type().stringType().noDefault()
.endRecord();
public static void main(String[] args) throws Exception {
Vertx vertx = Vertx.vertx();
// потоковое чтение возможно только по протоколу HTTP/2
HttpClient client = vertx.createHttpClient(new HttpClientOptions()
.setProtocolVersion(HttpVersion.HTTP_2)
.setHttp2ClearTextUpgrade(false) // h2c с прямым подключением (prior knowledge)
.setDefaultHost("localhost").setDefaultPort(9090));
String queryId = UUID.randomUUID().toString();
// --- формирование JSON-части тела запроса ---
JsonObject tableParam = new JsonObject()
.put("name", "store_and_product_filter")
.put("columns", new JsonArray()
.add(new JsonObject().put("name", "store_id").put("type", "LONG"))
.add(new JsonObject().put("name", "product_code").put("type", "STRING")));
JsonObject json = new JsonObject()
.put("query", SQL)
.put("params", new JsonArray().add(tableParam))
.put("queryId", queryId)
.put("maxRowsToRead", 100)
.put("fetchSize", 100) // fetchSize > 0 включает потоковое чтение
.put("fetchTimeoutMs", 30000);
// кодировка UTF-8; длина в байтах задается в заголовке x-json-part-size
Buffer jsonPart = Buffer.buffer(json.encode().getBytes(StandardCharsets.UTF_8));
// --- формирование Avro-части тела запроса (Avro-файл с данными табличного параметра) ---
Buffer avroPart;
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(TABLE_PARAM_SCHEMA);
try (DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(datumWriter);
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
dfw.create(TABLE_PARAM_SCHEMA, baos); // запись заголовка со схемой данных
for (Object[] row : new Object[][]{
{123L, "ABC0001"},
{123L, "ABC0002"},
{234L, "ABC0003"}}) {
GenericRecord rec = new GenericData.Record(TABLE_PARAM_SCHEMA);
rec.put("store_id", row[0]);
rec.put("product_code", row[1]);
dfw.append(rec);
}
dfw.flush();
avroPart = Buffer.buffer(baos.toByteArray());
}
// бинарное тело запроса: JSON-часть, затем Avro-часть
Buffer body = Buffer.buffer().appendBuffer(jsonPart).appendBuffer(avroPart);
client.request(HttpMethod.POST, "/api/v1/datamarts/marketing/query-upload?format=json")
.compose(req -> {
req.putHeader("Content-Type", "application/octet-stream");
// граница между JSON- и Avro-частями — должна быть ТОЧНОЙ длиной JSON-части в байтах
req.putHeader("x-json-part-size", String.valueOf(jsonPart.length()));
req.putHeader("x-request-id", UUID.randomUUID().toString());
return req.send(body);
})
.onSuccess(resp -> {
if (resp.statusCode() != 200) {
System.err.println("Ошибка запроса, HTTP " + resp.statusCode());
resp.body().onComplete(b -> client.close());
return;
}
// чтение ответа порциями по мере поступления (с учетом back-pressure).
// поток ответов разделен переводами строк (один объект-ответ на строку).
StringBuilder pending = new StringBuilder();
resp.handler(chunk -> {
pending.append(chunk.toString());
int nl;
while ((nl = pending.indexOf("\n")) >= 0) {
String line = pending.substring(0, nl).trim();
pending.delete(0, nl + 1);
if (!line.isEmpty()) handleResponseObject(new JsonObject(line));
}
});
resp.endHandler(v -> {
String tail = pending.toString().trim();
if (!tail.isEmpty()) handleResponseObject(new JsonObject(tail));
client.close();
});
})
.onFailure(err -> { err.printStackTrace(); client.close(); });
}
private static void handleResponseObject(JsonObject resp) {
// первый ответ: только схема столбцов
if (resp.getJsonArray("metadata") != null) {
System.out.println("Столбцы: " + resp.getJsonArray("metadata"));
return;
}
// промежуточные и последний ответы: порция строк
if (resp.getJsonArray("result") != null) {
resp.getJsonArray("result").forEach(row -> System.out.println(row));
}
// последний ответ также содержит итоговые значения
if (resp.getValue("rows") != null) {
System.out.printf("Готово: строк %s, статистика=%s%n",
resp.getValue("rows"), resp.getJsonObject("statistics"));
}
}
}