mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-21 19:56:02 +00:00
SYNC: Merge pull request #4 from dbgate/feature/charts
This commit is contained in:
@@ -10,6 +10,7 @@ const requirePluginFunction = require('../utility/requirePluginFunction');
|
||||
const socket = require('../utility/socket');
|
||||
const crypto = require('crypto');
|
||||
const dbgateApi = require('../shell');
|
||||
const { ChartProcessor } = require('dbgate-datalib');
|
||||
|
||||
function readFirstLine(file) {
|
||||
return new Promise((resolve, reject) => {
|
||||
@@ -302,4 +303,29 @@ module.exports = {
|
||||
await dbgateApi.download(uri, { targetFile: getJslFileName(jslid) });
|
||||
return { jslid };
|
||||
},
|
||||
|
||||
buildChart_meta: true,
|
||||
async buildChart({ jslid, definition }) {
|
||||
const datastore = new JsonLinesDatastore(getJslFileName(jslid));
|
||||
const processor = new ChartProcessor(definition ? [definition] : undefined);
|
||||
await datastore.enumRows(row => {
|
||||
processor.addRow(row);
|
||||
return true;
|
||||
});
|
||||
processor.finalize();
|
||||
return processor.charts;
|
||||
},
|
||||
|
||||
detectChartColumns_meta: true,
|
||||
async detectChartColumns({ jslid }) {
|
||||
const datastore = new JsonLinesDatastore(getJslFileName(jslid));
|
||||
const processor = new ChartProcessor();
|
||||
processor.autoDetectCharts = false;
|
||||
await datastore.enumRows(row => {
|
||||
processor.addRow(row);
|
||||
return true;
|
||||
});
|
||||
processor.finalize();
|
||||
return processor.availableColumns;
|
||||
},
|
||||
};
|
||||
|
||||
@@ -83,6 +83,11 @@ module.exports = {
|
||||
jsldata.notifyChangedStats(stats);
|
||||
},
|
||||
|
||||
handle_charts(sesid, props) {
|
||||
const { jslid, charts, resultIndex } = props;
|
||||
socket.emit(`session-charts-${sesid}`, { jslid, resultIndex, charts });
|
||||
},
|
||||
|
||||
handle_initializeFile(sesid, props) {
|
||||
const { jslid } = props;
|
||||
socket.emit(`session-initialize-file-${jslid}`);
|
||||
@@ -141,7 +146,7 @@ module.exports = {
|
||||
},
|
||||
|
||||
executeQuery_meta: true,
|
||||
async executeQuery({ sesid, sql, autoCommit, limitRows }) {
|
||||
async executeQuery({ sesid, sql, autoCommit, limitRows, frontMatter }) {
|
||||
const session = this.opened.find(x => x.sesid == sesid);
|
||||
if (!session) {
|
||||
throw new Error('Invalid session');
|
||||
@@ -149,7 +154,7 @@ module.exports = {
|
||||
|
||||
logger.info({ sesid, sql }, 'Processing query');
|
||||
this.dispatchMessage(sesid, 'Query execution started');
|
||||
session.subprocess.send({ msgtype: 'executeQuery', sql, autoCommit, limitRows });
|
||||
session.subprocess.send({ msgtype: 'executeQuery', sql, autoCommit, limitRows, frontMatter });
|
||||
|
||||
return { state: 'ok' };
|
||||
},
|
||||
|
||||
@@ -117,7 +117,7 @@ async function handleExecuteControlCommand({ command }) {
|
||||
}
|
||||
}
|
||||
|
||||
async function handleExecuteQuery({ sql, autoCommit, limitRows }) {
|
||||
async function handleExecuteQuery({ sql, autoCommit, limitRows, frontMatter }) {
|
||||
lastActivity = new Date().getTime();
|
||||
|
||||
await waitConnected();
|
||||
@@ -146,7 +146,7 @@ async function handleExecuteQuery({ sql, autoCommit, limitRows }) {
|
||||
...driver.getQuerySplitterOptions('stream'),
|
||||
returnRichInfo: true,
|
||||
})) {
|
||||
await handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, undefined, limitRows);
|
||||
await handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, undefined, limitRows, frontMatter);
|
||||
// const handler = new StreamHandler(resultIndex);
|
||||
// const stream = await driver.stream(systemConnection, sqlItem, handler);
|
||||
// handler.stream = stream;
|
||||
|
||||
@@ -5,6 +5,8 @@ const _ = require('lodash');
|
||||
|
||||
const { jsldir } = require('../utility/directories');
|
||||
const { serializeJsTypesReplacer } = require('dbgate-tools');
|
||||
const { ChartProcessor } = require('dbgate-datalib');
|
||||
const { isProApp } = require('./checkLicense');
|
||||
|
||||
class QueryStreamTableWriter {
|
||||
constructor(sesid = undefined) {
|
||||
@@ -12,9 +14,12 @@ class QueryStreamTableWriter {
|
||||
this.currentChangeIndex = 1;
|
||||
this.initializedFile = false;
|
||||
this.sesid = sesid;
|
||||
if (isProApp()) {
|
||||
this.chartProcessor = new ChartProcessor();
|
||||
}
|
||||
}
|
||||
|
||||
initializeFromQuery(structure, resultIndex) {
|
||||
initializeFromQuery(structure, resultIndex, chartDefinition) {
|
||||
this.jslid = crypto.randomUUID();
|
||||
this.currentFile = path.join(jsldir(), `${this.jslid}.jsonl`);
|
||||
fs.writeFileSync(
|
||||
@@ -28,6 +33,9 @@ class QueryStreamTableWriter {
|
||||
this.writeCurrentStats(false, false);
|
||||
this.resultIndex = resultIndex;
|
||||
this.initializedFile = true;
|
||||
if (isProApp() && chartDefinition) {
|
||||
this.chartProcessor = new ChartProcessor([chartDefinition]);
|
||||
}
|
||||
process.send({ msgtype: 'recordset', jslid: this.jslid, resultIndex, sesid: this.sesid });
|
||||
}
|
||||
|
||||
@@ -40,6 +48,15 @@ class QueryStreamTableWriter {
|
||||
row(row) {
|
||||
// console.log('ACCEPT ROW', row);
|
||||
this.currentStream.write(JSON.stringify(row, serializeJsTypesReplacer) + '\n');
|
||||
try {
|
||||
if (this.chartProcessor) {
|
||||
this.chartProcessor.addRow(row);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Error processing chart row', e);
|
||||
this.chartProcessor = null;
|
||||
}
|
||||
|
||||
this.currentRowCount += 1;
|
||||
|
||||
if (!this.plannedStats) {
|
||||
@@ -87,6 +104,23 @@ class QueryStreamTableWriter {
|
||||
this.currentStream.end(() => {
|
||||
this.writeCurrentStats(true, true);
|
||||
if (afterClose) afterClose();
|
||||
if (this.chartProcessor) {
|
||||
try {
|
||||
this.chartProcessor.finalize();
|
||||
if (this.chartProcessor.charts.length > 0) {
|
||||
process.send({
|
||||
msgtype: 'charts',
|
||||
sesid: this.sesid,
|
||||
jslid: this.jslid,
|
||||
charts: this.chartProcessor.charts,
|
||||
resultIndex: this.resultIndex,
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Error finalizing chart processor', e);
|
||||
this.chartProcessor = null;
|
||||
}
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
} else {
|
||||
@@ -97,10 +131,18 @@ class QueryStreamTableWriter {
|
||||
}
|
||||
|
||||
class StreamHandler {
|
||||
constructor(queryStreamInfoHolder, resolve, startLine, sesid = undefined, limitRows = undefined) {
|
||||
constructor(
|
||||
queryStreamInfoHolder,
|
||||
resolve,
|
||||
startLine,
|
||||
sesid = undefined,
|
||||
limitRows = undefined,
|
||||
frontMatter = undefined
|
||||
) {
|
||||
this.recordset = this.recordset.bind(this);
|
||||
this.startLine = startLine;
|
||||
this.sesid = sesid;
|
||||
this.frontMatter = frontMatter;
|
||||
this.limitRows = limitRows;
|
||||
this.rowsLimitOverflow = false;
|
||||
this.row = this.row.bind(this);
|
||||
@@ -133,7 +175,8 @@ class StreamHandler {
|
||||
this.currentWriter = new QueryStreamTableWriter(this.sesid);
|
||||
this.currentWriter.initializeFromQuery(
|
||||
Array.isArray(columns) ? { columns } : columns,
|
||||
this.queryStreamInfoHolder.resultIndex
|
||||
this.queryStreamInfoHolder.resultIndex,
|
||||
this.frontMatter?.[`chart-${this.queryStreamInfoHolder.resultIndex + 1}`]
|
||||
);
|
||||
this.queryStreamInfoHolder.resultIndex += 1;
|
||||
this.rowCounter = 0;
|
||||
@@ -201,10 +244,25 @@ class StreamHandler {
|
||||
}
|
||||
}
|
||||
|
||||
function handleQueryStream(dbhan, driver, queryStreamInfoHolder, sqlItem, sesid = undefined, limitRows = undefined) {
|
||||
function handleQueryStream(
|
||||
dbhan,
|
||||
driver,
|
||||
queryStreamInfoHolder,
|
||||
sqlItem,
|
||||
sesid = undefined,
|
||||
limitRows = undefined,
|
||||
frontMatter = undefined
|
||||
) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const start = sqlItem.trimStart || sqlItem.start;
|
||||
const handler = new StreamHandler(queryStreamInfoHolder, resolve, start && start.line, sesid, limitRows);
|
||||
const handler = new StreamHandler(
|
||||
queryStreamInfoHolder,
|
||||
resolve,
|
||||
start && start.line,
|
||||
sesid,
|
||||
limitRows,
|
||||
frontMatter
|
||||
);
|
||||
driver.stream(dbhan, sqlItem.text, handler);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user