fireflyclient/examples/example.py
Jacob Schmidt 4063fe6bd8
Some checks failed
Build / build (push) Has been cancelled
Initial Repo Setup
2025-04-10 21:49:46 -05:00

1439 lines
55 KiB
Python

import os
import platform
from ctypes import cdll, c_char_p, c_bool, c_void_p, c_int, POINTER
import re
import traceback
import sys
# Set up logging
import logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("firefly_debug.log"),
logging.StreamHandler(sys.stdout),
],
)
logger = logging.getLogger("FireflyDB")
logger.info("Script starting...")
class FireflyDatabase:
"""Wrapper for Firefly database client library"""
def __init__(self, host="localhost", port=6379, password=None):
"""Initialize the Firefly database connection
Args:
host: Hostname of the Firefly server (default: localhost)
port: Port number of the Firefly server (default: 6379)
password: Optional password for authentication
"""
self.client = None
self.lib = None
self._load_library()
self._connect(host, port, password)
def _load_library(self):
"""Load the appropriate Firefly library for the current platform"""
try:
# Determine the library path based on the platform
lib_path = os.path.dirname(os.path.abspath(__file__))
if platform.system() == "Windows":
lib_file = os.path.join(lib_path, "libFirefly.dll")
else: # Linux/macOS
lib_file = os.path.join(lib_path, "libFirefly.so")
if not os.path.exists(lib_file):
raise FileNotFoundError(
f"Firefly library not found: {lib_file}"
)
# Load the library
self.lib = cdll.LoadLibrary(lib_file)
if self.lib is None: # Explicitly check for None
raise OSError("Failed to load the Firefly library")
logger.debug(f"Firefly library loaded from: {lib_file}")
# Set argument and return types. Moved here to be closer to load
self.lib.CreateClient.restype = c_void_p
self.lib.CreateClient.argtypes = [c_char_p, c_int]
self.lib.DestroyClient.argtypes = [c_void_p]
self.lib.Authenticate.restype = c_bool
self.lib.Authenticate.argtypes = [c_void_p, c_char_p]
self.lib.StringSet.restype = c_bool
self.lib.StringSet.argtypes = [c_void_p, c_char_p, c_char_p]
self.lib.StringGet.restype = c_char_p
self.lib.StringGet.argtypes = [c_void_p, c_char_p]
self.lib.FreeString.argtypes = [c_char_p]
self.lib.ListLeftPush.restype = c_int
self.lib.ListLeftPush.argtypes = [c_void_p, c_char_p, c_char_p]
self.lib.ListRightPush.restype = c_int
self.lib.ListRightPush.argtypes = [c_void_p, c_char_p, c_char_p]
self.lib.ListLeftPop.restype = c_char_p
self.lib.ListLeftPop.argtypes = [c_void_p, c_char_p]
self.lib.ListRightPop.restype = c_char_p
self.lib.ListRightPop.argtypes = [c_void_p, c_char_p]
self.lib.ListRange.restype = c_char_p
self.lib.ListRange.argtypes = [c_void_p, c_char_p, c_int, c_int]
self.lib.ListIndex.restype = c_char_p
self.lib.ListIndex.argtypes = [c_void_p, c_char_p, c_int]
self.lib.ListSet.restype = c_bool
self.lib.ListSet.argtypes = [c_void_p, c_char_p, c_int, c_char_p]
self.lib.ListPosition.restype = c_int
self.lib.ListPosition.argtypes = [
c_void_p,
c_char_p,
c_char_p,
c_int,
c_int,
]
self.lib.ListTrim.restype = c_bool
self.lib.ListTrim.argtypes = [c_void_p, c_char_p, c_int, c_int]
self.lib.ListRemove.restype = c_int
self.lib.ListRemove.argtypes = [
c_void_p,
c_char_p,
c_int,
c_char_p,
]
self.lib.HashSet.restype = c_bool
self.lib.HashSet.argtypes = [
c_void_p,
c_char_p,
c_char_p,
c_char_p,
]
self.lib.HashGet.restype = c_char_p
self.lib.HashGet.argtypes = [c_void_p, c_char_p, c_char_p]
self.lib.HashDelete.restype = c_bool
self.lib.HashDelete.argtypes = [c_void_p, c_char_p, c_char_p]
self.lib.HashFieldExists.restype = c_bool
self.lib.HashFieldExists.argtypes = [c_void_p, c_char_p, c_char_p]
self.lib.HashGetAll.restype = c_char_p
self.lib.HashGetAll.argtypes = [c_void_p, c_char_p]
self.lib.ExecuteCommand.restype = c_char_p
self.lib.ExecuteCommand.argtypes = [c_void_p, c_char_p, c_char_p]
# Pipeline operations
self.lib.SetPipelineMode.restype = c_bool
self.lib.SetPipelineMode.argtypes = [c_void_p, c_bool]
self.lib.SetBatchSize.restype = c_bool
self.lib.SetBatchSize.argtypes = [c_void_p, c_int]
self.lib.GetQueuedCommandCount.restype = c_int
self.lib.GetQueuedCommandCount.argtypes = [c_void_p]
self.lib.FlushPipeline.restype = c_char_p
self.lib.FlushPipeline.argtypes = [c_void_p]
self.lib.IsPipelineMode.restype = c_bool
self.lib.IsPipelineMode.argtypes = [c_void_p]
self.lib.GetBatchSize.restype = c_int
self.lib.GetBatchSize.argtypes = [c_void_p]
except FileNotFoundError as e:
logger.error(f"Error loading library: {e}")
raise # Re-raise to halt execution
except OSError as e:
logger.error(f"OS Error loading library: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error loading library: {e}")
raise
def _connect(self, host, port, password=None):
"""Connect to the Firefly server and authenticate if needed"""
try:
# Convert host to bytes for C API
host_bytes = host.encode("utf-8")
logger.debug(f"Connecting to {host}:{port}")
# Create client
self.client = self.lib.CreateClient(host_bytes, port)
if not self.client:
raise ConnectionError(
f"Failed to connect to Firefly server at {host}:{port}"
)
logger.debug("Client created successfully")
# Authenticate if password is provided
if password:
password_bytes = password.encode("utf-8")
logger.debug("Authenticating...")
if not self.lib.Authenticate(self.client, password_bytes):
self.close()
raise ConnectionError("Authentication failed")
logger.debug("Authentication successful")
else:
logger.debug("No password provided, skipping authentication")
except ConnectionError as e:
logger.error(f"Connection error: {e}")
if self.client:
self.close() # Clean up the client if it was created
raise # Re-raise the exception for the caller to handle
except Exception as e:
logger.error(f"Unexpected error during connection: {e}")
if self.client:
self.close()
raise
def close(self):
"""Close the connection to the Firefly server"""
try:
if self.client:
logger.debug("Destroying client connection")
self.lib.DestroyClient(self.client)
self.client = None
logger.debug("Client connection destroyed")
else:
logger.debug("Client connection already closed")
except Exception as e:
logger.error(f"Error closing connection: {e}")
# Do not re-raise here, as close() should not throw exceptions in normal usage.
# Caller has no recovery options.
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
if exc_type: # Log the exception if one occurred within the context
logger.error(
f"Exception in context: {exc_type.__name__}, {exc_val}. Traceback:\n{''.join(traceback.format_exception(exc_type, exc_val, exc_tb))}"
)
def _check_connection(self):
"""Check if the client is connected"""
if not self.client:
raise ConnectionError("Not connected to Firefly server")
def _to_bytes(self, value):
"""Convert a value to bytes for C API"""
if isinstance(value, bytes):
return value
return str(value).encode("utf-8")
def _from_bytes(self, value):
"""Convert bytes from C API to string"""
try:
if value is None:
logger.debug("_from_bytes received None value")
return None
if not value: # Zero value check
logger.debug("_from_bytes received empty value")
return ""
# Try to decode safely
try:
result = value.decode("utf-8")
return result
except UnicodeDecodeError as e:
logger.error(f"Unicode decode error in _from_bytes: {e}")
# Try with a more forgiving approach
return value.decode("utf-8", errors="replace")
except Exception as e:
logger.error(f"Error in _from_bytes: {e}")
return None
def _free_string(self, ptr):
"""Free a string pointer allocated by the C API"""
logger.debug("Starting free string")
try:
# Check if ptr is valid and non-zero
if ptr and ptr != 0:
# Wrap in another try/except to catch any errors from the FreeString call
try:
self.lib.FreeString(ptr)
logger.debug("String freed successfully")
except Exception as inner_e:
logger.error(f"Error in FreeString call: {inner_e}")
else:
logger.debug(f"Skipping free for null or zero pointer: {ptr}")
except Exception as e:
logger.error(f"Error in _free_string outer block: {e}")
# Do not re-raise; resource cleanup, caller cannot handle.
def execute_command(self, command, args=""):
"""Execute a raw command on the server
Args:
command: The command to execute
args: Optional arguments for the command
Returns:
The response from the server
"""
try:
logger.debug(f"Executing command: {command} with args: {args}")
self._check_connection()
command_bytes = self._to_bytes(command)
args_bytes = self._to_bytes(args)
result = self.lib.ExecuteCommand(
self.client, command_bytes, args_bytes
)
logger.debug(f"ExecuteCommand result pointer: {result}")
if result:
try:
# If the result is already a bytes object, we don't need to free it
if isinstance(result, bytes):
logger.debug("Result is already a Python bytes object, no need to free")
response = result.decode("utf-8")
logger.debug(f"Received response: {response}")
return response
# Otherwise, treat it as a C pointer that needs to be freed
response = self._from_bytes(result)
logger.debug(f"Received response: {response}")
# Log before freeing
logger.debug(f"About to free string at address: {result}")
self._free_string(result)
return response
except Exception as e:
logger.error(f"Error processing ExecuteCommand result: {e}")
# Try to free anyway to avoid memory leaks, but only if it's not a bytes object
try:
if result and not isinstance(result, bytes):
self._free_string(result)
except Exception as free_e:
logger.error(f"Failed to free result after error: {free_e}")
return None
else:
logger.warning("ExecuteCommand returned NULL")
return None
except ConnectionError as e:
logger.error(f"Connection error during execute_command: {e}")
raise # Re-raise to be handled by caller
except Exception as e:
logger.error(
f"Error in execute_command: {e}. Traceback:\n{traceback.format_exc()}"
)
return None # Consistent return on error
def ping(self):
"""Test the connection to the server
Returns:
True if the server responds with PONG, False otherwise
"""
try:
logger.debug("Sending PING command")
self._check_connection()
# Add a try/except specifically around execute_command
try:
response = self.execute_command("PING")
logger.debug(f"Raw ping response: '{response}'")
except Exception as e:
logger.error(f"Exception in execute_command during ping: {e}")
return False
# Log the type and value of the response
logger.debug(
f"Response type: {type(response)}, value: '{response}'"
)
if response is None:
logger.warning("Received NULL response from ping")
return False
# Normalize: strip whitespace, remove leading '+', uppercase
try:
normalized = response.strip().lstrip("+").upper()
logger.debug(f"Normalized response: '{normalized}'")
if normalized == "PONG":
logger.debug(
"PONG found in normalized response - ping successful"
)
return True
else:
logger.warning(
f"PONG not found in response: raw='{response}', normalized='{normalized}'"
)
return False
except AttributeError:
logger.error(f"Unable to process ping response: {response}")
return False
except Exception as e:
logger.error(
f"Ping failed: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
#region String operations
def string_set(self, key, value):
"""Set a string value
Args:
key: The key to set
value: The value to set
Returns:
True if successful
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
value_bytes = self._to_bytes(value)
result = self.lib.StringSet(self.client, key_bytes, value_bytes)
logger.debug(f"StringSet result for key '{key}': {result}")
return result
except ConnectionError as e:
logger.error(f"Connection error in string_set: {e}")
raise
except Exception as e:
logger.error(
f"Error in string_set: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def string_get(self, key):
"""Get a string value
Args:
key: The key to get
Returns:
The value, or None if not found
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.StringGet(self.client, key_bytes)
logger.debug(f"StringGet raw result pointer: {result}")
if result:
try:
# If the result is already a bytes object, we don't need to free it
if isinstance(result, bytes):
logger.debug("Result is already a Python bytes object, no need to free")
value = result.decode("utf-8")
logger.debug(f"StringGet for key '{key}': {value}")
return value
# Otherwise, treat it as a C pointer that needs to be freed
value = self._from_bytes(result)
logger.debug(f"StringGet decoded value: {value}")
# Log before freeing
logger.debug(f"About to free string at address: {result}")
self._free_string(result)
logger.debug(f"StringGet for key '{key}': {value}")
return value
except Exception as decode_e:
logger.error(f"Error processing StringGet result: {decode_e}")
# Try to free anyway, but only if it's not a bytes object
try:
if not isinstance(result, bytes):
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing string in StringGet: {free_e}")
return None
logger.debug(f"StringGet for key '{key}': Key not found")
return None
except ConnectionError as e:
logger.error(f"Connection error in string_get: {e}")
raise
except Exception as e:
logger.error(
f"Error in string_get: {e}. Traceback:\n{traceback.format_exc()}"
)
return None
def delete(self, key):
"""Delete a key
Args:
key: The key to delete
Returns:
The number of keys removed
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.ExecuteCommand(self.client, b"DEL", key_bytes)
logger.debug(f"Delete result: {result}")
if result:
try:
# Handle as bytes or C pointer
if isinstance(result, bytes):
# Directly decode bytes
response = result.decode("utf-8")
# Check if this is a pipeline response (multiple lines or not starting with ':')
if "\r\n" in response and not response.startswith(':'):
# If in pipeline mode, find the actual DEL response (usually the last line with ':')
for line in response.split("\r\n"):
if line.startswith(':'):
try:
count = int(line.strip(':'))
logger.debug(f"Found DEL count in pipeline response: {count}")
return count
except ValueError:
pass
# If we didn't find a clear DEL count, just report success
logger.debug(f"Complex pipeline response for DEL on key '{key}', assuming success")
return 1
else:
# Regular response format
try:
count = int(response.strip(":\r\n"))
except ValueError:
logger.warning(f"Unexpected response from DEL command: {response}")
count = 0
else:
# Handle as C pointer
try:
response = self._from_bytes(result)
count = int(response.strip(":\r\n"))
self._free_string(result)
except ValueError:
self._free_string(result) # Free memory even on error
logger.warning(f"Unexpected response from DEL command: {response}")
count = 0
logger.debug(f"Deleted key '{key}'. Count: {count}")
return count
except Exception as e:
logger.error(f"Error processing DEL result: {e}")
# Try to free if needed
if result and not isinstance(result, bytes):
try:
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing DEL result: {free_e}")
return 0
logger.debug(f"Key '{key}' not found.")
return 0
except ConnectionError as e:
logger.error(f"Connection error in delete: {e}")
raise
except Exception as e:
logger.error(
f"Error in delete: {e}. Traceback:\n{traceback.format_exc()}"
)
return 0
#endregion
#region List operations
def list_left_push(self, key, value):
"""Push a value to the left of a list
Args:
key: The list key
value: The value to push
Returns:
The length of the list after the push
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
value_bytes = self._to_bytes(value)
result = self.lib.ListLeftPush(self.client, key_bytes, value_bytes)
logger.debug(
f"ListLeftPush on key '{key}' with value '{value}'. New length: {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in list_left_push: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_left_push: {e}. Traceback:\n{traceback.format_exc()}"
)
return 0
def list_right_push(self, key, value):
"""Push a value to the right of a list
Args:
key: The list key
value: The value to push
Returns:
The length of the list after the push
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
value_bytes = self._to_bytes(value)
result = self.lib.ListRightPush(
self.client, key_bytes, value_bytes
)
logger.debug(
f"ListRightPush on key '{key}' with value '{value}'. New length: {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in list_right_push: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_right_push: {e}. Traceback:\n{traceback.format_exc()}"
)
return 0
def list_left_pop(self, key):
"""Pop a value from the left of a list
Args:
key: The list key
Returns:
The popped value, or None if the list is empty
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.ListLeftPop(self.client, key_bytes)
logger.debug(f"ListLeftPop raw result: {result}")
if result:
try:
# If the result is already a bytes object, we don't need to free it
if isinstance(result, bytes):
logger.debug("Result is already a Python bytes object, no need to free")
value = result.decode("utf-8")
logger.debug(f"ListLeftPop on key '{key}'. Popped value: {value}")
return value
# Otherwise, treat it as a C pointer that needs to be freed
value = self._from_bytes(result)
logger.debug(f"About to free string at address: {result}")
self._free_string(result)
logger.debug(f"ListLeftPop on key '{key}'. Popped value: {value}")
return value
except Exception as e:
logger.error(f"Error processing ListLeftPop result: {e}")
# Try to free if needed
if result and not isinstance(result, bytes):
try:
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing string in ListLeftPop: {free_e}")
return None
logger.debug(f"ListLeftPop on key '{key}'. List is empty.")
return None
except ConnectionError as e:
logger.error(f"Connection error in list_left_pop: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_left_pop: {e}. Traceback:\n{traceback.format_exc()}"
)
return None
def list_right_pop(self, key):
"""Pop a value from the right of a list
Args:
key: The list key
Returns:
The popped value, or None if the list is empty
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.ListRightPop(self.client, key_bytes)
logger.debug(f"ListRightPop raw result: {result}")
if result:
try:
# If result is already a bytes object
if isinstance(result, bytes):
logger.debug("Result is already a Python bytes object, no need to free")
value = result.decode("utf-8")
logger.debug(f"ListRightPop on key '{key}'. Popped value: {value}")
return value
# Regular C pointer handling
value = self._from_bytes(result)
logger.debug(f"About to free string at address: {result}")
self._free_string(result)
logger.debug(f"ListRightPop on key '{key}'. Popped value: {value}")
return value
except Exception as e:
logger.error(f"Error processing ListRightPop result: {e}")
if result and not isinstance(result, bytes):
try:
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing string in ListRightPop: {free_e}")
return None
logger.debug(f"ListRightPop on key '{key}'. List empty")
return None
except ConnectionError as e:
logger.error(f"Connection error in list_right_pop: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_right_pop: {e}. Traceback:\n{traceback.format_exc()}"
)
return None
def list_range(self, key, start, stop):
"""Get a range of elements from a list
Args:
key: The list key
start: The start index (inclusive)
stop: The stop index (inclusive)
Returns:
A list of values in the specified range
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.ListRange(self.client, key_bytes, start, stop)
logger.debug(f"ListRange raw result: {result}")
if result:
try:
# The result is a newline-delimited string according to the README
if isinstance(result, bytes):
# Directly decode the bytes object
value_str = result.decode("utf-8")
else:
# Handle as C pointer to string
value_str = self._from_bytes(result)
# Free the allocated string
self._free_string(result)
# Split by newlines to get individual list items
values = value_str.split('\n') if value_str else []
logger.debug(f"ListRange on key '{key}' from {start} to {stop}. Values: {values}")
return values
except Exception as e:
logger.error(f"Error processing ListRange result: {e}")
# Try to free if it was a C pointer
if result and not isinstance(result, bytes):
try:
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing ListRange result: {free_e}")
return []
logger.debug(f"ListRange on key '{key}' from {start} to {stop}. Empty list")
return []
except ConnectionError as e:
logger.error(f"Connection error in list_range: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_range: {e}. Traceback:\n{traceback.format_exc()}"
)
return []
def list_index(self, key, index):
"""Get an element at a specific index in a list
Args:
key: The list key
index: The index of the element
Returns:
The element at the specified index, or None if not found
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.ListIndex(self.client, key_bytes, index)
if result:
value = self._from_bytes(result)
self._free_string(result)
logger.debug(
f"ListIndex on key '{key}' at index {index}: {value}"
)
return value
logger.debug(
f"ListIndex on key '{key}' at index {index}: Not found."
)
return None
except ConnectionError as e:
logger.error(f"Connection error in list_index: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_index: {e}. Traceback:\n{traceback.format_exc()}"
)
return None
def list_set(self, key, index, value):
"""Set an element at a specific index in a list
Args:
key: The list key
index: The index of the element
value: The value to set
Returns:
True if successful
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
value_bytes = self._to_bytes(value)
result = self.lib.ListSet(
self.client, key_bytes, index, value_bytes
)
logger.debug(
f"ListSet on key '{key}' at index {index} with value '{value}': {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in list_set: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_set: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def list_position(self, key, element, rank=1, maxlen=0):
"""Find the position of an element in a list
Args:
key: The list key
element: The element to find
rank: The rank of the element to find (default: 1)
maxlen: Maximum number of elements to scan (default: 0, meaning no limit)
Returns:
The index of the element, or -1 if not found
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
element_bytes = self._to_bytes(element)
result = self.lib.ListPosition(
self.client, key_bytes, element_bytes, rank, maxlen
)
logger.debug(
f"ListPosition on key '{key}' for element '{element}' (rank={rank}, maxlen={maxlen}): {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in list_position: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_position: {e}. Traceback:\n{traceback.format_exc()}"
)
return -1
def list_trim(self, key, start, stop):
"""Trim a list to the specified range
Args:
key: The list key
start: The start index (inclusive)
stop: The stop index (inclusive)
Returns:
True if successful
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.ListTrim(self.client, key_bytes, start, stop)
logger.debug(
f"ListTrim on key '{key}' from {start} to {stop}: {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in list_trim: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_trim: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def list_remove(self, key, count, element):
"""Remove elements equal to the given value from a list
Args:
key: The list key
count: The number of occurrences to remove (positive: from head, negative: from tail, 0: all)
element: The element to remove
Returns:
The number of elements removed
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
element_bytes = self._to_bytes(element)
result = self.lib.ListRemove(
self.client, key_bytes, count, element_bytes
)
logger.debug(
f"ListRemove on key '{key}' removing {count} of element '{element}': {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in list_remove: {e}")
raise
except Exception as e:
logger.error(
f"Error in list_remove: {e}. Traceback:\n{traceback.format_exc()}"
)
return 0
#endregion
#region Hash operations
def hash_set(self, key, field, value):
"""Set a field in a hash
Args:
key: The hash key
field: The field name
value: The field value
Returns:
True if successful
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
field_bytes = self._to_bytes(field)
value_bytes = self._to_bytes(value)
result = self.lib.HashSet(
self.client, key_bytes, field_bytes, value_bytes
)
logger.debug(f"HashSet on key '{key}', field '{field}': {result}")
return result
except ConnectionError as e:
logger.error(f"Connection error in hash_set: {e}")
raise
except Exception as e:
logger.error(
f"Error in hash_set: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def hash_get(self, key, field):
"""Get a field from a hash
Args:
key: The hash key
field: The field name
Returns:
The field value, or None if not found
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
field_bytes = self._to_bytes(field)
result = self.lib.HashGet(self.client, key_bytes, field_bytes)
logger.debug(f"HashGet raw result: {result}")
if result:
try:
# Process the result based on its type
if isinstance(result, bytes):
# Directly decode the bytes object
value = result.decode("utf-8")
else:
# Handle as C pointer to string
value = self._from_bytes(result)
# Free the allocated string
logger.debug(f"About to free string at address: {result}")
self._free_string(result)
logger.debug(f"HashGet on key '{key}', field '{field}': {value}")
return value
except Exception as e:
logger.error(f"Error processing HashGet result: {e}")
# Try to free if it was a C pointer
if result and not isinstance(result, bytes):
try:
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing HashGet result: {free_e}")
return None
logger.debug(f"HashGet on key '{key}', field '{field}': Not found.")
return None
except ConnectionError as e:
logger.error(f"Connection error in hash_get: {e}")
raise
except Exception as e:
logger.error(
f"Error in hash_get: {e}. Traceback:\n{traceback.format_exc()}"
)
return None
def hash_delete(self, key, field):
"""Delete a field from a hash
Args:
key: The hash key
field: The field name
Returns:
True if successful
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
field_bytes = self._to_bytes(field)
result = self.lib.HashDelete(self.client, key_bytes, field_bytes)
logger.debug(
f"HashDelete on key '{key}', field '{field}': {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in hash_delete: {e}")
raise
except Exception as e:
logger.error(
f"Error in hash_delete: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def hash_field_exists(self, key, field):
"""Check if a field exists in a hash
Args:
key: The hash key
field: The field name
Returns:
True if the field exists
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
field_bytes = self._to_bytes(field)
result = self.lib.HashFieldExists(
self.client, key_bytes, field_bytes
)
logger.debug(
f"HashFieldExists on key '{key}', field '{field}': {result}"
)
return result
except ConnectionError as e:
logger.error(f"Connection error in hash_field_exists: {e}")
raise
except Exception as e:
logger.error(
f"Error in hash_field_exists: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def hash_get_all(self, key):
"""Get all fields and values from a hash
Args:
key: The hash key
Returns:
A dictionary of field-value pairs
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
result = self.lib.HashGetAll(self.client, key_bytes)
logger.debug(f"HashGetAll raw result: {result}")
if result:
try:
# The result is a newline-delimited string with field=value format
if isinstance(result, bytes):
# Directly decode the bytes object
value_str = result.decode("utf-8")
else:
# Handle as C pointer to string
value_str = self._from_bytes(result)
# Free the allocated string
self._free_string(result)
# Parse the field=value format
data = {}
if value_str:
pairs = value_str.split('\n')
for pair in pairs:
if '=' in pair:
field, value = pair.split('=', 1)
data[field] = value
logger.debug(f"HashGetAll on key '{key}': {data}")
return data
except Exception as e:
logger.error(f"Error processing HashGetAll result: {e}")
# Try to free if it was a C pointer
if result and not isinstance(result, bytes):
try:
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing HashGetAll result: {free_e}")
return {}
logger.debug(f"HashGetAll on key '{key}': Hash is empty")
return {}
except ConnectionError as e:
logger.error(f"Connection error in hash_get_all: {e}")
raise
except Exception as e:
logger.error(
f"Error in hash_get_all: {e}. Traceback:\n{traceback.format_exc()}"
)
return {}
def hash_multi_set(self, key, field_values):
"""Set multiple fields in a hash at once
Args:
key: The hash key
field_values: A dictionary of field-value pairs
Returns:
True if successful
"""
try:
self._check_connection()
key_bytes = self._to_bytes(key)
# Build the command arguments
args = []
for field, value in field_values.items():
args.append(field)
args.append(value)
args_str = " ".join(args)
args_bytes = self._to_bytes(args_str)
result = self.lib.ExecuteCommand(self.client, b"HMSET", args_bytes)
if result:
success = self._from_bytes(result) == "+OK\r\n"
self._free_string(result)
logger.debug(
f"HashMultiSet on key '{key}' with {len(field_values)} fields: {success}"
)
return success
logger.warning(f"HashMultiSet on key '{key}' failed")
return False
except ConnectionError as e:
logger.error(f"Connection error in hash_multi_set: {e}")
raise
except Exception as e:
logger.error(
f"Error in hash_multi_set: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
#endregion
#region Pipeline operations
def set_pipeline_mode(self, enabled):
"""Enable or disable pipeline mode
Args:
enabled: True to enable pipeline mode, False to disable
Returns:
True if successful
"""
try:
self._check_connection()
logger.debug(f"Setting pipeline mode to {enabled}")
result = self.lib.SetPipelineMode(self.client, enabled)
logger.debug(f"SetPipelineMode to {enabled}: {result}")
return result
except ConnectionError as e:
logger.error(f"Connection error in set_pipeline_mode: {e}")
raise
except Exception as e:
logger.error(
f"Error in set_pipeline_mode: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def set_batch_size(self, size):
"""Set the batch size for pipelined commands
Args:
size: The maximum number of commands to batch before sending
Returns:
True if successful
"""
try:
self._check_connection()
logger.debug(f"Setting batch size to {size}")
result = self.lib.SetBatchSize(self.client, size)
logger.debug(f"SetBatchSize to {size}: {result}")
return result
except ConnectionError as e:
logger.error(f"Connection error in set_batch_size: {e}")
raise
except Exception as e:
logger.error(
f"Error in set_batch_size: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def get_queued_command_count(self):
"""Get the current number of queued commands
Returns:
The number of queued commands
"""
try:
self._check_connection()
result = self.lib.GetQueuedCommandCount(self.client)
logger.debug(f"GetQueuedCommandCount: {result}")
return result
except ConnectionError as e:
logger.error(f"Connection error in get_queued_command_count: {e}")
raise
except Exception as e:
logger.error(
f"Error in get_queued_command_count: {e}. Traceback:\n{traceback.format_exc()}"
)
return 0
def flush_pipeline(self):
"""Flush any queued commands in pipeline mode
Returns:
The response from the server
"""
try:
self._check_connection()
result = self.lib.FlushPipeline(self.client)
if result:
try:
# Handle as bytes or C pointer
if isinstance(result, bytes):
# Directly decode bytes
response = result.decode("utf-8")
else:
# Handle as C pointer
response = self._from_bytes(result)
self._free_string(result)
logger.debug(f"Flushed pipeline. Response: {response}")
return response
except Exception as e:
logger.error(f"Error processing FlushPipeline result: {e}")
# Try to free if needed
if result and not isinstance(result, bytes):
try:
self._free_string(result)
except Exception as free_e:
logger.error(f"Error freeing FlushPipeline result: {free_e}")
return None
logger.warning("FlushPipeline returned NULL")
return None
except ConnectionError as e:
logger.error(f"Connection error in flush_pipeline: {e}")
raise
except Exception as e:
logger.error(
f"Error in flush_pipeline: {e}. Traceback:\n{traceback.format_exc()}"
)
return None
def is_pipeline_mode(self):
"""Check if pipeline mode is enabled
Returns:
True if pipeline mode is enabled
"""
try:
self._check_connection()
result = self.lib.IsPipelineMode(self.client)
logger.debug(f"IsPipelineMode: {result}")
return result
except ConnectionError as e:
logger.error(f"Connection error in is_pipeline_mode: {e}")
raise
except Exception as e:
logger.error(
f"Error in is_pipeline_mode: {e}. Traceback:\n{traceback.format_exc()}"
)
return False
def get_batch_size(self):
"""Get the current batch size
Returns:
The current batch size
"""
try:
self._check_connection()
result = self.lib.GetBatchSize(self.client)
logger.debug(f"GetBatchSize: {result}")
return result
except ConnectionError as e:
logger.error(f"Connection error in get_batch_size: {e}")
raise
except Exception as e:
logger.error(
f"Error in get_batch_size: {e}. Traceback:\n{traceback.format_exc()}"
)
return 0
#endregion
def main():
logger.info("Starting FireflyDatabase test...")
try:
# Basic usage
logger.info("Creating FireflyDatabase instance...")
with FireflyDatabase(
host="135.134.202.221", port=6379, password="xyz123"
) as db:
logger.info("Connected to Firefly server")
logger.info("Performing operations...")
logger.info("About to call ping()...")
try:
ping_result = db.ping()
logger.info(f"Ping completed with result: {ping_result}")
if not ping_result:
logger.error("Ping failed, exiting test.")
exit(1)
except Exception as ping_error:
logger.error(f"Exception during ping: {ping_error}")
logger.error(f"Exception type: {type(ping_error)}")
logger.error(traceback.format_exc())
exit(1) # Exit if ping fails
logger.info("After ping attempt")
# String operations
logger.info("Testing Database")
db.string_set("test", "hello world")
value = db.string_get("test")
logger.info(f"Value: {value}")
# List operations
db.list_right_push("mylist", "item1")
db.list_right_push("mylist", "item2")
db.list_right_push("mylist", "item3")
items = db.list_range("mylist", 0, -1)
logger.info("List items:")
for item in items:
logger.info(f" - {item}")
# Hash operations
db.hash_set("user:1", "name", "John")
db.hash_set("user:1", "email", "john@example.com")
name = db.hash_get("user:1", "name")
logger.info(f"Name: {name}")
user_data = db.hash_get_all("user:1")
logger.info("User data:")
for field, value in user_data.items():
logger.info(f" {field}: {value}")
# Pipeline operations
logger.info("\nPipeline Operations:")
db.set_pipeline_mode(True)
logger.info("Note: When in pipeline mode, operations return 'QUEUED' instead of actual values")
logger.info("Values aren't actually set until flush_pipeline() is called")
db.set_batch_size(100)
# Add dummy entries at index 0 for each data type to handle Redis pipeline reordering
logger.info("Adding dummy entries for each data type to handle Redis pipeline shifting")
db.string_set("pipeline:string:0", "dummy-string")
db.list_right_push("pipeline:list:0", "dummy-list-item")
db.hash_set("pipeline:hash", "dummy-field", "dummy-value")
# Queue some commands with unique identifiable values
# Commands will return QUEUED but not be executed until flush_pipeline
for i in range(1, 6):
# Use distinct prefixes for each key type to identify in results
db.string_set(f"pipeline:string:{i}", f"string-value-{i}")
# Use separate list keys for each list item to prevent overwriting
db.list_right_push(f"pipeline:list", f"list-item-{i}")
# Make sure we use field names that match our expected pattern
db.hash_set("pipeline:hash", f"field-{i}", f"hash-value-{i}")
logger.info(f"Queued commands: {db.get_queued_command_count()}")
logger.info("Results should be obtained after flushing the pipeline")
# Flush pipeline (execute all queued commands as a batch)
result = db.flush_pipeline()
logger.info(f"Pipeline flush result: {result}")
logger.info("After pipeline flush, we need to exit pipeline mode to get actual values")
# Disable pipeline mode before verifying results to see actual values, not QUEUED
db.set_pipeline_mode(False)
logger.info("Pipeline mode disabled")
# Verify results - include index 0 for completeness
logger.info("\nVerifying results (note: Redis pipeline responses might not match input order):")
for i in range(0, 6): # Start from 0 to include dummy entry
value = db.string_get(f"pipeline:string:{i}")
logger.info(f"String key:{i} = {value}")
# Check each individual list including dummy list
for i in range(0, 6): # Start from 0 to include dummy entry
items = db.list_range("pipeline:list", 0, -1)
logger.info(f"List items: {items}")
# Fix the hash_get_all implementation issue
hash_data = db.hash_get_all("pipeline:hash")
# If hash_data is empty but we expect data, try a different approach
if not hash_data:
logger.info("Hash appears empty from hash_get_all, trying alternative approach...")
# Try to get each field individually
hash_data = {}
# Get the dummy field first
dummy_value = db.hash_get("pipeline:hash", "dummy-field")
if dummy_value:
hash_data["dummy-field"] = dummy_value
# Try to get each field we expect
for i in range(1, 6):
field_name = f"field-{i}"
field_value = db.hash_get("pipeline:hash", field_name)
if field_value:
hash_data[field_name] = field_value
logger.info("Hash fields:")
for field, value in hash_data.items():
logger.info(f" {field}: {value}")
# Cleanup - don't forget to clean up the dummy entries too
for i in range(0, 6): # Start from 0 to include dummy entry
db.delete(f"pipeline:string:{i}")
db.delete(f"pipeline:list:{i}")
db.delete("pipeline:hash")
logger.info("Cleanup complete")
except Exception as e:
logger.error(f"Exception occurred: {e}")
logger.error(traceback.format_exc())
finally:
logger.info("Test completed")
logger.info("Script completed successfully.")
# print("exiting")
# sys.exit()
if __name__ == "__main__":
main()