Files
dbgate/plugins/dbgate-plugin-redis/src/backend/driver.js
2022-03-10 17:54:57 +01:00

229 lines
5.9 KiB
JavaScript

const _ = require('lodash');
const async = require('async');
const stream = require('stream');
const driverBase = require('../frontend/driver');
const Analyser = require('./Analyser');
const Redis = require('ioredis');
/** @type {import('dbgate-types').EngineDriver} */
const driver = {
...driverBase,
analyserClass: Analyser,
async connect({ server, port, password, database }) {
let db = 0;
if (_.isString(database) && database.startsWith('db')) db = parseInt(database.substring(2));
if (_.isNumber(database)) db = database;
const pool = new Redis({
host: server,
port,
password,
db,
});
return pool;
},
// @ts-ignore
async query(pool, sql) {
return {
rows: [],
columns: [],
};
},
async stream(pool, sql, options) {
return null;
},
async readQuery(pool, sql, structure) {
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
});
// pass.write(structure)
// pass.write(row1)
// pass.write(row2)
// pass.end()
return pass;
},
async writeTable(pool, name, options) {
return createBulkInsertStreamBase(this, stream, pool, name, options);
},
async info(pool) {
const info = await pool.info();
return _.fromPairs(
info
.split('\n')
.filter((x) => x.trim() && !x.trim().startsWith('#'))
.map((x) => x.split(':'))
);
},
async getVersion(pool) {
const info = await this.info(pool);
return {
version: info.redis_version,
versionText: `Redis ${info.redis_version}`,
};
},
async listDatabases(pool) {
const info = await this.info(pool);
return _.range(16).map((index) => ({ name: `db${index}`, extInfo: info[`db${index}`], sortOrder: index }));
},
async loadKeys(pool, root = '') {
const keys = await this.getKeys(pool, root);
const res = this.extractKeysFromLevel(root, keys);
await this.enrichKeyInfo(pool, res);
return res;
},
async getKeys(pool, root = '') {
const res = [];
let cursor = 0;
do {
const [strCursor, keys] = await pool.scan(cursor, 'MATCH', root ? `${root}:*` : '*', 'COUNT', 100);
res.push(...keys);
cursor = parseInt(strCursor);
} while (cursor > 0);
return res;
},
extractKeysFromLevel(root, keys) {
const prefix = root ? `${root}:` : '';
const rootSplit = _.compact(root.split(':'));
const res = {};
for (const key of keys) {
if (!key.startsWith(prefix)) continue;
const keySplit = key.split(':');
if (keySplit.length > rootSplit.length) {
const text = keySplit[rootSplit.length];
if (keySplit.length == rootSplit.length + 1) {
res[text] = {
text,
key,
};
} else {
const dctKey = '::' + text;
if (res[dctKey]) {
res[dctKey].count++;
} else {
res[dctKey] = {
text: text + ':*',
type: 'dir',
root: keySplit.slice(0, rootSplit.length + 1).join(':'),
count: 1,
};
}
}
}
}
return Object.values(res);
},
async getKeyCardinality(pool, key, type) {
switch (type) {
case 'list':
return pool.llen(key);
case 'set':
return pool.scard(key);
case 'zset':
return pool.zcard(key);
case 'stream':
return pool.xlen(key);
case 'hash':
return pool.hlen(key);
}
},
async enrichOneKeyInfo(pool, item) {
item.type = await pool.type(item.key);
item.count = await this.getKeyCardinality(pool, item.key, item.type);
},
async enrichKeyInfo(pool, levelInfo) {
await async.eachLimit(
levelInfo.filter((x) => x.key),
10,
async (item) => await this.enrichOneKeyInfo(pool, item)
);
},
async loadKeyInfo(pool, key) {
const res = {};
const type = await pool.type(key);
res.key = key;
res.type = type;
res.ttl = await pool.ttl(key);
res.count = await this.getKeyCardinality(pool, key, type);
switch (type) {
case 'string':
res.value = await pool.get(key);
break;
case 'list':
res.tableColumns = [{ name: 'value' }];
res.addMethod = 'rpush';
break;
case 'set':
res.tableColumns = [{ name: 'value' }];
res.keyColumn = 'value';
res.addMethod = 'sadd';
break;
case 'zset':
res.tableColumns = [{ name: 'score' }, { name: 'value' }];
res.keyColumn = 'value';
res.addMethod = 'zadd';
break;
case 'hash':
res.tableColumns = [{ name: 'key' }, { name: 'value' }];
res.keyColumn = 'key';
res.addMethod = 'hset';
break;
}
return res;
},
async callMethod(pool, method, args) {
return await pool[method](...args);
},
async loadKeyTableRange(pool, key, cursor, count) {
const type = await pool.type(key);
switch (type) {
case 'list': {
const res = await pool.lrange(key, cursor, cursor + count);
return {
cursor: res.length > count ? cursor + count : 0,
items: res.map((value) => ({ value })).slice(0, count),
};
}
case 'set': {
const res = await pool.sscan(key, cursor, 'COUNT', count);
return {
cursor: parseInt(res[0]),
items: res[1].map((value) => ({ value })),
};
}
case 'zset': {
const res = await pool.zscan(key, cursor, 'COUNT', count);
return {
cursor: parseInt(res[0]),
items: _.chunk(res[1], 2).map((item) => ({ value: item[0], score: item[1] })),
};
}
case 'hash': {
const res = await pool.hscan(key, cursor, 'COUNT', count);
return {
cursor: parseInt(res[0]),
items: _.chunk(res[1], 2).map((item) => ({ key: item[0], value: item[1] })),
};
}
}
return null;
},
};
module.exports = driver;