This commit is contained in:
SPRINX0\prochazka
2024-12-20 13:45:41 +01:00
4 changed files with 18 additions and 16 deletions

View File

@@ -25,12 +25,13 @@ function createImportStream() {
}
function createExportStream() {
const writable = new Stream.Writable({ objectMode: true });
writable.result = [];
writable._write = (object, encoding, done) => {
result.push(object);
done();
const writable = new stream.Writable({ objectMode: true });
writable.resultArray = [];
writable._write = (chunk, encoding, callback) => {
writable.resultArray.push(chunk);
callback();
};
return writable;
}
describe('DB Import/export', () => {
@@ -115,7 +116,7 @@ describe('DB Import/export', () => {
testWrapper(async (conn, driver, engine) => {
// const reader = await fakeObjectReader({ delay: 10 });
// const reader = await fakeObjectReader();
await runCommandOnDriver(conn, driver, 'create table ~t1 (~id int, ~country varchar(100))');
await runCommandOnDriver(conn, driver, 'create table ~t1 (~id int primary key, ~country varchar(100))');
const data = [
[1, 'Czechia'],
[2, 'Austria'],
@@ -125,18 +126,19 @@ describe('DB Import/export', () => {
[6, 'Bosna, Hecegovina'],
];
for (const row of data) {
await runCommandOnDriver(conn, driver, 'insert into ~t1(~id, ~country) values (%v, %v)', ...row);
await runCommandOnDriver(conn, driver, dmp =>
dmp.put('insert into ~t1(~id, ~country) values (%v, %v)', ...row)
);
}
const reader = await tableReader({
systemConnection: conn,
driver,
pureName: 't1',
createIfNotExists: true,
});
const writer = createExportStream();
await copyStream(reader, writer);
expect(writer.result).toEqual(data);
expect(writer.resultArray.filter(x => !x.__isStreamHeader).map(row => [row.id, row.country])).toEqual(data);
})
);
});

View File

@@ -580,11 +580,11 @@ const enginesOnLocal = [
// mysqlEngine,
// mariaDbEngine,
// postgreSqlEngine,
sqlServerEngine,
// sqlServerEngine,
// sqliteEngine,
// cockroachDbEngine,
// clickhouseEngine,
// oracleEngine,
oracleEngine,
];
module.exports = process.env.CITEST ? enginesOnCi : enginesOnLocal;

View File

@@ -8,12 +8,15 @@ const logger = getLogger('tableReader');
* @param {object} options
* @param {connectionType} options.connection - connection object
* @param {object} options.systemConnection - system connection (result of driver.connect). If not provided, new connection will be created
* @param {object} options.driver - driver object. If not provided, it will be loaded from connection
* @param {string} options.pureName - table name
* @param {string} options.schemaName - schema name
* @returns {Promise<readerType>} - reader object
*/
async function tableReader({ connection, systemConnection, pureName, schemaName }) {
const driver = requireEngineDriver(connection);
async function tableReader({ connection, systemConnection, pureName, schemaName, driver }) {
if (!driver) {
driver = requireEngineDriver(connection);
}
const dbhan = systemConnection || (await connectUtility(driver, connection, 'read'));
logger.info(`Connected.`);

View File

@@ -306,12 +306,9 @@ const driver = {
}
wasHeader = true;
}
pass.write(zipDataRow(row, columns));
});
query.on('data', row => {
// console.log('readQuery data', row);
pass.write(zipDataRow(row, columns));
});