mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-30 12:43:58 +00:00
Merge branch 'develop'
This commit is contained in:
38
packages/api/src/shell/dataDuplicator.js
Normal file
38
packages/api/src/shell/dataDuplicator.js
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
const stream = require('stream');
|
||||||
|
const path = require('path');
|
||||||
|
const { quoteFullName, fullNameToString, getLogger } = require('dbgate-tools');
|
||||||
|
const requireEngineDriver = require('../utility/requireEngineDriver');
|
||||||
|
const connectUtility = require('../utility/connectUtility');
|
||||||
|
const logger = getLogger('dataDuplicator');
|
||||||
|
const { DataDuplicator } = require('dbgate-datalib');
|
||||||
|
const copyStream = require('./copyStream');
|
||||||
|
const jsonLinesReader = require('./jsonLinesReader');
|
||||||
|
const { resolveArchiveFolder } = require('../utility/directories');
|
||||||
|
|
||||||
|
async function dataDuplicator({ connection, archive, items, analysedStructure = null }) {
|
||||||
|
const driver = requireEngineDriver(connection);
|
||||||
|
const pool = await connectUtility(driver, connection, 'write');
|
||||||
|
logger.info(`Connected.`);
|
||||||
|
|
||||||
|
if (!analysedStructure) {
|
||||||
|
analysedStructure = await driver.analyseFull(pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
const dupl = new DataDuplicator(
|
||||||
|
pool,
|
||||||
|
driver,
|
||||||
|
analysedStructure,
|
||||||
|
items.map(item => ({
|
||||||
|
name: item.name,
|
||||||
|
operation: item.operation,
|
||||||
|
matchColumns: item.matchColumns,
|
||||||
|
openStream: () => jsonLinesReader({ fileName: path.join(resolveArchiveFolder(archive), `${item.name}.jsonl`) }),
|
||||||
|
})),
|
||||||
|
stream,
|
||||||
|
copyStream
|
||||||
|
);
|
||||||
|
|
||||||
|
await dupl.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = dataDuplicator;
|
||||||
@@ -26,6 +26,7 @@ const importDatabase = require('./importDatabase');
|
|||||||
const loadDatabase = require('./loadDatabase');
|
const loadDatabase = require('./loadDatabase');
|
||||||
const generateModelSql = require('./generateModelSql');
|
const generateModelSql = require('./generateModelSql');
|
||||||
const modifyJsonLinesReader = require('./modifyJsonLinesReader');
|
const modifyJsonLinesReader = require('./modifyJsonLinesReader');
|
||||||
|
const dataDuplicator = require('./dataDuplicator');
|
||||||
|
|
||||||
const dbgateApi = {
|
const dbgateApi = {
|
||||||
queryReader,
|
queryReader,
|
||||||
@@ -55,6 +56,7 @@ const dbgateApi = {
|
|||||||
loadDatabase,
|
loadDatabase,
|
||||||
generateModelSql,
|
generateModelSql,
|
||||||
modifyJsonLinesReader,
|
modifyJsonLinesReader,
|
||||||
|
dataDuplicator,
|
||||||
};
|
};
|
||||||
|
|
||||||
requirePlugin.initializeDbgateApi(dbgateApi);
|
requirePlugin.initializeDbgateApi(dbgateApi);
|
||||||
|
|||||||
@@ -35,7 +35,11 @@ class ParseStream extends stream.Transform {
|
|||||||
async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined }) {
|
async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined }) {
|
||||||
logger.info(`Reading file ${fileName}`);
|
logger.info(`Reading file ${fileName}`);
|
||||||
|
|
||||||
const fileStream = fs.createReadStream(fileName, encoding);
|
const fileStream = fs.createReadStream(
|
||||||
|
fileName,
|
||||||
|
// @ts-ignore
|
||||||
|
encoding
|
||||||
|
);
|
||||||
const liner = byline(fileStream);
|
const liner = byline(fileStream);
|
||||||
const parser = new ParseStream({ limitRows });
|
const parser = new ParseStream({ limitRows });
|
||||||
liner.pipe(parser);
|
liner.pipe(parser);
|
||||||
|
|||||||
@@ -107,7 +107,11 @@ async function modifyJsonLinesReader({
|
|||||||
}) {
|
}) {
|
||||||
logger.info(`Reading file ${fileName} with change set`);
|
logger.info(`Reading file ${fileName} with change set`);
|
||||||
|
|
||||||
const fileStream = fs.createReadStream(fileName, encoding);
|
const fileStream = fs.createReadStream(
|
||||||
|
fileName,
|
||||||
|
// @ts-ignore
|
||||||
|
encoding
|
||||||
|
);
|
||||||
const liner = byline(fileStream);
|
const liner = byline(fileStream);
|
||||||
const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode });
|
const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode });
|
||||||
liner.pipe(parser);
|
liner.pipe(parser);
|
||||||
|
|||||||
224
packages/datalib/src/DataDuplicator.ts
Normal file
224
packages/datalib/src/DataDuplicator.ts
Normal file
@@ -0,0 +1,224 @@
|
|||||||
|
import { createAsyncWriteStream, getLogger, runCommandOnDriver, runQueryOnDriver } from 'dbgate-tools';
|
||||||
|
import { DatabaseInfo, EngineDriver, ForeignKeyInfo, TableInfo } from 'dbgate-types';
|
||||||
|
import _pick from 'lodash/pick';
|
||||||
|
import _omit from 'lodash/omit';
|
||||||
|
|
||||||
|
const logger = getLogger('dataDuplicator');
|
||||||
|
|
||||||
|
export interface DataDuplicatorItem {
|
||||||
|
openStream: () => Promise<ReadableStream>;
|
||||||
|
name: string;
|
||||||
|
operation: 'copy' | 'lookup' | 'insertMissing';
|
||||||
|
matchColumns: string[];
|
||||||
|
}
|
||||||
|
|
||||||
|
class DuplicatorReference {
|
||||||
|
constructor(
|
||||||
|
public base: DuplicatorItemHolder,
|
||||||
|
public ref: DuplicatorItemHolder,
|
||||||
|
public isMandatory: boolean,
|
||||||
|
public foreignKey: ForeignKeyInfo
|
||||||
|
) {}
|
||||||
|
|
||||||
|
get columnName() {
|
||||||
|
return this.foreignKey.columns[0].columnName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DuplicatorItemHolder {
|
||||||
|
references: DuplicatorReference[] = [];
|
||||||
|
backReferences: DuplicatorReference[] = [];
|
||||||
|
table: TableInfo;
|
||||||
|
isPlanned = false;
|
||||||
|
idMap = {};
|
||||||
|
autoColumn: string;
|
||||||
|
refByColumn: { [columnName: string]: DuplicatorReference } = {};
|
||||||
|
isReferenced: boolean;
|
||||||
|
|
||||||
|
get name() {
|
||||||
|
return this.item.name;
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(public item: DataDuplicatorItem, public duplicator: DataDuplicator) {
|
||||||
|
this.table = duplicator.db.tables.find(x => x.pureName.toUpperCase() == item.name.toUpperCase());
|
||||||
|
this.autoColumn = this.table.columns.find(x => x.autoIncrement)?.columnName;
|
||||||
|
if (
|
||||||
|
this.table.primaryKey?.columns?.length != 1 ||
|
||||||
|
this.table.primaryKey?.columns?.[0].columnName != this.autoColumn
|
||||||
|
) {
|
||||||
|
this.autoColumn = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
initializeReferences() {
|
||||||
|
for (const fk of this.table.foreignKeys) {
|
||||||
|
if (fk.columns?.length != 1) continue;
|
||||||
|
const refHolder = this.duplicator.itemHolders.find(y => y.name.toUpperCase() == fk.refTableName.toUpperCase());
|
||||||
|
if (refHolder == null) continue;
|
||||||
|
const isMandatory = this.table.columns.find(x => x.columnName == fk.columns[0]?.columnName)?.notNull;
|
||||||
|
const newref = new DuplicatorReference(this, refHolder, isMandatory, fk);
|
||||||
|
this.references.push(newref);
|
||||||
|
this.refByColumn[newref.columnName] = newref;
|
||||||
|
|
||||||
|
refHolder.isReferenced = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
createInsertObject(chunk) {
|
||||||
|
const res = _omit(
|
||||||
|
_pick(
|
||||||
|
chunk,
|
||||||
|
this.table.columns.map(x => x.columnName)
|
||||||
|
),
|
||||||
|
[this.autoColumn, ...this.backReferences.map(x => x.columnName)]
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const key in res) {
|
||||||
|
const ref = this.refByColumn[key];
|
||||||
|
if (ref) {
|
||||||
|
// remap id
|
||||||
|
res[key] = ref.ref.idMap[res[key]];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
async runImport() {
|
||||||
|
const readStream = await this.item.openStream();
|
||||||
|
const driver = this.duplicator.driver;
|
||||||
|
const pool = this.duplicator.pool;
|
||||||
|
let inserted = 0;
|
||||||
|
let mapped = 0;
|
||||||
|
let missing = 0;
|
||||||
|
|
||||||
|
const writeStream = createAsyncWriteStream(this.duplicator.stream, {
|
||||||
|
processItem: async chunk => {
|
||||||
|
if (chunk.__isStreamHeader) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const doCopy = async () => {
|
||||||
|
const insertedObj = this.createInsertObject(chunk);
|
||||||
|
await runCommandOnDriver(pool, driver, dmp =>
|
||||||
|
dmp.putCmd(
|
||||||
|
'^insert ^into %f (%,i) ^values (%,v)',
|
||||||
|
this.table,
|
||||||
|
Object.keys(insertedObj),
|
||||||
|
Object.values(insertedObj)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
inserted += 1;
|
||||||
|
if (this.autoColumn && this.isReferenced) {
|
||||||
|
const res = await runQueryOnDriver(pool, driver, dmp => dmp.selectScopeIdentity(this.table));
|
||||||
|
const resId = Object.entries(res?.rows?.[0])?.[0]?.[1];
|
||||||
|
if (resId != null) {
|
||||||
|
this.idMap[chunk[this.autoColumn]] = resId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
switch (this.item.operation) {
|
||||||
|
case 'copy': {
|
||||||
|
await doCopy();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'insertMissing':
|
||||||
|
case 'lookup': {
|
||||||
|
const res = await runQueryOnDriver(pool, driver, dmp =>
|
||||||
|
dmp.put(
|
||||||
|
'^select %i ^from %f ^where %i = %v',
|
||||||
|
this.autoColumn,
|
||||||
|
this.table,
|
||||||
|
this.item.matchColumns[0],
|
||||||
|
chunk[this.item.matchColumns[0]]
|
||||||
|
)
|
||||||
|
);
|
||||||
|
const resId = Object.entries(res?.rows?.[0])?.[0]?.[1];
|
||||||
|
if (resId != null) {
|
||||||
|
mapped += 1;
|
||||||
|
this.idMap[chunk[this.autoColumn]] = resId;
|
||||||
|
} else if (this.item.operation == 'insertMissing') {
|
||||||
|
await doCopy();
|
||||||
|
} else {
|
||||||
|
missing += 1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// this.idMap[oldId] = newId;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.duplicator.copyStream(readStream, writeStream);
|
||||||
|
|
||||||
|
// await this.duplicator.driver.writeQueryStream(this.duplicator.pool, {
|
||||||
|
// mapResultId: (oldId, newId) => {
|
||||||
|
// this.idMap[oldId] = newId;
|
||||||
|
// },
|
||||||
|
// });
|
||||||
|
|
||||||
|
return { inserted, mapped, missing };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class DataDuplicator {
|
||||||
|
itemHolders: DuplicatorItemHolder[];
|
||||||
|
itemPlan: DuplicatorItemHolder[] = [];
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
public pool: any,
|
||||||
|
public driver: EngineDriver,
|
||||||
|
public db: DatabaseInfo,
|
||||||
|
public items: DataDuplicatorItem[],
|
||||||
|
public stream,
|
||||||
|
public copyStream: (input, output) => Promise<void>
|
||||||
|
) {
|
||||||
|
this.itemHolders = items.map(x => new DuplicatorItemHolder(x, this));
|
||||||
|
this.itemHolders.forEach(x => x.initializeReferences());
|
||||||
|
}
|
||||||
|
|
||||||
|
findItemToPlan(): DuplicatorItemHolder {
|
||||||
|
for (const item of this.itemHolders) {
|
||||||
|
if (item.isPlanned) continue;
|
||||||
|
if (item.references.every(x => x.ref.isPlanned)) {
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (const item of this.itemHolders) {
|
||||||
|
if (item.isPlanned) continue;
|
||||||
|
if (item.references.every(x => x.ref.isPlanned || !x.isMandatory)) {
|
||||||
|
const backReferences = item.references.filter(x => !x.ref.isPlanned);
|
||||||
|
item.backReferences = backReferences;
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new Error('Cycle in mandatory references');
|
||||||
|
}
|
||||||
|
|
||||||
|
createPlan() {
|
||||||
|
while (this.itemPlan.length < this.itemHolders.length) {
|
||||||
|
const item = this.findItemToPlan();
|
||||||
|
item.isPlanned = true;
|
||||||
|
this.itemPlan.push(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async run() {
|
||||||
|
this.createPlan();
|
||||||
|
|
||||||
|
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.beginTransaction());
|
||||||
|
try {
|
||||||
|
for (const item of this.itemPlan) {
|
||||||
|
const stats = await item.runImport();
|
||||||
|
logger.info(
|
||||||
|
`Duplicated ${item.name}, inserted ${stats.inserted} rows, mapped ${stats.mapped} rows, missing ${stats.missing} rows`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
logger.error({ err }, 'Failed duplicator job, rollbacking');
|
||||||
|
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.rollbackTransaction());
|
||||||
|
}
|
||||||
|
await runCommandOnDriver(this.pool, this.driver, dmp => dmp.commitTransaction());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -55,7 +55,7 @@ function processDependencies(
|
|||||||
schemaName: fk.schemaName,
|
schemaName: fk.schemaName,
|
||||||
},
|
},
|
||||||
alias: 't0',
|
alias: 't0',
|
||||||
relations: subFkPath.map((fkItem, fkIndex) => ({
|
relations: [...subFkPath].reverse().map((fkItem, fkIndex) => ({
|
||||||
joinType: 'INNER JOIN',
|
joinType: 'INNER JOIN',
|
||||||
alias: `t${fkIndex + 1}`,
|
alias: `t${fkIndex + 1}`,
|
||||||
name: {
|
name: {
|
||||||
@@ -123,7 +123,16 @@ export function getDeleteCascades(changeSet: ChangeSet, dbinfo: DatabaseInfo): C
|
|||||||
const table = dbinfo.tables.find(x => x.pureName == baseCmd.pureName && x.schemaName == baseCmd.schemaName);
|
const table = dbinfo.tables.find(x => x.pureName == baseCmd.pureName && x.schemaName == baseCmd.schemaName);
|
||||||
if (!table.primaryKey) continue;
|
if (!table.primaryKey) continue;
|
||||||
|
|
||||||
processDependencies(changeSet, result, allForeignKeys, [], table, baseCmd, dbinfo, [table.pureName]);
|
const itemResult: ChangeSetDeleteCascade[] = [];
|
||||||
|
processDependencies(changeSet, itemResult, allForeignKeys, [], table, baseCmd, dbinfo, [table.pureName]);
|
||||||
|
for (const item of itemResult) {
|
||||||
|
const existing = result.find(x => x.title == item.title);
|
||||||
|
if (existing) {
|
||||||
|
existing.commands.push(...item.commands);
|
||||||
|
} else {
|
||||||
|
result.push(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// let resItem = result.find(x => x.title == baseCmd.pureName);
|
// let resItem = result.find(x => x.title == baseCmd.pureName);
|
||||||
// if (!resItem) {
|
// if (!resItem) {
|
||||||
|
|||||||
@@ -22,3 +22,4 @@ export * from './processPerspectiveDefaultColunns';
|
|||||||
export * from './PerspectiveDataPattern';
|
export * from './PerspectiveDataPattern';
|
||||||
export * from './PerspectiveDataLoader';
|
export * from './PerspectiveDataLoader';
|
||||||
export * from './perspectiveTools';
|
export * from './perspectiveTools';
|
||||||
|
export * from './DataDuplicator';
|
||||||
|
|||||||
@@ -57,6 +57,10 @@ export class ScriptWriter {
|
|||||||
this._put(`await dbgateApi.importDatabase(${JSON.stringify(options)});`);
|
this._put(`await dbgateApi.importDatabase(${JSON.stringify(options)});`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dataDuplicator(options) {
|
||||||
|
this._put(`await dbgateApi.dataDuplicator(${JSON.stringify(options)});`);
|
||||||
|
}
|
||||||
|
|
||||||
comment(s) {
|
comment(s) {
|
||||||
this._put(`// ${s}`);
|
this._put(`// ${s}`);
|
||||||
}
|
}
|
||||||
@@ -143,6 +147,13 @@ export class ScriptWriterJson {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dataDuplicator(options) {
|
||||||
|
this.commands.push({
|
||||||
|
type: 'dataDuplicator',
|
||||||
|
options,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
getScript(schedule = null) {
|
getScript(schedule = null) {
|
||||||
return {
|
return {
|
||||||
type: 'json',
|
type: 'json',
|
||||||
@@ -186,6 +197,9 @@ export function jsonScriptToJavascript(json) {
|
|||||||
case 'importDatabase':
|
case 'importDatabase':
|
||||||
script.importDatabase(cmd.options);
|
script.importDatabase(cmd.options);
|
||||||
break;
|
break;
|
||||||
|
case 'dataDuplicator':
|
||||||
|
script.dataDuplicator(cmd.options);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -197,6 +197,8 @@ export class SqlDumper implements AlterProcessor {
|
|||||||
|
|
||||||
specialColumnOptions(column) {}
|
specialColumnOptions(column) {}
|
||||||
|
|
||||||
|
selectScopeIdentity(table: TableInfo) {}
|
||||||
|
|
||||||
columnDefinition(column: ColumnInfo, { includeDefault = true, includeNullable = true, includeCollate = true } = {}) {
|
columnDefinition(column: ColumnInfo, { includeDefault = true, includeNullable = true, includeCollate = true } = {}) {
|
||||||
if (column.computedExpression) {
|
if (column.computedExpression) {
|
||||||
this.put('^as %s', column.computedExpression);
|
this.put('^as %s', column.computedExpression);
|
||||||
|
|||||||
41
packages/tools/src/createAsyncWriteStream.ts
Normal file
41
packages/tools/src/createAsyncWriteStream.ts
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import _intersection from 'lodash/intersection';
|
||||||
|
import _isArray from 'lodash/isArray';
|
||||||
|
import { getLogger } from './getLogger';
|
||||||
|
|
||||||
|
const logger = getLogger('asyncWriteStream');
|
||||||
|
|
||||||
|
export interface AsyncWriteStreamOptions {
|
||||||
|
processItem: (chunk: any) => Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createAsyncWriteStream(stream, options: AsyncWriteStreamOptions): any {
|
||||||
|
const writable = new stream.Writable({
|
||||||
|
objectMode: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
writable._write = async (chunk, encoding, callback) => {
|
||||||
|
await options.processItem(chunk);
|
||||||
|
|
||||||
|
// const { sql, id, newIdSql } = chunk;
|
||||||
|
// if (_isArray(sql)) {
|
||||||
|
// for (const item of sql) await driver.query(pool, item, { discardResult: true });
|
||||||
|
// } else {
|
||||||
|
// await driver.query(pool, sql, { discardResult: true });
|
||||||
|
// }
|
||||||
|
// if (newIdSql) {
|
||||||
|
// const res = await driver.query(pool, newIdSql);
|
||||||
|
// const resId = Object.entries(res?.rows?.[0])?.[0]?.[1];
|
||||||
|
|
||||||
|
// if (options?.mapResultId) {
|
||||||
|
// options?.mapResultId(id, resId as string);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
callback();
|
||||||
|
};
|
||||||
|
|
||||||
|
// writable._final = async callback => {
|
||||||
|
// callback();
|
||||||
|
// };
|
||||||
|
|
||||||
|
return writable;
|
||||||
|
}
|
||||||
@@ -1,10 +1,11 @@
|
|||||||
|
import { EngineDriver, WriteTableOptions } from 'dbgate-types';
|
||||||
import _intersection from 'lodash/intersection';
|
import _intersection from 'lodash/intersection';
|
||||||
import { getLogger } from './getLogger';
|
import { getLogger } from './getLogger';
|
||||||
import { prepareTableForImport } from './tableTransforms';
|
import { prepareTableForImport } from './tableTransforms';
|
||||||
|
|
||||||
const logger = getLogger('bulkStreamBase');
|
const logger = getLogger('bulkStreamBase');
|
||||||
|
|
||||||
export function createBulkInsertStreamBase(driver, stream, pool, name, options): any {
|
export function createBulkInsertStreamBase(driver: EngineDriver, stream, pool, name, options: WriteTableOptions): any {
|
||||||
const fullNameQuoted = name.schemaName
|
const fullNameQuoted = name.schemaName
|
||||||
? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}`
|
? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}`
|
||||||
: driver.dialect.quoteIdentifier(name.pureName);
|
: driver.dialect.quoteIdentifier(name.pureName);
|
||||||
@@ -58,21 +59,21 @@ export function createBulkInsertStreamBase(driver, stream, pool, name, options):
|
|||||||
const dmp = driver.createDumper();
|
const dmp = driver.createDumper();
|
||||||
|
|
||||||
dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`);
|
dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`);
|
||||||
dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col)));
|
dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string)));
|
||||||
dmp.putRaw(')\n VALUES\n');
|
dmp.putRaw(')\n VALUES\n');
|
||||||
|
|
||||||
let wasRow = false;
|
let wasRow = false;
|
||||||
for (const row of rows) {
|
for (const row of rows) {
|
||||||
if (wasRow) dmp.putRaw(',\n');
|
if (wasRow) dmp.putRaw(',\n');
|
||||||
dmp.putRaw('(');
|
dmp.putRaw('(');
|
||||||
dmp.putCollection(',', writable.columnNames, col => dmp.putValue(row[col]));
|
dmp.putCollection(',', writable.columnNames, col => dmp.putValue(row[col as string]));
|
||||||
dmp.putRaw(')');
|
dmp.putRaw(')');
|
||||||
wasRow = true;
|
wasRow = true;
|
||||||
}
|
}
|
||||||
dmp.putRaw(';');
|
dmp.putRaw(';');
|
||||||
// require('fs').writeFileSync('/home/jena/test.sql', dmp.s);
|
// require('fs').writeFileSync('/home/jena/test.sql', dmp.s);
|
||||||
// console.log(dmp.s);
|
// console.log(dmp.s);
|
||||||
await driver.query(pool, dmp.s);
|
await driver.query(pool, dmp.s, { discardResult: true });
|
||||||
};
|
};
|
||||||
|
|
||||||
writable.sendIfFull = async () => {
|
writable.sendIfFull = async () => {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import _compact from 'lodash/compact';
|
|||||||
import { SqlDumper } from './SqlDumper';
|
import { SqlDumper } from './SqlDumper';
|
||||||
import { splitQuery } from 'dbgate-query-splitter';
|
import { splitQuery } from 'dbgate-query-splitter';
|
||||||
import { dumpSqlSelect } from 'dbgate-sqltree';
|
import { dumpSqlSelect } from 'dbgate-sqltree';
|
||||||
import { EngineDriver, RunScriptOptions } from 'dbgate-types';
|
import { EngineDriver, QueryResult, RunScriptOptions } from 'dbgate-types';
|
||||||
|
|
||||||
const dialect = {
|
const dialect = {
|
||||||
limitSelect: true,
|
limitSelect: true,
|
||||||
@@ -20,12 +20,22 @@ const dialect = {
|
|||||||
defaultSchemaName: null,
|
defaultSchemaName: null,
|
||||||
};
|
};
|
||||||
|
|
||||||
export async function runCommandOnDriver(pool, driver: EngineDriver, cmd: (dmp: SqlDumper) => void) {
|
export async function runCommandOnDriver(pool, driver: EngineDriver, cmd: (dmp: SqlDumper) => void): Promise<void> {
|
||||||
const dmp = driver.createDumper();
|
const dmp = driver.createDumper();
|
||||||
cmd(dmp as any);
|
cmd(dmp as any);
|
||||||
await driver.query(pool, dmp.s, { discardResult: true });
|
await driver.query(pool, dmp.s, { discardResult: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function runQueryOnDriver(
|
||||||
|
pool,
|
||||||
|
driver: EngineDriver,
|
||||||
|
cmd: (dmp: SqlDumper) => void
|
||||||
|
): Promise<QueryResult> {
|
||||||
|
const dmp = driver.createDumper();
|
||||||
|
cmd(dmp as any);
|
||||||
|
return await driver.query(pool, dmp.s);
|
||||||
|
}
|
||||||
|
|
||||||
export const driverBase = {
|
export const driverBase = {
|
||||||
analyserClass: null,
|
analyserClass: null,
|
||||||
dumperClass: SqlDumper,
|
dumperClass: SqlDumper,
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ export * from './nameTools';
|
|||||||
export * from './tableTransforms';
|
export * from './tableTransforms';
|
||||||
export * from './packageTools';
|
export * from './packageTools';
|
||||||
export * from './createBulkInsertStreamBase';
|
export * from './createBulkInsertStreamBase';
|
||||||
|
export * from './createAsyncWriteStream';
|
||||||
export * from './DatabaseAnalyser';
|
export * from './DatabaseAnalyser';
|
||||||
export * from './driverBase';
|
export * from './driverBase';
|
||||||
export * from './SqlDumper';
|
export * from './SqlDumper';
|
||||||
|
|||||||
@@ -102,8 +102,27 @@ await dbgateApi.deployDb(${JSON.stringify(
|
|||||||
editor: {
|
editor: {
|
||||||
sourceConid: '__model',
|
sourceConid: '__model',
|
||||||
sourceDatabase: `archive:${data.name}`,
|
sourceDatabase: `archive:${data.name}`,
|
||||||
targetConid: _.get($currentDatabase, 'connection._id'),
|
targetConid: $currentDatabase?.connection?._id,
|
||||||
targetDatabase: _.get($currentDatabase, 'name'),
|
targetDatabase: $currentDatabase?.name,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
const handleOpenDuplicatorTab = () => {
|
||||||
|
openNewTab(
|
||||||
|
{
|
||||||
|
title: data.name,
|
||||||
|
icon: 'img duplicator',
|
||||||
|
tabComponent: 'DataDuplicatorTab',
|
||||||
|
props: {
|
||||||
|
conid: $currentDatabase?.connection?._id,
|
||||||
|
database: $currentDatabase?.name,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
editor: {
|
||||||
|
archiveFolder: data.name,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
@@ -115,6 +134,7 @@ await dbgateApi.deployDb(${JSON.stringify(
|
|||||||
data.name != 'default' && { text: 'Rename', onClick: handleRename },
|
data.name != 'default' && { text: 'Rename', onClick: handleRename },
|
||||||
data.name != 'default' &&
|
data.name != 'default' &&
|
||||||
$currentDatabase && [
|
$currentDatabase && [
|
||||||
|
{ text: 'Data duplicator', onClick: handleOpenDuplicatorTab },
|
||||||
{ text: 'Generate deploy DB SQL - experimental', onClick: handleGenerateDeploySql },
|
{ text: 'Generate deploy DB SQL - experimental', onClick: handleGenerateDeploySql },
|
||||||
{ text: 'Shell: Deploy DB - experimental', onClick: handleGenerateDeployScript },
|
{ text: 'Shell: Deploy DB - experimental', onClick: handleGenerateDeployScript },
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -226,6 +226,8 @@
|
|||||||
'img type-binary': 'mdi mdi-file color-icon-blue',
|
'img type-binary': 'mdi mdi-file color-icon-blue',
|
||||||
'img type-rejson': 'mdi mdi-color-json color-icon-blue',
|
'img type-rejson': 'mdi mdi-color-json color-icon-blue',
|
||||||
'img keydb': 'mdi mdi-key color-icon-blue',
|
'img keydb': 'mdi mdi-key color-icon-blue',
|
||||||
|
|
||||||
|
'img duplicator': 'mdi mdi-content-duplicate color-icon-green',
|
||||||
};
|
};
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ export default function useEditorData({ tabid, reloadToken = 0, loadFromArgs = n
|
|||||||
if (onInitialData) onInitialData(initFallback);
|
if (onInitialData) onInitialData(initFallback);
|
||||||
value = initFallback;
|
value = initFallback;
|
||||||
// move to local forage
|
// move to local forage
|
||||||
await localforage.setItem(localStorageKey, JSON.stringify(initFallback));
|
await localforage.setItem(localStorageKey, initFallback);
|
||||||
localStorage.removeItem(localStorageKey);
|
localStorage.removeItem(localStorageKey);
|
||||||
} else {
|
} else {
|
||||||
const init = await localforage.getItem(localStorageKey);
|
const init = await localforage.getItem(localStorageKey);
|
||||||
|
|||||||
257
packages/web/src/tabs/DataDuplicatorTab.svelte
Normal file
257
packages/web/src/tabs/DataDuplicatorTab.svelte
Normal file
@@ -0,0 +1,257 @@
|
|||||||
|
<script lang="ts" context="module">
|
||||||
|
const getCurrentEditor = () => getActiveComponent('DataDuplicatorTab');
|
||||||
|
|
||||||
|
registerCommand({
|
||||||
|
id: 'dataDuplicator.run',
|
||||||
|
category: 'Data duplicator',
|
||||||
|
name: 'Import into DB',
|
||||||
|
keyText: 'F5 | CtrlOrCommand+Enter',
|
||||||
|
toolbar: true,
|
||||||
|
isRelatedToTab: true,
|
||||||
|
icon: 'icon run',
|
||||||
|
testEnabled: () => getCurrentEditor()?.canRun(),
|
||||||
|
onClick: () => getCurrentEditor().run(),
|
||||||
|
});
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<script lang="ts">
|
||||||
|
import { ScriptWriter, ScriptWriterJson } from 'dbgate-tools';
|
||||||
|
|
||||||
|
import _ from 'lodash';
|
||||||
|
import ToolStripCommandButton from '../buttons/ToolStripCommandButton.svelte';
|
||||||
|
import ToolStripContainer from '../buttons/ToolStripContainer.svelte';
|
||||||
|
import invalidateCommands from '../commands/invalidateCommands';
|
||||||
|
import registerCommand from '../commands/registerCommand';
|
||||||
|
import TableControl from '../elements/TableControl.svelte';
|
||||||
|
import VerticalSplitter from '../elements/VerticalSplitter.svelte';
|
||||||
|
import CheckboxField from '../forms/CheckboxField.svelte';
|
||||||
|
import SelectField from '../forms/SelectField.svelte';
|
||||||
|
import { extractShellConnection } from '../impexp/createImpExpScript';
|
||||||
|
import SocketMessageView from '../query/SocketMessageView.svelte';
|
||||||
|
import useEditorData from '../query/useEditorData';
|
||||||
|
import { getCurrentConfig } from '../stores';
|
||||||
|
import { apiCall, apiOff, apiOn } from '../utility/api';
|
||||||
|
import { changeTab } from '../utility/common';
|
||||||
|
import createActivator, { getActiveComponent } from '../utility/createActivator';
|
||||||
|
import { useArchiveFiles, useArchiveFolders, useConnectionInfo, useDatabaseInfo } from '../utility/metadataLoaders';
|
||||||
|
import useEffect from '../utility/useEffect';
|
||||||
|
|
||||||
|
export let conid;
|
||||||
|
export let database;
|
||||||
|
export let tabid;
|
||||||
|
|
||||||
|
let busy = false;
|
||||||
|
let runnerId = null;
|
||||||
|
let executeNumber = 0;
|
||||||
|
|
||||||
|
export const activator = createActivator('DataDuplicatorTab', true);
|
||||||
|
|
||||||
|
$: connection = useConnectionInfo({ conid });
|
||||||
|
$: dbinfo = useDatabaseInfo({ conid, database });
|
||||||
|
|
||||||
|
$: archiveFolders = useArchiveFolders();
|
||||||
|
$: archiveFiles = useArchiveFiles({ folder: $editorState?.value?.archiveFolder });
|
||||||
|
|
||||||
|
$: pairedNames = _.intersectionBy(
|
||||||
|
$dbinfo?.tables?.map(x => x.pureName),
|
||||||
|
$archiveFiles?.map(x => x.name),
|
||||||
|
(x: string) => _.toUpper(x)
|
||||||
|
);
|
||||||
|
|
||||||
|
$: {
|
||||||
|
changeTab(tabid, tab => ({ ...tab, busy }));
|
||||||
|
}
|
||||||
|
|
||||||
|
$: {
|
||||||
|
busy;
|
||||||
|
runnerId;
|
||||||
|
tableRows;
|
||||||
|
invalidateCommands();
|
||||||
|
}
|
||||||
|
|
||||||
|
const { editorState, editorValue, setEditorData } = useEditorData({
|
||||||
|
tabid,
|
||||||
|
onInitialData: value => {
|
||||||
|
invalidateCommands();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
function changeTable(row) {
|
||||||
|
setEditorData(old => ({
|
||||||
|
...old,
|
||||||
|
tables: {
|
||||||
|
...old?.tables,
|
||||||
|
[row.name]: row,
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
function createScript(forceScript = false) {
|
||||||
|
const config = getCurrentConfig();
|
||||||
|
const script = config.allowShellScripting || forceScript ? new ScriptWriter() : new ScriptWriterJson();
|
||||||
|
script.dataDuplicator({
|
||||||
|
connection: extractShellConnection($connection, database),
|
||||||
|
archive: $editorState.value.archiveFolder,
|
||||||
|
items: tableRows
|
||||||
|
.filter(x => x.isChecked)
|
||||||
|
.map(row => ({
|
||||||
|
name: row.name,
|
||||||
|
operation: row.operation,
|
||||||
|
matchColumns: _.compact([row.matchColumn1]),
|
||||||
|
})),
|
||||||
|
});
|
||||||
|
return script.getScript();
|
||||||
|
}
|
||||||
|
|
||||||
|
export function canRun() {
|
||||||
|
return !!tableRows.find(x => x.isChecked) && !busy;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function run() {
|
||||||
|
if (busy) return;
|
||||||
|
executeNumber += 1;
|
||||||
|
busy = true;
|
||||||
|
const script = await createScript();
|
||||||
|
let runid = runnerId;
|
||||||
|
const resp = await apiCall('runners/start', { script });
|
||||||
|
runid = resp.runid;
|
||||||
|
runnerId = runid;
|
||||||
|
}
|
||||||
|
|
||||||
|
$: effect = useEffect(() => registerRunnerDone(runnerId));
|
||||||
|
|
||||||
|
function registerRunnerDone(rid) {
|
||||||
|
if (rid) {
|
||||||
|
apiOn(`runner-done-${rid}`, handleRunnerDone);
|
||||||
|
return () => {
|
||||||
|
apiOff(`runner-done-${rid}`, handleRunnerDone);
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return () => {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$: $effect;
|
||||||
|
|
||||||
|
const handleRunnerDone = () => {
|
||||||
|
busy = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
// $: console.log('$archiveFiles', $archiveFiles);
|
||||||
|
// $: console.log('$editorState', $editorState.value);
|
||||||
|
|
||||||
|
$: tableRows = pairedNames.map(name => {
|
||||||
|
const item = $editorState?.value?.tables?.[name];
|
||||||
|
const isChecked = item?.isChecked ?? true;
|
||||||
|
const operation = item?.operation ?? 'copy';
|
||||||
|
const tableInfo = $dbinfo?.tables?.find(x => x.pureName?.toUpperCase() == name.toUpperCase());
|
||||||
|
const matchColumn1 =
|
||||||
|
item?.matchColumn1 ?? tableInfo?.primaryKey?.columns?.[0]?.columnName ?? tableInfo?.columns?.[0]?.columnName;
|
||||||
|
|
||||||
|
return {
|
||||||
|
name,
|
||||||
|
isChecked,
|
||||||
|
operation,
|
||||||
|
matchColumn1,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
// $: console.log('$archiveFolders', $archiveFolders);
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<ToolStripContainer>
|
||||||
|
<VerticalSplitter>
|
||||||
|
<svelte:fragment slot="1">
|
||||||
|
<div>
|
||||||
|
<div class="bold m-2">Source archive</div>
|
||||||
|
<SelectField
|
||||||
|
isNative
|
||||||
|
value={$editorState.value?.archiveFolder}
|
||||||
|
on:change={e => {
|
||||||
|
setEditorData(old => ({
|
||||||
|
...old,
|
||||||
|
archiveFolder: e.detail,
|
||||||
|
}));
|
||||||
|
}}
|
||||||
|
options={$archiveFolders?.map(x => ({
|
||||||
|
label: x.name,
|
||||||
|
value: x.name,
|
||||||
|
})) || []}
|
||||||
|
/>
|
||||||
|
|
||||||
|
<div class="bold m-2">Imported files</div>
|
||||||
|
|
||||||
|
<TableControl
|
||||||
|
rows={tableRows}
|
||||||
|
columns={[
|
||||||
|
{ header: '', fieldName: 'isChecked', slot: 1 },
|
||||||
|
{ header: 'File=>Table', fieldName: 'name' },
|
||||||
|
{ header: 'Operation', fieldName: 'operation', slot: 2 },
|
||||||
|
{ header: 'Match column', fieldName: 'matchColumn1', slot: 3 },
|
||||||
|
]}
|
||||||
|
>
|
||||||
|
<svelte:fragment slot="1" let:row>
|
||||||
|
<CheckboxField
|
||||||
|
checked={row.isChecked}
|
||||||
|
on:change={e => {
|
||||||
|
changeTable({ ...row, isChecked: e.target.checked });
|
||||||
|
}}
|
||||||
|
/>
|
||||||
|
</svelte:fragment>
|
||||||
|
<svelte:fragment slot="2" let:row>
|
||||||
|
<SelectField
|
||||||
|
isNative
|
||||||
|
value={row.operation}
|
||||||
|
on:change={e => {
|
||||||
|
changeTable({ ...row, operation: e.detail });
|
||||||
|
}}
|
||||||
|
disabled={!row.isChecked}
|
||||||
|
options={[
|
||||||
|
{ label: 'Copy row', value: 'copy' },
|
||||||
|
{ label: 'Lookup (find matching row)', value: 'lookup' },
|
||||||
|
{ label: 'Insert if not exists', value: 'insertMissing' },
|
||||||
|
]}
|
||||||
|
/>
|
||||||
|
</svelte:fragment>
|
||||||
|
<svelte:fragment slot="3" let:row>
|
||||||
|
{#if row.operation != 'copy'}
|
||||||
|
<SelectField
|
||||||
|
isNative
|
||||||
|
value={row.matchColumn1}
|
||||||
|
on:change={e => {
|
||||||
|
changeTable({ ...row, matchColumn1: e.detail });
|
||||||
|
}}
|
||||||
|
disabled={!row.isChecked}
|
||||||
|
options={$dbinfo?.tables
|
||||||
|
?.find(x => x.pureName?.toUpperCase() == row.name.toUpperCase())
|
||||||
|
?.columns?.map(col => ({
|
||||||
|
label: col.columnName,
|
||||||
|
value: col.columnName,
|
||||||
|
})) || []}
|
||||||
|
/>
|
||||||
|
{/if}
|
||||||
|
</svelte:fragment>
|
||||||
|
</TableControl>
|
||||||
|
</div>
|
||||||
|
</svelte:fragment>
|
||||||
|
<svelte:fragment slot="2">
|
||||||
|
<SocketMessageView eventName={runnerId ? `runner-info-${runnerId}` : null} {executeNumber} showNoMessagesAlert />
|
||||||
|
</svelte:fragment>
|
||||||
|
</VerticalSplitter>
|
||||||
|
|
||||||
|
<svelte:fragment slot="toolstrip">
|
||||||
|
<ToolStripCommandButton command="dataDuplicator.run" />
|
||||||
|
</svelte:fragment>
|
||||||
|
</ToolStripContainer>
|
||||||
|
|
||||||
|
<!-- <div>
|
||||||
|
{#each pairedNames as name}
|
||||||
|
<div>{name}</div>
|
||||||
|
{/each}
|
||||||
|
</div> -->
|
||||||
|
|
||||||
|
<!-- <style>
|
||||||
|
.title {
|
||||||
|
font-weight: bold;
|
||||||
|
}
|
||||||
|
</style> -->
|
||||||
@@ -28,6 +28,7 @@ import * as MapTab from './MapTab.svelte';
|
|||||||
import * as PerspectiveTab from './PerspectiveTab.svelte';
|
import * as PerspectiveTab from './PerspectiveTab.svelte';
|
||||||
import * as ServerSummaryTab from './ServerSummaryTab.svelte';
|
import * as ServerSummaryTab from './ServerSummaryTab.svelte';
|
||||||
import * as ProfilerTab from './ProfilerTab.svelte';
|
import * as ProfilerTab from './ProfilerTab.svelte';
|
||||||
|
import * as DataDuplicatorTab from './DataDuplicatorTab.svelte';
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
TableDataTab,
|
TableDataTab,
|
||||||
@@ -60,4 +61,5 @@ export default {
|
|||||||
PerspectiveTab,
|
PerspectiveTab,
|
||||||
ServerSummaryTab,
|
ServerSummaryTab,
|
||||||
ProfilerTab,
|
ProfilerTab,
|
||||||
|
DataDuplicatorTab,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -155,6 +155,10 @@ class MsSqlDumper extends SqlDumper {
|
|||||||
newname
|
newname
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
selectScopeIdentity() {
|
||||||
|
this.put('^select ^scope_identity()');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MsSqlDumper.prototype.renameView = MsSqlDumper.prototype.renameObject;
|
MsSqlDumper.prototype.renameView = MsSqlDumper.prototype.renameObject;
|
||||||
|
|||||||
@@ -89,6 +89,10 @@ class Dumper extends SqlDumper {
|
|||||||
putByteArrayValue(value) {
|
putByteArrayValue(value) {
|
||||||
this.putRaw(`unhex('${arrayToHexString(value)}')`);
|
this.putRaw(`unhex('${arrayToHexString(value)}')`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
selectScopeIdentity() {
|
||||||
|
this.put('^select ^last_insert_id()')
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = Dumper;
|
module.exports = Dumper;
|
||||||
|
|||||||
@@ -99,6 +99,14 @@ class Dumper extends SqlDumper {
|
|||||||
putByteArrayValue(value) {
|
putByteArrayValue(value) {
|
||||||
this.putRaw(`e'\\\\x${arrayToHexString(value)}'`);
|
this.putRaw(`e'\\\\x${arrayToHexString(value)}'`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
selectScopeIdentity(table) {
|
||||||
|
this.put(
|
||||||
|
"^SELECT currval(pg_get_serial_sequence('%f','%s'))",
|
||||||
|
table,
|
||||||
|
table.columns?.find(x => x.autoIncrement)?.[0]?.columnName
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = Dumper;
|
module.exports = Dumper;
|
||||||
|
|||||||
@@ -16,6 +16,10 @@ class Dumper extends SqlDumper {
|
|||||||
truncateTable(name) {
|
truncateTable(name) {
|
||||||
this.putCmd('^delete ^from %f', name);
|
this.putCmd('^delete ^from %f', name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
selectScopeIdentity() {
|
||||||
|
this.put('^select last_insert_rowid()')
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = Dumper;
|
module.exports = Dumper;
|
||||||
|
|||||||
Reference in New Issue
Block a user