diff --git a/packages/api/package.json b/packages/api/package.json index 4812aefd0..c2813e3e8 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -57,6 +57,7 @@ "rimraf": "^3.0.0", "simple-encryptor": "^4.0.0", "ssh2": "^1.11.0", + "stream-json": "^1.8.0", "tar": "^6.0.5" }, "scripts": { diff --git a/packages/api/src/shell/jsonReader.js b/packages/api/src/shell/jsonReader.js new file mode 100644 index 000000000..e29187738 --- /dev/null +++ b/packages/api/src/shell/jsonReader.js @@ -0,0 +1,58 @@ +const fs = require('fs'); +const stream = require('stream'); +const byline = require('byline'); +const { getLogger } = require('dbgate-tools'); +const logger = getLogger('jsonReader'); +const { parser } = require('stream-json'); +const { pick } = require('stream-json/filters/Pick'); +const { streamArray } = require('stream-json/streamers/StreamArray'); + +class ParseStream extends stream.Transform { + constructor({ limitRows }) { + super({ objectMode: true }); + this.wasHeader = false; + this.limitRows = limitRows; + this.rowsWritten = 0; + } + _transform(chunk, encoding, done) { + const obj = JSON.parse(chunk); + if (!this.wasHeader) { + this.push({ + __isStreamHeader: true, + __isDynamicStructure: true, + }); + + this.wasHeader = true; + } + if (!this.limitRows || this.rowsWritten < this.limitRows) { + this.push(obj.value); + this.rowsWritten += 1; + } + done(); + } +} + +async function jsonReader({ fileName, encoding = 'utf-8', limitRows = undefined }) { + logger.info(`Reading file ${fileName}`); + + const fileStream = fs.createReadStream( + fileName, + // @ts-ignore + encoding + ); + const parseJsonStream = parser(); + fileStream.pipe(parseJsonStream); + + const { streamArray } = require('stream-json/streamers/StreamArray'); + const streamArrayStream = streamArray(); + + parseJsonStream.pipe(streamArrayStream); + + const parseStream = new ParseStream({ limitRows }); + + streamArrayStream.pipe(parseStream); + + return parseStream; +} + +module.exports = jsonReader; diff --git a/packages/web/src/plugins/fileformats.ts b/packages/web/src/plugins/fileformats.ts index a65aef9df..700bba3ba 100644 --- a/packages/web/src/plugins/fileformats.ts +++ b/packages/web/src/plugins/fileformats.ts @@ -13,6 +13,7 @@ const jsonFormat = { storageType: 'json', extension: 'json', name: 'JSON', + readerFunc: 'jsonReader', writerFunc: 'jsonArrayWriter', }; diff --git a/yarn.lock b/yarn.lock index 824131276..66c1a2057 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9745,6 +9745,18 @@ stoppable@^1.1.0: resolved "https://registry.yarnpkg.com/stoppable/-/stoppable-1.1.0.tgz#32da568e83ea488b08e4d7ea2c3bcc9d75015d5b" integrity sha512-KXDYZ9dszj6bzvnEMRYvxgeTHU74QBFL54XKtP3nyMuJ81CFYtABZ3bAzL2EdFUaEwJOBOgENyFj3R7oTzDyyw== +stream-chain@^2.2.5: + version "2.2.5" + resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09" + integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA== + +stream-json@^1.8.0: + version "1.8.0" + resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c" + integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw== + dependencies: + stream-chain "^2.2.5" + stream-transform@^3.3.2: version "3.3.2" resolved "https://registry.yarnpkg.com/stream-transform/-/stream-transform-3.3.2.tgz#398c67b2f3b6ed5d04ceadde9e412bda8416c8ab"