report wwritten rows

This commit is contained in:
SPRINX0\prochazka
2025-03-04 15:08:24 +01:00
parent 3bf22a8606
commit bffc34485a
6 changed files with 66 additions and 4 deletions

View File

@@ -107,8 +107,8 @@ module.exports = {
} }
}, },
handle_progress(runid, { progressName, status, errorMessage }) { handle_progress(runid, progressData) {
socket.emit(`runner-progress-${runid}`, { progressName, status, errorMessage }); socket.emit(`runner-progress-${runid}`, progressData);
}, },
rejectRequest(runid, error) { rejectRequest(runid, error) {

View File

@@ -3,6 +3,7 @@ import _intersection from 'lodash/intersection';
import _fromPairs from 'lodash/fromPairs'; import _fromPairs from 'lodash/fromPairs';
import { getLogger } from './getLogger'; import { getLogger } from './getLogger';
import { prepareTableForImport } from './tableTransforms'; import { prepareTableForImport } from './tableTransforms';
import { RowProgressReporter } from './rowProgressReporter';
const logger = getLogger('bulkStreamBase'); const logger = getLogger('bulkStreamBase');
@@ -21,6 +22,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan,
writable.columnNames = null; writable.columnNames = null;
writable.columnDataTypes = null; writable.columnDataTypes = null;
writable.requireFixedStructure = driver.databaseEngineTypes.includes('sql'); writable.requireFixedStructure = driver.databaseEngineTypes.includes('sql');
writable.rowsReporter = new RowProgressReporter(options.progressName);
writable.addRow = async row => { writable.addRow = async row => {
if (writable.structure) { if (writable.structure) {
@@ -92,6 +94,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan,
// require('fs').writeFileSync('/home/jena/test.sql', dmp.s); // require('fs').writeFileSync('/home/jena/test.sql', dmp.s);
// console.log(dmp.s); // console.log(dmp.s);
await driver.query(dbhan, dmp.s, { discardResult: true }); await driver.query(dbhan, dmp.s, { discardResult: true });
writable.rowsReporter.add(rows.length);
} else { } else {
for (const row of rows) { for (const row of rows) {
const dmp = driver.createDumper(); const dmp = driver.createDumper();
@@ -106,6 +109,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan,
dmp.putRaw(')'); dmp.putRaw(')');
// console.log(dmp.s); // console.log(dmp.s);
await driver.query(dbhan, dmp.s, { discardResult: true }); await driver.query(dbhan, dmp.s, { discardResult: true });
writable.rowsReporter.add(1);
} }
} }
if (options.commitAfterInsert) { if (options.commitAfterInsert) {
@@ -129,6 +133,7 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan,
writable._final = async callback => { writable._final = async callback => {
await writable.send(); await writable.send();
writable.rowsReporter.finish();
callback(); callback();
}; };

View File

@@ -0,0 +1,45 @@
export class RowProgressReporter {
counter = 0;
timeoutHandle = null;
constructor(public progressName, public field = 'writtenRowCount') {}
add(count: number) {
this.counter += count;
if (!this.progressName) {
return;
}
if (this.timeoutHandle) {
return;
}
this.timeoutHandle = setTimeout(() => {
this.timeoutHandle = null;
this.send();
}, 1000);
}
finish() {
if (!this.progressName) {
return;
}
if (this.timeoutHandle) {
clearTimeout(this.timeoutHandle);
this.timeoutHandle = null;
}
this.send();
}
send() {
if (!this.progressName) {
return;
}
process.send({
msgtype: 'progress',
progressName: this.progressName,
[this.field]: this.counter,
});
}
}

View File

@@ -41,6 +41,7 @@ export interface WriteTableOptions {
createIfNotExists?: boolean; createIfNotExists?: boolean;
commitAfterInsert?: boolean; commitAfterInsert?: boolean;
targetTableStructure?: TableInfo; targetTableStructure?: TableInfo;
progressName?: string;
} }
export interface EngineAuthType { export interface EngineAuthType {

View File

@@ -305,7 +305,12 @@
</svelte:fragment> </svelte:fragment>
<svelte:fragment slot="3" let:row> <svelte:fragment slot="3" let:row>
{#if progressHolder[row]?.status == 'running'} {#if progressHolder[row]?.status == 'running'}
<FontIcon icon="icon loading" /> Running <FontIcon icon="icon loading" />
{#if progressHolder[row]?.writtenRowCount}
{progressHolder[row]?.writtenRowCount} rows written
{:else}
Running
{/if}
{:else if progressHolder[row]?.status == 'error'} {:else if progressHolder[row]?.status == 'error'}
<FontIcon icon="img error" /> Error <FontIcon icon="img error" /> Error
{#if progressHolder[row]?.errorMessage} {#if progressHolder[row]?.errorMessage}
@@ -317,7 +322,12 @@
/> />
{/if} {/if}
{:else if progressHolder[row]?.status == 'done'} {:else if progressHolder[row]?.status == 'done'}
<FontIcon icon="img ok" /> Done <FontIcon icon="img ok" />
{#if progressHolder[row]?.writtenRowCount}
{progressHolder[row]?.writtenRowCount} rows written
{:else}
Done
{/if}
{:else} {:else}
<FontIcon icon="icon wait" /> Queued <FontIcon icon="icon wait" /> Queued
{/if} {/if}

View File

@@ -164,6 +164,7 @@ function getTargetExpr(extensions, sourceName, values, targetConnection, targetD
pureName: getTargetName(extensions, sourceName, values), pureName: getTargetName(extensions, sourceName, values),
...extractDriverApiParameters(values, 'target', targetDriver), ...extractDriverApiParameters(values, 'target', targetDriver),
...getFlagsFroAction(values[`actionType_${sourceName}`]), ...getFlagsFroAction(values[`actionType_${sourceName}`]),
progressName: sourceName,
}, },
]; ];
} }