query splitter refactor

This commit is contained in:
Jan Prochazka
2021-06-03 11:27:49 +02:00
parent a76e742ce6
commit 0c48a5ee09
26 changed files with 203 additions and 172 deletions

View File

@@ -23,56 +23,6 @@ function zipDataRow(rowArray, columns) {
);
}
async function runStreamItem(client, sql, options) {
return new Promise((resolve, reject) => {
const query = new pg.Query({
text: sql,
rowMode: 'array',
});
let wasHeader = false;
query.on('row', row => {
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
if (columns && columns.length > 0) {
options.recordset(columns);
}
wasHeader = true;
}
options.row(zipDataRow(row, columns));
});
query.on('end', () => {
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
if (columns && columns.length > 0) {
options.recordset(columns);
}
wasHeader = true;
}
resolve();
});
query.on('error', error => {
console.log('ERROR', error);
const { message, lineNumber, procName } = error;
options.info({
message,
line: lineNumber,
procedure: procName,
time: new Date(),
severity: 'error',
});
resolve();
});
client.query(query);
});
}
/** @type {import('dbgate-types').EngineDriver} */
const drivers = driverBases.map(driverBase => ({
...driverBase,
@@ -127,21 +77,52 @@ const drivers = driverBases.map(driverBase => ({
const columns = extractPostgresColumns(res);
return { rows: res.rows.map(row => zipDataRow(row, columns)), columns };
},
async stream(client, sql, options) {
let sqlSplitted;
try {
sqlSplitted = identify(sql, { dialect: 'psql', strict: false });
} catch (e) {
// workaround
sqlSplitted = [{ text: sql }];
}
stream(client, sql, options) {
const query = new pg.Query({
text: sql,
rowMode: 'array',
});
for (const sqlItem of sqlSplitted) {
await runStreamItem(client, sqlItem.text, options);
}
let wasHeader = false;
options.done();
// return stream;
query.on('row', row => {
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
if (columns && columns.length > 0) {
options.recordset(columns);
}
wasHeader = true;
}
options.row(zipDataRow(row, columns));
});
query.on('end', () => {
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
if (columns && columns.length > 0) {
options.recordset(columns);
}
wasHeader = true;
}
options.done();
});
query.on('error', error => {
console.log('ERROR', error);
const { message, lineNumber, procName } = error;
options.info({
message,
line: lineNumber,
procedure: procName,
time: new Date(),
severity: 'error',
});
options.done();
});
client.query(query);
},
async getVersion(client) {
const { rows } = await this.query(client, 'SELECT version()');