bulk inserter - fixes for mysql and postgres

This commit is contained in:
Jan Prochazka
2020-09-28 14:33:47 +02:00
parent 29a7b68b59
commit 6548400b96
6 changed files with 13 additions and 8 deletions

View File

@@ -21,7 +21,7 @@ module.exports = {
opened: [], opened: [],
dispatchMessage(runid, message) { dispatchMessage(runid, message) {
console.log('DISPATCHING', message); // console.log('DISPATCHING', message);
if (_.isString(message)) { if (_.isString(message)) {
socket.emit(`runner-info-${runid}`, { socket.emit(`runner-info-${runid}`, {
message, message,

View File

@@ -8,11 +8,11 @@ module.exports = {
return socket; return socket;
}, },
emit(message, data) { emit(message, data) {
console.log('EMIT:', message, data); // console.log('EMIT:', message, data);
socket.emit(message, data); socket.emit(message, data);
}, },
emitChanged(key) { emitChanged(key) {
console.log('EMIT_CHANGED:', key); // console.log('EMIT_CHANGED:', key);
socket.emit('clean-cache', key); socket.emit('clean-cache', key);
socket.emit(key); socket.emit(key);
}, },

View File

@@ -29,6 +29,7 @@ function createBulkInsertStreamBase(driver, stream, pool, name, options) {
writable.checkStructure = async () => { writable.checkStructure = async () => {
let structure = await driver.analyseSingleTable(pool, name); let structure = await driver.analyseSingleTable(pool, name);
// console.log('ANALYSING', name, structure);
if (structure && options.dropIfExists) { if (structure && options.dropIfExists) {
console.log(`Dropping table ${fullNameQuoted}`); console.log(`Dropping table ${fullNameQuoted}`);
await driver.query(pool, `DROP TABLE ${fullNameQuoted}`); await driver.query(pool, `DROP TABLE ${fullNameQuoted}`);
@@ -59,7 +60,7 @@ function createBulkInsertStreamBase(driver, stream, pool, name, options) {
dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`); dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`);
dmp.putCollection(',', this.columnNames, (col) => dmp.putRaw(driver.dialect.quoteIdentifier(col))); dmp.putCollection(',', this.columnNames, (col) => dmp.putRaw(driver.dialect.quoteIdentifier(col)));
dmp.putRaw('\n'); dmp.putRaw(')\n VALUES\n');
let wasRow = false; let wasRow = false;
for (const row of rows) { for (const row of rows) {
@@ -67,7 +68,10 @@ function createBulkInsertStreamBase(driver, stream, pool, name, options) {
dmp.putRaw('('); dmp.putRaw('(');
dmp.putCollection(',', this.columnNames, (col) => dmp.putValue(row[col])); dmp.putCollection(',', this.columnNames, (col) => dmp.putValue(row[col]));
dmp.putRaw(')'); dmp.putRaw(')');
wasRow = true;
} }
dmp.putRaw(';');
// require('fs').writeFileSync('/home/jena/test.sql', dmp.s);
await driver.query(pool, dmp.s); await driver.query(pool, dmp.s);
}; };

View File

@@ -6,7 +6,7 @@ const MySqlDumper = require('./MySqlDumper');
const dialect = { const dialect = {
rangeSelect: true, rangeSelect: true,
stringEscapeChar: '\\', stringEscapeChar: '\\',
fallbackDataType: 'nvarchar(max)', fallbackDataType: 'longtext',
quoteIdentifier(s) { quoteIdentifier(s) {
return '`' + s + '`'; return '`' + s + '`';
}, },

View File

@@ -46,7 +46,7 @@ class PostgreAnalyser extends DatabaseAnalayser {
if (this.singleObjectFilter) { if (this.singleObjectFilter) {
const { typeField, schemaName, pureName } = this.singleObjectFilter; const { typeField, schemaName, pureName } = this.singleObjectFilter;
if (!typeFields || !typeFields.includes(typeField)) return null; if (!typeFields || !typeFields.includes(typeField)) return null;
res = res.replace(/=OBJECT_ID_CONDITION/g, ` = '${typeField}:${schemaName}.${pureName}'`); res = res.replace(/=OBJECT_ID_CONDITION/g, ` = '${typeField}:${schemaName || 'public'}.${pureName}'`);
return res; return res;
} }
if (!this.modifications || !typeFields || this.modifications.length == 0) { if (!this.modifications || !typeFields || this.modifications.length == 0) {

View File

@@ -6,8 +6,9 @@ const PostgreDumper = require('./PostgreDumper');
/** @type {import('@dbgate/types').SqlDialect} */ /** @type {import('@dbgate/types').SqlDialect} */
const dialect = { const dialect = {
rangeSelect: true, rangeSelect: true,
stringEscapeChar: '\\', // stringEscapeChar: '\\',
fallbackDataType: 'nvarchar(max)', stringEscapeChar: "'",
fallbackDataType: 'varchar',
quoteIdentifier(s) { quoteIdentifier(s) {
return '"' + s + '"'; return '"' + s + '"';
}, },