data duplicator - handle weak refs

This commit is contained in:
Jan Prochazka
2024-11-07 13:42:41 +01:00
parent cb450a0313
commit 1fb4a06092
2 changed files with 38 additions and 6 deletions

View File

@@ -121,7 +121,7 @@ describe('Data duplicator', () => {
foreignKeys: [{ refTableName: 't1', columns: [{ columnName: 'valfk', refColumnName: 'id' }] }], foreignKeys: [{ refTableName: 't1', columns: [{ columnName: 'valfk', refColumnName: 'id' }] }],
}) })
); );
runCommandOnDriver(conn, driver, dmp => dmp.put("insert into ~t1 (~id, ~val) ~values (1, 'first')")); runCommandOnDriver(conn, driver, dmp => dmp.put("insert into ~t1 (~id, ~val) values (1, 'first')"));
const gett2 = () => const gett2 = () =>
stream.Readable.from([ stream.Readable.from([
@@ -149,7 +149,10 @@ describe('Data duplicator', () => {
expect(res1.rows[0].cnt.toString()).toEqual('1'); expect(res1.rows[0].cnt.toString()).toEqual('1');
const res2 = await driver.query(conn, `select count(*) as cnt from t2`); const res2 = await driver.query(conn, `select count(*) as cnt from t2`);
expect(res2.rows[0].cnt.toString()).toEqual('1'); expect(res2.rows[0].cnt.toString()).toEqual('2');
const res3 = await driver.query(conn, `select count(*) as cnt from t2 where valfk is not null`);
expect(res3.rows[0].cnt.toString()).toEqual('1');
}) })
); );
}); });

View File

@@ -79,7 +79,11 @@ class DuplicatorItemHolder {
const isMandatory = this.table.columns.find(x => x.columnName == fk.columns[0]?.columnName)?.notNull; const isMandatory = this.table.columns.find(x => x.columnName == fk.columns[0]?.columnName)?.notNull;
if (refHolder == null) { if (refHolder == null) {
if (!isMandatory) { if (!isMandatory) {
const weakref = new DuplicatorWeakReference(this, this.duplicator.db.tables.find(x => x.pureName == fk.refTableName), fk); const weakref = new DuplicatorWeakReference(
this,
this.duplicator.db.tables.find(x => x.pureName == fk.refTableName),
fk
);
this.weakReferences.push(weakref); this.weakReferences.push(weakref);
} }
} else { } else {
@@ -92,13 +96,13 @@ class DuplicatorItemHolder {
} }
} }
createInsertObject(chunk) { createInsertObject(chunk, weakrefcols: string[]) {
const res = _omit( const res = _omit(
_pick( _pick(
chunk, chunk,
this.table.columns.map(x => x.columnName) this.table.columns.map(x => x.columnName)
), ),
[this.autoColumn, ...this.backReferences.map(x => x.columnName)] [this.autoColumn, ...this.backReferences.map(x => x.columnName), ...weakrefcols]
); );
for (const key in res) { for (const key in res) {
@@ -119,6 +123,28 @@ class DuplicatorItemHolder {
return res; return res;
} }
// returns list of columns that are weak references and are not resolved
async getMissingWeakRefsForRow(row): Promise<string[]> {
if (!this.duplicator.options.setNullForUnresolvedNullableRefs) {
return [];
}
const qres = await runQueryOnDriver(this.duplicator.pool, this.duplicator.driver, dmp => {
dmp.put('^select ');
dmp.putCollection(',', this.weakReferences, weakref => {
dmp.put(
'(^case ^when ^exists (^select * ^from %f where %i = %v) ^then 1 ^else 0 ^end) as %i',
weakref.ref,
weakref.foreignKey.columns[0].refColumnName,
row[weakref.foreignKey.columns[0].columnName],
weakref.foreignKey.columns[0].columnName
);
});
});
const qrow = qres.rows[0];
return this.weakReferences.filter(x => !qrow[x.columnName]).map(x => x.columnName);
}
async runImport() { async runImport() {
const readStream = await this.item.openStream(); const readStream = await this.item.openStream();
const driver = this.duplicator.driver; const driver = this.duplicator.driver;
@@ -129,6 +155,8 @@ class DuplicatorItemHolder {
let skipped = 0; let skipped = 0;
let lastLogged = new Date(); let lastLogged = new Date();
const existingWeakRefs = {};
const writeStream = createAsyncWriteStream(this.duplicator.stream, { const writeStream = createAsyncWriteStream(this.duplicator.stream, {
processItem: async chunk => { processItem: async chunk => {
if (chunk.__isStreamHeader) { if (chunk.__isStreamHeader) {
@@ -137,7 +165,8 @@ class DuplicatorItemHolder {
const doCopy = async () => { const doCopy = async () => {
// console.log('chunk', this.name, JSON.stringify(chunk)); // console.log('chunk', this.name, JSON.stringify(chunk));
const insertedObj = this.createInsertObject(chunk); const weakrefcols = await this.getMissingWeakRefsForRow(chunk);
const insertedObj = this.createInsertObject(chunk, weakrefcols);
// console.log('insertedObj', this.name, JSON.stringify(insertedObj)); // console.log('insertedObj', this.name, JSON.stringify(insertedObj));
if (insertedObj == null) { if (insertedObj == null) {
skipped += 1; skipped += 1;