new streams

This commit is contained in:
Jan Prochazka
2020-06-10 21:35:25 +02:00
parent 26120969de
commit 1695fb2fd8
7 changed files with 77 additions and 27 deletions

View File

@@ -0,0 +1,16 @@
const stream = require('stream');
class ObjectWriterStream extends stream.Writable {
_write(chunk, enc, next) {
console.log(JSON.stringify(chunk));
next();
}
}
async function consoleObjectWriter() {
return new ObjectWriterStream({
objectMode: true,
});
}
module.exports = consoleObjectWriter;

View File

@@ -0,0 +1,15 @@
const csv = require('csv');
const fs = require('fs');
async function csvReader({ fileName, encoding = 'utf-8', ...options }) {
console.log(`Reading file ${fileName}`);
const csvStream = csv.parse({
columns: true,
...options,
});
const fileStream = fs.createReadStream(fileName, encoding);
fileStream.pipe(csvStream);
return csvStream;
}
module.exports = csvReader;

View File

@@ -7,8 +7,10 @@ async function fakeObjectReader({ delay = 0 } = {}) {
function doWrite() {
pass.write({ id: 1, country: 'Czechia' });
pass.write({ id: 2, country: 'Austria' });
pass.write({ id: 3, country: 'Germany' });
pass.write({ id: 4, country: 'Romania' });
pass.write({ country: 'Germany', id: 3 });
pass.write({ country: 'Romania', id: 4 });
pass.write({ country: 'Great Britain', id: 5 });
pass.write({ country: 'Bosna, Hecegovina', id: 6 });
pass.end();
}

View File

@@ -1,13 +1,17 @@
const queryReader = require('./queryReader');
const csvWriter = require('./csvWriter');
const csvReader = require('./csvReader');
const runScript = require('./runScript');
const copyStream = require('./copyStream');
const fakeObjectReader = require('./fakeObjectReader');
const consoleObjectWriter = require('./consoleObjectWriter');
module.exports = {
queryReader,
csvWriter,
csvReader,
runScript,
copyStream,
fakeObjectReader,
consoleObjectWriter,
};