diff --git a/packages/api/src/shell/copyStream.js b/packages/api/src/shell/copyStream.js index 353c7369c..60e49deb7 100644 --- a/packages/api/src/shell/copyStream.js +++ b/packages/api/src/shell/copyStream.js @@ -2,12 +2,17 @@ const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream'); function copyStream(input, output) { return new Promise((resolve, reject) => { - const ensureHeader = new EnsureStreamHeaderStream(); const finisher = output['finisher'] || output; finisher.on('finish', resolve); finisher.on('error', reject); - input.pipe(ensureHeader); - ensureHeader.pipe(output); + + if (output.requireFixedStructure) { + const ensureHeader = new EnsureStreamHeaderStream(); + input.pipe(ensureHeader); + ensureHeader.pipe(output); + } else { + input.pipe(output); + } }); } diff --git a/packages/api/src/shell/jsonArrayWriter.js b/packages/api/src/shell/jsonArrayWriter.js index 3e631b30f..39e1831be 100644 --- a/packages/api/src/shell/jsonArrayWriter.js +++ b/packages/api/src/shell/jsonArrayWriter.js @@ -11,10 +11,7 @@ class StringifyStream extends stream.Transform { let skip = false; if (!this.wasHeader) { - skip = - chunk.__isStreamHeader || - // TODO remove isArray test - Array.isArray(chunk.columns); + skip = chunk.__isStreamHeader; this.wasHeader = true; } if (!skip) { diff --git a/packages/api/src/shell/jsonLinesReader.js b/packages/api/src/shell/jsonLinesReader.js index 55c9ca110..113a9a788 100644 --- a/packages/api/src/shell/jsonLinesReader.js +++ b/packages/api/src/shell/jsonLinesReader.js @@ -12,14 +12,14 @@ class ParseStream extends stream.Transform { _transform(chunk, encoding, done) { const obj = JSON.parse(chunk); if (!this.wasHeader) { - if ( - !obj.__isStreamHeader && - // TODO remove isArray test - !Array.isArray(obj.columns) - ) { - this.push({ columns: Object.keys(obj).map(columnName => ({ columnName })) }); + if (!obj.__isStreamHeader) { + this.push({ + __isStreamHeader: true, + __isDynamicStructure: true, + // columns: Object.keys(obj).map(columnName => ({ columnName })), + }); } - + this.wasHeader = true; } if (!this.limitRows || this.rowsWritten < this.limitRows) { diff --git a/packages/api/src/shell/jsonLinesWriter.js b/packages/api/src/shell/jsonLinesWriter.js index 4df935fa3..495c89560 100644 --- a/packages/api/src/shell/jsonLinesWriter.js +++ b/packages/api/src/shell/jsonLinesWriter.js @@ -10,11 +10,7 @@ class StringifyStream extends stream.Transform { _transform(chunk, encoding, done) { let skip = false; if (!this.wasHeader) { - skip = - (chunk.__isStreamHeader || - // TODO remove isArray test - Array.isArray(chunk.columns)) && - !this.header; + skip = (chunk.__isStreamHeader && !this.header) || (chunk.__isStreamHeader && chunk.__isDynamicStructure); this.wasHeader = true; } if (!skip) { diff --git a/packages/api/src/shell/sqlDataWriter.js b/packages/api/src/shell/sqlDataWriter.js index 50324db03..56d8d567f 100644 --- a/packages/api/src/shell/sqlDataWriter.js +++ b/packages/api/src/shell/sqlDataWriter.js @@ -14,11 +14,7 @@ class SqlizeStream extends stream.Transform { _transform(chunk, encoding, done) { let skip = false; if (!this.wasHeader) { - if ( - chunk.__isStreamHeader || - // TODO remove isArray test - Array.isArray(chunk.columns) - ) { + if (chunk.__isStreamHeader) { skip = true; this.tableName = chunk.pureName; if (chunk.engine) { diff --git a/packages/api/src/utility/EnsureStreamHeaderStream.js b/packages/api/src/utility/EnsureStreamHeaderStream.js index 8a0e26651..3ffa12b58 100644 --- a/packages/api/src/utility/EnsureStreamHeaderStream.js +++ b/packages/api/src/utility/EnsureStreamHeaderStream.js @@ -13,11 +13,7 @@ class EnsureStreamHeaderStream extends stream.Transform { return; } - if ( - !chunk.__isStreamHeader && - // TODO remove isArray test - !Array.isArray(chunk.columns) - ) { + if (!chunk.__isStreamHeader) { this.push({ __isStreamHeader: true, __isComputedStructure: true, diff --git a/packages/tools/src/createBulkInsertStreamBase.ts b/packages/tools/src/createBulkInsertStreamBase.ts index 068912c3d..37d0f6c5a 100644 --- a/packages/tools/src/createBulkInsertStreamBase.ts +++ b/packages/tools/src/createBulkInsertStreamBase.ts @@ -14,6 +14,7 @@ export function createBulkInsertStreamBase(driver, stream, pool, name, options): writable.buffer = []; writable.structure = null; writable.columnNames = null; + writable.requireFixedStructure = !driver.dialect.nosql; writable.addRow = async row => { if (writable.structure) { diff --git a/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js b/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js index abb1811cf..c54afbf2d 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js +++ b/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js @@ -14,18 +14,13 @@ function createBulkInsertStream(driver, stream, pool, name, options) { writable.addRow = (row) => { if (!writable.wasHeader) { writable.wasHeader = true; - if ( - row.__isStreamHeader || - // TODO remove isArray test - Array.isArray(row.columns) - ) - return; + if (row.__isStreamHeader) return; } if (options.createStringId) { row = { _id: new ObjectId().toString(), ...row, - } + }; } writable.buffer.push(row); };