export from duckdb works

This commit is contained in:
Jan Prochazka
2025-04-25 16:06:10 +02:00
parent f1d80fadc4
commit 5ab980ce1a
8 changed files with 91 additions and 54 deletions

View File

@@ -131,6 +131,12 @@ module.exports = {
socket.emit(`runner-done-${runid}`);
},
handle_progress(conid, database, progressData) {
const { progressName } = progressData;
const { name, runid } = progressName;
socket.emit(`runner-progress-${runid}`, { ...progressData, progressName: name });
},
async ensureOpened(conid, database) {
const existing = this.opened.find(x => x.conid == conid && x.database == database);
if (existing) return existing;

View File

@@ -8,7 +8,7 @@ const { fork, spawn } = require('child_process');
const { rundir, uploadsdir, pluginsdir, getPluginBackendPath, packagedPluginList } = require('../utility/directories');
const {
extractShellApiPlugins,
extractShellApiFunctionName,
compileShellApiFunctionName,
jsonScriptToJavascript,
getLogger,
safeJsonParse,
@@ -58,7 +58,7 @@ dbgateApi.initializeApiEnvironment();
${requirePluginsTemplate(extractShellApiPlugins(functionName, props))}
require=null;
async function run() {
const reader=await ${extractShellApiFunctionName(functionName, true)}(${JSON.stringify(props)});
const reader=await ${compileShellApiFunctionName(functionName)}(${JSON.stringify(props)});
const writer=await dbgateApi.collectorWriter({runid: '${runid}'});
await dbgateApi.copyStream(reader, writer);
}
@@ -273,7 +273,7 @@ module.exports = {
const runid = crypto.randomUUID();
if (script.type == 'json') {
const js = jsonScriptToJavascript(script);
const js = await jsonScriptToJavascript(script);
return this.startCore(runid, scriptTemplate(js, false));
}
@@ -335,7 +335,7 @@ module.exports = {
return { errorMessage: 'Only JSON scripts are allowed' };
}
const promise = new Promise((resolve, reject) => {
const promise = new Promise(async (resolve, reject) => {
const runid = crypto.randomUUID();
this.requests[runid] = { resolve, reject, exitOnStreamError: true };
const cloned = _.cloneDeepWith(script, node => {
@@ -343,7 +343,7 @@ module.exports = {
return runid;
}
});
const js = jsonScriptToJavascript(cloned);
const js = await jsonScriptToJavascript(cloned);
this.startCore(runid, scriptTemplate(js, false));
});
return promise;

View File

@@ -21,6 +21,9 @@ const { dumpSqlSelect } = require('dbgate-sqltree');
const { allowExecuteCustomScript, handleQueryStream } = require('../utility/handleQueryStream');
const dbgateApi = require('../shell');
const requirePlugin = require('../shell/requirePlugin');
const path = require('path');
const { rundir } = require('../utility/directories');
const fs = require('fs-extra');
const logger = getLogger('dbconnProcess');
@@ -411,9 +414,19 @@ async function handleExecuteSessionQuery({ sesid, sql }) {
}
async function handleEvalJsonScript({ script, runid }) {
const evalWriter = new ScriptWriterEval(dbgateApi, requirePlugin, dbhan);
await playJsonScriptWriter(script, evalWriter);
process.send({ msgtype: 'runnerDone', runid });
const directory = path.join(rundir(), runid);
fs.mkdirSync(directory);
const originalCwd = process.cwd();
try {
process.chdir(directory);
const evalWriter = new ScriptWriterEval(dbgateApi, requirePlugin, dbhan, runid);
await playJsonScriptWriter(script, evalWriter);
process.send({ msgtype: 'runnerDone', runid });
} finally {
process.chdir(originalCwd);
}
}
// async function handleRunCommand({ msgid, sql }) {

View File

@@ -1,6 +1,6 @@
import _uniq from 'lodash/uniq';
import _cloneDeepWith from 'lodash/cloneDeepWith';
import { evalShellApiFunctionName, extractShellApiFunctionName, extractShellApiPlugins } from './packageTools';
import { evalShellApiFunctionName, compileShellApiFunctionName, extractShellApiPlugins } from './packageTools';
export interface ScriptWriterGeneric {
allocVariable(prefix?: string);
@@ -44,7 +44,7 @@ export class ScriptWriterJavaScript implements ScriptWriterGeneric {
}
assign(variableName, functionName, props) {
this.assignCore(variableName, extractShellApiFunctionName(functionName, true), props);
this.assignCore(variableName, compileShellApiFunctionName(functionName), props);
this.packageNames.push(...extractShellApiPlugins(functionName, props));
}
@@ -56,10 +56,10 @@ export class ScriptWriterJavaScript implements ScriptWriterGeneric {
this.packageNames.push(packageName);
}
copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string) {
copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string | { name: string; runid: string }) {
let opts = '{';
if (colmapVar) opts += `columns: ${colmapVar}, `;
if (progressName) opts += `progressName: "${progressName}", `;
if (progressName) opts += `progressName: ${JSON.stringify(progressName)}, `;
opts += '}';
this._put(`await dbgateApi.copyStream(${sourceVar}, ${targetVar}, ${opts});`);
@@ -118,7 +118,7 @@ export class ScriptWriterJson implements ScriptWriterGeneric {
this.commands.push({
type: 'assign',
variableName,
functionName: extractShellApiFunctionName(functionName, false),
functionName,
props,
});
@@ -137,7 +137,7 @@ export class ScriptWriterJson implements ScriptWriterGeneric {
});
}
copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string) {
copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string | { name: string; runid: string }) {
this.commands.push({
type: 'copyStream',
sourceVar,
@@ -194,12 +194,14 @@ export class ScriptWriterEval implements ScriptWriterGeneric {
requirePlugin: (name: string) => any;
variables: { [name: string]: any } = {};
hostConnection: any;
runid: string;
constructor(dbgateApi, requirePlugin, hostConnection, varCount = '0') {
constructor(dbgateApi, requirePlugin, hostConnection, runid, varCount = '0') {
this.varCount = parseInt(varCount) || 0;
this.dbgateApi = dbgateApi;
this.requirePlugin = requirePlugin;
this.hostConnection = hostConnection;
this.runid = runid;
}
allocVariable(prefix = 'var') {
@@ -227,9 +229,15 @@ export class ScriptWriterEval implements ScriptWriterGeneric {
this.variables[variableName] = jsonValue;
}
async copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string) {
async copyStream(sourceVar, targetVar, colmapVar = null, progressName?: string | { name: string; runid: string }) {
await this.dbgateApi.copyStream(this.variables[sourceVar], this.variables[targetVar], {
progressName,
progressName: _cloneDeepWith(progressName, node => {
if (node?.$runid) {
if (node?.$runid) {
return this.runid;
}
}
}),
columns: colmapVar ? this.variables[colmapVar] : null,
});
}
@@ -253,38 +261,42 @@ export class ScriptWriterEval implements ScriptWriterGeneric {
}
}
export async function playJsonScriptWriter(json, script: ScriptWriterGeneric) {
for (const cmd of json.commands) {
switch (cmd.type) {
case 'assign':
await script.assign(cmd.variableName, cmd.functionName, cmd.props);
break;
case 'assignValue':
await script.assignValue(cmd.variableName, cmd.jsonValue);
break;
case 'copyStream':
await script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar, cmd.progressName);
break;
case 'endLine':
await script.endLine();
break;
case 'comment':
await script.comment(cmd.text);
break;
case 'importDatabase':
await script.importDatabase(cmd.options);
break;
case 'dataReplicator':
await script.dataReplicator(cmd.options);
break;
case 'zipDirectory':
await script.zipDirectory(cmd.inputDirectory, cmd.outputFile);
break;
}
async function playJsonCommand(cmd, script: ScriptWriterGeneric) {
switch (cmd.type) {
case 'assign':
await script.assign(cmd.variableName, cmd.functionName, cmd.props);
break;
case 'assignValue':
await script.assignValue(cmd.variableName, cmd.jsonValue);
break;
case 'copyStream':
await script.copyStream(cmd.sourceVar, cmd.targetVar, cmd.colmapVar, cmd.progressName);
break;
case 'endLine':
await script.endLine();
break;
case 'comment':
await script.comment(cmd.text);
break;
case 'importDatabase':
await script.importDatabase(cmd.options);
break;
case 'dataReplicator':
await script.dataReplicator(cmd.options);
break;
case 'zipDirectory':
await script.zipDirectory(cmd.inputDirectory, cmd.outputFile);
break;
}
}
export function jsonScriptToJavascript(json) {
export async function playJsonScriptWriter(json, script: ScriptWriterGeneric) {
for (const cmd of json.commands) {
await playJsonCommand(cmd, script);
}
}
export async function jsonScriptToJavascript(json) {
const { schedule, packageNames } = json;
const script = new ScriptWriterJavaScript();
for (const packageName of packageNames) {
@@ -294,7 +306,7 @@ export function jsonScriptToJavascript(json) {
script.packageNames.push(packageName);
}
playJsonScriptWriter(json, script);
await playJsonScriptWriter(json, script);
return script.getScript(schedule);
}

View File

@@ -27,15 +27,12 @@ export function extractPackageName(name): string {
return null;
}
export function extractShellApiFunctionName(functionName, usePrefixForDbGateApi) {
export function compileShellApiFunctionName(functionName) {
const nsMatch = functionName.match(/^([^@]+)@([^@]+)/);
if (nsMatch) {
return `${_camelCase(nsMatch[2])}.shellApi.${nsMatch[1]}`;
}
if (usePrefixForDbGateApi) {
return `dbgateApi.${functionName}`;
}
return functionName;
return `dbgateApi.${functionName}`;
}
export function evalShellApiFunctionName(functionName, dbgateApi, requirePlugin) {

View File

@@ -69,6 +69,7 @@ function getSourceExpr(extensions, sourceName, values, sourceConnection, sourceD
sourceDriver?.singleConnectionOnly && hostConnection
? {
systemConnection: { $hostConnection: true },
connection: sourceDriver?.engine,
}
: {
connection: sourceConnection,
@@ -159,6 +160,7 @@ function getTargetExpr(extensions, sourceName, values, targetConnection, targetD
targetDriver?.singleConnectionOnly && hostConnection
? {
systemConnection: { $hostConnection: true },
connection: targetDriver?.engine,
}
: {
connection: targetConnection,
@@ -281,7 +283,12 @@ export default async function createImpExpScript(extensions, values, format = un
script.assignValue(colmapVar, colmap);
}
script.copyStream(sourceVar, targetVar, colmapVar, sourceName);
script.copyStream(
sourceVar,
targetVar,
colmapVar,
hostConnection ? { name: sourceName, runid: { $runid: true } } : sourceName
);
script.endLine();
}

View File

@@ -169,7 +169,7 @@
const handleGenerateScript = async e => {
const values = $formValues as any;
const code = await createImpExpScript($extensions, values, 'script');
const code = await createImpExpScript($extensions, values, 'script', true);
openNewTab(
{
title: 'Shell #',

View File

@@ -168,6 +168,8 @@ const driver = {
pass.write(row);
}
pass.end();
return pass;
},
async writeTable(dbhan, name, options) {