Use stream.pipeline for better error handling

This commit is contained in:
SPRINX0\prochazka
2025-03-04 09:51:29 +01:00
parent 69f781d3de
commit beca5c6e45
13 changed files with 87 additions and 96 deletions

View File

@@ -1,6 +1,6 @@
const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream');
const Stream = require('stream');
const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream');
const streamPipeline = require('../utility/streamPipeline');
/** /**
* Copies reader to writer. Used for import, export tables and transfer data between tables * Copies reader to writer. Used for import, export tables and transfer data between tables
@@ -9,7 +9,7 @@ const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream');
* @param {object} options - options * @param {object} options - options
* @returns {Promise} * @returns {Promise}
*/ */
function copyStream(input, output, options) { async function copyStream(input, output, options) {
const { columns } = options || {}; const { columns } = options || {};
const transforms = []; const transforms = [];
@@ -20,49 +20,19 @@ function copyStream(input, output, options) {
transforms.push(new EnsureStreamHeaderStream()); transforms.push(new EnsureStreamHeaderStream());
} }
// return new Promise((resolve, reject) => { try {
// Stream.pipeline(input, ...transforms, output, err => { await streamPipeline(input, transforms, output);
// if (err) { } catch (err) {
// reject(err); process.send({
// } else { msgtype: 'copyStreamError',
// resolve(); runid: this.runid,
// } copyStreamError: {
// }); message: err.message,
// }); ...err,
},
return new Promise((resolve, reject) => {
const finisher = output['finisher'] || output;
finisher.on('finish', resolve);
input.on('error', err => {
// console.log('&&&&&&&&&&&&&&&&&&&&&&& CATCH ERROR IN COPY STREAM &&&&&&&&&&&&&&&&&&&&&&');
// console.log(err);
process.send({
msgtype: 'copyStreamError',
runid: this.runid,
copyStreamError: err,
});
}); });
throw err;
input.on('error', reject); }
finisher.on('error', reject);
let lastStream = input;
for (const tran of transforms) {
lastStream.pipe(tran);
lastStream = tran;
}
lastStream.pipe(output);
// if (output.requireFixedStructure) {
// const ensureHeader = new EnsureStreamHeaderStream();
// input.pipe(ensureHeader);
// ensureHeader.pipe(output);
// } else {
// input.pipe(output);
// }
});
} }
module.exports = copyStream; module.exports = copyStream;

View File

@@ -5,6 +5,7 @@ const { splitQueryStream } = require('dbgate-query-splitter/lib/splitQueryStream
const download = require('./download'); const download = require('./download');
const stream = require('stream'); const stream = require('stream');
const { getLogger } = require('dbgate-tools'); const { getLogger } = require('dbgate-tools');
const streamPipeline = require('../utility/streamPipeline');
const logger = getLogger('importDb'); const logger = getLogger('importDb');
@@ -43,17 +44,6 @@ class ImportStream extends stream.Transform {
} }
} }
function awaitStreamEnd(stream) {
return new Promise((resolve, reject) => {
stream.once('end', () => {
resolve(true);
});
stream.once('error', err => {
reject(err);
});
});
}
async function importDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, inputFile }) { async function importDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, inputFile }) {
logger.info(`Importing database`); logger.info(`Importing database`);
@@ -72,9 +62,8 @@ async function importDatabase({ connection = undefined, systemConnection = undef
returnRichInfo: true, returnRichInfo: true,
}); });
const importStream = new ImportStream(dbhan, driver); const importStream = new ImportStream(dbhan, driver);
// @ts-ignore
splittedStream.pipe(importStream); await streamPipeline(splittedStream, importStream);
await awaitStreamEnd(importStream);
} finally { } finally {
if (!systemConnection) { if (!systemConnection) {
await driver.close(dbhan); await driver.close(dbhan);

View File

@@ -53,8 +53,7 @@ async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undef
); );
const liner = byline(fileStream); const liner = byline(fileStream);
const parser = new ParseStream({ limitRows }); const parser = new ParseStream({ limitRows });
liner.pipe(parser); return [liner, parser];
return parser;
} }
module.exports = jsonLinesReader; module.exports = jsonLinesReader;

View File

