# Firefly database client wrapper (file viewer listing: 1439 lines, 55 KiB, Python)
import os
|
|
import platform
|
|
from ctypes import cdll, c_char_p, c_bool, c_void_p, c_int, POINTER
|
|
import re
|
|
import traceback
|
|
import sys
|
|
|
|
# Set up logging
|
|
import logging
|
|
|
|
# Configure root logging once at import time: DEBUG and above goes both to
# the "firefly_debug.log" file and to standard output.
_log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
_log_handlers = [
    logging.FileHandler("firefly_debug.log"),
    logging.StreamHandler(sys.stdout),
]
logging.basicConfig(
    handlers=_log_handlers,
    format=_log_format,
    level=logging.DEBUG,
)

# Module-wide logger used by the FireflyDatabase methods below.
logger = logging.getLogger("FireflyDB")

logger.info("Script starting...")
|
|
|
|
|
|
class FireflyDatabase:
|
|
"""Wrapper for Firefly database client library"""
|
|
|
|
def __init__(self, host="localhost", port=6379, password=None):
    """Load the native library and open a connection to a Firefly server.

    Args:
        host: Hostname of the Firefly server (default: localhost)
        port: Port number of the Firefly server (default: 6379)
        password: Optional password; authentication is skipped when falsy
    """
    # Start with empty handles so close() sees "no client" if loading or
    # connecting fails part-way through.
    self.client = self.lib = None
    self._load_library()
    self._connect(host, port, password)
