diff --git a/packages/tools/src/createBulkInsertStreamBase.ts b/packages/tools/src/createBulkInsertStreamBase.ts index 618d6785e..9dc6842fc 100644 --- a/packages/tools/src/createBulkInsertStreamBase.ts +++ b/packages/tools/src/createBulkInsertStreamBase.ts @@ -4,6 +4,7 @@ import _fromPairs from 'lodash/fromPairs'; import { getLogger } from './getLogger'; import { prepareTableForImport } from './tableTransforms'; import { RowProgressReporter } from './rowProgressReporter'; +import { extractErrorLogData } from './stringTools'; const logger = getLogger('bulkStreamBase'); @@ -34,88 +35,100 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan, }; writable.checkStructure = async () => { - let structure = options.targetTableStructure ?? (await driver.analyseSingleTable(dbhan, name)); - if (structure) { - writable.structure = structure; - } - if (structure && options.dropIfExists) { - logger.info(`Dropping table ${fullNameQuoted}`); - await driver.script(dbhan, `DROP TABLE ${fullNameQuoted}`); - } - if (options.createIfNotExists && (!structure || options.dropIfExists)) { - const dmp = driver.createDumper(); - const createdTableInfo = driver.adaptTableInfo(prepareTableForImport({ ...writable.structure, ...name })); - dmp.createTable(createdTableInfo); - logger.info({ sql: dmp.s }, `Creating table ${fullNameQuoted}`); - await driver.script(dbhan, dmp.s); - structure = await driver.analyseSingleTable(dbhan, name); - writable.structure = structure; - } - if (!writable.structure) { - throw new Error(`Error importing table - ${fullNameQuoted} not found`); - } - if (options.truncate) { - await driver.script(dbhan, `TRUNCATE TABLE ${fullNameQuoted}`); - } + try { + let structure = options.targetTableStructure ?? (await driver.analyseSingleTable(dbhan, name)); + if (structure) { + writable.structure = structure; + } + if (structure && options.dropIfExists) { + logger.info(`Dropping table ${fullNameQuoted}`); + await driver.script(dbhan, `DROP TABLE ${fullNameQuoted}`); + } + if (options.createIfNotExists && (!structure || options.dropIfExists)) { + const dmp = driver.createDumper(); + const createdTableInfo = driver.adaptTableInfo(prepareTableForImport({ ...writable.structure, ...name })); + dmp.createTable(createdTableInfo); + logger.info({ sql: dmp.s }, `Creating table ${fullNameQuoted}`); + await driver.script(dbhan, dmp.s); + structure = await driver.analyseSingleTable(dbhan, name); + writable.structure = structure; + } + if (!writable.structure) { + throw new Error(`Error importing table - ${fullNameQuoted} not found`); + } + if (options.truncate) { + await driver.script(dbhan, `TRUNCATE TABLE ${fullNameQuoted}`); + } - writable.columnNames = _intersection( - structure.columns.map(x => x.columnName), - writable.structure.columns.map(x => x.columnName) - ); - writable.columnDataTypes = _fromPairs( - writable.columnNames.map(colName => [ - colName, - writable.structure.columns.find(x => x.columnName == colName)?.dataType, - ]) - ); + writable.columnNames = _intersection( + structure.columns.map(x => x.columnName), + writable.structure.columns.map(x => x.columnName) + ); + writable.columnDataTypes = _fromPairs( + writable.columnNames.map(colName => [ + colName, + writable.structure.columns.find(x => x.columnName == colName)?.dataType, + ]) + ); + } catch (err) { + logger.error(extractErrorLogData(err), 'Error during preparing bulk insert table, stopped'); + writable.destroy(err); + } }; writable.send = async () => { const rows = writable.buffer; writable.buffer = []; - if (driver.dialect.allowMultipleValuesInsert) { - const dmp = driver.createDumper(); - dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`); - dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string))); - dmp.putRaw(')\n VALUES\n'); - - let wasRow = false; - for (const row of rows) { - if (wasRow) dmp.putRaw(',\n'); - dmp.putRaw('('); - dmp.putCollection(',', writable.columnNames, col => - dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string]) - ); - dmp.putRaw(')'); - wasRow = true; - } - dmp.putRaw(';'); - // require('fs').writeFileSync('/home/jena/test.sql', dmp.s); - // console.log(dmp.s); - await driver.query(dbhan, dmp.s, { discardResult: true }); - writable.rowsReporter.add(rows.length); - } else { - for (const row of rows) { + try { + if (driver.dialect.allowMultipleValuesInsert) { const dmp = driver.createDumper(); dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`); dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string))); dmp.putRaw(')\n VALUES\n'); - dmp.putRaw('('); - dmp.putCollection(',', writable.columnNames, col => - dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string]) - ); - dmp.putRaw(')'); + let wasRow = false; + for (const row of rows) { + if (wasRow) dmp.putRaw(',\n'); + dmp.putRaw('('); + dmp.putCollection(',', writable.columnNames, col => + dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string]) + ); + dmp.putRaw(')'); + wasRow = true; + } + dmp.putRaw(';'); + // require('fs').writeFileSync('/home/jena/test.sql', dmp.s); // console.log(dmp.s); await driver.query(dbhan, dmp.s, { discardResult: true }); - writable.rowsReporter.add(1); + writable.rowsReporter.add(rows.length); + } else { + for (const row of rows) { + const dmp = driver.createDumper(); + dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`); + dmp.putCollection(',', writable.columnNames, col => + dmp.putRaw(driver.dialect.quoteIdentifier(col as string)) + ); + dmp.putRaw(')\n VALUES\n'); + + dmp.putRaw('('); + dmp.putCollection(',', writable.columnNames, col => + dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string]) + ); + dmp.putRaw(')'); + // console.log(dmp.s); + await driver.query(dbhan, dmp.s, { discardResult: true }); + writable.rowsReporter.add(1); + } } - } - if (options.commitAfterInsert) { - const dmp = driver.createDumper(); - dmp.commitTransaction(); - await driver.query(dbhan, dmp.s, { discardResult: true }); + if (options.commitAfterInsert) { + const dmp = driver.createDumper(); + dmp.commitTransaction(); + await driver.query(dbhan, dmp.s, { discardResult: true }); + } + } catch (err) { + logger.error(extractErrorLogData(err), 'Error during base bulk insert, insert stopped'); + writable.destroy(err); } };