added plugins

This commit is contained in:
Jan Prochazka
2021-04-13 16:17:53 +02:00
parent 446e7c139f
commit 4d5cc119f2
115 changed files with 5519 additions and 24 deletions

View File

@@ -0,0 +1,24 @@
const { DatabaseAnalyser } = require('dbgate-tools');
class Analyser extends DatabaseAnalyser {
constructor(pool, driver) {
super(pool, driver);
}
async _runAnalysis() {
const collections = await this.pool.__getDatabase().listCollections().toArray();
const res = this.mergeAnalyseResult(
{
collections: collections.map((x) => ({
pureName: x.name,
})),
},
(x) => x.pureName
);
// console.log('MERGED', res);
return res;
}
}
module.exports = Analyser;

View File

@@ -0,0 +1,58 @@
export function createBulkInsertStream(driver, stream, pool, name, options) {
const collectionName = name.pureName;
const db = pool.__getDatabase();
const writable = new stream.Writable({
objectMode: true,
});
writable.buffer = [];
writable.wasHeader = false;
writable.addRow = (row) => {
if (!writable.wasHeader) {
writable.wasHeader = true;
if (row.__isStreamHeader ||
// TODO remove isArray test
Array.isArray(row.columns)) return;
}
writable.buffer.push(row);
};
writable.checkStructure = async () => {
if (options.dropIfExists || options.truncate) {
console.log(`Dropping collection ${collectionName}`);
await db.collection(collectionName).drop();
}
if (options.truncate) {
console.log(`Truncating collection ${collectionName}`);
await db.collection(collectionName).deleteMany({});
}
};
writable.send = async () => {
const rows = writable.buffer;
writable.buffer = [];
await db.collection(collectionName).insertMany(rows);
};
writable.sendIfFull = async () => {
if (writable.buffer.length > 100) {
await writable.send();
}
};
writable._write = async (chunk, encoding, callback) => {
writable.addRow(chunk);
await writable.sendIfFull();
callback();
};
writable._final = async (callback) => {
await writable.send();
callback();
};
return writable;
}

View File