|
|
|
|
def _load_library(self):
    """Load the native Firefly library and declare its C function signatures.

    The library is looked up in the directory containing this file:
    ``libFirefly.dll`` on Windows, ``libFirefly.so`` on any other platform.

    Raises:
        FileNotFoundError: If the library file does not exist.
        OSError: If the library cannot be loaded.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    keep_default = object()  # marker: do not assign a restype for this entry

    # (exported name, restype, argtypes), applied in this order.
    # NOTE(review): ctypes converts a c_char_p restype into a Python bytes
    # copy, so callers do not receive the raw pointer FreeString expects.
    signatures = (
        ("CreateClient", c_void_p, [c_char_p, c_int]),
        ("DestroyClient", keep_default, [c_void_p]),
        ("Authenticate", c_bool, [c_void_p, c_char_p]),
        ("StringSet", c_bool, [c_void_p, c_char_p, c_char_p]),
        ("StringGet", c_char_p, [c_void_p, c_char_p]),
        ("FreeString", keep_default, [c_char_p]),
        ("ListLeftPush", c_int, [c_void_p, c_char_p, c_char_p]),
        ("ListRightPush", c_int, [c_void_p, c_char_p, c_char_p]),
        ("ListLeftPop", c_char_p, [c_void_p, c_char_p]),
        ("ListRightPop", c_char_p, [c_void_p, c_char_p]),
        ("ListRange", c_char_p, [c_void_p, c_char_p, c_int, c_int]),
        ("ListIndex", c_char_p, [c_void_p, c_char_p, c_int]),
        ("ListSet", c_bool, [c_void_p, c_char_p, c_int, c_char_p]),
        ("ListPosition", c_int, [c_void_p, c_char_p, c_char_p, c_int, c_int]),
        ("ListTrim", c_bool, [c_void_p, c_char_p, c_int, c_int]),
        ("ListRemove", c_int, [c_void_p, c_char_p, c_int, c_char_p]),
        ("HashSet", c_bool, [c_void_p, c_char_p, c_char_p, c_char_p]),
        ("HashGet", c_char_p, [c_void_p, c_char_p, c_char_p]),
        ("HashDelete", c_bool, [c_void_p, c_char_p, c_char_p]),
        ("HashFieldExists", c_bool, [c_void_p, c_char_p, c_char_p]),
        ("HashGetAll", c_char_p, [c_void_p, c_char_p]),
        ("ExecuteCommand", c_char_p, [c_void_p, c_char_p, c_char_p]),
        # Pipeline operations
        ("SetPipelineMode", c_bool, [c_void_p, c_bool]),
        ("SetBatchSize", c_bool, [c_void_p, c_int]),
        ("GetQueuedCommandCount", c_int, [c_void_p]),
        ("FlushPipeline", c_char_p, [c_void_p]),
        ("IsPipelineMode", c_bool, [c_void_p]),
        ("GetBatchSize", c_int, [c_void_p]),
    )

    try:
        # The library ships alongside this module.
        lib_path = os.path.dirname(os.path.abspath(__file__))
        if platform.system() == "Windows":
            lib_name = "libFirefly.dll"
        else:  # Linux/macOS
            lib_name = "libFirefly.so"
        lib_file = os.path.join(lib_path, lib_name)

        if not os.path.exists(lib_file):
            raise FileNotFoundError(f"Firefly library not found: {lib_file}")

        self.lib = cdll.LoadLibrary(lib_file)
        if self.lib is None:  # Explicitly check for None
            raise OSError("Failed to load the Firefly library")

        logger.debug(f"Firefly library loaded from: {lib_file}")

        # Declare argument and return types for every exported function.
        for name, restype, argtypes in signatures:
            func = getattr(self.lib, name)
            if restype is not keep_default:
                func.restype = restype
            func.argtypes = argtypes

    except FileNotFoundError as e:
        logger.error(f"Error loading library: {e}")
        raise  # Re-raise to halt execution
    except OSError as e:
        logger.error(f"OS Error loading library: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error loading library: {e}")
        raise
|
|
|
|
def _connect(self, host, port, password=None):
    """Create the native client for ``host:port`` and authenticate if asked.

    Args:
        host: Server hostname (encoded as UTF-8 for the C API).
        port: Server port number.
        password: Optional password; authentication is skipped when falsy.

    Raises:
        ConnectionError: If the client cannot be created or authentication
            fails. Any other exception is logged and re-raised as well.
    """
    try:
        # The C API takes the host name as bytes.
        host_bytes = host.encode("utf-8")
        logger.debug(f"Connecting to {host}:{port}")

        self.client = self.lib.CreateClient(host_bytes, port)
        if not self.client:
            raise ConnectionError(
                f"Failed to connect to Firefly server at {host}:{port}"
            )
        logger.debug("Client created successfully")

        if not password:
            logger.debug("No password provided, skipping authentication")
        else:
            password_bytes = password.encode("utf-8")
            logger.debug("Authenticating...")
            authenticated = self.lib.Authenticate(self.client, password_bytes)
            if not authenticated:
                self.close()
                raise ConnectionError("Authentication failed")
            logger.debug("Authentication successful")

    except ConnectionError as e:
        logger.error(f"Connection error: {e}")
        # Release the client if it was created before the failure.
        if self.client:
            self.close()
        raise  # Let the caller decide how to handle it
    except Exception as e:
        logger.error(f"Unexpected error during connection: {e}")
        if self.client:
            self.close()
        raise
|
|
|
|
def close(self):
    """Destroy the native client, if one is held, and forget its handle.

    Errors raised while closing are logged and swallowed.
    """
    try:
        if not self.client:
            logger.debug("Client connection already closed")
            return

        logger.debug("Destroying client connection")
        self.lib.DestroyClient(self.client)
        self.client = None
        logger.debug("Client connection destroyed")
    except Exception as e:
        # Deliberately not re-raised: close() should not throw in normal
        # usage and the caller has no recovery options.
        logger.error(f"Error closing connection: {e}")
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry"""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the connection when the ``with`` block ends.

    If the block raised, the exception and its traceback are logged. Nothing
    is returned, so the exception is not suppressed.
    """
    self.close()
    if not exc_type:
        return
    # Log the exception that occurred within the context.
    logger.error(
        f"Exception in context: {exc_type.__name__}, {exc_val}. Traceback:\n{''.join(traceback.format_exception(exc_type, exc_val, exc_tb))}"
    )
|
|
|
|
def _check_connection(self):
|
|
"""Check if the client is connected"""
|
|
if not self.client:
|
|
raise ConnectionError("Not connected to Firefly server")
|
|
|
|
def _to_bytes(self, value):
|
|
"""Convert a value to bytes for C API"""
|
|
if isinstance(value, bytes):
|
|
return value
|
|
return str(value).encode("utf-8")
|
|
|
|
def _from_bytes(self, value):
|
|
"""Convert bytes from C API to string"""
|
|
try:
|
|
if value is None:
|
|
logger.debug("_from_bytes received None value")
|
|
return None
|
|
|
|
if not value: # Zero value check
|
|
logger.debug("_from_bytes received empty value")
|
|
return ""
|
|
|
|
# Try to decode safely
|
|
try:
|
|
result = value.decode("utf-8")
|
|
return result
|
|
except UnicodeDecodeError as e:
|
|
logger.error(f"Unicode decode error in _from_bytes: {e}")
|
|
# Try with a more forgiving approach
|
|
return value.decode("utf-8", errors="replace")
|
|
except Exception as e:
|
|
logger.error(f"Error in _from_bytes: {e}")
|
|
return None
|
|
|
|
def _free_string(self, ptr):
    """Free a string pointer allocated by the C API.

    Args:
        ptr: Pointer value returned by the native library. Null/zero
            pointers and Python ``bytes`` objects are skipped.

    Errors are logged and never re-raised: this is resource cleanup and the
    caller cannot handle a failure.
    """
    logger.debug("Starting free string")
    try:
        if isinstance(ptr, bytes):
            # A bytes object is owned by Python. ctypes would hand
            # FreeString a pointer into that object's buffer, and freeing
            # it natively would corrupt interpreter memory.
            logger.debug("Skipping free for Python-owned bytes object")
            return

        if not ptr:
            logger.debug(f"Skipping free for null or zero pointer: {ptr}")
            return

        # Separate try/except so a failing native call is reported distinctly.
        try:
            self.lib.FreeString(ptr)
            logger.debug("String freed successfully")
        except Exception as inner_e:
            logger.error(f"Error in FreeString call: {inner_e}")
    except Exception as e:
        logger.error(f"Error in _free_string outer block: {e}")
|
|
|
|
def execute_command(self, command, args=""):
    """Send a raw command to the server through the native ExecuteCommand.

    Args:
        command: The command to execute
        args: Optional arguments for the command

    Returns:
        The decoded response, or None when the call yields a falsy result
        or any non-connection error occurs.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        logger.debug(f"Executing command: {command} with args: {args}")
        self._check_connection()
        command_bytes = self._to_bytes(command)
        args_bytes = self._to_bytes(args)

        result = self.lib.ExecuteCommand(self.client, command_bytes, args_bytes)
        logger.debug(f"ExecuteCommand result pointer: {result}")

        if not result:
            logger.warning("ExecuteCommand returned NULL")
            return None

        try:
            # A bytes result is already a Python object: decode, never free.
            if isinstance(result, bytes):
                logger.debug("Result is already a Python bytes object, no need to free")
                response = result.decode("utf-8")
                logger.debug(f"Received response: {response}")
                return response

            # Anything else is treated as a C pointer that must be freed.
            response = self._from_bytes(result)
            logger.debug(f"Received response: {response}")
            logger.debug(f"About to free string at address: {result}")
            self._free_string(result)
            return response
        except Exception as e:
            logger.error(f"Error processing ExecuteCommand result: {e}")
            # Still try to release a native pointer to avoid a leak.
            try:
                if not isinstance(result, bytes):
                    self._free_string(result)
            except Exception as free_e:
                logger.error(f"Failed to free result after error: {free_e}")
            return None
    except ConnectionError as e:
        logger.error(f"Connection error during execute_command: {e}")
        raise  # Handled by the caller
    except Exception as e:
        logger.error(
            f"Error in execute_command: {e}. Traceback:\n{traceback.format_exc()}"
        )
        return None  # Consistent return on error
