diff --git a/packages/api/src/controllers/runners.js b/packages/api/src/controllers/runners.js index 778058f7a..86967cf90 100644 --- a/packages/api/src/controllers/runners.js +++ b/packages/api/src/controllers/runners.js @@ -107,8 +107,8 @@ module.exports = { } }, - handle_progress(runid, { progressName, status, errorMessage }) { - socket.emit(`runner-progress-${runid}`, { progressName, status, errorMessage }); + handle_progress(runid, progressData) { + socket.emit(`runner-progress-${runid}`, progressData); }, rejectRequest(runid, error) { diff --git a/packages/tools/src/createBulkInsertStreamBase.ts b/packages/tools/src/createBulkInsertStreamBase.ts index b54e69223..618d6785e 100644 --- a/packages/tools/src/createBulkInsertStreamBase.ts +++ b/packages/tools/src/createBulkInsertStreamBase.ts @@ -3,6 +3,7 @@ import _intersection from 'lodash/intersection'; import _fromPairs from 'lodash/fromPairs'; import { getLogger } from './getLogger'; import { prepareTableForImport } from './tableTransforms'; +import { RowProgressReporter } from './rowProgressReporter'; const logger = getLogger('bulkStreamBase'); @@ -21,6 +22,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, writable.columnNames = null; writable.columnDataTypes = null; writable.requireFixedStructure = driver.databaseEngineTypes.includes('sql'); + writable.rowsReporter = new RowProgressReporter(options.progressName); writable.addRow = async row => { if (writable.structure) { @@ -92,6 +94,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, // require('fs').writeFileSync('/home/jena/test.sql', dmp.s); // console.log(dmp.s); await driver.query(dbhan, dmp.s, { discardResult: true }); + writable.rowsReporter.add(rows.length); } else { for (const row of rows) { const dmp = driver.createDumper(); @@ -106,6 +109,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, dmp.putRaw(')'); // console.log(dmp.s); await driver.query(dbhan, dmp.s, { discardResult: true }); + writable.rowsReporter.add(1); } } if (options.commitAfterInsert) { @@ -129,6 +133,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, writable._final = async callback => { await writable.send(); + writable.rowsReporter.finish(); callback(); }; diff --git a/packages/tools/src/rowProgressReporter.ts b/packages/tools/src/rowProgressReporter.ts new file mode 100644 index 000000000..096ce2f3c --- /dev/null +++ b/packages/tools/src/rowProgressReporter.ts @@ -0,0 +1,45 @@ +export class RowProgressReporter { + counter = 0; + timeoutHandle = null; + + constructor(public progressName, public field = 'writtenRowCount') {} + + add(count: number) { + this.counter += count; + if (!this.progressName) { + return; + } + + if (this.timeoutHandle) { + return; + } + this.timeoutHandle = setTimeout(() => { + this.timeoutHandle = null; + this.send(); + }, 1000); + } + + finish() { + if (!this.progressName) { + return; + } + + if (this.timeoutHandle) { + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + } + this.send(); + } + + send() { + if (!this.progressName) { + return; + } + + process.send({ + msgtype: 'progress', + progressName: this.progressName, + [this.field]: this.counter, + }); + } +} diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index b34b66746..048aa2f98 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -41,6 +41,7 @@ export interface WriteTableOptions { createIfNotExists?: boolean; commitAfterInsert?: boolean; targetTableStructure?: TableInfo; + progressName?: string; } export interface EngineAuthType { diff --git a/packages/web/src/impexp/ImportExportConfigurator.svelte b/packages/web/src/impexp/ImportExportConfigurator.svelte index 9309e98a8..5eb49fdd1 100644 --- a/packages/web/src/impexp/ImportExportConfigurator.svelte +++ b/packages/web/src/impexp/ImportExportConfigurator.svelte @@ -305,7 +305,12 @@ {#if progressHolder[row]?.status == 'running'} - Running + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows written + {:else} + Running + {/if} {:else if progressHolder[row]?.status == 'error'} Error {#if progressHolder[row]?.errorMessage} @@ -317,7 +322,12 @@ /> {/if} {:else if progressHolder[row]?.status == 'done'} - Done + + {#if progressHolder[row]?.writtenRowCount} + {progressHolder[row]?.writtenRowCount} rows written + {:else} + Done + {/if} {:else} Queued {/if} diff --git a/packages/web/src/impexp/createImpExpScript.ts b/packages/web/src/impexp/createImpExpScript.ts index c9961fb7b..6218f2ec0 100644 --- a/packages/web/src/impexp/createImpExpScript.ts +++ b/packages/web/src/impexp/createImpExpScript.ts @@ -164,6 +164,7 @@ function getTargetExpr(extensions, sourceName, values, targetConnection, targetD pureName: getTargetName(extensions, sourceName, values), ...extractDriverApiParameters(values, 'target', targetDriver), ...getFlagsFroAction(values[`actionType_${sourceName}`]), + progressName: sourceName, }, ]; }