diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index 6708831f6..c6c28cd56 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -94,14 +94,22 @@ module.exports = { handle_ping() {}, handle_freeData(runid, { freeData }) { - const [resolve, reject] = this.requests[runid]; + const { resolve } = this.requests[runid]; resolve(freeData); delete this.requests[runid]; }, + handle_copyStreamError(runid, { copyStreamError }) { + const { reject, exitOnStreamError } = this.requests[runid]; + if (exitOnStreamError) { + reject(copyStreamError); + delete this.requests[runid]; + } + }, + rejectRequest(runid, error) { if (this.requests[runid]) { - const [resolve, reject] = this.requests[runid]; + const { reject } = this.requests[runid]; reject(error); delete this.requests[runid]; } @@ -113,6 +121,8 @@ module.exports = { fs.writeFileSync(`${scriptFile}`, scriptText); fs.mkdirSync(directory); const pluginNames = extractPlugins(scriptText); + // console.log('********************** SCRIPT TEXT **********************'); + // console.log(scriptText); logger.info({ scriptFile }, 'Running script'); // const subprocess = fork(scriptFile, ['--checkParent', '--max-old-space-size=8192'], { const subprocess = fork( @@ -150,11 +160,13 @@ module.exports = { byline(subprocess.stdout).on('data', pipeDispatcher('info')); byline(subprocess.stderr).on('data', pipeDispatcher('error')); subprocess.on('exit', code => { + // console.log('... EXITED', code); this.rejectRequest(runid, { message: 'No data returned, maybe input data source is too big' }); logger.info({ code, pid: subprocess.pid }, 'Exited process'); socket.emit(`runner-done-${runid}`, code); }); subprocess.on('error', error => { + // console.log('... ERROR subprocess', error); this.rejectRequest(runid, { message: error && (error.message || error.toString()) }); console.error('... ERROR subprocess', error); this.dispatchMessage({ @@ -231,7 +243,7 @@ module.exports = { const promise = new Promise((resolve, reject) => { const runid = crypto.randomUUID(); - this.requests[runid] = [resolve, reject]; + this.requests[runid] = { resolve, reject, exitOnStreamError: true }; this.startCore(runid, loaderScriptTemplate(prefix, functionName, props, runid)); }); return promise; diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index 8d912a56c..4952c0b83 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -33,6 +33,19 @@ function copyStream(input, output, options) { return new Promise((resolve, reject) => { const finisher = output['finisher'] || output; finisher.on('finish', resolve); + + input.on('error', err => { + // console.log('&&&&&&&&&&&&&&&&&&&&&&& CATCH ERROR IN COPY STREAM &&&&&&&&&&&&&&&&&&&&&&'); + // console.log(err); + process.send({ + msgtype: 'copyStreamError', + runid: this.runid, + copyStreamError: err, + }); + }); + + input.on('error', reject); + finisher.on('error', reject); let lastStream = input;