diff --git a/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js b/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js index bfe19cb17..2259873cf 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js +++ b/plugins/dbgate-plugin-mongo/src/backend/createBulkInsertStream.js @@ -1,5 +1,6 @@ const ObjectId = require('mongodb').ObjectId; const { getLogger } = global.DBGATE_PACKAGES['dbgate-tools']; +const { EJSON } = require('bson'); const logger = getLogger('mongoBulkInsert'); @@ -26,7 +27,7 @@ function createBulkInsertStream(driver, stream, pool, name, options) { ...row, }; } - writable.buffer.push(row); + writable.buffer.push(EJSON.deserialize(row)); }; writable.checkStructure = async () => { diff --git a/plugins/dbgate-plugin-mongo/src/backend/driver.js b/plugins/dbgate-plugin-mongo/src/backend/driver.js index d9f4c5c4f..e7433cadf 100644 --- a/plugins/dbgate-plugin-mongo/src/backend/driver.js +++ b/plugins/dbgate-plugin-mongo/src/backend/driver.js @@ -245,8 +245,23 @@ const driver = { const db = await getScriptableDb(pool); exprValue = func(db, ObjectId.createFromHexString); + const pass = new stream.PassThrough({ + objectMode: true, + highWaterMark: 100, + }); + + exprValue + .forEach((row) => pass.write(transformMongoData(row))) + .then(() => { + pass.end(); + // pass.end(() => { + // pass.emit('end'); + // }) + }); + + return pass; // return directly stream without header row - return exprValue.stream(); + // return exprValue.stream(); // pass.write(structure || { __isDynamicStructure: true }); // exprValue.on('data', (row) => pass.write(row));