@@ -10,7 +10,6 @@ const download = require('./download');
const logger = getLogger('jsonReader'); const logger = getLogger('jsonReader');
class ParseStream extends stream.Transform { class ParseStream extends stream.Transform {
constructor({ limitRows, jsonStyle, keyField }) { constructor({ limitRows, jsonStyle, keyField }) {
super({ objectMode: true }); super({ objectMode: true });
@@ -72,8 +71,12 @@ async function jsonReader({
// @ts-ignore // @ts-ignore
encoding encoding
); );
const parseJsonStream = parser(); const parseJsonStream = parser();
fileStream.pipe(parseJsonStream);
const resultPipe = [fileStream, parseJsonStream];
// fileStream.pipe(parseJsonStream);
const parseStream = new ParseStream({ limitRows, jsonStyle, keyField }); const parseStream = new ParseStream({ limitRows, jsonStyle, keyField });
@@ -81,15 +84,20 @@ async function jsonReader({
if (rootField) { if (rootField) {
const filterStream = pick({ filter: rootField }); const filterStream = pick({ filter: rootField });
parseJsonStream.pipe(filterStream); resultPipe.push(filterStream);
filterStream.pipe(tramsformer); // parseJsonStream.pipe(filterStream);
} else { // filterStream.pipe(tramsformer);
parseJsonStream.pipe(tramsformer);
} }
// else {
// parseJsonStream.pipe(tramsformer);
// }
tramsformer.pipe(parseStream); resultPipe.push(tramsformer);
resultPipe.push(parseStream);
return parseStream; // tramsformer.pipe(parseStream);
return resultPipe;
} }
module.exports = jsonReader; module.exports = jsonReader;

View File

@@ -99,9 +99,10 @@ async function jsonWriter({ fileName, jsonStyle, keyField = '_key', rootField, e
logger.info(`Writing file ${fileName}`); logger.info(`Writing file ${fileName}`);
const stringify = new StringifyStream({ jsonStyle, keyField, rootField }); const stringify = new StringifyStream({ jsonStyle, keyField, rootField });
const fileStream = fs.createWriteStream(fileName, encoding); const fileStream = fs.createWriteStream(fileName, encoding);
stringify.pipe(fileStream); return [stringify, fileStream];
stringify['finisher'] = fileStream; // stringify.pipe(fileStream);
return stringify; // stringify['finisher'] = fileStream;
// return stringify;
} }
module.exports = jsonWriter; module.exports = jsonWriter;

View File

@@ -141,8 +141,9 @@ async function modifyJsonLinesReader({
); );
const liner = byline(fileStream); const liner = byline(fileStream);
const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode }); const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode });
liner.pipe(parser); return [liner, parser];
return parser; // liner.pipe(parser);
// return parser;
} }
module.exports = modifyJsonLinesReader; module.exports = modifyJsonLinesReader;

View File

@@ -44,9 +44,10 @@ async function sqlDataWriter({ fileName, dataName, driver, encoding = 'utf-8' })
logger.info(`Writing file ${fileName}`); logger.info(`Writing file ${fileName}`);
const stringify = new SqlizeStream({ fileName, dataName }); const stringify = new SqlizeStream({ fileName, dataName });
const fileStream = fs.createWriteStream(fileName, encoding); const fileStream = fs.createWriteStream(fileName, encoding);
stringify.pipe(fileStream); return [stringify, fileStream];
stringify['finisher'] = fileStream; // stringify.pipe(fileStream);
return stringify; // stringify['finisher'] = fileStream;
// return stringify;
} }
module.exports = sqlDataWriter; module.exports = sqlDataWriter;

View File

@@ -0,0 +1,18 @@
const stream = require('stream');

/**
 * Promisified wrapper around stream.pipeline that accepts streams or
 * arbitrarily nested arrays of streams, as returned by the reader/writer
 * factories in this package (e.g. `[liner, parser]`, `[stringify, fileStream]`).
 *
 * @param {...(object | any[])} processedStreams - streams and/or nested arrays
 *   of streams, in pipeline order (source first, destination last)
 * @returns {Promise<void>} resolves when the destination finishes; rejects
 *   with the first error emitted by any stream in the chain
 */
