mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-23 13:56:00 +00:00
Merge pull request #1258 from dbgate/feature/postgresql-export-bytea
Feature/postgresql export bytea
This commit is contained in:
@@ -49,6 +49,32 @@ class StreamHandler {
|
||||
}
|
||||
}
|
||||
|
||||
class BinaryTestStreamHandler {
|
||||
constructor(resolve, reject, expectedValue) {
|
||||
this.resolve = resolve;
|
||||
this.reject = reject;
|
||||
this.expectedValue = expectedValue;
|
||||
this.rowsReceived = [];
|
||||
}
|
||||
row(row) {
|
||||
try {
|
||||
this.rowsReceived.push(row);
|
||||
if (this.expectedValue) {
|
||||
expect(row).toEqual(this.expectedValue);
|
||||
}
|
||||
} catch (error) {
|
||||
this.reject(error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
recordset(columns) {}
|
||||
done(result) {
|
||||
this.resolve(this.rowsReceived);
|
||||
}
|
||||
info(msg) {}
|
||||
}
|
||||
|
||||
|
||||
function executeStreamItem(driver, conn, sql) {
|
||||
return new Promise(resolve => {
|
||||
const handler = new StreamHandler(resolve);
|
||||
@@ -223,4 +249,51 @@ describe('Query', () => {
|
||||
expect(row[keys[0]] == 1).toBeTruthy();
|
||||
})
|
||||
);
|
||||
|
||||
test.each(engines.filter(x => x.binaryDataType).map(engine => [engine.label, engine]))(
|
||||
'Binary - %s',
|
||||
testWrapper(async (dbhan, driver, engine) => {
|
||||
await runCommandOnDriver(dbhan, driver, dmp =>
|
||||
dmp.createTable({
|
||||
pureName: 't1',
|
||||
columns: [
|
||||
{ columnName: 'id', dataType: 'int', notNull: true, autoIncrement: true },
|
||||
{ columnName: 'val', dataType: engine.binaryDataType },
|
||||
],
|
||||
primaryKey: {
|
||||
columns: [{ columnName: 'id' }],
|
||||
},
|
||||
})
|
||||
);
|
||||
const structure = await driver.analyseFull(dbhan);
|
||||
const table = structure.tables.find(x => x.pureName == 't1');
|
||||
|
||||
const dmp = driver.createDumper();
|
||||
dmp.putCmd("INSERT INTO ~t1 (~val) VALUES (%v)", {
|
||||
$binary: { base64: 'iVBORw0KWgo=' },
|
||||
});
|
||||
await driver.query(dbhan, dmp.s, {discardResult: true});
|
||||
|
||||
const dmp2 = driver.createDumper();
|
||||
dmp2.put('SELECT ~val FROM ~t1');
|
||||
const res = await driver.query(dbhan, dmp2.s);
|
||||
|
||||
const row = res.rows[0];
|
||||
const keys = Object.keys(row);
|
||||
expect(keys.length).toEqual(1);
|
||||
expect(row[keys[0]]).toEqual({$binary: {base64: 'iVBORw0KWgo='}});
|
||||
|
||||
const res2 = await driver.readQuery(dbhan, dmp2.s);
|
||||
const rows = await Array.fromAsync(res2);
|
||||
const rowsVal = rows.filter(r => r.val != null);
|
||||
|
||||
expect(rowsVal.length).toEqual(1);
|
||||
expect(rowsVal[0].val).toEqual({$binary: {base64: 'iVBORw0KWgo='}});
|
||||
|
||||
const res3 = await new Promise((resolve, reject) => {
|
||||
const handler = new BinaryTestStreamHandler(resolve, reject, {val: {$binary: {base64: 'iVBORw0KWgo='}}});
|
||||
driver.stream(dbhan, dmp2.s, handler);
|
||||
});
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
@@ -44,6 +44,7 @@ const mysqlEngine = {
|
||||
supportRenameSqlObject: false,
|
||||
dbSnapshotBySeconds: true,
|
||||
dumpFile: 'data/chinook-mysql.sql',
|
||||
binaryDataType: 'blob',
|
||||
dumpChecks: [
|
||||
{
|
||||
sql: 'select count(*) as res from genre',
|
||||
@@ -216,6 +217,7 @@ const postgreSqlEngine = {
|
||||
supportSchemas: true,
|
||||
supportRenameSqlObject: true,
|
||||
defaultSchemaName: 'public',
|
||||
binaryDataType: 'bytea',
|
||||
dumpFile: 'data/chinook-postgre.sql',
|
||||
dumpChecks: [
|
||||
{
|
||||
@@ -446,6 +448,7 @@ const sqlServerEngine = {
|
||||
supportTableComments: true,
|
||||
supportColumnComments: true,
|
||||
// skipSeparateSchemas: true,
|
||||
binaryDataType: 'varbinary(100)',
|
||||
triggers: [
|
||||
{
|
||||
testName: 'triggers before each row',
|
||||
@@ -506,6 +509,7 @@ const sqliteEngine = {
|
||||
},
|
||||
},
|
||||
],
|
||||
binaryDataType: 'blob',
|
||||
};
|
||||
|
||||
const libsqlFileEngine = {
|
||||
@@ -619,6 +623,7 @@ const oracleEngine = {
|
||||
},
|
||||
},
|
||||
],
|
||||
binaryDataType: 'blob',
|
||||
};
|
||||
|
||||
/** @type {import('dbgate-types').TestEngineInfo} */
|
||||
@@ -752,18 +757,18 @@ const enginesOnCi = [
|
||||
const enginesOnLocal = [
|
||||
// all engines, which would be run on local test
|
||||
// cassandraEngine,
|
||||
// mysqlEngine,
|
||||
//mysqlEngine,
|
||||
// mariaDbEngine,
|
||||
// postgreSqlEngine,
|
||||
// sqlServerEngine,
|
||||
// sqliteEngine,
|
||||
//postgreSqlEngine,
|
||||
//sqlServerEngine,
|
||||
sqliteEngine,
|
||||
// cockroachDbEngine,
|
||||
// clickhouseEngine,
|
||||
// libsqlFileEngine,
|
||||
// libsqlWsEngine,
|
||||
// oracleEngine,
|
||||
//oracleEngine,
|
||||
// duckdbEngine,
|
||||
firebirdEngine,
|
||||
//firebirdEngine,
|
||||
];
|
||||
|
||||
/** @type {import('dbgate-types').TestEngineInfo[] & Record<string, import('dbgate-types').TestEngineInfo>} */
|
||||
|
||||
@@ -17,7 +17,7 @@ class QueryStreamTableWriter {
|
||||
this.started = new Date().getTime();
|
||||
}
|
||||
|
||||
initializeFromQuery(structure, resultIndex, chartDefinition, autoDetectCharts = false) {
|
||||
initializeFromQuery(structure, resultIndex, chartDefinition, autoDetectCharts = false, options = {}) {
|
||||
this.jslid = crypto.randomUUID();
|
||||
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
|
||||
fs.writeFileSync(
|
||||
@@ -25,6 +25,7 @@ class QueryStreamTableWriter {
|
||||
JSON.stringify({
|
||||
...structure,
|
||||
__isStreamHeader: true,
|
||||
...options
|
||||
}) + '\n'
|
||||
);
|
||||
this.currentStream = fs.createWriteStream(this.currentFile, { flags: 'a' });
|
||||
@@ -179,7 +180,7 @@ class StreamHandler {
|
||||
process.send({ msgtype: 'changedCurrentDatabase', database, sesid: this.sesid });
|
||||
}
|
||||
|
||||
recordset(columns) {
|
||||
recordset(columns, options) {
|
||||
if (this.rowsLimitOverflow) {
|
||||
return;
|
||||
}
|
||||
@@ -189,7 +190,8 @@ class StreamHandler {
|
||||
Array.isArray(columns) ? { columns } : columns,
|
||||
this.queryStreamInfoHolder.resultIndex,
|
||||
this.frontMatter?.[`chart-${this.queryStreamInfoHolder.resultIndex + 1}`],
|
||||
this.autoDetectCharts
|
||||
this.autoDetectCharts,
|
||||
options
|
||||
);
|
||||
this.queryStreamInfoHolder.resultIndex += 1;
|
||||
this.rowCounter = 0;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { arrayToHexString, evalFilterBehaviour, isTypeDateTime } from 'dbgate-tools';
|
||||
import { arrayToHexString, base64ToHex, evalFilterBehaviour, isTypeDateTime } from 'dbgate-tools';
|
||||
import { format, toDate } from 'date-fns';
|
||||
import _isString from 'lodash/isString';
|
||||
import _cloneDeepWith from 'lodash/cloneDeepWith';
|
||||
@@ -24,7 +24,9 @@ export function getFilterValueExpression(value, dataType?) {
|
||||
if (value.type == 'Buffer' && Array.isArray(value.data)) {
|
||||
return '0x' + arrayToHexString(value.data);
|
||||
}
|
||||
|
||||
if (value?.$binary?.base64) {
|
||||
return base64ToHex(value.$binary.base64);
|
||||
}
|
||||
return `="${value}"`;
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import P from 'parsimmon';
|
||||
import moment from 'moment';
|
||||
import { Condition } from 'dbgate-sqltree';
|
||||
import { interpretEscapes, token, word, whitespace } from './common';
|
||||
import { hexStringToArray, parseNumberSafe } from 'dbgate-tools';
|
||||
import { hexToBase64, parseNumberSafe } from 'dbgate-tools';
|
||||
import { FilterBehaviour, TransformType } from 'dbgate-types';
|
||||
|
||||
const binaryCondition =
|
||||
@@ -385,10 +385,7 @@ const createParser = (filterBehaviour: FilterBehaviour) => {
|
||||
|
||||
hexstring: () =>
|
||||
token(P.regexp(/0x(([0-9a-fA-F][0-9a-fA-F])+)/, 1))
|
||||
.map(x => ({
|
||||
type: 'Buffer',
|
||||
data: hexStringToArray(x),
|
||||
}))
|
||||
.map(x => ({ $binary: { base64: hexToBase64(x) } }))
|
||||
.desc('hex string'),
|
||||
|
||||
noQuotedString: () => P.regexp(/[^\s^,^'^"]+/).desc('string unquoted'),
|
||||
|
||||
@@ -78,6 +78,14 @@ export class SqlDumper implements AlterProcessor {
|
||||
else if (_isNumber(value)) this.putRaw(value.toString());
|
||||
else if (_isDate(value)) this.putStringValue(new Date(value).toISOString());
|
||||
else if (value?.type == 'Buffer' && _isArray(value?.data)) this.putByteArrayValue(value?.data);
|
||||
else if (value?.$binary?.base64) {
|
||||
const binary = atob(value.$binary.base64);
|
||||
const bytes = new Array(binary.length);
|
||||
for (let i = 0; i < binary.length; i++) {
|
||||
bytes[i] = binary.charCodeAt(i);
|
||||
}
|
||||
this.putByteArrayValue(bytes);
|
||||
}
|
||||
else if (value?.$bigint) this.putRaw(value?.$bigint);
|
||||
else if (_isPlainObject(value) || _isArray(value)) this.putStringValue(JSON.stringify(value));
|
||||
else this.put('^null');
|
||||
|
||||
@@ -47,6 +47,7 @@ export const mongoFilterBehaviour: FilterBehaviour = {
|
||||
allowStringToken: true,
|
||||
allowNumberDualTesting: true,
|
||||
allowObjectIdTesting: true,
|
||||
allowHexString: true,
|
||||
};
|
||||
|
||||
export const evalFilterBehaviour: FilterBehaviour = {
|
||||
|
||||
@@ -43,6 +43,19 @@ export function hexStringToArray(inputString) {
|
||||
return res;
|
||||
}
|
||||
|
||||
export function base64ToHex(base64String) {
|
||||
const binaryString = atob(base64String);
|
||||
const hexString = Array.from(binaryString, c =>
|
||||
c.charCodeAt(0).toString(16).padStart(2, '0')
|
||||
).join('');
|
||||
return '0x' + hexString.toUpperCase();
|
||||
};
|
||||
|
||||
export function hexToBase64(hexString) {
|
||||
const binaryString = hexString.match(/.{1,2}/g).map(byte => String.fromCharCode(parseInt(byte, 16))).join('');
|
||||
return btoa(binaryString);
|
||||
}
|
||||
|
||||
export function parseCellValue(value, editorTypes?: DataEditorTypesBehaviour) {
|
||||
if (!_isString(value)) return value;
|
||||
|
||||
@@ -54,9 +67,10 @@ export function parseCellValue(value, editorTypes?: DataEditorTypesBehaviour) {
|
||||
const mHex = value.match(/^0x([0-9a-fA-F][0-9a-fA-F])+$/);
|
||||
if (mHex) {
|
||||
return {
|
||||
type: 'Buffer',
|
||||
data: hexStringToArray(value.substring(2)),
|
||||
};
|
||||
$binary: {
|
||||
base64: hexToBase64(value.substring(2))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,11 +244,19 @@ export function stringifyCellValue(
|
||||
if (value === true) return { value: 'true', gridStyle: 'valueCellStyle' };
|
||||
if (value === false) return { value: 'false', gridStyle: 'valueCellStyle' };
|
||||
|
||||
if (editorTypes?.parseHexAsBuffer) {
|
||||
if (value?.type == 'Buffer' && _isArray(value.data)) {
|
||||
return { value: '0x' + arrayToHexString(value.data), gridStyle: 'valueCellStyle' };
|
||||
}
|
||||
if (value?.$binary?.base64) {
|
||||
return {
|
||||
value: base64ToHex(value.$binary.base64),
|
||||
gridStyle: 'valueCellStyle',
|
||||
};
|
||||
}
|
||||
|
||||
if (editorTypes?.parseHexAsBuffer) {
|
||||
// if (value?.type == 'Buffer' && _isArray(value.data)) {
|
||||
// return { value: '0x' + arrayToHexString(value.data), gridStyle: 'valueCellStyle' };
|
||||
// }
|
||||
}
|
||||
|
||||
if (editorTypes?.parseObjectIdAsDollar) {
|
||||
if (value?.$oid) {
|
||||
switch (intent) {
|
||||
@@ -482,6 +504,9 @@ export function getAsImageSrc(obj) {
|
||||
if (obj?.type == 'Buffer' && _isArray(obj?.data)) {
|
||||
return `data:image/png;base64, ${arrayBufferToBase64(obj?.data)}`;
|
||||
}
|
||||
if (obj?.$binary?.base64) {
|
||||
return `data:image/png;base64, ${obj.$binary.base64}`;
|
||||
}
|
||||
|
||||
if (_isString(obj) && (obj.startsWith('http://') || obj.startsWith('https://'))) {
|
||||
return obj;
|
||||
|
||||
2
packages/types/test-engines.d.ts
vendored
2
packages/types/test-engines.d.ts
vendored
@@ -96,4 +96,6 @@ export type TestEngineInfo = {
|
||||
}>;
|
||||
|
||||
objects?: Array<TestObjectInfo>;
|
||||
|
||||
binaryDataType?: string;
|
||||
};
|
||||
|
||||
@@ -10,6 +10,9 @@
|
||||
if (value?.type == 'Buffer' && _.isArray(value?.data)) {
|
||||
return 'data:image/png;base64, ' + btoa(String.fromCharCode.apply(null, value?.data));
|
||||
}
|
||||
if (value?.$binary?.base64) {
|
||||
return 'data:image/png;base64, ' + value.$binary.base64;
|
||||
}
|
||||
return null;
|
||||
} catch (err) {
|
||||
console.log('Error showing picture', err);
|
||||
|
||||
@@ -361,6 +361,7 @@
|
||||
detectSqlFilterBehaviour,
|
||||
stringifyCellValue,
|
||||
shouldOpenMultilineDialog,
|
||||
base64ToHex,
|
||||
} from 'dbgate-tools';
|
||||
import { getContext, onDestroy } from 'svelte';
|
||||
import _, { map } from 'lodash';
|
||||
@@ -758,7 +759,7 @@
|
||||
|
||||
export function saveCellToFileEnabled() {
|
||||
const value = getSelectedExportableCell();
|
||||
return _.isString(value) || (value?.type == 'Buffer' && _.isArray(value?.data));
|
||||
return _.isString(value) || (value?.type == 'Buffer' && _.isArray(value?.data)) || (value?.$binary?.base64);
|
||||
}
|
||||
|
||||
export async function saveCellToFile() {
|
||||
@@ -771,6 +772,8 @@
|
||||
fs.promises.writeFile(file, value);
|
||||
} else if (value?.type == 'Buffer' && _.isArray(value?.data)) {
|
||||
fs.promises.writeFile(file, window['Buffer'].from(value.data));
|
||||
} else if (value?.$binary?.base64) {
|
||||
fs.promises.writeFile(file, window['Buffer'].from(value.$binary.base64, 'base64'));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -796,8 +799,9 @@
|
||||
isText
|
||||
? data
|
||||
: {
|
||||
type: 'Buffer',
|
||||
data: [...data],
|
||||
$binary: {
|
||||
base64: data.toString('base64'),
|
||||
},
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
import _ from 'lodash';
|
||||
import { apiCall } from '../utility/api';
|
||||
import ErrorInfo from '../elements/ErrorInfo.svelte';
|
||||
import { base64ToHex } from 'dbgate-tools';
|
||||
import { _t } from '../translations';
|
||||
|
||||
export let onConfirm;
|
||||
@@ -113,7 +114,7 @@
|
||||
{
|
||||
fieldName: 'value',
|
||||
header: _t('dataGrid.value', { defaultMessage: 'Value' }),
|
||||
formatter: row => (row.value == null ? '(NULL)' : row.value),
|
||||
formatter: row => (row.value == null ? '(NULL)' : row.value?.$binary?.base64 ? base64ToHex(row.value.$binary.base64) : row.value),
|
||||
},
|
||||
]}
|
||||
>
|
||||
|
||||
@@ -15,6 +15,9 @@
|
||||
if (force && value?.type == 'Buffer' && _.isArray(value.data)) {
|
||||
return String.fromCharCode.apply(String, value.data);
|
||||
}
|
||||
else if (force && value?.$binary?.base64) {
|
||||
return atob(value.$binary.base64);
|
||||
}
|
||||
return stringifyCellValue(value, 'gridCellIntent').value;
|
||||
}
|
||||
</script>
|
||||
|
||||
@@ -51,6 +51,10 @@ function findArrayResult(resValue) {
|
||||
return null;
|
||||
}
|
||||
|
||||
function BinData(_subType, base64) {
|
||||
return Buffer.from(base64, 'base64');
|
||||
}
|
||||
|
||||
async function getScriptableDb(dbhan) {
|
||||
const db = dbhan.getDatabase();
|
||||
db.getCollection = (name) => db.collection(name);
|
||||
@@ -156,9 +160,9 @@ const driver = {
|
||||
// return printable;
|
||||
// }
|
||||
let func;
|
||||
func = eval(`(db,ObjectId) => ${sql}`);
|
||||
func = eval(`(db,ObjectId,BinData) => ${sql}`);
|
||||
const db = await getScriptableDb(dbhan);
|
||||
const res = func(db, ObjectId.createFromHexString);
|
||||
const res = func(db, ObjectId.createFromHexString, BinData);
|
||||
if (isPromise(res)) await res;
|
||||
},
|
||||
async operation(dbhan, operation, options) {
|
||||
@@ -285,7 +289,7 @@ const driver = {
|
||||
} else {
|
||||
let func;
|
||||
try {
|
||||
func = eval(`(db,ObjectId) => ${sql}`);
|
||||
func = eval(`(db,ObjectId,BinData) => ${sql}`);
|
||||
} catch (err) {
|
||||
options.info({
|
||||
message: 'Error compiling expression: ' + err.message,
|
||||
@@ -299,7 +303,7 @@ const driver = {
|
||||
|
||||
let exprValue;
|
||||
try {
|
||||
exprValue = func(db, ObjectId.createFromHexString);
|
||||
exprValue = func(db, ObjectId.createFromHexString, BinData);
|
||||
} catch (err) {
|
||||
options.info({
|
||||
message: 'Error evaluating expression: ' + err.message,
|
||||
@@ -411,9 +415,9 @@ const driver = {
|
||||
// highWaterMark: 100,
|
||||
// });
|
||||
|
||||
func = eval(`(db,ObjectId) => ${sql}`);
|
||||
func = eval(`(db,ObjectId,BinData) => ${sql}`);
|
||||
const db = await getScriptableDb(dbhan);
|
||||
exprValue = func(db, ObjectId.createFromHexString);
|
||||
exprValue = func(db, ObjectId.createFromHexString, BinData);
|
||||
|
||||
const pass = new stream.PassThrough({
|
||||
objectMode: true,
|
||||
|
||||
@@ -15,7 +15,10 @@ function mongoReplacer(key, value) {
|
||||
function jsonStringifyWithObjectId(obj) {
|
||||
return JSON.stringify(obj, mongoReplacer, 2)
|
||||
.replace(/\{\s*\"\$oid\"\s*\:\s*\"([0-9a-f]+)\"\s*\}/g, (m, id) => `ObjectId("${id}")`)
|
||||
.replace(/\{\s*\"\$bigint\"\s*\:\s*\"([0-9]+)\"\s*\}/g, (m, num) => `${num}n`);
|
||||
.replace(/\{\s*\"\$bigint\"\s*\:\s*\"([0-9]+)\"\s*\}/g, (m, num) => `${num}n`)
|
||||
.replace(/\{\s*"\$binary"\s*:\s*\{\s*"base64"\s*:\s*"([^"]+)"(?:\s*,\s*"subType"\s*:\s*"([0-9a-fA-F]{2})")?\s*\}\s*\}/g, (m, base64, subType) => {
|
||||
return `BinData(${parseInt(subType || "00", 16)}, "${base64}")`;
|
||||
});
|
||||
}
|
||||
|
||||
/** @type {import('dbgate-types').SqlDialect} */
|
||||
@@ -129,7 +132,7 @@ const driver = {
|
||||
|
||||
getCollectionExportQueryScript(collection, condition, sort) {
|
||||
return `db.getCollection('${collection}')
|
||||
.find(${JSON.stringify(convertToMongoCondition(condition) || {})})
|
||||
.find(${jsonStringifyWithObjectId(convertToMongoCondition(condition) || {})})
|
||||
.sort(${JSON.stringify(convertToMongoSort(sort) || {})})`;
|
||||
},
|
||||
getCollectionExportQueryJson(collection, condition, sort) {
|
||||
@@ -148,6 +151,7 @@ const driver = {
|
||||
parseJsonObject: true,
|
||||
parseObjectIdAsDollar: true,
|
||||
parseDateAsDollar: true,
|
||||
parseHexAsBuffer: true,
|
||||
|
||||
explicitDataType: true,
|
||||
supportNumberType: true,
|
||||
|
||||
@@ -24,6 +24,15 @@ function extractTediousColumns(columns, addDriverNativeColumn = false) {
|
||||
return res;
|
||||
}
|
||||
|
||||
function modifyRow(row, columns) {
|
||||
columns.forEach((col) => {
|
||||
if (Buffer.isBuffer(row[col.columnName])) {
|
||||
row[col.columnName] = { $binary: { base64: Buffer.from(row[col.columnName]).toString('base64') } };
|
||||
}
|
||||
});
|
||||
return row;
|
||||
}
|
||||
|
||||
async function getDefaultAzureSqlToken() {
|
||||
const credential = new ManagedIdentityCredential();
|
||||
const tokenResponse = await credential.getToken('https://database.windows.net/.default');
|
||||
@@ -125,9 +134,12 @@ async function tediousQueryCore(dbhan, sql, options) {
|
||||
});
|
||||
request.on('row', function (columns) {
|
||||
result.rows.push(
|
||||
_.zipObject(
|
||||
result.columns.map(x => x.columnName),
|
||||
columns.map(x => x.value)
|
||||
modifyRow(
|
||||
_.zipObject(
|
||||
result.columns.map(x => x.columnName),
|
||||
columns.map(x => x.value)
|
||||
),
|
||||
result.columns
|
||||
)
|
||||
);
|
||||
});
|
||||
@@ -152,13 +164,17 @@ async function tediousReadQuery(dbhan, sql, structure) {
|
||||
currentColumns = extractTediousColumns(columns);
|
||||
pass.write({
|
||||
__isStreamHeader: true,
|
||||
engine: 'mssql@dbgate-plugin-mssql',
|
||||
...(structure || { columns: currentColumns }),
|
||||
});
|
||||
});
|
||||
request.on('row', function (columns) {
|
||||
const row = _.zipObject(
|
||||
currentColumns.map(x => x.columnName),
|
||||
columns.map(x => x.value)
|
||||
const row = modifyRow(
|
||||
_.zipObject(
|
||||
currentColumns.map(x => x.columnName),
|
||||
columns.map(x => x.value)
|
||||
),
|
||||
currentColumns
|
||||
);
|
||||
pass.write(row);
|
||||
});
|
||||
@@ -216,12 +232,15 @@ async function tediousStream(dbhan, sql, options) {
|
||||
});
|
||||
request.on('columnMetadata', function (columns) {
|
||||
currentColumns = extractTediousColumns(columns);
|
||||
options.recordset(currentColumns);
|
||||
options.recordset(currentColumns, { engine: 'mssql@dbgate-plugin-mssql' });
|
||||
});
|
||||
request.on('row', function (columns) {
|
||||
const row = _.zipObject(
|
||||
currentColumns.map(x => x.columnName),
|
||||
columns.map(x => x.value)
|
||||
const row = modifyRow(
|
||||
_.zipObject(
|
||||
currentColumns.map(x => x.columnName),
|
||||
columns.map(x => x.value)
|
||||
),
|
||||
currentColumns
|
||||
);
|
||||
options.row(row);
|
||||
skipAffectedMessage = true;
|
||||
|
||||
@@ -23,6 +23,15 @@ function extractColumns(fields) {
|
||||
return null;
|
||||
}
|
||||
|
||||
function modifyRow(row, columns) {
|
||||
columns.forEach((col) => {
|
||||
if (Buffer.isBuffer(row[col.columnName])) {
|
||||
row[col.columnName] = { $binary: { base64: Buffer.from(row[col.columnName]).toString('base64') } };
|
||||
}
|
||||
});
|
||||
return row;
|
||||
}
|
||||
|
||||
function zipDataRow(rowArray, columns) {
|
||||
return _.zipObject(
|
||||
columns.map(x => x.columnName),
|
||||
@@ -99,8 +108,8 @@ const drivers = driverBases.map(driverBase => ({
|
||||
return new Promise((resolve, reject) => {
|
||||
dbhan.client.query(sql, function (error, results, fields) {
|
||||
if (error) reject(error);
|
||||
const columns = extractColumns(fields);
|
||||
resolve({ rows: results && columns && results.map && results.map(row => zipDataRow(row, columns)), columns });
|
||||
const columns = extractColumns(fields);
|
||||
resolve({ rows: results && columns && results.map && results.map(row => modifyRow(zipDataRow(row, columns), columns)), columns });
|
||||
});
|
||||
});
|
||||
},
|
||||
@@ -136,14 +145,14 @@ const drivers = driverBases.map(driverBase => ({
|
||||
}
|
||||
} else {
|
||||
if (columns) {
|
||||
options.row(zipDataRow(row, columns));
|
||||
options.row(modifyRow(zipDataRow(row, columns), columns));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const handleFields = fields => {
|
||||
columns = extractColumns(fields);
|
||||
if (columns) options.recordset(columns);
|
||||
if (columns) options.recordset(columns, { engine: driverBase.engine });
|
||||
};
|
||||
|
||||
const handleError = error => {
|
||||
@@ -177,10 +186,11 @@ const drivers = driverBases.map(driverBase => ({
|
||||
columns = extractColumns(fields);
|
||||
pass.write({
|
||||
__isStreamHeader: true,
|
||||
engine: driverBase.engine,
|
||||
...(structure || { columns }),
|
||||
});
|
||||
})
|
||||
.on('result', row => pass.write(zipDataRow(row, columns)))
|
||||
.on('result', row => pass.write(modifyRow(zipDataRow(row, columns), columns)))
|
||||
.on('end', () => pass.end());
|
||||
|
||||
return pass;
|
||||
|
||||
@@ -37,6 +37,15 @@ function zipDataRow(rowArray, columns) {
|
||||
return obj;
|
||||
}
|
||||
|
||||
function modifyRow(row, columns) {
|
||||
columns.forEach(col => {
|
||||
if (Buffer.isBuffer(row[col.columnName])) {
|
||||
row[col.columnName] = { $binary: { base64: row[col.columnName].toString('base64') } };
|
||||
}
|
||||
});
|
||||
return row;
|
||||
}
|
||||
|
||||
let oracleClientInitialized = false;
|
||||
|
||||
/** @type {import('dbgate-types').EngineDriver} */
|
||||
@@ -106,7 +115,7 @@ const driver = {
|
||||
const res = await dbhan.client.execute(sql);
|
||||
try {
|
||||
const columns = extractOracleColumns(res.metaData);
|
||||
return { rows: (res.rows || []).map(row => zipDataRow(row, columns)), columns };
|
||||
return { rows: (res.rows || []).map(row => modifyRow(zipDataRow(row, columns), columns)), columns };
|
||||
} catch (err) {
|
||||
return {
|
||||
rows: [],
|
||||
@@ -134,7 +143,7 @@ const driver = {
|
||||
if (!wasHeader) {
|
||||
columns = extractOracleColumns(row);
|
||||
if (columns && columns.length > 0) {
|
||||
options.recordset(columns);
|
||||
options.recordset(columns, { engine: driverBase.engine });
|
||||
}
|
||||
wasHeader = true;
|
||||
}
|
||||
@@ -147,11 +156,11 @@ const driver = {
|
||||
if (!wasHeader) {
|
||||
columns = extractOracleColumns(row);
|
||||
if (columns && columns.length > 0) {
|
||||
options.recordset(columns);
|
||||
options.recordset(columns, { engine: driverBase.engine });
|
||||
}
|
||||
wasHeader = true;
|
||||
}
|
||||
options.row(zipDataRow(row, columns));
|
||||
options.row(modifyRow(zipDataRow(row, columns), columns));
|
||||
});
|
||||
|
||||
query.on('end', () => {
|
||||
@@ -214,9 +223,9 @@ const driver = {
|
||||
|
||||
if (rows && metaData) {
|
||||
const columns = extractOracleColumns(metaData);
|
||||
options.recordset(columns);
|
||||
options.recordset(columns, { engine: driverBase.engine });
|
||||
for (const row of rows) {
|
||||
options.row(zipDataRow(row, columns));
|
||||
options.row(modifyRow(zipDataRow(row, columns), columns));
|
||||
}
|
||||
} else if (rowsAffected) {
|
||||
options.info({
|
||||
@@ -303,6 +312,7 @@ const driver = {
|
||||
if (columns && columns.length > 0) {
|
||||
pass.write({
|
||||
__isStreamHeader: true,
|
||||
engine: driverBase.engine,
|
||||
...(structure || { columns }),
|
||||
});
|
||||
}
|
||||
@@ -311,7 +321,7 @@ const driver = {
|
||||
});
|
||||
|
||||
query.on('data', row => {
|
||||
pass.write(zipDataRow(row, columns));
|
||||
pass.write(modifyRow(zipDataRow(row, columns), columns));
|
||||
});
|
||||
|
||||
query.on('end', () => {
|
||||
|
||||
@@ -136,9 +136,9 @@ class Dumper extends SqlDumper {
|
||||
// else super.putValue(value);
|
||||
// }
|
||||
|
||||
// putByteArrayValue(value) {
|
||||
// this.putRaw(`e'\\\\x${arrayToHexString(value)}'`);
|
||||
// }
|
||||
putByteArrayValue(value) {
|
||||
this.putRaw(`HEXTORAW('${arrayToHexString(value)}')`);
|
||||
}
|
||||
|
||||
putValue(value, dataType) {
|
||||
if (dataType?.toLowerCase() == 'timestamp') {
|
||||
|
||||
@@ -48,6 +48,9 @@ function transformRow(row, columnsToTransform) {
|
||||
if (dataTypeName == 'geography') {
|
||||
row[columnName] = extractGeographyDate(row[columnName]);
|
||||
}
|
||||
else if (dataTypeName == 'bytea' && row[columnName]) {
|
||||
row[columnName] = { $binary: { base64: Buffer.from(row[columnName]).toString('base64') } };
|
||||
}
|
||||
}
|
||||
|
||||
return row;
|
||||
@@ -159,7 +162,7 @@ const drivers = driverBases.map(driverBase => ({
|
||||
conid,
|
||||
};
|
||||
|
||||
const datatypes = await this.query(dbhan, `SELECT oid, typname FROM pg_type WHERE typname in ('geography')`);
|
||||
const datatypes = await this.query(dbhan, `SELECT oid, typname FROM pg_type WHERE typname in ('geography', 'bytea')`);
|
||||
const typeIdToName = _.fromPairs(datatypes.rows.map(cur => [cur.oid, cur.typname]));
|
||||
dbhan['typeIdToName'] = typeIdToName;
|
||||
|
||||
@@ -181,7 +184,14 @@ const drivers = driverBases.map(driverBase => ({
|
||||
}
|
||||
const res = await dbhan.client.query({ text: sql, rowMode: 'array' });
|
||||
const columns = extractPostgresColumns(res, dbhan);
|
||||
return { rows: (res.rows || []).map(row => zipDataRow(row, columns)), columns };
|
||||
|
||||
const transormableTypeNames = Object.values(dbhan.typeIdToName ?? {});
|
||||
const columnsToTransform = columns.filter(x => transormableTypeNames.includes(x.dataTypeName));
|
||||
|
||||
const zippedRows = (res.rows || []).map(row => zipDataRow(row, columns));
|
||||
const transformedRows = zippedRows.map(row => transformRow(row, columnsToTransform));
|
||||
|
||||
return { rows: transformedRows, columns };
|
||||
},
|
||||
stream(dbhan, sql, options) {
|
||||
const handleNotice = notice => {
|
||||
@@ -208,7 +218,7 @@ const drivers = driverBases.map(driverBase => ({
|
||||
if (!wasHeader) {
|
||||
columns = extractPostgresColumns(query._result, dbhan);
|
||||
if (columns && columns.length > 0) {
|
||||
options.recordset(columns);
|
||||
options.recordset(columns, { engine: driverBase.engine });
|
||||
}
|
||||
wasHeader = true;
|
||||
}
|
||||
@@ -328,6 +338,7 @@ const drivers = driverBases.map(driverBase => ({
|
||||
columns = extractPostgresColumns(query._result, dbhan);
|
||||
pass.write({
|
||||
__isStreamHeader: true,
|
||||
engine: driverBase.engine,
|
||||
...(structure || { columns }),
|
||||
});
|
||||
wasHeader = true;
|
||||
|
||||
@@ -4,7 +4,7 @@ const stream = require('stream');
|
||||
const driverBases = require('../frontend/drivers');
|
||||
const Analyser = require('./Analyser');
|
||||
const { splitQuery, sqliteSplitterOptions } = require('dbgate-query-splitter');
|
||||
const { runStreamItem, waitForDrain } = require('./helpers');
|
||||
const { runStreamItem, waitForDrain, modifyRow } = require('./helpers');
|
||||
const { getLogger, createBulkInsertStreamBase, extractErrorLogData } = global.DBGATE_PACKAGES['dbgate-tools'];
|
||||
|
||||
const logger = getLogger('sqliteDriver');
|
||||
@@ -51,7 +51,7 @@ const libsqlDriver = {
|
||||
const columns = stmtColumns.length > 0 ? stmtColumns : extractColumns(rows[0]);
|
||||
|
||||
return {
|
||||
rows,
|
||||
rows: rows.map((row) => modifyRow(row, columns)),
|
||||
columns: columns.map((col) => ({
|
||||
columnName: col.name,
|
||||
dataType: col.type,
|
||||
@@ -66,7 +66,7 @@ const libsqlDriver = {
|
||||
console.log('#stream', sql);
|
||||
const inTransaction = dbhan.client.transaction(() => {
|
||||
for (const sqlItem of sqlSplitted) {
|
||||
runStreamItem(dbhan, sqlItem, options, rowCounter);
|
||||
runStreamItem(dbhan, sqlItem, options, rowCounter, driverBases[1].engine);
|
||||
}
|
||||
|
||||
if (rowCounter.date) {
|
||||
@@ -115,9 +115,10 @@ const libsqlDriver = {
|
||||
|
||||
async readQueryTask(stmt, pass) {
|
||||
// let sent = 0;
|
||||
const columns = stmt.columns();
|
||||
for (const row of stmt.iterate()) {
|
||||
// sent++;
|
||||
if (!pass.write(row)) {
|
||||
if (!pass.write(modifyRow(row, columns))) {
|
||||
// console.log('WAIT DRAIN', sent);
|
||||
await waitForDrain(pass);
|
||||
}
|
||||
@@ -135,6 +136,7 @@ const libsqlDriver = {
|
||||
|
||||
pass.write({
|
||||
__isStreamHeader: true,
|
||||
engine: driverBases[1].engine,
|
||||
...(structure || {
|
||||
columns: columns.map((col) => ({
|
||||
columnName: col.name,
|
||||
|
||||
@@ -5,7 +5,7 @@ const Analyser = require('./Analyser');
|
||||
const driverBases = require('../frontend/drivers');
|
||||
const { splitQuery, sqliteSplitterOptions } = require('dbgate-query-splitter');
|
||||
const { getLogger, createBulkInsertStreamBase, extractErrorLogData } = global.DBGATE_PACKAGES['dbgate-tools'];
|
||||
const { runStreamItem, waitForDrain } = require('./helpers');
|
||||
const { runStreamItem, waitForDrain, modifyRow } = require('./helpers');
|
||||
|
||||
const logger = getLogger('sqliteDriver');
|
||||
|
||||
@@ -40,7 +40,7 @@ const driver = {
|
||||
const columns = stmt.columns();
|
||||
const rows = stmt.all();
|
||||
return {
|
||||
rows,
|
||||
rows: rows.map((row) => modifyRow(row, columns)),
|
||||
columns: columns.map((col) => ({
|
||||
columnName: col.name,
|
||||
dataType: col.type,
|
||||
@@ -61,7 +61,7 @@ const driver = {
|
||||
|
||||
const inTransaction = dbhan.client.transaction(() => {
|
||||
for (const sqlItem of sqlSplitted) {
|
||||
runStreamItem(dbhan, sqlItem, options, rowCounter);
|
||||
runStreamItem(dbhan, sqlItem, options, rowCounter, driverBases[0].engine);
|
||||
}
|
||||
|
||||
if (rowCounter.date) {
|
||||
@@ -103,9 +103,10 @@ const driver = {
|
||||
|
||||
async readQueryTask(stmt, pass) {
|
||||
// let sent = 0;
|
||||
const columns = stmt.columns();
|
||||
for (const row of stmt.iterate()) {
|
||||
// sent++;
|
||||
if (!pass.write(row)) {
|
||||
if (!pass.write(modifyRow(row, columns))) {
|
||||
// console.log('WAIT DRAIN', sent);
|
||||
await waitForDrain(pass);
|
||||
}
|
||||
@@ -123,6 +124,7 @@ const driver = {
|
||||
|
||||
pass.write({
|
||||
__isStreamHeader: true,
|
||||
engine: driverBases[0].engine,
|
||||
...(structure || {
|
||||
columns: columns.map((col) => ({
|
||||
columnName: col.name,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// @ts-check
|
||||
|
||||
function runStreamItem(dbhan, sql, options, rowCounter) {
|
||||
function runStreamItem(dbhan, sql, options, rowCounter, engine) {
|
||||
const stmt = dbhan.client.prepare(sql);
|
||||
console.log(stmt);
|
||||
console.log(stmt.reader);
|
||||
@@ -12,11 +12,12 @@ function runStreamItem(dbhan, sql, options, rowCounter) {
|
||||
columns.map((col) => ({
|
||||
columnName: col.name,
|
||||
dataType: col.type,
|
||||
}))
|
||||
})),
|
||||
{ engine }
|
||||
);
|
||||
|
||||
for (const row of stmt.iterate()) {
|
||||
options.row(row);
|
||||
options.row(modifyRow(row, columns));
|
||||
}
|
||||
} else {
|
||||
const info = stmt.run();
|
||||
@@ -44,7 +45,17 @@ async function waitForDrain(stream) {
|
||||
});
|
||||
}
|
||||
|
||||
function modifyRow(row, columns) {
|
||||
columns.forEach((col) => {
|
||||
if (row[col.name] instanceof Uint8Array || row[col.name] instanceof ArrayBuffer) {
|
||||
row[col.name] = { $binary: { base64: Buffer.from(row[col.name]).toString('base64') } };
|
||||
}
|
||||
});
|
||||
return row;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
runStreamItem,
|
||||
waitForDrain,
|
||||
modifyRow,
|
||||
};
|
||||
|
||||
@@ -45,6 +45,10 @@ class StringifyStream extends stream.Transform {
|
||||
|
||||
elementValue(element, value) {
|
||||
this.startElement(element);
|
||||
if (value?.$binary?.base64) {
|
||||
const buffer = Buffer.from(value.$binary.base64, 'base64');
|
||||
value = '0x' +buffer.toString('hex').toUpperCase();
|
||||
}
|
||||
this.push(escapeXml(`${value}`));
|
||||
this.endElement(element);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user