@@ -0,0 +1,260 @@
const _ = require('lodash');
const stream = require('stream');
const isPromise = require('is-promise');
const driverBase = require('../frontend/driver');
const Analyser = require('./Analyser');
const MongoClient = require('mongodb').MongoClient;
const ObjectId = require('mongodb').ObjectId;
const Cursor = require('mongodb').Cursor;
const { createBulkInsertStream } = require('./createBulkInsertStream');
function readCursor(cursor, options) {
return new Promise((resolve) => {
options.recordset({ __isDynamicStructure: true });
cursor.on('data', (data) => options.row(data));
cursor.on('end', () => resolve());
});
}
const mongoIdRegex = /^[0-9a-f]{24}$/;
function convertCondition(condition) {
if (condition && _.isString(condition._id) && condition._id.match(mongoIdRegex)) {
return {
_id: ObjectId(condition._id),
};
}
return condition;
}
function findArrayResult(resValue) {
if (!_.isPlainObject(resValue)) return null;
const arrays = _.values(resValue).filter((x) => _.isArray(x));
if (arrays.length == 1) return arrays[0];
return null;
}
async function getScriptableDb(pool) {
const db = pool.__getDatabase();
const collections = await db.listCollections().toArray();
for (const collection of collections) {
db[collection.name] = db.collection(collection.name);
}
return db;
}
/** @type {import('dbgate-types').EngineDriver} */
const driver = {
...driverBase,
analyserClass: Analyser,
async connect({ server, port, user, password, database, useDatabaseUrl, databaseUrl, ssl }) {
// let mongoUrl = databaseUrl;
// if (!useDatabaseUrl) {
// mongoUrl = user ? `mongodb://${user}:${password}@${server}:${port}` : `mongodb://${server}:${port}`;
// if (database) mongoUrl += '/' + database;
// }
const mongoUrl = useDatabaseUrl
? databaseUrl
: user
? `mongodb://${user}:${password}@${server}:${port}`
: `mongodb://${server}:${port}`;
const options = {};
if (ssl) {
options.tls = true;
options.tlsCAFile = ssl.ca;
options.tlsCertificateKeyFile = ssl.cert || ssl.key;
options.tlsCertificateKeyFilePassword = ssl.password;
options.tlsAllowInvalidCertificates = !ssl.rejectUnauthorized;
}
const pool = new MongoClient(mongoUrl, options);
await pool.connect();
// const pool = await MongoClient.connect(mongoUrl);
pool.__getDatabase = database ? () => pool.db(database) : () => pool.db();
pool.__databaseName = database;
return pool;
},
// @ts-ignore
async query(pool, sql) {
return {
rows: [],
columns: [],
};
},
async stream(pool, sql, options) {
let func;
try {
func = eval(`(db,ObjectId) => ${sql}`);
} catch (err) {
options.info({
message: 'Error compiling expression: ' + err.message,
time: new Date(),
severity: 'error',
});
options.done();
return;
}
const db = await getScriptableDb(pool);
let exprValue;
try {
exprValue = func(db, ObjectId);
} catch (err) {
options.info({
message: 'Error evaluating expression: ' + err.message,
time: new Date(),
severity: 'error',
});
options.done();
return;
}
if (exprValue instanceof Cursor) {
await readCursor(exprValue, options);
} else if (isPromise(exprValue)) {
try {
const resValue = await exprValue;
options.info({
message: 'Command succesfully executed',
time: new Date(),
severity: 'info',
});
options.info({
message: JSON.stringify(resValue),
time: new Date(),
severity: 'info',
});
const arrayRes = findArrayResult(resValue);
if (arrayRes) {
options.recordset({ __isDynamicStructure: true });
for (const row of arrayRes) {
options.row(row);
}
}
} catch (err) {
options.info({
message: 'Error when running command: ' + err.message,
time: new Date(),
severity: 'error',
});
}
}
options.done();
},
async readQuery(pool, sql, structure) {
try {
const json = JSON.parse(sql);
if (json && json.pureName) {
sql = `db.${json.pureName}.find()`;
}
} catch (err) {
// query is not JSON serialized collection name
}
// const pass = new stream.PassThrough({
// objectMode: true,
// highWaterMark: 100,
// });
func = eval(`(db,ObjectId) => ${sql}`);
const db = await getScriptableDb(pool);
exprValue = func(db, ObjectId);
// return directly stream without header row
return exprValue;
// pass.write(structure || { __isDynamicStructure: true });
// exprValue.on('data', (row) => pass.write(row));
// exprValue.on('end', () => pass.end());
// return pass;
},
async writeTable(pool, name, options) {
return createBulkInsertStream(this, stream, pool, name, options);
},
async getVersion(pool) {
const status = await pool.__getDatabase().admin().serverInfo();
return status;
},
async listDatabases(pool) {
const res = await pool.__getDatabase().admin().listDatabases();
return res.databases;
},
async readCollection(pool, options) {
try {
const collection = pool.__getDatabase().collection(options.pureName);
if (options.countDocuments) {
const count = await collection.countDocuments(options.condition || {});
return { count };
} else {
let cursor = await collection.find(options.condition || {});
if (options.sort) cursor = cursor.sort(options.sort);
if (options.skip) cursor = cursor.skip(options.skip);
if (options.limit) cursor = cursor.limit(options.limit);
const rows = await cursor.toArray();
return { rows };
}
} catch (err) {
return { errorMessage: err.message };
}
},
async updateCollection(pool, changeSet) {
const res = {
inserted: [],
updated: [],
deleted: [],
replaced: [],
};
try {
const db = pool.__getDatabase();
for (const insert of changeSet.inserts) {
const collection = db.collection(insert.pureName);
const document = {
...insert.document,
...insert.fields,
};
const resdoc = await collection.insert(document);
res.inserted.push(resdoc._id);
}
for (const update of changeSet.updates) {
const collection = db.collection(update.pureName);
if (update.document) {
const document = {
...update.document,
...update.fields,
};
const doc = await collection.findOne(convertCondition(update.condition));
if (doc) {
const resdoc = await collection.replaceOne(convertCondition(update.condition), {
...document,
_id: doc._id,
});
res.replaced.push(resdoc._id);
}
} else {
const resdoc = await collection.updateOne(convertCondition(update.condition), { $set: update.fields });
res.updated.push(resdoc._id);
}
}
for (const del of changeSet.deletes) {
const collection = db.collection(del.pureName);
const resdoc = await collection.deleteOne(convertCondition(del.condition));
res.deleted.push(resdoc._id);
}
return res;
} catch (err) {
return { errorMessage: err.message };
}
},
async createDatabase(pool, name) {
const db = pool.db(name);
await db.createCollection('collection1');
},
};
module.exports = driver;

View File

@@ -0,0 +1,6 @@
const driver = require('./driver');
module.exports = {
packageName: 'dbgate-plugin-mongo',
driver,
};