Files
dbgate/plugins/dbgate-plugin-mysql/src/backend/drivers.js
2025-11-13 13:44:11 +01:00

315 lines
8.1 KiB
JavaScript

const _ = require('lodash');
const stream = require('stream');
const driverBases = require('../frontend/drivers');
const Analyser = require('./Analyser');
const mysql2 = require('mysql2');
const fs = require('fs');
const { getLogger, createBulkInsertStreamBase, makeUniqueColumnNames, extractErrorLogData } =
global.DBGATE_PACKAGES['dbgate-tools'];
// Module logger obtained from dbgate-tools under the name 'mysqlDriver'.
const logger = getLogger('mysqlDriver');
// Assigned by drivers.initialize(); read by connect() and getAuthTypes() for AWS IAM support.
let authProxy;
/**
 * Builds the driver's column list from the field metadata of a query result.
 *
 * Each field contributes `columnName` (from `name`) and `pureName` (from `orgTable`).
 * The list is handed to makeUniqueColumnNames before being returned
 * (presumably it de-duplicates names in place — confirm against dbgate-tools).
 *
 * @param {Array<{name: *, orgTable: *}>|null|undefined} fields - field descriptors, if any
 * @returns {Array<{columnName: *, pureName: *}>|null} column list, or null when `fields` is falsy
 */
function extractColumns(fields) {
  if (!fields) {
    return null;
  }
  const columnList = fields.map(({ name, orgTable }) => ({
    columnName: name,
    pureName: orgTable,
  }));
  makeUniqueColumnNames(columnList);
  return columnList;
}
/**
 * Replaces Buffer values in a row with a `{ $binary: { base64 } }` wrapper.
 *
 * The row is modified in place; only the properties named by `columns` are inspected.
 *
 * @param {object} row - row object keyed by column name
 * @param {Array<{columnName: string}>} columns - columns to inspect
 * @returns {object} the same `row` instance
 */
function modifyRow(row, columns) {
  for (const { columnName } of columns) {
    const cell = row[columnName];
    if (!Buffer.isBuffer(cell)) {
      continue;
    }
    const base64 = Buffer.from(cell).toString('base64');
    row[columnName] = { $binary: { base64 } };
  }
  return row;
}
/**
 * Turns a positional row into an object keyed by column name using lodash `zipObject`.
 *
 * @param {Array} rowArray - cell values in column order
 * @param {Array<{columnName: string}>} columns - column descriptors supplying the keys
 * @returns {object} object mapping each `columnName` to the value at the same index
 */
