Enhance binary data handling by integrating modifyRow function in SQLite driver and helpers

This commit is contained in:
Stela Augustinova
2025-11-06 14:14:24 +01:00
parent 98f2b5dd08
commit dfeb910ac9
3 changed files with 26 additions and 11 deletions

View File

@@ -4,7 +4,7 @@ const stream = require('stream');
const driverBases = require('../frontend/drivers'); const driverBases = require('../frontend/drivers');
const Analyser = require('./Analyser'); const Analyser = require('./Analyser');
const { splitQuery, sqliteSplitterOptions } = require('dbgate-query-splitter'); const { splitQuery, sqliteSplitterOptions } = require('dbgate-query-splitter');
const { runStreamItem, waitForDrain } = require('./helpers'); const { runStreamItem, waitForDrain, modifyRow } = require('./helpers');
const { getLogger, createBulkInsertStreamBase, extractErrorLogData } = global.DBGATE_PACKAGES['dbgate-tools']; const { getLogger, createBulkInsertStreamBase, extractErrorLogData } = global.DBGATE_PACKAGES['dbgate-tools'];
const logger = getLogger('sqliteDriver'); const logger = getLogger('sqliteDriver');
@@ -51,7 +51,7 @@ const libsqlDriver = {
const columns = stmtColumns.length > 0 ? stmtColumns : extractColumns(rows[0]); const columns = stmtColumns.length > 0 ? stmtColumns : extractColumns(rows[0]);
return { return {
rows, rows: rows.map((row) => modifyRow(row, columns)),
columns: columns.map((col) => ({ columns: columns.map((col) => ({
columnName: col.name, columnName: col.name,
dataType: col.type, dataType: col.type,
@@ -66,7 +66,7 @@ const libsqlDriver = {
console.log('#stream', sql); console.log('#stream', sql);
const inTransaction = dbhan.client.transaction(() => { const inTransaction = dbhan.client.transaction(() => {
for (const sqlItem of sqlSplitted) { for (const sqlItem of sqlSplitted) {
runStreamItem(dbhan, sqlItem, options, rowCounter); runStreamItem(dbhan, sqlItem, options, rowCounter, driverBases[1].engine);
} }
if (rowCounter.date) { if (rowCounter.date) {
@@ -114,9 +114,10 @@ const libsqlDriver = {
async readQueryTask(stmt, pass) { async readQueryTask(stmt, pass) {
// let sent = 0; // let sent = 0;
const columns = stmt.columns();
for (const row of stmt.iterate()) { for (const row of stmt.iterate()) {
// sent++; // sent++;
if (!pass.write(row)) { if (!pass.write(modifyRow(row, columns))) {
// console.log('WAIT DRAIN', sent); // console.log('WAIT DRAIN', sent);
await waitForDrain(pass); await waitForDrain(pass);
} }
@@ -134,6 +135,7 @@ const libsqlDriver = {
pass.write({ pass.write({
__isStreamHeader: true, __isStreamHeader: true,
engine: driverBases[1].engine,
...(structure || { ...(structure || {
columns: columns.map((col) => ({ columns: columns.map((col) => ({
columnName: col.name, columnName: col.name,

View File

@@ -5,7 +5,7 @@ const Analyser = require('./Analyser');
const driverBases = require('../frontend/drivers'); const driverBases = require('../frontend/drivers');
const { splitQuery, sqliteSplitterOptions } = require('dbgate-query-splitter'); const { splitQuery, sqliteSplitterOptions } = require('dbgate-query-splitter');
const { getLogger, createBulkInsertStreamBase, extractErrorLogData } = global.DBGATE_PACKAGES['dbgate-tools']; const { getLogger, createBulkInsertStreamBase, extractErrorLogData } = global.DBGATE_PACKAGES['dbgate-tools'];
const { runStreamItem, waitForDrain } = require('./helpers'); const { runStreamItem, waitForDrain, modifyRow } = require('./helpers');
const logger = getLogger('sqliteDriver'); const logger = getLogger('sqliteDriver');
@@ -40,7 +40,7 @@ const driver = {
const columns = stmt.columns(); const columns = stmt.columns();
const rows = stmt.all(); const rows = stmt.all();
return { return {
rows, rows: rows.map((row) => modifyRow(row, columns)),
columns: columns.map((col) => ({ columns: columns.map((col) => ({
columnName: col.name, columnName: col.name,
dataType: col.type, dataType: col.type,
@@ -61,7 +61,7 @@ const driver = {
const inTransaction = dbhan.client.transaction(() => { const inTransaction = dbhan.client.transaction(() => {
for (const sqlItem of sqlSplitted) { for (const sqlItem of sqlSplitted) {
runStreamItem(dbhan, sqlItem, options, rowCounter); runStreamItem(dbhan, sqlItem, options, rowCounter, driverBases[0].engine);
} }
if (rowCounter.date) { if (rowCounter.date) {
@@ -102,9 +102,10 @@ const driver = {
async readQueryTask(stmt, pass) { async readQueryTask(stmt, pass) {
// let sent = 0; // let sent = 0;
const columns = stmt.columns();
for (const row of stmt.iterate()) { for (const row of stmt.iterate()) {
// sent++; // sent++;
if (!pass.write(row)) { if (!pass.write(modifyRow(row, columns))) {
// console.log('WAIT DRAIN', sent); // console.log('WAIT DRAIN', sent);
await waitForDrain(pass); await waitForDrain(pass);
} }
@@ -122,6 +123,7 @@ const driver = {
pass.write({ pass.write({
__isStreamHeader: true, __isStreamHeader: true,
engine: driverBases[0].engine,
...(structure || { ...(structure || {
columns: columns.map((col) => ({ columns: columns.map((col) => ({
columnName: col.name, columnName: col.name,

View File

@@ -1,6 +1,6 @@
// @ts-check // @ts-check
function runStreamItem(dbhan, sql, options, rowCounter) { function runStreamItem(dbhan, sql, options, rowCounter, engine) {
const stmt = dbhan.client.prepare(sql); const stmt = dbhan.client.prepare(sql);
console.log(stmt); console.log(stmt);
console.log(stmt.reader); console.log(stmt.reader);
@@ -12,11 +12,12 @@ function runStreamItem(dbhan, sql, options, rowCounter) {
columns.map((col) => ({ columns.map((col) => ({
columnName: col.name, columnName: col.name,
dataType: col.type, dataType: col.type,
})) })),
{ engine }
); );
for (const row of stmt.iterate()) { for (const row of stmt.iterate()) {
options.row(row); options.row(modifyRow(row, columns));
} }
} else { } else {
const info = stmt.run(); const info = stmt.run();
@@ -43,7 +44,17 @@ async function waitForDrain(stream) {
}); });
} }
/**
 * Wraps binary column values in a row as extended-JSON
 * `{ $binary: { base64: ... } }` objects so they survive JSON
 * serialization when streamed to the frontend.
 *
 * NOTE: mutates `row` in place and also returns it for convenient
 * use in `rows.map(...)` pipelines.
 *
 * @param {Object} row - result row keyed by column name
 * @param {{ name: string }[]} columns - column descriptors (e.g. from stmt.columns())
 * @returns {Object} the same row object, with binary values wrapped
 */
function modifyRow(row, columns) {
  for (const col of columns) {
    const value = row[col.name];
    if (value instanceof Uint8Array) {
      // Use Buffer.from: a plain Uint8Array's toString() ignores the
      // 'base64' argument and would yield "1,2,3"-style output; only
      // Buffer honors the encoding. Buffer.from handles both cases.
      row[col.name] = { $binary: { base64: Buffer.from(value).toString('base64') } };
    }
  }
  return row;
}
module.exports = { module.exports = {
runStreamItem, runStreamItem,
waitForDrain, waitForDrain,
modifyRow,
}; };