diff --git a/plugins/dbgate-plugin-duckdb/src/backend/driver.js b/plugins/dbgate-plugin-duckdb/src/backend/driver.js index ede90e4cd..d78a4a0a7 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/driver.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/driver.js @@ -7,6 +7,7 @@ const { getLogger, extractErrorLogData, createBulkInsertStreamBase } = require(' const { getColumnsInfo, normalizeRow } = require('./helpers'); const sql = require('./sql'); const { mapSchemaRowToSchemaInfo } = require('./Analyser.helpers'); +const { zipObject } = require('lodash'); const logger = getLogger('sqliteDriver'); @@ -23,6 +24,20 @@ function getDuckDb() { return duckDb; } +function getReturningStatementTypes() { + const duckdb = getDuckDb(); + + const returningStatementTypes = [ + duckdb.StatementType.SELECT, + duckdb.StatementType.EXPLAIN, + duckdb.StatementType.EXECUTE, + duckdb.StatementType.RELATION, + duckdb.StatementType.LOGICAL_PLAN, + ]; + + return returningStatementTypes; +} + /** @type {import('dbgate-types').EngineDriver} */ const driver = { ...driverBase, @@ -59,44 +74,41 @@ const driver = { try { const statements = await dbhan.client.extractStatements(sql); + const returningStatementTypes = getReturningStatementTypes(); const count = statements.count; for (let i = 0; i < count; i++) { - let hasSentColumns = false; const stmt = await statements.prepare(i); - const res = await stmt.runAndReadAll(); - const returningStatemetes = [ - duckdb.StatementType.SELECT, - duckdb.StatementType.EXPLAIN, - duckdb.StatementType.EXECUTE, - duckdb.StatementType.RELATION, - duckdb.StatementType.LOGICAL_PLAN, - ]; + const result = await stmt.stream(); + let hasSentColumns = false; - if (!returningStatemetes.includes(stmt.statementType)) { - continue; - } + while (true) { + const chunk = await result.fetchChunk(); - // options.info({ - // message: JSON.stringify(res), - // time: new Date(), - // severity: 'info', - // }); + if (!returningStatementTypes.includes(stmt.statementType)) { + break; + } - if (!hasSentColumns) { - const columnNames = res.columnNames(); - const columnTypes = res.columnTypes(); - const columns = getColumnsInfo(columnNames, columnTypes); + if (!chunk || chunk.rowCount === 0) { + break; + } - options.recordset(columns); - hasSentColumns = true; - } + if (!hasSentColumns) { + const columnNames = result.columnNames(); + const columnTypes = result.columnTypes(); + const columns = getColumnsInfo(columnNames, columnTypes); + options.recordset(columns); + hasSentColumns = true; + } - const rows = res.getRowObjects(); + const rows = chunk.getRows(); + const columnNames = result.columnNames(); - for (const row of rows) { - options.row(normalizeRow(row)); + for (const row of rows) { + const zipped = zipObject(columnNames, row); + options.row(normalizeRow(zipped)); + } } } @@ -144,33 +156,71 @@ const driver = { highWaterMark: 100, }); - const res = await dbhan.client.runAndReadAll(sql); - const rowsObjects = res.getRowObjects(); + try { + const statements = await dbhan.client.extractStatements(sql); + const returningStatementTypes = getReturningStatementTypes(); + const count = statements.count; - const columnNames = res.columnNames(); - const columnTypes = res.columnTypes(); + for (let i = 0; i < count; i++) { + const stmt = await statements.prepare(i); - const columns = getColumnsInfo(columnNames, columnTypes).map(normalizeRow); + if (!returningStatementTypes.includes(stmt.statementType)) { + continue; + } - const rows = rowsObjects.map(normalizeRow); + const result = await stmt.stream(); + let hasSentHeader = false; - pass.write({ - __isStreamHeader: true, - ...(structure || { - columns: columns.map((col) => ({ - columnName: col.name, - dataType: col.type, - })), - }), - }); + while (true) { + const chunk = await result.fetchChunk(); + if (!chunk || chunk.rowCount === 0) { + break; + } - for (const row of rows) { - pass.write(row); + if (!hasSentHeader) { + const columnNames = result.columnNames(); + const columnTypes = result.columnTypes(); + const columns = getColumnsInfo(columnNames, columnTypes); + + pass.write({ + __isStreamHeader: true, + ...(structure || { + columns: columns.map((col) => ({ + columnName: col.columnName, + dataType: col.dataType, + })), + }), + }); + hasSentHeader = true; + } + + const rows = chunk.getRows(); + const columnNames = result.columnNames(); + for (const row of rows) { + const zipped = zipObject(columnNames, row); + pass.write(normalizeRow(zipped)); + } + } + } + + pass.end(); + return pass; + } catch (error) { + logger.error(extractErrorLogData(error), 'ReadQuery error'); + const { message, procName } = error; + pass.write({ + __isStreamInfo: true, + info: { + message, + line: 0, + procedure: procName, + time: new Date(), + severity: 'error', + }, + }); + pass.end(); + return pass; } - - pass.end(); - - return pass; }, async writeTable(dbhan, name, options) { return createBulkInsertStreamBase(this, stream, dbhan, name, options); diff --git a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js index 3364cfd93..5e5b85047 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js @@ -1,5 +1,15 @@ +const { + DuckDBTimestampValue, + DuckDBDecimalValue, + DuckDBDateValue, + DuckDBTimeValue, + DuckDBIntervalValue, + DuckDBBlobValue, + DuckDBBitValue, + DuckDBUUIDValue, +} = require('@duckdb/node-api'); /** - * @param {string[} columnNames + * @param {string[]} columnNames * @param {import('@duckdb/node-api').DuckDBType[]} columnTypes */ function getColumnsInfo(columnNames, columnTypes) { @@ -24,6 +34,63 @@ function _normalizeValue(value) { return parseInt(value); } + if (value instanceof DuckDBTimestampValue) { + const date = new Date(Number(value.micros / 1000n)); + return date.toISOString(); + } + + if (value instanceof DuckDBDecimalValue) { + return value.toDouble(); + } + + if (value instanceof DuckDBDateValue) { + const year = value.year; + const month = String(value.month).padStart(2, '0'); + const day = String(value.day).padStart(2, '0'); + return `${year}-${month}-${day}`; + } + + if (value instanceof DuckDBTimeValue) { + const hour = String(value.hour).padStart(2, '0'); + const minute = String(value.min).padStart(2, '0'); + const second = String(value.sec).padStart(2, '0'); + const micros = String(value.micros).padStart(6, '0').substring(0, 3); + return `${hour}:${minute}:${second}.${micros}`; + } + + if (value instanceof DuckDBBlobValue) { + return value.toString(); + } + + if (value instanceof DuckDBBitValue) { + return value.toString(); + } + + if (value instanceof DuckDBUUIDValue) { + return value.toString(); + } + + if (value instanceof DuckDBIntervalValue) { + let result = ''; + if (value.months !== 0) { + const years = Math.floor(value.months / 12); + const remainingMonths = value.months % 12; + if (years !== 0) result += `${years}y `; + if (remainingMonths !== 0) result += `${remainingMonths}m `; + } + if (value.days !== 0) { + result += `${value.days}d `; + } + if (value.micros !== 0n) { + const microseconds = Number(value.micros); + const seconds = Math.floor(microseconds / 1000000); + const remainingMicros = microseconds % 1000000; + if (seconds !== 0) result += `${seconds}s `; + if (remainingMicros !== 0) result += `${remainingMicros}μs `; + } + return result.trim() || '0'; + } + if (Array.isArray(value)) { return value.map((item) => _normalizeValue(item)); }