feat: use streaming api for duckdb driver's readQuery

Author: Nybkox
Date: 2025-04-29 11:11:21 +02:00
parent fdb14cd49b
commit ecaafaca69

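The commit replaces the materializing `runAndReadAll` path with DuckDB's chunked streaming API, so large result sets are forwarded row by row instead of being buffered whole in memory. For orientation, here is a minimal sketch of the streaming pattern in isolation, using `@duckdb/node-api` directly; the in-memory instance setup and the `streamRows` helper name are assumptions for the example, and dbgate's pass-through stream, `normalizeRow`, and `zipObject` plumbing are omitted:

// Sketch only: connection setup assumed, error handling omitted.
const { DuckDBInstance } = require('@duckdb/node-api');

async function streamRows(sql) {
  const instance = await DuckDBInstance.create(':memory:');
  const connection = await instance.connect();

  // Split a multi-statement script and run each statement separately.
  const statements = await connection.extractStatements(sql);
  for (let i = 0; i < statements.count; i++) {
    const stmt = await statements.prepare(i);
    // Stream instead of materializing the whole result set.
    const result = await stmt.stream();
    while (true) {
      const chunk = await result.fetchChunk();
      if (!chunk || chunk.rowCount === 0) break; // stream exhausted
      for (const row of chunk.getRows()) {
        console.log(row); // one array of column values per row
      }
    }
  }
}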

@@ -24,6 +24,20 @@ function getDuckDb() {
   return duckDb;
 }
 
+function getReturningStatementTypes() {
+  const duckdb = getDuckDb();
+  const returningStatementTypes = [
+    duckdb.StatementType.SELECT,
+    duckdb.StatementType.EXPLAIN,
+    duckdb.StatementType.EXECUTE,
+    duckdb.StatementType.RELATION,
+    duckdb.StatementType.LOGICAL_PLAN,
+  ];
+  return returningStatementTypes;
+}
+
 /** @type {import('dbgate-types').EngineDriver<import('@duckdb/node-api').DuckDBConnection>} */
 const driver = {
   ...driverBase,
@@ -60,19 +74,12 @@ const driver = {
     try {
       const statements = await dbhan.client.extractStatements(sql);
+      const returningStatementTypes = getReturningStatementTypes();
       const count = statements.count;
       for (let i = 0; i < count; i++) {
         const stmt = await statements.prepare(i);
-        const returningStatementTypes = [
-          duckdb.StatementType.SELECT,
-          duckdb.StatementType.EXPLAIN,
-          duckdb.StatementType.EXECUTE,
-          duckdb.StatementType.RELATION,
-          duckdb.StatementType.LOGICAL_PLAN,
-        ];
         const result = await stmt.stream();
         let hasSentColumns = false;
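Both query paths now share the statement-type filter instead of duplicating the inline array. A hedged illustration of its effect, assuming a `connection` obtained as in the sketch above and a hypothetical script mixing DDL with a query; the filter lets only the row-returning statement reach `stream()`:

// Hypothetical script: the type check skips CREATE and INSERT,
// so only the SELECT produces a streamed result.
const sql = `
  CREATE TABLE t (id INTEGER);
  INSERT INTO t VALUES (1), (2);
  SELECT id FROM t;
`;
const statements = await connection.extractStatements(sql);
const returningStatementTypes = getReturningStatementTypes();
for (let i = 0; i < statements.count; i++) {
  const stmt = await statements.prepare(i);
  if (!returningStatementTypes.includes(stmt.statementType)) {
    continue; // non-returning statement: nothing to stream here
  }
  const result = await stmt.stream(); // reached only for the SELECT
}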
@@ -149,33 +156,71 @@ const driver = {
       highWaterMark: 100,
     });
 
-    const res = await dbhan.client.runAndReadAll(sql);
-    const rowsObjects = res.getRowObjects();
-    const columnNames = res.columnNames();
-    const columnTypes = res.columnTypes();
-    const columns = getColumnsInfo(columnNames, columnTypes).map(normalizeRow);
-    const rows = rowsObjects.map(normalizeRow);
-
-    pass.write({
-      __isStreamHeader: true,
-      ...(structure || {
-        columns: columns.map((col) => ({
-          columnName: col.name,
-          dataType: col.type,
-        })),
-      }),
-    });
-    for (const row of rows) {
-      pass.write(row);
-    }
-    pass.end();
-    return pass;
+    try {
+      const statements = await dbhan.client.extractStatements(sql);
+      const returningStatementTypes = getReturningStatementTypes();
+      const count = statements.count;
+
+      for (let i = 0; i < count; i++) {
+        const stmt = await statements.prepare(i);
+
+        if (!returningStatementTypes.includes(stmt.statementType)) {
+          continue;
+        }
+
+        const result = await stmt.stream();
+        let hasSentHeader = false;
+
+        while (true) {
+          const chunk = await result.fetchChunk();
+          if (!chunk || chunk.rowCount === 0) {
+            break;
+          }
+
+          if (!hasSentHeader) {
+            const columnNames = result.columnNames();
+            const columnTypes = result.columnTypes();
+            const columns = getColumnsInfo(columnNames, columnTypes);
+
+            pass.write({
+              __isStreamHeader: true,
+              ...(structure || {
+                columns: columns.map((col) => ({
+                  columnName: col.columnName,
+                  dataType: col.dataType,
+                })),
+              }),
+            });
+            hasSentHeader = true;
+          }
+
+          const rows = chunk.getRows();
+          const columnNames = result.columnNames();
+          for (const row of rows) {
+            const zipped = zipObject(columnNames, row);
+            pass.write(normalizeRow(zipped));
+          }
+        }
+      }
+
+      pass.end();
+      return pass;
+    } catch (error) {
+      logger.error(extractErrorLogData(error), 'ReadQuery error');
+      const { message, procName } = error;
+      pass.write({
+        __isStreamInfo: true,
+        info: {
+          message,
+          line: 0,
+          procedure: procName,
+          time: new Date(),
+          severity: 'error',
+        },
+      });
+      pass.end();
+      return pass;
+    }
   },
   async writeTable(dbhan, name, options) {
     return createBulkInsertStreamBase(this, stream, dbhan, name, options);
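For context, a sketch of how a consumer might read the object-mode stream `readQuery` returns; the `driver` and `dbhan` wiring is assumed to exist. Per the diff, the stream emits a `__isStreamHeader` record first, then one plain object per row, and a `__isStreamInfo` record if the query fails:

// Assumed wiring: dbhan is an open DuckDB connection handle.
const rowStream = await driver.readQuery(dbhan, 'SELECT 42 AS answer');
rowStream.on('data', (item) => {
  if (item.__isStreamHeader) {
    console.log('columns:', item.columns.map((c) => c.columnName));
  } else if (item.__isStreamInfo) {
    console.error('query failed:', item.info.message);
  } else {
    console.log('row:', item); // e.g. { answer: 42 }
  }
});
rowStream.on('end', () => console.log('done'));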