From 3f3160406f93e793d909f6fb0fb7c2e0ab7f334d Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Mon, 3 Mar 2025 16:05:12 +0100 Subject: [PATCH 01/12] propagate error in mongo stream --- plugins/dbgate-plugin-mongo/src/backend/driver.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/dbgate-plugin-mongo/src/backend/driver.js b/plugins/dbgate-plugin-mongo/src/backend/driver.js index 23aca12b6..9c10ba8eb 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/driver.js +++ b/plugins/dbgate-plugin-mongo/src/backend/driver.js @@ -266,6 +266,11 @@ const driver = { pass.write(transformMongoData(row)); }); + // propagate error + cursorStream.on('error', (err) => { + pass.emit('error', err); + }); + // Called once the cursor is fully read cursorStream.on('end', () => { pass.emit('end'); From 69f781d3de24c8ce8df5413ec21e34f5015a7c46 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 08:58:04 +0100 Subject: [PATCH 02/12] handle copyStreamError --- packages/api/src/controllers/runners.js | 18 +++++++++++++++--- packages/api/src/shell/copyStream.js | 13 +++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index 6708831f6..c6c28cd56 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -94,14 +94,22 @@ module.exports = { handle_ping() {}, handle_freeData(runid, { freeData }) { - const [resolve, reject] = this.requests[runid]; + const { resolve } = this.requests[runid]; resolve(freeData); delete this.requests[runid]; }, + handle_copyStreamError(runid, { copyStreamError }) { + const { reject, exitOnStreamError } = this.requests[runid]; + if (exitOnStreamError) { + reject(copyStreamError); + delete this.requests[runid]; + } + }, + rejectRequest(runid, error) { if (this.requests[runid]) { - const [resolve, reject] = this.requests[runid]; + const { reject } = this.requests[runid]; reject(error); delete this.requests[runid]; } @@ -113,6 +121,8 @@ module.exports = { fs.writeFileSync(`${scriptFile}`, scriptText); fs.mkdirSync(directory); const pluginNames = extractPlugins(scriptText); + // console.log('********************** SCRIPT TEXT **********************'); + // console.log(scriptText); logger.info({ scriptFile }, 'Running script'); // const subprocess = fork(scriptFile, ['--checkParent', '--max-old-space-size=8192'], { const subprocess = fork( @@ -150,11 +160,13 @@ module.exports = { byline(subprocess.stdout).on('data', pipeDispatcher('info')); byline(subprocess.stderr).on('data', pipeDispatcher('error')); subprocess.on('exit', code => { + // console.log('... EXITED', code); this.rejectRequest(runid, { message: 'No data returned, maybe input data source is too big' }); logger.info({ code, pid: subprocess.pid }, 'Exited process'); socket.emit(`runner-done-${runid}`, code); }); subprocess.on('error', error => { + // console.log('... ERROR subprocess', error); this.rejectRequest(runid, { message: error && (error.message || error.toString()) }); console.error('... ERROR subprocess', error); this.dispatchMessage({ @@ -231,7 +243,7 @@ module.exports = { const promise = new Promise((resolve, reject) => { const runid = crypto.randomUUID(); - this.requests[runid] = [resolve, reject]; + this.requests[runid] = { resolve, reject, exitOnStreamError: true }; this.startCore(runid, loaderScriptTemplate(prefix, functionName, props, runid)); }); return promise; diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index 8d912a56c..4952c0b83 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -33,6 +33,19 @@ function copyStream(input, output, options) { return new Promise((resolve, reject) => { const finisher = output['finisher'] || output; finisher.on('finish', resolve); + + input.on('error', err => { + // console.log('&&&&&&&&&&&&&&&&&&&&&&& CATCH ERROR IN COPY STREAM &&&&&&&&&&&&&&&&&&&&&&'); + // console.log(err); + process.send({ + msgtype: 'copyStreamError', + runid: this.runid, + copyStreamError: err, + }); + }); + + input.on('error', reject); + finisher.on('error', reject); let lastStream = input; From beca5c6e456461af7b0bac30fb18baba2b16e479 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 09:51:29 +0100 Subject: [PATCH 03/12] using stream.pipeline for better handling errors --- packages/api/src/shell/copyStream.js | 58 +++++-------------- packages/api/src/shell/importDatabase.js | 17 +----- packages/api/src/shell/jsonLinesReader.js | 3 +- packages/api/src/shell/jsonReader.js | 24 +++++--- packages/api/src/shell/jsonWriter.js | 7 ++- .../api/src/shell/modifyJsonLinesReader.js | 5 +- packages/api/src/shell/sqlDataWriter.js | 7 ++- packages/api/src/utility/streamPipeline.js | 18 ++++++ packages/types/engines.d.ts | 14 ++--- .../dbgate-plugin-csv/src/backend/reader.js | 7 ++- .../dbgate-plugin-csv/src/backend/writer.js | 10 ++-- .../dbgate-plugin-xml/src/backend/reader.js | 6 +- .../dbgate-plugin-xml/src/backend/writer.js | 7 ++- 13 files changed, 87 insertions(+), 96 deletions(-) create mode 100644 packages/api/src/utility/streamPipeline.js diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index 4952c0b83..0c2f0967a 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -1,6 +1,6 @@ const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); -const Stream = require('stream'); const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); +const streamPipeline = require('../utility/streamPipeline'); /** * Copies reader to writer. Used for import, export tables and transfer data between tables @@ -9,7 +9,7 @@ const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); * @param {object} options - options * @returns {Promise} */ -function copyStream(input, output, options) { +async function copyStream(input, output, options) { const { columns } = options || {}; const transforms = []; @@ -20,49 +20,19 @@ function copyStream(input, output, options) { transforms.push(new EnsureStreamHeaderStream()); } - // return new Promise((resolve, reject) => { - // Stream.pipeline(input, ...transforms, output, err => { - // if (err) { - // reject(err); - // } else { - // resolve(); - // } - // }); - // }); - - return new Promise((resolve, reject) => { - const finisher = output['finisher'] || output; - finisher.on('finish', resolve); - - input.on('error', err => { - // console.log('&&&&&&&&&&&&&&&&&&&&&&& CATCH ERROR IN COPY STREAM &&&&&&&&&&&&&&&&&&&&&&'); - // console.log(err); - process.send({ - msgtype: 'copyStreamError', - runid: this.runid, - copyStreamError: err, - }); + try { + await streamPipeline(input, transforms, output); + } catch (err) { + process.send({ + msgtype: 'copyStreamError', + runid: this.runid, + copyStreamError: { + message: err.message, + ...err, + }, }); - - input.on('error', reject); - - finisher.on('error', reject); - - let lastStream = input; - for (const tran of transforms) { - lastStream.pipe(tran); - lastStream = tran; - } - lastStream.pipe(output); - - // if (output.requireFixedStructure) { - // const ensureHeader = new EnsureStreamHeaderStream(); - // input.pipe(ensureHeader); - // ensureHeader.pipe(output); - // } else { - // input.pipe(output); - // } - }); + throw err; + } } module.exports = copyStream; diff --git a/packages/api/src/shell/importDatabase.js b/packages/api/src/shell/importDatabase.js index a10cb4632..d5a04ccf3 100644 --- a/packages/api/src/shell/importDatabase.js +++ b/packages/api/src/shell/importDatabase.js @@ -5,6 +5,7 @@ const { splitQueryStream } = require('dbgate-query-splitter/lib/splitQueryStream const download = require('./download'); const stream = require('stream'); const { getLogger } = require('dbgate-tools'); +const streamPipeline = require('../utility/streamPipeline'); const logger = getLogger('importDb'); @@ -43,17 +44,6 @@ class ImportStream extends stream.Transform { } } -function awaitStreamEnd(stream) { - return new Promise((resolve, reject) => { - stream.once('end', () => { - resolve(true); - }); - stream.once('error', err => { - reject(err); - }); - }); -} - async function importDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, inputFile }) { logger.info(`Importing database`); @@ -72,9 +62,8 @@ async function importDatabase({ connection = undefined, systemConnection = undef returnRichInfo: true, }); const importStream = new ImportStream(dbhan, driver); - // @ts-ignore - splittedStream.pipe(importStream); - await awaitStreamEnd(importStream); + + await streamPipeline(splittedStream, importStream); } finally { if (!systemConnection) { await driver.close(dbhan); diff --git a/packages/api/src/shell/jsonLinesReader.js b/packages/api/src/shell/jsonLinesReader.js index f1d47dde5..da3d5d85d 100644 --- a/packages/api/src/shell/jsonLinesReader.js +++ b/packages/api/src/shell/jsonLinesReader.js @@ -53,8 +53,7 @@ async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undef ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows }); - liner.pipe(parser); - return parser; + return [liner, parser]; } module.exports = jsonLinesReader; diff --git a/packages/api/src/shell/jsonReader.js b/packages/api/src/shell/jsonReader.js index 2de75930c..654ecf633 100644 --- a/packages/api/src/shell/jsonReader.js +++ b/packages/api/src/shell/jsonReader.js @@ -10,7 +10,6 @@ const download = require('./download'); const logger = getLogger('jsonReader'); - class ParseStream extends stream.Transform { constructor({ limitRows, jsonStyle, keyField }) { super({ objectMode: true }); @@ -72,8 +71,12 @@ async function jsonReader({ // @ts-ignore encoding ); + const parseJsonStream = parser(); - fileStream.pipe(parseJsonStream); + + const resultPipe = [fileStream, parseJsonStream]; + + // fileStream.pipe(parseJsonStream); const parseStream = new ParseStream({ limitRows, jsonStyle, keyField }); @@ -81,15 +84,20 @@ async function jsonReader({ if (rootField) { const filterStream = pick({ filter: rootField }); - parseJsonStream.pipe(filterStream); - filterStream.pipe(tramsformer); - } else { - parseJsonStream.pipe(tramsformer); + resultPipe.push(filterStream); + // parseJsonStream.pipe(filterStream); + // filterStream.pipe(tramsformer); } + // else { + // parseJsonStream.pipe(tramsformer); + // } - tramsformer.pipe(parseStream); + resultPipe.push(tramsformer); + resultPipe.push(parseStream); - return parseStream; + // tramsformer.pipe(parseStream); + + return resultPipe; } module.exports = jsonReader; diff --git a/packages/api/src/shell/jsonWriter.js b/packages/api/src/shell/jsonWriter.js index 8058cabdb..28f622cca 100644 --- a/packages/api/src/shell/jsonWriter.js +++ b/packages/api/src/shell/jsonWriter.js @@ -99,9 +99,10 @@ async function jsonWriter({ fileName, jsonStyle, keyField = '_key', rootField, e logger.info(`Writing file ${fileName}`); const stringify = new StringifyStream({ jsonStyle, keyField, rootField }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = jsonWriter; diff --git a/packages/api/src/shell/modifyJsonLinesReader.js b/packages/api/src/shell/modifyJsonLinesReader.js index dec137ba0..32040acec 100644 --- a/packages/api/src/shell/modifyJsonLinesReader.js +++ b/packages/api/src/shell/modifyJsonLinesReader.js @@ -141,8 +141,9 @@ async function modifyJsonLinesReader({ ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode }); - liner.pipe(parser); - return parser; + return [liner, parser]; + // liner.pipe(parser); + // return parser; } module.exports = modifyJsonLinesReader; diff --git a/packages/api/src/shell/sqlDataWriter.js b/packages/api/src/shell/sqlDataWriter.js index 3dd6bad26..c83dab161 100644 --- a/packages/api/src/shell/sqlDataWriter.js +++ b/packages/api/src/shell/sqlDataWriter.js @@ -44,9 +44,10 @@ async function sqlDataWriter({ fileName, dataName, driver, encoding = 'utf-8' }) logger.info(`Writing file ${fileName}`); const stringify = new SqlizeStream({ fileName, dataName }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = sqlDataWriter; diff --git a/packages/api/src/utility/streamPipeline.js b/packages/api/src/utility/streamPipeline.js new file mode 100644 index 000000000..11183b076 --- /dev/null +++ b/packages/api/src/utility/streamPipeline.js @@ -0,0 +1,18 @@ +const stream = require('stream'); +const _ = require('lodash'); + +function streamPipeline(...processedStreams) { + const streams = _.flattenDeep(processedStreams); + return new Promise((resolve, reject) => { + // @ts-ignore + stream.pipeline(...streams, err => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +module.exports = streamPipeline; diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index 11235f5e6..b34b66746 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -144,6 +144,8 @@ export interface DatabaseHandle { treeKeySeparator?: string; } +export type StreamResult = stream.Readable | (stream.Readable | stream.Writable)[]; + export interface EngineDriver extends FilterBehaviourProvider { engine: string; title: string; @@ -191,15 +193,11 @@ export interface EngineDriver extends FilterBehaviourProvider { close(dbhan: DatabaseHandle): Promise; query(dbhan: DatabaseHandle, sql: string, options?: QueryOptions): Promise; stream(dbhan: DatabaseHandle, sql: string, options: StreamOptions); - readQuery(dbhan: DatabaseHandle, sql: string, structure?: TableInfo): Promise; - readJsonQuery(dbhan: DatabaseHandle, query: any, structure?: TableInfo): Promise; + readQuery(dbhan: DatabaseHandle, sql: string, structure?: TableInfo): Promise; + readJsonQuery(dbhan: DatabaseHandle, query: any, structure?: TableInfo): Promise; // eg. PostgreSQL COPY FROM stdin - writeQueryFromStream(dbhan: DatabaseHandle, sql: string): Promise; - writeTable( - dbhan: DatabaseHandle, - name: NamedObjectInfo, - options: WriteTableOptions - ): Promise; + writeQueryFromStream(dbhan: DatabaseHandle, sql: string): Promise; + writeTable(dbhan: DatabaseHandle, name: NamedObjectInfo, options: WriteTableOptions): Promise; analyseSingleObject( dbhan: DatabaseHandle, name: NamedObjectInfo, diff --git a/plugins/dbgate-plugin-csv/src/backend/reader.js b/plugins/dbgate-plugin-csv/src/backend/reader.js index 55ef181d9..e52065f20 100644 --- a/plugins/dbgate-plugin-csv/src/backend/reader.js +++ b/plugins/dbgate-plugin-csv/src/backend/reader.js @@ -95,9 +95,10 @@ async function reader({ fileName, encoding = 'utf-8', header = true, delimiter, }); const fileStream = fs.createReadStream(downloadedFile, encoding); const csvPrepare = new CsvPrepareStream({ header }); - fileStream.pipe(csvStream); - csvStream.pipe(csvPrepare); - return csvPrepare; + return [fileStream, csvStream, csvPrepare]; + // fileStream.pipe(csvStream); + // csvStream.pipe(csvPrepare); + // return csvPrepare; } reader.initialize = (dbgateEnv) => { diff --git a/plugins/dbgate-plugin-csv/src/backend/writer.js b/plugins/dbgate-plugin-csv/src/backend/writer.js index 7d62d1563..2297af0c4 100644 --- a/plugins/dbgate-plugin-csv/src/backend/writer.js +++ b/plugins/dbgate-plugin-csv/src/backend/writer.js @@ -31,11 +31,13 @@ async function writer({ fileName, encoding = 'utf-8', header = true, delimiter, const csvPrepare = new CsvPrepareStream({ header }); const csvStream = csv.stringify({ delimiter, quoted }); const fileStream = fs.createWriteStream(fileName, encoding); - csvPrepare.pipe(csvStream); - csvStream.pipe(fileStream); - csvPrepare['finisher'] = fileStream; + // csvPrepare.pipe(csvStream); + // csvStream.pipe(fileStream); + // csvPrepare['finisher'] = fileStream; csvPrepare.requireFixedStructure = true; - return csvPrepare; + + return [csvPrepare, csvStream, fileStream]; + // return csvPrepare; } module.exports = writer; diff --git a/plugins/dbgate-plugin-xml/src/backend/reader.js b/plugins/dbgate-plugin-xml/src/backend/reader.js index e374ea068..18b09adb9 100644 --- a/plugins/dbgate-plugin-xml/src/backend/reader.js +++ b/plugins/dbgate-plugin-xml/src/backend/reader.js @@ -63,8 +63,10 @@ async function reader({ fileName, encoding = 'utf-8', itemElementName }) { const fileStream = fs.createReadStream(fileName, encoding); const parser = new ParseStream({ itemElementName }); - fileStream.pipe(parser); - return parser; + + return [fileStream, parser]; + // fileStream.pipe(parser); + // return parser; } module.exports = reader; diff --git a/plugins/dbgate-plugin-xml/src/backend/writer.js b/plugins/dbgate-plugin-xml/src/backend/writer.js index 66787883c..0474eab02 100644 --- a/plugins/dbgate-plugin-xml/src/backend/writer.js +++ b/plugins/dbgate-plugin-xml/src/backend/writer.js @@ -73,9 +73,10 @@ async function writer({ fileName, encoding = 'utf-8', itemElementName, rootEleme logger.info(`Writing file ${fileName}`); const stringify = new StringifyStream({ itemElementName, rootElementName }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = writer; From 4006f0344443f797ad12eca3673361f1e5a4e74f Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 10:06:05 +0100 Subject: [PATCH 04/12] removed invalid param --- packages/api/src/shell/copyStream.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index 0c2f0967a..be0c8ea31 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -25,7 +25,6 @@ async function copyStream(input, output, options) { } catch (err) { process.send({ msgtype: 'copyStreamError', - runid: this.runid, copyStreamError: { message: err.message, ...err, From 0c104d5d29369d19db6e4824914fb2de895b4c4d Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 13:55:36 +0100 Subject: [PATCH 05/12] progress indicator in exports --- packages/api/src/controllers/runners.js | 4 +++ packages/api/src/shell/copyStream.js | 27 ++++++++++++++++++- packages/tools/src/ScriptWriter.ts | 18 +++++++------ packages/web/src/elements/TableControl.svelte | 2 +- packages/web/src/icons/FontIcon.svelte | 1 + .../impexp/ImportExportConfigurator.svelte | 19 ++++++++++++- packages/web/src/impexp/createImpExpScript.ts | 2 +- packages/web/src/tabs/ImportExportTab.svelte | 26 ++++++++++++++++++ 8 files changed, 87 insertions(+), 12 deletions(-) diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index c6c28cd56..cdaaa36b1 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -107,6 +107,10 @@ module.exports = { } }, + handle_progress(runid, { progressName, status }) { + socket.emit(`runner-progress-${runid}`, { progressName, status }); + }, + rejectRequest(runid, error) { if (this.requests[runid]) { const { reject } = this.requests[runid]; diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index be0c8ea31..c73476d62 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -10,7 +10,15 @@ const streamPipeline = require('../utility/streamPipeline'); * @returns {Promise} */ async function copyStream(input, output, options) { - const { columns } = options || {}; + const { columns, progressName } = options || {}; + + if (progressName) { + process.send({ + msgtype: 'progress', + progressName, + status: 'running', + }); + } const transforms = []; if (columns) { @@ -22,6 +30,14 @@ async function copyStream(input, output, options) { try { await streamPipeline(input, transforms, output); + + if (progressName) { + process.send({ + msgtype: 'progress', + progressName, + status: 'done', + }); + } } catch (err) { process.send({ msgtype: 'copyStreamError', @@ -30,6 +46,15 @@ async function copyStream(input, output, options) { ...err, }, }); + + if (progressName) { + process.send({ + msgtype: 'progress', + progressName, + status: 'error', + }); + } + throw err; } } diff --git a/packages/tools/src/ScriptWriter.ts b/packages/tools/src/ScriptWriter.ts index 7c4f93b09..23afee0f4 100644 --- a/packages/tools/src/ScriptWriter.ts +++ b/packages/tools/src/ScriptWriter.ts @@ -41,12 +41,13 @@ export class ScriptWriter { this.packageNames.push(packageName); } - copyStream(sourceVar, targetVar, colmapVar = null) { - if (colmapVar) { - this._put(`await dbgateApi.copyStream(${sourceVar}, ${targetVar}, {columns: ${colmapVar}});`); - } else { - this._put(`await dbgateApi.copyStream(${sourceVar}, ${targetVar});`); - } + copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string) { + let opts = '{'; + if (colmapVar) opts += `columns: ${colmapVar}, `; + if (progressName) opts += `progressName: "${progressName}", `; + opts += '}'; + + this._put(`await dbgateApi.copyStream(${sourceVar}, ${targetVar}, ${opts});`); } dumpDatabase(options) { @@ -117,12 +118,13 @@ export class ScriptWriterJson { }); } - copyStream(sourceVar, targetVar, colmapVar = null) { + copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string) { this.commands.push({ type: 'copyStream', sourceVar, targetVar, colmapVar, + progressName, }); } @@ -183,7 +185,7 @@ export function jsonScriptToJavascript(json) { script.assignValue(cmd.variableName, cmd.jsonValue); break; case 'copyStream': - script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar); + script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar, cmd.progressName); break; case 'endLine': script.endLine(); diff --git a/packages/web/src/elements/TableControl.svelte b/packages/web/src/elements/TableControl.svelte index 931f8f8ec..e8eb1051b 100644 --- a/packages/web/src/elements/TableControl.svelte +++ b/packages/web/src/elements/TableControl.svelte @@ -20,7 +20,7 @@ import { createEventDispatcher } from 'svelte'; import FontIcon from '../icons/FontIcon.svelte'; - export let columns: TableControlColumn[]; + export let columns: (TableControlColumn | false)[]; export let rows; export let focusOnCreate = false; export let selectable = false; diff --git a/packages/web/src/icons/FontIcon.svelte b/packages/web/src/icons/FontIcon.svelte index c5ed2f18a..df808a488 100644 --- a/packages/web/src/icons/FontIcon.svelte +++ b/packages/web/src/icons/FontIcon.svelte @@ -149,6 +149,7 @@ 'icon download': 'mdi mdi-download', 'icon text': 'mdi mdi-text', 'icon ai': 'mdi mdi-head-lightbulb', + 'icon wait': 'mdi mdi-timer-sand', 'icon run': 'mdi mdi-play', 'icon chevron-down': 'mdi mdi-chevron-down', diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index efb286ded..8889f763d 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -104,6 +104,7 @@ $: sourceList = $values.sourceList; let targetEditKey = 0; + export let progressHolder = null; const previewSource = writable(null); @@ -231,11 +232,16 @@ header: 'Target', slot: 1, }, - { + supportsPreview && { fieldName: 'preview', header: 'Preview', slot: 0, }, + !!progressHolder && { + fieldName: 'status', + header: 'Status', + slot: 3, + }, { fieldName: 'columns', header: 'Columns', @@ -296,6 +302,17 @@ >{columnCount > 0 ? `(${columnCount} columns)` : '(copy from source)'} + + {#if progressHolder[row]?.status == 'running'} + Running + {:else if progressHolder[row]?.status == 'error'} + Error + {:else if progressHolder[row]?.status == 'done'} + Done + {:else} + Queued + {/if} + {/key} diff --git a/packages/web/src/impexp/createImpExpScript.ts b/packages/web/src/impexp/createImpExpScript.ts index 0fa3c076c..c9961fb7b 100644 --- a/packages/web/src/impexp/createImpExpScript.ts +++ b/packages/web/src/impexp/createImpExpScript.ts @@ -233,7 +233,7 @@ export default async function createImpExpScript(extensions, values, forceScript script.assignValue(colmapVar, colmap); } - script.copyStream(sourceVar, targetVar, colmapVar); + script.copyStream(sourceVar, targetVar, colmapVar, sourceName); script.endLine(); } return script.getScript(values.schedule); diff --git a/packages/web/src/tabs/ImportExportTab.svelte b/packages/web/src/tabs/ImportExportTab.svelte index 37e4321cc..ab027d607 100644 --- a/packages/web/src/tabs/ImportExportTab.svelte +++ b/packages/web/src/tabs/ImportExportTab.svelte @@ -65,6 +65,7 @@ export let savedFile; export let savedFilePath; + let progressHolder = null; const refreshArchiveFolderRef = createRef(null); const formValues = writable({}); @@ -179,6 +180,7 @@ const handleExecute = async e => { if (busy) return; + progressHolder = {}; const values = $formValues as any; busy = true; const script = await createImpExpScript($extensions, values); @@ -228,6 +230,29 @@ title: `${getSourceTargetTitle('source', values)}->${getSourceTargetTitle('target', values)}(${values.sourceList?.length || 0})`, })); } + + const handleProgress = progress => { + progressHolder = { + ...progressHolder, + [progress.progressName]: { + ...progressHolder[progress.progressName], + ...progress, + }, + }; + }; + + $: progressEffect = useEffect(() => { + if (runnerId) { + const eventName = `runner-progress-${runnerId}`; + apiOn(eventName, handleProgress); + return () => { + apiOff(eventName, handleProgress); + }; + } + return () => {}; + }); + + $progressEffect; @@ -237,6 +262,7 @@ From 257ffa3cc462590ca30edebd8b6bc104fd2be1e7 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 14:26:11 +0100 Subject: [PATCH 06/12] show import/export error --- packages/api/src/controllers/runners.js | 6 +++--- packages/api/src/shell/copyStream.js | 6 +++++- packages/web/src/impexp/ImportExportConfigurator.svelte | 9 +++++++++ packages/web/src/query/MessageView.svelte | 2 +- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index cdaaa36b1..778058f7a 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -100,15 +100,15 @@ module.exports = { }, handle_copyStreamError(runid, { copyStreamError }) { - const { reject, exitOnStreamError } = this.requests[runid]; + const { reject, exitOnStreamError } = this.requests[runid] || {}; if (exitOnStreamError) { reject(copyStreamError); delete this.requests[runid]; } }, - handle_progress(runid, { progressName, status }) { - socket.emit(`runner-progress-${runid}`, { progressName, status }); + handle_progress(runid, { progressName, status, errorMessage }) { + socket.emit(`runner-progress-${runid}`, { progressName, status, errorMessage }); }, rejectRequest(runid, error) { diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index c73476d62..c6471619f 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -1,6 +1,8 @@ const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); const streamPipeline = require('../utility/streamPipeline'); +const { getLogger, extractErrorLogData } = require('dbgate-tools'); +const logger = getLogger('copyStream'); /** * Copies reader to writer. Used for import, export tables and transfer data between tables @@ -52,10 +54,12 @@ async function copyStream(input, output, options) { msgtype: 'progress', progressName, status: 'error', + errorMessage: err.message, }); } - throw err; + logger.error(extractErrorLogData(err, { progressName }), 'Import/export job failed'); + // throw err; } } diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index 8889f763d..9309e98a8 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -76,6 +76,7 @@ import { compositeDbNameIfNeeded } from 'dbgate-tools'; import createRef from '../utility/createRef'; import DropDownButton from '../buttons/DropDownButton.svelte'; + import ErrorMessageModal from '../modals/ErrorMessageModal.svelte'; // export let uploadedFile = undefined; // export let openedFile = undefined; @@ -307,6 +308,14 @@ Running {:else if progressHolder[row]?.status == 'error'} Error + {#if progressHolder[row]?.errorMessage} + showModal(ErrorMessageModal, { message: progressHolder[row]?.errorMessage })} + style="cursor: pointer" + /> + {/if} {:else if progressHolder[row]?.status == 'done'} Done {:else} diff --git a/packages/web/src/query/MessageView.svelte b/packages/web/src/query/MessageView.svelte index ce6d9dae5..4d67e3c77 100644 --- a/packages/web/src/query/MessageView.svelte +++ b/packages/web/src/query/MessageView.svelte @@ -18,7 +18,7 @@ // $: console.log('MESSAGE ROWS', items); const values = writable({ - hideDebug: false, + hideDebug: true, hideInfo: false, hideError: false, }); From 3bf22a8606e88c54716199e1ab7ba3bde93118e0 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 14:29:52 +0100 Subject: [PATCH 07/12] import/export log messages --- packages/api/src/shell/dataDuplicator.js | 2 -- packages/api/src/shell/dropAllDbObjects.js | 2 -- packages/api/src/shell/dumpDatabase.js | 2 -- packages/api/src/shell/executeQuery.js | 2 +- packages/api/src/shell/importDatabase.js | 2 -- packages/api/src/shell/loadDatabase.js | 6 ++---- packages/api/src/shell/queryReader.js | 1 - packages/api/src/shell/tableReader.js | 1 - packages/api/src/shell/tableWriter.js | 1 - 9 files changed, 3 insertions(+), 16 deletions(-) diff --git a/packages/api/src/shell/dataDuplicator.js b/packages/api/src/shell/dataDuplicator.js index 750563ff3..e08ede4cf 100644 --- a/packages/api/src/shell/dataDuplicator.js +++ b/packages/api/src/shell/dataDuplicator.js @@ -24,8 +24,6 @@ async function dataDuplicator({ const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); try { - logger.info(`Connected.`); - if (!analysedStructure) { analysedStructure = await driver.analyseFull(dbhan); } diff --git a/packages/api/src/shell/dropAllDbObjects.js b/packages/api/src/shell/dropAllDbObjects.js index 4ba806052..5623f00bb 100644 --- a/packages/api/src/shell/dropAllDbObjects.js +++ b/packages/api/src/shell/dropAllDbObjects.js @@ -19,8 +19,6 @@ async function dropAllDbObjects({ connection, systemConnection, driver, analysed const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); - logger.info(`Connected.`); - if (!analysedStructure) { analysedStructure = await driver.analyseFull(dbhan); } diff --git a/packages/api/src/shell/dumpDatabase.js b/packages/api/src/shell/dumpDatabase.js index 45644e038..8d8a016a5 100644 --- a/packages/api/src/shell/dumpDatabase.js +++ b/packages/api/src/shell/dumpDatabase.js @@ -31,8 +31,6 @@ async function dumpDatabase({ const dbhan = systemConnection || (await connectUtility(driver, connection, 'read', { forceRowsAsObjects: true })); try { - logger.info(`Connected.`); - const dumper = await driver.createBackupDumper(dbhan, { outputFile, databaseName, diff --git a/packages/api/src/shell/executeQuery.js b/packages/api/src/shell/executeQuery.js index 0adcf3700..3f54267f0 100644 --- a/packages/api/src/shell/executeQuery.js +++ b/packages/api/src/shell/executeQuery.js @@ -36,7 +36,7 @@ async function executeQuery({ } try { - logger.info(`Connected.`); + logger.debug(`Running SQL query, length: ${sql.length}`); await driver.script(dbhan, sql, { logScriptItems }); } finally { diff --git a/packages/api/src/shell/importDatabase.js b/packages/api/src/shell/importDatabase.js index d5a04ccf3..90bbceb29 100644 --- a/packages/api/src/shell/importDatabase.js +++ b/packages/api/src/shell/importDatabase.js @@ -50,8 +50,6 @@ async function importDatabase({ connection = undefined, systemConnection = undef if (!driver) driver = requireEngineDriver(connection); const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); try { - logger.info(`Connected.`); - logger.info(`Input file: ${inputFile}`); const downloadedFile = await download(inputFile); logger.info(`Downloaded file: ${downloadedFile}`); diff --git a/packages/api/src/shell/loadDatabase.js b/packages/api/src/shell/loadDatabase.js index d657e61aa..f729342d1 100644 --- a/packages/api/src/shell/loadDatabase.js +++ b/packages/api/src/shell/loadDatabase.js @@ -6,15 +6,13 @@ const exportDbModel = require('../utility/exportDbModel'); const logger = getLogger('analyseDb'); async function loadDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, outputDir }) { - logger.info(`Analysing database`); + logger.debug(`Analysing database`); if (!driver) driver = requireEngineDriver(connection); const dbhan = systemConnection || (await connectUtility(driver, connection, 'read', { forceRowsAsObjects: true })); try { - logger.info(`Connected.`); - const dbInfo = await driver.analyseFull(dbhan); - logger.info(`Analyse finished`); + logger.debug(`Analyse finished`); await exportDbModel(dbInfo, outputDir); } finally { diff --git a/packages/api/src/shell/queryReader.js b/packages/api/src/shell/queryReader.js index dd76a296c..0d9a16296 100644 --- a/packages/api/src/shell/queryReader.js +++ b/packages/api/src/shell/queryReader.js @@ -30,7 +30,6 @@ async function queryReader({ const driver = requireEngineDriver(connection); const pool = await connectUtility(driver, connection, queryType == 'json' ? 'read' : 'script'); - logger.info(`Connected.`); const reader = queryType == 'json' ? await driver.readJsonQuery(pool, query) : await driver.readQuery(pool, query || sql); return reader; diff --git a/packages/api/src/shell/tableReader.js b/packages/api/src/shell/tableReader.js index 6ec0d1fb8..466499de4 100644 --- a/packages/api/src/shell/tableReader.js +++ b/packages/api/src/shell/tableReader.js @@ -18,7 +18,6 @@ async function tableReader({ connection, systemConnection, pureName, schemaName, driver = requireEngineDriver(connection); } const dbhan = systemConnection || (await connectUtility(driver, connection, 'read')); - logger.info(`Connected.`); const fullName = { pureName, schemaName }; diff --git a/packages/api/src/shell/tableWriter.js b/packages/api/src/shell/tableWriter.js index 7b53f0cd1..bb986d5fc 100644 --- a/packages/api/src/shell/tableWriter.js +++ b/packages/api/src/shell/tableWriter.js @@ -26,7 +26,6 @@ async function tableWriter({ connection, schemaName, pureName, driver, systemCon } const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); - logger.info(`Connected.`); return await driver.writeTable(dbhan, { schemaName, pureName }, options); } From bffc34485a9346508f4d34cdf3ce5953683333a4 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 15:08:24 +0100 Subject: [PATCH 08/12] report wwritten rows --- packages/api/src/controllers/runners.js | 4 +- .../tools/src/createBulkInsertStreamBase.ts | 5 +++ packages/tools/src/rowProgressReporter.ts | 45 +++++++++++++++++++ packages/types/engines.d.ts | 1 + .../impexp/ImportExportConfigurator.svelte | 14 +++++- packages/web/src/impexp/createImpExpScript.ts | 1 + 6 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 packages/tools/src/rowProgressReporter.ts diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index 778058f7a..86967cf90 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -107,8 +107,8 @@ module.exports = { } }, - handle_progress(runid, { progressName, status, errorMessage }) { - socket.emit(`runner-progress-${runid}`, { progressName, status, errorMessage }); + handle_progress(runid, progressData) { + socket.emit(`runner-progress-${runid}`, progressData); }, rejectRequest(runid, error) { diff --git a/packages/tools/src/createBulkInsertStreamBase.ts b/packages/tools/src/createBulkInsertStreamBase.ts index b54e69223..618d6785e 100644 --- a/packages/tools/src/createBulkInsertStreamBase.ts +++ b/packages/tools/src/createBulkInsertStreamBase.ts @@ -3,6 +3,7 @@ import _intersection from 'lodash/intersection'; import _fromPairs from 'lodash/fromPairs'; import { getLogger } from './getLogger'; import { prepareTableForImport } from './tableTransforms'; +import { RowProgressReporter } from './rowProgressReporter'; const logger = getLogger('bulkStreamBase'); @@ -21,6 +22,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, writable.columnNames = null; writable.columnDataTypes = null; writable.requireFixedStructure = driver.databaseEngineTypes.includes('sql'); + writable.rowsReporter = new RowProgressReporter(options.progressName); writable.addRow = async row => { if (writable.structure) { @@ -92,6 +94,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, // require('fs').writeFileSync('/home/jena/test.sql', dmp.s); // console.log(dmp.s); await driver.query(dbhan, dmp.s, { discardResult: true }); + writable.rowsReporter.add(rows.length); } else { for (const row of rows) { const dmp = driver.createDumper(); @@ -106,6 +109,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, dmp.putRaw(')'); // console.log(dmp.s); await driver.query(dbhan, dmp.s, { discardResult: true }); + writable.rowsReporter.add(1); } } if (options.commitAfterInsert) { @@ -129,6 +133,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, writable._final = async callback => { await writable.send(); + writable.rowsReporter.finish(); callback(); }; diff --git a/packages/tools/src/rowProgressReporter.ts b/packages/tools/src/rowProgressReporter.ts new file mode 100644 index 000000000..096ce2f3c --- /dev/null +++ b/packages/tools/src/rowProgressReporter.ts @@ -0,0 +1,45 @@ +export class RowProgressReporter { + counter = 0; + timeoutHandle = null; + + constructor(public progressName, public field = 'writtenRowCount') {} + + add(count: number) { + this.counter += count; + if (!this.progressName) { + return; + } + + if (this.timeoutHandle) { + return; + } + this.timeoutHandle = setTimeout(() => { + this.timeoutHandle = null; + this.send(); + }, 1000); + } + + finish() { + if (!this.progressName) { + return; + } + + if (this.timeoutHandle) { + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + } + this.send(); + } + + send() { + if (!this.progressName) { + return; + } + + process.send({ + msgtype: 'progress', + progressName: this.progressName, + [this.field]: this.counter, + }); + } +} diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index b34b66746..048aa2f98 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -41,6 +41,7 @@ export interface WriteTableOptions { createIfNotExists?: boolean; commitAfterInsert?: boolean; targetTableStructure?: TableInfo; + progressName?: string; } export interface EngineAuthType { diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index 9309e98a8..5eb49fdd1 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -305,7 +305,12 @@ {#if progressHolder[row]?.status == 'running'} - Running + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows written + {:else} + Running + {/if} {:else if progressHolder[row]?.status == 'error'} Error {#if progressHolder[row]?.errorMessage} @@ -317,7 +322,12 @@ /> {/if} {:else if progressHolder[row]?.status == 'done'} - Done + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows written + {:else} + Done + {/if} {:else} Queued {/if} diff --git a/packages/web/src/impexp/createImpExpScript.ts b/packages/web/src/impexp/createImpExpScript.ts index c9961fb7b..6218f2ec0 100644 --- a/packages/web/src/impexp/createImpExpScript.ts +++ b/packages/web/src/impexp/createImpExpScript.ts @@ -164,6 +164,7 @@ function getTargetExpr(extensions, sourceName, values, targetConnection, targetD pureName: getTargetName(extensions, sourceName, values), ...extractDriverApiParameters(values, 'target', targetDriver), ...getFlagsFroAction(values[`actionType_${sourceName}`]), + progressName: sourceName, }, ]; } From 1d474a967c8bcebbc39ae91f52a74e90045657a8 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 15:17:58 +0100 Subject: [PATCH 09/12] report read row count, if written row count not available --- packages/api/src/shell/copyStream.js | 24 ++++++++++++++++++- packages/tools/src/index.ts | 1 + .../impexp/ImportExportConfigurator.svelte | 6 ++++- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index c6471619f..edf9bbe52 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -1,8 +1,25 @@ const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); const streamPipeline = require('../utility/streamPipeline'); -const { getLogger, extractErrorLogData } = require('dbgate-tools'); +const { getLogger, extractErrorLogData, RowProgressReporter } = require('dbgate-tools'); const logger = getLogger('copyStream'); +const stream = require('stream'); + +class ReportingTransform extends stream.Transform { + constructor(reporter, options = {}) { + super({ ...options, objectMode: true }); + this.reporter = reporter; + } + _transform(chunk, encoding, callback) { + this.reporter.add(1); + this.push(chunk); + callback(); + } + _flush(callback) { + this.reporter.finish(); + callback(); + } +} /** * Copies reader to writer. Used for import, export tables and transfer data between tables @@ -23,6 +40,11 @@ async function copyStream(input, output, options) { } const transforms = []; + + if (progressName) { + const reporter = new RowProgressReporter(progressName, 'readRowCount'); + transforms.push(new ReportingTransform(reporter)); + } if (columns) { transforms.push(new ColumnMapTransformStream(columns)); } diff --git a/packages/tools/src/index.ts b/packages/tools/src/index.ts index b446d0fd9..48c137d11 100644 --- a/packages/tools/src/index.ts +++ b/packages/tools/src/index.ts @@ -25,3 +25,4 @@ export * from './detectSqlFilterBehaviour'; export * from './filterBehaviours'; export * from './schemaInfoTools'; export * from './dbKeysLoader'; +export * from './rowProgressReporter'; \ No newline at end of file diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index 5eb49fdd1..0086f15e7 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -307,7 +307,9 @@ {#if progressHolder[row]?.status == 'running'} {#if progressHolder[row]?.writtenRowCount} - {progressHolder[row]?.writtenRowCount} rows written + {progressHolder[row]?.writtenRowCount} rows writtem + {:else if progressHolder[row]?.readRowCount} + {progressHolder[row]?.readRowCount} rows read {:else} Running {/if} @@ -325,6 +327,8 @@ {#if progressHolder[row]?.writtenRowCount} {progressHolder[row]?.writtenRowCount} rows written + {:else if progressHolder[row]?.readRowCount} + {progressHolder[row]?.readRowCount} rows read {:else} Done {/if} From 4015e2566ec94e6f8214b1290ae25e769b5d7336 Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 15:21:22 +0100 Subject: [PATCH 10/12] import/export progress reporter --- packages/web/src/impexp/ImportExportConfigurator.svelte | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index 0086f15e7..94b0deb11 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -328,7 +328,7 @@ {#if progressHolder[row]?.writtenRowCount} {progressHolder[row]?.writtenRowCount} rows written {:else if progressHolder[row]?.readRowCount} - {progressHolder[row]?.readRowCount} rows read + {progressHolder[row]?.readRowCount} rows written {:else} Done {/if} From cfc9b809fc31b46365f2f95c3393d7e41b77e8ce Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 15:30:18 +0100 Subject: [PATCH 11/12] key hack - correct reporting progress rows --- .../impexp/ImportExportConfigurator.svelte | 236 +++++++++--------- 1 file changed, 119 insertions(+), 117 deletions(-) diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index 94b0deb11..1c9f5619f 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -213,130 +213,132 @@
Map source tables/files
{#key targetEditKey} - ({ name: row }), - }, - { - fieldName: 'action', - header: 'Action', - component: SourceAction, - getProps: row => ({ name: row, targetDbinfo }), - }, - { - fieldName: 'target', - header: 'Target', - slot: 1, - }, - supportsPreview && { - fieldName: 'preview', - header: 'Preview', - slot: 0, - }, - !!progressHolder && { - fieldName: 'status', - header: 'Status', - slot: 3, - }, - { - fieldName: 'columns', - header: 'Columns', - slot: 2, - }, - ]} - > - - {#if supportsPreview} - { - // @ts-ignore - if (e.target.checked) $previewSource = row; - else $previewSource = null; - }} - /> - {/if} - - -
- - setFieldValue( - `targetName_${row}`, + {#key progressHolder} + ({ name: row }), + }, + { + fieldName: 'action', + header: 'Action', + component: SourceAction, + getProps: row => ({ name: row, targetDbinfo }), + }, + { + fieldName: 'target', + header: 'Target', + slot: 1, + }, + supportsPreview && { + fieldName: 'preview', + header: 'Preview', + slot: 0, + }, + !!progressHolder && { + fieldName: 'status', + header: 'Status', + slot: 3, + }, + { + fieldName: 'columns', + header: 'Columns', + slot: 2, + }, + ]} + > + + {#if supportsPreview} + { // @ts-ignore - e.target.value - )} - /> - {#if $targetDbinfo} - { - return $targetDbinfo.tables.map(opt => ({ - text: opt.pureName, - onClick: () => { - setFieldValue(`targetName_${row}`, opt.pureName); - targetEditKey += 1; - }, - })); + if (e.target.checked) $previewSource = row; + else $previewSource = null; }} /> {/if} -
-
- - {@const columnCount = ($values[`columns_${row}`] || []).filter(x => !x.skip).length} - { - const targetNameLower = ($values[`targetName_${row}`] || row)?.toLowerCase(); - showModal(ColumnMapModal, { - initialValue: $values[`columns_${row}`], - sourceTableInfo: $sourceDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == row?.toLowerCase()), - targetTableInfo: $targetDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == targetNameLower), - onConfirm: value => setFieldValue(`columns_${row}`, value), - }); - }} - >{columnCount > 0 ? `(${columnCount} columns)` : '(copy from source)'} - - - - {#if progressHolder[row]?.status == 'running'} - - {#if progressHolder[row]?.writtenRowCount} - {progressHolder[row]?.writtenRowCount} rows writtem - {:else if progressHolder[row]?.readRowCount} - {progressHolder[row]?.readRowCount} rows read - {:else} - Running - {/if} - {:else if progressHolder[row]?.status == 'error'} - Error - {#if progressHolder[row]?.errorMessage} - showModal(ErrorMessageModal, { message: progressHolder[row]?.errorMessage })} - style="cursor: pointer" + + +
+ + setFieldValue( + `targetName_${row}`, + // @ts-ignore + e.target.value + )} /> - {/if} - {:else if progressHolder[row]?.status == 'done'} - - {#if progressHolder[row]?.writtenRowCount} - {progressHolder[row]?.writtenRowCount} rows written - {:else if progressHolder[row]?.readRowCount} - {progressHolder[row]?.readRowCount} rows written + {#if $targetDbinfo} + { + return $targetDbinfo.tables.map(opt => ({ + text: opt.pureName, + onClick: () => { + setFieldValue(`targetName_${row}`, opt.pureName); + targetEditKey += 1; + }, + })); + }} + /> + {/if} +
+
+ + {@const columnCount = ($values[`columns_${row}`] || []).filter(x => !x.skip).length} + { + const targetNameLower = ($values[`targetName_${row}`] || row)?.toLowerCase(); + showModal(ColumnMapModal, { + initialValue: $values[`columns_${row}`], + sourceTableInfo: $sourceDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == row?.toLowerCase()), + targetTableInfo: $targetDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == targetNameLower), + onConfirm: value => setFieldValue(`columns_${row}`, value), + }); + }} + >{columnCount > 0 ? `(${columnCount} columns)` : '(copy from source)'} + + + + {#if progressHolder[row]?.status == 'running'} + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows writtem + {:else if progressHolder[row]?.readRowCount} + {progressHolder[row]?.readRowCount} rows read + {:else} + Running + {/if} + {:else if progressHolder[row]?.status == 'error'} + Error + {#if progressHolder[row]?.errorMessage} + showModal(ErrorMessageModal, { message: progressHolder[row]?.errorMessage })} + style="cursor: pointer" + /> + {/if} + {:else if progressHolder[row]?.status == 'done'} + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows written + {:else if progressHolder[row]?.readRowCount} + {progressHolder[row]?.readRowCount} rows written + {:else} + Done + {/if} {:else} - Done + Queued {/if} - {:else} - Queued - {/if} - -
+
+ + {/key} {/key} From 7b56485c747cc36c300fc6a100572fd4e2f57afd Mon Sep 17 00:00:00 2001 From: "SPRINX0\\prochazka" Date: Tue, 4 Mar 2025 15:51:43 +0100 Subject: [PATCH 12/12] report progress for quick exports --- packages/web/src/utility/exportFileTools.ts | 19 +++++++++++++++++-- packages/web/src/utility/snackbar.ts | 6 ++++++ packages/web/src/widgets/Snackbar.svelte | 12 ++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/packages/web/src/utility/exportFileTools.ts b/packages/web/src/utility/exportFileTools.ts index c973eac48..e80fed076 100644 --- a/packages/web/src/utility/exportFileTools.ts +++ b/packages/web/src/utility/exportFileTools.ts @@ -1,6 +1,12 @@ import { ScriptWriter, ScriptWriterJson } from 'dbgate-tools'; import getElectron from './getElectron'; -import { showSnackbar, showSnackbarInfo, showSnackbarError, closeSnackbar } from '../utility/snackbar'; +import { + showSnackbar, + showSnackbarInfo, + showSnackbarError, + closeSnackbar, + updateSnackbarProgressMessage, +} from '../utility/snackbar'; import resolveApi, { resolveApiHeaders } from './resolveApi'; import { apiCall, apiOff, apiOn } from './api'; import { normalizeExportColumnMap } from '../impexp/createImpExpScript'; @@ -70,9 +76,17 @@ async function runImportExportScript({ script, runningMessage, canceledMessage, ], }); + function handleRunnerProgress(data) { + const rows = data.writtenRowsCount || data.readRowCount; + if (rows) { + updateSnackbarProgressMessage(snackId, `${rows} rows processed`); + } + } + function handleRunnerDone() { closeSnackbar(snackId); apiOff(`runner-done-${runid}`, handleRunnerDone); + apiOff(`runner-progress-${runid}`, handleRunnerProgress); if (isCanceled) { showSnackbarError(canceledMessage); } else { @@ -82,6 +96,7 @@ async function runImportExportScript({ script, runningMessage, canceledMessage, } apiOn(`runner-done-${runid}`, handleRunnerDone); + apiOn(`runner-progress-${runid}`, handleRunnerProgress); } export async function saveExportedFile(filters, defaultPath, extension, dataName, getScript: (filaPath: string) => {}) { @@ -141,7 +156,7 @@ function generateQuickExportScript( script.assignValue(colmapVar, colmap); } - script.copyStream(sourceVar, targetVar, colmapVar); + script.copyStream(sourceVar, targetVar, colmapVar, 'data'); script.endLine(); return script.getScript(); diff --git a/packages/web/src/utility/snackbar.ts b/packages/web/src/utility/snackbar.ts index 4214a109c..b7ee16eab 100644 --- a/packages/web/src/utility/snackbar.ts +++ b/packages/web/src/utility/snackbar.ts @@ -8,6 +8,7 @@ export interface SnackbarButton { export interface SnackbarInfo { message: string; + progressMessage?: string; icon?: string; autoClose?: boolean; allowClose?: boolean; @@ -59,6 +60,11 @@ export function showSnackbarError(message: string) { export function closeSnackbar(snackId: string) { openedSnackbars.update(x => x.filter(x => x.id != snackId)); } + +export function updateSnackbarProgressMessage(snackId: string, progressMessage: string) { + openedSnackbars.update(x => x.map(x => (x.id === snackId ? { ...x, progressMessage } : x))); +} + // showSnackbar({ // icon: 'img ok', // message: 'Test snackbar', diff --git a/packages/web/src/widgets/Snackbar.svelte b/packages/web/src/widgets/Snackbar.svelte index 61bc6f522..83a1fa768 100644 --- a/packages/web/src/widgets/Snackbar.svelte +++ b/packages/web/src/widgets/Snackbar.svelte @@ -10,6 +10,7 @@ export let autoClose = false; export let allowClose = false; export let buttons = []; + export let progressMessage = null; function handleClose() { openedSnackbars.update(x => x.filter(x => x.id != id)); @@ -25,6 +26,11 @@ {message} + {#if progressMessage} +
+ {progressMessage} +
+ {/if} {#if allowClose}
@@ -83,4 +89,10 @@ .button { margin: 5px; } + + .progress-message { + color: var(--theme-font-3); + margin: 10px; + margin-left: 30px; + }