Merge pull request #1101 from dbgate/feature/duckdb-2

Feature/duckdb 2
This commit is contained in:
Jan Prochazka
2025-04-29 12:57:48 +02:00
committed by GitHub
2 changed files with 166 additions and 49 deletions

View File

@@ -7,6 +7,7 @@ const { getLogger, extractErrorLogData, createBulkInsertStreamBase } = require('
const { getColumnsInfo, normalizeRow } = require('./helpers');
const sql = require('./sql');
const { mapSchemaRowToSchemaInfo } = require('./Analyser.helpers');
const { zipObject } = require('lodash');
const logger = getLogger('sqliteDriver');
@@ -23,6 +24,20 @@ function getDuckDb() {
return duckDb;
}
/**
 * Lists the DuckDB statement types whose execution produces a result set
 * that should be streamed back to the caller (as opposed to DDL/DML
 * statements that return nothing).
 * @returns {Array} statement-type constants from the loaded DuckDB module
 */
function getReturningStatementTypes() {
  const { StatementType } = getDuckDb();
  return [
    StatementType.SELECT,
    StatementType.EXPLAIN,
    StatementType.EXECUTE,
    StatementType.RELATION,
    StatementType.LOGICAL_PLAN,
  ];
}
/** @type {import('dbgate-types').EngineDriver<import('@duckdb/node-api').DuckDBConnection>} */
const driver = {
...driverBase,
@@ -59,44 +74,41 @@ const driver = {
try {
const statements = await dbhan.client.extractStatements(sql);
const returningStatementTypes = getReturningStatementTypes();
const count = statements.count;
for (let i = 0; i < count; i++) {
let hasSentColumns = false;
const stmt = await statements.prepare(i);
const res = await stmt.runAndReadAll();
const returningStatemetes = [
duckdb.StatementType.SELECT,
duckdb.StatementType.EXPLAIN,
duckdb.StatementType.EXECUTE,
duckdb.StatementType.RELATION,
duckdb.StatementType.LOGICAL_PLAN,
];
const result = await stmt.stream();
let hasSentColumns = false;
if (!returningStatemetes.includes(stmt.statementType)) {
continue;
}
while (true) {
const chunk = await result.fetchChunk();
// options.info({
// message: JSON.stringify(res),
// time: new Date(),
// severity: 'info',
// });
if (!returningStatementTypes.includes(stmt.statementType)) {
break;
}
if (!hasSentColumns) {
const columnNames = res.columnNames();
const columnTypes = res.columnTypes();
const columns = getColumnsInfo(columnNames, columnTypes);
if (!chunk || chunk.rowCount === 0) {
break;
}
options.recordset(columns);
hasSentColumns = true;
}
if (!hasSentColumns) {
const columnNames = result.columnNames();
const columnTypes = result.columnTypes();
const columns = getColumnsInfo(columnNames, columnTypes);
options.recordset(columns);
hasSentColumns = true;
}
const rows = res.getRowObjects();
const rows = chunk.getRows();
const columnNames = result.columnNames();
for (const row of rows) {
options.row(normalizeRow(row));
for (const row of rows) {
const zipped = zipObject(columnNames, row);
options.row(normalizeRow(zipped));
}
}
}
@@ -144,33 +156,71 @@ const driver = {
highWaterMark: 100,
});
const res = await dbhan.client.runAndReadAll(sql);
const rowsObjects = res.getRowObjects();
try {
const statements = await dbhan.client.extractStatements(sql);
const returningStatementTypes = getReturningStatementTypes();
const count = statements.count;
const columnNames = res.columnNames();
const columnTypes = res.columnTypes();
for (let i = 0; i < count; i++) {
const stmt = await statements.prepare(i);
const columns = getColumnsInfo(columnNames, columnTypes).map(normalizeRow);
if (!returningStatementTypes.includes(stmt.statementType)) {
continue;
}
const rows = rowsObjects.map(normalizeRow);
const result = await stmt.stream();
let hasSentHeader = false;
pass.write({
__isStreamHeader: true,
...(structure || {
columns: columns.map((col) => ({
columnName: col.name,
dataType: col.type,
})),
}),
});
while (true) {
const chunk = await result.fetchChunk();
if (!chunk || chunk.rowCount === 0) {
break;
}
for (const row of rows) {
pass.write(row);
if (!hasSentHeader) {
const columnNames = result.columnNames();
const columnTypes = result.columnTypes();
const columns = getColumnsInfo(columnNames, columnTypes);
pass.write({
__isStreamHeader: true,
...(structure || {
columns: columns.map((col) => ({
columnName: col.columnName,
dataType: col.dataType,
})),
}),
});
hasSentHeader = true;
}
const rows = chunk.getRows();
const columnNames = result.columnNames();
for (const row of rows) {
const zipped = zipObject(columnNames, row);
pass.write(normalizeRow(zipped));
}
}
}
pass.end();
return pass;
} catch (error) {
logger.error(extractErrorLogData(error), 'ReadQuery error');
const { message, procName } = error;
pass.write({
__isStreamInfo: true,
info: {
message,
line: 0,
procedure: procName,
time: new Date(),
severity: 'error',
},
});
pass.end();
return pass;
}
pass.end();
return pass;
},
async writeTable(dbhan, name, options) {
return createBulkInsertStreamBase(this, stream, dbhan, name, options);

View File

@@ -1,5 +1,15 @@
const {
DuckDBTimestampValue,
DuckDBDecimalValue,
DuckDBDateValue,
DuckDBTimeValue,
DuckDBIntervalValue,
DuckDBBlobValue,
DuckDBBitValue,
DuckDBUUIDValue,
} = require('@duckdb/node-api');
/**
* @param {string[} columnNames
* @param {string[]} columnNames
* @param {import('@duckdb/node-api').DuckDBType[]} columnTypes
*/
function getColumnsInfo(columnNames, columnTypes) {
@@ -24,6 +34,63 @@ function _normalizeValue(value) {
return parseInt(value);
}
if (value instanceof DuckDBTimestampValue) {
const date = new Date(Number(value.micros / 1000n));
return date.toISOString();
}
if (value instanceof DuckDBDecimalValue) {
return value.toDouble();
}
if (value instanceof DuckDBDateValue) {
const year = value.year;
const month = String(value.month).padStart(2, '0');
const day = String(value.day).padStart(2, '0');
return `${year}-${month}-${day}`;
}
if (value instanceof DuckDBTimeValue) {
const hour = String(value.hour).padStart(2, '0');
const minute = String(value.min).padStart(2, '0');
const second = String(value.sec).padStart(2, '0');
const micros = String(value.micros).padStart(6, '0').substring(0, 3);
return `${hour}:${minute}:${second}.${micros}`;
}
if (value instanceof DuckDBBlobValue) {
return value.toString();
}
if (value instanceof DuckDBBitValue) {
return value.toString();
}
if (value instanceof DuckDBUUIDValue) {
return value.toString();
}
if (value instanceof DuckDBIntervalValue) {
let result = '';
if (value.months !== 0) {
const years = Math.floor(value.months / 12);
const remainingMonths = value.months % 12;
if (years !== 0) result += `${years}y `;
if (remainingMonths !== 0) result += `${remainingMonths}m `;
}
if (value.days !== 0) {
result += `${value.days}d `;
}
if (value.micros !== 0n) {
const microseconds = Number(value.micros);
const seconds = Math.floor(microseconds / 1000000);
const remainingMicros = microseconds % 1000000;
if (seconds !== 0) result += `${seconds}s `;
if (remainingMicros !== 0) result += `${remainingMicros}μs `;
}
return result.trim() || '0';
}
if (Array.isArray(value)) {
return value.map((item) => _normalizeValue(item));
}