mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-18 21:55:59 +00:00
more mirroe archive commands
This commit is contained in:
@@ -1,69 +0,0 @@
|
||||
const fs = require('fs');
|
||||
const stream = require('stream');
|
||||
const byline = require('byline');
|
||||
const { getLogger } = require('dbgate-tools');
|
||||
const logger = getLogger('changeSetOverJsonLinesReader');
|
||||
|
||||
class ParseStream extends stream.Transform {
|
||||
constructor({ limitRows, changeSet }) {
|
||||
super({ objectMode: true });
|
||||
this.limitRows = limitRows;
|
||||
this.changeSet = changeSet;
|
||||
this.currentRowIndex = 0;
|
||||
}
|
||||
_transform(chunk, encoding, done) {
|
||||
let obj = JSON.parse(chunk);
|
||||
if (obj.__isStreamHeader) {
|
||||
this.push(obj);
|
||||
done();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.limitRows || this.currentRowIndex < this.limitRows) {
|
||||
if (this.changeSet.deletes.find(x => x.existingRowIndex == this.currentRowIndex)) {
|
||||
obj = null;
|
||||
}
|
||||
|
||||
const update = this.changeSet.updates.find(x => x.existingRowIndex == this.currentRowIndex);
|
||||
if (update) {
|
||||
obj = {
|
||||
...obj,
|
||||
...update.fields,
|
||||
};
|
||||
}
|
||||
|
||||
if (obj) {
|
||||
this.push(obj);
|
||||
}
|
||||
this.currentRowIndex += 1;
|
||||
}
|
||||
done();
|
||||
}
|
||||
|
||||
_flush(done) {
|
||||
for (const insert of this.changeSet.inserts) {
|
||||
this.push({
|
||||
...insert.document,
|
||||
...insert.fields,
|
||||
});
|
||||
}
|
||||
done();
|
||||
}
|
||||
}
|
||||
|
||||
async function changeSetOverJsonLinesReader({
|
||||
fileName,
|
||||
encoding = 'utf-8',
|
||||
limitRows = undefined,
|
||||
changeSet = { inserts: [], updates: [], deletes: [] },
|
||||
}) {
|
||||
logger.info(`Reading file ${fileName} with change set`);
|
||||
|
||||
const fileStream = fs.createReadStream(fileName, encoding);
|
||||
const liner = byline(fileStream);
|
||||
const parser = new ParseStream({ limitRows, changeSet });
|
||||
liner.pipe(parser);
|
||||
return parser;
|
||||
}
|
||||
|
||||
module.exports = changeSetOverJsonLinesReader;
|
||||
@@ -25,7 +25,7 @@ const dumpDatabase = require('./dumpDatabase');
|
||||
const importDatabase = require('./importDatabase');
|
||||
const loadDatabase = require('./loadDatabase');
|
||||
const generateModelSql = require('./generateModelSql');
|
||||
const changeSetOverJsonLinesReader = require('./changeSetOverJsonLinesReader');
|
||||
const modifyJsonLinesReader = require('./modifyJsonLinesReader');
|
||||
|
||||
const dbgateApi = {
|
||||
queryReader,
|
||||
@@ -54,7 +54,7 @@ const dbgateApi = {
|
||||
importDatabase,
|
||||
loadDatabase,
|
||||
generateModelSql,
|
||||
changeSetOverJsonLinesReader,
|
||||
modifyJsonLinesReader,
|
||||
};
|
||||
|
||||
requirePlugin.initializeDbgateApi(dbgateApi);
|
||||
|
||||
117
packages/api/src/shell/modifyJsonLinesReader.js
Normal file
117
packages/api/src/shell/modifyJsonLinesReader.js
Normal file
@@ -0,0 +1,117 @@
|
||||
const fs = require('fs');
|
||||
const _ = require('lodash');
|
||||
const stream = require('stream');
|
||||
const byline = require('byline');
|
||||
const { getLogger } = require('dbgate-tools');
|
||||
const logger = getLogger('modifyJsonLinesReader');
|
||||
const stableStringify = require('json-stable-stringify');
|
||||
|
||||
class ParseStream extends stream.Transform {
|
||||
constructor({ limitRows, changeSet, mergedRows, mergeKey, mergeMode }) {
|
||||
super({ objectMode: true });
|
||||
this.limitRows = limitRows;
|
||||
this.changeSet = changeSet;
|
||||
this.currentRowIndex = 0;
|
||||
if (mergeMode == 'merge') {
|
||||
if (mergedRows && mergeKey) {
|
||||
this.mergedRowsDict = {};
|
||||
for (const row of mergedRows) {
|
||||
const key = stableStringify(_.pick(row, mergeKey));
|
||||
this.mergedRowsDict[key] = row;
|
||||
}
|
||||
}
|
||||
}
|
||||
this.mergedRowsArray = mergedRows;
|
||||
this.mergeKey = mergeKey;
|
||||
this.mergeMode = mergeMode;
|
||||
}
|
||||
_transform(chunk, encoding, done) {
|
||||
let obj = JSON.parse(chunk);
|
||||
if (obj.__isStreamHeader) {
|
||||
this.push(obj);
|
||||
done();
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.changeSet) {
|
||||
if (!this.limitRows || this.currentRowIndex < this.limitRows) {
|
||||
if (this.changeSet.deletes.find(x => x.existingRowIndex == this.currentRowIndex)) {
|
||||
obj = null;
|
||||
}
|
||||
|
||||
const update = this.changeSet.updates.find(x => x.existingRowIndex == this.currentRowIndex);
|
||||
if (update) {
|
||||
obj = {
|
||||
...obj,
|
||||
...update.fields,
|
||||
};
|
||||
}
|
||||
|
||||
if (obj) {
|
||||
this.push(obj);
|
||||
}
|
||||
this.currentRowIndex += 1;
|
||||
}
|
||||
} else if (this.mergedRowsArray && this.mergeKey && this.mergeMode) {
|
||||
if (this.mergeMode == 'merge') {
|
||||
const key = stableStringify(_.pick(obj, this.mergeKey));
|
||||
if (this.mergedRowsDict[key]) {
|
||||
this.push({ ...obj, ...this.mergedRowsDict[key] });
|
||||
delete this.mergedRowsDict[key];
|
||||
} else {
|
||||
this.push(obj);
|
||||
}
|
||||
} else if (this.mergeMode == 'append') {
|
||||
this.push(obj);
|
||||
}
|
||||
} else {
|
||||
this.push(obj);
|
||||
}
|
||||
done();
|
||||
}
|
||||
|
||||
_flush(done) {
|
||||
if (this.changeSet) {
|
||||
for (const insert of this.changeSet.inserts) {
|
||||
this.push({
|
||||
...insert.document,
|
||||
...insert.fields,
|
||||
});
|
||||
}
|
||||
} else if (this.mergedRowsArray && this.mergeKey) {
|
||||
if (this.mergeMode == 'merge') {
|
||||
for (const row of this.mergedRowsArray) {
|
||||
const key = stableStringify(_.pick(row, this.mergeKey));
|
||||
if (this.mergedRowsDict[key]) {
|
||||
this.push(row);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const row of this.mergedRowsArray) {
|
||||
this.push(row);
|
||||
}
|
||||
}
|
||||
}
|
||||
done();
|
||||
}
|
||||
}
|
||||
|
||||
async function modifyJsonLinesReader({
|
||||
fileName,
|
||||
encoding = 'utf-8',
|
||||
limitRows = undefined,
|
||||
changeSet = null,
|
||||
mergedRows = null,
|
||||
mergeKey = null,
|
||||
mergeMode = 'merge',
|
||||
}) {
|
||||
logger.info(`Reading file ${fileName} with change set`);
|
||||
|
||||
const fileStream = fs.createReadStream(fileName, encoding);
|
||||
const liner = byline(fileStream);
|
||||
const parser = new ParseStream({ limitRows, changeSet, mergedRows, mergeKey, mergeMode });
|
||||
liner.pipe(parser);
|
||||
return parser;
|
||||
}
|
||||
|
||||
module.exports = modifyJsonLinesReader;
|
||||
Reference in New Issue
Block a user