handle copyStreamError

This commit is contained in:
SPRINX0\prochazka
2025-03-04 08:58:04 +01:00
parent 3f3160406f
commit 69f781d3de
2 changed files with 28 additions and 3 deletions

View File

@@ -94,14 +94,22 @@ module.exports = {
handle_ping() {},
handle_freeData(runid, { freeData }) {
const [resolve, reject] = this.requests[runid];
const { resolve } = this.requests[runid];
resolve(freeData);
delete this.requests[runid];
},
handle_copyStreamError(runid, { copyStreamError }) {
const { reject, exitOnStreamError } = this.requests[runid];
if (exitOnStreamError) {
reject(copyStreamError);
delete this.requests[runid];
}
},
rejectRequest(runid, error) {
if (this.requests[runid]) {
const [resolve, reject] = this.requests[runid];
const { reject } = this.requests[runid];
reject(error);
delete this.requests[runid];
}
@@ -113,6 +121,8 @@ module.exports = {
fs.writeFileSync(`${scriptFile}`, scriptText);
fs.mkdirSync(directory);
const pluginNames = extractPlugins(scriptText);
// console.log('********************** SCRIPT TEXT **********************');
// console.log(scriptText);
logger.info({ scriptFile }, 'Running script');
// const subprocess = fork(scriptFile, ['--checkParent', '--max-old-space-size=8192'], {
const subprocess = fork(
@@ -150,11 +160,13 @@ module.exports = {
byline(subprocess.stdout).on('data', pipeDispatcher('info'));
byline(subprocess.stderr).on('data', pipeDispatcher('error'));
subprocess.on('exit', code => {
// console.log('... EXITED', code);
this.rejectRequest(runid, { message: 'No data returned, maybe input data source is too big' });
logger.info({ code, pid: subprocess.pid }, 'Exited process');
socket.emit(`runner-done-${runid}`, code);
});
subprocess.on('error', error => {
// console.log('... ERROR subprocess', error);
this.rejectRequest(runid, { message: error && (error.message || error.toString()) });
console.error('... ERROR subprocess', error);
this.dispatchMessage({
@@ -231,7 +243,7 @@ module.exports = {
const promise = new Promise((resolve, reject) => {
const runid = crypto.randomUUID();
this.requests[runid] = [resolve, reject];
this.requests[runid] = { resolve, reject, exitOnStreamError: true };
this.startCore(runid, loaderScriptTemplate(prefix, functionName, props, runid));
});
return promise;

View File

@@ -33,6 +33,19 @@ function copyStream(input, output, options) {
return new Promise((resolve, reject) => {
const finisher = output['finisher'] || output;
finisher.on('finish', resolve);
input.on('error', err => {
// console.log('&&&&&&&&&&&&&&&&&&&&&&& CATCH ERROR IN COPY STREAM &&&&&&&&&&&&&&&&&&&&&&');
// console.log(err);
process.send({
msgtype: 'copyStreamError',
runid: this.runid,
copyStreamError: err,
});
});
input.on('error', reject);
finisher.on('error', reject);
let lastStream = input;