MongoDB bigint support

This commit is contained in:
SPRINX0\prochazka
2025-05-06 12:51:37 +02:00
parent b9d4197b5c
commit a71c4fe7ec
2 changed files with 36 additions and 24 deletions

View File

@@ -590,11 +590,14 @@ export function jsonLinesParse(jsonLines: string): any[] {
.filter(x => x); .filter(x => x);
} }
export function serializeJsTypesForJsonStringify(obj) { export function serializeJsTypesForJsonStringify(obj, replacer = null) {
return _cloneDeepWith(obj, value => { return _cloneDeepWith(obj, value => {
if (typeof value === 'bigint') { if (typeof value === 'bigint') {
return { $bigint: value.toString() }; return { $bigint: value.toString() };
} }
if (replacer) {
return replacer(value);
}
}); });
} }

View File

@@ -3,9 +3,9 @@ const stream = require('stream');
const isPromise = require('is-promise'); const isPromise = require('is-promise');
const driverBase = require('../frontend/driver'); const driverBase = require('../frontend/driver');
const Analyser = require('./Analyser'); const Analyser = require('./Analyser');
const { MongoClient, ObjectId, AbstractCursor } = require('mongodb'); const { MongoClient, ObjectId, AbstractCursor, Long } = require('mongodb');
const { EJSON } = require('bson'); const { EJSON } = require('bson');
const { serializeJsTypesForJsonStringify } = require('dbgate-tools'); const { serializeJsTypesForJsonStringify, deserializeJsTypesFromJsonParse } = require('dbgate-tools');
const createBulkInsertStream = require('./createBulkInsertStream'); const createBulkInsertStream = require('./createBulkInsertStream');
const { const {
convertToMongoCondition, convertToMongoCondition,
@@ -13,21 +13,30 @@ const {
convertToMongoSort, convertToMongoSort,
} = require('../frontend/convertToMongoCondition'); } = require('../frontend/convertToMongoCondition');
function transformMongoData(row) { function serializeMongoData(row) {
// TODO process LONG type return EJSON.serialize(
// console.log('ROW', row); serializeJsTypesForJsonStringify(row, (value) => {
return EJSON.serialize(serializeJsTypesForJsonStringify(row)); if (value instanceof Long) {
if (Number.isSafeInteger(value.toNumber())) {
return value.toNumber();
}
return {
$bigint: value.toString(),
};
}
})
);
} }
async function readCursor(cursor, options) { async function readCursor(cursor, options) {
options.recordset({ __isDynamicStructure: true }); options.recordset({ __isDynamicStructure: true });
await cursor.forEach((row) => { await cursor.forEach((row) => {
options.row(transformMongoData(row)); options.row(serializeMongoData(row));
}); });
} }
function convertObjectId(condition) { function deserializeMongoData(value) {
return EJSON.deserialize(condition); return deserializeJsTypesFromJsonParse(EJSON.deserialize(value));
} }
function findArrayResult(resValue) { function findArrayResult(resValue) {
@@ -266,7 +275,7 @@ const driver = {
const cursorStream = exprValue.stream(); const cursorStream = exprValue.stream();
cursorStream.on('data', (row) => { cursorStream.on('data', (row) => {
pass.write(transformMongoData(row)); pass.write(serializeMongoData(row));
}); });
// propagate error // propagate error
@@ -320,26 +329,26 @@ const driver = {
const collection = dbhan.getDatabase().collection(options.pureName); const collection = dbhan.getDatabase().collection(options.pureName);
if (options.countDocuments) { if (options.countDocuments) {
const count = await collection.countDocuments(convertObjectId(mongoCondition) || {}); const count = await collection.countDocuments(deserializeMongoData(mongoCondition) || {});
return { count }; return { count };
} else if (options.aggregate) { } else if (options.aggregate) {
let cursor = await collection.aggregate(convertObjectId(convertToMongoAggregate(options.aggregate))); let cursor = await collection.aggregate(deserializeMongoData(convertToMongoAggregate(options.aggregate)));
const rows = await cursor.toArray(); const rows = await cursor.toArray();
return { return {
rows: rows.map(transformMongoData).map((x) => ({ rows: rows.map(serializeMongoData).map((x) => ({
...x._id, ...x._id,
..._.omit(x, ['_id']), ..._.omit(x, ['_id']),
})), })),
}; };
} else { } else {
// console.log('options.condition', JSON.stringify(options.condition, undefined, 2)); // console.log('options.condition', JSON.stringify(options.condition, undefined, 2));
let cursor = await collection.find(convertObjectId(mongoCondition) || {}); let cursor = await collection.find(deserializeMongoData(mongoCondition) || {});
if (options.sort) cursor = cursor.sort(convertToMongoSort(options.sort)); if (options.sort) cursor = cursor.sort(convertToMongoSort(options.sort));
if (options.skip) cursor = cursor.skip(options.skip); if (options.skip) cursor = cursor.skip(options.skip);
if (options.limit) cursor = cursor.limit(options.limit); if (options.limit) cursor = cursor.limit(options.limit);
const rows = await cursor.toArray(); const rows = await cursor.toArray();
return { return {
rows: rows.map(transformMongoData), rows: rows.map(serializeMongoData),
}; };
} }
} catch (err) { } catch (err) {
@@ -361,7 +370,7 @@ const driver = {
...insert.document, ...insert.document,
...insert.fields, ...insert.fields,
}; };
const resdoc = await collection.insertOne(convertObjectId(document)); const resdoc = await collection.insertOne(deserializeMongoData(document));
res.inserted.push(resdoc._id); res.inserted.push(resdoc._id);
} }
for (const update of changeSet.updates) { for (const update of changeSet.updates) {
@@ -371,16 +380,16 @@ const driver = {
...update.document, ...update.document,
...update.fields, ...update.fields,
}; };
const doc = await collection.findOne(convertObjectId(update.condition)); const doc = await collection.findOne(deserializeMongoData(update.condition));
if (doc) { if (doc) {
const resdoc = await collection.replaceOne(convertObjectId(update.condition), { const resdoc = await collection.replaceOne(deserializeMongoData(update.condition), {
...convertObjectId(document), ...deserializeMongoData(document),
_id: doc._id, _id: doc._id,
}); });
res.replaced.push(resdoc._id); res.replaced.push(resdoc._id);
} }
} else { } else {
const set = convertObjectId(_.pickBy(update.fields, (v, k) => !v?.$$undefined$$)); const set = deserializeMongoData(_.pickBy(update.fields, (v, k) => !v?.$$undefined$$));
const unset = _.fromPairs( const unset = _.fromPairs(
Object.keys(update.fields) Object.keys(update.fields)
.filter((k) => update.fields[k]?.$$undefined$$) .filter((k) => update.fields[k]?.$$undefined$$)
@@ -390,13 +399,13 @@ const driver = {
if (!_.isEmpty(set)) updates.$set = set; if (!_.isEmpty(set)) updates.$set = set;
if (!_.isEmpty(unset)) updates.$unset = unset; if (!_.isEmpty(unset)) updates.$unset = unset;
const resdoc = await collection.updateOne(convertObjectId(update.condition), updates); const resdoc = await collection.updateOne(deserializeMongoData(update.condition), updates);
res.updated.push(resdoc._id); res.updated.push(resdoc._id);
} }
} }
for (const del of changeSet.deletes) { for (const del of changeSet.deletes) {
const collection = db.collection(del.pureName); const collection = db.collection(del.pureName);
const resdoc = await collection.deleteOne(convertObjectId(del.condition)); const resdoc = await collection.deleteOne(deserializeMongoData(del.condition));
res.deleted.push(resdoc._id); res.deleted.push(resdoc._id);
} }
return res; return res;
@@ -452,7 +461,7 @@ const driver = {
]); ]);
const rows = await cursor.toArray(); const rows = await cursor.toArray();
return _.uniqBy( return _.uniqBy(
rows.map(transformMongoData).map(({ _id }) => { rows.map(serializeMongoData).map(({ _id }) => {
if (_.isArray(_id) || _.isPlainObject(_id)) return { value: null }; if (_.isArray(_id) || _.isPlainObject(_id)) return { value: null };
return { value: _id }; return { value: _id };
}), }),