From 48d43743461a228b6b3430ba85ea07b1962774b4 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Fri, 20 Sep 2024 10:27:03 +0200 Subject: [PATCH] introduced dbhandle instead of overwriting 3rd party client's fields --- .../api/src/proc/databaseConnectionProcess.js | 42 +++--- packages/tools/src/DatabaseAnalyser.ts | 10 +- .../tools/src/createBulkInsertStreamBase.ts | 18 +-- packages/types/engines.d.ts | 73 ++++++----- .../src/backend/Analyser.js | 6 +- .../src/backend/createBulkInsertStream.js | 6 +- .../src/backend/driver.js | 39 +++--- .../src/backend/Analyser.js | 10 +- .../src/backend/createBulkInsertStream.js | 4 +- .../dbgate-plugin-mongo/src/backend/driver.js | 98 +++++++------- .../src/backend/MsSqlAnalyser.js | 6 +- .../backend/createNativeBulkInsertStream.js | 10 +- .../backend/createTediousBulkInsertStream.js | 14 +- .../dbgate-plugin-mssql/src/backend/driver.js | 70 +++++----- .../src/backend/nativeDriver.js | 13 +- .../src/backend/tediousDriver.js | 15 +-- .../src/backend/Analyser.js | 6 +- .../src/backend/drivers.js | 42 +++--- .../src/backend/Analyser.js | 22 ++-- .../backend/createOracleBulkInsertStream.js | 6 +- .../src/backend/driver.js | 42 +++--- .../src/backend/Analyser.js | 6 +- .../src/backend/drivers.js | 34 ++--- .../src/backend/Analyser.js | 4 +- .../dbgate-plugin-redis/src/backend/driver.js | 124 +++++++++--------- .../src/backend/driver.js | 38 +++--- 26 files changed, 387 insertions(+), 371 deletions(-) diff --git a/packages/api/src/proc/databaseConnectionProcess.js b/packages/api/src/proc/databaseConnectionProcess.js index d88ddb394..702b968dd 100644 --- a/packages/api/src/proc/databaseConnectionProcess.js +++ b/packages/api/src/proc/databaseConnectionProcess.js @@ -11,7 +11,7 @@ const { dumpSqlSelect } = require('dbgate-sqltree'); const logger = getLogger('dbconnProcess'); -let systemConnection; +let dbhan; let storedConnection; let afterConnectCallbacks = []; let afterAnalyseCallbacks = []; @@ -49,7 +49,7 @@ async function 
handleFullRefresh() { loadingModel = true; const driver = requireEngineDriver(storedConnection); setStatusName('loadStructure'); - analysedStructure = await checkedAsyncCall(driver.analyseFull(systemConnection, serverVersion)); + analysedStructure = await checkedAsyncCall(driver.analyseFull(dbhan, serverVersion)); analysedTime = new Date().getTime(); process.send({ msgtype: 'structure', structure: analysedStructure }); process.send({ msgtype: 'structureTime', analysedTime }); @@ -64,7 +64,7 @@ async function handleIncrementalRefresh(forceSend) { const driver = requireEngineDriver(storedConnection); setStatusName('checkStructure'); const newStructure = await checkedAsyncCall( - driver.analyseIncremental(systemConnection, analysedStructure, serverVersion) + driver.analyseIncremental(dbhan, analysedStructure, serverVersion) ); analysedTime = new Date().getTime(); if (newStructure != null) { @@ -103,7 +103,7 @@ function setStatusName(name) { async function readVersion() { const driver = requireEngineDriver(storedConnection); - const version = await driver.getVersion(systemConnection); + const version = await driver.getVersion(dbhan); process.send({ msgtype: 'version', version }); serverVersion = version; } @@ -114,8 +114,8 @@ async function handleConnect({ connection, structure, globalSettings }) { if (!structure) setStatusName('pending'); const driver = requireEngineDriver(storedConnection); - systemConnection = await checkedAsyncCall(connectUtility(driver, storedConnection, 'app')); - systemConnection.feedback = feedback => setStatus({ feedback }); + dbhan = await checkedAsyncCall(connectUtility(driver, storedConnection, 'app')); + dbhan.feedback = feedback => setStatus({ feedback }); await checkedAsyncCall(readVersion()); if (structure) { analysedStructure = structure; @@ -138,7 +138,7 @@ async function handleConnect({ connection, structure, globalSettings }) { } function waitConnected() { - if (systemConnection) return Promise.resolve(); + if (dbhan) return 
Promise.resolve(); return new Promise((resolve, reject) => { afterConnectCallbacks.push([resolve, reject]); }); @@ -163,7 +163,7 @@ async function handleRunScript({ msgid, sql, useTransaction }, skipReadonlyCheck const driver = requireEngineDriver(storedConnection); try { if (!skipReadonlyCheck) ensureExecuteCustomScript(driver); - await driver.script(systemConnection, sql, { useTransaction }); + await driver.script(dbhan, sql, { useTransaction }); process.send({ msgtype: 'response', msgid }); } catch (err) { process.send({ msgtype: 'response', msgid, errorMessage: err.message }); @@ -175,7 +175,7 @@ async function handleRunOperation({ msgid, operation, useTransaction }, skipRead const driver = requireEngineDriver(storedConnection); try { if (!skipReadonlyCheck) ensureExecuteCustomScript(driver); - await driver.operation(systemConnection, operation, { useTransaction }); + await driver.operation(dbhan, operation, { useTransaction }); process.send({ msgtype: 'response', msgid }); } catch (err) { process.send({ msgtype: 'response', msgid, errorMessage: err.message }); @@ -188,7 +188,7 @@ async function handleQueryData({ msgid, sql }, skipReadonlyCheck = false) { try { if (!skipReadonlyCheck) ensureExecuteCustomScript(driver); // console.log(sql); - const res = await driver.query(systemConnection, sql); + const res = await driver.query(dbhan, sql); process.send({ msgtype: 'response', msgid, ...res }); } catch (err) { process.send({ msgtype: 'response', msgid, errorMessage: err.message || 'Error executing SQL script' }); @@ -214,23 +214,23 @@ async function handleDriverDataCore(msgid, callMethod) { } async function handleSchemaList({ msgid }) { - return handleDriverDataCore(msgid, driver => driver.listSchemas(systemConnection)); + return handleDriverDataCore(msgid, driver => driver.listSchemas(dbhan)); } async function handleCollectionData({ msgid, options }) { - return handleDriverDataCore(msgid, driver => driver.readCollection(systemConnection, options)); + return 
handleDriverDataCore(msgid, driver => driver.readCollection(dbhan, options)); } async function handleLoadKeys({ msgid, root, filter }) { - return handleDriverDataCore(msgid, driver => driver.loadKeys(systemConnection, root, filter)); + return handleDriverDataCore(msgid, driver => driver.loadKeys(dbhan, root, filter)); } async function handleExportKeys({ msgid, options }) { - return handleDriverDataCore(msgid, driver => driver.exportKeys(systemConnection, options)); + return handleDriverDataCore(msgid, driver => driver.exportKeys(dbhan, options)); } async function handleLoadKeyInfo({ msgid, key }) { - return handleDriverDataCore(msgid, driver => driver.loadKeyInfo(systemConnection, key)); + return handleDriverDataCore(msgid, driver => driver.loadKeyInfo(dbhan, key)); } async function handleCallMethod({ msgid, method, args }) { @@ -240,17 +240,17 @@ async function handleCallMethod({ msgid, method, args }) { } ensureExecuteCustomScript(driver); - return driver.callMethod(systemConnection, method, args); + return driver.callMethod(dbhan, method, args); }); } async function handleLoadKeyTableRange({ msgid, key, cursor, count }) { - return handleDriverDataCore(msgid, driver => driver.loadKeyTableRange(systemConnection, key, cursor, count)); + return handleDriverDataCore(msgid, driver => driver.loadKeyTableRange(dbhan, key, cursor, count)); } async function handleLoadFieldValues({ msgid, schemaName, pureName, field, search }) { return handleDriverDataCore(msgid, driver => - driver.loadFieldValues(systemConnection, { schemaName, pureName }, field, search) + driver.loadFieldValues(dbhan, { schemaName, pureName }, field, search) ); } @@ -268,7 +268,7 @@ async function handleUpdateCollection({ msgid, changeSet }) { const driver = requireEngineDriver(storedConnection); try { ensureExecuteCustomScript(driver); - const result = await driver.updateCollection(systemConnection, changeSet); + const result = await driver.updateCollection(dbhan, changeSet); process.send({ msgtype: 
'response', msgid, result }); } catch (err) { process.send({ msgtype: 'response', msgid, errorMessage: err.message }); @@ -281,7 +281,7 @@ async function handleSqlPreview({ msgid, objects, options }) { try { const dmp = driver.createDumper(); - const generator = new SqlGenerator(analysedStructure, options, objects, dmp, driver, systemConnection); + const generator = new SqlGenerator(analysedStructure, options, objects, dmp, driver, dbhan); await generator.dump(); process.send({ msgtype: 'response', msgid, sql: dmp.s, isTruncated: generator.isTruncated }); @@ -301,7 +301,7 @@ async function handleGenerateDeploySql({ msgid, modelFolder }) { try { const res = await generateDeploySql({ - systemConnection, + systemConnection: dbhan, connection: storedConnection, analysedStructure, modelFolder, diff --git a/packages/tools/src/DatabaseAnalyser.ts b/packages/tools/src/DatabaseAnalyser.ts index 6c7b371f1..265e5d183 100644 --- a/packages/tools/src/DatabaseAnalyser.ts +++ b/packages/tools/src/DatabaseAnalyser.ts @@ -1,4 +1,4 @@ -import { DatabaseInfo, DatabaseModification, EngineDriver, SqlDialect } from 'dbgate-types'; +import { DatabaseHandle, DatabaseInfo, DatabaseModification, EngineDriver, SqlDialect } from 'dbgate-types'; import _sortBy from 'lodash/sortBy'; import _groupBy from 'lodash/groupBy'; import _pick from 'lodash/pick'; @@ -40,7 +40,7 @@ export class DatabaseAnalyser { dialect: SqlDialect; logger: Logger; - constructor(public pool, public driver: EngineDriver, version) { + constructor(public dbhan: DatabaseHandle, public driver: EngineDriver, version) { this.dialect = (driver?.dialectByVersion && driver?.dialectByVersion(version)) || driver?.dialect; this.logger = logger; } @@ -242,8 +242,8 @@ export class DatabaseAnalyser { } feedback(obj) { - if (this.pool.feedback) { - this.pool.feedback(obj); + if (this.dbhan.feedback) { + this.dbhan.feedback(obj); } if (obj && obj.analysingMessage) { logger.debug(obj.analysingMessage); @@ -318,7 +318,7 @@ export class 
DatabaseAnalyser { }; } try { - const res = await this.driver.query(this.pool, sql); + const res = await this.driver.query(this.dbhan, sql); this.logger.debug({ rows: res.rows.length, template }, `Loaded analyser query`); return res; } catch (err) { diff --git a/packages/tools/src/createBulkInsertStreamBase.ts b/packages/tools/src/createBulkInsertStreamBase.ts index dcf93e40b..78b466aa8 100644 --- a/packages/tools/src/createBulkInsertStreamBase.ts +++ b/packages/tools/src/createBulkInsertStreamBase.ts @@ -5,7 +5,7 @@ import { prepareTableForImport } from './tableTransforms'; const logger = getLogger('bulkStreamBase'); -export function createBulkInsertStreamBase(driver: EngineDriver, stream, pool, name, options: WriteTableOptions): any { +export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, name, options: WriteTableOptions): any { const fullNameQuoted = name.schemaName ? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}` : driver.dialect.quoteIdentifier(name.pureName); @@ -29,22 +29,22 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, pool, n }; writable.checkStructure = async () => { - let structure = await driver.analyseSingleTable(pool, name); + let structure = await driver.analyseSingleTable(dbhan, name); // console.log('ANALYSING', name, structure); if (structure && options.dropIfExists) { logger.info(`Dropping table ${fullNameQuoted}`); - await driver.script(pool, `DROP TABLE ${fullNameQuoted}`); + await driver.script(dbhan, `DROP TABLE ${fullNameQuoted}`); } if (options.createIfNotExists && (!structure || options.dropIfExists)) { const dmp = driver.createDumper(); const createdTableInfo = driver.adaptTableInfo(prepareTableForImport({ ...writable.structure, ...name })); dmp.createTable(createdTableInfo); logger.info({ sql: dmp.s }, `Creating table ${fullNameQuoted}`); - await driver.script(pool, dmp.s); - structure = await driver.analyseSingleTable(pool, 
name); + await driver.script(dbhan, dmp.s); + structure = await driver.analyseSingleTable(dbhan, name); } if (options.truncate) { - await driver.script(pool, `TRUNCATE TABLE ${fullNameQuoted}`); + await driver.script(dbhan, `TRUNCATE TABLE ${fullNameQuoted}`); } writable.columnNames = _intersection( @@ -74,7 +74,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, pool, n dmp.putRaw(';'); // require('fs').writeFileSync('/home/jena/test.sql', dmp.s); // console.log(dmp.s); - await driver.query(pool, dmp.s, { discardResult: true }); + await driver.query(dbhan, dmp.s, { discardResult: true }); } else { for (const row of rows) { const dmp = driver.createDumper(); @@ -85,13 +85,13 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, pool, n dmp.putRaw('('); dmp.putCollection(',', writable.columnNames, col => dmp.putValue(row[col as string])); dmp.putRaw(')'); - await driver.query(pool, dmp.s, { discardResult: true }); + await driver.query(dbhan, dmp.s, { discardResult: true }); } } if (options.commitAfterInsert) { const dmp = driver.createDumper(); dmp.commitTransaction(); - await driver.query(pool, dmp.s, { discardResult: true }); + await driver.query(dbhan, dmp.s, { discardResult: true }); } }; diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index de6e604c0..f100858b7 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -130,6 +130,15 @@ export interface FilterBehaviourProvider { getFilterBehaviour(dataType: string, standardFilterBehaviours: { [id: string]: FilterBehaviour }): FilterBehaviour; } +export interface DatabaseHandle { + client: any; + database?: string; + feedback?: (message: any) => void; + getDatabase?: () => any; + connectionType?: string; + treeKeySeparator?: string; +} + export interface EngineDriver extends FilterBehaviourProvider { engine: string; title: string; @@ -171,52 +180,52 @@ export interface EngineDriver extends FilterBehaviourProvider { 
defaultSocketPath?: string; authTypeLabel?: string; importExportArgs?: any[]; - connect({ server, port, user, password, database }): Promise; - close(pool): Promise; - query(pool: any, sql: string, options?: QueryOptions): Promise; - stream(pool: any, sql: string, options: StreamOptions); - readQuery(pool: any, sql: string, structure?: TableInfo): Promise; - readJsonQuery(pool: any, query: any, structure?: TableInfo): Promise; - writeTable(pool: any, name: NamedObjectInfo, options: WriteTableOptions): Promise; + connect({ server, port, user, password, database }): Promise; + close(dbhan: DatabaseHandle): Promise; + query(dbhan: DatabaseHandle, sql: string, options?: QueryOptions): Promise; + stream(dbhan: DatabaseHandle, sql: string, options: StreamOptions); + readQuery(dbhan: DatabaseHandle, sql: string, structure?: TableInfo): Promise; + readJsonQuery(dbhan: DatabaseHandle, query: any, structure?: TableInfo): Promise; + writeTable(dbhan: DatabaseHandle, name: NamedObjectInfo, options: WriteTableOptions): Promise; analyseSingleObject( - pool: any, + dbhan: DatabaseHandle, name: NamedObjectInfo, objectTypeField: keyof DatabaseInfo ): Promise; - analyseSingleTable(pool: any, name: NamedObjectInfo): Promise; - getVersion(pool: any): Promise<{ version: string }>; - listDatabases(pool: any): Promise< + analyseSingleTable(dbhan: DatabaseHandle, name: NamedObjectInfo): Promise; + getVersion(dbhan: DatabaseHandle): Promise<{ version: string }>; + listDatabases(dbhan: DatabaseHandle): Promise< { name: string; }[] >; - loadKeys(pool, root: string, filter?: string): Promise; - exportKeys(pool, options: {}): Promise; - loadKeyInfo(pool, key): Promise; - loadKeyTableRange(pool, key, cursor, count): Promise; - loadFieldValues(pool: any, name: NamedObjectInfo, field: string, search: string): Promise; - analyseFull(pool: any, serverVersion): Promise; - analyseIncremental(pool: any, structure: DatabaseInfo, serverVersion): Promise; + loadKeys(dbhan: DatabaseHandle, root: string, 
filter?: string): Promise; + exportKeys(dbhan: DatabaseHandle, options: {}): Promise; + loadKeyInfo(dbhan: DatabaseHandle, key): Promise; + loadKeyTableRange(dbhan: DatabaseHandle, key, cursor, count): Promise; + loadFieldValues(dbhan: DatabaseHandle, name: NamedObjectInfo, field: string, search: string): Promise; + analyseFull(dbhan: DatabaseHandle, serverVersion): Promise; + analyseIncremental(dbhan: DatabaseHandle, structure: DatabaseInfo, serverVersion): Promise; dialect: SqlDialect; dialectByVersion(version): SqlDialect; createDumper(options = null): SqlDumper; - createBackupDumper(pool: any, options): Promise; + createBackupDumper(dbhan: DatabaseHandle, options): Promise; getAuthTypes(): EngineAuthType[]; - readCollection(pool: any, options: ReadCollectionOptions): Promise; - updateCollection(pool: any, changeSet: any): Promise; + readCollection(dbhan: DatabaseHandle, options: ReadCollectionOptions): Promise; + updateCollection(dbhan: DatabaseHandle, changeSet: any): Promise; getCollectionUpdateScript(changeSet: any, collectionInfo: CollectionInfo): string; - createDatabase(pool: any, name: string): Promise; - dropDatabase(pool: any, name: string): Promise; + createDatabase(dbhan: DatabaseHandle, name: string): Promise; + dropDatabase(dbhan: DatabaseHandle, name: string): Promise; getQuerySplitterOptions(usage: 'stream' | 'script' | 'editor'): any; - script(pool: any, sql: string, options?: RunScriptOptions): Promise; - operation(pool: any, operation: {}, options?: RunScriptOptions): Promise; + script(dbhan: DatabaseHandle, sql: string, options?: RunScriptOptions): Promise; + operation(dbhan: DatabaseHandle, operation: {}, options?: RunScriptOptions): Promise; getNewObjectTemplates(): NewObjectTemplate[]; - // direct call of pool method, only some methods could be supported, on only some drivers - callMethod(pool, method, args); - serverSummary(pool): Promise; - summaryCommand(pool, command, row): Promise; - startProfiler(pool, options): Promise; - 
stopProfiler(pool, profiler): Promise; + // direct call of dbhan.client method, only some methods could be supported, on only some drivers + callMethod(dbhan: DatabaseHandle, method, args); + serverSummary(dbhan: DatabaseHandle): Promise; + summaryCommand(dbhan: DatabaseHandle, command, row): Promise; + startProfiler(dbhan: DatabaseHandle, options): Promise; + stopProfiler(dbhan: DatabaseHandle, profiler): Promise; getRedirectAuthUrl(connection, options): Promise<{ url: string; sid: string }>; getAuthTokenFromCode(connection, options): Promise; getAccessTokenFromAuth(connection, req): Promise; @@ -231,7 +240,7 @@ export interface EngineDriver extends FilterBehaviourProvider { ): any[]; // adapts table info from different source (import, other database) to be suitable for this database adaptTableInfo(table: TableInfo): TableInfo; - async listSchemas(pool): SchemaInfo[]; + listSchemas(dbhan: DatabaseHandle): SchemaInfo[]; analyserClass?: any; dumperClass?: any; diff --git a/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js b/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js index fa176fb60..ea34369e1 100644 --- a/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js +++ b/plugins/dbgate-plugin-clickhouse/src/backend/Analyser.js @@ -24,7 +24,7 @@ class Analyser extends DatabaseAnalyser { createQuery(resFileName, typeFields, replacements = {}) { let res = sql[resFileName]; - res = res.replace('#DATABASE#', this.pool.__dbgate_database_name__); + res = res.replace('#DATABASE#', this.dbhan.database); return super.createQuery(res, typeFields, replacements); } @@ -82,8 +82,8 @@ class Analyser extends DatabaseAnalyser { async _computeSingleObjectId() { const { pureName } = this.singleObjectFilter; const resId = await this.driver.query( - this.pool, - `SELECT uuid as id FROM system.tables WHERE database = '${this.pool.__dbgate_database_name__}' AND name='${pureName}'` + this.dbhan, + `SELECT uuid as id FROM system.tables WHERE database = 
'${this.dbhan.database}' AND name='${pureName}'` ); this.singleObjectId = resId.rows[0]?.id; } diff --git a/plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js b/plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js index 7627febf9..c4e098c82 100644 --- a/plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js +++ b/plugins/dbgate-plugin-clickhouse/src/backend/createBulkInsertStream.js @@ -5,11 +5,11 @@ const _ = require('lodash'); * * @param {import('dbgate-types').EngineDriver} driver */ -function createOracleBulkInsertStream(driver, stream, pool, name, options) { - const writable = createBulkInsertStreamBase(driver, stream, pool, name, options); +function createOracleBulkInsertStream(driver, stream, dbhan, name, options) { + const writable = createBulkInsertStreamBase(driver, stream, dbhan, name, options); writable.send = async () => { - await pool.insert({ + await dbhan.client.insert({ table: name.pureName, values: writable.buffer, format: 'JSONEachRow', diff --git a/plugins/dbgate-plugin-clickhouse/src/backend/driver.js b/plugins/dbgate-plugin-clickhouse/src/backend/driver.js index 26dbc1d72..827a1fe54 100644 --- a/plugins/dbgate-plugin-clickhouse/src/backend/driver.js +++ b/plugins/dbgate-plugin-clickhouse/src/backend/driver.js @@ -5,7 +5,6 @@ const Analyser = require('./Analyser'); const { createClient } = require('@clickhouse/client'); const createBulkInsertStream = require('./createBulkInsertStream'); - /** @type {import('dbgate-types').EngineDriver} */ const driver = { ...driverBase, @@ -19,13 +18,15 @@ const driver = { database, }); - client.__dbgate_database_name__ = database; - return client; + return { + client, + database, + }; }, // called for retrieve data (eg. 
browse in data grid) and for update database - async query(client, query, options) { + async query(dbhan, query, options) { if (options?.discardResult) { - await client.command({ + await dbhan.client.command({ query, }); return { @@ -33,7 +34,7 @@ const driver = { columns: [], }; } else { - const resultSet = await client.query({ + const resultSet = await dbhan.client.query({ query, format: 'JSONCompactEachRowWithNamesAndTypes', }); @@ -58,10 +59,10 @@ const driver = { } }, // called in query console - async stream(client, query, options) { + async stream(dbhan, query, options) { try { if (!query.match(/^\s*SELECT/i)) { - const resp = await client.command({ + const resp = await dbhan.client.command({ query, }); // console.log('RESP', resp); @@ -77,7 +78,7 @@ const driver = { return; } - const resultSet = await client.query({ + const resultSet = await dbhan.client.query({ query, format: 'JSONCompactEachRowWithNamesAndTypes', }); @@ -139,13 +140,13 @@ const driver = { } }, // called when exporting table or view - async readQuery(client, query, structure) { + async readQuery(dbhan, query, structure) { const pass = new stream.PassThrough({ objectMode: true, highWaterMark: 100, }); - const resultSet = await client.query({ + const resultSet = await dbhan.client.query({ query, format: 'JSONCompactEachRowWithNamesAndTypes', }); @@ -191,12 +192,12 @@ const driver = { return pass; }, - async writeTable(pool, name, options) { - return createBulkInsertStream(this, stream, pool, name, options); + async writeTable(dbhan, name, options) { + return createBulkInsertStream(this, stream, dbhan, name, options); }, // detect server version - async getVersion(client) { - const resultSet = await client.query({ + async getVersion(dbhan) { + const resultSet = await dbhan.client.query({ query: 'SELECT version() as version', format: 'JSONEachRow', }); @@ -204,8 +205,8 @@ const driver = { return { version: dataset[0].version }; }, // list databases on server - async listDatabases(client) { - 
const resultSet = await client.query({ + async listDatabases(dbhan) { + const resultSet = await dbhan.client.query({ query: `SELECT name FROM system.databases WHERE name NOT IN ('system', 'information_schema', 'information_schema_ro', 'INFORMATION_SCHEMA')`, @@ -215,8 +216,8 @@ const driver = { return dataset; }, - async close(client) { - return client.close(); + async close(dbhan) { + return dbhan.client.close(); }, }; diff --git a/plugins/dbgate-plugin-mongo/src/backend/Analyser.js b/plugins/dbgate-plugin-mongo/src/backend/Analyser.js index 0e4a80fb7..3d3c6e598 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/Analyser.js +++ b/plugins/dbgate-plugin-mongo/src/backend/Analyser.js @@ -1,12 +1,12 @@ const { DatabaseAnalyser } = global.DBGATE_PACKAGES['dbgate-tools']; class Analyser extends DatabaseAnalyser { - constructor(pool, driver, version) { - super(pool, driver, version); + constructor(dbhan, driver, version) { + super(dbhan, driver, version); } async _runAnalysis() { - const collectionsAndViews = await this.pool.__getDatabase().listCollections().toArray(); + const collectionsAndViews = await this.dbhan.getDatabase().listCollections().toArray(); const collections = collectionsAndViews.filter((x) => x.type == 'collection'); const views = collectionsAndViews.filter((x) => x.type == 'view'); @@ -16,8 +16,8 @@ class Analyser extends DatabaseAnalyser { collections .filter((x) => x.type == 'collection') .map((x) => - this.pool - .__getDatabase() + this.dbhan + .getDatabase() .collection(x.name) .aggregate([{ $collStats: { count: {} } }]) .toArray() diff --git a/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js b/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js index 2259873cf..153f3f704 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js +++ b/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js @@ -5,9 +5,9 @@ const { EJSON } = require('bson'); const logger = getLogger('mongoBulkInsert'); 
-function createBulkInsertStream(driver, stream, pool, name, options) { +function createBulkInsertStream(driver, stream, dbhan, name, options) { const collectionName = name.pureName; - const db = pool.__getDatabase(); + const db = dbhan.getDatabase(); const writable = new stream.Writable({ objectMode: true, diff --git a/plugins/dbgate-plugin-mongo/src/backend/driver.js b/plugins/dbgate-plugin-mongo/src/backend/driver.js index 5c8385b21..4a795bc42 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/driver.js +++ b/plugins/dbgate-plugin-mongo/src/backend/driver.js @@ -34,8 +34,8 @@ function findArrayResult(resValue) { return null; } -async function getScriptableDb(pool) { - const db = pool.__getDatabase(); +async function getScriptableDb(dbhan) { + const db = dbhan.getDatabase(); const collections = await db.listCollections().toArray(); for (const collection of collections) { _.set(db, collection.name, db.collection(collection.name)); @@ -77,41 +77,43 @@ const driver = { options.tlsInsecure = !ssl.rejectUnauthorized; } - const pool = new MongoClient(mongoUrl, options); - await pool.connect(); - // const pool = await MongoClient.connect(mongoUrl); - pool.__getDatabase = database ? () => pool.db(database) : () => pool.db(); - return pool; + const client = new MongoClient(mongoUrl, options); + await client.connect(); + return { + client, + database, + getDatabase: database ? 
() => client.db(database) : () => client.db(), + }; }, // @ts-ignore - async query(pool, sql) { + async query(dbhan, sql) { return { rows: [], columns: [], }; }, - async script(pool, sql) { + async script(dbhan, sql) { let func; func = eval(`(db,ObjectId) => ${sql}`); - const db = await getScriptableDb(pool); + const db = await getScriptableDb(dbhan); const res = func(db, ObjectId.createFromHexString); if (isPromise(res)) await res; }, - async operation(pool, operation, options) { + async operation(dbhan, operation, options) { const { type } = operation; switch (type) { case 'createCollection': - await this.script(pool, `db.createCollection('${operation.collection.name}')`); + await this.script(dbhan, `db.createCollection('${operation.collection.name}')`); break; case 'dropCollection': - await this.script(pool, `db.dropCollection('${operation.collection}')`); + await this.script(dbhan, `db.dropCollection('${operation.collection}')`); break; case 'renameCollection': - await this.script(pool, `db.renameCollection('${operation.collection}', '${operation.newName}')`); + await this.script(dbhan, `db.renameCollection('${operation.collection}', '${operation.newName}')`); break; case 'cloneCollection': await this.script( - pool, + dbhan, `db.collection('${operation.collection}').aggregate([{$out: '${operation.newName}'}]).toArray()` ); break; @@ -120,7 +122,7 @@ const driver = { } // saveScriptToDatabase({ conid: connection._id, database: name }, `db.createCollection('${newCollection}')`); }, - async stream(pool, sql, options) { + async stream(dbhan, sql, options) { let func; try { func = eval(`(db,ObjectId) => ${sql}`); @@ -133,7 +135,7 @@ const driver = { options.done(); return; } - const db = await getScriptableDb(pool); + const db = await getScriptableDb(dbhan); let exprValue; try { @@ -191,8 +193,8 @@ const driver = { options.done(); }, - async startProfiler(pool, options) { - const db = await getScriptableDb(pool); + async startProfiler(dbhan, options) { + const db = 
await getScriptableDb(dbhan); const old = await db.command({ profile: -1 }); await db.command({ profile: 2 }); const cursor = await db.collection('system.profile').find({ @@ -229,12 +231,12 @@ const driver = { old, }; }, - async stopProfiler(pool, { cursor, old }) { + async stopProfiler(dbhan, { cursor, old }) { cursor.close(); - const db = await getScriptableDb(pool); + const db = await getScriptableDb(dbhan); await db.command({ profile: old.was, slowms: old.slowms }); }, - async readQuery(pool, sql, structure) { + async readQuery(dbhan, sql, structure) { try { const json = JSON.parse(sql); if (json && json.pureName) { @@ -250,7 +252,7 @@ const driver = { // }); func = eval(`(db,ObjectId) => ${sql}`); - const db = await getScriptableDb(pool); + const db = await getScriptableDb(dbhan); exprValue = func(db, ObjectId.createFromHexString); const pass = new stream.PassThrough({ @@ -277,27 +279,27 @@ const driver = { // return pass; }, - async writeTable(pool, name, options) { - return createBulkInsertStream(this, stream, pool, name, options); + async writeTable(dbhan, name, options) { + return createBulkInsertStream(this, stream, dbhan, name, options); }, - async getVersion(pool) { - const status = await pool.__getDatabase().admin().serverInfo(); + async getVersion(dbhan) { + const status = await dbhan.getDatabase().admin().serverInfo(); return { ...status, versionText: `MongoDB ${status.version}`, }; }, - async listDatabases(pool) { - const res = await pool.__getDatabase().admin().listDatabases(); + async listDatabases(dbhan) { + const res = await dbhan.getDatabase().admin().listDatabases(); return res.databases; }, - async readCollection(pool, options) { + async readCollection(dbhan, options) { try { const mongoCondition = convertToMongoCondition(options.condition); // console.log('******************* mongoCondition *****************'); // console.log(JSON.stringify(mongoCondition, undefined, 2)); - const collection = 
pool.__getDatabase().collection(options.pureName); + const collection = dbhan.getDatabase().collection(options.pureName); if (options.countDocuments) { const count = await collection.countDocuments(convertObjectId(mongoCondition) || {}); return { count }; @@ -325,7 +327,7 @@ const driver = { return { errorMessage: err.message }; } }, - async updateCollection(pool, changeSet) { + async updateCollection(dbhan, changeSet) { const res = { inserted: [], updated: [], @@ -333,7 +335,7 @@ const driver = { replaced: [], }; try { - const db = pool.__getDatabase(); + const db = dbhan.getDatabase(); for (const insert of changeSet.inserts) { const collection = db.collection(insert.pureName); const document = { @@ -383,19 +385,19 @@ const driver = { } }, - async createDatabase(pool, name) { - const db = pool.db(name); + async createDatabase(dbhan, name) { + const db = dbhan.client.db(name); await db.createCollection('collection1'); }, - async dropDatabase(pool, name) { - const db = pool.db(name); + async dropDatabase(dbhan, name) { + const db = dbhan.client.db(name); await db.dropDatabase(); }, - async loadFieldValues(pool, name, field, search) { + async loadFieldValues(dbhan, name, field, search) { try { - const collection = pool.__getDatabase().collection(name.pureName); + const collection = dbhan.getDatabase().collection(name.pureName); // console.log('options.condition', JSON.stringify(options.condition, undefined, 2)); const pipelineMatch = []; @@ -441,10 +443,10 @@ const driver = { } }, - readJsonQuery(pool, select, structure) { + readJsonQuery(dbhan, select, structure) { const { collection, condition, sort } = select; - const db = pool.__getDatabase(); + const db = dbhan.getDatabase(); const res = db .collection(collection) .find(condition || {}) @@ -454,23 +456,23 @@ const driver = { return res; }, - async summaryCommand(pool, command, row) { + async summaryCommand(dbhan, command, row) { switch (command) { case 'profileOff': - await pool.db(row.name).command({ profile: 0 
}); + await dbhan.client.db(row.name).command({ profile: 0 }); return; case 'profileFiltered': - await pool.db(row.name).command({ profile: 1, slowms: 100 }); + await dbhan.client.db(row.name).command({ profile: 1, slowms: 100 }); return; case 'profileAll': - await pool.db(row.name).command({ profile: 2 }); + await dbhan.client.db(row.name).command({ profile: 2 }); return; } }, - async serverSummary(pool) { - const res = await pool.__getDatabase().admin().listDatabases(); - const profiling = await Promise.all(res.databases.map((x) => pool.db(x.name).command({ profile: -1 }))); + async serverSummary(dbhan) { + const res = await dbhan.getDatabase().admin().listDatabases(); + const profiling = await Promise.all(res.databases.map((x) => dbhan.client.db(x.name).command({ profile: -1 }))); function formatProfiling(info) { switch (info.was) { diff --git a/plugins/dbgate-plugin-mssql/src/backend/MsSqlAnalyser.js b/plugins/dbgate-plugin-mssql/src/backend/MsSqlAnalyser.js index 592bef7b5..3ecf458aa 100644 --- a/plugins/dbgate-plugin-mssql/src/backend/MsSqlAnalyser.js +++ b/plugins/dbgate-plugin-mssql/src/backend/MsSqlAnalyser.js @@ -63,8 +63,8 @@ function getColumnInfo({ } class MsSqlAnalyser extends DatabaseAnalyser { - constructor(pool, driver, version) { - super(pool, driver, version); + constructor(dbhan, driver, version) { + super(dbhan, driver, version); } createQuery(resFileName, typeFields) { @@ -75,7 +75,7 @@ class MsSqlAnalyser extends DatabaseAnalyser { async _computeSingleObjectId() { const { schemaName, pureName, typeField } = this.singleObjectFilter; const fullName = schemaName ? 
`[${schemaName}].[${pureName}]` : pureName; - const resId = await this.driver.query(this.pool, `SELECT OBJECT_ID('${fullName}') AS id`); + const resId = await this.driver.query(this.dbhan, `SELECT OBJECT_ID('${fullName}') AS id`); this.singleObjectId = resId.rows[0].id; } diff --git a/plugins/dbgate-plugin-mssql/src/backend/createNativeBulkInsertStream.js b/plugins/dbgate-plugin-mssql/src/backend/createNativeBulkInsertStream.js index 093c05881..5a60f0ae3 100644 --- a/plugins/dbgate-plugin-mssql/src/backend/createNativeBulkInsertStream.js +++ b/plugins/dbgate-plugin-mssql/src/backend/createNativeBulkInsertStream.js @@ -1,8 +1,8 @@ const { createBulkInsertStreamBase } = global.DBGATE_PACKAGES['dbgate-tools']; -function runBulkInsertBatch(pool, tableName, writable, rows) { +function runBulkInsertBatch(dbhan, tableName, writable, rows) { return new Promise((resolve, reject) => { - const tableMgr = pool.tableMgr(); + const tableMgr = dbhan.client.tableMgr(); tableMgr.bind(tableName, bulkMgr => { bulkMgr.insertRows(rows, err => { if (err) reject(err); @@ -16,8 +16,8 @@ function runBulkInsertBatch(pool, tableName, writable, rows) { * * @param {import('dbgate-types').EngineDriver} driver */ -function createNativeBulkInsertStream(driver, stream, pool, name, options) { - const writable = createBulkInsertStreamBase(driver, stream, pool, name, options); +function createNativeBulkInsertStream(driver, stream, dbhan, name, options) { + const writable = createBulkInsertStreamBase(driver, stream, dbhan, name, options); const fullName = name.schemaName ? 
`[${name.schemaName}].[${name.pureName}]` : name.pureName; @@ -25,7 +25,7 @@ function createNativeBulkInsertStream(driver, stream, pool, name, options) { const rows = writable.buffer; writable.buffer = []; - await runBulkInsertBatch(pool, fullName, writable, rows); + await runBulkInsertBatch(dbhan, fullName, writable, rows); }; return writable; diff --git a/plugins/dbgate-plugin-mssql/src/backend/createTediousBulkInsertStream.js b/plugins/dbgate-plugin-mssql/src/backend/createTediousBulkInsertStream.js index 2c2c057b0..878579262 100644 --- a/plugins/dbgate-plugin-mssql/src/backend/createTediousBulkInsertStream.js +++ b/plugins/dbgate-plugin-mssql/src/backend/createTediousBulkInsertStream.js @@ -3,12 +3,12 @@ const tedious = require('tedious'); const getConcreteType = require('./getConcreteType'); const _ = require('lodash'); -function runBulkInsertBatch(pool, tableName, writable, rows) { +function runBulkInsertBatch(dbhan, tableName, writable, rows) { return new Promise((resolve, reject) => { var options = { keepNulls: true }; // instantiate - provide the table where you'll be inserting to, options and a callback - var bulkLoad = pool.newBulkLoad(tableName, options, (error, rowCount) => { + var bulkLoad = dbhan.client.newBulkLoad(tableName, options, (error, rowCount) => { if (error) reject(error); else resolve(); }); @@ -40,7 +40,7 @@ function runBulkInsertBatch(pool, tableName, writable, rows) { ); // console.log('IMPORT ROWS', rowsMapped); - pool.execBulkLoad(bulkLoad, rowsMapped); + dbhan.client.execBulkLoad(bulkLoad, rowsMapped); }); } @@ -48,8 +48,8 @@ function runBulkInsertBatch(pool, tableName, writable, rows) { * * @param {import('dbgate-types').EngineDriver} driver */ -function createTediousBulkInsertStream(driver, stream, pool, name, options) { - const writable = createBulkInsertStreamBase(driver, stream, pool, name, options); +function createTediousBulkInsertStream(driver, stream, dbhan, name, options) { + const writable = 
createBulkInsertStreamBase(driver, stream, dbhan, name, options); const fullName = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : name.pureName; @@ -59,7 +59,7 @@ function createTediousBulkInsertStream(driver, stream, pool, name, options) { ? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}` : driver.dialect.quoteIdentifier(name.pureName); - const respTemplate = await driver.query(pool, `SELECT * FROM ${fullNameQuoted} WHERE 1=0`, { + const respTemplate = await driver.query(dbhan, `SELECT * FROM ${fullNameQuoted} WHERE 1=0`, { addDriverNativeColumn: true, }); writable.templateColumns = respTemplate.columns; @@ -68,7 +68,7 @@ function createTediousBulkInsertStream(driver, stream, pool, name, options) { const rows = writable.buffer; writable.buffer = []; - await runBulkInsertBatch(pool, fullName, writable, rows); + await runBulkInsertBatch(dbhan, fullName, writable, rows); }; return writable; diff --git a/plugins/dbgate-plugin-mssql/src/backend/driver.js b/plugins/dbgate-plugin-mssql/src/backend/driver.js index 335789fe6..565d193a9 100644 --- a/plugins/dbgate-plugin-mssql/src/backend/driver.js +++ b/plugins/dbgate-plugin-mssql/src/backend/driver.js @@ -79,55 +79,53 @@ const driver = { async connect(conn) { const { authType } = conn; - const result = - requireMsnodesqlv8 && (authType == 'sspi' || authType == 'sql') - ? await nativeConnect(conn) - : await tediousConnect(conn); + const connectionType = requireMsnodesqlv8 && (authType == 'sspi' || authType == 'sql') ? 'msnodesqlv8' : 'tedious'; + const client = connectionType == 'msnodesqlv8' ? 
await nativeConnect(conn) : await tediousConnect(conn); - if (result) { - result.__dbgate_database_name__ = conn.database; - } - - return result; + return { + client, + connectionType, + database: conn.database, + }; }, - async close(pool) { - return pool.close(); + async close(dbhan) { + return dbhan.client.close(); }, - async queryCore(pool, sql, options) { - if (pool._connectionType == 'msnodesqlv8') { - return nativeQueryCore(pool, sql, options); + async queryCore(dbhan, sql, options) { + if (dbhan.connectionType == 'msnodesqlv8') { + return nativeQueryCore(dbhan, sql, options); } else { - return tediousQueryCore(pool, sql, options); + return tediousQueryCore(dbhan, sql, options); } }, - async query(pool, sql, options) { + async query(dbhan, sql, options) { return lock.acquire('connection', async () => { - return this.queryCore(pool, sql, options); + return this.queryCore(dbhan, sql, options); }); }, - async stream(pool, sql, options) { - if (pool._connectionType == 'msnodesqlv8') { - return nativeStream(pool, sql, options); + async stream(dbhan, sql, options) { + if (dbhan.connectionType == 'msnodesqlv8') { + return nativeStream(dbhan, sql, options); } else { - return tediousStream(pool, sql, options); + return tediousStream(dbhan, sql, options); } }, - async readQuery(pool, sql, structure) { - if (pool._connectionType == 'msnodesqlv8') { - return nativeReadQuery(pool, sql, structure); + async readQuery(dbhan, sql, structure) { + if (dbhan.connectionType == 'msnodesqlv8') { + return nativeReadQuery(dbhan, sql, structure); } else { - return tediousReadQuery(pool, sql, structure); + return tediousReadQuery(dbhan, sql, structure); } }, - async writeTable(pool, name, options) { - if (pool._connectionType == 'msnodesqlv8') { - return createNativeBulkInsertStream(this, stream, pool, name, options); + async writeTable(dbhan, name, options) { + if (dbhan.connectionType == 'msnodesqlv8') { + return createNativeBulkInsertStream(this, stream, dbhan, name, options); } 
else { - return createTediousBulkInsertStream(this, stream, pool, name, options); + return createTediousBulkInsertStream(this, stream, dbhan, name, options); } }, - async getVersion(pool) { - const res = (await this.query(pool, versionQuery)).rows[0]; + async getVersion(dbhan) { + const res = (await this.query(dbhan, versionQuery)).rows[0]; if (res.productVersion) { const splitted = res.productVersion.split('.'); @@ -138,8 +136,8 @@ const driver = { } return res; }, - async listDatabases(pool) { - const { rows } = await this.query(pool, 'SELECT name FROM sys.databases order by name'); + async listDatabases(dbhan) { + const { rows } = await this.query(dbhan, 'SELECT name FROM sys.databases order by name'); return rows; }, getRedirectAuthUrl(connection, options) { @@ -155,10 +153,10 @@ const driver = { getAccessTokenFromAuth: (connection, req) => { return req?.user?.msentraToken; }, - async listSchemas(pool) { - const { rows } = await this.query(pool, 'select schema_id as objectId, name as schemaName from sys.schemas'); + async listSchemas(dbhan) { + const { rows } = await this.query(dbhan, 'select schema_id as objectId, name as schemaName from sys.schemas'); - const defaultSchemaRows = await this.query(pool, 'SELECT SCHEMA_NAME() as name'); + const defaultSchemaRows = await this.query(dbhan, 'SELECT SCHEMA_NAME() as name'); const defaultSchema = defaultSchemaRows.rows[0]?.name; return rows.map(x => ({ diff --git a/plugins/dbgate-plugin-mssql/src/backend/nativeDriver.js b/plugins/dbgate-plugin-mssql/src/backend/nativeDriver.js index 5c2de8d23..dad4928af 100644 --- a/plugins/dbgate-plugin-mssql/src/backend/nativeDriver.js +++ b/plugins/dbgate-plugin-mssql/src/backend/nativeDriver.js @@ -63,7 +63,6 @@ async function connectWithDriver({ server, port, user, password, database, authT if (err) { reject(err); } else { - conn._connectionType = 'msnodesqlv8'; resolve(conn); } }); @@ -88,7 +87,7 @@ async function nativeConnect(connection) { } } -async function 
nativeQueryCore(pool, sql, options) { +async function nativeQueryCore(dbhan, sql, options) { if (sql == null) { return Promise.resolve({ rows: [], @@ -98,7 +97,7 @@ async function nativeQueryCore(pool, sql, options) { return new Promise((resolve, reject) => { let columns = null; let currentRow = null; - const q = pool.query(sql); + const q = dbhan.client.query(sql); const rows = []; q.on('meta', meta => { @@ -128,7 +127,7 @@ async function nativeQueryCore(pool, sql, options) { }); } -async function nativeReadQuery(pool, sql, structure) { +async function nativeReadQuery(dbhan, sql, structure) { const pass = new stream.PassThrough({ objectMode: true, highWaterMark: 100, @@ -136,7 +135,7 @@ async function nativeReadQuery(pool, sql, structure) { let columns = null; let currentRow = null; - const q = pool.query(sql); + const q = dbhan.client.query(sql); q.on('meta', meta => { columns = extractNativeColumns(meta); @@ -168,7 +167,7 @@ async function nativeReadQuery(pool, sql, structure) { return pass; } -async function nativeStream(pool, sql, options) { +async function nativeStream(dbhan, sql, options) { const handleInfo = info => { const { message, lineNumber, procName } = info; options.info({ @@ -192,7 +191,7 @@ async function nativeStream(pool, sql, options) { let columns = null; let currentRow = null; - const q = pool.query(sql); + const q = dbhan.client.query(sql); q.on('meta', meta => { if (currentRow) options.row(currentRow); diff --git a/plugins/dbgate-plugin-mssql/src/backend/tediousDriver.js b/plugins/dbgate-plugin-mssql/src/backend/tediousDriver.js index aff9291fb..028aa0304 100644 --- a/plugins/dbgate-plugin-mssql/src/backend/tediousDriver.js +++ b/plugins/dbgate-plugin-mssql/src/backend/tediousDriver.js @@ -68,14 +68,13 @@ async function tediousConnect(storedConnection) { if (err) { reject(err); } - connection._connectionType = 'tedious'; resolve(connection); }); connection.connect(); }); } -async function tediousQueryCore(pool, sql, options) { +async 
function tediousQueryCore(dbhan, sql, options) { if (sql == null) { return Promise.resolve({ rows: [], @@ -103,12 +102,12 @@ async function tediousQueryCore(pool, sql, options) { ) ); }); - if (discardResult) pool.execSqlBatch(request); - else pool.execSql(request); + if (discardResult) dbhan.client.execSqlBatch(request); + else dbhan.client.execSql(request); }); } -async function tediousReadQuery(pool, sql, structure) { +async function tediousReadQuery(dbhan, sql, structure) { const pass = new stream.PassThrough({ objectMode: true, highWaterMark: 100, @@ -133,12 +132,12 @@ async function tediousReadQuery(pool, sql, structure) { ); pass.write(row); }); - pool.execSql(request); + dbhan.client.execSql(request); return pass; } -async function tediousStream(pool, sql, options) { +async function tediousStream(dbhan, sql, options) { let currentColumns = []; const handleInfo = info => { @@ -188,7 +187,7 @@ async function tediousStream(pool, sql, options) { ); options.row(row); }); - pool.execSqlBatch(request); + dbhan.client.execSqlBatch(request); } module.exports = { diff --git a/plugins/dbgate-plugin-mysql/src/backend/Analyser.js b/plugins/dbgate-plugin-mysql/src/backend/Analyser.js index 676687c2e..fb18cc683 100644 --- a/plugins/dbgate-plugin-mysql/src/backend/Analyser.js +++ b/plugins/dbgate-plugin-mysql/src/backend/Analyser.js @@ -62,13 +62,13 @@ function getColumnInfo( } class Analyser extends DatabaseAnalyser { - constructor(pool, driver, version) { - super(pool, driver, version); + constructor(dbhan, driver, version) { + super(dbhan, driver, version); } createQuery(resFileName, typeFields, replacements = {}) { let res = sql[resFileName]; - res = res.replace('#DATABASE#', this.pool.__dbgate_database_name__); + res = res.replace('#DATABASE#', this.dbhan.database); return super.createQuery(res, typeFields, replacements); } diff --git a/plugins/dbgate-plugin-mysql/src/backend/drivers.js b/plugins/dbgate-plugin-mysql/src/backend/drivers.js index 20c001be1..7aee2984d 
100644 --- a/plugins/dbgate-plugin-mysql/src/backend/drivers.js +++ b/plugins/dbgate-plugin-mysql/src/backend/drivers.js @@ -48,17 +48,19 @@ const drivers = driverBases.map(driverBase => ({ // multipleStatements: true, }; - const connection = mysql2.createConnection(options); - connection.__dbgate_database_name__ = database; + const client = mysql2.createConnection(options); if (isReadOnly) { - await this.query(connection, 'SET SESSION TRANSACTION READ ONLY'); + await this.query({ client }, 'SET SESSION TRANSACTION READ ONLY'); } - return connection; + return { + client, + database, + }; }, - async close(pool) { - return pool.close(); + async close(dbhan) { + return dbhan.client.close(); }, - query(connection, sql) { + query(dbhan, sql) { if (sql == null) { return { rows: [], @@ -67,15 +69,15 @@ const drivers = driverBases.map(driverBase => ({ } return new Promise((resolve, reject) => { - connection.query(sql, function (error, results, fields) { + dbhan.client.query(sql, function (error, results, fields) { if (error) reject(error); const columns = extractColumns(fields); resolve({ rows: results && columns && results.map && results.map(row => zipDataRow(row, columns)), columns }); }); }); }, - async stream(connection, sql, options) { - const query = connection.query(sql); + async stream(dbhan, sql, options) { + const query = dbhan.client.query(sql); let columns = []; // const handleInfo = (info) => { @@ -125,8 +127,8 @@ const drivers = driverBases.map(driverBase => ({ query.on('error', handleError).on('fields', handleFields).on('result', handleRow).on('end', handleEnd); }, - async readQuery(connection, sql, structure) { - const query = connection.query(sql); + async readQuery(dbhan, sql, structure) { + const query = dbhan.client.query(sql); const pass = new stream.PassThrough({ objectMode: true, @@ -151,8 +153,8 @@ const drivers = driverBases.map(driverBase => ({ return pass; }, - async getVersion(connection) { - const { rows } = await this.query(connection, "show 
variables like 'version'"); + async getVersion(dbhan) { + const { rows } = await this.query(dbhan, "show variables like 'version'"); const version = rows[0].Value; if (version) { const m = version.match(/(.*)-MariaDB-/); @@ -169,18 +171,18 @@ const drivers = driverBases.map(driverBase => ({ versionText: `MySQL ${version}`, }; }, - async listDatabases(connection) { - const { rows } = await this.query(connection, 'show databases'); + async listDatabases(dbhan) { + const { rows } = await this.query(dbhan, 'show databases'); return rows.map(x => ({ name: x.Database })); }, - async writeTable(pool, name, options) { + async writeTable(dbhan, name, options) { // @ts-ignore - return createBulkInsertStreamBase(this, stream, pool, name, options); + return createBulkInsertStreamBase(this, stream, dbhan, name, options); }, - async createBackupDumper(pool, options) { + async createBackupDumper(dbhan, options) { const { outputFile, databaseName, schemaName } = options; const res = new MySqlDumper({ - connection: pool, + connection: dbhan.client, schema: databaseName || schemaName, outputFile, }); diff --git a/plugins/dbgate-plugin-oracle/src/backend/Analyser.js b/plugins/dbgate-plugin-oracle/src/backend/Analyser.js index c8c314bbd..5b4b845c8 100644 --- a/plugins/dbgate-plugin-oracle/src/backend/Analyser.js +++ b/plugins/dbgate-plugin-oracle/src/backend/Analyser.js @@ -30,8 +30,8 @@ function getColumnInfo( } class Analyser extends DatabaseAnalyser { - constructor(pool, driver, version) { - super(pool, driver, version); + constructor(dbhan, driver, version) { + super(dbhan, driver, version); } createQuery(resFileName, typeFields, replacements = {}) { @@ -47,32 +47,32 @@ class Analyser extends DatabaseAnalyser { async _runAnalysis() { this.feedback({ analysingMessage: 'Loading tables' }); - const tables = await this.analyserQuery('tableList', ['tables'], { $owner: this.pool._schema_name }); + const tables = await this.analyserQuery('tableList', ['tables'], { $owner: 
this.dbhan.database }); this.feedback({ analysingMessage: 'Loading columns' }); - const columns = await this.analyserQuery('columns', ['tables', 'views'], { $owner: this.pool._schema_name }); + const columns = await this.analyserQuery('columns', ['tables', 'views'], { $owner: this.dbhan.database }); this.feedback({ analysingMessage: 'Loading primary keys' }); - const pkColumns = await this.analyserQuery('primaryKeys', ['tables'], { $owner: this.pool._schema_name }); + const pkColumns = await this.analyserQuery('primaryKeys', ['tables'], { $owner: this.dbhan.database }); //let fkColumns = null; this.feedback({ analysingMessage: 'Loading foreign keys' }); - const fkColumns = await this.analyserQuery('foreignKeys', ['tables'], { $owner: this.pool._schema_name }); + const fkColumns = await this.analyserQuery('foreignKeys', ['tables'], { $owner: this.dbhan.database }); this.feedback({ analysingMessage: 'Loading views' }); - const views = await this.analyserQuery('views', ['views'], { $owner: this.pool._schema_name }); + const views = await this.analyserQuery('views', ['views'], { $owner: this.dbhan.database }); this.feedback({ analysingMessage: 'Loading materialized views' }); const matviews = this.driver.dialect.materializedViews - ? await this.analyserQuery('matviews', ['matviews'], { $owner: this.pool._schema_name }) + ? 
await this.analyserQuery('matviews', ['matviews'], { $owner: this.dbhan.database }) : null; this.feedback({ analysingMessage: 'Loading routines' }); const routines = await this.analyserQuery('routines', ['procedures', 'functions'], { - $owner: this.pool._schema_name, + $owner: this.dbhan.database, }); this.feedback({ analysingMessage: 'Loading indexes' }); - const indexes = await this.analyserQuery('indexes', ['tables'], { $owner: this.pool._schema_name }); + const indexes = await this.analyserQuery('indexes', ['tables'], { $owner: this.dbhan.database }); this.feedback({ analysingMessage: 'Loading unique names' }); - const uniqueNames = await this.analyserQuery('uniqueNames', ['tables'], { $owner: this.pool._schema_name }); + const uniqueNames = await this.analyserQuery('uniqueNames', ['tables'], { $owner: this.dbhan.database }); this.feedback({ analysingMessage: 'Finalizing DB structure' }); const fkColumnsMapped = fkColumns.rows.map(x => ({ diff --git a/plugins/dbgate-plugin-oracle/src/backend/createOracleBulkInsertStream.js b/plugins/dbgate-plugin-oracle/src/backend/createOracleBulkInsertStream.js index dac1cf56d..b7eedcc6a 100644 --- a/plugins/dbgate-plugin-oracle/src/backend/createOracleBulkInsertStream.js +++ b/plugins/dbgate-plugin-oracle/src/backend/createOracleBulkInsertStream.js @@ -5,12 +5,12 @@ const _ = require('lodash'); * * @param {import('dbgate-types').EngineDriver} driver */ -function createOracleBulkInsertStream(driver, stream, pool, name, options) { +function createOracleBulkInsertStream(driver, stream, dbhan, name, options) { const fullNameQuoted = name.schemaName ? 
`${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}` : driver.dialect.quoteIdentifier(name.pureName); - const writable = createBulkInsertStreamBase(driver, stream, pool, name, { + const writable = createBulkInsertStreamBase(driver, stream, dbhan, name, { ...options, // this is really not used, send method below is used instead commitAfterInsert: true, @@ -28,7 +28,7 @@ function createOracleBulkInsertStream(driver, stream, pool, name, options) { dmp.putRaw(')'); const rows = writable.buffer.map(row => _.mapKeys(row, (v, k) => `c${writable.columnNames.indexOf(k)}`)); - await pool.executeMany(dmp.s, rows, { autoCommit: true }); + await dbhan.client.executeMany(dmp.s, rows, { autoCommit: true }); writable.buffer = []; }; diff --git a/plugins/dbgate-plugin-oracle/src/backend/driver.js b/plugins/dbgate-plugin-oracle/src/backend/driver.js index 64c9078fc..b1c3f39c2 100644 --- a/plugins/dbgate-plugin-oracle/src/backend/driver.js +++ b/plugins/dbgate-plugin-oracle/src/backend/driver.js @@ -88,13 +88,15 @@ const driver = { if (database) { await client.execute(`ALTER SESSION SET CURRENT_SCHEMA = ${database}`); } - client._schema_name = database; - return client; + return { + client, + database, + }; }, - async close(pool) { - return pool.end(); + async close(dbhan) { + return dbhan.client.end(); }, - async query(client, sql) { + async query(dbhan, sql) { if (sql == null || sql.trim() == '') { return { rows: [], @@ -107,7 +109,7 @@ const driver = { sql = mtrim[1]; } - const res = await client.execute(sql); + const res = await dbhan.client.execute(sql); try { const columns = extractOracleColumns(res.metaData); return { rows: (res.rows || []).map(row => zipDataRow(row, columns)), columns }; @@ -118,7 +120,7 @@ const driver = { }; } }, - stream(client, sql, options) { + stream(dbhan, sql, options) { /* const query = new pg.Query({ text: sql, @@ -128,7 +130,7 @@ const driver = { // console.log('queryStream', sql); if 
(sql.trim().toLowerCase().startsWith('select')) { - const query = client.queryStream(sql); + const query = dbhan.client.queryStream(sql); // const consumeStream = new Promise((resolve, reject) => { let rowcount = 0; let wasHeader = false; @@ -202,7 +204,7 @@ const driver = { }); //}); } else { - client.execute(sql, (err, res) => { + dbhan.client.execute(sql, (err, res) => { if (err) { console.log('Error query', err, sql); const lineNumber = (sql.substring(0, err.offset).match(/\n/g) || []).length; @@ -237,23 +239,23 @@ const driver = { //console.log('Rows selected: ' + numrows); //client.query(query); }, - async getVersionCore(client) { + async getVersionCore(dbhan) { try { const { rows } = await this.query( - client, + dbhan, "SELECT product || ' ' || version_full as \"version\" FROM product_component_version WHERE product LIKE 'Oracle%Database%'" ); return rows[0].version.replace(' ', ' '); } catch (e) { - const { rows } = await this.query(client, 'SELECT banner as "version" FROM v$version'); + const { rows } = await this.query(dbhan, 'SELECT banner as "version" FROM v$version'); return rows[0].version; } }, - async getVersion(client) { + async getVersion(dbhan) { try { //const { rows } = await this.query(client, "SELECT banner as version FROM v$version WHERE banner LIKE 'Oracle%'"); // const { rows } = await this.query(client, 'SELECT version as "version" FROM v$instance'); - const version = await this.getVersionCore(client); + const version = await this.getVersionCore(dbhan); const m = version.match(/(\d+[a-z]+)\s+(\w+).*?(\d+)\.(\d+)/); //console.log('M', m); @@ -281,7 +283,7 @@ const driver = { }; } }, - async readQuery(client, sql, structure) { + async readQuery(dbhan, sql, structure) { /* const query = new pg.Query({ text: sql, @@ -289,7 +291,7 @@ const driver = { }); */ // console.log('readQuery', sql, structure); - const query = await client.queryStream(sql); + const query = await dbhan.client.queryStream(sql); let wasHeader = false; let columns = null; 
@@ -333,11 +335,11 @@ const driver = { return pass; }, - async writeTable(pool, name, options) { - return createOracleBulkInsertStream(this, stream, pool, name, options); + async writeTable(dbhan, name, options) { + return createOracleBulkInsertStream(this, stream, dbhan, name, options); }, - async listDatabases(client) { - const { rows } = await this.query(client, 'SELECT username as "name" from all_users order by username'); + async listDatabases(dbhan) { + const { rows } = await this.query(dbhan, 'SELECT username as "name" from all_users order by username'); return rows; }, diff --git a/plugins/dbgate-plugin-postgres/src/backend/Analyser.js b/plugins/dbgate-plugin-postgres/src/backend/Analyser.js index 608897f63..e2f87b848 100644 --- a/plugins/dbgate-plugin-postgres/src/backend/Analyser.js +++ b/plugins/dbgate-plugin-postgres/src/backend/Analyser.js @@ -51,13 +51,13 @@ function getColumnInfo( } class Analyser extends DatabaseAnalyser { - constructor(pool, driver, version) { - super(pool, driver, version); + constructor(dbhan, driver, version) { + super(dbhan, driver, version); } createQuery(resFileName, typeFields, replacements = {}) { const query = super.createQuery(sql[resFileName], typeFields, replacements); - const dbname = this.pool.database; + const dbname = this.dbhan.database; const schemaCondition = isCompositeDbName(dbname) ? 
`= '${splitCompositeDbName(dbname).schema}' ` : ' IS NOT NULL '; diff --git a/plugins/dbgate-plugin-postgres/src/backend/drivers.js b/plugins/dbgate-plugin-postgres/src/backend/drivers.js index 220481883..dd01dd0c0 100644 --- a/plugins/dbgate-plugin-postgres/src/backend/drivers.js +++ b/plugins/dbgate-plugin-postgres/src/backend/drivers.js @@ -95,21 +95,21 @@ const drivers = driverBases.map(driverBase => ({ database, }; }, - async close(handle) { - return handle.client.end(); + async close(dbhan) { + return dbhan.client.end(); }, - async query(handle, sql) { + async query(dbhan, sql) { if (sql == null) { return { rows: [], columns: [], }; } - const res = await handle.client.query({ text: sql, rowMode: 'array' }); + const res = await dbhan.client.query({ text: sql, rowMode: 'array' }); const columns = extractPostgresColumns(res); return { rows: (res.rows || []).map(row => zipDataRow(row, columns)), columns }; }, - stream(handle, sql, options) { + stream(dbhan, sql, options) { const query = new pg.Query({ text: sql, rowMode: 'array', @@ -168,10 +168,10 @@ const drivers = driverBases.map(driverBase => ({ options.done(); }); - handle.client.query(query); + dbhan.client.query(query); }, - async getVersion(handle) { - const { rows } = await this.query(handle, 'SELECT version()'); + async getVersion(dbhan) { + const { rows } = await this.query(dbhan, 'SELECT version()'); const { version } = rows[0]; const isCockroach = version.toLowerCase().includes('cockroachdb'); @@ -201,7 +201,7 @@ const drivers = driverBases.map(driverBase => ({ versionMinor, }; }, - async readQuery(client, sql, structure) { + async readQuery(dbhan, sql, structure) { const query = new pg.Query({ text: sql, rowMode: 'array', @@ -246,16 +246,16 @@ const drivers = driverBases.map(driverBase => ({ pass.end(); }); - client.query(query); + dbhan.client.query(query); return pass; }, - async writeTable(pool, name, options) { + async writeTable(dbhan, name, options) { // @ts-ignore - return 
createBulkInsertStreamBase(this, stream, pool, name, options); + return createBulkInsertStreamBase(this, stream, dbhan, name, options); }, - async listDatabases(handle) { - const { rows } = await this.query(handle, 'SELECT datname AS name FROM pg_database WHERE datistemplate = false'); + async listDatabases(dbhan) { + const { rows } = await this.query(dbhan, 'SELECT datname AS name FROM pg_database WHERE datistemplate = false'); return rows; }, @@ -272,12 +272,12 @@ const drivers = driverBases.map(driverBase => ({ ]; }, - async listSchemas(handle) { + async listSchemas(dbhan) { const schemaRows = await this.query( - handle, + dbhan, 'select oid as "object_id", nspname as "schema_name" from pg_catalog.pg_namespace' ); - const defaultSchemaRows = await this.query(handle, 'SHOW SEARCH_PATH;'); + const defaultSchemaRows = await this.query(dbhan, 'SHOW SEARCH_PATH;'); const searchPath = defaultSchemaRows.rows[0]?.search_path?.replace('"$user",', '')?.trim(); const schemas = schemaRows.rows.map(x => ({ diff --git a/plugins/dbgate-plugin-redis/src/backend/Analyser.js b/plugins/dbgate-plugin-redis/src/backend/Analyser.js index 37749ff72..6000ff063 100644 --- a/plugins/dbgate-plugin-redis/src/backend/Analyser.js +++ b/plugins/dbgate-plugin-redis/src/backend/Analyser.js @@ -1,8 +1,8 @@ const { DatabaseAnalyser } = global.DBGATE_PACKAGES['dbgate-tools'];; class Analyser extends DatabaseAnalyser { - constructor(pool, driver) { - super(pool, driver); + constructor(dbhan, driver) { + super(dbhan, driver); } } diff --git a/plugins/dbgate-plugin-redis/src/backend/driver.js b/plugins/dbgate-plugin-redis/src/backend/driver.js index 576d3c213..db2d097aa 100644 --- a/plugins/dbgate-plugin-redis/src/backend/driver.js +++ b/plugins/dbgate-plugin-redis/src/backend/driver.js @@ -83,32 +83,34 @@ const driver = { analyserClass: Analyser, async connect({ server, port, user, password, database, useDatabaseUrl, databaseUrl, treeKeySeparator }) { let db = 0; - let pool; + let client; if 
(useDatabaseUrl) { - pool = new Redis(databaseUrl); + client = new Redis(databaseUrl); } else { if (_.isString(database) && database.startsWith('db')) db = parseInt(database.substring(2)); if (_.isNumber(database)) db = database; - pool = new Redis({ + client = new Redis({ host: server, port, username: user, password, db, }); - pool.__treeKeySeparator = treeKeySeparator || ':'; } - return pool; + return { + client, + treeKeySeparator: treeKeySeparator || ':', + }; }, // @ts-ignore - async query(pool, sql) { + async query(dbhan, sql) { return { rows: [], columns: [], }; }, - async stream(pool, sql, options) { + async stream(dbhan, sql, options) { const parts = splitCommandLine(sql); if (parts.length < 1) { options.done(); @@ -116,7 +118,7 @@ const driver = { } const command = parts[0].toLowerCase(); const args = parts.slice(1); - const res = await pool.call(command, ...args); + const res = await dbhan.client.call(command, ...args); options.info({ message: JSON.stringify(res), @@ -126,7 +128,7 @@ const driver = { options.done(); }, - async readQuery(pool, sql, structure) { + async readQuery(dbhan, sql, structure) { const pass = new stream.PassThrough({ objectMode: true, highWaterMark: 100, @@ -139,11 +141,11 @@ const driver = { return pass; }, - async writeTable(pool, name, options) { - return createBulkInsertStreamBase(this, stream, pool, name, options); + async writeTable(dbhan, name, options) { + return createBulkInsertStreamBase(this, stream, dbhan, name, options); }, - async info(pool) { - const info = await pool.info(); + async info(dbhan) { + const info = await dbhan.client.info(); return _.fromPairs( info .split('\n') @@ -151,30 +153,30 @@ const driver = { .map((x) => x.split(':')) ); }, - async getVersion(pool) { - const info = await this.info(pool); + async getVersion(dbhan) { + const info = await this.info(dbhan); return { version: info.redis_version, versionText: `Redis ${info.redis_version}`, }; }, - async listDatabases(pool) { - const info = await 
this.info(pool); + async listDatabases(dbhan) { + const info = await this.info(dbhan); return _.range(16).map((index) => ({ name: `db${index}`, extInfo: info[`db${index}`], sortOrder: index })); }, - async loadKeys(pool, root = '', filter = null) { - const keys = await this.getKeys(pool, root ? `${root}${pool.__treeKeySeparator}*` : '*'); + async loadKeys(dbhan, root = '', filter = null) { + const keys = await this.getKeys(dbhan, root ? `${root}${dbhan.__treeKeySeparator}*` : '*'); const keysFiltered = keys.filter((x) => filterName(filter, x)); - const res = this.extractKeysFromLevel(pool, root, keysFiltered); - await this.enrichKeyInfo(pool, res); + const res = this.extractKeysFromLevel(dbhan, root, keysFiltered); + await this.enrichKeyInfo(dbhan, res); return res; }, - async exportKeys(pool, options) { - const dump = new RedisDump({ client: pool }); + async exportKeys(dbhan, options) { + const dump = new RedisDump({ client: dbhan.client }); return new Promise((resolve, reject) => { dump.export({ type: 'redis', @@ -187,24 +189,24 @@ const driver = { }); }, - async getKeys(pool, keyQuery = '*') { + async getKeys(dbhan, keyQuery = '*') { const res = []; let cursor = 0; do { - const [strCursor, keys] = await pool.scan(cursor, 'MATCH', keyQuery, 'COUNT', 100); + const [strCursor, keys] = await dbhan.client.scan(cursor, 'MATCH', keyQuery, 'COUNT', 100); res.push(...keys); cursor = parseInt(strCursor); } while (cursor > 0); return res; }, - extractKeysFromLevel(pool, root, keys) { - const prefix = root ? `${root}${pool.__treeKeySeparator}` : ''; - const rootSplit = _.compact(root.split(pool.__treeKeySeparator)); + extractKeysFromLevel(dbhan, root, keys) { + const prefix = root ? 
`${root}${dbhan.treeKeySeparator}` : ''; + const rootSplit = _.compact(root.split(dbhan.treeKeySeparator)); const res = {}; for (const key of keys) { if (!key.startsWith(prefix)) continue; - const keySplit = key.split(pool.__treeKeySeparator); + const keySplit = key.split(dbhan.treeKeySeparator); if (keySplit.length > rootSplit.length) { const text = keySplit[rootSplit.length]; if (keySplit.length == rootSplit.length + 1) { @@ -218,9 +220,9 @@ const driver = { res[dctKey].count++; } else { res[dctKey] = { - text: text + pool.__treeKeySeparator + '*', + text: text + dbhan.treeKeySeparator + '*', type: 'dir', - root: keySplit.slice(0, rootSplit.length + 1).join(pool.__treeKeySeparator), + root: keySplit.slice(0, rootSplit.length + 1).join(dbhan.treeKeySeparator), count: 1, }; } @@ -230,46 +232,46 @@ const driver = { return Object.values(res); }, - async getKeyCardinality(pool, key, type) { + async getKeyCardinality(dbhan, key, type) { switch (type) { case 'list': - return pool.llen(key); + return dbhan.client.llen(key); case 'set': - return pool.scard(key); + return dbhan.client.scard(key); case 'zset': - return pool.zcard(key); + return dbhan.client.zcard(key); case 'stream': - return pool.xlen(key); + return dbhan.client.xlen(key); case 'hash': - return pool.hlen(key); + return dbhan.client.hlen(key); } }, - async enrichOneKeyInfo(pool, item) { - item.type = await pool.type(item.key); - item.count = await this.getKeyCardinality(pool, item.key, item.type); + async enrichOneKeyInfo(dbhan, item) { + item.type = await dbhan.client.type(item.key); + item.count = await this.getKeyCardinality(dbhan, item.key, item.type); }, - async enrichKeyInfo(pool, levelInfo) { + async enrichKeyInfo(dbhan, levelInfo) { await async.eachLimit( levelInfo.filter((x) => x.key), 10, - async (item) => await this.enrichOneKeyInfo(pool, item) + async (item) => await this.enrichOneKeyInfo(dbhan, item) ); }, - async loadKeyInfo(pool, key) { + async loadKeyInfo(dbhan, key) { const res = {}; - 
const type = await pool.type(key); + const type = await dbhan.client.type(key); res.key = key; res.type = type; - res.ttl = await pool.ttl(key); - res.count = await this.getKeyCardinality(pool, key, type); + res.ttl = await dbhan.client.ttl(key); + res.count = await this.getKeyCardinality(dbhan, key, type); switch (type) { case 'string': - res.value = await pool.get(key); + res.value = await dbhan.client.get(key); break; // case 'list': // res.tableColumns = [{ name: 'value' }]; @@ -297,16 +299,16 @@ const driver = { return res; }, - async deleteBranch(pool, keyQuery) { - const keys = await this.getKeys(pool, keyQuery); + async deleteBranch(dbhan, keyQuery) { + const keys = await this.getKeys(dbhan, keyQuery); const keysChunked = _.chunk(keys, 10); - await async.eachLimit(keysChunked, 10, async (keysChunk) => await pool.del(...keysChunk)); + await async.eachLimit(keysChunked, 10, async (keysChunk) => await dbhan.client.del(...keysChunk)); }, - async callMethod(pool, method, args) { + async callMethod(dbhan, method, args) { switch (method) { case 'mdel': - return await this.deleteBranch(pool, args[0]); + return await this.deleteBranch(dbhan, args[0]); case 'xaddjson': let json; try { @@ -314,44 +316,44 @@ const driver = { } catch (e) { throw new Error('Value must be valid JSON. ' + e.message); } - return await pool.xadd(args[0], args[1] || '*', ..._.flatten(_.toPairs(json))); + return await dbhan.client.xadd(args[0], args[1] || '*', ..._.flatten(_.toPairs(json))); } - return await pool[method](...args); + return await dbhan.client[method](...args); }, - async loadKeyTableRange(pool, key, cursor, count) { - const type = await pool.type(key); + async loadKeyTableRange(dbhan, key, cursor, count) { + const type = await dbhan.client.type(key); switch (type) { case 'list': { - const res = await pool.lrange(key, cursor, cursor + count); + const res = await dbhan.client.lrange(key, cursor, cursor + count); return { cursor: res.length > count ? 
cursor + count : 0, items: res.map((value) => ({ value })).slice(0, count), }; } case 'set': { - const res = await pool.sscan(key, cursor, 'COUNT', count); + const res = await dbhan.client.sscan(key, cursor, 'COUNT', count); return { cursor: parseInt(res[0]), items: res[1].map((value) => ({ value })), }; } case 'zset': { - const res = await pool.zscan(key, cursor, 'COUNT', count); + const res = await dbhan.client.zscan(key, cursor, 'COUNT', count); return { cursor: parseInt(res[0]), items: _.chunk(res[1], 2).map((item) => ({ value: item[0], score: item[1] })), }; } case 'hash': { - const res = await pool.hscan(key, cursor, 'COUNT', count); + const res = await dbhan.client.hscan(key, cursor, 'COUNT', count); return { cursor: parseInt(res[0]), items: _.chunk(res[1], 2).map((item) => ({ key: item[0], value: item[1] })), }; } case 'stream': { - const res = await pool.xrange(key, cursor == 0 ? '-' : cursor, '+', 'COUNT', count); + const res = await dbhan.client.xrange(key, cursor == 0 ? '-' : cursor, '+', 'COUNT', count); let newCursor = 0; if (res.length > 0) { const id = res[res.length - 1][0]; diff --git a/plugins/dbgate-plugin-sqlite/src/backend/driver.js b/plugins/dbgate-plugin-sqlite/src/backend/driver.js index 378bd2fdf..aff55650c 100644 --- a/plugins/dbgate-plugin-sqlite/src/backend/driver.js +++ b/plugins/dbgate-plugin-sqlite/src/backend/driver.js @@ -64,15 +64,17 @@ const driver = { analyserClass: Analyser, async connect({ databaseFile, isReadOnly }) { const Database = getBetterSqlite(); - const pool = new Database(databaseFile, { readonly: !!isReadOnly }); - return pool; + const client = new Database(databaseFile, { readonly: !!isReadOnly }); + return { + client, + }; }, - async close(pool) { - return pool.close(); + async close(dbhan) { + return dbhan.client.close(); }, // @ts-ignore - async query(pool, sql) { - const stmt = pool.prepare(sql); + async query(dbhan, sql) { + const stmt = dbhan.client.prepare(sql); // stmt.raw(); if (stmt.reader) { const 
columns = stmt.columns(); @@ -92,14 +94,14 @@ const driver = { }; } }, - async stream(client, sql, options) { + async stream(dbhan, sql, options) { const sqlSplitted = splitQuery(sql, sqliteSplitterOptions); const rowCounter = { count: 0, date: null }; - const inTransaction = client.transaction(() => { + const inTransaction = dbhan.client.transaction(() => { for (const sqlItem of sqlSplitted) { - runStreamItem(client, sqlItem, options, rowCounter); + runStreamItem(dbhan.client, sqlItem, options, rowCounter); } if (rowCounter.date) { @@ -128,10 +130,10 @@ const driver = { options.done(); // return stream; }, - async script(client, sql) { - const inTransaction = client.transaction(() => { + async script(dbhan, sql) { + const inTransaction = dbhan.client.transaction(() => { for (const sqlItem of splitQuery(sql, this.getQuerySplitterOptions('script'))) { - const stmt = client.prepare(sqlItem); + const stmt = dbhan.client.prepare(sqlItem); stmt.run(); } }); @@ -149,13 +151,13 @@ const driver = { } pass.end(); }, - async readQuery(pool, sql, structure) { + async readQuery(dbhan, sql, structure) { const pass = new stream.PassThrough({ objectMode: true, highWaterMark: 100, }); - const stmt = pool.prepare(sql); + const stmt = dbhan.client.prepare(sql); const columns = stmt.columns(); pass.write({ @@ -171,11 +173,11 @@ const driver = { return pass; }, - async writeTable(pool, name, options) { - return createBulkInsertStreamBase(this, stream, pool, name, options); + async writeTable(dbhan, name, options) { + return createBulkInsertStreamBase(this, stream, dbhan, name, options); }, - async getVersion(pool) { - const { rows } = await this.query(pool, 'select sqlite_version() as version'); + async getVersion(dbhan) { + const { rows } = await this.query(dbhan, 'select sqlite_version() as version'); const { version } = rows[0]; return {