catch errors in base bulk importer

This commit is contained in:
SPRINX0\prochazka
2025-03-05 16:56:18 +01:00
parent 57f1019e51
commit 58b88d66be

View File

@@ -4,6 +4,7 @@ import _fromPairs from 'lodash/fromPairs';
import { getLogger } from './getLogger'; import { getLogger } from './getLogger';
import { prepareTableForImport } from './tableTransforms'; import { prepareTableForImport } from './tableTransforms';
import { RowProgressReporter } from './rowProgressReporter'; import { RowProgressReporter } from './rowProgressReporter';
import { extractErrorLogData } from './stringTools';
const logger = getLogger('bulkStreamBase'); const logger = getLogger('bulkStreamBase');
@@ -34,88 +35,100 @@ export function createBulkInsertStreamBase(driver: EngineDriver, stream, dbhan,
}; };
writable.checkStructure = async () => { writable.checkStructure = async () => {
let structure = options.targetTableStructure ?? (await driver.analyseSingleTable(dbhan, name)); try {
if (structure) { let structure = options.targetTableStructure ?? (await driver.analyseSingleTable(dbhan, name));
writable.structure = structure; if (structure) {
} writable.structure = structure;
if (structure && options.dropIfExists) { }
logger.info(`Dropping table ${fullNameQuoted}`); if (structure && options.dropIfExists) {
await driver.script(dbhan, `DROP TABLE ${fullNameQuoted}`); logger.info(`Dropping table ${fullNameQuoted}`);
} await driver.script(dbhan, `DROP TABLE ${fullNameQuoted}`);
if (options.createIfNotExists && (!structure || options.dropIfExists)) { }
const dmp = driver.createDumper(); if (options.createIfNotExists && (!structure || options.dropIfExists)) {
const createdTableInfo = driver.adaptTableInfo(prepareTableForImport({ ...writable.structure, ...name })); const dmp = driver.createDumper();
dmp.createTable(createdTableInfo); const createdTableInfo = driver.adaptTableInfo(prepareTableForImport({ ...writable.structure, ...name }));
logger.info({ sql: dmp.s }, `Creating table ${fullNameQuoted}`); dmp.createTable(createdTableInfo);
await driver.script(dbhan, dmp.s); logger.info({ sql: dmp.s }, `Creating table ${fullNameQuoted}`);
structure = await driver.analyseSingleTable(dbhan, name); await driver.script(dbhan, dmp.s);
writable.structure = structure; structure = await driver.analyseSingleTable(dbhan, name);
} writable.structure = structure;
if (!writable.structure) { }
throw new Error(`Error importing table - ${fullNameQuoted} not found`); if (!writable.structure) {
} throw new Error(`Error importing table - ${fullNameQuoted} not found`);
if (options.truncate) { }
await driver.script(dbhan, `TRUNCATE TABLE ${fullNameQuoted}`); if (options.truncate) {
} await driver.script(dbhan, `TRUNCATE TABLE ${fullNameQuoted}`);
}
writable.columnNames = _intersection( writable.columnNames = _intersection(
structure.columns.map(x => x.columnName), structure.columns.map(x => x.columnName),
writable.structure.columns.map(x => x.columnName) writable.structure.columns.map(x => x.columnName)
); );
writable.columnDataTypes = _fromPairs( writable.columnDataTypes = _fromPairs(
writable.columnNames.map(colName => [ writable.columnNames.map(colName => [
colName, colName,
writable.structure.columns.find(x => x.columnName == colName)?.dataType, writable.structure.columns.find(x => x.columnName == colName)?.dataType,
]) ])
); );
} catch (err) {
logger.error(extractErrorLogData(err), 'Error during preparing bulk insert table, stopped');
writable.destroy(err);
}
}; };
writable.send = async () => { writable.send = async () => {
const rows = writable.buffer; const rows = writable.buffer;
writable.buffer = []; writable.buffer = [];
if (driver.dialect.allowMultipleValuesInsert) { try {
const dmp = driver.createDumper(); if (driver.dialect.allowMultipleValuesInsert) {
dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`);
dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string)));
dmp.putRaw(')\n VALUES\n');
let wasRow = false;
for (const row of rows) {
if (wasRow) dmp.putRaw(',\n');
dmp.putRaw('(');
dmp.putCollection(',', writable.columnNames, col =>
dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string])
);
dmp.putRaw(')');
wasRow = true;
}
dmp.putRaw(';');
// require('fs').writeFileSync('/home/jena/test.sql', dmp.s);
// console.log(dmp.s);
await driver.query(dbhan, dmp.s, { discardResult: true });
writable.rowsReporter.add(rows.length);
} else {
for (const row of rows) {
const dmp = driver.createDumper(); const dmp = driver.createDumper();
dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`); dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`);
dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string))); dmp.putCollection(',', writable.columnNames, col => dmp.putRaw(driver.dialect.quoteIdentifier(col as string)));
dmp.putRaw(')\n VALUES\n'); dmp.putRaw(')\n VALUES\n');
dmp.putRaw('('); let wasRow = false;
dmp.putCollection(',', writable.columnNames, col => for (const row of rows) {
dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string]) if (wasRow) dmp.putRaw(',\n');
); dmp.putRaw('(');
dmp.putRaw(')'); dmp.putCollection(',', writable.columnNames, col =>
dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string])
);
dmp.putRaw(')');
wasRow = true;
}
dmp.putRaw(';');
// require('fs').writeFileSync('/home/jena/test.sql', dmp.s);
// console.log(dmp.s); // console.log(dmp.s);
await driver.query(dbhan, dmp.s, { discardResult: true }); await driver.query(dbhan, dmp.s, { discardResult: true });
writable.rowsReporter.add(1); writable.rowsReporter.add(rows.length);
} else {
for (const row of rows) {
const dmp = driver.createDumper();
dmp.putRaw(`INSERT INTO ${fullNameQuoted} (`);
dmp.putCollection(',', writable.columnNames, col =>
dmp.putRaw(driver.dialect.quoteIdentifier(col as string))
);
dmp.putRaw(')\n VALUES\n');
dmp.putRaw('(');
dmp.putCollection(',', writable.columnNames, col =>
dmp.putValue(row[col as string], writable.columnDataTypes?.[col as string])
);
dmp.putRaw(')');
// console.log(dmp.s);
await driver.query(dbhan, dmp.s, { discardResult: true });
writable.rowsReporter.add(1);
}
} }
} if (options.commitAfterInsert) {
if (options.commitAfterInsert) { const dmp = driver.createDumper();
const dmp = driver.createDumper(); dmp.commitTransaction();
dmp.commitTransaction(); await driver.query(dbhan, dmp.s, { discardResult: true });
await driver.query(dbhan, dmp.s, { discardResult: true }); }
} catch (err) {
logger.error(extractErrorLogData(err), 'Error during base bulk insert, insert stopped');
writable.destroy(err);
} }
}; };