feat: transform rows support for json lines reader

This commit is contained in:
Pavel
2025-06-05 16:28:50 +02:00
parent e4cc4b6f58
commit b74b6b3284
2 changed files with 14 additions and 7 deletions

View File

@@ -17,8 +17,9 @@ const copyStream = require('./copyStream');
* @param {object} options.driver - driver object. If not provided, it will be loaded from connection * @param {object} options.driver - driver object. If not provided, it will be loaded from connection
* @param {string} options.folder - folder with model files (YAML files for tables, SQL files for views, procedures, ...) * @param {string} options.folder - folder with model files (YAML files for tables, SQL files for views, procedures, ...)
* @param {function[]} options.modelTransforms - array of functions for transforming model * @param {function[]} options.modelTransforms - array of functions for transforming model
* @param {((row: Record<string, any>) => Record<string, any>) | undefined} options.transformRow - function to transform each row
*/ */
async function importDbFromFolder({ connection, systemConnection, driver, folder, modelTransforms }) { async function importDbFromFolder({ connection, systemConnection, driver, folder, modelTransforms, transformRow }) {
if (!driver) driver = requireEngineDriver(connection); if (!driver) driver = requireEngineDriver(connection);
const dbhan = systemConnection || (await connectUtility(driver, connection, 'read')); const dbhan = systemConnection || (await connectUtility(driver, connection, 'read'));
@@ -77,7 +78,7 @@ async function importDbFromFolder({ connection, systemConnection, driver, folder
for (const table of modelAdapted.tables) { for (const table of modelAdapted.tables) {
const fileName = path.join(folder, `${table.pureName}.jsonl`); const fileName = path.join(folder, `${table.pureName}.jsonl`);
if (await fs.exists(fileName)) { if (await fs.exists(fileName)) {
const src = await jsonLinesReader({ fileName }); const src = await jsonLinesReader({ fileName, transformRow });
const dst = await tableWriter({ const dst = await tableWriter({
systemConnection: dbhan, systemConnection: dbhan,
pureName: table.pureName, pureName: table.pureName,
@@ -105,7 +106,7 @@ async function importDbFromFolder({ connection, systemConnection, driver, folder
for (const file of fs.readdirSync(folder)) { for (const file of fs.readdirSync(folder)) {
if (!file.endsWith('.jsonl')) continue; if (!file.endsWith('.jsonl')) continue;
const pureName = path.parse(file).name; const pureName = path.parse(file).name;
const src = await jsonLinesReader({ fileName: path.join(folder, file) }); const src = await jsonLinesReader({ fileName: path.join(folder, file), transformRow });
const dst = await tableWriter({ const dst = await tableWriter({
systemConnection: dbhan, systemConnection: dbhan,
pureName, pureName,

View File

@@ -6,10 +6,11 @@ const download = require('./download');
const logger = getLogger('jsonLinesReader'); const logger = getLogger('jsonLinesReader');
class ParseStream extends stream.Transform { class ParseStream extends stream.Transform {
constructor({ limitRows }) { constructor({ limitRows, transformRow }) {
super({ objectMode: true }); super({ objectMode: true });
this.wasHeader = false; this.wasHeader = false;
this.limitRows = limitRows; this.limitRows = limitRows;
this.transformRow = transformRow;
this.rowsWritten = 0; this.rowsWritten = 0;
} }
_transform(chunk, encoding, done) { _transform(chunk, encoding, done) {
@@ -26,7 +27,11 @@ class ParseStream extends stream.Transform {
this.wasHeader = true; this.wasHeader = true;
} }
if (!this.limitRows || this.rowsWritten < this.limitRows) { if (!this.limitRows || this.rowsWritten < this.limitRows) {
this.push(obj); if (this.transformRow) {
this.push(this.transformRow(obj));
} else {
this.push(obj);
}
this.rowsWritten += 1; this.rowsWritten += 1;
} }
done(); done();
@@ -39,9 +44,10 @@ class ParseStream extends stream.Transform {
* @param {string} options.fileName - file name or URL * @param {string} options.fileName - file name or URL
* @param {string} options.encoding - encoding of the file * @param {string} options.encoding - encoding of the file
* @param {number} options.limitRows - maximum number of rows to read * @param {number} options.limitRows - maximum number of rows to read
* @param {((row: Record<string, any>) => Record<string, any>) | undefined} options.transformRow - function to transform each row
* @returns {Promise<readerType>} - reader object * @returns {Promise<readerType>} - reader object
*/ */
async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined }) { async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undefined, transformRow }) {
logger.info(`Reading file ${fileName}`); logger.info(`Reading file ${fileName}`);
const downloadedFile = await download(fileName); const downloadedFile = await download(fileName);
@@ -52,7 +58,7 @@ async function jsonLinesReader({ fileName, encoding = 'utf-8', limitRows = undef
encoding encoding
); );
const liner = byline(fileStream); const liner = byline(fileStream);
const parser = new ParseStream({ limitRows }); const parser = new ParseStream({ limitRows, transformRow });
return [liner, parser]; return [liner, parser];
} }