duckdb imports/exports WIP

This commit is contained in:
SPRINX0\prochazka
2025-04-24 18:44:10 +02:00
parent d331d48ca2
commit f1d80fadc4
7 changed files with 133 additions and 35 deletions

View File

@@ -125,6 +125,12 @@ module.exports = {
socket.emit(`session-initialize-file-${jslid}`); socket.emit(`session-initialize-file-${jslid}`);
}, },
// eval event handler
handle_runnerDone(conid, database, props) {
const { runid } = props;
socket.emit(`runner-done-${runid}`);
},
async ensureOpened(conid, database) { async ensureOpened(conid, database) {
const existing = this.opened.find(x => x.conid == conid && x.database == database); const existing = this.opened.find(x => x.conid == conid && x.database == database);
if (existing) return existing; if (existing) return existing;
@@ -794,7 +800,8 @@ module.exports = {
}, },
executeSessionQuery_meta: true, executeSessionQuery_meta: true,
async executeSessionQuery({ sesid, conid, database, sql }) { async executeSessionQuery({ sesid, conid, database, sql }, req) {
testConnectionPermission(conid, req);
logger.info({ sesid, sql }, 'Processing query'); logger.info({ sesid, sql }, 'Processing query');
sessions.dispatchMessage(sesid, 'Query execution started'); sessions.dispatchMessage(sesid, 'Query execution started');
@@ -803,4 +810,13 @@ module.exports = {
return { state: 'ok' }; return { state: 'ok' };
}, },
evalJsonScript_meta: true,
async evalJsonScript({ conid, database, script, runid }, req) {
testConnectionPermission(conid, req);
const opened = await this.ensureOpened(conid, database);
opened.subprocess.send({ msgtype: 'evalJsonScript', script, runid });
return { state: 'ok' };
},
}; };

View File

@@ -58,7 +58,7 @@ dbgateApi.initializeApiEnvironment();
${requirePluginsTemplate(extractShellApiPlugins(functionName, props))} ${requirePluginsTemplate(extractShellApiPlugins(functionName, props))}
require=null; require=null;
async function run() { async function run() {
const reader=await ${extractShellApiFunctionName(functionName)}(${JSON.stringify(props)}); const reader=await ${extractShellApiFunctionName(functionName, true)}(${JSON.stringify(props)});
const writer=await dbgateApi.collectorWriter({runid: '${runid}'}); const writer=await dbgateApi.collectorWriter({runid: '${runid}'});
await dbgateApi.copyStream(reader, writer); await dbgateApi.copyStream(reader, writer);
} }

View File

@@ -9,14 +9,18 @@ const {
dbNameLogCategory, dbNameLogCategory,
extractErrorMessage, extractErrorMessage,
extractErrorLogData, extractErrorLogData,
ScriptWriterEval,
SqlGenerator,
playJsonScriptWriter,
} = require('dbgate-tools'); } = require('dbgate-tools');
const requireEngineDriver = require('../utility/requireEngineDriver'); const requireEngineDriver = require('../utility/requireEngineDriver');
const { connectUtility } = require('../utility/connectUtility'); const { connectUtility } = require('../utility/connectUtility');
const { handleProcessCommunication } = require('../utility/processComm'); const { handleProcessCommunication } = require('../utility/processComm');
const { SqlGenerator } = require('dbgate-tools');
const generateDeploySql = require('../shell/generateDeploySql'); const generateDeploySql = require('../shell/generateDeploySql');
const { dumpSqlSelect } = require('dbgate-sqltree'); const { dumpSqlSelect } = require('dbgate-sqltree');
const { allowExecuteCustomScript, handleQueryStream } = require('../utility/handleQueryStream'); const { allowExecuteCustomScript, handleQueryStream } = require('../utility/handleQueryStream');
const dbgateApi = require('../shell');
const requirePlugin = require('../shell/requirePlugin');
const logger = getLogger('dbconnProcess'); const logger = getLogger('dbconnProcess');
@@ -406,6 +410,12 @@ async function handleExecuteSessionQuery({ sesid, sql }) {
process.send({ msgtype: 'done', sesid }); process.send({ msgtype: 'done', sesid });
} }
async function handleEvalJsonScript({ script, runid }) {
const evalWriter = new ScriptWriterEval(dbgateApi, requirePlugin, dbhan);
await playJsonScriptWriter(script, evalWriter);
process.send({ msgtype: 'runnerDone', runid });
}
// async function handleRunCommand({ msgid, sql }) { // async function handleRunCommand({ msgid, sql }) {
// await waitConnected(); // await waitConnected();
// const driver = engines(storedConnection); // const driver = engines(storedConnection);
@@ -437,6 +447,7 @@ const messageHandlers = {
exportKeys: handleExportKeys, exportKeys: handleExportKeys,
schemaList: handleSchemaList, schemaList: handleSchemaList,
executeSessionQuery: handleExecuteSessionQuery, executeSessionQuery: handleExecuteSessionQuery,
evalJsonScript: handleEvalJsonScript,
// runCommand: handleRunCommand, // runCommand: handleRunCommand,
}; };