function streamPipeline(...processedStreams) {
  // Deep-flatten so callers can pass arrays of streams directly.
  // Native flat(Infinity) replaces the previous lodash flattenDeep —
  // same semantics, no third-party dependency.
  const streams = processedStreams.flat(Infinity);
  return new Promise((resolve, reject) => {
    // @ts-ignore - spreading a heterogeneous stream array confuses the typed overloads
    stream.pipeline(...streams, err => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}

module.exports = streamPipeline;

View File

@@ -144,6 +144,8 @@ export interface DatabaseHandle<TClient = any> {
treeKeySeparator?: string; treeKeySeparator?: string;
} }
export type StreamResult = stream.Readable | (stream.Readable | stream.Writable)[];
export interface EngineDriver<TClient = any> extends FilterBehaviourProvider { export interface EngineDriver<TClient = any> extends FilterBehaviourProvider {
engine: string; engine: string;
title: string; title: string;
@@ -191,15 +193,11 @@ export interface EngineDriver<TClient = any> extends FilterBehaviourProvider {
close(dbhan: DatabaseHandle<TClient>): Promise<any>; close(dbhan: DatabaseHandle<TClient>): Promise<any>;
query(dbhan: DatabaseHandle<TClient>, sql: string, options?: QueryOptions): Promise<QueryResult>; query(dbhan: DatabaseHandle<TClient>, sql: string, options?: QueryOptions): Promise<QueryResult>;
stream(dbhan: DatabaseHandle<TClient>, sql: string, options: StreamOptions); stream(dbhan: DatabaseHandle<TClient>, sql: string, options: StreamOptions);
readQuery(dbhan: DatabaseHandle<TClient>, sql: string, structure?: TableInfo): Promise<stream.Readable>; readQuery(dbhan: DatabaseHandle<TClient>, sql: string, structure?: TableInfo): Promise<StreamResult>;
readJsonQuery(dbhan: DatabaseHandle<TClient>, query: any, structure?: TableInfo): Promise<stream.Readable>; readJsonQuery(dbhan: DatabaseHandle<TClient>, query: any, structure?: TableInfo): Promise<StreamResult>;
// eg. PostgreSQL COPY FROM stdin // eg. PostgreSQL COPY FROM stdin
writeQueryFromStream(dbhan: DatabaseHandle<TClient>, sql: string): Promise<stream.Writable>; writeQueryFromStream(dbhan: DatabaseHandle<TClient>, sql: string): Promise<StreamResult>;
writeTable( writeTable(dbhan: DatabaseHandle<TClient>, name: NamedObjectInfo, options: WriteTableOptions): Promise<StreamResult>;
dbhan: DatabaseHandle<TClient>,
name: NamedObjectInfo,
options: WriteTableOptions
): Promise<stream.Writable>;
analyseSingleObject( analyseSingleObject(
dbhan: DatabaseHandle<TClient>, dbhan: DatabaseHandle<TClient>,
name: NamedObjectInfo, name: NamedObjectInfo,

View File

@@ -95,9 +95,10 @@ async function reader({ fileName, encoding = 'utf-8', header = true, delimiter,
}); });
const fileStream = fs.createReadStream(downloadedFile, encoding); const fileStream = fs.createReadStream(downloadedFile, encoding);
const csvPrepare = new CsvPrepareStream({ header }); const csvPrepare = new CsvPrepareStream({ header });
fileStream.pipe(csvStream); return [fileStream, csvStream, csvPrepare];
csvStream.pipe(csvPrepare); // fileStream.pipe(csvStream);
return csvPrepare; // csvStream.pipe(csvPrepare);
// return csvPrepare;
} }
reader.initialize = (dbgateEnv) => { reader.initialize = (dbgateEnv) => {

View File

@@ -31,11 +31,13 @@ async function writer({ fileName, encoding = 'utf-8', header = true, delimiter,
const csvPrepare = new CsvPrepareStream({ header }); const csvPrepare = new CsvPrepareStream({ header });
const csvStream = csv.stringify({ delimiter, quoted }); const csvStream = csv.stringify({ delimiter, quoted });
const fileStream = fs.createWriteStream(fileName, encoding); const fileStream = fs.createWriteStream(fileName, encoding);
csvPrepare.pipe(csvStream); // csvPrepare.pipe(csvStream);
csvStream.pipe(fileStream); // csvStream.pipe(fileStream);
csvPrepare['finisher'] = fileStream; // csvPrepare['finisher'] = fileStream;
csvPrepare.requireFixedStructure = true; csvPrepare.requireFixedStructure = true;
return csvPrepare;
return [csvPrepare, csvStream, fileStream];
// return csvPrepare;
} }
module.exports = writer; module.exports = writer;

View File

@@ -63,8 +63,10 @@ async function reader({ fileName, encoding = 'utf-8', itemElementName }) {
const fileStream = fs.createReadStream(fileName, encoding); const fileStream = fs.createReadStream(fileName, encoding);
const parser = new ParseStream({ itemElementName }); const parser = new ParseStream({ itemElementName });
fileStream.pipe(parser);
return parser; return [fileStream, parser];
// fileStream.pipe(parser);
// return parser;
} }
module.exports = reader; module.exports = reader;

View File

@@ -73,9 +73,10 @@ async function writer({ fileName, encoding = 'utf-8', itemElementName, rootEleme
logger.info(`Writing file ${fileName}`); logger.info(`Writing file ${fileName}`);
const stringify = new StringifyStream({ itemElementName, rootElementName }); const stringify = new StringifyStream({ itemElementName, rootElementName });
const fileStream = fs.createWriteStream(fileName, encoding); const fileStream = fs.createWriteStream(fileName, encoding);
stringify.pipe(fileStream); return [stringify, fileStream];
stringify['finisher'] = fileStream; // stringify.pipe(fileStream);
return stringify; // stringify['finisher'] = fileStream;
// return stringify;
} }
module.exports = writer; module.exports = writer;