JSON object import

This commit is contained in:
Jan Prochazka
2024-09-17 12:52:53 +02:00
parent 503e09ddd1
commit fd8a28831e
2 changed files with 54 additions and 14 deletions

View File

@@ -89,8 +89,34 @@ test('JSON array import test', async () => {
const rows = await getReaderRows(reader); const rows = await getReaderRows(reader);
expect(rows.length).toEqual(2); expect(rows.length).toEqual(2);
expect(rows[0]).toEqual({ expect(rows).toEqual([
id: 1, { id: 1, val: 'v1' },
val: 'v1', { id: 2, val: 'v2' },
}); ]);
});
// Verifies jsonStyle: 'object' — each top-level key/value pair of the JSON
// object becomes one row, with the object key injected under the configured
// keyField ('mykey' here).
test('JSON object import test', async () => {
  const sourceFile = tmp.tmpNameSync();
  const payload = {
    k1: { id: 1, val: 'v1' },
    k2: { id: 2, val: 'v2' },
  };
  fs.writeFileSync(sourceFile, JSON.stringify(payload));

  const reader = await dbgateApi.jsonReader({
    fileName: sourceFile,
    jsonStyle: 'object',
    keyField: 'mykey',
  });

  const rows = await getReaderRows(reader);
  expect(rows.length).toEqual(2);
  expect(rows).toEqual([
    { mykey: 'k1', id: 1, val: 'v1' },
    { mykey: 'k2', id: 2, val: 'v2' },
  ]);
});

View File

@@ -6,12 +6,15 @@ const logger = getLogger('jsonReader');
const { parser } = require('stream-json'); const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick'); const { pick } = require('stream-json/filters/Pick');
const { streamArray } = require('stream-json/streamers/StreamArray'); const { streamArray } = require('stream-json/streamers/StreamArray');
const { streamObject } = require('stream-json/streamers/StreamObject');
class ParseStream extends stream.Transform { class ParseStream extends stream.Transform {
constructor({ limitRows }) { constructor({ limitRows, jsonStyle, keyField }) {
super({ objectMode: true }); super({ objectMode: true });
this.wasHeader = false; this.wasHeader = false;
this.limitRows = limitRows; this.limitRows = limitRows;
this.jsonStyle = jsonStyle;
this.keyField = keyField;
this.rowsWritten = 0; this.rowsWritten = 0;
} }
_transform(chunk, encoding, done) { _transform(chunk, encoding, done) {
@@ -24,14 +27,22 @@ class ParseStream extends stream.Transform {
this.wasHeader = true; this.wasHeader = true;
} }
if (!this.limitRows || this.rowsWritten < this.limitRows) { if (!this.limitRows || this.rowsWritten < this.limitRows) {
this.push(chunk.value); if (this.jsonStyle === 'object') {
this.push({
...chunk.value,
[this.keyField]: chunk.key,
});
} else {
this.push(chunk.value);
}
this.rowsWritten += 1; this.rowsWritten += 1;
} }
done(); done();
} }
} }
async function jsonReader({ fileName, encoding = 'utf-8', limitRows = undefined }) { async function jsonReader({ fileName, jsonStyle, keyField = '_key', encoding = 'utf-8', limitRows = undefined }) {
logger.info(`Reading file ${fileName}`); logger.info(`Reading file ${fileName}`);
const fileStream = fs.createReadStream( const fileStream = fs.createReadStream(
@@ -42,14 +53,17 @@ async function jsonReader({ fileName, encoding = 'utf-8', limitRows = undefined
const parseJsonStream = parser(); const parseJsonStream = parser();
fileStream.pipe(parseJsonStream); fileStream.pipe(parseJsonStream);
const { streamArray } = require('stream-json/streamers/StreamArray'); const parseStream = new ParseStream({ limitRows, jsonStyle, keyField });
const streamArrayStream = streamArray();
parseJsonStream.pipe(streamArrayStream); if (jsonStyle === 'object') {
const tramsformer = streamObject();
const parseStream = new ParseStream({ limitRows }); parseJsonStream.pipe(tramsformer);
tramsformer.pipe(parseStream);
streamArrayStream.pipe(parseStream); } else {
const tramsformer = streamArray();
parseJsonStream.pipe(tramsformer);
tramsformer.pipe(parseStream);
}
return parseStream; return parseStream;
} }