copy from stdin WIP

This commit is contained in:
SPRINX0\prochazka
2024-09-26 16:06:54 +02:00
parent a08a8ef208
commit ae599ac6f6
3 changed files with 25 additions and 7 deletions

View File

@@ -9,14 +9,24 @@ const { getLogger } = require('dbgate-tools');
const logger = getLogger('importDb');
class ImportStream extends stream.Transform {
constructor(pool, driver) {
constructor(dbhan, driver) {
super({ objectMode: true });
this.pool = pool;
this.dbhan = dbhan;
this.driver = driver;
this.writeQueryStream = null;
}
async _transform(chunk, encoding, cb) {
try {
await this.driver.script(this.pool, chunk, { queryOptions: { importSqlDump: true } });
if (chunk.specialMarker == 'copy_stdin_start') {
this.writeQueryStream = await this.driver.writeQueryFromStream(this.dbhan, chunk.text);
} else if (chunk.specialMarker == 'copy_stdin_data') {
this.writeQueryStream.write(chunk.text);
} else if (chunk.specialMarker == 'copy_stdin_end') {
this.writeQueryStream.end();
this.writeQueryStream = null;
} else {
await this.driver.script(this.dbhan, chunk.text, { queryOptions: { importSqlDump: true } });
}
} catch (err) {
this.emit('error', err.message);
}
@@ -44,7 +54,7 @@ async function importDatabase({ connection = undefined, systemConnection = undef
logger.info(`Importing database`);
if (!driver) driver = requireEngineDriver(connection);
const pool = systemConnection || (await connectUtility(driver, connection, 'write'));
const dbhan = systemConnection || (await connectUtility(driver, connection, 'write'));
logger.info(`Connected.`);
logger.info(`Input file: ${inputFile}`);
@@ -52,8 +62,11 @@ async function importDatabase({ connection = undefined, systemConnection = undef
logger.info(`Downloaded file: ${downloadedFile}`);
const fileStream = fs.createReadStream(downloadedFile, 'utf-8');
const splittedStream = splitQueryStream(fileStream, driver.getQuerySplitterOptions('script'));
const importStream = new ImportStream(pool, driver);
const splittedStream = splitQueryStream(fileStream, {
...driver.getQuerySplitterOptions('import'),
returnRichInfo: true,
});
const importStream = new ImportStream(dbhan, driver);
// @ts-ignore
splittedStream.pipe(importStream);
await awaitStreamEnd(importStream);

View File

@@ -220,7 +220,7 @@ export interface EngineDriver extends FilterBehaviourProvider {
getCollectionUpdateScript(changeSet: any, collectionInfo: CollectionInfo): string;
createDatabase(dbhan: DatabaseHandle, name: string): Promise;
dropDatabase(dbhan: DatabaseHandle, name: string): Promise;
getQuerySplitterOptions(usage: 'stream' | 'script' | 'editor'): any;
getQuerySplitterOptions(usage: 'stream' | 'script' | 'editor' | 'import'): any;
script(dbhan: DatabaseHandle, sql: string, options?: RunScriptOptions): Promise;
operation(dbhan: DatabaseHandle, operation: {}, options?: RunScriptOptions): Promise;
getNewObjectTemplates(): NewObjectTemplate[];

View File

@@ -124,6 +124,11 @@ const postgresDriverBase = {
getQuerySplitterOptions: usage =>
usage == 'editor'
? { ...postgreSplitterOptions, ignoreComments: true, preventSingleLineSplit: true }
: usage == 'import'
? {
...postgreSplitterOptions,
copyFromStdin: true,
}
: postgreSplitterOptions,
readOnlySessions: true,