diff --git a/packages/engines/default/createBulkInsertStreamBase.js b/packages/engines/default/createBulkInsertStreamBase.js new file mode 100644 index 000000000..ed5dfe99c --- /dev/null +++ b/packages/engines/default/createBulkInsertStreamBase.js @@ -0,0 +1,78 @@ +const { prepareTableForImport } = require('@dbgate/tools'); +const _ = require('lodash'); + +/** + * + * @param {import('@dbgate/types').EngineDriver} driver + */ +function createBulkInsertStreamBase(driver, stream, pool, name, options) { + const fullNameQuoted = name.schemaName + ? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}` + : driver.dialect.quoteIdentifier(name.pureName); + + const writable = new stream.Writable({ + objectMode: true, + }); + + writable.buffer = []; + writable.structure = null; + writable.columnNames = null; + + writable.addRow = async (row) => { + if (writable.structure) { + writable.buffer.push(row); + } else { + writable.structure = row; + await writable.checkStructure(); + } + }; + + writable.checkStructure = async () => { + let structure = await driver.analyseSingleTable(pool, name); + if (structure && options.dropIfExists) { + console.log(`Dropping table ${fullNameQuoted}`); + await driver.query(pool, `DROP TABLE ${fullNameQuoted}`); + } + if (options.createIfNotExists && (!structure || options.dropIfExists)) { + console.log(`Creating table ${fullNameQuoted}`); + const dmp = driver.createDumper(); + dmp.createTable(prepareTableForImport({ ...writable.structure, ...name })); + console.log(dmp.s); + await driver.query(pool, dmp.s); + structure = await driver.analyseSingleTable(pool, name); + } + if (options.truncate) { + await driver.query(pool, `TRUNCATE TABLE ${fullNameQuoted}`); + } + + const respTemplate = await pool.request().query(`SELECT * FROM ${fullNameQuoted} WHERE 1=0`); + writable.templateColumns = respTemplate.recordset.toTable().columns; + // console.log('writable.templateColumns', writable.templateColumns); + + this.columnNames = _.intersection( + structure.columns.map((x) => x.columnName), + writable.structure.columns.map((x) => x.columnName) + ); + }; + + writable.sendIfFull = async () => { + if (writable.buffer.length > 100) { + await writable.send(); + } + }; + + writable._write = async (chunk, encoding, callback) => { + await writable.addRow(chunk); + await writable.sendIfFull(); + callback(); + }; + + writable._final = async (callback) => { + await writable.send(); + callback(); + }; + + return writable; +} + +module.exports = createBulkInsertStreamBase; diff --git a/packages/engines/mssql/createBulkInsertStream.js b/packages/engines/mssql/createBulkInsertStream.js index 850543ae3..26aaca1c4 100644 --- a/packages/engines/mssql/createBulkInsertStream.js +++ b/packages/engines/mssql/createBulkInsertStream.js @@ -1,58 +1,15 @@ const { prepareTableForImport } = require('@dbgate/tools'); const _ = require('lodash'); +const createBulkInsertStreamBase = require('../default/createBulkInsertStreamBase'); /** * * @param {import('@dbgate/types').EngineDriver} driver */ function createBulkInsertStream(driver, mssql, stream, pool, name, options) { + const writable = createBulkInsertStreamBase(driver, stream, pool, name, options); + const fullName = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : name.pureName; - const fullNameQuoted = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : `[${name.pureName}]`; - - const writable = new stream.Writable({ - objectMode: true, - }); - - writable.buffer = []; - writable.structure = null; - writable.columnNames = null; - - writable.addRow = async (row) => { - if (writable.structure) { - writable.buffer.push(row); - } else { - writable.structure = row; - await writable.checkStructure(); - } - }; - - writable.checkStructure = async () => { - let structure = await driver.analyseSingleTable(pool, name); - if (structure && options.dropIfExists) { - console.log(`Dropping table ${fullName}`); - await driver.query(pool, `DROP TABLE ${fullNameQuoted}`); - } - if (options.createIfNotExists && (!structure || options.dropIfExists)) { - console.log(`Creating table ${fullName}`); - const dmp = driver.createDumper(); - dmp.createTable(prepareTableForImport({ ...writable.structure, ...name })); - console.log(dmp.s); - await driver.query(pool, dmp.s); - structure = await driver.analyseSingleTable(pool, name); - } - if (options.truncate) { - await driver.query(pool, `TRUNCATE TABLE ${fullNameQuoted}`); - } - - const respTemplate = await pool.request().query(`SELECT * FROM ${fullNameQuoted} WHERE 1=0`); - writable.templateColumns = respTemplate.recordset.toTable().columns; - // console.log('writable.templateColumns', writable.templateColumns); - - this.columnNames = _.intersection( - structure.columns.map((x) => x.columnName), - writable.structure.columns.map((x) => x.columnName) - ); - }; writable.send = async () => { const rows = writable.buffer; @@ -78,23 +35,6 @@ function createBulkInsertStream(driver, mssql, stream, pool, name, options) { await request.bulk(table); }; - writable.sendIfFull = async () => { - if (writable.buffer.length > 100) { - await writable.send(); - } - }; - - writable._write = async (chunk, encoding, callback) => { - await writable.addRow(chunk); - await writable.sendIfFull(); - callback(); - }; - - writable._final = async (callback) => { - await writable.send(); - callback(); - }; - return writable; }