feat: transform geography binary data to wkt

This commit is contained in:
Nybkox
2024-12-12 16:04:34 +01:00
parent 0b30386fee
commit b8ae53db7d
4 changed files with 77 additions and 8 deletions

View File

@@ -3,6 +3,7 @@ const stream = require('stream');
const driverBases = require('../frontend/drivers');
const Analyser = require('./Analyser');
const wkx = require('wkx');
const pg = require('pg');
const pgCopyStreams = require('pg-copy-streams');
const {
@@ -21,10 +22,38 @@ pg.types.setTypeParser(1082, 'text', val => val); // date
pg.types.setTypeParser(1114, 'text', val => val); // timestamp without timezone
pg.types.setTypeParser(1184, 'text', val => val); // timestamp
function extractPostgresColumns(result) {
function extractGeographyDate(value) {
try {
const buffer = Buffer.from(value, 'hex');
const parsed = wkx.Geometry.parse(buffer).toWkt();
return parsed;
} catch (_err) {
return value;
}
}
function transformRow(row, columnsToTransform) {
if (!columnsToTransform?.length) return row;
for (const col of columnsToTransform) {
const { columnName, dataTypeName } = col;
if (dataTypeName == 'geography') {
row[columnName] = extractGeographyDate(row[columnName]);
}
}
return row;
}
function extractPostgresColumns(result, dbhan) {
if (!result || !result.fields) return [];
const { typeIdToName = {} } = dbhan;
const res = result.fields.map(fld => ({
columnName: fld.name,
dataTypeId: fld.dataTypeID,
dataTypeName: typeIdToName[fld.dataTypeID],
}));
makeUniqueColumnNames(res);
return res;
@@ -105,6 +134,18 @@ const drivers = driverBases.map(driverBase => ({
database,
};
const datatypes = await this.query(
dbhan,
`SELECT oid AS datatypeid, typname AS datatypename FROM pg_type WHERE typname in ('geography')`
);
const typeIdToName = datatypes.rows.reduce((acc, cur) => {
acc[cur.datatypeid] = cur.datatypename;
return acc;
}, {});
dbhan['typeIdToName'] = typeIdToName;
if (isReadOnly) {
await this.query(dbhan, 'SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY');
}
@@ -122,7 +163,7 @@ const drivers = driverBases.map(driverBase => ({
};
}
const res = await dbhan.client.query({ text: sql, rowMode: 'array' });
const columns = extractPostgresColumns(res);
const columns = extractPostgresColumns(res, dbhan);
return { rows: (res.rows || []).map(row => zipDataRow(row, columns)), columns };
},
stream(dbhan, sql, options) {
@@ -132,17 +173,26 @@ const drivers = driverBases.map(driverBase => ({
});
let wasHeader = false;
let columnsToTransform = null;
query.on('row', row => {
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
columns = extractPostgresColumns(query._result, dbhan);
if (columns && columns.length > 0) {
options.recordset(columns);
}
wasHeader = true;
}
options.row(zipDataRow(row, columns));
if (!columnsToTransform) {
const transormableTypeNames = Object.values(dbhan.typeIdToName ?? {});
columnsToTransform = columns.filter(x => transormableTypeNames.includes(x.dataTypeName));
}
const zippedRow = zipDataRow(row, columns);
const transformedRow = transformRow(zippedRow, columnsToTransform);
options.row(transformedRow);
});
query.on('end', () => {
@@ -157,7 +207,7 @@ const drivers = driverBases.map(driverBase => ({
}
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
columns = extractPostgresColumns(query._result, dbhan);
if (columns && columns.length > 0) {
options.recordset(columns);
}
@@ -234,6 +284,8 @@ const drivers = driverBases.map(driverBase => ({
let wasHeader = false;
let columns = null;
let columnsToTransform = null;
const pass = new stream.PassThrough({
objectMode: true,
highWaterMark: 100,
@@ -241,7 +293,7 @@ const drivers = driverBases.map(driverBase => ({
query.on('row', row => {
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
columns = extractPostgresColumns(query._result, dbhan);
pass.write({
__isStreamHeader: true,
...(structure || { columns }),
@@ -249,12 +301,20 @@ const drivers = driverBases.map(driverBase => ({
wasHeader = true;
}
pass.write(zipDataRow(row, columns));
if (!columnsToTransform) {
const transormableTypeNames = Object.values(dbhan.typeIdToName ?? {});
columnsToTransform = columns.filter(x => transormableTypeNames.includes(x.dataTypeName));
}
const zippedRow = zipDataRow(row, columns);
const transformedRow = transformRow(zippedRow, columnsToTransform);
options.row(transformedRow);
});
query.on('end', () => {
if (!wasHeader) {
columns = extractPostgresColumns(query._result);
columns = extractPostgresColumns(query._result, dbhan);
pass.write({
__isStreamHeader: true,
...(structure || { columns }),