Update ArmaDragonflyClient Extension: Implement pub/sub messaging system with event routing, chunking support, and consistent message handling

This commit is contained in:
Jacob Schmidt 2025-02-07 22:34:56 -06:00
parent a97d3a5285
commit 95fdf45cde
9 changed files with 256 additions and 23 deletions

View File

@ -17,6 +17,8 @@ PREP(listGet);
PREP(listLoad); PREP(listLoad);
PREP(listRemove); PREP(listRemove);
PREP(listSet); PREP(listSet);
PREP(pubSubFetch);
PREP(pubSubHandler);
PREP(processQueue); PREP(processQueue);
PREP(saveDB); PREP(saveDB);
PREP(scheduler); PREP(scheduler);

View File

@ -1,7 +1,8 @@
#include "script_component.hpp" #include "script_component.hpp"
GVAR(isProcessing) = false; GVAR(isProcessing) = false;
GVAR(fetchArray) = []; GVAR(fetch_array) = [];
GVAR(pubSubFetch_array) = [];
GVAR(taskQueue) = []; GVAR(taskQueue) = [];
// addMissionEventHandler ["ExtensionCallback", { // addMissionEventHandler ["ExtensionCallback", {

View File

@ -23,18 +23,11 @@
diag_log _this; diag_log _this;
params ["_uniqueID", "_function", "_index", "_total", "_datachunk", "_call", "_netId"]; params ["_uniqueID", "_function", "_index", "_total", "_datachunk", "_call", "_netId"];
private _dataString = ""; private _dataString = "";
private _index_array = []; private _index_array = [];
private _count_total = -1; private _count_total = -1;
// _uniqueID = parseNumber _uniqueID;
// _function
// _index = parseNumber _index;
// _total = parseNumber _total;
// _datachunk
// _call
// _netId
dragonfly_db_fetch_array pushBackUnique [_uniqueID, _function, _index, _total, _datachunk, _call, _netId]; dragonfly_db_fetch_array pushBackUnique [_uniqueID, _function, _index, _total, _datachunk, _call, _netId];
_count_total = { _count_total = {
@ -48,19 +41,14 @@ _count_total = {
if (_count_total == _total) then { if (_count_total == _total) then {
_index_array sort true; _index_array sort true;
for "_i" from 0 to (_total - 1) do { for "_i" from 0 to (_total - 1) do {
_dataString = _dataString + ((_index_array select _i) select 1); _dataString = _dataString + ((_index_array select _i) select 1);
}; };
diag_log _dataString; diag_log _dataString;
// Old direct execution
// (parseSimpleArray _dataString) call (missionNamespace getVariable [_function, {
// hint "Function does not exist!";
// }]);
[_uniqueID, _function, _call, (parseSimpleArray _dataString), _netId] call FUNC(handler); [_uniqueID, _function, _call, (parseSimpleArray _dataString), _netId] call FUNC(handler);
// Cleanup
dragonfly_db_fetch_array = dragonfly_db_fetch_array select {!((_x select 0) in [_uniqueID])}; dragonfly_db_fetch_array = dragonfly_db_fetch_array select {!((_x select 0) in [_uniqueID])};
}; };

View File

@ -0,0 +1,54 @@
#include "script_component.hpp"
/*
@file Title: ArmaDragonflyClient Framework by Creedcoder, J.Schmidt
@file Version: 0.1
@file Name: fnc_pubSubFetch.sqf
@file Author: Creedcoder, J.Schmidt
@file edit: 03.25.2024
Copyright © 2024 Creedcoder, J.Schmidt, All rights reserved
Do not edit without permission!
This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlikes 4.0 International License.
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/4.0/ or send a letter to Creative Commons,
444 Castro Street, Suite 900, Mountain View, California, 94041, USA.
Fetch from DB:
dragonfly_db_pubSubFetch_array
["uniqueID", "evetnType", "eventName", "index", "indextotal", "datachunk", "target"]
*/
diag_log _this;
params ["_uniqueID", "_eventType", "_eventName", "_index", "_total", "_datachunk", "_target"];
private _dataString = "";
private _index_array = [];
private _count_total = -1;
dragonfly_db_pubSubFetch_array pushBackUnique [_uniqueID, _eventType, _eventName, _index, _total, _datachunk, _target];
_count_total = {
if (_uniqueID == _x select 0) then {
_index_array pushBackUnique [_x select 3, _x select 5];
true
} else {
false
}
} count dragonfly_db_pubSubFetch_array;
if (_count_total == _total) then {
_index_array sort true;
for "_i" from 0 to (_total - 1) do {
_dataString = _dataString + ((_index_array select _i) select 1);
};
diag_log _dataString;
[_uniqueID, _eventType, _eventName, (parseSimpleArray _dataString), _target] call FUNC(pubSubHandler);
dragonfly_db_pubSubFetch_array = dragonfly_db_pubSubFetch_array select {!((_x select 0) in [_uniqueID])};
};

View File

@ -0,0 +1,53 @@
#include "script_component.hpp"
#include "script_component.hpp"
/*
* Function: dragonfly_db_fnc_pubSubHandler
* Author: Creedcoder, J.Schmidt
* Edit: 07.15.2024
* Copyright © 2024 Creedcoder, J.Schmidt, All rights reserved
*
* Do not edit without permission!
*
* This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
* To view a copy of this license, vist https://creativecommons.org/licenses/by-nc-sa/4.0/ or send a letter to Creative Commons,
* PO Box 1866, Mountain View, CA 94042
*
* [Description]
* Handles pub/sub messages and routes them through the CBA event system based on event type.
*
* Arguments:
* 0: UniqueID for message tracking <STRING> (default: "")
* 1: Event type (global, local, server, remote) <STRING> (default: "")
* 2: Event name for CBA event system <STRING> (default: "")
* 3: Message data [<ARRAY|STRING|NUMBER|BOOL>] (default: [])
* 4: Target NetID for remote events <STRING> (default: "")
*
* Return Value:
* None
*
* Examples:
* ["123", "global", "myEvent", ["Hello"], ""] call dragonfly_db_fnc_pubSubHandler
* ["456", "remote", "playerEvent", ["Update"], "2:3"] call dragonfly_db_fnc_pubSubHandler
*
* Public: Yes
*/
params [["_uniqueID", "", [""]], ["_eventType", "", [""]], ["_eventName", "", [""]], ["_data", [], [[]]], ["_target", "", [""]]];
switch (_eventType) do {
case "global": {
[_eventName, _data] call CFUNC(globalEvent);
};
case "local": {
[_eventName, _data] call CFUNC(localEvent);
};
case "server": {
[_eventName, _data] call CFUNC(serverEvent);
};
case "remote": {
private _targetObj = objectFromNetId _target;
[_eventName, _data, _targetObj] call CFUNC(remoteEvent);
};
};

View File

@ -26,7 +26,7 @@ namespace ArmaDragonflyClient
throw new InvalidOperationException("Invalid password provided."); throw new InvalidOperationException("Invalid password provided.");
} }
Main.Log("Connected to DragonflyDB.", "debug"); Main.Log("Connected to DragonflyDB.", "debug");
} }
} }
@ -39,6 +39,12 @@ namespace ArmaDragonflyClient
return ParseResponse(await _reader!.ReadLineAsync(), _reader, convertFromBase64); return ParseResponse(await _reader!.ReadLineAsync(), _reader, convertFromBase64);
} }
public async Task<string> ReceiveMessageAsync()
{
string response = await _reader.ReadLineAsync();
return ParseResponse(response, _reader);
}
private static string ParseResponse(string response, StreamReader reader, bool convertFromBase64 = false) private static string ParseResponse(string response, StreamReader reader, bool convertFromBase64 = false)
{ {
if (response == null) if (response == null)
@ -108,7 +114,7 @@ namespace ArmaDragonflyClient
_reader = null; _reader = null;
_client = null; _client = null;
Main.Log("Disconnected from DragonflyDB.", "debug"); Main.Log("Disconnected from DragonflyDB.", "debug");
} }
public void Dispose() public void Dispose()

