diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index 6708831f6..86967cf90 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -94,14 +94,26 @@ module.exports = { handle_ping() {}, handle_freeData(runid, { freeData }) { - const [resolve, reject] = this.requests[runid]; + const { resolve } = this.requests[runid]; resolve(freeData); delete this.requests[runid]; }, + handle_copyStreamError(runid, { copyStreamError }) { + const { reject, exitOnStreamError } = this.requests[runid] || {}; + if (exitOnStreamError) { + reject(copyStreamError); + delete this.requests[runid]; + } + }, + + handle_progress(runid, progressData) { + socket.emit(`runner-progress-${runid}`, progressData); + }, + rejectRequest(runid, error) { if (this.requests[runid]) { - const [resolve, reject] = this.requests[runid]; + const { reject } = this.requests[runid]; reject(error); delete this.requests[runid]; } @@ -113,6 +125,8 @@ module.exports = { fs.writeFileSync(`${scriptFile}`, scriptText); fs.mkdirSync(directory); const pluginNames = extractPlugins(scriptText); + // console.log('********************** SCRIPT TEXT **********************'); + // console.log(scriptText); logger.info({ scriptFile }, 'Running script'); // const subprocess = fork(scriptFile, ['--checkParent', '--max-old-space-size=8192'], { const subprocess = fork( @@ -150,11 +164,13 @@ module.exports = { byline(subprocess.stdout).on('data', pipeDispatcher('info')); byline(subprocess.stderr).on('data', pipeDispatcher('error')); subprocess.on('exit', code => { + // console.log('... EXITED', code); this.rejectRequest(runid, { message: 'No data returned, maybe input data source is too big' }); logger.info({ code, pid: subprocess.pid }, 'Exited process'); socket.emit(`runner-done-${runid}`, code); }); subprocess.on('error', error => { + // console.log('... 
ERROR subprocess', error); this.rejectRequest(runid, { message: error && (error.message || error.toString()) }); console.error('... ERROR subprocess', error); this.dispatchMessage({ @@ -231,7 +247,7 @@ module.exports = { const promise = new Promise((resolve, reject) => { const runid = crypto.randomUUID(); - this.requests[runid] = [resolve, reject]; + this.requests[runid] = { resolve, reject, exitOnStreamError: true }; this.startCore(runid, loaderScriptTemplate(prefix, functionName, props, runid)); }); return promise; diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index 8d912a56c..edf9bbe52 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -1,6 +1,25 @@ const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); -const Stream = require('stream'); const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); +const streamPipeline = require('../utility/streamPipeline'); +const { getLogger, extractErrorLogData, RowProgressReporter } = require('dbgate-tools'); +const logger = getLogger('copyStream'); +const stream = require('stream'); + +class ReportingTransform extends stream.Transform { + constructor(reporter, options = {}) { + super({ ...options, objectMode: true }); + this.reporter = reporter; + } + _transform(chunk, encoding, callback) { + this.reporter.add(1); + this.push(chunk); + callback(); + } + _flush(callback) { + this.reporter.finish(); + callback(); + } +} /** * Copies reader to writer. 
Used for import, export tables and transfer data between tables @@ -9,10 +28,23 @@ const ColumnMapTransformStream = require('../utility/ColumnMapTransformStream'); * @param {object} options - options * @returns {Promise} */ -function copyStream(input, output, options) { - const { columns } = options || {}; +async function copyStream(input, output, options) { + const { columns, progressName } = options || {}; + + if (progressName) { + process.send({ + msgtype: 'progress', + progressName, + status: 'running', + }); + } const transforms = []; + + if (progressName) { + const reporter = new RowProgressReporter(progressName, 'readRowCount'); + transforms.push(new ReportingTransform(reporter)); + } if (columns) { transforms.push(new ColumnMapTransformStream(columns)); } @@ -20,36 +52,37 @@ function copyStream(input, output, options) { transforms.push(new EnsureStreamHeaderStream()); } - // return new Promise((resolve, reject) => { - // Stream.pipeline(input, ...transforms, output, err => { - // if (err) { - // reject(err); - // } else { - // resolve(); - // } - // }); - // }); + try { + await streamPipeline(input, transforms, output); - return new Promise((resolve, reject) => { - const finisher = output['finisher'] || output; - finisher.on('finish', resolve); - finisher.on('error', reject); - - let lastStream = input; - for (const tran of transforms) { - lastStream.pipe(tran); - lastStream = tran; + if (progressName) { + process.send({ + msgtype: 'progress', + progressName, + status: 'done', + }); } - lastStream.pipe(output); + } catch (err) { + process.send({ + msgtype: 'copyStreamError', + copyStreamError: { + message: err.message, + ...err, + }, + }); - // if (output.requireFixedStructure) { - // const ensureHeader = new EnsureStreamHeaderStream(); - // input.pipe(ensureHeader); - // ensureHeader.pipe(output); - // } else { - // input.pipe(output); - // } - }); + if (progressName) { + process.send({ + msgtype: 'progress', + progressName, + status: 'error', + 
errorMessage: err.message, + }); + } + + logger.error(extractErrorLogData(err, { progressName }), 'Import/export job failed'); + // throw err; + } } module.exports = copyStream; diff --git a/packages/api/src/shell/dataDuplicator.js b/packages/api/src/shell/dataDuplicator.js index 750563ff3..e08ede4cf 100644 --- a/packages/api/src/shell/dataDuplicator.js +++ b/packages/api/src/shell/dataDuplicator.js @@ -24,8 +24,6 @@ async function dataDuplicator({ const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); try { - logger.info(`Connected.`); - if (!analysedStructure) { analysedStructure = await driver.analyseFull(dbhan); } diff --git a/packages/api/src/shell/dropAllDbObjects.js b/packages/api/src/shell/dropAllDbObjects.js index 4ba806052..5623f00bb 100644 --- a/packages/api/src/shell/dropAllDbObjects.js +++ b/packages/api/src/shell/dropAllDbObjects.js @@ -19,8 +19,6 @@ async function dropAllDbObjects({ connection, systemConnection, driver, analysed const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); - logger.info(`Connected.`); - if (!analysedStructure) { analysedStructure = await driver.analyseFull(dbhan); } diff --git a/packages/api/src/shell/dumpDatabase.js b/packages/api/src/shell/dumpDatabase.js index 45644e038..8d8a016a5 100644 --- a/packages/api/src/shell/dumpDatabase.js +++ b/packages/api/src/shell/dumpDatabase.js @@ -31,8 +31,6 @@ async function dumpDatabase({ const dbhan = systemConnection || (await connectUtility(driver, connection, 'read', { forceRowsAsObjects: true })); try { - logger.info(`Connected.`); - const dumper = await driver.createBackupDumper(dbhan, { outputFile, databaseName, diff --git a/packages/api/src/shell/executeQuery.js b/packages/api/src/shell/executeQuery.js index 0adcf3700..3f54267f0 100644 --- a/packages/api/src/shell/executeQuery.js +++ b/packages/api/src/shell/executeQuery.js @@ -36,7 +36,7 @@ async function executeQuery({ } try { - logger.info(`Connected.`); + 
logger.debug(`Running SQL query, length: ${sql.length}`); await driver.script(dbhan, sql, { logScriptItems }); } finally { diff --git a/packages/api/src/shell/importDatabase.js b/packages/api/src/shell/importDatabase.js index a10cb4632..90bbceb29 100644 --- a/packages/api/src/shell/importDatabase.js +++ b/packages/api/src/shell/importDatabase.js @@ -5,6 +5,7 @@ const { splitQueryStream } = require('dbgate-query-splitter/lib/splitQueryStream const download = require('./download'); const stream = require('stream'); const { getLogger } = require('dbgate-tools'); +const streamPipeline = require('../utility/streamPipeline'); const logger = getLogger('importDb'); @@ -43,25 +44,12 @@ class ImportStream extends stream.Transform { } } -function awaitStreamEnd(stream) { - return new Promise((resolve, reject) => { - stream.once('end', () => { - resolve(true); - }); - stream.once('error', err => { - reject(err); - }); - }); -} - async function importDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, inputFile }) { logger.info(`Importing database`); if (!driver) driver = requireEngineDriver(connection); const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); try { - logger.info(`Connected.`); - logger.info(`Input file: ${inputFile}`); const downloadedFile = await download(inputFile); logger.info(`Downloaded file: ${downloadedFile}`); @@ -72,9 +60,8 @@ async function importDatabase({ connection = undefined, systemConnection = undef returnRichInfo: true, }); const importStream = new ImportStream(dbhan, driver); - // @ts-ignore - splittedStream.pipe(importStream); - await awaitStreamEnd(importStream); + + await streamPipeline(splittedStream, importStream); } finally { if (!systemConnection) { await driver.close(dbhan); diff --git a/packages/api/src/shell/jsonLinesReader.js b/packages/api/src/shell/jsonLinesReader.js index f1d47dde5..da3d5d85d 100644 --- a/packages/api/src/shell/jsonLinesReader.js +++ 
b/packages/api/src/shell/jsonLinesReader.js @@ -53,8 +53,7 @@ async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undef ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows }); - liner.pipe(parser); - return parser; + return [liner, parser]; } module.exports = jsonLinesReader; diff --git a/packages/api/src/shell/jsonReader.js b/packages/api/src/shell/jsonReader.js index 2de75930c..654ecf633 100644 --- a/packages/api/src/shell/jsonReader.js +++ b/packages/api/src/shell/jsonReader.js @@ -10,7 +10,6 @@ const download = require('./download'); const logger = getLogger('jsonReader'); - class ParseStream extends stream.Transform { constructor({ limitRows, jsonStyle, keyField }) { super({ objectMode: true }); @@ -72,8 +71,12 @@ async function jsonReader({ // @ts-ignore encoding ); + const parseJsonStream = parser(); - fileStream.pipe(parseJsonStream); + + const resultPipe = [fileStream, parseJsonStream]; + + // fileStream.pipe(parseJsonStream); const parseStream = new ParseStream({ limitRows, jsonStyle, keyField }); @@ -81,15 +84,20 @@ async function jsonReader({ if (rootField) { const filterStream = pick({ filter: rootField }); - parseJsonStream.pipe(filterStream); - filterStream.pipe(tramsformer); - } else { - parseJsonStream.pipe(tramsformer); + resultPipe.push(filterStream); + // parseJsonStream.pipe(filterStream); + // filterStream.pipe(tramsformer); } + // else { + // parseJsonStream.pipe(tramsformer); + // } - tramsformer.pipe(parseStream); + resultPipe.push(tramsformer); + resultPipe.push(parseStream); - return parseStream; + // tramsformer.pipe(parseStream); + + return resultPipe; } module.exports = jsonReader; diff --git a/packages/api/src/shell/jsonWriter.js b/packages/api/src/shell/jsonWriter.js index 8058cabdb..28f622cca 100644 --- a/packages/api/src/shell/jsonWriter.js +++ b/packages/api/src/shell/jsonWriter.js @@ -99,9 +99,10 @@ async function jsonWriter({ fileName, jsonStyle, keyField = '_key', rootField, e 
logger.info(`Writing file ${fileName}`); const stringify = new StringifyStream({ jsonStyle, keyField, rootField }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = jsonWriter; diff --git a/packages/api/src/shell/loadDatabase.js b/packages/api/src/shell/loadDatabase.js index d657e61aa..f729342d1 100644 --- a/packages/api/src/shell/loadDatabase.js +++ b/packages/api/src/shell/loadDatabase.js @@ -6,15 +6,13 @@ const exportDbModel = require('../utility/exportDbModel'); const logger = getLogger('analyseDb'); async function loadDatabase({ connection = undefined, systemConnection = undefined, driver = undefined, outputDir }) { - logger.info(`Analysing database`); + logger.debug(`Analysing database`); if (!driver) driver = requireEngineDriver(connection); const dbhan = systemConnection || (await connectUtility(driver, connection, 'read', { forceRowsAsObjects: true })); try { - logger.info(`Connected.`); - const dbInfo = await driver.analyseFull(dbhan); - logger.info(`Analyse finished`); + logger.debug(`Analyse finished`); await exportDbModel(dbInfo, outputDir); } finally { diff --git a/packages/api/src/shell/modifyJsonLinesReader.js b/packages/api/src/shell/modifyJsonLinesReader.js index dec137ba0..32040acec 100644 --- a/packages/api/src/shell/modifyJsonLinesReader.js +++ b/packages/api/src/shell/modifyJsonLinesReader.js @@ -141,8 +141,9 @@ async function modifyJsonLinesReader({ ); const liner = byline(fileStream); const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode }); - liner.pipe(parser); - return parser; + return [liner, parser]; + // liner.pipe(parser); + // return parser; } module.exports = modifyJsonLinesReader; diff --git a/packages/api/src/shell/queryReader.js 
b/packages/api/src/shell/queryReader.js index dd76a296c..0d9a16296 100644 --- a/packages/api/src/shell/queryReader.js +++ b/packages/api/src/shell/queryReader.js @@ -30,7 +30,6 @@ async function queryReader({ const driver = requireEngineDriver(connection); const pool = await connectUtility(driver, connection, queryType == 'json' ? 'read' : 'script'); - logger.info(`Connected.`); const reader = queryType == 'json' ? await driver.readJsonQuery(pool, query) : await driver.readQuery(pool, query || sql); return reader; diff --git a/packages/api/src/shell/sqlDataWriter.js b/packages/api/src/shell/sqlDataWriter.js index 3dd6bad26..c83dab161 100644 --- a/packages/api/src/shell/sqlDataWriter.js +++ b/packages/api/src/shell/sqlDataWriter.js @@ -44,9 +44,10 @@ async function sqlDataWriter({ fileName, dataName, driver, encoding = 'utf-8' }) logger.info(`Writing file ${fileName}`); const stringify = new SqlizeStream({ fileName, dataName }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = sqlDataWriter; diff --git a/packages/api/src/shell/tableReader.js b/packages/api/src/shell/tableReader.js index 6ec0d1fb8..466499de4 100644 --- a/packages/api/src/shell/tableReader.js +++ b/packages/api/src/shell/tableReader.js @@ -18,7 +18,6 @@ async function tableReader({ connection, systemConnection, pureName, schemaName, driver = requireEngineDriver(connection); } const dbhan = systemConnection || (await connectUtility(driver, connection, 'read')); - logger.info(`Connected.`); const fullName = { pureName, schemaName }; diff --git a/packages/api/src/shell/tableWriter.js b/packages/api/src/shell/tableWriter.js index 7b53f0cd1..bb986d5fc 100644 --- a/packages/api/src/shell/tableWriter.js +++ b/packages/api/src/shell/tableWriter.js @@ -26,7 +26,6 
@@ async function tableWriter({ connection, schemaName, pureName, driver, systemCon } const dbhan = systemConnection || (await connectUtility(driver, connection, 'write')); - logger.info(`Connected.`); return await driver.writeTable(dbhan, { schemaName, pureName }, options); } diff --git a/packages/api/src/utility/streamPipeline.js b/packages/api/src/utility/streamPipeline.js new file mode 100644 index 000000000..11183b076 --- /dev/null +++ b/packages/api/src/utility/streamPipeline.js @@ -0,0 +1,18 @@ +const stream = require('stream'); +const _ = require('lodash'); + +function streamPipeline(...processedStreams) { + const streams = _.flattenDeep(processedStreams); + return new Promise((resolve, reject) => { + // @ts-ignore + stream.pipeline(...streams, err => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +module.exports = streamPipeline; diff --git a/packages/tools/src/ScriptWriter.ts b/packages/tools/src/ScriptWriter.ts index 7c4f93b09..23afee0f4 100644 --- a/packages/tools/src/ScriptWriter.ts +++ b/packages/tools/src/ScriptWriter.ts @@ -41,12 +41,13 @@ export class ScriptWriter { this.packageNames.push(packageName); } - copyStream(sourceVar, targetVar, colmapVar = null) { - if (colmapVar) { - this._put(`await dbgateApi.copyStream(${sourceVar}, ${targetVar}, {columns: ${colmapVar}});`); - } else { - this._put(`await dbgateApi.copyStream(${sourceVar}, ${targetVar});`); - } + copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string) { + let opts = '{'; + if (colmapVar) opts += `columns: ${colmapVar}, `; + if (progressName) opts += `progressName: "${progressName}", `; + opts += '}'; + + this._put(`await dbgateApi.copyStream(${sourceVar}, ${targetVar}, ${opts});`); } dumpDatabase(options) { @@ -117,12 +118,13 @@ export class ScriptWriterJson { }); } - copyStream(sourceVar, targetVar, colmapVar = null) { + copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string) { this.commands.push({ type: 'copyStream', 
sourceVar, targetVar, colmapVar, + progressName, }); } @@ -183,7 +185,7 @@ export function jsonScriptToJavascript(json) { script.assignValue(cmd.variableName, cmd.jsonValue); break; case 'copyStream': - script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar); + script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar, cmd.progressName); break; case 'endLine': script.endLine(); diff --git a/packages/tools/src/createBulkInsertStreamBase.ts b/packages/tools/src/createBulkInsertStreamBase.ts index b54e69223..618d6785e 100644 --- a/packages/tools/src/createBulkInsertStreamBase.ts +++ b/packages/tools/src/createBulkInsertStreamBase.ts @@ -3,6 +3,7 @@ import _intersection from 'lodash/intersection'; import _fromPairs from 'lodash/fromPairs'; import { getLogger } from './getLogger'; import { prepareTableForImport } from './tableTransforms'; +import { RowProgressReporter } from './rowProgressReporter'; const logger = getLogger('bulkStreamBase'); @@ -21,6 +22,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, writable.columnNames = null; writable.columnDataTypes = null; writable.requireFixedStructure = driver.databaseEngineTypes.includes('sql'); + writable.rowsReporter = new RowProgressReporter(options.progressName); writable.addRow = async row => { if (writable.structure) { @@ -92,6 +94,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, // require('fs').writeFileSync('/home/jena/test.sql', dmp.s); // console.log(dmp.s); await driver.query(dbhan, dmp.s, { discardResult: true }); + writable.rowsReporter.add(rows.length); } else { for (const row of rows) { const dmp = driver.createDumper(); @@ -106,6 +109,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, dmp.putRaw(')'); // console.log(dmp.s); await driver.query(dbhan, dmp.s, { discardResult: true }); + writable.rowsReporter.add(1); } } if (options.commitAfterInsert) { @@ -129,6 +133,7 @@ export function 
createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, writable._final = async callback => { await writable.send(); + writable.rowsReporter.finish(); callback(); }; diff --git a/packages/tools/src/index.ts b/packages/tools/src/index.ts index b446d0fd9..48c137d11 100644 --- a/packages/tools/src/index.ts +++ b/packages/tools/src/index.ts @@ -25,3 +25,4 @@ export * from './detectSqlFilterBehaviour'; export * from './filterBehaviours'; export * from './schemaInfoTools'; export * from './dbKeysLoader'; +export * from './rowProgressReporter'; \ No newline at end of file diff --git a/packages/tools/src/rowProgressReporter.ts b/packages/tools/src/rowProgressReporter.ts new file mode 100644 index 000000000..096ce2f3c --- /dev/null +++ b/packages/tools/src/rowProgressReporter.ts @@ -0,0 +1,45 @@ +export class RowProgressReporter { + counter = 0; + timeoutHandle = null; + + constructor(public progressName, public field = 'writtenRowCount') {} + + add(count: number) { + this.counter += count; + if (!this.progressName) { + return; + } + + if (this.timeoutHandle) { + return; + } + this.timeoutHandle = setTimeout(() => { + this.timeoutHandle = null; + this.send(); + }, 1000); + } + + finish() { + if (!this.progressName) { + return; + } + + if (this.timeoutHandle) { + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + } + this.send(); + } + + send() { + if (!this.progressName) { + return; + } + + process.send({ + msgtype: 'progress', + progressName: this.progressName, + [this.field]: this.counter, + }); + } +} diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index 11235f5e6..048aa2f98 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -41,6 +41,7 @@ export interface WriteTableOptions { createIfNotExists?: boolean; commitAfterInsert?: boolean; targetTableStructure?: TableInfo; + progressName?: string; } export interface EngineAuthType { @@ -144,6 +145,8 @@ export interface DatabaseHandle { treeKeySeparator?: 
string; } +export type StreamResult = stream.Readable | (stream.Readable | stream.Writable)[]; + export interface EngineDriver extends FilterBehaviourProvider { engine: string; title: string; @@ -191,15 +194,11 @@ export interface EngineDriver extends FilterBehaviourProvider { close(dbhan: DatabaseHandle): Promise; query(dbhan: DatabaseHandle, sql: string, options?: QueryOptions): Promise; stream(dbhan: DatabaseHandle, sql: string, options: StreamOptions); - readQuery(dbhan: DatabaseHandle, sql: string, structure?: TableInfo): Promise; - readJsonQuery(dbhan: DatabaseHandle, query: any, structure?: TableInfo): Promise; + readQuery(dbhan: DatabaseHandle, sql: string, structure?: TableInfo): Promise; + readJsonQuery(dbhan: DatabaseHandle, query: any, structure?: TableInfo): Promise; // eg. PostgreSQL COPY FROM stdin - writeQueryFromStream(dbhan: DatabaseHandle, sql: string): Promise; - writeTable( - dbhan: DatabaseHandle, - name: NamedObjectInfo, - options: WriteTableOptions - ): Promise; + writeQueryFromStream(dbhan: DatabaseHandle, sql: string): Promise; + writeTable(dbhan: DatabaseHandle, name: NamedObjectInfo, options: WriteTableOptions): Promise; analyseSingleObject( dbhan: DatabaseHandle, name: NamedObjectInfo, diff --git a/packages/web/src/elements/TableControl.svelte b/packages/web/src/elements/TableControl.svelte index 931f8f8ec..e8eb1051b 100644 --- a/packages/web/src/elements/TableControl.svelte +++ b/packages/web/src/elements/TableControl.svelte @@ -20,7 +20,7 @@ import { createEventDispatcher } from 'svelte'; import FontIcon from '../icons/FontIcon.svelte'; - export let columns: TableControlColumn[]; + export let columns: (TableControlColumn | false)[]; export let rows; export let focusOnCreate = false; export let selectable = false; diff --git a/packages/web/src/icons/FontIcon.svelte b/packages/web/src/icons/FontIcon.svelte index c5ed2f18a..df808a488 100644 --- a/packages/web/src/icons/FontIcon.svelte +++ b/packages/web/src/icons/FontIcon.svelte @@ 
-149,6 +149,7 @@ 'icon download': 'mdi mdi-download', 'icon text': 'mdi mdi-text', 'icon ai': 'mdi mdi-head-lightbulb', + 'icon wait': 'mdi mdi-timer-sand', 'icon run': 'mdi mdi-play', 'icon chevron-down': 'mdi mdi-chevron-down', diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index efb286ded..1c9f5619f 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -76,6 +76,7 @@ import { compositeDbNameIfNeeded } from 'dbgate-tools'; import createRef from '../utility/createRef'; import DropDownButton from '../buttons/DropDownButton.svelte'; + import ErrorMessageModal from '../modals/ErrorMessageModal.svelte'; // export let uploadedFile = undefined; // export let openedFile = undefined; @@ -104,6 +105,7 @@ $: sourceList = $values.sourceList; let targetEditKey = 0; + export let progressHolder = null; const previewSource = writable(null); @@ -211,92 +213,132 @@
Map source tables/files
{#key targetEditKey} - ({ name: row }), - }, - { - fieldName: 'action', - header: 'Action', - component: SourceAction, - getProps: row => ({ name: row, targetDbinfo }), - }, - { - fieldName: 'target', - header: 'Target', - slot: 1, - }, - { - fieldName: 'preview', - header: 'Preview', - slot: 0, - }, - { - fieldName: 'columns', - header: 'Columns', - slot: 2, - }, - ]} - > - - {#if supportsPreview} - { - // @ts-ignore - if (e.target.checked) $previewSource = row; - else $previewSource = null; - }} - /> - {/if} - - -
- - setFieldValue( - `targetName_${row}`, + {#key progressHolder} + ({ name: row }), + }, + { + fieldName: 'action', + header: 'Action', + component: SourceAction, + getProps: row => ({ name: row, targetDbinfo }), + }, + { + fieldName: 'target', + header: 'Target', + slot: 1, + }, + supportsPreview && { + fieldName: 'preview', + header: 'Preview', + slot: 0, + }, + !!progressHolder && { + fieldName: 'status', + header: 'Status', + slot: 3, + }, + { + fieldName: 'columns', + header: 'Columns', + slot: 2, + }, + ]} + > + + {#if supportsPreview} + { // @ts-ignore - e.target.value - )} - /> - {#if $targetDbinfo} - { - return $targetDbinfo.tables.map(opt => ({ - text: opt.pureName, - onClick: () => { - setFieldValue(`targetName_${row}`, opt.pureName); - targetEditKey += 1; - }, - })); + if (e.target.checked) $previewSource = row; + else $previewSource = null; }} /> {/if} -
-
- - {@const columnCount = ($values[`columns_${row}`] || []).filter(x => !x.skip).length} - { - const targetNameLower = ($values[`targetName_${row}`] || row)?.toLowerCase(); - showModal(ColumnMapModal, { - initialValue: $values[`columns_${row}`], - sourceTableInfo: $sourceDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == row?.toLowerCase()), - targetTableInfo: $targetDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == targetNameLower), - onConfirm: value => setFieldValue(`columns_${row}`, value), - }); - }} - >{columnCount > 0 ? `(${columnCount} columns)` : '(copy from source)'} - - -
+ + +
+ + setFieldValue( + `targetName_${row}`, + // @ts-ignore + e.target.value + )} + /> + {#if $targetDbinfo} + { + return $targetDbinfo.tables.map(opt => ({ + text: opt.pureName, + onClick: () => { + setFieldValue(`targetName_${row}`, opt.pureName); + targetEditKey += 1; + }, + })); + }} + /> + {/if} +
+
+ + {@const columnCount = ($values[`columns_${row}`] || []).filter(x => !x.skip).length} + { + const targetNameLower = ($values[`targetName_${row}`] || row)?.toLowerCase(); + showModal(ColumnMapModal, { + initialValue: $values[`columns_${row}`], + sourceTableInfo: $sourceDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == row?.toLowerCase()), + targetTableInfo: $targetDbinfo?.tables?.find(x => x.pureName?.toLowerCase() == targetNameLower), + onConfirm: value => setFieldValue(`columns_${row}`, value), + }); + }} + >{columnCount > 0 ? `(${columnCount} columns)` : '(copy from source)'} + + + + {#if progressHolder[row]?.status == 'running'} + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows writtem + {:else if progressHolder[row]?.readRowCount} + {progressHolder[row]?.readRowCount} rows read + {:else} + Running + {/if} + {:else if progressHolder[row]?.status == 'error'} + Error + {#if progressHolder[row]?.errorMessage} + showModal(ErrorMessageModal, { message: progressHolder[row]?.errorMessage })} + style="cursor: pointer" + /> + {/if} + {:else if progressHolder[row]?.status == 'done'} + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows written + {:else if progressHolder[row]?.readRowCount} + {progressHolder[row]?.readRowCount} rows written + {:else} + Done + {/if} + {:else} + Queued + {/if} + + + {/key} {/key} diff --git a/packages/web/src/impexp/createImpExpScript.ts b/packages/web/src/impexp/createImpExpScript.ts index 0fa3c076c..6218f2ec0 100644 --- a/packages/web/src/impexp/createImpExpScript.ts +++ b/packages/web/src/impexp/createImpExpScript.ts @@ -164,6 +164,7 @@ function getTargetExpr(extensions, sourceName, values, targetConnection, targetD pureName: getTargetName(extensions, sourceName, values), ...extractDriverApiParameters(values, 'target', targetDriver), ...getFlagsFroAction(values[`actionType_${sourceName}`]), + progressName: sourceName, }, ]; } @@ -233,7 +234,7 @@ 
export default async function createImpExpScript(extensions, values, forceScript script.assignValue(colmapVar, colmap); } - script.copyStream(sourceVar, targetVar, colmapVar); + script.copyStream(sourceVar, targetVar, colmapVar, sourceName); script.endLine(); } return script.getScript(values.schedule); diff --git a/packages/web/src/query/MessageView.svelte b/packages/web/src/query/MessageView.svelte index ce6d9dae5..4d67e3c77 100644 --- a/packages/web/src/query/MessageView.svelte +++ b/packages/web/src/query/MessageView.svelte @@ -18,7 +18,7 @@ // $: console.log('MESSAGE ROWS', items); const values = writable({ - hideDebug: false, + hideDebug: true, hideInfo: false, hideError: false, }); diff --git a/packages/web/src/tabs/ImportExportTab.svelte b/packages/web/src/tabs/ImportExportTab.svelte index 37e4321cc..ab027d607 100644 --- a/packages/web/src/tabs/ImportExportTab.svelte +++ b/packages/web/src/tabs/ImportExportTab.svelte @@ -65,6 +65,7 @@ export let savedFile; export let savedFilePath; + let progressHolder = null; const refreshArchiveFolderRef = createRef(null); const formValues = writable({}); @@ -179,6 +180,7 @@ const handleExecute = async e => { if (busy) return; + progressHolder = {}; const values = $formValues as any; busy = true; const script = await createImpExpScript($extensions, values); @@ -228,6 +230,29 @@ title: `${getSourceTargetTitle('source', values)}->${getSourceTargetTitle('target', values)}(${values.sourceList?.length || 0})`, })); } + + const handleProgress = progress => { + progressHolder = { + ...progressHolder, + [progress.progressName]: { + ...progressHolder[progress.progressName], + ...progress, + }, + }; + }; + + $: progressEffect = useEffect(() => { + if (runnerId) { + const eventName = `runner-progress-${runnerId}`; + apiOn(eventName, handleProgress); + return () => { + apiOff(eventName, handleProgress); + }; + } + return () => {}; + }); + + $progressEffect; @@ -237,6 +262,7 @@ diff --git a/packages/web/src/utility/exportFileTools.ts 
b/packages/web/src/utility/exportFileTools.ts index c973eac48..e80fed076 100644 --- a/packages/web/src/utility/exportFileTools.ts +++ b/packages/web/src/utility/exportFileTools.ts @@ -1,6 +1,12 @@ import { ScriptWriter, ScriptWriterJson } from 'dbgate-tools'; import getElectron from './getElectron'; -import { showSnackbar, showSnackbarInfo, showSnackbarError, closeSnackbar } from '../utility/snackbar'; +import { + showSnackbar, + showSnackbarInfo, + showSnackbarError, + closeSnackbar, + updateSnackbarProgressMessage, +} from '../utility/snackbar'; import resolveApi, { resolveApiHeaders } from './resolveApi'; import { apiCall, apiOff, apiOn } from './api'; import { normalizeExportColumnMap } from '../impexp/createImpExpScript'; @@ -70,9 +76,17 @@ async function runImportExportScript({ script, runningMessage, canceledMessage, ], }); + function handleRunnerProgress(data) { + const rows = data.writtenRowCount || data.readRowCount; + if (rows) { + updateSnackbarProgressMessage(snackId, `${rows} rows processed`); + } + } + function handleRunnerDone() { closeSnackbar(snackId); apiOff(`runner-done-${runid}`, handleRunnerDone); + apiOff(`runner-progress-${runid}`, handleRunnerProgress); if (isCanceled) { showSnackbarError(canceledMessage); } else { @@ -82,6 +96,7 @@ async function runImportExportScript({ script, runningMessage, canceledMessage, } apiOn(`runner-done-${runid}`, handleRunnerDone); + apiOn(`runner-progress-${runid}`, handleRunnerProgress); } export async function saveExportedFile(filters, defaultPath, extension, dataName, getScript: (filaPath: string) => {}) { @@ -141,7 +156,7 @@ function generateQuickExportScript( script.assignValue(colmapVar, colmap); } - script.copyStream(sourceVar, targetVar, colmapVar); + script.copyStream(sourceVar, targetVar, colmapVar, 'data'); script.endLine(); return script.getScript(); diff --git a/packages/web/src/utility/snackbar.ts b/packages/web/src/utility/snackbar.ts index 4214a109c..b7ee16eab 100644 --- 
a/packages/web/src/utility/snackbar.ts +++ b/packages/web/src/utility/snackbar.ts @@ -8,6 +8,7 @@ export interface SnackbarButton { export interface SnackbarInfo { message: string; + progressMessage?: string; icon?: string; autoClose?: boolean; allowClose?: boolean; @@ -59,6 +60,11 @@ export function showSnackbarError(message: string) { export function closeSnackbar(snackId: string) { openedSnackbars.update(x => x.filter(x => x.id != snackId)); } + +export function updateSnackbarProgressMessage(snackId: string, progressMessage: string) { + openedSnackbars.update(x => x.map(x => (x.id === snackId ? { ...x, progressMessage } : x))); +} + // showSnackbar({ // icon: 'img ok', // message: 'Test snackbar', diff --git a/packages/web/src/widgets/Snackbar.svelte b/packages/web/src/widgets/Snackbar.svelte index 61bc6f522..83a1fa768 100644 --- a/packages/web/src/widgets/Snackbar.svelte +++ b/packages/web/src/widgets/Snackbar.svelte @@ -10,6 +10,7 @@ export let autoClose = false; export let allowClose = false; export let buttons = []; + export let progressMessage = null; function handleClose() { openedSnackbars.update(x => x.filter(x => x.id != id)); @@ -25,6 +26,11 @@ {message} + {#if progressMessage} +
+ {progressMessage} +
+ {/if} {#if allowClose}
@@ -83,4 +89,10 @@ .button { margin: 5px; } + + .progress-message { + color: var(--theme-font-3); + margin: 10px; + margin-left: 30px; + } diff --git a/plugins/dbgate-plugin-csv/src/backend/reader.js b/plugins/dbgate-plugin-csv/src/backend/reader.js index 55ef181d9..e52065f20 100644 --- a/plugins/dbgate-plugin-csv/src/backend/reader.js +++ b/plugins/dbgate-plugin-csv/src/backend/reader.js @@ -95,9 +95,10 @@ async function reader({ fileName, encoding = 'utf-8', header = true, delimiter, }); const fileStream = fs.createReadStream(downloadedFile, encoding); const csvPrepare = new CsvPrepareStream({ header }); - fileStream.pipe(csvStream); - csvStream.pipe(csvPrepare); - return csvPrepare; + return [fileStream, csvStream, csvPrepare]; + // fileStream.pipe(csvStream); + // csvStream.pipe(csvPrepare); + // return csvPrepare; } reader.initialize = (dbgateEnv) => { diff --git a/plugins/dbgate-plugin-csv/src/backend/writer.js b/plugins/dbgate-plugin-csv/src/backend/writer.js index 7d62d1563..2297af0c4 100644 --- a/plugins/dbgate-plugin-csv/src/backend/writer.js +++ b/plugins/dbgate-plugin-csv/src/backend/writer.js @@ -31,11 +31,13 @@ async function writer({ fileName, encoding = 'utf-8', header = true, delimiter, const csvPrepare = new CsvPrepareStream({ header }); const csvStream = csv.stringify({ delimiter, quoted }); const fileStream = fs.createWriteStream(fileName, encoding); - csvPrepare.pipe(csvStream); - csvStream.pipe(fileStream); - csvPrepare['finisher'] = fileStream; + // csvPrepare.pipe(csvStream); + // csvStream.pipe(fileStream); + // csvPrepare['finisher'] = fileStream; csvPrepare.requireFixedStructure = true; - return csvPrepare; + + return [csvPrepare, csvStream, fileStream]; + // return csvPrepare; } module.exports = writer; diff --git a/plugins/dbgate-plugin-mongo/src/backend/driver.js b/plugins/dbgate-plugin-mongo/src/backend/driver.js index 23aca12b6..9c10ba8eb 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/driver.js +++ 
b/plugins/dbgate-plugin-mongo/src/backend/driver.js @@ -266,6 +266,11 @@ const driver = { pass.write(transformMongoData(row)); }); + // propagate error + cursorStream.on('error', (err) => { + pass.emit('error', err); + }); + // Called once the cursor is fully read cursorStream.on('end', () => { pass.emit('end'); diff --git a/plugins/dbgate-plugin-xml/src/backend/reader.js b/plugins/dbgate-plugin-xml/src/backend/reader.js index e374ea068..18b09adb9 100644 --- a/plugins/dbgate-plugin-xml/src/backend/reader.js +++ b/plugins/dbgate-plugin-xml/src/backend/reader.js @@ -63,8 +63,10 @@ async function reader({ fileName, encoding = 'utf-8', itemElementName }) { const fileStream = fs.createReadStream(fileName, encoding); const parser = new ParseStream({ itemElementName }); - fileStream.pipe(parser); - return parser; + + return [fileStream, parser]; + // fileStream.pipe(parser); + // return parser; } module.exports = reader; diff --git a/plugins/dbgate-plugin-xml/src/backend/writer.js b/plugins/dbgate-plugin-xml/src/backend/writer.js index 66787883c..0474eab02 100644 --- a/plugins/dbgate-plugin-xml/src/backend/writer.js +++ b/plugins/dbgate-plugin-xml/src/backend/writer.js @@ -73,9 +73,10 @@ async function writer({ fileName, encoding = 'utf-8', itemElementName, rootEleme logger.info(`Writing file ${fileName}`); const stringify = new StringifyStream({ itemElementName, rootElementName }); const fileStream = fs.createWriteStream(fileName, encoding); - stringify.pipe(fileStream); - stringify['finisher'] = fileStream; - return stringify; + return [stringify, fileStream]; + // stringify.pipe(fileStream); + // stringify['finisher'] = fileStream; + // return stringify; } module.exports = writer;