diff --git a/packages/query-splitter/src/index.ts b/packages/query-splitter/src/index.ts index 7ded2fc5b..bce756683 100644 --- a/packages/query-splitter/src/index.ts +++ b/packages/query-splitter/src/index.ts @@ -1,2 +1,2 @@ -export * from './splitQuery'; +export { splitQuery } from './splitQuery'; export * from './options'; diff --git a/packages/query-splitter/src/splitQuery.ts b/packages/query-splitter/src/splitQuery.ts index 1adb8b32f..20dc08764 100644 --- a/packages/query-splitter/src/splitQuery.ts +++ b/packages/query-splitter/src/splitQuery.ts @@ -166,7 +166,7 @@ function pushQuery(context: SplitLineContext) { if (trimmed) context.pushOutput(trimmed); } -function splitQueryLine(context: SplitLineContext) { +export function splitQueryLine(context: SplitLineContext) { while (context.position < context.end) { const token = scanToken(context); if (!token) { @@ -221,6 +221,9 @@ function splitQueryLine(context: SplitLineContext) { } } +export function getInitialDelimiter(options: SplitterOptions) { + return options?.allowSemicolon === false ? null : SEMICOLON +} export function splitQuery(sql: string, options: SplitterOptions = null): string[] { const usedOptions = { ...defaultSplitterOptions, @@ -235,7 +238,7 @@ export function splitQuery(sql: string, options: SplitterOptions = null): string const context: SplitLineContext = { source: sql, end: sql.length, - currentDelimiter: options?.allowSemicolon === false ? null : SEMICOLON, + currentDelimiter: getInitialDelimiter(options), position: 0, currentCommandStart: 0, pushOutput: cmd => output.push(cmd), diff --git a/packages/query-splitter/src/splitQueryStream.ts b/packages/query-splitter/src/splitQueryStream.ts new file mode 100644 index 000000000..6235f4405 --- /dev/null +++ b/packages/query-splitter/src/splitQueryStream.ts @@ -0,0 +1,41 @@ +import stream from 'stream'; +import { SplitStreamContext, getInitialDelimiter, SplitLineContext, splitQueryLine } from './splitQuery'; +import { SplitterOptions } from './options'; + +export class SplitQueryStream extends stream.Transform { + context: SplitStreamContext; + + constructor(options: SplitterOptions) { + super({ objectMode: true }); + this.context = { + commandPart: '', + options, + currentDelimiter: getInitialDelimiter(options), + pushOutput: cmd => this.push(cmd), + }; + } + _transform(chunk, encoding, done) { + const lineContext: SplitLineContext = { + ...this.context, + position: 0, + currentCommandStart: 0, + wasDataOnLine: false, + source: chunk, + end: chunk.length, + }; + splitQueryLine(lineContext); + this.context.commandPart = lineContext.commandPart; + done(); + } + _flush(done) { + const trimmed = this.context.commandPart; + if (trimmed) this.push(trimmed); + done(); + } +} + +export function splitQueryStream(sourceStream, options: SplitterOptions) { + const splitter = new SplitQueryStream(options); + sourceStream.pipe(splitter); + return splitter; +} diff --git a/packages/query-splitter/src/splitterStream.test.ts b/packages/query-splitter/src/splitterStream.test.ts new file mode 100644 index 000000000..1c33d48dd --- /dev/null +++ b/packages/query-splitter/src/splitterStream.test.ts @@ -0,0 +1,40 @@ +import { mysqlSplitterOptions, mssqlSplitterOptions, postgreSplitterOptions, noSplitSplitterOptions } from './options'; +import stream from 'stream'; +import { splitQueryStream } from './splitQueryStream'; + +function createInputStream(...lines) { + const pass = new stream.PassThrough({ + objectMode: true, + }); + lines.forEach(line => pass.write(line)); + pass.end(); + return pass; +} + +function streamToArray(streamSource) { + return new Promise((resolve, reject) => { + const res = []; + streamSource.on('data', x => res.push(x)); + streamSource.on('end', () => resolve(res)); + }); +} + +test('stream: simple query', async () => { + const output = await streamToArray(splitQueryStream(createInputStream('select * from A'), mysqlSplitterOptions)); + expect(output).toEqual(['select * from A']); +}); + +test('stream: query on 2 lines', async () => { + const output = await streamToArray(splitQueryStream(createInputStream('select * ', 'from A'), mysqlSplitterOptions)); + expect(output).toEqual(['select * from A']); +}); + +test('stream: query on 2 lines', async () => { + const output = await streamToArray( + splitQueryStream( + createInputStream('SELECT * ', 'FROM `table1`;', 'SELECT *', ' FROM `table2`'), + mysqlSplitterOptions + ) + ); + expect(output).toEqual(['SELECT * FROM `table1`', 'SELECT * FROM `table2`']); +});