diff --git a/packages/api/src/shell/importDbFromFolder.js b/packages/api/src/shell/importDbFromFolder.js index 73b3c3456..f604ce9e0 100644 --- a/packages/api/src/shell/importDbFromFolder.js +++ b/packages/api/src/shell/importDbFromFolder.js @@ -17,8 +17,9 @@ const copyStream = require('./copyStream'); * @param {object} options.driver - driver object. If not provided, it will be loaded from connection * @param {string} options.folder - folder with model files (YAML files for tables, SQL files for views, procedures, ...) * @param {function[]} options.modelTransforms - array of functions for transforming model + * @param {((row: Record<string, any>) => Record<string, any>) | undefined} options.transformRow - function to transform each row */ -async function importDbFromFolder({ connection, systemConnection, driver, folder, modelTransforms }) { +async function importDbFromFolder({ connection, systemConnection, driver, folder, modelTransforms, transformRow }) { if (!driver) driver = requireEngineDriver(connection); const dbhan = systemConnection || (await connectUtility(driver, connection, 'read')); @@ -77,7 +78,7 @@ async function importDbFromFolder({ connection, systemConnection, driver, folder for (const table of modelAdapted.tables) { const fileName = path.join(folder, `${table.pureName}.jsonl`); if (await fs.exists(fileName)) { - const src = await jsonLinesReader({ fileName }); + const src = await jsonLinesReader({ fileName, transformRow }); const dst = await tableWriter({ systemConnection: dbhan, pureName: table.pureName, @@ -105,7 +106,7 @@ async function importDbFromFolder({ connection, systemConnection, driver, folder for (const file of fs.readdirSync(folder)) { if (!file.endsWith('.jsonl')) continue; const pureName = path.parse(file).name; - const src = await jsonLinesReader({ fileName: path.join(folder, file) }); + const src = await jsonLinesReader({ fileName: path.join(folder, file), transformRow }); const dst = await tableWriter({ systemConnection: dbhan, pureName, diff --git 
a/packages/api/src/shell/jsonLinesReader.js b/packages/api/src/shell/jsonLinesReader.js index da3d5d85d..1cf9cb578 100644 --- a/packages/api/src/shell/jsonLinesReader.js +++ b/packages/api/src/shell/jsonLinesReader.js @@ -6,10 +6,11 @@ const download = require('./download'); const logger = getLogger('jsonLinesReader'); class ParseStream extends stream.Transform { - constructor({ limitRows }) { + constructor({ limitRows, transformRow }) { super({ objectMode: true }); this.wasHeader = false; this.limitRows = limitRows; + this.transformRow = transformRow; this.rowsWritten = 0; } _transform(chunk, encoding, done) { @@ -26,7 +27,11 @@ class ParseStream extends stream.Transform { this.wasHeader = true; } if (!this.limitRows || this.rowsWritten < this.limitRows) { - this.push(obj); + if (this.transformRow) { + this.push(this.transformRow(obj)); + } else { + this.push(obj); + } this.rowsWritten += 1; } done(); @@ -39,9 +44,10 @@ class ParseStream extends stream.Transform { * @param {string} options.fileName - file name or URL * @param {string} options.encoding - encoding of the file * @param {number} options.limitRows - maximum number of rows to read + * @param {((row: Record<string, any>) => Record<string, any>) | undefined} options.transformRow - function to transform each row * @returns {Promise} - reader object */ -async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined }) { +async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined, transformRow }) { logger.info(`Reading file ${fileName}`); const downloadedFile = await download(fileName); @@ -52,7 +58,7 @@ async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undef encoding ); const liner = byline(fileStream); - const parser = new ParseStream({ limitRows }); + const parser = new ParseStream({ limitRows, transformRow }); return [liner, parser]; }