mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-30 17:24:00 +00:00
feat: add recordset and row zipping for cassandra query
This commit is contained in:
@@ -4,11 +4,31 @@ const driverBase = require('../frontend/driver');
|
|||||||
const Analyser = require('./Analyser');
|
const Analyser = require('./Analyser');
|
||||||
const cassandra = require('cassandra-driver');
|
const cassandra = require('cassandra-driver');
|
||||||
const createCassandraBulkInsertStream = require('./createBulkInsertStream.js');
|
const createCassandraBulkInsertStream = require('./createBulkInsertStream.js');
|
||||||
|
const { makeUniqueColumnNames } = require('dbgate-tools');
|
||||||
|
|
||||||
function getTypeName(code) {
|
function getTypeName(code) {
|
||||||
return Object.keys(cassandra.types.dataTypes).find((key) => cassandra.types.dataTypes[key] === code);
|
return Object.keys(cassandra.types.dataTypes).find((key) => cassandra.types.dataTypes[key] === code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function zipDataRow(row, header) {
|
||||||
|
const zippedRow = {};
|
||||||
|
|
||||||
|
for (let i = 0; i < header.length; i++) {
|
||||||
|
zippedRow[header[i].columnName] = row.get(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
return zippedRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
function extractCassandraColumns(row) {
|
||||||
|
if (!row) return [];
|
||||||
|
|
||||||
|
const columns = row.__columns.map((column) => ({ columnName: column.name }));
|
||||||
|
makeUniqueColumnNames(columns);
|
||||||
|
|
||||||
|
return columns;
|
||||||
|
}
|
||||||
|
|
||||||
/** @type {import('dbgate-types').EngineDriver<cassandra.Client>} */
|
/** @type {import('dbgate-types').EngineDriver<cassandra.Client>} */
|
||||||
const driver = {
|
const driver = {
|
||||||
...driverBase,
|
...driverBase,
|
||||||
@@ -70,10 +90,16 @@ const driver = {
|
|||||||
|
|
||||||
const strm = dbhan.client.stream(query);
|
const strm = dbhan.client.stream(query);
|
||||||
|
|
||||||
|
let header;
|
||||||
|
|
||||||
strm.on('readable', () => {
|
strm.on('readable', () => {
|
||||||
let row;
|
let row;
|
||||||
while ((row = strm.read())) {
|
while ((row = strm.read())) {
|
||||||
options.row(row);
|
if (!header) {
|
||||||
|
header = extractCassandraColumns(row);
|
||||||
|
options.recordset(header);
|
||||||
|
}
|
||||||
|
options.row(zipDataRow(row, header));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user