diff --git a/packages/engines/default/createBulkInsertStreamBase.js b/packages/engines/default/createBulkInsertStreamBase.js index ed5dfe99c..a47096199 100644 --- a/packages/engines/default/createBulkInsertStreamBase.js +++ b/packages/engines/default/createBulkInsertStreamBase.js @@ -45,16 +45,32 @@ function createBulkInsertStreamBase(driver, stream, pool, name, options) { await driver.query(pool, `TRUNCATE TABLE ${fullNameQuoted}`); } - const respTemplate = await pool.request().query(`SELECT * FROM ${fullNameQuoted} WHERE 1=0`); - writable.templateColumns = respTemplate.recordset.toTable().columns; - // console.log('writable.templateColumns', writable.templateColumns); - this.columnNames = _.intersection( structure.columns.map((x) => x.columnName), writable.structure.columns.map((x) => x.columnName) ); }; + writable.send = async () => { + const rows = writable.buffer; + writable.buffer = []; + + const dmp = driver.createDumper(); + + dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`); + dmp.putCollection(',', this.columnNames, (col) => dmp.putRaw(driver.dialect.quoteIdentifier(col))); + dmp.putRaw('\n'); + + let wasRow = false; + for (const row of rows) { + if (wasRow) dmp.putRaw(',\n'); + dmp.putRaw('('); + dmp.putCollection(',', this.columnNames, (col) => dmp.putValue(row[col])); + dmp.putRaw(')'); + } + await driver.query(pool, dmp.s); + }; + writable.sendIfFull = async () => { if (writable.buffer.length > 100) { await writable.send(); diff --git a/packages/engines/default/driverBase.js b/packages/engines/default/driverBase.js new file mode 100644 index 000000000..38c6a9599 --- /dev/null +++ b/packages/engines/default/driverBase.js @@ -0,0 +1,34 @@ +const createBulkInsertStreamBase = require('./createBulkInsertStreamBase'); + +const driverBase = { + analyserClass: null, + dumperClass: null, + + async analyseFull(pool) { + const analyser = new this.analyserClass(pool, this); + return analyser.fullAnalysis(); + }, + async analyseSingleObject(pool, name, typeField = 'tables') { + const analyser = new this.analyserClass(pool, this); + analyser.singleObjectFilter = { ...name, typeField }; + const res = await analyser.fullAnalysis(); + return res.tables[0]; + }, + analyseSingleTable(pool, name) { + return this.analyseSingleObject(pool, name, 'tables'); + }, + async analyseIncremental(pool, structure) { + const analyser = new this.analyserClass(pool, this); + return analyser.incrementalAnalysis(structure); + }, + createDumper() { + return new this.dumperClass(this); + }, + async writeTable(pool, name, options) { + const { stream, mssql } = pool._nativeModules; + // @ts-ignore + return createBulkInsertStreamBase(this, stream, pool, name, options); + }, +}; + +module.exports = driverBase; diff --git a/packages/engines/mssql/createBulkInsertStream.js b/packages/engines/mssql/createBulkInsertStream.js index 26aaca1c4..c6d250820 100644 --- a/packages/engines/mssql/createBulkInsertStream.js +++ b/packages/engines/mssql/createBulkInsertStream.js @@ -1,5 +1,3 @@ -const { prepareTableForImport } = require('@dbgate/tools'); -const _ = require('lodash'); const createBulkInsertStreamBase = require('../default/createBulkInsertStreamBase'); /** @@ -12,6 +10,15 @@ function createBulkInsertStream(driver, mssql, stream, pool, name, options) { const fullName = name.schemaName ? `[${name.schemaName}].[${name.pureName}]` : name.pureName; writable.send = async () => { + if (!writable.templateColumns) { + const fullNameQuoted = name.schemaName + ? `${driver.dialect.quoteIdentifier(name.schemaName)}.${driver.dialect.quoteIdentifier(name.pureName)}` + : driver.dialect.quoteIdentifier(name.pureName); + + const respTemplate = await pool.request().query(`SELECT * FROM ${fullNameQuoted} WHERE 1=0`); + writable.templateColumns = respTemplate.recordset.toTable().columns; + } + const rows = writable.buffer; writable.buffer = []; const table = new mssql.Table(fullName); diff --git a/packages/engines/mssql/index.js b/packages/engines/mssql/index.js index 0261b1909..269473728 100644 --- a/packages/engines/mssql/index.js +++ b/packages/engines/mssql/index.js @@ -2,7 +2,7 @@ const _ = require('lodash'); const MsSqlAnalyser = require('./MsSqlAnalyser'); const MsSqlDumper = require('./MsSqlDumper'); const createBulkInsertStream = require('./createBulkInsertStream'); -const { analyseSingleObject } = require('../mysql'); +const driverBase = require('../default/driverBase'); /** @type {import('@dbgate/types').SqlDialect} */ const dialect = { @@ -53,6 +53,9 @@ function extractColumns(columns) { /** @type {import('@dbgate/types').EngineDriver} */ const driver = { + ...driverBase, + analyserClass: MsSqlAnalyser, + dumperClass: MsSqlDumper, async connect(nativeModules, { server, port, user, password, database }) { const pool = new nativeModules.mssql.ConnectionPool({ server, @@ -200,27 +203,6 @@ const driver = { const { rows } = await this.query(pool, 'SELECT name FROM sys.databases order by name'); return rows; }, - async analyseFull(pool) { - const analyser = new MsSqlAnalyser(pool, this); - return analyser.fullAnalysis(); - }, - async analyseSingleObject(pool, name, typeField = 'tables') { - const analyser = new MsSqlAnalyser(pool, this); - analyser.singleObjectFilter = { ...name, typeField }; - const res = await analyser.fullAnalysis(); - return res.tables[0]; - }, - // @ts-ignore - analyseSingleTable(pool, name) { - return this.analyseSingleObject(pool, name, 'tables'); - }, - async analyseIncremental(pool, structure) { - const analyser = new MsSqlAnalyser(pool, this); - return analyser.incrementalAnalysis(structure); - }, - createDumper() { - return new MsSqlDumper(this); - }, dialect, engine: 'mssql', }; diff --git a/packages/engines/mysql/index.js b/packages/engines/mysql/index.js index 1f1b131e5..66f00d2bc 100644 --- a/packages/engines/mysql/index.js +++ b/packages/engines/mysql/index.js @@ -1,3 +1,4 @@ +const driverBase = require('../default/driverBase'); const MySqlAnalyser = require('./MySqlAnalyser'); const MySqlDumper = require('./MySqlDumper'); @@ -21,6 +22,10 @@ function extractColumns(fields) { /** @type {import('@dbgate/types').EngineDriver} */ const driver = { + ...driverBase, + analyserClass: MySqlAnalyser, + dumperClass: MySqlDumper, + async connect(nativeModules, { server, port, user, password, database }) { const connection = nativeModules.mysql.createConnection({ host: server, @@ -116,31 +121,31 @@ const driver = { const version = rows[0].Value; return { version }; }, - async analyseFull(pool) { - const analyser = new MySqlAnalyser(pool, this); - return analyser.fullAnalysis(); - }, - async analyseIncremental(pool, structure) { - const analyser = new MySqlAnalyser(pool, this); - return analyser.incrementalAnalysis(structure); - }, - async analyseSingleObject(pool, name, typeField = 'tables') { - const analyser = new MySqlAnalyser(pool, this); - analyser.singleObjectFilter = { ...name, typeField }; - const res = await analyser.fullAnalysis(); - return res.tables[0]; - }, - // @ts-ignore - analyseSingleTable(pool, name) { - return this.analyseSingleObject(pool, name, 'tables'); - }, + // async analyseFull(pool) { + // const analyser = new MySqlAnalyser(pool, this); + // return analyser.fullAnalysis(); + // }, + // async analyseIncremental(pool, structure) { + // const analyser = new MySqlAnalyser(pool, this); + // return analyser.incrementalAnalysis(structure); + // }, + // async analyseSingleObject(pool, name, typeField = 'tables') { + // const analyser = new MySqlAnalyser(pool, this); + // analyser.singleObjectFilter = { ...name, typeField }; + // const res = await analyser.fullAnalysis(); + // return res.tables[0]; + // }, + // // @ts-ignore + // analyseSingleTable(pool, name) { + // return this.analyseSingleObject(pool, name, 'tables'); + // }, async listDatabases(connection) { const { rows } = await this.query(connection, 'show databases'); return rows.map((x) => ({ name: x.Database })); }, - createDumper() { - return new MySqlDumper(this); - }, + // createDumper() { + // return new MySqlDumper(this); + // }, dialect, engine: 'mysql', }; diff --git a/packages/engines/postgres/index.js b/packages/engines/postgres/index.js index 1134198ab..a53f99d29 100644 --- a/packages/engines/postgres/index.js +++ b/packages/engines/postgres/index.js @@ -1,4 +1,5 @@ const _ = require('lodash'); +const driverBase = require('../default/driverBase'); const PostgreAnalyser = require('./PostgreAnalyser'); const PostgreDumper = require('./PostgreDumper'); @@ -14,6 +15,10 @@ const dialect = { /** @type {import('@dbgate/types').EngineDriver} */ const driver = { + ...driverBase, + analyserClass: PostgreAnalyser, + dumperClass: PostgreDumper, + async connect(nativeModules, { server, port, user, password, database }) { const client = new nativeModules.pg.Client({ host: server, @@ -97,29 +102,29 @@ const driver = { return stream; }, - async analyseSingleObject(pool, name, typeField = 'tables') { - const analyser = new PostgreAnalyser(pool, this); - analyser.singleObjectFilter = { ...name, typeField }; - const res = await analyser.fullAnalysis(); - return res.tables[0]; - }, - // @ts-ignore - analyseSingleTable(pool, name) { - return this.analyseSingleObject(pool, name, 'tables'); - }, + // async analyseSingleObject(pool, name, typeField = 'tables') { + // const analyser = new PostgreAnalyser(pool, this); + // analyser.singleObjectFilter = { ...name, typeField }; + // const res = await analyser.fullAnalysis(); + // return res.tables[0]; + // }, + // // @ts-ignore + // analyseSingleTable(pool, name) { + // return this.analyseSingleObject(pool, name, 'tables'); + // }, async getVersion(client) { const { rows } = await this.query(client, 'SELECT version()'); const { version } = rows[0]; return { version }; }, - async analyseFull(pool) { - const analyser = new PostgreAnalyser(pool, this); - return analyser.fullAnalysis(); - }, - async analyseIncremental(pool, structure) { - const analyser = new PostgreAnalyser(pool, this); - return analyser.incrementalAnalysis(structure); - }, + // async analyseFull(pool) { + // const analyser = new PostgreAnalyser(pool, this); + // return analyser.fullAnalysis(); + // }, + // async analyseIncremental(pool, structure) { + // const analyser = new PostgreAnalyser(pool, this); + // return analyser.incrementalAnalysis(structure); + // }, async readQuery(client, sql, structure) { const query = new client._nativeModules.pgQueryStream(sql); const { stream } = client._nativeModules; @@ -165,9 +170,9 @@ const driver = { return pass; }, - createDumper() { - return new PostgreDumper(this); - }, + // createDumper() { + // return new PostgreDumper(this); + // }, async listDatabases(client) { const { rows } = await this.query(client, 'SELECT datname AS name FROM pg_database WHERE datistemplate = false'); return rows; diff --git a/packages/types/engines.d.ts b/packages/types/engines.d.ts index 3dba19c56..9dc88dec3 100644 --- a/packages/types/engines.d.ts +++ b/packages/types/engines.d.ts @@ -43,6 +43,9 @@ export interface EngineDriver { analyseIncremental(pool: any, structure: DatabaseInfo): Promise; dialect: SqlDialect; createDumper(): SqlDumper; + + analyserClass?: any; + dumperClass?: any; } export interface DatabaseModification {