|
|
|
|
def ping(self):
    """Check that the server answers a PING.

    Returns:
        True if the response, once stripped of whitespace and leading '+'
        and upper-cased, equals "PONG"; False in every other case,
        including any error.
    """
    try:
        logger.debug("Sending PING command")
        self._check_connection()

        # Failures of the command itself are reported separately.
        try:
            response = self.execute_command("PING")
            logger.debug(f"Raw ping response: '{response}'")
        except Exception as e:
            logger.error(f"Exception in execute_command during ping: {e}")
            return False

        logger.debug(f"Response type: {type(response)}, value: '{response}'")

        if response is None:
            logger.warning("Received NULL response from ping")
            return False

        try:
            # Normalize: strip whitespace, remove leading '+', uppercase
            normalized = response.strip().lstrip("+").upper()
            logger.debug(f"Normalized response: '{normalized}'")

            if normalized != "PONG":
                logger.warning(
                    f"PONG not found in response: raw='{response}', normalized='{normalized}'"
                )
                return False

            logger.debug("PONG found in normalized response - ping successful")
            return True
        except AttributeError:
            logger.error(f"Unable to process ping response: {response}")
            return False
    except Exception as e:
        logger.error(f"Ping failed: {e}. Traceback:\n{traceback.format_exc()}")
        return False
|
|
|
|
#region String operations
|
|
def string_set(self, key, value):
    """Store a string value under a key via the native StringSet call.

    Args:
        key: The key to set
        value: The value to set

    Returns:
        The boolean reported by StringSet, or False on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.StringSet(
            self.client, self._to_bytes(key), self._to_bytes(value)
        )
        logger.debug(f"StringSet result for key '{key}': {result}")
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in string_set: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in string_set: {e}. Traceback:\n{traceback.format_exc()}")
        return False
|
|
|
|
def string_get(self, key):
    """Fetch the string stored under a key via the native StringGet call.

    Args:
        key: The key to get

    Returns:
        The decoded value, or None if the call yields a falsy result or an
        unexpected error occurs.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.StringGet(self.client, self._to_bytes(key))
        logger.debug(f"StringGet raw result pointer: {result}")

        if not result:
            logger.debug(f"StringGet for key '{key}': Key not found")
            return None

        try:
            # A bytes result is already a Python object: decode, never free.
            if isinstance(result, bytes):
                logger.debug("Result is already a Python bytes object, no need to free")
                value = result.decode("utf-8")
                logger.debug(f"StringGet for key '{key}': {value}")
                return value

            # Anything else is treated as a C pointer that must be freed.
            value = self._from_bytes(result)
            logger.debug(f"StringGet decoded value: {value}")
            logger.debug(f"About to free string at address: {result}")
            self._free_string(result)
            logger.debug(f"StringGet for key '{key}': {value}")
            return value
        except Exception as decode_e:
            logger.error(f"Error processing StringGet result: {decode_e}")
            # Still try to release a native pointer.
            try:
                if not isinstance(result, bytes):
                    self._free_string(result)
            except Exception as free_e:
                logger.error(f"Error freeing string in StringGet: {free_e}")
            return None
    except ConnectionError as e:
        logger.error(f"Connection error in string_get: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in string_get: {e}. Traceback:\n{traceback.format_exc()}")
        return None
|
|
|
|
def delete(self, key):
    """Delete a key by issuing a raw ``DEL`` through ExecuteCommand.

    Args:
        key: The key to delete

    Returns:
        The count parsed from the response. 1 is returned for a multi-line
        response with no parseable ':' line; 0 for a falsy result, an
        unparseable single response, or an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        key_bytes = self._to_bytes(key)
        result = self.lib.ExecuteCommand(self.client, b"DEL", key_bytes)
        logger.debug(f"Delete result: {result}")

        if not result:
            logger.debug(f"Key '{key}' not found.")
            return 0

        try:
            if not isinstance(result, bytes):
                # Native pointer: decode, parse, and free on both paths.
                try:
                    response = self._from_bytes(result)
                    count = int(response.strip(":\r\n"))
                    self._free_string(result)
                except ValueError:
                    self._free_string(result)  # Free memory even on error
                    logger.warning(f"Unexpected response from DEL command: {response}")
                    count = 0
            else:
                response = result.decode("utf-8")

                # Multiple lines not starting with ':' is handled as a
                # pipeline response.
                if "\r\n" in response and not response.startswith(':'):
                    # Return the first line of the form ':<int>'.
                    for line in response.split("\r\n"):
                        if not line.startswith(':'):
                            continue
                        try:
                            count = int(line.strip(':'))
                            logger.debug(f"Found DEL count in pipeline response: {count}")
                            return count
                        except ValueError:
                            pass

                    # No clear DEL count found: report success.
                    logger.debug(f"Complex pipeline response for DEL on key '{key}', assuming success")
                    return 1

                # Regular response format
                try:
                    count = int(response.strip(":\r\n"))
                except ValueError:
                    logger.warning(f"Unexpected response from DEL command: {response}")
                    count = 0

            logger.debug(f"Deleted key '{key}'. Count: {count}")
            return count
        except Exception as e:
            logger.error(f"Error processing DEL result: {e}")
            # Still try to release a native pointer.
            if not isinstance(result, bytes):
                try:
                    self._free_string(result)
                except Exception as free_e:
                    logger.error(f"Error freeing DEL result: {free_e}")
            return 0
    except ConnectionError as e:
        logger.error(f"Connection error in delete: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in delete: {e}. Traceback:\n{traceback.format_exc()}")
        return 0
|
|
#endregion
|
|
|
|
#region List operations
|
|
def list_left_push(self, key, value):
    """Push a value onto the left end of a list via ListLeftPush.

    Args:
        key: The list key
        value: The value to push

    Returns:
        The integer reported by ListLeftPush (logged as the new length),
        or 0 on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListLeftPush(
            self.client, self._to_bytes(key), self._to_bytes(value)
        )
        logger.debug(
            f"ListLeftPush on key '{key}' with value '{value}'. New length: {result}"
        )
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in list_left_push: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_left_push: {e}. Traceback:\n{traceback.format_exc()}")
        return 0