View File

@ -5,6 +5,7 @@ namespace ArmaDragonflyClient
internal class DragonflyDB internal class DragonflyDB
{ {
private readonly static DragonflyClient _client = new(Main.ADC_HOST, Main.ADC_PORT, Main.ADC_PASSWORD); private readonly static DragonflyClient _client = new(Main.ADC_HOST, Main.ADC_PORT, Main.ADC_PASSWORD);
private static CancellationTokenSource _listenerCts;
public static async Task<string> DragonflyRaw(string key, string keyValue, string function = null) public static async Task<string> DragonflyRaw(string key, string keyValue, string function = null)
{ {
@ -166,5 +167,76 @@ namespace ArmaDragonflyClient
string response = await _client.SendCommandAsync("SAVE"); string response = await _client.SendCommandAsync("SAVE");
Main.Log($"ArmaDragonflyClient 'SAVE', Response: '{response}'", "debug"); Main.Log($"ArmaDragonflyClient 'SAVE', Response: '{response}'", "debug");
} }
public static async Task DragonflyPublishAsync(string channel, string message, string uniqueID)
{
await _client.ConnectAsync();
await _client.SendCommandAsync($"PUBLISH {channel} {message}");
Main.Log($"ArmaDragonflyClient 'PUBLISH', Channel: '{channel}', Message: '{message}'", "debug");
}
public static async Task DragonflySubscribeAsync(string channel, string eventType, string eventName, string uniqueID, string target = null, int bufferSize = Main.ADC_BUFFERSIZE)
{
await _client.ConnectAsync();
try
{
await _client.SendCommandAsync($"SUBSCRIBE {channel}");
Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Channel: '{channel}'", "debug");
_listenerCts = new CancellationTokenSource();
await Task.Run(async () =>
{
await StartMessageListener(message => {
Utils.CheckByteCountPubSub(uniqueID, message, eventType, eventName, target, bufferSize);
});
});
}
catch (Exception ex)
{
Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Exception: '{ex.Message}'", "error");
throw;
}
}
private static async Task StartMessageListener(Action<string> messageHandler)
{
try
{
while (!_listenerCts.Token.IsCancellationRequested)
{
string response = await _client.ReceiveMessageAsync();
if (!string.IsNullOrEmpty(response))
{
var messageParts = response.Split(',');
if (messageParts.Length >= 3)
{
string messageType = messageParts[0];
string channel = messageParts[1];
string message = messageParts[2];
Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Channel: '{channel}', Message: '{message}'", "debug");
messageHandler(message);
}
}
await Task.Delay(10);
}
}
catch (Exception ex)
{
Main.Log($"ArmaDragonflyClient 'SUBSCRIBE', Exception: '{ex.Message}'", "error");
throw;
}
}
public static void StopMessageListener()
{
_listenerCts.Cancel();
_listenerCts.Dispose();
_listenerCts = null;
}
} }
} }

