From e09294d9aabe3dab6b9f401dd1b9d3cb6387c815 Mon Sep 17 00:00:00 2001 From: Nybkox Date: Tue, 29 Apr 2025 00:15:50 +0200 Subject: [PATCH 1/6] feat: add normalization to duck db types --- .../src/backend/helpers.js | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js index 3364cfd93..f28d46cd1 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js @@ -1,3 +1,13 @@ +const { + DuckDBTimestampValue, + DuckDBDecimalValue, + DuckDBDateValue, + DuckDBTimeValue, + DuckDBIntervalValue, + DuckDBBlobValue, + DuckDBBitValue, + DuckDBUUIDValue, +} = require('@duckdb/node-api'); /** * @param {string[} columnNames * @param {import('@duckdb/node-api').DuckDBType[]} columnTypes @@ -24,6 +34,68 @@ function _normalizeValue(value) { return parseInt(value); } + if (value instanceof DuckDBTimestampValue) { + const date = new Date(Number(value.micros / 1000n)); + return date.toISOString(); + } + + if (value instanceof DuckDBDecimalValue) { + return value.toDouble(); + } + + if (value instanceof DuckDBTimestampValue) { + const date = new Date(Number(value.micros / 1000n)); + return date.toISOString(); + } + + if (value instanceof DuckDBDateValue) { + const year = value.year; + const month = String(value.month).padStart(2, '0'); + const day = String(value.day).padStart(2, '0'); + return `${year}-${month}-${day}`; + } + + if (value instanceof DuckDBTimeValue) { + const hour = String(value.hour).padStart(2, '0'); + const minute = String(value.min).padStart(2, '0'); + const second = String(value.sec).padStart(2, '0'); + const micros = String(value.micros).padStart(6, '0').substring(0, 3); + return `${hour}:${minute}:${second}.${micros}`; + } + + if (value instanceof DuckDBBlobValue) { + return value.toString(); + } + + if (value instanceof DuckDBBitValue) { + return value.toString(); + } + + if (value instanceof DuckDBUUIDValue) { + return value.toString(); + } + + if (value instanceof DuckDBIntervalValue) { + let result = ''; + if (value.months !== 0) { + const years = Math.floor(value.months / 12); + const remainingMonths = value.months % 12; + if (years !== 0) result += `${years}y `; + if (remainingMonths !== 0) result += `${remainingMonths}m `; + } + if (value.days !== 0) { + result += `${value.days}d `; + } + if (value.micros !== 0n) { + const microseconds = Number(value.micros); + const seconds = Math.floor(microseconds / 1000000); + const remainingMicros = microseconds % 1000000; + if (seconds !== 0) result += `${seconds}s `; + if (remainingMicros !== 0) result += `${remainingMicros}μs `; + } + return result.trim() || '0'; + } + if (Array.isArray(value)) { return value.map((item) => _normalizeValue(item)); } From f67221ee01b519ad415060c71810485fff097014 Mon Sep 17 00:00:00 2001 From: Nybkox Date: Tue, 29 Apr 2025 01:01:57 +0200 Subject: [PATCH 2/6] feat: use streaming api for duckdb driver's stream method --- .../src/backend/driver.js | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/plugins/dbgate-plugin-duckdb/src/backend/driver.js b/plugins/dbgate-plugin-duckdb/src/backend/driver.js index ede90e4cd..7be4ebfda 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/driver.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/driver.js @@ -7,6 +7,7 @@ const { getLogger, extractErrorLogData, createBulkInsertStreamBase } = require(' const { getColumnsInfo, normalizeRow } = require('./helpers'); const sql = require('./sql'); const { mapSchemaRowToSchemaInfo } = require('./Analyser.helpers'); +const { zipObject } = require('lodash'); const logger = getLogger('sqliteDriver'); @@ -62,11 +63,9 @@ const driver = { const count = statements.count; for (let i = 0; i < count; i++) { - let hasSentColumns = false; const stmt = await statements.prepare(i); - const res = await stmt.runAndReadAll(); - const returningStatemetes = [ + const returningStatementTypes = [ duckdb.StatementType.SELECT, duckdb.StatementType.EXPLAIN, duckdb.StatementType.EXECUTE, @@ -74,29 +73,35 @@ const driver = { duckdb.StatementType.LOGICAL_PLAN, ]; - if (!returningStatemetes.includes(stmt.statementType)) { + if (!returningStatementTypes.includes(stmt.statementType)) { continue; } - // options.info({ - // message: JSON.stringify(res), - // time: new Date(), - // severity: 'info', - // }); + const result = await stmt.stream(); + let hasSentColumns = false; - if (!hasSentColumns) { - const columnNames = res.columnNames(); - const columnTypes = res.columnTypes(); - const columns = getColumnsInfo(columnNames, columnTypes); + while (true) { + const chunk = await result.fetchChunk(); - options.recordset(columns); - hasSentColumns = true; - } + if (!chunk || chunk.rowCount === 0) { + break; + } - const rows = res.getRowObjects(); + if (!hasSentColumns) { + const columnNames = result.columnNames(); + const columnTypes = result.columnTypes(); + const columns = getColumnsInfo(columnNames, columnTypes); + options.recordset(columns); + hasSentColumns = true; + } - for (const row of rows) { - options.row(normalizeRow(row)); + const rows = chunk.getRows(); + const columnNames = result.columnNames(); + + for (const row of rows) { + const zipped = zipObject(columnNames, row); + options.row(normalizeRow(zipped)); + } } } From 9390ab3c6c40d0a1cb3a14fc7e75baa75d2f2c10 Mon Sep 17 00:00:00 2001 From: Nybkox Date: Tue, 29 Apr 2025 01:32:55 +0200 Subject: [PATCH 3/6] fix: do not skip non-returnin statements --- plugins/dbgate-plugin-duckdb/src/backend/driver.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/dbgate-plugin-duckdb/src/backend/driver.js b/plugins/dbgate-plugin-duckdb/src/backend/driver.js index 7be4ebfda..5e8626eb8 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/driver.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/driver.js @@ -73,16 +73,16 @@ const driver = { duckdb.StatementType.LOGICAL_PLAN, ]; - if (!returningStatementTypes.includes(stmt.statementType)) { - continue; - } - const result = await stmt.stream(); let hasSentColumns = false; while (true) { const chunk = await result.fetchChunk(); + if (!returningStatementTypes.includes(stmt.statementType)) { + break; + } + if (!chunk || chunk.rowCount === 0) { break; } From 070e955b89a930301ac17d4dc82705e21125a76c Mon Sep 17 00:00:00 2001 From: Nybkox Date: Tue, 29 Apr 2025 01:57:42 +0200 Subject: [PATCH 4/6] fix: remove duplicated typecheck --- plugins/dbgate-plugin-duckdb/src/backend/helpers.js | 5 ----- 1 file changed, 5 deletions(-) diff --git a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js index f28d46cd1..0d3a26c95 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js @@ -43,11 +43,6 @@ function _normalizeValue(value) { return value.toDouble(); } - if (value instanceof DuckDBTimestampValue) { - const date = new Date(Number(value.micros / 1000n)); - return date.toISOString(); - } - if (value instanceof DuckDBDateValue) { const year = value.year; const month = String(value.month).padStart(2, '0'); From fdb14cd49bf4caa43259a4b710d91cefd90dd7b8 Mon Sep 17 00:00:00 2001 From: Nybkox Date: Tue, 29 Apr 2025 01:58:25 +0200 Subject: [PATCH 5/6] fix: typo in jsdoc types --- plugins/dbgate-plugin-duckdb/src/backend/helpers.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js index 0d3a26c95..5e5b85047 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/helpers.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/helpers.js @@ -9,7 +9,7 @@ const { DuckDBUUIDValue, } = require('@duckdb/node-api'); /** - * @param {string[} columnNames + * @param {string[]} columnNames * @param {import('@duckdb/node-api').DuckDBType[]} columnTypes */ function getColumnsInfo(columnNames, columnTypes) { From ecaafaca695161ddce70f6adf8eb7a9c3df51f31 Mon Sep 17 00:00:00 2001 From: Nybkox Date: Tue, 29 Apr 2025 11:11:21 +0200 Subject: [PATCH 6/6] feat: use straming api for duckdb driver's readQuery --- .../src/backend/driver.js | 103 +++++++++++++----- 1 file changed, 74 insertions(+), 29 deletions(-) diff --git a/plugins/dbgate-plugin-duckdb/src/backend/driver.js b/plugins/dbgate-plugin-duckdb/src/backend/driver.js index 5e8626eb8..d78a4a0a7 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/driver.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/driver.js @@ -24,6 +24,20 @@ function getDuckDb() { return duckDb; } +function getReturningStatementTypes() { + const duckdb = getDuckDb(); + + const returningStatementTypes = [ + duckdb.StatementType.SELECT, + duckdb.StatementType.EXPLAIN, + duckdb.StatementType.EXECUTE, + duckdb.StatementType.RELATION, + duckdb.StatementType.LOGICAL_PLAN, + ]; + + return returningStatementTypes; +} + /** @type {import('dbgate-types').EngineDriver} */ const driver = { ...driverBase, @@ -60,19 +74,12 @@ const driver = { try { const statements = await dbhan.client.extractStatements(sql); + const returningStatementTypes = getReturningStatementTypes(); const count = statements.count; for (let i = 0; i < count; i++) { const stmt = await statements.prepare(i); - const returningStatementTypes = [ - duckdb.StatementType.SELECT, - duckdb.StatementType.EXPLAIN, - duckdb.StatementType.EXECUTE, - duckdb.StatementType.RELATION, - duckdb.StatementType.LOGICAL_PLAN, - ]; - const result = await stmt.stream(); let hasSentColumns = false; @@ -149,33 +156,71 @@ const driver = { highWaterMark: 100, }); - const res = await dbhan.client.runAndReadAll(sql); - const rowsObjects = res.getRowObjects(); + try { + const statements = await dbhan.client.extractStatements(sql); + const returningStatementTypes = getReturningStatementTypes(); + const count = statements.count; - const columnNames = res.columnNames(); - const columnTypes = res.columnTypes(); + for (let i = 0; i < count; i++) { + const stmt = await statements.prepare(i); - const columns = getColumnsInfo(columnNames, columnTypes).map(normalizeRow); + if (!returningStatementTypes.includes(stmt.statementType)) { + continue; + } - const rows = rowsObjects.map(normalizeRow); + const result = await stmt.stream(); + let hasSentHeader = false; - pass.write({ - __isStreamHeader: true, - ...(structure || { - columns: columns.map((col) => ({ - columnName: col.name, - dataType: col.type, - })), - }), - }); + while (true) { + const chunk = await result.fetchChunk(); + if (!chunk || chunk.rowCount === 0) { + break; + } - for (const row of rows) { - pass.write(row); + if (!hasSentHeader) { + const columnNames = result.columnNames(); + const columnTypes = result.columnTypes(); + const columns = getColumnsInfo(columnNames, columnTypes); + + pass.write({ + __isStreamHeader: true, + ...(structure || { + columns: columns.map((col) => ({ + columnName: col.columnName, + dataType: col.dataType, + })), + }), + }); + hasSentHeader = true; + } + + const rows = chunk.getRows(); + const columnNames = result.columnNames(); + for (const row of rows) { + const zipped = zipObject(columnNames, row); + pass.write(normalizeRow(zipped)); + } + } + } + + pass.end(); + return pass; + } catch (error) { + logger.error(extractErrorLogData(error), 'ReadQuery error'); + const { message, procName } = error; + pass.write({ + __isStreamInfo: true, + info: { + message, + line: 0, + procedure: procName, + time: new Date(), + severity: 'error', + }, + }); + pass.end(); + return pass; } - - pass.end(); - - return pass; }, async writeTable(dbhan, name, options) { return createBulkInsertStreamBase(this, stream, dbhan, name, options);