mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-05-03 12:53:58 +00:00
duplicator options
This commit is contained in:
@@ -9,7 +9,7 @@ const copyStream = require('./copyStream');
|
|||||||
const jsonLinesReader = require('./jsonLinesReader');
|
const jsonLinesReader = require('./jsonLinesReader');
|
||||||
const { resolveArchiveFolder } = require('../utility/directories');
|
const { resolveArchiveFolder } = require('../utility/directories');
|
||||||
|
|
||||||
async function dataDuplicator({ connection, archive, items, analysedStructure = null }) {
|
async function dataDuplicator({ connection, archive, items, options, analysedStructure = null }) {
|
||||||
const driver = requireEngineDriver(connection);
|
const driver = requireEngineDriver(connection);
|
||||||
const pool = await connectUtility(driver, connection, 'write');
|
const pool = await connectUtility(driver, connection, 'write');
|
||||||
logger.info(`Connected.`);
|
logger.info(`Connected.`);
|
||||||
@@ -29,7 +29,8 @@ async function dataDuplicator({ connection, archive, items, analysedStructure =
|
|||||||
openStream: () => jsonLinesReader({ fileName: path.join(resolveArchiveFolder(archive), `${item.name}.jsonl`) }),
|
openStream: () => jsonLinesReader({ fileName: path.join(resolveArchiveFolder(archive), `${item.name}.jsonl`) }),
|
||||||
})),
|
})),
|
||||||
stream,
|
stream,
|
||||||
copyStream
|
copyStream,
|
||||||
|
options
|
||||||
);
|
);
|
||||||
|
|
||||||
await dupl.run();
|
await dupl.run();
|
||||||
|
|||||||
@@ -12,6 +12,11 @@ export interface DataDuplicatorItem {
|
|||||||
matchColumns: string[];
|
matchColumns: string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface DataDuplicatorOptions {
|
||||||
|
rollbackAfterFinish: boolean;
|
||||||
|
skipRowsWithUnresolvedRefs: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
class DuplicatorReference {
|
class DuplicatorReference {
|
||||||
constructor(
|
constructor(
|
||||||
public base: DuplicatorItemHolder,
|
public base: DuplicatorItemHolder,
|
||||||
@@ -78,6 +83,13 @@ class DuplicatorItemHolder {
|
|||||||
if (ref) {
|
if (ref) {
|
||||||
// remap id
|
// remap id
|
||||||
res[key] = ref.ref.idMap[res[key]];
|
res[key] = ref.ref.idMap[res[key]];
|
||||||
|
if (ref.isMandatory && res[key] == null) {
|
||||||
|
// mandatory refertence not matched
|
||||||
|
if (this.duplicator.options.skipRowsWithUnresolvedRefs) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw new Error(`Unresolved reference, base=${ref.base.name}, ref=${ref.ref.name}, ${key}=${chunk[key]}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,6 +103,7 @@ class DuplicatorItemHolder {
|
|||||||
let inserted = 0;
|
let inserted = 0;
|
||||||
let mapped = 0;
|
let mapped = 0;
|
||||||
let missing = 0;
|
let missing = 0;
|
||||||
|
let skipped = 0;
|
||||||
let lastLogged = new Date();
|
let lastLogged = new Date();
|
||||||
|
|
||||||
const writeStream = createAsyncWriteStream(this.duplicator.stream, {
|
const writeStream = createAsyncWriteStream(this.duplicator.stream, {
|
||||||
@@ -101,6 +114,10 @@ class DuplicatorItemHolder {
|
|||||||
|
|
||||||
const doCopy = async () => {
|
const doCopy = async () => {
|
||||||
const insertedObj = this.createInsertObject(chunk);
|
const insertedObj = this.createInsertObject(chunk);
|
||||||
|
if (insertedObj == null) {
|
||||||
|
skipped += 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
await runCommandOnDriver(pool, driver, dmp =>
|
await runCommandOnDriver(pool, driver, dmp =>
|
||||||
dmp.putCmd(
|
dmp.putCmd(
|
||||||
'^insert ^into %f (%,i) ^values (%,v)',
|
'^insert ^into %f (%,i) ^values (%,v)',
|
||||||
@@ -150,7 +167,7 @@ class DuplicatorItemHolder {
|
|||||||
|
|
||||||
if (new Date().getTime() - lastLogged.getTime() > 5000) {
|
if (new Date().getTime() - lastLogged.getTime() > 5000) {
|
||||||
logger.info(
|
logger.info(
|
||||||
`Duplicating ${this.item.name} in progress, inserted ${inserted} rows, mapped ${mapped} rows, missing ${missing} rows`
|
`Duplicating ${this.item.name} in progress, inserted ${inserted} rows, mapped ${mapped} rows, missing ${missing} rows, skipped ${skipped} rows`
|
||||||
);
|
);
|
||||||
lastLogged = new Date();
|
lastLogged = new Date();
|
||||||
}
|
}
|
||||||
@@ -166,7 +183,7 @@ class DuplicatorItemHolder {
|
|||||||
// },
|
// },
|
||||||
// });
|
// });
|
||||||
|
|
||||||
return { inserted, mapped, missing };
|
return { inserted, mapped, missing, skipped };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,7 +197,8 @@ export class DataDuplicator {
|
|||||||
public db: DatabaseInfo,
|
public db: DatabaseInfo,
|
||||||
public items: DataDuplicatorItem[],
|
public items: DataDuplicatorItem[],
|
||||||
public stream,
|
public stream,
|
||||||
public copyStream: (input, output) => Promise<void>
|
public copyStream: (input, output) => Promise<void>,
|
||||||
|
public options: DataDuplicatorOptions
|
||||||
) {
|
) {
|
||||||
this.itemHolders = items.map(x => new DuplicatorItemHolder(x, this));
|
this.itemHolders = items.map(x => new DuplicatorItemHolder(x, this));
|
||||||
this.itemHolders.forEach(x => x.initializeReferences());
|
this.itemHolders.forEach(x => x.initializeReferences());
|
||||||
@@ -220,13 +238,19 @@ export class DataDuplicator {
|
|||||||
for (const item of this.itemPlan) {
|
for (const item of this.itemPlan) {
|
||||||
const stats = await item.runImport();
|
const stats = await item.runImport();
|
||||||
logger.info(
|
logger.info(
|
||||||
`Duplicated ${item.name}, inserted ${stats.inserted} rows, mapped ${stats.mapped} rows, missing ${stats.missing} rows`
|
`Duplicated ${item.name}, inserted ${stats.inserted} rows, mapped ${stats.mapped} rows, missing ${stats.missing} rows, skipped ${stats.skipped} rows`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error({ err }, 'Failed duplicator job, rollbacking');
|
logger.error({ err }, 'Failed duplicator job, rollbacking');
|
||||||
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.rollbackTransaction());
|
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.rollbackTransaction());
|
||||||
}
|
}
|
||||||
|
if (this.options.rollbackAfterFinish) {
|
||||||
|
logger.info('Rollbacking transaction, nothing was changed');
|
||||||
|
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.rollbackTransaction());
|
||||||
|
} else {
|
||||||
|
logger.info('Committing duplicator transaction');
|
||||||
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.commitTransaction());
|
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.commitTransaction());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -37,6 +37,7 @@
|
|||||||
import TableControl from '../elements/TableControl.svelte';
|
import TableControl from '../elements/TableControl.svelte';
|
||||||
import VerticalSplitter from '../elements/VerticalSplitter.svelte';
|
import VerticalSplitter from '../elements/VerticalSplitter.svelte';
|
||||||
import CheckboxField from '../forms/CheckboxField.svelte';
|
import CheckboxField from '../forms/CheckboxField.svelte';
|
||||||
|
import FormFieldTemplateLarge from '../forms/FormFieldTemplateLarge.svelte';
|
||||||
import SelectField from '../forms/SelectField.svelte';
|
import SelectField from '../forms/SelectField.svelte';
|
||||||
import { extractShellConnection } from '../impexp/createImpExpScript';
|
import { extractShellConnection } from '../impexp/createImpExpScript';
|
||||||
import SocketMessageView from '../query/SocketMessageView.svelte';
|
import SocketMessageView from '../query/SocketMessageView.svelte';
|
||||||
@@ -116,6 +117,10 @@
|
|||||||
operation: row.operation,
|
operation: row.operation,
|
||||||
matchColumns: _.compact([row.matchColumn1]),
|
matchColumns: _.compact([row.matchColumn1]),
|
||||||
})),
|
})),
|
||||||
|
options: {
|
||||||
|
rollbackAfterFinish: !!$editorState.value?.rollbackAfterFinish,
|
||||||
|
skipRowsWithUnresolvedRefs: !!$editorState.value?.skipRowsWithUnresolvedRefs,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
return script.getScript();
|
return script.getScript();
|
||||||
}
|
}
|
||||||
@@ -211,7 +216,7 @@
|
|||||||
<svelte:fragment slot="1">
|
<svelte:fragment slot="1">
|
||||||
<div class="wrapper">
|
<div class="wrapper">
|
||||||
<ObjectConfigurationControl title="Configuration">
|
<ObjectConfigurationControl title="Configuration">
|
||||||
<div class="bold m-2">Source archive</div>
|
<FormFieldTemplateLarge label="Source archive" type="combo">
|
||||||
<SelectField
|
<SelectField
|
||||||
isNative
|
isNative
|
||||||
value={$editorState.value?.archiveFolder}
|
value={$editorState.value?.archiveFolder}
|
||||||
@@ -226,6 +231,53 @@
|
|||||||
value: x.name,
|
value: x.name,
|
||||||
})) || []}
|
})) || []}
|
||||||
/>
|
/>
|
||||||
|
</FormFieldTemplateLarge>
|
||||||
|
|
||||||
|
<FormFieldTemplateLarge
|
||||||
|
label="Dry run - no changes (rollback when finished)"
|
||||||
|
type="checkbox"
|
||||||
|
labelProps={{
|
||||||
|
onClick: () => {
|
||||||
|
setEditorData(old => ({
|
||||||
|
...old,
|
||||||
|
rollbackAfterFinish: !$editorState.value?.rollbackAfterFinish,
|
||||||
|
}));
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
>
|
||||||
|
<CheckboxField
|
||||||
|
checked={$editorState.value?.rollbackAfterFinish}
|
||||||
|
on:change={e => {
|
||||||
|
setEditorData(old => ({
|
||||||
|
...old,
|
||||||
|
rollbackAfterFinish: e.target.checked,
|
||||||
|
}));
|
||||||
|
}}
|
||||||
|
/>
|
||||||
|
</FormFieldTemplateLarge>
|
||||||
|
|
||||||
|
<FormFieldTemplateLarge
|
||||||
|
label="Skip rows with unresolved mandatory references"
|
||||||
|
type="checkbox"
|
||||||
|
labelProps={{
|
||||||
|
onClick: () => {
|
||||||
|
setEditorData(old => ({
|
||||||
|
...old,
|
||||||
|
skipRowsWithUnresolvedRefs: !$editorState.value?.skipRowsWithUnresolvedRefs,
|
||||||
|
}));
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
>
|
||||||
|
<CheckboxField
|
||||||
|
checked={$editorState.value?.skipRowsWithUnresolvedRefs}
|
||||||
|
on:change={e => {
|
||||||
|
setEditorData(old => ({
|
||||||
|
...old,
|
||||||
|
skipRowsWithUnresolvedRefs: e.target.checked,
|
||||||
|
}));
|
||||||
|
}}
|
||||||
|
/>
|
||||||
|
</FormFieldTemplateLarge>
|
||||||
</ObjectConfigurationControl>
|
</ObjectConfigurationControl>
|
||||||
|
|
||||||
<ObjectConfigurationControl title="Imported files">
|
<ObjectConfigurationControl title="Imported files">
|
||||||
|
|||||||
@@ -6,7 +6,14 @@
|
|||||||
|
|
||||||
import FontIcon from '../icons/FontIcon.svelte';
|
import FontIcon from '../icons/FontIcon.svelte';
|
||||||
|
|
||||||
import { activeTabId, currentDatabase, currentThemeDefinition, visibleCommandPalette } from '../stores';
|
import {
|
||||||
|
activeTabId,
|
||||||
|
currentArchive,
|
||||||
|
currentDatabase,
|
||||||
|
currentThemeDefinition,
|
||||||
|
selectedWidget,
|
||||||
|
visibleCommandPalette,
|
||||||
|
} from '../stores';
|
||||||
import getConnectionLabel from '../utility/getConnectionLabel';
|
import getConnectionLabel from '../utility/getConnectionLabel';
|
||||||
import { useConnectionList, useDatabaseServerVersion, useDatabaseStatus } from '../utility/metadataLoaders';
|
import { useConnectionList, useDatabaseServerVersion, useDatabaseStatus } from '../utility/metadataLoaders';
|
||||||
import { findCommand } from '../commands/runCommand';
|
import { findCommand } from '../commands/runCommand';
|
||||||
@@ -140,6 +147,18 @@
|
|||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
{/if}
|
{/if}
|
||||||
|
{#if $currentArchive}
|
||||||
|
<div
|
||||||
|
class="item flex clickable"
|
||||||
|
title="Current archive"
|
||||||
|
on:click={() => {
|
||||||
|
$selectedWidget = 'archive';
|
||||||
|
}}
|
||||||
|
>
|
||||||
|
<FontIcon icon="icon archive" />
|
||||||
|
{$currentArchive}
|
||||||
|
</div>
|
||||||
|
{/if}
|
||||||
</div>
|
</div>
|
||||||
<div class="container">
|
<div class="container">
|
||||||
{#each contextItems || [] as item}
|
{#each contextItems || [] as item}
|
||||||
|
|||||||
Reference in New Issue
Block a user