Merge branch 'develop'

This commit is contained in:
Jan Prochazka
2023-02-26 10:14:42 +01:00
93 changed files with 1672 additions and 1616 deletions

View File

@@ -3,13 +3,13 @@ const readline = require('readline');
const path = require('path');
const { archivedir, clearArchiveLinksCache, resolveArchiveFolder } = require('../utility/directories');
const socket = require('../utility/socket');
const { saveFreeTableData } = require('../utility/freeTableStorage');
const loadFilesRecursive = require('../utility/loadFilesRecursive');
const getJslFileName = require('../utility/getJslFileName');
const { getLogger } = require('dbgate-tools');
const uuidv1 = require('uuid/v1');
const dbgateApi = require('../shell');
const jsldata = require('./jsldata');
const platformInfo = require('../utility/platformInfo');
const logger = getLogger('archive');
@@ -137,8 +137,13 @@ module.exports = {
});
const writer = await dbgateApi.jsonLinesWriter({ fileName: tmpchangedFilePath });
await dbgateApi.copyStream(reader, writer);
await fs.unlink(changedFilePath);
await fs.rename(path.join(tmpchangedFilePath), path.join(changedFilePath));
if (platformInfo.isWindows) {
await fs.copyFile(tmpchangedFilePath, changedFilePath);
await fs.unlink(tmpchangedFilePath);
} else {
await fs.unlink(changedFilePath);
await fs.rename(tmpchangedFilePath, changedFilePath);
}
return true;
},
@@ -162,34 +167,6 @@ module.exports = {
return true;
},
saveFreeTable_meta: true,
async saveFreeTable({ folder, file, data }) {
await saveFreeTableData(path.join(resolveArchiveFolder(folder), `${file}.jsonl`), data);
socket.emitChanged(`archive-files-changed`, { folder });
return true;
},
loadFreeTable_meta: true,
async loadFreeTable({ folder, file }) {
return new Promise((resolve, reject) => {
const fileStream = fs.createReadStream(path.join(resolveArchiveFolder(folder), `${file}.jsonl`));
const liner = readline.createInterface({
input: fileStream,
});
let structure = null;
const rows = [];
liner.on('line', line => {
const data = JSON.parse(line);
if (structure) rows.push(data);
else structure = data;
});
liner.on('close', () => {
resolve({ structure, rows });
fileStream.close();
});
});
},
saveText_meta: true,
async saveText({ folder, file, text }) {
await fs.writeFile(path.join(resolveArchiveFolder(folder), `${file}.jsonl`), text);
@@ -198,10 +175,30 @@ module.exports = {
},
saveJslData_meta: true,
async saveJslData({ folder, file, jslid }) {
async saveJslData({ folder, file, jslid, changeSet }) {
const source = getJslFileName(jslid);
const target = path.join(resolveArchiveFolder(folder), `${file}.jsonl`);
await fs.copyFile(source, target);
if (changeSet) {
const reader = await dbgateApi.modifyJsonLinesReader({
fileName: source,
changeSet,
});
const writer = await dbgateApi.jsonLinesWriter({ fileName: target });
await dbgateApi.copyStream(reader, writer);
} else {
await fs.copyFile(source, target);
socket.emitChanged(`archive-files-changed`, { folder });
}
return true;
},
saveRows_meta: true,
async saveRows({ folder, file, rows }) {
const fileStream = fs.createWriteStream(path.join(resolveArchiveFolder(folder), `${file}.jsonl`));
for (const row of rows) {
await fileStream.write(JSON.stringify(row) + '\n');
}
await fileStream.close();
socket.emitChanged(`archive-files-changed`, { folder });
return true;
},

View File

@@ -4,7 +4,6 @@ const lineReader = require('line-reader');
const _ = require('lodash');
const { __ } = require('lodash/fp');
const DatastoreProxy = require('../utility/DatastoreProxy');
const { saveFreeTableData } = require('../utility/freeTableStorage');
const getJslFileName = require('../utility/getJslFileName');
const JsonLinesDatastore = require('../utility/JsonLinesDatastore');
const requirePluginFunction = require('../utility/requirePluginFunction');
@@ -148,6 +147,12 @@ module.exports = {
return datastore.getRows(offset, limit, _.isEmpty(filters) ? null : filters, _.isEmpty(sort) ? null : sort);
},
exists_meta: true,
async exists({ jslid }) {
const fileName = getJslFileName(jslid);
return fs.existsSync(fileName);
},
getStats_meta: true,
getStats({ jslid }) {
const file = `${getJslFileName(jslid)}.stats`;
@@ -189,18 +194,22 @@ module.exports = {
// }
},
saveFreeTable_meta: true,
async saveFreeTable({ jslid, data }) {
saveFreeTableData(getJslFileName(jslid), data);
return true;
},
saveText_meta: true,
async saveText({ jslid, text }) {
await fs.promises.writeFile(getJslFileName(jslid), text);
return true;
},
saveRows_meta: true,
async saveRows({ jslid, rows }) {
const fileStream = fs.createWriteStream(getJslFileName(jslid));
for (const row of rows) {
await fileStream.write(JSON.stringify(row) + '\n');
}
await fileStream.close();
return true;
},
extractTimelineChart_meta: true,
async extractTimelineChart({ jslid, timestampFunction, aggregateFunction, measures }) {
const timestamp = requirePluginFunction(timestampFunction);

View File

@@ -70,15 +70,20 @@ module.exports = {
if (message) {
const json = safeJsonParse(message.message);
if (json) logger.info(json);
if (json) logger.log(json);
else logger.info(message.message);
socket.emit(`runner-info-${runid}`, {
const toEmit = {
time: new Date(),
severity: 'info',
...message,
message: json ? json.msg : message.message,
});
};
if (json && json.level >= 50) {
toEmit.severity = 'error';
}
socket.emit(`runner-info-${runid}`, toEmit);
}
},
@@ -125,8 +130,9 @@ module.exports = {
},
}
);
const pipeDispatcher = severity => data =>
this.dispatchMessage(runid, { severity, message: data.toString().trim() });
const pipeDispatcher = severity => data => {
return this.dispatchMessage(runid, { severity, message: data.toString().trim() });
};
byline(subprocess.stdout).on('data', pipeDispatcher('info'));
byline(subprocess.stderr).on('data', pipeDispatcher('error'));

View File

@@ -271,7 +271,7 @@ async function handleSqlPreview({ msgid, objects, options }) {
process.send({ msgtype: 'response', msgid, sql: dmp.s, isTruncated: generator.isTruncated });
if (generator.isUnhandledException) {
setTimeout(() => {
getLogger.info('Exiting because of unhandled exception');
logger.error('Exiting because of unhandled exception');
process.exit(0);
}, 500);
}

View File

@@ -9,9 +9,18 @@ const copyStream = require('./copyStream');
const jsonLinesReader = require('./jsonLinesReader');
const { resolveArchiveFolder } = require('../utility/directories');
async function dataDuplicator({ connection, archive, items, analysedStructure = null }) {
const driver = requireEngineDriver(connection);
const pool = await connectUtility(driver, connection, 'write');
async function dataDuplicator({
connection,
archive,
items,
options,
analysedStructure = null,
driver,
systemConnection,
}) {
if (!driver) driver = requireEngineDriver(connection);
const pool = systemConnection || (await connectUtility(driver, connection, 'write'));
logger.info(`Connected.`);
if (!analysedStructure) {
@@ -26,10 +35,13 @@ async function dataDuplicator({ connection, archive, items, analysedStructure =
name: item.name,
operation: item.operation,
matchColumns: item.matchColumns,
openStream: () => jsonLinesReader({ fileName: path.join(resolveArchiveFolder(archive), `${item.name}.jsonl`) }),
openStream:
item.openStream ||
(() => jsonLinesReader({ fileName: path.join(resolveArchiveFolder(archive), `${item.name}.jsonl`) })),
})),
stream,
copyStream
copyStream,
options
);
await dupl.run();

View File

@@ -1,18 +1,26 @@
const stream = require('stream');
async function fakeObjectReader({ delay = 0 } = {}) {
async function fakeObjectReader({ delay = 0, dynamicData = null } = {}) {
const pass = new stream.PassThrough({
objectMode: true,
});
function doWrite() {
pass.write({ columns: [{ columnName: 'id' }, { columnName: 'country' }], __isStreamHeader: true });
pass.write({ id: 1, country: 'Czechia' });
pass.write({ id: 2, country: 'Austria' });
pass.write({ country: 'Germany', id: 3 });
pass.write({ country: 'Romania', id: 4 });
pass.write({ country: 'Great Britain', id: 5 });
pass.write({ country: 'Bosna, Hecegovina', id: 6 });
pass.end();
if (dynamicData) {
pass.write({ __isStreamHeader: true, __isDynamicStructure: true });
for (const item of dynamicData) {
pass.write(item);
}
pass.end();
} else {
pass.write({ columns: [{ columnName: 'id' }, { columnName: 'country' }], __isStreamHeader: true });
pass.write({ id: 1, country: 'Czechia' });
pass.write({ id: 2, country: 'Austria' });
pass.write({ country: 'Germany', id: 3 });
pass.write({ country: 'Romania', id: 4 });
pass.write({ country: 'Great Britain', id: 5 });
pass.write({ country: 'Bosna, Hecegovina', id: 6 });
pass.end();
}
}
if (delay) {

View File

@@ -12,7 +12,9 @@ class StringifyStream extends stream.Transform {
_transform(chunk, encoding, done) {
let skip = false;
if (!this.wasHeader) {
skip = (chunk.__isStreamHeader && !this.header) || (chunk.__isStreamHeader && chunk.__isDynamicStructure);
skip =
(chunk.__isStreamHeader && !this.header) ||
(chunk.__isStreamHeader && chunk.__isDynamicStructure && !chunk.__keepDynamicStreamHeader);
this.wasHeader = true;
}
if (!skip) {

View File

@@ -2,7 +2,7 @@ const fs = require('fs');
const _ = require('lodash');
const stream = require('stream');
const byline = require('byline');
const { getLogger } = require('dbgate-tools');
const { getLogger, processJsonDataUpdateCommands, removeTablePairingId } = require('dbgate-tools');
const logger = getLogger('modifyJsonLinesReader');
const stableStringify = require('json-stable-stringify');
@@ -11,6 +11,7 @@ class ParseStream extends stream.Transform {
super({ objectMode: true });
this.limitRows = limitRows;
this.changeSet = changeSet;
this.wasHeader = false;
this.currentRowIndex = 0;
if (mergeMode == 'merge') {
if (mergedRows && mergeKey) {
@@ -28,12 +29,28 @@ class ParseStream extends stream.Transform {
_transform(chunk, encoding, done) {
let obj = JSON.parse(chunk);
if (obj.__isStreamHeader) {
this.push(obj);
if (this.changeSet && this.changeSet.structure) {
this.push({
...removeTablePairingId(this.changeSet.structure),
__isStreamHeader: true,
});
} else {
this.push(obj);
}
this.wasHeader = true;
done();
return;
}
if (this.changeSet) {
if (!this.wasHeader && this.changeSet.structure) {
this.push({
...removeTablePairingId(this.changeSet.structure),
__isStreamHeader: true,
});
this.wasHeader = true;
}
if (!this.limitRows || this.currentRowIndex < this.limitRows) {
if (this.changeSet.deletes.find(x => x.existingRowIndex == this.currentRowIndex)) {
obj = null;
@@ -41,13 +58,20 @@ class ParseStream extends stream.Transform {
const update = this.changeSet.updates.find(x => x.existingRowIndex == this.currentRowIndex);
if (update) {
obj = {
...obj,
...update.fields,
};
if (update.document) {
obj = update.document;
} else {
obj = {
...obj,
...update.fields,
};
}
}
if (obj) {
if (this.changeSet.dataUpdateCommands) {
obj = processJsonDataUpdateCommands(obj, this.changeSet.dataUpdateCommands);
}
this.push(obj);
}
this.currentRowIndex += 1;

View File

@@ -11,7 +11,7 @@ async function runScript(func) {
await func();
process.exit(0);
} catch (err) {
logger.error('Error running script', err);
logger.error({ err }, `Error running script: ${err.message}`);
process.exit(1);
}
}

View File

@@ -2,7 +2,6 @@ const fs = require('fs');
const os = require('os');
const rimraf = require('rimraf');
const path = require('path');
const lineReader = require('line-reader');
const AsyncLock = require('async-lock');
const lock = new AsyncLock();
const stableStringify = require('json-stable-stringify');
@@ -11,23 +10,7 @@ const requirePluginFunction = require('./requirePluginFunction');
const esort = require('external-sorting');
const uuidv1 = require('uuid/v1');
const { jsldir } = require('./directories');
function fetchNextLineFromReader(reader) {
return new Promise((resolve, reject) => {
if (!reader.hasNextLine()) {
resolve(null);
return;
}
reader.nextLine((err, line) => {
if (err) {
reject(err);
} else {
resolve(line);
}
});
});
}
const LineReader = require('./LineReader');
class JsonLinesDatastore {
constructor(file, formatterFunction) {
@@ -74,7 +57,7 @@ class JsonLinesDatastore {
await new Promise(resolve => rimraf(tempDir, resolve));
}
_closeReader() {
async _closeReader() {
// console.log('CLOSING READER', this.reader);
if (!this.reader) return;
const reader = this.reader;
@@ -84,7 +67,7 @@ class JsonLinesDatastore {
// this.firstRowToBeReturned = null;
this.currentFilter = null;
this.currentSort = null;
return new Promise(resolve => reader.close(resolve));
await reader.close();
}
async notifyChanged(callback) {
@@ -100,12 +83,9 @@ class JsonLinesDatastore {
async _openReader(fileName) {
// console.log('OPENING READER', fileName);
// console.log(fs.readFileSync(fileName, 'utf-8'));
return new Promise((resolve, reject) =>
lineReader.open(fileName, (err, reader) => {
if (err) reject(err);
resolve(reader);
})
);
const fileStream = fs.createReadStream(fileName);
return new LineReader(fileStream);
}
parseLine(line) {
@@ -120,7 +100,7 @@ class JsonLinesDatastore {
// return res;
// }
for (;;) {
const line = await fetchNextLineFromReader(this.reader);
const line = await this.reader.readLine();
if (!line) {
// EOF
return null;
@@ -240,6 +220,7 @@ class JsonLinesDatastore {
// console.log(JSON.stringify(this.currentFilter, undefined, 2));
for (let i = 0; i < limit; i += 1) {
const line = await this._readLine(true);
// console.log('READED LINE', i);
if (line == null) break;
res.push(line);
}

View File

@@ -0,0 +1,88 @@
const readline = require('readline');
// Simple FIFO queue backed by an index-keyed object, so dequeue is O(1)
// (Array.prototype.shift would be O(n) on large backlogs).
class Queue {
  constructor() {
    this.elements = {};
    this.head = 0;
    this.tail = 0;
  }

  // Append an element at the tail.
  enqueue(element) {
    this.elements[this.tail] = element;
    this.tail++;
  }

  // Remove and return the head element (undefined when empty).
  dequeue() {
    const item = this.elements[this.head];
    delete this.elements[this.head];
    this.head++;
    return item;
  }

  // Return the head element without removing it.
  peek() {
    return this.elements[this.head];
  }

  getLength() {
    return this.tail - this.head;
  }

  isEmpty() {
    return this.getLength() === 0;
  }
}

// Promise-based line reader over a readable stream.
// readLine() resolves with the next line, or null at end of stream.
// Lines arriving faster than they are consumed are buffered in a Queue;
// a trailing null sentinel in the queue marks end-of-stream.
class LineReader {
  constructor(input) {
    this.input = input;
    this.queue = new Queue();
    this.resolve = null; // pending readLine() resolver, if any
    this.isEnded = false;
    this.rl = readline.createInterface({
      input,
    });
    // Keep the source paused; readLine() resumes it on demand.
    this.input.pause();
    this.rl.on('line', line => {
      this.input.pause();
      if (this.resolve) {
        // A readLine() call is waiting - hand the line over directly.
        const resolve = this.resolve;
        this.resolve = null;
        resolve(line);
        return;
      }
      this.queue.enqueue(line);
    });
    this.rl.on('close', () => {
      if (this.resolve) {
        const resolve = this.resolve;
        this.resolve = null;
        this.isEnded = true;
        resolve(null);
        return;
      }
      // null sentinel: consumed by readLine() as EOF
      this.queue.enqueue(null);
    });
  }

  // Resolves with the next line, or null when the stream is exhausted.
  readLine() {
    if (this.isEnded) {
      return Promise.resolve(null);
    }
    if (!this.queue.isEmpty()) {
      const res = this.queue.dequeue();
      if (res == null) this.isEnded = true;
      return Promise.resolve(res);
    }
    this.input.resume();
    return new Promise(resolve => {
      this.resolve = resolve;
    });
  }

  // Stop reading and release the underlying stream.
  // Fixes vs. original: a caller blocked in readLine() was left hanging
  // forever, the readline interface was never closed, and the input was
  // assumed to have a close(cb) method (only fs streams do - any other
  // Readable threw a TypeError).
  close() {
    this.isEnded = true;
    if (this.resolve) {
      const resolve = this.resolve;
      this.resolve = null;
      resolve(null);
    }
    this.rl.close();
    return new Promise(resolve => {
      if (typeof this.input.close === 'function') {
        this.input.close(resolve);
      } else {
        this.input.destroy();
        resolve();
      }
    });
  }
}
module.exports = LineReader;

View File

@@ -42,18 +42,23 @@ function datadir() {
return dir;
}
const dirFunc = (dirname, clean) => () => {
const dir = path.join(datadir(), dirname);
ensureDirectory(dir, clean);
const dirFunc =
(dirname, clean, subdirs = []) =>
() => {
const dir = path.join(datadir(), dirname);
ensureDirectory(dir, clean);
for (const subdir of subdirs) {
ensureDirectory(path.join(dir, subdir), false);
}
return dir;
};
return dir;
};
const jsldir = dirFunc('jsl', true);
const rundir = dirFunc('run', true);
const uploadsdir = dirFunc('uploads', true);
const pluginsdir = dirFunc('plugins');
const archivedir = dirFunc('archive');
const archivedir = dirFunc('archive', false, ['default']);
const appdir = dirFunc('apps');
const filesdir = dirFunc('files');
const logsdir = dirFunc('logs', 3600 * 24 * 7);

View File

@@ -1,15 +0,0 @@
const fs = require('fs-extra');
// Writes free-table data ({ structure, rows }) to `file` in JSONL format:
// first line is the structure header (tagged __isStreamHeader), then one
// JSON document per row.
async function saveFreeTableData(file, data) {
  const { structure, rows } = data;
  const fileStream = fs.createWriteStream(file);
  fileStream.write(JSON.stringify({ __isStreamHeader: true, ...structure }) + '\n');
  for (const row of rows) {
    fileStream.write(JSON.stringify(row) + '\n');
  }
  // Fix: write()/close() do not return promises, so the old `await`s were
  // no-ops and the function could resolve before the data was flushed to
  // disk. Wait for 'finish' (end callback) and surface write errors.
  await new Promise((resolve, reject) => {
    fileStream.once('error', reject);
    fileStream.end(resolve);
  });
}
module.exports = {
saveFreeTableData,
};