|
|
def list_right_push(self, key, value):
    """Push a value onto the right end of a list via ListRightPush.

    Args:
        key: The list key
        value: The value to push

    Returns:
        The integer reported by ListRightPush (logged as the new length),
        or 0 on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListRightPush(
            self.client, self._to_bytes(key), self._to_bytes(value)
        )
        logger.debug(
            f"ListRightPush on key '{key}' with value '{value}'. New length: {result}"
        )
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in list_right_push: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_right_push: {e}. Traceback:\n{traceback.format_exc()}")
        return 0
|
|
|
|
def list_left_pop(self, key):
    """Pop a value from the left end of a list via ListLeftPop.

    Args:
        key: The list key

    Returns:
        The popped value, or None if the call yields a falsy result or an
        unexpected error occurs.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListLeftPop(self.client, self._to_bytes(key))
        logger.debug(f"ListLeftPop raw result: {result}")

        if not result:
            logger.debug(f"ListLeftPop on key '{key}'. List is empty.")
            return None

        try:
            # A bytes result is already a Python object: decode, never free.
            if isinstance(result, bytes):
                logger.debug("Result is already a Python bytes object, no need to free")
                value = result.decode("utf-8")
                logger.debug(f"ListLeftPop on key '{key}'. Popped value: {value}")
                return value

            # Anything else is treated as a C pointer that must be freed.
            value = self._from_bytes(result)
            logger.debug(f"About to free string at address: {result}")
            self._free_string(result)
            logger.debug(f"ListLeftPop on key '{key}'. Popped value: {value}")
            return value
        except Exception as e:
            logger.error(f"Error processing ListLeftPop result: {e}")
            # Still try to release a native pointer.
            if not isinstance(result, bytes):
                try:
                    self._free_string(result)
                except Exception as free_e:
                    logger.error(f"Error freeing string in ListLeftPop: {free_e}")
            return None
    except ConnectionError as e:
        logger.error(f"Connection error in list_left_pop: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_left_pop: {e}. Traceback:\n{traceback.format_exc()}")
        return None
|
|
|
|
def list_right_pop(self, key):
    """Pop a value from the right end of a list via ListRightPop.

    Args:
        key: The list key

    Returns:
        The popped value, or None if the call yields a falsy result or an
        unexpected error occurs.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListRightPop(self.client, self._to_bytes(key))
        logger.debug(f"ListRightPop raw result: {result}")

        if not result:
            logger.debug(f"ListRightPop on key '{key}'. List empty")
            return None

        try:
            # A bytes result is already a Python object: decode, never free.
            if isinstance(result, bytes):
                logger.debug("Result is already a Python bytes object, no need to free")
                value = result.decode("utf-8")
                logger.debug(f"ListRightPop on key '{key}'. Popped value: {value}")
                return value

            # Regular C pointer handling
            value = self._from_bytes(result)
            logger.debug(f"About to free string at address: {result}")
            self._free_string(result)
            logger.debug(f"ListRightPop on key '{key}'. Popped value: {value}")
            return value
        except Exception as e:
            logger.error(f"Error processing ListRightPop result: {e}")
            if not isinstance(result, bytes):
                try:
                    self._free_string(result)
                except Exception as free_e:
                    logger.error(f"Error freeing string in ListRightPop: {free_e}")
            return None
    except ConnectionError as e:
        logger.error(f"Connection error in list_right_pop: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_right_pop: {e}. Traceback:\n{traceback.format_exc()}")
        return None
|
|
|
|
def list_range(self, key, start, stop):
    """Fetch a range of list elements via the native ListRange call.

    Args:
        key: The list key
        start: The start index (inclusive)
        stop: The stop index (inclusive)

    Returns:
        The response split on newlines into a list of values, or an empty
        list for a falsy result or an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListRange(self.client, self._to_bytes(key), start, stop)
        logger.debug(f"ListRange raw result: {result}")

        if not result:
            logger.debug(f"ListRange on key '{key}' from {start} to {stop}. Empty list")
            return []

        try:
            if not isinstance(result, bytes):
                # Native pointer: decode, then free the allocated string.
                value_str = self._from_bytes(result)
                self._free_string(result)
            else:
                value_str = result.decode("utf-8")

            # One list item per newline-delimited entry.
            values = []
            if value_str:
                values = value_str.split('\n')

            logger.debug(f"ListRange on key '{key}' from {start} to {stop}. Values: {values}")
            return values
        except Exception as e:
            logger.error(f"Error processing ListRange result: {e}")
            # Still try to release a native pointer.
            if not isinstance(result, bytes):
                try:
                    self._free_string(result)
                except Exception as free_e:
                    logger.error(f"Error freeing ListRange result: {free_e}")
            return []
    except ConnectionError as e:
        logger.error(f"Connection error in list_range: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_range: {e}. Traceback:\n{traceback.format_exc()}")
        return []
