feat: use streaming api for duckdb driver's stream method

This commit is contained in:
Nybkox
2025-04-29 01:01:57 +02:00
parent e09294d9aa
commit f67221ee01

View File

@@ -7,6 +7,7 @@ const { getLogger, extractErrorLogData, createBulkInsertStreamBase } = require('
const { getColumnsInfo, normalizeRow } = require('./helpers'); const { getColumnsInfo, normalizeRow } = require('./helpers');
const sql = require('./sql'); const sql = require('./sql');
const { mapSchemaRowToSchemaInfo } = require('./Analyser.helpers'); const { mapSchemaRowToSchemaInfo } = require('./Analyser.helpers');
const { zipObject } = require('lodash');
const logger = getLogger('sqliteDriver'); const logger = getLogger('sqliteDriver');
@@ -62,11 +63,9 @@ const driver = {
const count = statements.count; const count = statements.count;
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
let hasSentColumns = false;
const stmt = await statements.prepare(i); const stmt = await statements.prepare(i);
const res = await stmt.runAndReadAll();
const returningStatemetes = [ const returningStatementTypes = [
duckdb.StatementType.SELECT, duckdb.StatementType.SELECT,
duckdb.StatementType.EXPLAIN, duckdb.StatementType.EXPLAIN,
duckdb.StatementType.EXECUTE, duckdb.StatementType.EXECUTE,
@@ -74,29 +73,35 @@ const driver = {
duckdb.StatementType.LOGICAL_PLAN, duckdb.StatementType.LOGICAL_PLAN,
]; ];
if (!returningStatemetes.includes(stmt.statementType)) { if (!returningStatementTypes.includes(stmt.statementType)) {
continue; continue;
} }
// options.info({ const result = await stmt.stream();
// message: JSON.stringify(res), let hasSentColumns = false;
// time: new Date(),
// severity: 'info',
// });
if (!hasSentColumns) { while (true) {
const columnNames = res.columnNames(); const chunk = await result.fetchChunk();
const columnTypes = res.columnTypes();
const columns = getColumnsInfo(columnNames, columnTypes);
options.recordset(columns); if (!chunk || chunk.rowCount === 0) {
hasSentColumns = true; break;
} }
const rows = res.getRowObjects(); if (!hasSentColumns) {
const columnNames = result.columnNames();
const columnTypes = result.columnTypes();
const columns = getColumnsInfo(columnNames, columnTypes);
options.recordset(columns);
hasSentColumns = true;
}
for (const row of rows) { const rows = chunk.getRows();
options.row(normalizeRow(row)); const columnNames = result.columnNames();
for (const row of rows) {
const zipped = zipObject(columnNames, row);
options.row(normalizeRow(zipped));
}
} }
} }