View File

@@ -1,4 +1,5 @@
import _uniq from 'lodash/uniq'; import _uniq from 'lodash/uniq';
import _cloneDeepWith from 'lodash/cloneDeepWith';
import { evalShellApiFunctionName, extractShellApiFunctionName, extractShellApiPlugins } from './packageTools'; import { evalShellApiFunctionName, extractShellApiFunctionName, extractShellApiPlugins } from './packageTools';
export interface ScriptWriterGeneric { export interface ScriptWriterGeneric {
@@ -43,7 +44,7 @@ export class ScriptWriterJavaScript implements ScriptWriterGeneric {
} }
assign(variableName, functionName, props) { assign(variableName, functionName, props) {
this.assignCore(variableName, extractShellApiFunctionName(functionName), props); this.assignCore(variableName, extractShellApiFunctionName(functionName, true), props);
this.packageNames.push(...extractShellApiPlugins(functionName, props)); this.packageNames.push(...extractShellApiPlugins(functionName, props));
} }
@@ -117,7 +118,7 @@ export class ScriptWriterJson implements ScriptWriterGeneric {
this.commands.push({ this.commands.push({
type: 'assign', type: 'assign',
variableName, variableName,
functionName: extractShellApiFunctionName(functionName), functionName: extractShellApiFunctionName(functionName, false),
props, props,
}); });
@@ -192,11 +193,13 @@ export class ScriptWriterEval implements ScriptWriterGeneric {
dbgateApi: any; dbgateApi: any;
requirePlugin: (name: string) => any; requirePlugin: (name: string) => any;
variables: { [name: string]: any } = {}; variables: { [name: string]: any } = {};
hostConnection: any;
constructor(dbgateApi, requirePlugin, varCount = '0') { constructor(dbgateApi, requirePlugin, hostConnection, varCount = '0') {
this.varCount = parseInt(varCount) || 0; this.varCount = parseInt(varCount) || 0;
this.dbgateApi = dbgateApi; this.dbgateApi = dbgateApi;
this.requirePlugin = requirePlugin; this.requirePlugin = requirePlugin;
this.hostConnection = hostConnection;
} }
allocVariable(prefix = 'var') { allocVariable(prefix = 'var') {
@@ -210,7 +213,14 @@ export class ScriptWriterEval implements ScriptWriterGeneric {
async assign(variableName, functionName, props) { async assign(variableName, functionName, props) {
const func = evalShellApiFunctionName(functionName, this.dbgateApi, this.requirePlugin); const func = evalShellApiFunctionName(functionName, this.dbgateApi, this.requirePlugin);
this.variables[variableName] = await func(props);
this.variables[variableName] = await func(
_cloneDeepWith(props, node => {
if (node?.$hostConnection) {
return this.hostConnection;
}
})
);
} }
assignValue(variableName, jsonValue) { assignValue(variableName, jsonValue) {
@@ -243,32 +253,32 @@ export class ScriptWriterEval implements ScriptWriterGeneric {
} }
} }
export function playJsonScriptWriter(json, script) { export async function playJsonScriptWriter(json, script: ScriptWriterGeneric) {
for (const cmd of json.commands) { for (const cmd of json.commands) {
switch (cmd.type) { switch (cmd.type) {
case 'assign': case 'assign':
script.assignCore(cmd.variableName, cmd.functionName, cmd.props); await script.assign(cmd.variableName, cmd.functionName, cmd.props);
break; break;
case 'assignValue': case 'assignValue':
script.assignValue(cmd.variableName, cmd.jsonValue); await script.assignValue(cmd.variableName, cmd.jsonValue);
break; break;
case 'copyStream': case 'copyStream':
script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar, cmd.progressName); await script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar, cmd.progressName);
break; break;
case 'endLine': case 'endLine':
script.endLine(); await script.endLine();
break; break;
case 'comment': case 'comment':
script.comment(cmd.text); await script.comment(cmd.text);
break; break;
case 'importDatabase': case 'importDatabase':
script.importDatabase(cmd.options); await script.importDatabase(cmd.options);
break; break;
case 'dataReplicator': case 'dataReplicator':
script.dataReplicator(cmd.options); await script.dataReplicator(cmd.options);
break; break;
case 'zipDirectory': case 'zipDirectory':
script.zipDirectory(cmd.inputDirectory, cmd.outputFile); await script.zipDirectory(cmd.inputDirectory, cmd.outputFile);
break; break;
} }
} }

View File

@@ -27,12 +27,15 @@ export function extractPackageName(name): string {
return null; return null;
} }
export function extractShellApiFunctionName(functionName) { export function extractShellApiFunctionName(functionName, usePrefixForDbGateApi) {
const nsMatch = functionName.match(/^([^@]+)@([^@]+)/); const nsMatch = functionName.match(/^([^@]+)@([^@]+)/);
if (nsMatch) { if (nsMatch) {
return `${_camelCase(nsMatch[2])}.shellApi.${nsMatch[1]}`; return `${_camelCase(nsMatch[2])}.shellApi.${nsMatch[1]}`;
} }
if (usePrefixForDbGateApi) {
return `dbgateApi.${functionName}`; return `dbgateApi.${functionName}`;
}
return functionName;
} }
export function evalShellApiFunctionName(functionName, dbgateApi, requirePlugin) { export function evalShellApiFunctionName(functionName, dbgateApi, requirePlugin) {

View File

@@ -63,14 +63,22 @@ async function getConnection(extensions, storageType, conid, database) {
return [null, null]; return [null, null];
} }
function getSourceExpr(extensions, sourceName, values, sourceConnection, sourceDriver) { function getSourceExpr(extensions, sourceName, values, sourceConnection, sourceDriver, hostConnection) {
const { sourceStorageType } = values; const { sourceStorageType } = values;
const connectionParams =
sourceDriver?.singleConnectionOnly && hostConnection
? {
systemConnection: { $hostConnection: true },
}
: {
connection: sourceConnection,
};
if (sourceStorageType == 'database') { if (sourceStorageType == 'database') {
const fullName = { schemaName: values.sourceSchemaName, pureName: sourceName }; const fullName = { schemaName: values.sourceSchemaName, pureName: sourceName };
return [ return [
'tableReader', 'tableReader',
{ {
connection: sourceConnection, ...connectionParams,
...extractDriverApiParameters(values, 'source', sourceDriver), ...extractDriverApiParameters(values, 'source', sourceDriver),
...fullName, ...fullName,
}, },
@@ -80,7 +88,7 @@ function getSourceExpr(extensions, sourceName, values, sourceConnection, sourceD
return [ return [
'queryReader', 'queryReader',
{ {
connection: sourceConnection, ...connectionParams,
...extractDriverApiParameters(values, 'source', sourceDriver), ...extractDriverApiParameters(values, 'source', sourceDriver),
queryType: values.sourceQueryType, queryType: values.sourceQueryType,
query: values.sourceQueryType == 'json' ? JSON.parse(values.sourceQuery) : values.sourceQuery, query: values.sourceQueryType == 'json' ? JSON.parse(values.sourceQuery) : values.sourceQuery,
@@ -145,8 +153,16 @@ function getFlagsFroAction(action) {
}; };
} }
function getTargetExpr(extensions, sourceName, values, targetConnection, targetDriver) { function getTargetExpr(extensions, sourceName, values, targetConnection, targetDriver, hostConnection) {
const { targetStorageType } = values; const { targetStorageType } = values;
const connectionParams =
targetDriver?.singleConnectionOnly && hostConnection
? {
systemConnection: { $hostConnection: true },
}
: {
connection: targetConnection,
};
const format = findFileFormat(extensions, targetStorageType); const format = findFileFormat(extensions, targetStorageType);
if (format && format.writerFunc) { if (format && format.writerFunc) {
const outputParams = format.getOutputParams && format.getOutputParams(sourceName, values); const outputParams = format.getOutputParams && format.getOutputParams(sourceName, values);
@@ -166,7 +182,7 @@ function getTargetExpr(extensions, sourceName, values, targetConnection, targetD
return [ return [
'tableWriter', 'tableWriter',
{ {
connection: targetConnection, ...connectionParams,
schemaName: values.targetSchemaName, schemaName: values.targetSchemaName,
pureName: getTargetName(extensions, sourceName, values), pureName: getTargetName(extensions, sourceName, values),
...extractDriverApiParameters(values, 'target', targetDriver), ...extractDriverApiParameters(values, 'target', targetDriver),
@@ -203,7 +219,7 @@ export function normalizeExportColumnMap(colmap) {
return null; return null;
} }
export default async function createImpExpScript(extensions, values, format = undefined) { export default async function createImpExpScript(extensions, values, format = undefined, detectHostConnection = false) {
const config = getCurrentConfig(); const config = getCurrentConfig();
let script: ScriptWriterGeneric = new ScriptWriterJson(values.startVariableIndex || 0); let script: ScriptWriterGeneric = new ScriptWriterJson(values.startVariableIndex || 0);
if (format == 'script' && config.allowShellScripting) { if (format == 'script' && config.allowShellScripting) {
@@ -223,15 +239,39 @@ export default async function createImpExpScript(extensions, values, format = un
values.targetDatabaseName values.targetDatabaseName
); );
let hostConnection = null;
if (detectHostConnection) {
// @ts-ignore
if (sourceDriver?.singleConnectionOnly) {
hostConnection = { conid: values.sourceConnectionId, database: values.sourceDatabaseName };
}
// @ts-ignore
if (targetDriver?.singleConnectionOnly) {
if (
hostConnection &&
(hostConnection.conid != values.targetConnectionId || hostConnection.database != values.targetDatabaseName)
) {
throw new Error('Cannot use two different single-connections in the same script');
}
hostConnection = { conid: values.targetConnectionId, database: values.targetDatabaseName };
}
}
const sourceList = getAsArray(values.sourceList); const sourceList = getAsArray(values.sourceList);
for (const sourceName of sourceList) { for (const sourceName of sourceList) {
const sourceVar = script.allocVariable(); const sourceVar = script.allocVariable();
script.assign(
sourceVar,
// @ts-ignore // @ts-ignore
script.assign(sourceVar, ...getSourceExpr(extensions, sourceName, values, sourceConnection, sourceDriver)); ...getSourceExpr(extensions, sourceName, values, sourceConnection, sourceDriver, hostConnection)
);
const targetVar = script.allocVariable(); const targetVar = script.allocVariable();
script.assign(
targetVar,
// @ts-ignore // @ts-ignore
script.assign(targetVar, ...getTargetExpr(extensions, sourceName, values, targetConnection, targetDriver)); ...getTargetExpr(extensions, sourceName, values, targetConnection, targetDriver, hostConnection)
);
const colmap = normalizeExportColumnMap(values[`columns_${sourceName}`]); const colmap = normalizeExportColumnMap(values[`columns_${sourceName}`]);
@@ -251,7 +291,11 @@ export default async function createImpExpScript(extensions, values, format = un
script.zipDirectory('.', values.createZipFileInArchive ? 'archive:' + zipFileName : zipFileName); script.zipDirectory('.', values.createZipFileInArchive ? 'archive:' + zipFileName : zipFileName);
} }
return script.getScript(values.schedule); const res = script.getScript(values.schedule);
if (format == 'json') {
res.hostConnection = hostConnection;
}
return res;
} }
export function getActionOptions(extensions, source, values, targetDbinfo) { export function getActionOptions(extensions, source, values, targetDbinfo) {
@@ -289,7 +333,7 @@ export async function createPreviewReader(extensions, values, sourceName) {
values.sourceConnectionId, values.sourceConnectionId,
values.sourceDatabaseName values.sourceDatabaseName
); );
const [functionName, props] = getSourceExpr(extensions, sourceName, values, sourceConnection, sourceDriver); const [functionName, props] = getSourceExpr(extensions, sourceName, values, sourceConnection, sourceDriver, null);
return { return {
functionName, functionName,
props: { props: {

View File

@@ -50,6 +50,8 @@
import { registerFileCommands } from '../commands/stdCommands'; import { registerFileCommands } from '../commands/stdCommands';
import ToolStripCommandButton from '../buttons/ToolStripCommandButton.svelte'; import ToolStripCommandButton from '../buttons/ToolStripCommandButton.svelte';
import ToolStripSaveButton from '../buttons/ToolStripSaveButton.svelte'; import ToolStripSaveButton from '../buttons/ToolStripSaveButton.svelte';
import uuidv1 from 'uuid/v1';
import { tick } from 'svelte';
let busy = false; let busy = false;
let executeNumber = 0; let executeNumber = 0;
@@ -183,12 +185,24 @@
progressHolder = {}; progressHolder = {};
const values = $formValues as any; const values = $formValues as any;
busy = true; busy = true;
const script = await createImpExpScript($extensions, values, 'json'); const script = await createImpExpScript($extensions, values, 'json', true);
executeNumber += 1; executeNumber += 1;
if (script.hostConnection) {
runnerId = uuidv1();
await tick();
await apiCall('database-connections/eval-json-script', {
runid: runnerId,
conid: script.hostConnection.conid,
database: script.hostConnection.database,
script,
});
} else {
let runid = runnerId; let runid = runnerId;
const resp = await apiCall('runners/start', { script }); const resp = await apiCall('runners/start', { script });
runid = resp.runid; runid = resp.runid;
runnerId = runid; runnerId = runid;
}
if (values.targetStorageType == 'archive') { if (values.targetStorageType == 'archive') {
refreshArchiveFolderRef.set(values.targetArchiveFolder); refreshArchiveFolderRef.set(values.targetArchiveFolder);