mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-19 11:56:00 +00:00
Merge branch 'feature/impexp'
This commit is contained in:
@@ -94,14 +94,26 @@ module.exports = {
|
||||
handle_ping() {},
|
||||
|
||||
handle_freeData(runid, { freeData }) {
|
||||
const [resolve, reject] = this.requests[runid];
|
||||
const { resolve } = this.requests[runid];
|
||||
resolve(freeData);
|
||||
delete this.requests[runid];
|
||||
},
|
||||
|
||||
handle_copyStreamError(runid, { copyStreamError }) {
|
||||
const { reject, exitOnStreamError } = this.requests[runid] || {};
|
||||
if (exitOnStreamError) {
|
||||
reject(copyStreamError);
|
||||
delete this.requests[runid];
|
||||
}
|
||||
},
|
||||
|
||||
handle_progress(runid, progressData) {
|
||||
socket.emit(`runner-progress-${runid}`, progressData);
|
||||
},
|
||||
|
||||
rejectRequest(runid, error) {
|
||||
if (this.requests[runid]) {
|
||||
const [resolve, reject] = this.requests[runid];
|
||||
const { reject } = this.requests[runid];
|
||||
reject(error);
|
||||
delete this.requests[runid];
|
||||
}
|
||||
@@ -113,6 +125,8 @@ module.exports = {
|
||||
fs.writeFileSync(`${scriptFile}`, scriptText);
|
||||
fs.mkdirSync(directory);
|
||||
const pluginNames = extractPlugins(scriptText);
|
||||
// console.log('********************** SCRIPT TEXT **********************');
|
||||
// console.log(scriptText);
|
||||
logger.info({ scriptFile }, 'Running script');
|
||||
// const subprocess = fork(scriptFile, ['--checkParent', '--max-old-space-size=8192'], {
|
||||
const subprocess = fork(
|
||||
@@ -150,11 +164,13 @@ module.exports = {
|
||||
byline(subprocess.stdout).on('data', pipeDispatcher('info'));
|
||||
byline(subprocess.stderr).on('data', pipeDispatcher('error'));
|
||||
subprocess.on('exit', code => {
|
||||
// console.log('... EXITED', code);
|
||||
this.rejectRequest(runid, { message: 'No data returned, maybe input data source is too big' });
|
||||
logger.info({ code, pid: subprocess.pid }, 'Exited process');
|
||||
socket.emit(`runner-done-${runid}`, code);
|
||||
});
|
||||
subprocess.on('error', error => {
|
||||
// console.log('... ERROR subprocess', error);
|
||||
this.rejectRequest(runid, { message: error && (error.message || error.toString()) });
|
||||
console.error('... ERROR subprocess', error);
|
||||
this.dispatchMessage({
|
||||
@@ -231,7 +247,7 @@ module.exports = {
|
||||
|
||||
const promise = new Promise((resolve, reject) => {
|
||||
const runid = crypto.randomUUID();
|
||||
this.requests[runid] = [resolve, reject];
|
||||
this.requests[runid] = { resolve, reject, exitOnStreamError: true };
|
||||
this.startCore(runid, loaderScriptTemplate(prefix, functionName, props, runid));
|
||||
});
|
||||
return promise;
|
||||
|
||||
@@ -1,6 +1,25 @@
|
||||
const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream');
|
||||
const Stream = require('stream');
|
||||
const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream');
|
||||
const streamPipeline = require('../utility/streamPipeline');
|
||||
const { getLogger, extractErrorLogData, RowProgressReporter } = require('dbgate-tools');
|
||||
const logger = getLogger('copyStream');
|
||||
const stream = require('stream');
|
||||
|
||||
class ReportingTransform extends stream.Transform {
|
||||
constructor(reporter, options = {}) {
|
||||
super({ ...options, objectMode: true });
|
||||
this.reporter = reporter;
|
||||
}
|
||||
_transform(chunk, encoding, callback) {
|
||||
this.reporter.add(1);
|
||||
this.push(chunk);
|
||||
callback();
|
||||
}
|
||||
_flush(callback) {
|
||||
this.reporter.finish();
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copies reader to writer. Used for import, export tables and transfer data between tables
|
||||
@@ -9,10 +28,23 @@ const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream');
|
||||
* @param {object} options - options
|
||||
* @returns {Promise}
|
||||
*/
|
||||
function copyStream(input, output, options) {
|
||||
const { columns } = options || {};
|
||||
async function copyStream(input, output, options) {
|
||||
const { columns, progressName } = options || {};
|
||||
|
||||
if (progressName) {
|
||||
process.send({
|
||||
msgtype: 'progress',
|
||||
progressName,
|
||||
status: 'running',
|
||||
});
|
||||
}
|
||||
|
||||
const transforms = [];
|
||||
|
||||
if (progressName) {
|
||||
const reporter = new RowProgressReporter(progressName, 'readRowCount');
|
||||
transforms.push(new ReportingTransform(reporter));
|
||||
}
|
||||
if (columns) {
|
||||
transforms.push(new ColumnMapTransformStream(columns));
|
||||
}
|
||||
@@ -20,36 +52,37 @@ function copyStream(input, output, options) {
|
||||
transforms.push(new EnsureStreamHeaderStream());
|
||||
}
|
||||
|
||||
// return new Promise((resolve, reject) => {
|
||||
// Stream.pipeline(input, ...transforms, output, err => {
|
||||
// if (err) {
|
||||
// reject(err);
|
||||
// } else {
|
||||
// resolve();
|
||||
// }
|
||||
// });
|
||||
// });
|
||||
try {
|
||||
await streamPipeline(input, transforms, output);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const finisher = output['finisher'] || output;
|
||||
finisher.on('finish', resolve);
|
||||
finisher.on('error', reject);
|
||||
|
||||
let lastStream = input;
|
||||
for (const tran of transforms) {
|
||||
lastStream.pipe(tran);
|
||||
lastStream = tran;
|
||||
if (progressName) {
|
||||
process.send({
|
||||
msgtype: 'progress',
|
||||
progressName,
|
||||
status: 'done',
|
||||
});
|
||||
}
|
||||
lastStream.pipe(output);
|
||||
} catch (err) {
|
||||
process.send({
|
||||
msgtype: 'copyStreamError',
|
||||
copyStreamError: {
|
||||
message: err.message,
|
||||
...err,
|
||||
},
|
||||
});
|
||||
|
||||
// if (output.requireFixedStructure) {
|
||||
// const ensureHeader = new EnsureStreamHeaderStream();
|
||||
// input.pipe(ensureHeader);
|
||||
// ensureHeader.pipe(output);
|
||||
// } else {
|
||||
// input.pipe(output);
|
||||
// }
|
||||
});
|
||||
if (progressName) {
|
||||
process.send({
|
||||
msgtype: 'progress',
|
||||
progressName,
|
||||
status: 'error',
|
||||
errorMessage: err.message,
|
||||
});
|
||||
}
|
||||
|
||||
logger.error(extractErrorLogData(err, { progressName }), 'Import/export job failed');
|
||||
// throw err;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = copyStream;
|
||||
|
||||
@@ -24,8 +24,6 @@ async function dataDuplicator({
|
||||
const dbhan = systemConnection || (await connectUtility(driver, connection, 'write'));
|
||||
|
||||
try {
|
||||
logger.info(`Connected.`);
|
||||
|
||||
if (!analysedStructure) {
|
||||
analysedStructure = await driver.analyseFull(dbhan);
|
||||
}
|
||||
|
||||
@@ -19,8 +19,6 @@ async function dropAllDbObjects({ connection, systemConnection, driver, analysed
|
||||
|
||||
const dbhan = systemConnection || (await connectUtility(driver, connection, 'write'));
|
||||
|
||||
logger.info(`Connected.`);
|
||||
|
||||
if (!analysedStructure) {
|
||||
analysedStructure = await driver.analyseFull(dbhan);
|
||||
}
|
||||
|
||||
@@ -31,8 +31,6 @@ async function dumpDatabase({
|
||||
const dbhan = systemConnection || (await connectUtility(driver, connection, 'read', { forceRowsAsObjects: true }));
|
||||
|
||||
try {
|
||||
logger.info(`Connected.`);
|
||||
|
||||
const dumper = await driver.createBackupDumper(dbhan, {
|
||||
outputFile,
|
||||
databaseName,
|
||||
|
||||
@@ -36,7 +36,7 @@ async function executeQuery({
|
||||
}
|
||||
|
||||
try {
|
||||
logger.info(`Connected.`);
|
||||
logger.debug(`Running SQL query, length: ${sql.length}`);
|
||||
|
||||
await driver.script(dbhan, sql, { logScriptItems });
|
||||
} finally {
|
||||
|
||||
@@ -5,6 +5,7 @@ const { splitQueryStream } = require('dbgate-query-splitter/lib/splitQueryStream
|
||||
const download = require('./download');
|
||||
const stream = require('stream');
|
||||
const { getLogger } = require('dbgate-tools');
|
||||
const streamPipeline = require('../utility/streamPipeline');
|
||||
|
||||
const logger = getLogger('importDb');
|
||||
|
||||
@@ -43,25 +44,12 @@ class ImportStream extends stream.Transform {
|
||||
}
|
||||
}
|
||||
|
||||
function awaitStreamEnd(stream) {
|
||||
return new Promise((resolve, reject) => {
|
||||
stream.once('end', () => {
|
||||
resolve(true);
|
||||
});
|
||||
stream.once('error', err => {
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function importDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, inputFile }) {
|
||||
logger.info(`Importing database`);
|
||||
|
||||
if (!driver) driver = requireEngineDriver(connection);
|
||||
const dbhan = systemConnection || (await connectUtility(driver, connection, 'write'));
|
||||
try {
|
||||
logger.info(`Connected.`);
|
||||
|
||||
logger.info(`Input file: ${inputFile}`);
|
||||
const downloadedFile = await download(inputFile);
|
||||
logger.info(`Downloaded file: ${downloadedFile}`);
|
||||
@@ -72,9 +60,8 @@ async function importDatabase({ connection = undefined, systemConnection = undef
|
||||
returnRichInfo: true,
|
||||
});
|
||||
const importStream = new ImportStream(dbhan, driver);
|
||||
// @ts-ignore
|
||||
splittedStream.pipe(importStream);
|
||||
await awaitStreamEnd(importStream);
|
||||
|
||||
await streamPipeline(splittedStream, importStream);
|
||||
} finally {
|
||||
if (!systemConnection) {
|
||||
await driver.close(dbhan);
|
||||
|
||||
@@ -53,8 +53,7 @@ async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undef
|
||||
);
|
||||
const liner = byline(fileStream);
|
||||
const parser = new ParseStream({ limitRows });
|
||||
liner.pipe(parser);
|
||||
return parser;
|
||||
return [liner, parser];
|
||||
}
|
||||
|
||||
module.exports = jsonLinesReader;
|
||||
|
||||
@@ -10,7 +10,6 @@ const download = require('./download');
|
||||
|
||||
const logger = getLogger('jsonReader');
|
||||
|
||||
|
||||
class ParseStream extends stream.Transform {
|
||||
constructor({ limitRows, jsonStyle, keyField }) {
|
||||
super({ objectMode: true });
|
||||
@@ -72,8 +71,12 @@ async function jsonReader({
|
||||
// @ts-ignore
|
||||
encoding
|
||||
);
|
||||
|
||||
const parseJsonStream = parser();
|
||||
fileStream.pipe(parseJsonStream);
|
||||
|
||||
const resultPipe = [fileStream, parseJsonStream];
|
||||
|
||||
// fileStream.pipe(parseJsonStream);
|
||||
|
||||
const parseStream = new ParseStream({ limitRows, jsonStyle, keyField });
|
||||
|
||||
@@ -81,15 +84,20 @@ async function jsonReader({
|
||||
|
||||
if (rootField) {
|
||||
const filterStream = pick({ filter: rootField });
|
||||
parseJsonStream.pipe(filterStream);
|
||||
filterStream.pipe(tramsformer);
|
||||
} else {
|
||||
parseJsonStream.pipe(tramsformer);
|
||||
resultPipe.push(filterStream);
|
||||
// parseJsonStream.pipe(filterStream);
|
||||
// filterStream.pipe(tramsformer);
|
||||
}
|
||||
// else {
|
||||
// parseJsonStream.pipe(tramsformer);
|
||||
// }
|
||||
|
||||
tramsformer.pipe(parseStream);
|
||||
resultPipe.push(tramsformer);
|
||||
resultPipe.push(parseStream);
|
||||
|
||||
return parseStream;
|
||||
// tramsformer.pipe(parseStream);
|
||||
|
||||
return resultPipe;
|
||||
}
|
||||
|
||||
module.exports = jsonReader;
|
||||
|
||||
@@ -99,9 +99,10 @@ async function jsonWriter({ fileName, jsonStyle, keyField = '_key', rootField, e
|
||||
logger.info(`Writing file ${fileName}`);
|
||||
const stringify = new StringifyStream({ jsonStyle, keyField, rootField });
|
||||
const fileStream = fs.createWriteStream(fileName, encoding);
|
||||
stringify.pipe(fileStream);
|
||||
stringify['finisher'] = fileStream;
|
||||
return stringify;
|
||||
return [stringify, fileStream];
|
||||
// stringify.pipe(fileStream);
|
||||
// stringify['finisher'] = fileStream;
|
||||
// return stringify;
|
||||
}
|
||||
|
||||
module.exports = jsonWriter;
|
||||
|
||||
@@ -6,15 +6,13 @@ const exportDbModel = require('../utility/exportDbModel');
|
||||
const logger = getLogger('analyseDb');
|
||||
|
||||
async function loadDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, outputDir }) {
|
||||
logger.info(`Analysing database`);
|
||||
logger.debug(`Analysing database`);
|
||||
|
||||
if (!driver) driver = requireEngineDriver(connection);
|
||||
const dbhan = systemConnection || (await connectUtility(driver, connection, 'read', { forceRowsAsObjects: true }));
|
||||
try {
|
||||
logger.info(`Connected.`);
|
||||
|
||||
const dbInfo = await driver.analyseFull(dbhan);
|
||||
logger.info(`Analyse finished`);
|
||||
logger.debug(`Analyse finished`);
|
||||
|
||||
await exportDbModel(dbInfo, outputDir);
|
||||
} finally {
|
||||
|
||||
@@ -141,8 +141,9 @@ async function modifyJsonLinesReader({
|
||||
);
|
||||
const liner = byline(fileStream);
|
||||
const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode });
|
||||
liner.pipe(parser);
|
||||
return parser;
|
||||
return [liner, parser];
|
||||
// liner.pipe(parser);
|
||||
// return parser;
|
||||
}
|
||||
|
||||
module.exports = modifyJsonLinesReader;
|
||||
|
||||
@@ -30,7 +30,6 @@ async function queryReader({
|
||||
|
||||
const driver = requireEngineDriver(connection);
|
||||
const pool = await connectUtility(driver, connection, queryType == 'json' ? 'read' : 'script');
|
||||
logger.info(`Connected.`);
|
||||
const reader =
|
||||
queryType == 'json' ? await driver.readJsonQuery(pool, query) : await driver.readQuery(pool, query || sql);
|
||||
return reader;
|
||||
|
||||
@@ -44,9 +44,10 @@ async function sqlDataWriter({ fileName, dataName, driver, encoding = 'utf-8' })
|
||||
logger.info(`Writing file ${fileName}`);
|
||||
const stringify = new SqlizeStream({ fileName, dataName });
|
||||
const fileStream = fs.createWriteStream(fileName, encoding);
|
||||
stringify.pipe(fileStream);
|
||||
stringify['finisher'] = fileStream;
|
||||
return stringify;
|
||||
return [stringify, fileStream];
|
||||
// stringify.pipe(fileStream);
|
||||
// stringify['finisher'] = fileStream;
|
||||
// return stringify;
|
||||
}
|
||||
|
||||
module.exports = sqlDataWriter;
|
||||
|
||||
@@ -18,7 +18,6 @@ async function tableReader({ connection, systemConnection, pureName, schemaName,
|
||||
driver = requireEngineDriver(connection);
|
||||
}
|
||||
const dbhan = systemConnection || (await connectUtility(driver, connection, 'read'));
|
||||
logger.info(`Connected.`);
|
||||
|
||||
const fullName = { pureName, schemaName };
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ async function tableWriter({ connection, schemaName, pureName, driver, systemCon
|
||||
}
|
||||
const dbhan = systemConnection || (await connectUtility(driver, connection, 'write'));
|
||||
|
||||
logger.info(`Connected.`);
|
||||
return await driver.writeTable(dbhan, { schemaName, pureName }, options);
|
||||
}
|
||||
|
||||
|
||||
18
packages/api/src/utility/streamPipeline.js
Normal file
18
packages/api/src/utility/streamPipeline.js
Normal file
@@ -0,0 +1,18 @@
|
||||
const stream = require('stream');
|
||||
const _ = require('lodash');
|
||||
|
||||
function streamPipeline(...processedStreams) {
|
||||
const streams = _.flattenDeep(processedStreams);
|
||||
return new Promise((resolve, reject) => {
|
||||
// @ts-ignore
|
||||
stream.pipeline(...streams, err => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = streamPipeline;
|
||||
Reference in New Issue
Block a user