function zipDataRow(rowArray, columns) {
  const keys = columns.map(({ columnName }) => columnName);
  return _.zipObject(keys, rowArray);
}
/** @type {import('dbgate-types').EngineDriver} */
const drivers = driverBases.map(driverBase => ({
...driverBase,
analyserClass: Analyser,
async connect(props) {
const { conid, server, port, user, password, database, ssl, isReadOnly, forceRowsAsObjects, socketPath, authType } =
props;
let awsIamToken = null;
if (authType == 'awsIam') {
awsIamToken = await authProxy.getAwsIamToken(props);
}
const options = {
host: authType == 'socket' ? null : server,
port: authType == 'socket' ? null : port,
socketPath: authType == 'socket' ? socketPath || driverBase.defaultSocketPath : null,
user,
password: awsIamToken || password,
database,
ssl: authType == 'awsIam' ? ssl || { rejectUnauthorized: false } : ssl,
rowsAsArray: forceRowsAsObjects ? false : true,
supportBigNumbers: true,
bigNumberStrings: true,
dateStrings: true,
infileStreamFactory: path => fs.createReadStream(path),
// TODO: test following options
// multipleStatements: true,
};
const client = mysql2.createConnection(options);
const dbhan = {
client,
database,
conid,
};
if (isReadOnly) {
await this.query(dbhan, 'SET SESSION TRANSACTION READ ONLY');
}
return dbhan;
},
close(dbhan) {
return new Promise(resolve => {
dbhan.client.end(resolve);
});
},
query(dbhan, sql, options) {
if (sql == null) {
return {
rows: [],
columns: [],
};
}
if (
options?.importSqlDump &&
(sql.trim().startsWith('/*!') || sql.trim().startsWith('/*M!')) &&
(sql.includes('character_set_client') || sql.includes('NOTE_VERBOSITY'))
) {
// skip this in SQL dumps
return {
rows: [],
columns: [],
};
}
return new Promise((resolve, reject) => {
dbhan.client.query(sql, function (error, results, fields) {
if (error) reject(error);
const columns = extractColumns(fields);
resolve({ rows: results && columns && results.map && results.map(row => modifyRow(zipDataRow(row, columns), columns)), columns });
});
});
},
async stream(dbhan, sql, options) {
const query = dbhan.client.query(sql);
let columns = [];
// const handleInfo = (info) => {
// const { message, lineNumber, procName } = info;
// options.info({
// message,
// line: lineNumber,
// procedure: procName,
// time: new Date(),
// severity: 'info',
// });
// };
const handleEnd = () => {
options.done();
};
const handleRow = row => {
if (row && row.constructor && (row.constructor.name == 'OkPacket' || row.constructor.name == 'ResultSetHeader')) {
options.info({
message: `${row.affectedRows} rows affected`,
time: new Date(),
severity: 'info',
rowsAffected: row.affectedRows,
});
if (row.stateChanges?.schema) {
options.changedCurrentDatabase(row.stateChanges.schema);
}
} else {
if (columns) {
options.row(modifyRow(zipDataRow(row, columns), columns));
}
}
};
const handleFields = fields => {
columns = extractColumns(fields);
if (columns) options.recordset(columns, { engine: driverBase.engine });
};
const handleError = error => {
logger.error(extractErrorLogData(error, this.getLogDbInfo(dbhan)), 'DBGM-00200 Stream error');
const { message } = error;
options.info({
message,
line: 0,
time: new Date(),
severity: 'error',
});
};
query.on('error', handleError).on('fields', handleFields).on('result', handleRow).on('end', handleEnd);
},
async readQuery(dbhan, sql, structure) {
const query = dbhan.client.query(sql);
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
});
let columns = [];
query
.on('error', err => {
console.error(err);
pass.end();
})
.on('fields', fields => {
columns = extractColumns(fields);
pass.write({
__isStreamHeader: true,
engine: driverBase.engine,
...(structure || { columns }),
});
})
.on('result', row => pass.write(modifyRow(zipDataRow(row, columns), columns)))
.on('end', () => pass.end());
return pass;
},
async getVersion(dbhan) {
const { rows } = await this.query(dbhan, "show variables like 'version'");
const version = rows[0].Value;
if (version) {
const m = version.match(/(.*)-MariaDB-/);
if (m) {
return {
version,
versionText: `MariaDB ${m[1]}`,
};
}
}
return {
version,
versionText: `MySQL ${version}`,
};
},
async listDatabases(dbhan) {
const { rows } = await this.query(dbhan, 'show databases');
return rows.map(x => ({ name: x.Database }));
},
async listVariables(dbhan) {
const { rows } = await this.query(dbhan, 'SHOW VARIABLES');
return rows.map(row => ({
variable: row.Variable_name,
value: row.Value,
}));
},
async listProcesses(dbhan) {
const { rows } = await this.query(dbhan, 'SHOW FULL PROCESSLIST');
return rows.map(row => ({
processId: row.Id,
connectionId: null,
client: row.Host,
operation: row.Info,
namespace: row.Database,
runningTime: row.Time,
state: row.State,
waitingFor: row.State && row.State.includes('Waiting'),
}));
},
async killProcess(dbhan, processId) {
await this.query(dbhan, `KILL ${processId}`);
},
async serverSummary(dbhan) {
const [variables, processes, databases] = await Promise.all([
this.listVariables(dbhan),
this.listProcesses(dbhan),
this.listDatabases(dbhan),
]);
return {
variables,
processes: processes.map(p => ({
processId: p.processId,
connectionId: p.connectionId,
client: p.client,
operation: p.operation,
namespace: p.namespace,
runningTime: p.runningTime,
state: p.state,
waitingFor: p.waitingFor,
})),
databases: {
rows: databases.map(db => ({
name: db.name,
})),
columns: [
{
filterable: true,
sortable: true,
header: 'Database',
fieldName: 'name',
type: 'data',
},
],
},
};
},
async writeTable(dbhan, name, options) {
// @ts-ignore
return createBulkInsertStreamBase(this, stream, dbhan, name, options);
},
getAuthTypes() {
const res = [
{
title: 'Host and port',
name: 'hostPort',
disabledFields: ['socketPath'],
},
{
title: 'Socket',
name: 'socket',
disabledFields: ['server', 'port'],
},
];
if (authProxy.supportsAwsIam()) {
res.push({
title: 'AWS IAM',
name: 'awsIam',
});
}
return res;
},
}));
/**
 * Stores the `authProxy` from the supplied environment object in the module-level
 * variable read by connect() and getAuthTypes().
 */
drivers.initialize = ({ authProxy: providedAuthProxy }) => {
  authProxy = providedAuthProxy;
};

module.exports = drivers;