From 95fdf45cde6dac1a31ea86eec9885d5030daff1a Mon Sep 17 00:00:00 2001 From: Jacob Schmidt Date: Fri, 7 Feb 2025 22:34:56 -0600 Subject: [PATCH] Update ArmaDragonflyClient Extension: Implement pub/sub messaging system with event routing, chunking support, and consistent message handling --- addons/db/XEH_PREP.hpp | 2 + addons/db/XEH_postInit.sqf | 3 +- addons/db/functions/fnc_fetch.sqf | 16 +---- addons/db/functions/fnc_pubSubFetch.sqf | 54 +++++++++++++++++ addons/db/functions/fnc_pubSubHandler.sqf | 53 ++++++++++++++++ extension/src/DragonflyClient.cs | 10 ++- extension/src/DragonflyDB.cs | 74 ++++++++++++++++++++++- extension/src/Main.cs | 39 ++++++++++-- extension/src/Utils.cs | 28 +++++++++ 9 files changed, 256 insertions(+), 23 deletions(-) create mode 100644 addons/db/functions/fnc_pubSubFetch.sqf create mode 100644 addons/db/functions/fnc_pubSubHandler.sqf diff --git a/addons/db/XEH_PREP.hpp b/addons/db/XEH_PREP.hpp index 9e79ebc..3957e2c 100644 --- a/addons/db/XEH_PREP.hpp +++ b/addons/db/XEH_PREP.hpp @@ -17,6 +17,8 @@ PREP(listGet); PREP(listLoad); PREP(listRemove); PREP(listSet); +PREP(pubSubFetch); +PREP(pubSubHandler); PREP(processQueue); PREP(saveDB); PREP(scheduler); diff --git a/addons/db/XEH_postInit.sqf b/addons/db/XEH_postInit.sqf index 206987d..02650ac 100644 --- a/addons/db/XEH_postInit.sqf +++ b/addons/db/XEH_postInit.sqf @@ -1,7 +1,8 @@ #include "script_component.hpp" GVAR(isProcessing) = false; -GVAR(fetchArray) = []; +GVAR(fetch_array) = []; +GVAR(pubSubFetch_array) = []; GVAR(taskQueue) = []; // addMissionEventHandler ["ExtensionCallback", { diff --git a/addons/db/functions/fnc_fetch.sqf b/addons/db/functions/fnc_fetch.sqf index db15298..32b00ed 100644 --- a/addons/db/functions/fnc_fetch.sqf +++ b/addons/db/functions/fnc_fetch.sqf @@ -23,18 +23,11 @@ diag_log _this; params ["_uniqueID", "_function", "_index", "_total", "_datachunk", "_call", "_netId"]; + private _dataString = ""; private _index_array = []; private _count_total = -1; -// _uniqueID = parseNumber _uniqueID; -// _function -// _index = parseNumber _index; -// _total = parseNumber _total; -// _datachunk -// _call -// _netId - dragonfly_db_fetch_array pushBackUnique [_uniqueID, _function, _index, _total, _datachunk, _call, _netId]; _count_total = { @@ -48,19 +41,14 @@ _count_total = { if (_count_total == _total) then { _index_array sort true; + for "_i" from 0 to (_total - 1) do { _dataString = _dataString + ((_index_array select _i) select 1); }; diag_log _dataString; - // Old direct execution - // (parseSimpleArray _dataString) call (missionNamespace getVariable [_function, { - // hint "Function does not exist!"; - // }]); - [_uniqueID, _function, _call, (parseSimpleArray _dataString), _netId] call FUNC(handler); - // Cleanup dragonfly_db_fetch_array = dragonfly_db_fetch_array select {!((_x select 0) in [_uniqueID])}; }; \ No newline at end of file diff --git a/addons/db/functions/fnc_pubSubFetch.sqf b/addons/db/functions/fnc_pubSubFetch.sqf new file mode 100644 index 0000000..d864fa2 --- /dev/null +++ b/addons/db/functions/fnc_pubSubFetch.sqf @@ -0,0 +1,54 @@ +#include "script_component.hpp" + +/* + @file Title: ArmaDragonflyClient Framework by Creedcoder, J.Schmidt + @file Version: 0.1 + @file Name: fnc_pubSubFetch.sqf + @file Author: Creedcoder, J.Schmidt + @file edit: 03.25.2024 + Copyright © 2024 Creedcoder, J.Schmidt, All rights reserved + + Do not edit without permission! + + This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlikes 4.0 International License. + To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/4.0/ or send a letter to Creative Commons, + 444 Castro Street, Suite 900, Mountain View, California, 94041, USA. + + Fetch from DB: + dragonfly_db_pubSubFetch_array + + ["uniqueID", "evetnType", "eventName", "index", "indextotal", "datachunk", "target"] +*/ + +diag_log _this; + +params ["_uniqueID", "_eventType", "_eventName", "_index", "_total", "_datachunk", "_target"]; + +private _dataString = ""; +private _index_array = []; +private _count_total = -1; + +dragonfly_db_pubSubFetch_array pushBackUnique [_uniqueID, _eventType, _eventName, _index, _total, _datachunk, _target]; + +_count_total = { + if (_uniqueID == _x select 0) then { + _index_array pushBackUnique [_x select 3, _x select 5]; + true + } else { + false + } +} count dragonfly_db_pubSubFetch_array; + +if (_count_total == _total) then { + _index_array sort true; + + for "_i" from 0 to (_total - 1) do { + _dataString = _dataString + ((_index_array select _i) select 1); + }; + + diag_log _dataString; + + [_uniqueID, _eventType, _eventName, (parseSimpleArray _dataString), _target] call FUNC(pubSubHandler); + + dragonfly_db_pubSubFetch_array = dragonfly_db_pubSubFetch_array select {!((_x select 0) in [_uniqueID])}; +}; \ No newline at end of file diff --git a/addons/db/functions/fnc_pubSubHandler.sqf b/addons/db/functions/fnc_pubSubHandler.sqf new file mode 100644 index 0000000..f91b426 --- /dev/null +++ b/addons/db/functions/fnc_pubSubHandler.sqf @@ -0,0 +1,53 @@ +#include "script_component.hpp" + +#include "script_component.hpp" + +/* + * Function: dragonfly_db_fnc_pubSubHandler + * Author: Creedcoder, J.Schmidt + * Edit: 07.15.2024 + * Copyright © 2024 Creedcoder, J.Schmidt, All rights reserved + * + * Do not edit without permission! + * + * This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. + * To view a copy of this license, vist https://creativecommons.org/licenses/by-nc-sa/4.0/ or send a letter to Creative Commons, + * PO Box 1866, Mountain View, CA 94042 + * + * [Description] + * Handles pub/sub messages and routes them through the CBA event system based on event type. + * + * Arguments: + * 0: UniqueID for message tracking (default: "") + * 1: Event type (global, local, server, remote) (default: "") + * 2: Event name for CBA event system (default: "") + * 3: Message data [] (default: []) + * 4: Target NetID for remote events (default: "") + * + * Return Value: + * None + * + * Examples: + * ["123", "global", "myEvent", ["Hello"], ""] call dragonfly_db_fnc_pubSubHandler + * ["456", "remote", "playerEvent", ["Update"], "2:3"] call dragonfly_db_fnc_pubSubHandler + * + * Public: Yes + */ + +params [["_uniqueID", "", [""]], ["_eventType", "", [""]], ["_eventName", "", [""]], ["_data", [], [[]]], ["_target", "", [""]]]; + +switch (_eventType) do { + case "global": { + [_eventName, _data] call CFUNC(globalEvent); + }; + case "local": { + [_eventName, _data] call CFUNC(localEvent); + }; + case "server": { + [_eventName, _data] call CFUNC(serverEvent); + }; + case "remote": { + private _targetObj = objectFromNetId _target; + [_eventName, _data, _targetObj] call CFUNC(remoteEvent); + }; +}; \ No newline at end of file diff --git a/extension/src/DragonflyClient.cs b/extension/src/DragonflyClient.cs index 8a58ca2..5572c25 100644 --- a/extension/src/DragonflyClient.cs +++ b/extension/src/DragonflyClient.cs @@ -26,7 +26,7 @@ namespace ArmaDragonflyClient throw new InvalidOperationException("Invalid password provided."); } - Main.Log("Connected to DragonflyDB.", "debug"); + Main.Log("Connected to DragonflyDB.", "debug"); } } @@ -39,6 +39,12 @@ namespace ArmaDragonflyClient return ParseResponse(await _reader!.ReadLineAsync(), _reader, convertFromBase64); } + public async Task ReceiveMessageAsync() + { + string response = await _reader.ReadLineAsync(); + return ParseResponse(response, _reader); + } + private static string ParseResponse(string response, StreamReader reader, bool convertFromBase64 = false) { if (response == null) @@ -108,7 +114,7 @@ namespace ArmaDragonflyClient _reader = null; _client = null; - Main.Log("Disconnected from DragonflyDB.", "debug"); + Main.Log("Disconnected from DragonflyDB.", "debug"); } public void Dispose() diff --git a/extension/src/DragonflyDB.cs b/extension/src/DragonflyDB.cs index a42c2d2..5a36715 100644 --- a/extension/src/DragonflyDB.cs +++ b/extension/src/DragonflyDB.cs @@ -5,6 +5,7 @@ namespace ArmaDragonflyClient internal class DragonflyDB { private readonly static DragonflyClient _client = new(Main.ADC_HOST, Main.ADC_PORT, Main.ADC_PASSWORD); + private static CancellationTokenSource _listenerCts; public static async Task DragonflyRaw(string key, string keyValue, string function = null) { @@ -166,5 +167,76 @@ namespace ArmaDragonflyClient string response = await _client.SendCommandAsync("SAVE"); Main.Log($"ArmaDragonflyClient 'SAVE', Response: '{response}'", "debug"); } + + public static async Task DragonflyPublishAsync(string channel, string message, string uniqueID) + { + await _client.ConnectAsync(); + await _client.SendCommandAsync($"PUBLISH {channel} {message}"); + Main.Log($"ArmaDragonflyClient 'PUBLISH', Channel: '{channel}', Message: '{message}'", "debug"); + } + + public static async Task DragonflySubscribeAsync(string channel, string eventType, string eventName, string uniqueID, string target = null, int bufferSize = Main.ADC_BUFFERSIZE) + { + await _client.ConnectAsync(); + + try + { + await _client.SendCommandAsync($"SUBSCRIBE {channel}"); + Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Channel: '{channel}'", "debug"); + + _listenerCts = new CancellationTokenSource(); + + await Task.Run(async () => + { + await StartMessageListener(message => { + Utils.CheckByteCountPubSub(uniqueID, message, eventType, eventName, target, bufferSize); + }); + }); + } + catch (Exception ex) + { + Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Exception: '{ex.Message}'", "error"); + throw; + } + } + + private static async Task StartMessageListener(Action messageHandler) + { + try + { + while (!_listenerCts.Token.IsCancellationRequested) + { + string response = await _client.ReceiveMessageAsync(); + + if (!string.IsNullOrEmpty(response)) + { + var messageParts = response.Split(','); + if (messageParts.Length >= 3) + { + string messageType = messageParts[0]; + string channel = messageParts[1]; + string message = messageParts[2]; + + Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Channel: '{channel}', Message: '{message}'", "debug"); + messageHandler(message); + } + } + + await Task.Delay(10); + } + } + catch (Exception ex) + { + Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Exception: '{ex.Message}'", "error"); + throw; + } + } + + public static void StopMessageListener() + { + _listenerCts.Cancel(); + _listenerCts.Dispose(); + _listenerCts = null; + } } -} +} \ No newline at end of file diff --git a/extension/src/Main.cs b/extension/src/Main.cs index 55dcb88..8493a9d 100644 --- a/extension/src/Main.cs +++ b/extension/src/Main.cs @@ -276,6 +276,14 @@ namespace ArmaDragonflyClient HandleHDelIdOperation(argsArr); WriteOutput(output, "Async"); return 200; + case "publish": + HandlePublishOperation(_id, argsArr); + WriteOutput(output, $"[\"{_id}_publish\"]"); + return 100; + case "subscribe": + HandleSubscribeOperation(_id, argsArr, argc); + WriteOutput(output, $"[\"{_id}_subscribe\"]"); + return 100; case "savedb": HandleSaveDBOperation(); WriteOutput(output, "Async"); @@ -539,10 +547,7 @@ namespace ArmaDragonflyClient private static void HandleSaveDBOperation() { - Task.Run(async () => - { - await DragonflyDB.DragonflySaveDBAsync(); - }); + Task.Run(DragonflyDB.DragonflySaveDBAsync); } private static void HandleSetupOperation(List argsArr, int argsCnt) @@ -574,5 +579,29 @@ namespace ArmaDragonflyClient Log($"DragonflyDB connection settings updated - Host: '{host}', Port: '{port}', Password: '{password}'", "action"); } + + private static void HandlePublishOperation(long _id, List argsArr) + { + Task.Run(async () => + { + string channel = argsArr[0].Trim('"'); + string message = argsArr[1]; + string _uniqueID = $"{_id}_publish"; + + await DragonflyDB.DragonflyPublishAsync(channel, message, _uniqueID); + }); + } + + private static void HandleSubscribeOperation(long _id, List argsArr, int argsCnt) + { + Task.Run(async () => + { + string _uniqueID = $"{_id}_subscribe"; + if (argsCnt == 3) + await DragonflyDB.DragonflySubscribeAsync(argsArr[0].Trim('"'), argsArr[1].Trim('"'), argsArr[2].Trim('"'), _uniqueID); + if (argsCnt == 4) + await DragonflyDB.DragonflySubscribeAsync(argsArr[0].Trim('"'), argsArr[1].Trim('"'), argsArr[2].Trim('"'), _uniqueID, argsArr[3].Trim('"')); + }); + } } -} +} \ No newline at end of file diff --git a/extension/src/Utils.cs b/extension/src/Utils.cs index 372b6d7..8e45400 100644 --- a/extension/src/Utils.cs +++ b/extension/src/Utils.cs @@ -84,5 +84,33 @@ namespace ArmaDragonflyClient } } } + + public static void CheckByteCountPubSub(string uniqueId, string data, string eventType, string eventName, string target, int bufferSize) + { + if (Encoding.UTF8.GetByteCount(data) <= bufferSize) + { + if (!data.StartsWith('[')) + data = BuildArray(data); + Main.Log($"{data}", "debug"); + + string dataAsString = $"[\"{uniqueId}\",\"{eventType}\",\"{eventName}\",{data},\"{target}\"]"; + Main.Log($"{dataAsString}", "debug"); + Main.Callback("ArmaDragonflyClient", "dragonfly_db_fnc_pubSubHandler", dataAsString); + } + else + { + if (!data.StartsWith('[')) + data = BuildArray(data); + Main.Log($"{data}", "debug"); + var chunks = SplitIntoChunks(data, bufferSize); + int totalChunks = chunks.Count; + + foreach (string chunk in chunks) + { + string chunkAsString = $"[{uniqueId},{eventType},{eventName},{chunks.IndexOf(chunk)},{totalChunks},\"{chunk}\",\"{target}\"]"; + Main.Callback("ArmaDragonflyClient", "dragonfly_db_fnc_pubSubFetch", chunkAsString); + } + } + } } }