|
|
|
|
def list_index(self, key, index):
    """Get the element at a specific index in a list via ListIndex.

    Args:
        key: The list key
        index: The index of the element

    Returns:
        The element at the specified index, or None if the call yields a
        falsy result or an unexpected error occurs.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        key_bytes = self._to_bytes(key)
        result = self.lib.ListIndex(self.client, key_bytes, index)
        if result:
            value = self._from_bytes(result)
            # Only release genuine native pointers. A bytes result is a
            # Python-owned object, and passing it to FreeString would free
            # memory the native library never allocated.
            if not isinstance(result, bytes):
                self._free_string(result)
            logger.debug(f"ListIndex on key '{key}' at index {index}: {value}")
            return value
        logger.debug(f"ListIndex on key '{key}' at index {index}: Not found.")
        return None
    except ConnectionError as e:
        logger.error(f"Connection error in list_index: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_index: {e}. Traceback:\n{traceback.format_exc()}")
        return None
|
|
|
|
def list_set(self, key, index, value):
    """Overwrite the list element at an index via the native ListSet call.

    Args:
        key: The list key
        index: The index of the element
        value: The value to set

    Returns:
        The boolean reported by ListSet, or False on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListSet(
            self.client, self._to_bytes(key), index, self._to_bytes(value)
        )
        logger.debug(
            f"ListSet on key '{key}' at index {index} with value '{value}': {result}"
        )
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in list_set: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_set: {e}. Traceback:\n{traceback.format_exc()}")
        return False
|
|
|
|
def list_position(self, key, element, rank=1, maxlen=0):
    """Look up the position of an element in a list via ListPosition.

    Args:
        key: The list key
        element: The element to find
        rank: The rank of the element to find (default: 1)
        maxlen: Maximum number of elements to scan (default: 0, meaning no limit)

    Returns:
        The integer reported by ListPosition, or -1 on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListPosition(
            self.client, self._to_bytes(key), self._to_bytes(element), rank, maxlen
        )
        logger.debug(
            f"ListPosition on key '{key}' for element '{element}' (rank={rank}, maxlen={maxlen}): {result}"
        )
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in list_position: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_position: {e}. Traceback:\n{traceback.format_exc()}")
        return -1
|
|
|
|
def list_trim(self, key, start, stop):
    """Trim a list to the given index range via the native ListTrim call.

    Args:
        key: The list key
        start: The start index (inclusive)
        stop: The stop index (inclusive)

    Returns:
        The boolean reported by ListTrim, or False on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListTrim(self.client, self._to_bytes(key), start, stop)
        logger.debug(f"ListTrim on key '{key}' from {start} to {stop}: {result}")
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in list_trim: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_trim: {e}. Traceback:\n{traceback.format_exc()}")
        return False
|
|
|
|
def list_remove(self, key, count, element):
    """Remove occurrences of an element from a list via ListRemove.

    Args:
        key: The list key
        count: The number of occurrences to remove (positive: from head, negative: from tail, 0: all)
        element: The element to remove

    Returns:
        The integer reported by ListRemove, or 0 on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.ListRemove(
            self.client, self._to_bytes(key), count, self._to_bytes(element)
        )
        logger.debug(
            f"ListRemove on key '{key}' removing {count} of element '{element}': {result}"
        )
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in list_remove: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in list_remove: {e}. Traceback:\n{traceback.format_exc()}")
        return 0
|
|
#endregion
|
|
|
|
#region Hash operations
|
|
def hash_set(self, key, field, value):
    """Store a field/value pair in a hash via the native HashSet call.

    Args:
        key: The hash key
        field: The field name
        value: The field value

    Returns:
        The boolean reported by HashSet, or False on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.HashSet(
            self.client,
            self._to_bytes(key),
            self._to_bytes(field),
            self._to_bytes(value),
        )
        logger.debug(f"HashSet on key '{key}', field '{field}': {result}")
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in hash_set: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in hash_set: {e}. Traceback:\n{traceback.format_exc()}")
        return False
|
|
|
|
def hash_get(self, key, field):
    """Fetch one field of a hash via the native HashGet call.

    Args:
        key: The hash key
        field: The field name

    Returns:
        The field value, or None if the call yields a falsy result or an
        unexpected error occurs.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.HashGet(
            self.client, self._to_bytes(key), self._to_bytes(field)
        )
        logger.debug(f"HashGet raw result: {result}")

        if not result:
            logger.debug(f"HashGet on key '{key}', field '{field}': Not found.")
            return None

        try:
            if not isinstance(result, bytes):
                # Native pointer: decode, then free the allocated string.
                value = self._from_bytes(result)
                logger.debug(f"About to free string at address: {result}")
                self._free_string(result)
            else:
                # Already a Python bytes object: decode directly.
                value = result.decode("utf-8")

            logger.debug(f"HashGet on key '{key}', field '{field}': {value}")
            return value
        except Exception as e:
            logger.error(f"Error processing HashGet result: {e}")
            # Still try to release a native pointer.
            if not isinstance(result, bytes):
                try:
                    self._free_string(result)
                except Exception as free_e:
                    logger.error(f"Error freeing HashGet result: {free_e}")
            return None
    except ConnectionError as e:
        logger.error(f"Connection error in hash_get: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in hash_get: {e}. Traceback:\n{traceback.format_exc()}")
        return None
|
|
|
|
def hash_delete(self, key, field):
    """Remove a field from a hash via the native HashDelete call.

    Args:
        key: The hash key
        field: The field name

    Returns:
        The boolean reported by HashDelete, or False on an unexpected error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.HashDelete(
            self.client, self._to_bytes(key), self._to_bytes(field)
        )
        logger.debug(f"HashDelete on key '{key}', field '{field}': {result}")
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in hash_delete: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in hash_delete: {e}. Traceback:\n{traceback.format_exc()}")
        return False
