mirror of
https://github.com/DeNNiiInc/dbgate.git
synced 2026-04-20 22:55:59 +00:00
better usage of __isStreamHeader flag
This commit is contained in:
@@ -2,12 +2,17 @@ const EnsureStreamHeaderStream = require('../utility/EnsureStreamHeaderStream');
|
|||||||
|
|
||||||
function copyStream(input, output) {
|
function copyStream(input, output) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
const ensureHeader = new EnsureStreamHeaderStream();
|
|
||||||
const finisher = output['finisher'] || output;
|
const finisher = output['finisher'] || output;
|
||||||
finisher.on('finish', resolve);
|
finisher.on('finish', resolve);
|
||||||
finisher.on('error', reject);
|
finisher.on('error', reject);
|
||||||
input.pipe(ensureHeader);
|
|
||||||
ensureHeader.pipe(output);
|
if (output.requireFixedStructure) {
|
||||||
|
const ensureHeader = new EnsureStreamHeaderStream();
|
||||||
|
input.pipe(ensureHeader);
|
||||||
|
ensureHeader.pipe(output);
|
||||||
|
} else {
|
||||||
|
input.pipe(output);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,10 +11,7 @@ class StringifyStream extends stream.Transform {
|
|||||||
let skip = false;
|
let skip = false;
|
||||||
|
|
||||||
if (!this.wasHeader) {
|
if (!this.wasHeader) {
|
||||||
skip =
|
skip = chunk.__isStreamHeader;
|
||||||
chunk.__isStreamHeader ||
|
|
||||||
// TODO remove isArray test
|
|
||||||
Array.isArray(chunk.columns);
|
|
||||||
this.wasHeader = true;
|
this.wasHeader = true;
|
||||||
}
|
}
|
||||||
if (!skip) {
|
if (!skip) {
|
||||||
|
|||||||
@@ -12,14 +12,14 @@ class ParseStream extends stream.Transform {
|
|||||||
_transform(chunk, encoding, done) {
|
_transform(chunk, encoding, done) {
|
||||||
const obj = JSON.parse(chunk);
|
const obj = JSON.parse(chunk);
|
||||||
if (!this.wasHeader) {
|
if (!this.wasHeader) {
|
||||||
if (
|
if (!obj.__isStreamHeader) {
|
||||||
!obj.__isStreamHeader &&
|
this.push({
|
||||||
// TODO remove isArray test
|
__isStreamHeader: true,
|
||||||
!Array.isArray(obj.columns)
|
__isDynamicStructure: true,
|
||||||
) {
|
// columns: Object.keys(obj).map(columnName => ({ columnName })),
|
||||||
this.push({ columns: Object.keys(obj).map(columnName => ({ columnName })) });
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
this.wasHeader = true;
|
this.wasHeader = true;
|
||||||
}
|
}
|
||||||
if (!this.limitRows || this.rowsWritten < this.limitRows) {
|
if (!this.limitRows || this.rowsWritten < this.limitRows) {
|
||||||
|
|||||||
@@ -10,11 +10,7 @@ class StringifyStream extends stream.Transform {
|
|||||||
_transform(chunk, encoding, done) {
|
_transform(chunk, encoding, done) {
|
||||||
let skip = false;
|
let skip = false;
|
||||||
if (!this.wasHeader) {
|
if (!this.wasHeader) {
|
||||||
skip =
|
skip = (chunk.__isStreamHeader && !this.header) || (chunk.__isStreamHeader && chunk.__isDynamicStructure);
|
||||||
(chunk.__isStreamHeader ||
|
|
||||||
// TODO remove isArray test
|
|
||||||
Array.isArray(chunk.columns)) &&
|
|
||||||
!this.header;
|
|
||||||
this.wasHeader = true;
|
this.wasHeader = true;
|
||||||
}
|
}
|
||||||
if (!skip) {
|
if (!skip) {
|
||||||
|
|||||||
@@ -14,11 +14,7 @@ class SqlizeStream extends stream.Transform {
|
|||||||
_transform(chunk, encoding, done) {
|
_transform(chunk, encoding, done) {
|
||||||
let skip = false;
|
let skip = false;
|
||||||
if (!this.wasHeader) {
|
if (!this.wasHeader) {
|
||||||
if (
|
if (chunk.__isStreamHeader) {
|
||||||
chunk.__isStreamHeader ||
|
|
||||||
// TODO remove isArray test
|
|
||||||
Array.isArray(chunk.columns)
|
|
||||||
) {
|
|
||||||
skip = true;
|
skip = true;
|
||||||
this.tableName = chunk.pureName;
|
this.tableName = chunk.pureName;
|
||||||
if (chunk.engine) {
|
if (chunk.engine) {
|
||||||
|
|||||||
@@ -13,11 +13,7 @@ class EnsureStreamHeaderStream extends stream.Transform {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (
|
if (!chunk.__isStreamHeader) {
|
||||||
!chunk.__isStreamHeader &&
|
|
||||||
// TODO remove isArray test
|
|
||||||
!Array.isArray(chunk.columns)
|
|
||||||
) {
|
|
||||||
this.push({
|
this.push({
|
||||||
__isStreamHeader: true,
|
__isStreamHeader: true,
|
||||||
__isComputedStructure: true,
|
__isComputedStructure: true,
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ export function createBulkInsertStreamBase(driver, stream, pool, name, options):
|
|||||||
writable.buffer = [];
|
writable.buffer = [];
|
||||||
writable.structure = null;
|
writable.structure = null;
|
||||||
writable.columnNames = null;
|
writable.columnNames = null;
|
||||||
|
writable.requireFixedStructure = !driver.dialect.nosql;
|
||||||
|
|
||||||
writable.addRow = async row => {
|
writable.addRow = async row => {
|
||||||
if (writable.structure) {
|
if (writable.structure) {
|
||||||
|
|||||||
@@ -14,18 +14,13 @@ function createBulkInsertStream(driver, stream, pool, name, options) {
|
|||||||
writable.addRow = (row) => {
|
writable.addRow = (row) => {
|
||||||
if (!writable.wasHeader) {
|
if (!writable.wasHeader) {
|
||||||
writable.wasHeader = true;
|
writable.wasHeader = true;
|
||||||
if (
|
if (row.__isStreamHeader) return;
|
||||||
row.__isStreamHeader ||
|
|
||||||
// TODO remove isArray test
|
|
||||||
Array.isArray(row.columns)
|
|
||||||
)
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if (options.createStringId) {
|
if (options.createStringId) {
|
||||||
row = {
|
row = {
|
||||||
_id: new ObjectId().toString(),
|
_id: new ObjectId().toString(),
|
||||||
...row,
|
...row,
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
writable.buffer.push(row);
|
writable.buffer.push(row);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user