From 6689849f977b9acb300cd71a076c0ba4566bafef Mon Sep 17 00:00:00 2001 From: Stela Augustinova Date: Tue, 23 Dec 2025 19:04:41 +0100 Subject: [PATCH] Add field insert functionality for Redis data types --- .../api/src/proc/databaseConnectionProcess.js | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/packages/api/src/proc/databaseConnectionProcess.js b/packages/api/src/proc/databaseConnectionProcess.js index 1ea2a362c..050694b21 100644 --- a/packages/api/src/proc/databaseConnectionProcess.js +++ b/packages/api/src/proc/databaseConnectionProcess.js @@ -393,18 +393,52 @@ async function handleSaveRedisData({ msgid, changeSet }) { } } } + if (change.inserts && Array.isArray(change.inserts)) { + for (const insert of change.inserts) { + await driver.query(dbhan, `HSET "${change.key}" "${insert.key}" "${insert.value}"`); + + if (insert.ttl !== undefined && insert.ttl !== null && insert.ttl !== -1) { + try { + await dbhan.client.call('HEXPIRE', change.key, insert.ttl, 'FIELDS', 1, insert.key); + } catch (e) {} + } + } + } } else if (change.type === 'zset') { if (change.updates && Array.isArray(change.updates)) { for (const update of change.updates) { await driver.query(dbhan, `ZADD "${change.key}" ${update.score} "${update.member}"`); } } + if (change.inserts && Array.isArray(change.inserts)) { + for (const insert of change.inserts) { + await driver.query(dbhan, `ZADD "${change.key}" ${insert.score} "${insert.member}"`); + } + } } else if (change.type === 'list') { if (change.updates && Array.isArray(change.updates)) { for (const update of change.updates) { await driver.query(dbhan, `LSET "${change.key}" ${update.index} "${update.value}"`); } } + if (change.inserts && Array.isArray(change.inserts)) { + for (const insert of change.inserts) { + await driver.query(dbhan, `RPUSH "${change.key}" "${insert.value}"`); + } + } + } else if (change.type === 'set') { + if (change.inserts && Array.isArray(change.inserts)) { + for (const insert of change.inserts) { + await driver.query(dbhan, `SADD "${change.key}" "${insert.value}"`); + } + } + } else if (change.type === 'stream') { + if (change.inserts && Array.isArray(change.inserts)) { + for (const insert of change.inserts) { + const streamId = insert.id === '*' || !insert.id ? '*' : insert.id; + await driver.query(dbhan, `XADD "${change.key}" ${streamId} value "${insert.value}"`); + } + } } }