|
|
|
|
def hash_field_exists(self, key, field):
    """Check whether a hash contains a field via HashFieldExists.

    Args:
        key: The hash key
        field: The field name

    Returns:
        The boolean reported by HashFieldExists, or False on an unexpected
        error.

    Raises:
        ConnectionError: If no client is connected.
    """
    try:
        self._check_connection()
        result = self.lib.HashFieldExists(
            self.client, self._to_bytes(key), self._to_bytes(field)
        )
        logger.debug(f"HashFieldExists on key '{key}', field '{field}': {result}")
        return result
    except ConnectionError as e:
        logger.error(f"Connection error in hash_field_exists: {e}")
        raise
    except Exception as e:
        logger.error(f"Error in hash_field_exists: {e}. Traceback:\n{traceback.format_exc()}")
        return False
|
|
|
|
def hash_get_all(self, key):
    """Get all fields and values from a hash

    The native ``HashGetAll`` reply is parsed as newline-delimited
    ``field=value`` text. Lines without ``=`` are ignored and only the
    first ``=`` on a line separates the field from its value.

    Args:
        key: The hash key

    Returns:
        A dictionary of field-value pairs. An empty dict is returned when
        the native call yields a falsy result or when an unexpected
        (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()
        key_bytes = self._to_bytes(key)
        result = self.lib.HashGetAll(self.client, key_bytes)
        logger.debug(f"HashGetAll raw result: {result}")

        if not result:
            logger.debug(f"HashGetAll on key '{key}': Hash is empty")
            return {}

        # A bytes result is an ordinary Python object; anything else is
        # treated as a native string that must go through _free_string.
        needs_free = not isinstance(result, bytes)
        try:
            if needs_free:
                value_str = self._from_bytes(result)
                # Clear the flag BEFORE freeing: if _free_string fails
                # midway, or parsing fails afterwards, the handler below
                # must not attempt a second free of the same pointer.
                needs_free = False
                self._free_string(result)
            else:
                value_str = result.decode("utf-8")

            data = {}
            if value_str:
                for pair in value_str.split("\n"):
                    field, separator, value = pair.partition("=")
                    if separator:
                        data[field] = value

            logger.debug(f"HashGetAll on key '{key}': {data}")
            return data
        except Exception as e:
            logger.error(f"Error processing HashGetAll result: {e}")
            # Only reached with needs_free set when decoding failed before
            # any free was attempted, so this is the single release.
            if needs_free:
                try:
                    self._free_string(result)
                except Exception as free_e:
                    logger.error(f"Error freeing HashGetAll result: {free_e}")
            return {}
    except ConnectionError as e:
        logger.error(f"Connection error in hash_get_all: {e}")
        raise
    except Exception as e:
        logger.error(
            f"Error in hash_get_all: {e}. Traceback:\n{traceback.format_exc()}"
        )
        return {}
|
|
|
|
def hash_multi_set(self, key, field_values):
    """Set multiple fields in a hash at once

    Issues a single ``HMSET`` through the native ``ExecuteCommand`` call.
    The argument string is ``key field1 value1 field2 value2 ...``.

    Args:
        key: The hash key
        field_values: A dictionary of field-value pairs. Fields and values
            that are not already text are converted with ``str()``
            (``bytes`` are decoded as UTF-8).

    Returns:
        True if the reply was exactly ``+OK\\r\\n``; False when the mapping
        is empty, the reply is missing or different, or an unexpected
        (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()

        # HMSET needs at least one field/value pair; do not send a command
        # that can only be rejected.
        if not field_values:
            logger.warning(
                f"HashMultiSet on key '{key}' skipped: no fields given"
            )
            return False

        def as_text(item):
            # " ".join() only accepts str, so normalise every token first.
            if isinstance(item, bytes):
                return item.decode("utf-8")
            return str(item)

        # The key must be the first argument; previously it was converted
        # but never sent, so the first field name took the key's place.
        parts = [as_text(key)]
        for field, value in field_values.items():
            parts.append(as_text(field))
            parts.append(as_text(value))

        # NOTE(review): tokens are space-joined, so a key/field/value that
        # itself contains whitespace cannot be represented unambiguously.
        # Confirm how ExecuteCommand tokenises its argument string.
        args_bytes = self._to_bytes(" ".join(parts))

        result = self.lib.ExecuteCommand(self.client, b"HMSET", args_bytes)
        if result:
            success = self._from_bytes(result) == "+OK\r\n"
            self._free_string(result)
            logger.debug(
                f"HashMultiSet on key '{key}' with {len(field_values)} fields: {success}"
            )
            return success

        logger.warning(f"HashMultiSet on key '{key}' failed")
        return False
    except ConnectionError as e:
        logger.error(f"Connection error in hash_multi_set: {e}")
        raise
    except Exception as e:
        logger.error(
            f"Error in hash_multi_set: {e}. Traceback:\n{traceback.format_exc()}"
        )
        return False
|
|
#endregion
|
|
|
|
#region Pipeline operations
|
|
def set_pipeline_mode(self, enabled):
    """Turn the client's pipeline mode on or off.

    Args:
        enabled: True to enable pipeline mode, False to disable

    Returns:
        The value returned by the native ``SetPipelineMode`` call, or
        False when an unexpected (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()
        logger.debug(f"Setting pipeline mode to {enabled}")
        outcome = self.lib.SetPipelineMode(self.client, enabled)
        logger.debug(f"SetPipelineMode to {enabled}: {outcome}")
        return outcome
    except ConnectionError as conn_err:
        logger.error(f"Connection error in set_pipeline_mode: {conn_err}")
        raise
    except Exception as err:
        # Anything else is reported with its traceback and mapped to False.
        trace = traceback.format_exc()
        logger.error(f"Error in set_pipeline_mode: {err}. Traceback:\n{trace}")
        return False
|
|
|
|
def set_batch_size(self, size):
    """Configure how many pipelined commands are batched together.

    Args:
        size: The maximum number of commands to batch before sending

    Returns:
        The value returned by the native ``SetBatchSize`` call, or False
        when an unexpected (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()
        logger.debug(f"Setting batch size to {size}")
        outcome = self.lib.SetBatchSize(self.client, size)
        logger.debug(f"SetBatchSize to {size}: {outcome}")
        return outcome
    except ConnectionError as conn_err:
        logger.error(f"Connection error in set_batch_size: {conn_err}")
        raise
    except Exception as err:
        # Anything else is reported with its traceback and mapped to False.
        trace = traceback.format_exc()
        logger.error(f"Error in set_batch_size: {err}. Traceback:\n{trace}")
        return False
|
|
|
|
def get_queued_command_count(self):
    """Ask the native client how many commands are currently queued.

    Returns:
        The value returned by the native ``GetQueuedCommandCount`` call,
        or 0 when an unexpected (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()
        pending = self.lib.GetQueuedCommandCount(self.client)
        logger.debug(f"GetQueuedCommandCount: {pending}")
        return pending
    except ConnectionError as conn_err:
        logger.error(f"Connection error in get_queued_command_count: {conn_err}")
        raise
    except Exception as err:
        # Anything else is reported with its traceback and mapped to 0.
        trace = traceback.format_exc()
        logger.error(
            f"Error in get_queued_command_count: {err}. Traceback:\n{trace}"
        )
        return 0
|
|
|
|
def flush_pipeline(self):
    """Flush any queued commands in pipeline mode

    Returns:
        The response from the server as text, or None when the native
        ``FlushPipeline`` call yields a falsy result or an unexpected
        (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()
        result = self.lib.FlushPipeline(self.client)

        if not result:
            logger.warning("FlushPipeline returned NULL")
            return None

        # A bytes result is an ordinary Python object; anything else is
        # treated as a native string that must go through _free_string.
        needs_free = not isinstance(result, bytes)
        try:
            if needs_free:
                response = self._from_bytes(result)
                # Clear the flag BEFORE freeing: if _free_string fails
                # midway, or the logging below fails, the handler must not
                # attempt a second free of the same pointer.
                needs_free = False
                self._free_string(result)
            else:
                response = result.decode("utf-8")

            logger.debug(f"Flushed pipeline. Response: {response}")
            return response
        except Exception as e:
            logger.error(f"Error processing FlushPipeline result: {e}")
            # Only reached with needs_free set when decoding failed before
            # any free was attempted, so this is the single release.
            if needs_free:
                try:
                    self._free_string(result)
                except Exception as free_e:
                    logger.error(f"Error freeing FlushPipeline result: {free_e}")
            return None
    except ConnectionError as e:
        logger.error(f"Connection error in flush_pipeline: {e}")
        raise
    except Exception as e:
        logger.error(
            f"Error in flush_pipeline: {e}. Traceback:\n{traceback.format_exc()}"
        )
        return None
|
|
|
|
def is_pipeline_mode(self):
    """Ask the native client whether pipeline mode is currently on.

    Returns:
        The value returned by the native ``IsPipelineMode`` call, or False
        when an unexpected (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()
        active = self.lib.IsPipelineMode(self.client)
        logger.debug(f"IsPipelineMode: {active}")
        return active
    except ConnectionError as conn_err:
        logger.error(f"Connection error in is_pipeline_mode: {conn_err}")
        raise
    except Exception as err:
        # Anything else is reported with its traceback and mapped to False.
        trace = traceback.format_exc()
        logger.error(f"Error in is_pipeline_mode: {err}. Traceback:\n{trace}")
        return False
|
|
|
|
def get_batch_size(self):
    """Read back the batch size configured on the native client.

    Returns:
        The value returned by the native ``GetBatchSize`` call, or 0 when
        an unexpected (non-connection) error was logged.

    Raises:
        ConnectionError: Logged and then propagated to the caller.
    """
    try:
        self._check_connection()
        current = self.lib.GetBatchSize(self.client)
        logger.debug(f"GetBatchSize: {current}")
        return current
    except ConnectionError as conn_err:
        logger.error(f"Connection error in get_batch_size: {conn_err}")
        raise
    except Exception as err:
        # Anything else is reported with its traceback and mapped to 0.
        trace = traceback.format_exc()
        logger.error(f"Error in get_batch_size: {err}. Traceback:\n{trace}")
        return 0
|
|
#endregion
|
|
|
|
def main():
    """Run an end-to-end smoke test against a Firefly server.

    Exercises ping, string, list, hash and pipeline operations through
    ``FireflyDatabase`` and logs what it reads back. Connection settings
    can be overridden with the ``FIREFLY_HOST``, ``FIREFLY_PORT`` and
    ``FIREFLY_PASSWORD`` environment variables; otherwise the built-in
    demo values are used.

    The process exits with status 1 if the initial ping fails. Any other
    exception is logged with its traceback and the function returns
    normally.
    """
    logger.info("Starting FireflyDatabase test...")
    succeeded = False

    try:
        # NOTE(review): the fallback host and password are credentials
        # committed to source; prefer supplying them via the environment.
        host = os.environ.get("FIREFLY_HOST", "135.134.202.221")
        port = int(os.environ.get("FIREFLY_PORT", "6379"))
        password = os.environ.get("FIREFLY_PASSWORD", "xyz123")

        # Basic usage
        logger.info("Creating FireflyDatabase instance...")
        with FireflyDatabase(host=host, port=port, password=password) as db:
            logger.info("Connected to Firefly server")
            logger.info("Performing operations...")

            logger.info("About to call ping()...")
            try:
                ping_result = db.ping()
                logger.info(f"Ping completed with result: {ping_result}")
                if not ping_result:
                    logger.error("Ping failed, exiting test.")
                    # sys.exit rather than the site-injected exit(), which
                    # is not guaranteed to exist (e.g. under python -S).
                    sys.exit(1)
            except Exception as ping_error:
                logger.error(f"Exception during ping: {ping_error}")
                logger.error(f"Exception type: {type(ping_error)}")
                logger.error(traceback.format_exc())
                sys.exit(1)  # Exit if ping fails

            logger.info("After ping attempt")

            # String operations
            logger.info("Testing Database")

            db.string_set("test", "hello world")
            value = db.string_get("test")
            logger.info(f"Value: {value}")

            # List operations
            db.list_right_push("mylist", "item1")
            db.list_right_push("mylist", "item2")
            db.list_right_push("mylist", "item3")

            items = db.list_range("mylist", 0, -1)
            logger.info("List items:")
            # Tolerate a None result so a failed read does not abort the run.
            for item in items or []:
                logger.info(f"  - {item}")

            # Hash operations
            db.hash_set("user:1", "name", "John")
            db.hash_set("user:1", "email", "john@example.com")

            name = db.hash_get("user:1", "name")
            logger.info(f"Name: {name}")

            user_data = db.hash_get_all("user:1")
            logger.info("User data:")
            for field, value in user_data.items():
                logger.info(f"  {field}: {value}")

            # Pipeline operations
            logger.info("\nPipeline Operations:")
            db.set_pipeline_mode(True)
            logger.info("Note: When in pipeline mode, operations return 'QUEUED' instead of actual values")
            logger.info("Values aren't actually set until flush_pipeline() is called")
            db.set_batch_size(100)

            # Add dummy entries at index 0 for each data type to handle Redis pipeline reordering
            logger.info("Adding dummy entries for each data type to handle Redis pipeline shifting")
            db.string_set("pipeline:string:0", "dummy-string")
            db.list_right_push("pipeline:list:0", "dummy-list-item")
            db.hash_set("pipeline:hash", "dummy-field", "dummy-value")

            # Queue some commands with unique identifiable values.
            # Commands will return QUEUED but not be executed until flush_pipeline
            for i in range(1, 6):
                # One string key per index
                db.string_set(f"pipeline:string:{i}", f"string-value-{i}")
                # All list items are appended to the single shared list key
                db.list_right_push("pipeline:list", f"list-item-{i}")
                # One hash field per index, all in the same hash
                db.hash_set("pipeline:hash", f"field-{i}", f"hash-value-{i}")

            logger.info(f"Queued commands: {db.get_queued_command_count()}")
            logger.info("Results should be obtained after flushing the pipeline")

            # Flush pipeline (execute all queued commands as a batch)
            result = db.flush_pipeline()
            logger.info(f"Pipeline flush result: {result}")

            logger.info("After pipeline flush, we need to exit pipeline mode to get actual values")

            # Disable pipeline mode before verifying results to see actual values, not QUEUED
            db.set_pipeline_mode(False)
            logger.info("Pipeline mode disabled")

            # Verify results - include index 0 for completeness
            logger.info("\nVerifying results (note: Redis pipeline responses might not match input order):")
            for i in range(0, 6):  # Start from 0 to include dummy entry
                value = db.string_get(f"pipeline:string:{i}")
                logger.info(f"String key:{i} = {value}")

            # Read each list key that was actually written, once: the dummy
            # list and the shared list the loop above appended to.
            for list_key in ("pipeline:list:0", "pipeline:list"):
                items = db.list_range(list_key, 0, -1)
                logger.info(f"List items for {list_key}: {items}")

            hash_data = db.hash_get_all("pipeline:hash")

            # If hash_get_all came back empty, fall back to reading every
            # expected field individually.
            if not hash_data:
                logger.info("Hash appears empty from hash_get_all, trying alternative approach...")
                hash_data = {}
                expected_fields = ["dummy-field"] + [f"field-{i}" for i in range(1, 6)]
                for field_name in expected_fields:
                    field_value = db.hash_get("pipeline:hash", field_name)
                    if field_value:
                        hash_data[field_name] = field_value

            logger.info("Hash fields:")
            for field, value in hash_data.items():
                logger.info(f"  {field}: {value}")

            # Cleanup - don't forget to clean up the dummy entries too
            for i in range(0, 6):  # Start from 0 to include dummy entry
                db.delete(f"pipeline:string:{i}")
                db.delete(f"pipeline:list:{i}")
            # The shared list key has no index suffix, so the loop above
            # misses it; without this it grows on every run.
            db.delete("pipeline:list")
            db.delete("pipeline:hash")
            logger.info("Cleanup complete")

        succeeded = True
    except Exception as e:
        logger.error(f"Exception occurred: {e}")
        logger.error(traceback.format_exc())
    finally:
        logger.info("Test completed")

    # Only claim success when the run really got through without an error.
    if succeeded:
        logger.info("Script completed successfully.")
    else:
        logger.error("Script completed with errors.")
|
|
|
|
|
|
# Run the smoke test only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|