save structure changes to jsonl file

This commit is contained in:
Jan Prochazka
2023-02-16 13:33:28 +01:00
parent edf0637a35
commit 3114a05c3b
4 changed files with 65 additions and 4 deletions

View File

@@ -2,7 +2,7 @@ const fs = require('fs');
const _ = require('lodash');
const stream = require('stream');
const byline = require('byline');
const { getLogger } = require('dbgate-tools');
const { getLogger, processJsonDataUpdateCommands, removeTablePairingId } = require('dbgate-tools');
const logger = getLogger('modifyJsonLinesReader');
const stableStringify = require('json-stable-stringify');
@@ -11,6 +11,7 @@ class ParseStream extends stream.Transform {
super({ objectMode: true });
this.limitRows = limitRows;
this.changeSet = changeSet;
this.wasHeader = false;
this.currentRowIndex = 0;
if (mergeMode == 'merge') {
if (mergedRows && mergeKey) {
@@ -28,12 +29,28 @@ class ParseStream extends stream.Transform {
_transform(chunk, encoding, done) {
let obj = JSON.parse(chunk);
if (obj.__isStreamHeader) {
this.push(obj);
if (this.changeSet && this.changeSet.structure) {
this.push({
...removeTablePairingId(this.changeSet.structure),
__isStreamHeader: true,
});
} else {
this.push(obj);
}
this.wasHeader = true;
done();
return;
}
if (this.changeSet) {
if (!this.wasHeader && this.changeSet.structure) {
this.push({
...removeTablePairingId(this.changeSet.structure),
__isStreamHeader: true,
});
this.wasHeader = true;
}
if (!this.limitRows || this.currentRowIndex < this.limitRows) {
if (this.changeSet.deletes.find(x => x.existingRowIndex == this.currentRowIndex)) {
obj = null;
@@ -48,6 +65,9 @@ class ParseStream extends stream.Transform {
}
if (obj) {
if (this.changeSet.dataUpdateCommands) {
obj = processJsonDataUpdateCommands(obj, this.changeSet.dataUpdateCommands);
}
this.push(obj);
}
this.currentRowIndex += 1;