mssql: support readableStream (missing drained pause+resume+drained support)

This commit is contained in:
Jan Prochazka
2020-06-07 19:06:21 +02:00
parent 22fa92520d
commit 81738487d7
6 changed files with 54 additions and 6 deletions

View File

@@ -8,7 +8,7 @@ async function queryReader({ connection, sql }) {
const driver = engines(connection);
const pool = await driverConnect(driver, connection);
console.log(`Connected.`);
return driver.readableStream(pool, sql);
return await driver.readableStream(pool, sql);
}
module.exports = queryReader;

View File

@@ -2,12 +2,16 @@ const mssql = require('mssql');
const mysql = require('mysql');
const pg = require('pg');
const pgQueryStream = require('pg-query-stream');
const fs = require('fs');
const stream = require('stream');
const nativeModules = {
mssql,
mysql,
pg,
pgQueryStream,
fs,
stream,
};
function driverConnect(driver, connection) {

View File

@@ -154,6 +154,27 @@ const driver = {
return request;
},
async readableStream(pool, sql) {
const request = await pool.request();
const { stream } = pool._nativeModules;
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
});
request.stream = true;
request.on('row', (row) => pass.write(row));
request.on('error', (err) => {
console.error(err);
pass.end();
});
request.on('done', () => pass.end());
request.query(sql);
return pass;
},
async getVersion(pool) {
const { version } = (await this.query(pool, 'SELECT @@VERSION AS version')).rows[0];
return { version };

View File

@@ -84,7 +84,7 @@ const driver = {
return query;
},
readableStream(connection, sql) {
async readableStream(connection, sql) {
const query = connection.query(sql);
return query.stream({ highWaterMark: 100 });
},

View File

@@ -17,7 +17,7 @@ export interface EngineDriver {
connect(nativeModules, { server, port, user, password, database }): any;
query(pool: any, sql: string): Promise<QueryResult>;
stream(pool: any, sql: string, options: StreamOptions);
readableStream(pool: any, sql: string): stream.Readable;
readableStream(pool: any, sql: string): Promise<stream.Readable>;
getVersion(pool: any): Promise<{ version: string }>;
listDatabases(
pool: any

View File

@@ -5,7 +5,8 @@ import { DropDownMenuItem } from '../modals/DropDownMenu';
import { openNewTab } from '../utility/common';
import { getConnectionInfo } from '../utility/metadataLoaders';
import fullDisplayName from '../utility/fullDisplayName';
import { filterName } from '@dbgate/datalib';
import { filterName, fullNameToString } from '@dbgate/datalib';
import ImportExportModal from '../modals/ImportExportModal';
const icons = {
tables: 'table2.svg',
@@ -28,6 +29,10 @@ const menus = {
label: 'Show CREATE TABLE script',
sqlTemplate: 'CREATE TABLE',
},
{
label: 'Export',
isExport: true,
},
],
views: [
{
@@ -38,6 +43,10 @@ const menus = {
label: 'Show CREATE VIEW script',
sqlTemplate: 'CREATE OBJECT',
},
{
label: 'Export',
isExport: true,
},
],
procedures: [
{
@@ -90,14 +99,28 @@ async function openObjectDetail(
});
}
function Menu({ data, makeAppObj, setOpenedTabs }) {
function Menu({ data, makeAppObj, setOpenedTabs, showModal }) {
return (
<>
{menus[data.objectTypeField].map((menu) => (
<DropDownMenuItem
key={menu.label}
onClick={() => {
openObjectDetail(setOpenedTabs, menu.tab, menu.sqlTemplate, data);
if (menu.isExport) {
showModal((modalState) => (
<ImportExportModal
modalState={modalState}
initialValues={{
sourceStorageType: 'database',
sourceConnectionId: data.conid,
sourceDatabaseName: data.database,
sourceTables: [fullNameToString(data)],
}}
/>
));
} else {
openObjectDetail(setOpenedTabs, menu.tab, menu.sqlTemplate, data);
}
}}
>
{menu.label}