View File

@ -276,6 +276,14 @@ namespace ArmaDragonflyClient
HandleHDelIdOperation(argsArr); HandleHDelIdOperation(argsArr);
WriteOutput(output, "Async"); WriteOutput(output, "Async");
return 200; return 200;
case "publish":
HandlePublishOperation(_id, argsArr);
WriteOutput(output, $"[\"{_id}_publish\"]");
return 100;
case "subscribe":
HandleSubscribeOperation(_id, argsArr, argc);
WriteOutput(output, $"[\"{_id}_subscribe\"]");
return 100;
case "savedb": case "savedb":
HandleSaveDBOperation(); HandleSaveDBOperation();
WriteOutput(output, "Async"); WriteOutput(output, "Async");
@ -539,10 +547,7 @@ namespace ArmaDragonflyClient
private static void HandleSaveDBOperation() private static void HandleSaveDBOperation()
{ {
Task.Run(async () => Task.Run(DragonflyDB.DragonflySaveDBAsync);
{
await DragonflyDB.DragonflySaveDBAsync();
});
} }
private static void HandleSetupOperation(List<string> argsArr, int argsCnt) private static void HandleSetupOperation(List<string> argsArr, int argsCnt)
@ -574,5 +579,29 @@ namespace ArmaDragonflyClient
Log($"DragonflyDB connection settings updated - Host: '{host}', Port: '{port}', Password: '{password}'", "action"); Log($"DragonflyDB connection settings updated - Host: '{host}', Port: '{port}', Password: '{password}'", "action");
} }
private static void HandlePublishOperation(long _id, List<string> argsArr)
{
Task.Run(async () =>
{
string channel = argsArr[0].Trim('"');
string message = argsArr[1];
string _uniqueID = $"{_id}_publish";
await DragonflyDB.DragonflyPublishAsync(channel, message, _uniqueID);
});
}
private static void HandleSubscribeOperation(long _id, List<string> argsArr, int argsCnt)
{
Task.Run(async () =>
{
string _uniqueID = $"{_id}_subscribe";
if (argsCnt == 3)
await DragonflyDB.DragonflySubscribeAsync(argsArr[0].Trim('"'), argsArr[1].Trim('"'), argsArr[2].Trim('"'), _uniqueID);
if (argsCnt == 4)
await DragonflyDB.DragonflySubscribeAsync(argsArr[0].Trim('"'), argsArr[1].Trim('"'), argsArr[2].Trim('"'), _uniqueID, argsArr[3].Trim('"'));
});
}
} }
} }

View File

@ -84,5 +84,33 @@ namespace ArmaDragonflyClient
} }
} }
} }
public static void CheckByteCountPubSub(string uniqueId, string data, string eventType, string eventName, string target, int bufferSize)
{
if (Encoding.UTF8.GetByteCount(data) <= bufferSize)
{
if (!data.StartsWith('['))
data = BuildArray(data);
Main.Log($"{data}", "debug");
string dataAsString = $"[\"{uniqueId}\",\"{eventType}\",\"{eventName}\",{data},\"{target}\"]";
Main.Log($"{dataAsString}", "debug");
Main.Callback("ArmaDragonflyClient", "dragonfly_db_fnc_pubSubHandler", dataAsString);
}
else
{
if (!data.StartsWith('['))
data = BuildArray(data);
Main.Log($"{data}", "debug");
var chunks = SplitIntoChunks(data, bufferSize);
int totalChunks = chunks.Count;
foreach (string chunk in chunks)
{
string chunkAsString = $"[{uniqueId},{eventType},{eventName},{chunks.IndexOf(chunk)},{totalChunks},\"{chunk}\",\"{target}\"]";
Main.Callback("ArmaDragonflyClient", "dragonfly_db_fnc_pubSubFetch", chunkAsString);
}
}
}
} }
} }