introduced dbhandle instead of overwriting 3rd party client's fields

This commit is contained in:
SPRINX0\prochazka
2024-09-20 10:27:03 +02:00
parent 6b8b511d0d
commit 48d4374346
26 changed files with 387 additions and 371 deletions

View File

@@ -63,8 +63,8 @@ function getColumnInfo({
}
class MsSqlAnalyser extends DatabaseAnalyser {
constructor(pool, driver, version) {
super(pool, driver, version);
constructor(dbhan, driver, version) {
super(dbhan, driver, version);
}
createQuery(resFileName, typeFields) {
@@ -75,7 +75,7 @@ class MsSqlAnalyser extends DatabaseAnalyser {
async _computeSingleObjectId() {
const { schemaName, pureName, typeField } = this.singleObjectFilter;
const fullName = schemaName ? `[${schemaName}].[${pureName}]` : pureName;
const resId = await this.driver.query(this.pool, `SELECT OBJECT_ID('${fullName}') AS id`);
const resId = await this.driver.query(this.dbhan, `SELECT OBJECT_ID('${fullName}') AS id`);
this.singleObjectId = resId.rows[0].id;
}

View File

@@ -1,8 +1,8 @@
const { createBulkInsertStreamBase } = global.DBGATE_PACKAGES['dbgate-tools'];
function runBulkInsertBatch(pool, tableName, writable, rows) {
function runBulkInsertBatch(dbhan, tableName, writable, rows) {
return new Promise((resolve, reject) => {
const tableMgr = pool.tableMgr();
const tableMgr = dbhan.client.tableMgr();
tableMgr.bind(tableName, bulkMgr => {
bulkMgr.insertRows(rows, err => {
if (err) reject(err);
@@ -16,8 +16,8 @@ function runBulkInsertBatch(pool, tableName, writable, rows) {
*
* @param {import('dbgate-types').EngineDriver} driver
*/
function createNativeBulkInsertStream(driver, stream, pool, name, options) {
const writable = createBulkInsertStreamBase(driver, stream, pool, name, options);
function createNativeBulkInsertStream(driver, stream, dbhan, name, options) {
const writable = createBulkInsertStreamBase(driver, stream, dbhan, name, options);
const fullName = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : name.pureName;
@@ -25,7 +25,7 @@ function createNativeBulkInsertStream(driver, stream, pool, name, options) {
const rows = writable.buffer;
writable.buffer = [];
await runBulkInsertBatch(pool, fullName, writable, rows);
await runBulkInsertBatch(dbhan, fullName, writable, rows);
};
return writable;

View File

@@ -3,12 +3,12 @@ const tedious = require('tedious');
const getConcreteType = require('./getConcreteType');
const _ = require('lodash');
function runBulkInsertBatch(pool, tableName, writable, rows) {
function runBulkInsertBatch(dbhan, tableName, writable, rows) {
return new Promise((resolve, reject) => {
var options = { keepNulls: true };
// instantiate - provide the table where you'll be inserting to, options and a callback
var bulkLoad = pool.newBulkLoad(tableName, options, (error, rowCount) => {
var bulkLoad = dbhan.client.newBulkLoad(tableName, options, (error, rowCount) => {
if (error) reject(error);
else resolve();
});
@@ -40,7 +40,7 @@ function runBulkInsertBatch(pool, tableName, writable, rows) {
);
// console.log('IMPORT ROWS', rowsMapped);
pool.execBulkLoad(bulkLoad, rowsMapped);
dbhan.client.execBulkLoad(bulkLoad, rowsMapped);
});
}
@@ -48,8 +48,8 @@ function runBulkInsertBatch(pool, tableName, writable, rows) {
*
* @param {import('dbgate-types').EngineDriver} driver
*/
function createTediousBulkInsertStream(driver, stream, pool, name, options) {
const writable = createBulkInsertStreamBase(driver, stream, pool, name, options);
function createTediousBulkInsertStream(driver, stream, dbhan, name, options) {
const writable = createBulkInsertStreamBase(driver, stream, dbhan, name, options);
const fullName = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : name.pureName;
@@ -59,7 +59,7 @@ function createTediousBulkInsertStream(driver, stream, pool, name, options) {
? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}`
: driver.dialect.quoteIdentifier(name.pureName);
const respTemplate = await driver.query(pool, `SELECT * FROM ${fullNameQuoted} WHERE 1=0`, {
const respTemplate = await driver.query(dbhan, `SELECT * FROM ${fullNameQuoted} WHERE 1=0`, {
addDriverNativeColumn: true,
});
writable.templateColumns = respTemplate.columns;
@@ -68,7 +68,7 @@ function createTediousBulkInsertStream(driver, stream, pool, name, options) {
const rows = writable.buffer;
writable.buffer = [];
await runBulkInsertBatch(pool, fullName, writable, rows);
await runBulkInsertBatch(dbhan, fullName, writable, rows);
};
return writable;

View File

@@ -79,55 +79,53 @@ const driver = {
async connect(conn) {
const { authType } = conn;
const result =
requireMsnodesqlv8 && (authType == 'sspi' || authType == 'sql')
? await nativeConnect(conn)
: await tediousConnect(conn);
const connectionType = requireMsnodesqlv8 && (authType == 'sspi' || authType == 'sql') ? 'msnodesqlv8' : 'tedious';
const client = connectionType == 'msnodesqlv8' ? await nativeConnect(conn) : await tediousConnect(conn);
if (result) {
result.__dbgate_database_name__ = conn.database;
}
return result;
return {
client,
connectionType,
database: conn.database,
};
},
async close(pool) {
return pool.close();
async close(dbhan) {
return dbhan.client.close();
},
async queryCore(pool, sql, options) {
if (pool._connectionType == 'msnodesqlv8') {
return nativeQueryCore(pool, sql, options);
async queryCore(dbhan, sql, options) {
if (dbhan.connectionType == 'msnodesqlv8') {
return nativeQueryCore(dbhan, sql, options);
} else {
return tediousQueryCore(pool, sql, options);
return tediousQueryCore(dbhan, sql, options);
}
},
async query(pool, sql, options) {
async query(dbhan, sql, options) {
return lock.acquire('connection', async () => {
return this.queryCore(pool, sql, options);
return this.queryCore(dbhan, sql, options);
});
},
async stream(pool, sql, options) {
if (pool._connectionType == 'msnodesqlv8') {
return nativeStream(pool, sql, options);
async stream(dbhan, sql, options) {
if (dbhan.connectionType == 'msnodesqlv8') {
return nativeStream(dbhan, sql, options);
} else {
return tediousStream(pool, sql, options);
return tediousStream(dbhan, sql, options);
}
},
async readQuery(pool, sql, structure) {
if (pool._connectionType == 'msnodesqlv8') {
return nativeReadQuery(pool, sql, structure);
async readQuery(dbhan, sql, structure) {
if (dbhan.connectionType == 'msnodesqlv8') {
return nativeReadQuery(dbhan, sql, structure);
} else {
return tediousReadQuery(pool, sql, structure);
return tediousReadQuery(dbhan, sql, structure);
}
},
async writeTable(pool, name, options) {
if (pool._connectionType == 'msnodesqlv8') {
return createNativeBulkInsertStream(this, stream, pool, name, options);
async writeTable(dbhan, name, options) {
if (dbhan.connectionType == 'msnodesqlv8') {
return createNativeBulkInsertStream(this, stream, dbhan, name, options);
} else {
return createTediousBulkInsertStream(this, stream, pool, name, options);
return createTediousBulkInsertStream(this, stream, dbhan, name, options);
}
},
async getVersion(pool) {
const res = (await this.query(pool, versionQuery)).rows[0];
async getVersion(dbhan) {
const res = (await this.query(dbhan, versionQuery)).rows[0];
if (res.productVersion) {
const splitted = res.productVersion.split('.');
@@ -138,8 +136,8 @@ const driver = {
}
return res;
},
async listDatabases(pool) {
const { rows } = await this.query(pool, 'SELECT name FROM sys.databases order by name');
async listDatabases(dbhan) {
const { rows } = await this.query(dbhan, 'SELECT name FROM sys.databases order by name');
return rows;
},
getRedirectAuthUrl(connection, options) {
@@ -155,10 +153,10 @@ const driver = {
getAccessTokenFromAuth: (connection, req) => {
return req?.user?.msentraToken;
},
async listSchemas(pool) {
const { rows } = await this.query(pool, 'select schema_id as objectId, name as schemaName from sys.schemas');
async listSchemas(dbhan) {
const { rows } = await this.query(dbhan, 'select schema_id as objectId, name as schemaName from sys.schemas');
const defaultSchemaRows = await this.query(pool, 'SELECT SCHEMA_NAME() as name');
const defaultSchemaRows = await this.query(dbhan, 'SELECT SCHEMA_NAME() as name');
const defaultSchema = defaultSchemaRows.rows[0]?.name;
return rows.map(x => ({

View File

@@ -63,7 +63,6 @@ async function connectWithDriver({ server, port, user, password, database, authT
if (err) {
reject(err);
} else {
conn._connectionType = 'msnodesqlv8';
resolve(conn);
}
});
@@ -88,7 +87,7 @@ async function nativeConnect(connection) {
}
}
async function nativeQueryCore(pool, sql, options) {
async function nativeQueryCore(dbhan, sql, options) {
if (sql == null) {
return Promise.resolve({
rows: [],
@@ -98,7 +97,7 @@ async function nativeQueryCore(pool, sql, options) {
return new Promise((resolve, reject) => {
let columns = null;
let currentRow = null;
const q = pool.query(sql);
const q = dbhan.client.query(sql);
const rows = [];
q.on('meta', meta => {
@@ -128,7 +127,7 @@ async function nativeQueryCore(pool, sql, options) {
});
}
async function nativeReadQuery(pool, sql, structure) {
async function nativeReadQuery(dbhan, sql, structure) {
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
@@ -136,7 +135,7 @@ async function nativeReadQuery(pool, sql, structure) {
let columns = null;
let currentRow = null;
const q = pool.query(sql);
const q = dbhan.client.query(sql);
q.on('meta', meta => {
columns = extractNativeColumns(meta);
@@ -168,7 +167,7 @@ async function nativeReadQuery(pool, sql, structure) {
return pass;
}
async function nativeStream(pool, sql, options) {
async function nativeStream(dbhan, sql, options) {
const handleInfo = info => {
const { message, lineNumber, procName } = info;
options.info({
@@ -192,7 +191,7 @@ async function nativeStream(pool, sql, options) {
let columns = null;
let currentRow = null;
const q = pool.query(sql);
const q = dbhan.client.query(sql);
q.on('meta', meta => {
if (currentRow) options.row(currentRow);

View File

@@ -68,14 +68,13 @@ async function tediousConnect(storedConnection) {
if (err) {
reject(err);
}
connection._connectionType = 'tedious';
resolve(connection);
});
connection.connect();
});
}
async function tediousQueryCore(pool, sql, options) {
async function tediousQueryCore(dbhan, sql, options) {
if (sql == null) {
return Promise.resolve({
rows: [],
@@ -103,12 +102,12 @@ async function tediousQueryCore(pool, sql, options) {
)
);
});
if (discardResult) pool.execSqlBatch(request);
else pool.execSql(request);
if (discardResult) dbhan.client.execSqlBatch(request);
else dbhan.client.execSql(request);
});
}
async function tediousReadQuery(pool, sql, structure) {
async function tediousReadQuery(dbhan, sql, structure) {
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
@@ -133,12 +132,12 @@ async function tediousReadQuery(pool, sql, structure) {
);
pass.write(row);
});
pool.execSql(request);
dbhan.client.execSql(request);
return pass;
}
async function tediousStream(pool, sql, options) {
async function tediousStream(dbhan, sql, options) {
let currentColumns = [];
const handleInfo = info => {
@@ -188,7 +187,7 @@ async function tediousStream(pool, sql, options) {
);
options.row(row);
});
pool.execSqlBatch(request);
dbhan.client.execSqlBatch(request);
}
module.exports = {