diff --git a/plugins/dbgate-plugin-duckdb/src/backend/driver.js b/plugins/dbgate-plugin-duckdb/src/backend/driver.js index 5e8626eb8..d78a4a0a7 100644 --- a/plugins/dbgate-plugin-duckdb/src/backend/driver.js +++ b/plugins/dbgate-plugin-duckdb/src/backend/driver.js @@ -24,6 +24,20 @@ function getDuckDb() { return duckDb; } +function getReturningStatementTypes() { + const duckdb = getDuckDb(); + + const returningStatementTypes = [ + duckdb.StatementType.SELECT, + duckdb.StatementType.EXPLAIN, + duckdb.StatementType.EXECUTE, + duckdb.StatementType.RELATION, + duckdb.StatementType.LOGICAL_PLAN, + ]; + + return returningStatementTypes; +} + /** @type {import('dbgate-types').EngineDriver} */ const driver = { ...driverBase, @@ -60,19 +74,12 @@ const driver = { try { const statements = await dbhan.client.extractStatements(sql); + const returningStatementTypes = getReturningStatementTypes(); const count = statements.count; for (let i = 0; i < count; i++) { const stmt = await statements.prepare(i); - const returningStatementTypes = [ - duckdb.StatementType.SELECT, - duckdb.StatementType.EXPLAIN, - duckdb.StatementType.EXECUTE, - duckdb.StatementType.RELATION, - duckdb.StatementType.LOGICAL_PLAN, - ]; - const result = await stmt.stream(); let hasSentColumns = false; @@ -149,33 +156,71 @@ const driver = { highWaterMark: 100, }); - const res = await dbhan.client.runAndReadAll(sql); - const rowsObjects = res.getRowObjects(); + try { + const statements = await dbhan.client.extractStatements(sql); + const returningStatementTypes = getReturningStatementTypes(); + const count = statements.count; - const columnNames = res.columnNames(); - const columnTypes = res.columnTypes(); + for (let i = 0; i < count; i++) { + const stmt = await statements.prepare(i); - const columns = getColumnsInfo(columnNames, columnTypes).map(normalizeRow); + if (!returningStatementTypes.includes(stmt.statementType)) { + continue; + } - const rows = rowsObjects.map(normalizeRow); + const result = await stmt.stream(); + let hasSentHeader = false; - pass.write({ - __isStreamHeader: true, - ...(structure || { - columns: columns.map((col) => ({ - columnName: col.name, - dataType: col.type, - })), - }), - }); + while (true) { + const chunk = await result.fetchChunk(); + if (!chunk || chunk.rowCount === 0) { + break; + } - for (const row of rows) { - pass.write(row); + if (!hasSentHeader) { + const columnNames = result.columnNames(); + const columnTypes = result.columnTypes(); + const columns = getColumnsInfo(columnNames, columnTypes); + + pass.write({ + __isStreamHeader: true, + ...(structure || { + columns: columns.map((col) => ({ + columnName: col.columnName, + dataType: col.dataType, + })), + }), + }); + hasSentHeader = true; + } + + const rows = chunk.getRows(); + const columnNames = result.columnNames(); + for (const row of rows) { + const zipped = zipObject(columnNames, row); + pass.write(normalizeRow(zipped)); + } + } + } + + pass.end(); + return pass; + } catch (error) { + logger.error(extractErrorLogData(error), 'ReadQuery error'); + const { message, procName } = error; + pass.write({ + __isStreamInfo: true, + info: { + message, + line: 0, + procedure: procName, + time: new Date(), + severity: 'error', + }, + }); + pass.end(); + return pass; } - - pass.end(); - - return pass; }, async writeTable(dbhan, name, options) { return createBulkInsertStreamBase(this, stream, dbhan, name, options);