sqlite stream reader

This commit is contained in:
Jan Prochazka
2021-05-06 15:23:45 +02:00
parent 615397f332
commit 1888de8728
3 changed files with 37 additions and 7 deletions

View File

@@ -6,6 +6,15 @@ const { identify } = require('sql-query-identifier');
let Database;
async function waitForDrain(stream) {
return new Promise((resolve) => {
stream.once('drain', () => {
// console.log('CONTINUE DRAIN');
resolve();
});
});
}
function runStreamItem(client, sql, options, rowCounter) {
const stmt = client.prepare(sql);
if (stmt.reader) {
@@ -96,16 +105,36 @@ const driver = {
options.done();
// return stream;
},
async readQueryTask(stmt, pass) {
// let sent = 0;
for (const row of stmt.iterate()) {
// sent++;
if (!pass.write(row)) {
// console.log('WAIT DRAIN', sent);
await waitForDrain(pass);
}
}
pass.end();
},
async readQuery(pool, sql, structure) {
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
});
// pass.write(structure)
// pass.write(row1)
// pass.write(row2)
// pass.end()
const stmt = pool.prepare(sql);
const columns = stmt.columns();
pass.write({
__isStreamHeader: true,
...(structure || {
columns: columns.map((col) => ({
columnName: col.name,
dataType: col.type,
})),
}),
});
this.readQueryTask(stmt, pass);
return pass;
},