fixed read mongo query #312

This commit is contained in:
Jan Prochazka
2022-07-14 07:27:38 +02:00
parent e0376a708c
commit dc31552f9e

View File

@@ -5,19 +5,17 @@ const driverBase = require('../frontend/driver');
const Analyser = require('./Analyser'); const Analyser = require('./Analyser');
const MongoClient = require('mongodb').MongoClient; const MongoClient = require('mongodb').MongoClient;
const ObjectId = require('mongodb').ObjectId; const ObjectId = require('mongodb').ObjectId;
const Cursor = require('mongodb').Cursor; const AbstractCursor = require('mongodb').AbstractCursor;
const createBulkInsertStream = require('./createBulkInsertStream'); const createBulkInsertStream = require('./createBulkInsertStream');
function transformMongoData(row) { function transformMongoData(row) {
return _.mapValues(row, (v) => (v && v.constructor == ObjectId ? { $oid: v.toString() } : v)); return _.mapValues(row, (v) => (v && v.constructor == ObjectId ? { $oid: v.toString() } : v));
} }
function readCursor(cursor, options) { async function readCursor(cursor, options) {
return new Promise((resolve) => {
options.recordset({ __isDynamicStructure: true }); options.recordset({ __isDynamicStructure: true });
await cursor.forEach((row) => {
cursor.on('data', (data) => options.row(transformMongoData(data))); options.row(transformMongoData(row));
cursor.on('end', () => resolve());
}); });
} }
@@ -118,7 +116,7 @@ const driver = {
return; return;
} }
if (exprValue instanceof Cursor) { if (exprValue instanceof AbstractCursor) {
await readCursor(exprValue, options); await readCursor(exprValue, options);
} else if (isPromise(exprValue)) { } else if (isPromise(exprValue)) {
try { try {