diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index 4952c0b83..0c2f0967a 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -1,6 +1,6 @@ const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); -const Stream = require('stream'); const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); +const streamPipeline = require('../utility/streamPipeline'); /** * Copies reader to writer. Used for import, export tables and transfer data between tables @@ -9,7 +9,7 @@ const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); * @param {object} options - options * @returns {Promise} */ -function copyStream(input, output, options) { +async function copyStream(input, output, options) { const { columns } = options || {}; const transforms = []; @@ -20,49 +20,19 @@ function copyStream(input, output, options) { transforms.push(new EnsureStreamHeaderStream()); } - // return new Promise((resolve, reject) => { - // Stream.pipeline(input, ...transforms, output, err => { - // if (err) { - // reject(err); - // } else { - // resolve(); - // } - // }); - // }); - - return new Promise((resolve, reject) => { - const finisher = output['finisher'] || output; - finisher.on('finish', resolve); - - input.on('error', err => { - // console.log('&&&&&&&&&&&&&&&&&&&&&&& CATCH ERROR IN COPY STREAM &&&&&&&&&&&&&&&&&&&&&&'); - // console.log(err); - process.send({ - msgtype: 'copyStreamError', - runid: this.runid, - copyStreamError: err, - }); + try { + await streamPipeline(input, transforms, output); + } catch (err) { + process.send({ + msgtype: 'copyStreamError', + runid: this.runid, + copyStreamError: { + message: err.message, + ...err, + }, }); - - input.on('error', reject); - - finisher.on('error', reject); - - let lastStream = input; - for (const tran of transforms) { - lastStream.pipe(tran); - lastStream = tran; - } - lastStream.pipe(output); - - // if (output.requireFixedStructure) { - // const ensureHeader = new EnsureStreamHeaderStream(); - // input.pipe(ensureHeader); - // ensureHeader.pipe(output); - // } else { - // input.pipe(output); - // } - }); + throw err; + } } module.exports = copyStream; diff --git a/packages/api/src/shell/importDatabase.js b/packages/api/src/shell/importDatabase.js index a10cb4632..d5a04ccf3 100644 --- a/packages/api/src/shell/importDatabase.js +++ b/packages/api/src/shell/importDatabase.js @@ -5,6 +5,7 @@ const { splitQueryStream } = require('dbgate-query-splitter/lib/splitQueryStream const download = require('./download'); const stream = require('stream'); const { getLogger } = require('dbgate-tools'); +const streamPipeline = require('../utility/streamPipeline'); const logger = getLogger('importDb'); @@ -43,17 +44,6 @@ class ImportStream extends stream.Transform { } } -function awaitStreamEnd(stream) { - return new Promise((resolve, reject) => { - stream.once('end', () => { - resolve(true); - }); - stream.once('error', err => { - reject(err); - }); - }); -} - async function importDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, inputFile }) { logger.info(`Importing database`); @@ -72,9 +62,8 @@ async function importDatabase({ connection = undefined, systemConnection = undef returnRichInfo: true, }); const importStream = new ImportStream(dbhan, driver); - // @ts-ignore - splittedStream.pipe(importStream); - await awaitStreamEnd(importStream); + + await streamPipeline(splittedStream, importStream); } finally { if (!systemConnection) { await driver.close(dbhan); diff --git a/packages/api/src/shell/jsonLinesReader.js b/packages/api/src/shell/jsonLinesReader.js index f1d47dde5..da3d5d85d 100644 --- a/packages/api/src/shell/jsonLinesReader.js +++ b/packages/api/src/shell/jsonLinesReader.js @@ -53,8 +53,7 @@ async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undef ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows }); - liner.pipe(parser); - return parser; + return [liner, parser]; } module.exports = jsonLinesReader; diff --git a/packages/api/src/shell/jsonReader.js b/packages/api/src/shell/jsonReader.js index 2de75930c..654ecf633 100644 --- a/packages/api/src/shell/jsonReader.js +++ b/packages/api/src/shell/jsonReader.js @@ -10,7 +10,6 @@ const download = require('./download'); const logger = getLogger('jsonReader'); - class ParseStream extends stream.Transform { constructor({ limitRows, jsonStyle, keyField }) { super({ objectMode: true }); @@ -72,8 +71,12 @@ async function jsonReader({ // @ts-ignore encoding ); + const parseJsonStream = parser(); - fileStream.pipe(parseJsonStream); + + const resultPipe = [fileStream, parseJsonStream]; + + // fileStream.pipe(parseJsonStream); const parseStream = new ParseStream({ limitRows, jsonStyle, keyField }); @@ -81,15 +84,20 @@ async function jsonReader({ if (rootField) { const filterStream = pick({ filter: rootField }); - parseJsonStream.pipe(filterStream); - filterStream.pipe(tramsformer); - } else { - parseJsonStream.pipe(tramsformer); + resultPipe.push(filterStream); + // parseJsonStream.pipe(filterStream); + // filterStream.pipe(tramsformer); } + // else { + // parseJsonStream.pipe(tramsformer); + // } - tramsformer.pipe(parseStream); + resultPipe.push(tramsformer); + resultPipe.push(parseStream); - return parseStream; + // tramsformer.pipe(parseStream); + + return resultPipe; } module.exports = jsonReader; diff --git a/packages/api/src/shell/jsonWriter.js b/packages/api/src/shell/jsonWriter.js index 8058cabdb..28f622cca 100644 --- a/packages/api/src/shell/jsonWriter.js +++ b/packages/api/src/shell/jsonWriter.js @@ -99,9 +99,10 @@ async function jsonWriter({ fileName, jsonStyle, keyField = '_key', rootField, e logger.info(`Writing file ${fileName}`); const stringify = new StringifyStream({ jsonStyle, keyField, rootField }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = jsonWriter; diff --git a/packages/api/src/shell/modifyJsonLinesReader.js b/packages/api/src/shell/modifyJsonLinesReader.js index dec137ba0..32040acec 100644 --- a/packages/api/src/shell/modifyJsonLinesReader.js +++ b/packages/api/src/shell/modifyJsonLinesReader.js @@ -141,8 +141,9 @@ async function modifyJsonLinesReader({ ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode }); - liner.pipe(parser); - return parser; + return [liner, parser]; + // liner.pipe(parser); + // return parser; } module.exports = modifyJsonLinesReader; diff --git a/packages/api/src/shell/sqlDataWriter.js b/packages/api/src/shell/sqlDataWriter.js index 3dd6bad26..c83dab161 100644 --- a/packages/api/src/shell/sqlDataWriter.js +++ b/packages/api/src/shell/sqlDataWriter.js @@ -44,9 +44,10 @@ async function sqlDataWriter({ fileName, dataName, driver, encoding = 'utf-8' }) logger.info(`Writing file ${fileName}`); const stringify = new SqlizeStream({ fileName, dataName }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = sqlDataWriter; diff --git a/packages/api/src/utility/streamPipeline.js b/packages/api/src/utility/streamPipeline.js new file mode 100644 index 000000000..11183b076 --- /dev/null +++ b/packages/api/src/utility/streamPipeline.js @@ -0,0 +1,18 @@ +const stream = require('stream'); +const _ = require('lodash'); + +function streamPipeline(...processedStreams) { + const streams = _.flattenDeep(processedStreams); + return new Promise((resolve, reject) => { + // @ts-ignore + stream.pipeline(...streams, err => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +module.exports = streamPipeline; diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index 11235f5e6..b34b66746 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -144,6 +144,8 @@ export interface DatabaseHandle { treeKeySeparator?: string; } +export type StreamResult = stream.Readable | (stream.Readable | stream.Writable)[]; + export interface EngineDriver extends FilterBehaviourProvider { engine: string; title: string; @@ -191,15 +193,11 @@ export interface EngineDriver extends FilterBehaviourProvider { close(dbhan: DatabaseHandle): Promise; query(dbhan: DatabaseHandle, sql: string, options?: QueryOptions): Promise; stream(dbhan: DatabaseHandle, sql: string, options: StreamOptions); - readQuery(dbhan: DatabaseHandle, sql: string, structure?: TableInfo): Promise; - readJsonQuery(dbhan: DatabaseHandle, query: any, structure?: TableInfo): Promise; + readQuery(dbhan: DatabaseHandle, sql: string, structure?: TableInfo): Promise; + readJsonQuery(dbhan: DatabaseHandle, query: any, structure?: TableInfo): Promise; // eg. PostgreSQL COPY FROM stdin - writeQueryFromStream(dbhan: DatabaseHandle, sql: string): Promise; - writeTable( - dbhan: DatabaseHandle, - name: NamedObjectInfo, - options: WriteTableOptions - ): Promise; + writeQueryFromStream(dbhan: DatabaseHandle, sql: string): Promise; + writeTable(dbhan: DatabaseHandle, name: NamedObjectInfo, options: WriteTableOptions): Promise; analyseSingleObject( dbhan: DatabaseHandle, name: NamedObjectInfo, diff --git a/plugins/dbgate-plugin-csv/src/backend/reader.js b/plugins/dbgate-plugin-csv/src/backend/reader.js index 55ef181d9..e52065f20 100644 --- a/plugins/dbgate-plugin-csv/src/backend/reader.js +++ b/plugins/dbgate-plugin-csv/src/backend/reader.js @@ -95,9 +95,10 @@ async function reader({ fileName, encoding = 'utf-8', header = true, delimiter, }); const fileStream = fs.createReadStream(downloadedFile, encoding); const csvPrepare = new CsvPrepareStream({ header }); - fileStream.pipe(csvStream); - csvStream.pipe(csvPrepare); - return csvPrepare; + return [fileStream, csvStream, csvPrepare]; + // fileStream.pipe(csvStream); + // csvStream.pipe(csvPrepare); + // return csvPrepare; } reader.initialize = (dbgateEnv) => { diff --git a/plugins/dbgate-plugin-csv/src/backend/writer.js b/plugins/dbgate-plugin-csv/src/backend/writer.js index 7d62d1563..2297af0c4 100644 --- a/plugins/dbgate-plugin-csv/src/backend/writer.js +++ b/plugins/dbgate-plugin-csv/src/backend/writer.js @@ -31,11 +31,13 @@ async function writer({ fileName, encoding = 'utf-8', header = true, delimiter, const csvPrepare = new CsvPrepareStream({ header }); const csvStream = csv.stringify({ delimiter, quoted }); const fileStream = fs.createWriteStream(fileName, encoding); - csvPrepare.pipe(csvStream); - csvStream.pipe(fileStream); - csvPrepare['finisher'] = fileStream; + // csvPrepare.pipe(csvStream); + // csvStream.pipe(fileStream); + // csvPrepare['finisher'] = fileStream; csvPrepare.requireFixedStructure = true; - return csvPrepare; + + return [csvPrepare, csvStream, fileStream]; + // return csvPrepare; } module.exports = writer; diff --git a/plugins/dbgate-plugin-xml/src/backend/reader.js b/plugins/dbgate-plugin-xml/src/backend/reader.js index e374ea068..18b09adb9 100644 --- a/plugins/dbgate-plugin-xml/src/backend/reader.js +++ b/plugins/dbgate-plugin-xml/src/backend/reader.js @@ -63,8 +63,10 @@ async function reader({ fileName, encoding = 'utf-8', itemElementName }) { const fileStream = fs.createReadStream(fileName, encoding); const parser = new ParseStream({ itemElementName }); - fileStream.pipe(parser); - return parser; + + return [fileStream, parser]; + // fileStream.pipe(parser); + // return parser; } module.exports = reader; diff --git a/plugins/dbgate-plugin-xml/src/backend/writer.js b/plugins/dbgate-plugin-xml/src/backend/writer.js index 66787883c..0474eab02 100644 --- a/plugins/dbgate-plugin-xml/src/backend/writer.js +++ b/plugins/dbgate-plugin-xml/src/backend/writer.js @@ -73,9 +73,10 @@ async function writer({ fileName, encoding = 'utf-8', itemElementName, rootEleme logger.info(`Writing file ${fileName}`); const stringify = new StringifyStream({ itemElementName, rootElementName }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = writer;