Fbrowser/ScanOrg101.py
Stan44 a69391b1d8 toggle logging added (--debug-on) to enable logging
minor bug fixes.
known stutter bug in database/meta systems
appears to be the delay in connecting to the database.

future work:
1. integrate firefly.dll as server module.
2. ensure full database functionality.
3. add toggle to use the database or not. (by default we check for firefly; if we don't find it, we default to the python systems. if found, we automatically use firefly. (so maybe on toggles))
4. investigate and fix the stutter bug.
2025-04-11 23:59:50 -05:00

625 lines
21 KiB
Python

"""
ScanOrg101.py - Enhanced file scanning and organization module
"""
# flake8: noqa: E501
import os
import logging
import concurrent.futures
from collections import deque
import time
from PyQt6.QtCore import Qt, QThread, QSortFilterProxyModel, pyqtSignal
from dbman import FireflyDB
from metaextract import MetadataExtractor, mutagen
from archiver import ArchiveExtractor
# Get the logger
# NOTE(review): the name "fbroswer" looks like a transposition of "fbrowser".
# It is left untouched here because the logging setup elsewhere presumably
# uses the same name -- confirm before renaming.
logger = logging.getLogger("fbroswer")
# Directory Filter Proxy Model
class DirectoryFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that lets only directory rows through.

    The proxy is configured for case-insensitive filtering on column 0.
    """

    def __init__(self):
        super().__init__()
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        logger.debug("DirectoryFilterProxyModel initialized")

    def filterAcceptsRow(self, source_row, source_parent):
        """Decide whether a source row is visible through this proxy.

        Args:
            source_row: Row number in the source model.
            source_parent: Parent index of the row in the source model.

        Returns:
            The result of the source model's ``isDir`` for the row's
            column-0 index; ``False`` when no source model is set or the
            model has no ``isDir`` attribute.
        """
        model = self.sourceModel()
        if model is None:
            return False

        row_index = model.index(source_row, 0, source_parent)
        return (
            model.isDir(row_index)  # type: ignore
            if hasattr(model, "isDir")
            else False
        )
# File Filter Proxy Model
class FileFilterProxyModel(QSortFilterProxyModel):
    """Proxy model showing directories plus files with an allowed extension.

    The proxy is configured for case-insensitive filtering on column 0.
    ``allowed_extensions`` is a plain list that callers may modify; it is
    re-read on every ``filterAcceptsRow`` call.
    """

    def __init__(self):
        super().__init__()
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        self.allowed_extensions = [
            ".zip",
            ".mp3",
            ".wav",
            ".flac",
            ".mid",
            ".midi",
            ".aiff",
            ".aif",
            ".aifc",
            ".au",
            ".snd",
            ".wv",
            ".wma",
            ".m4a",
            ".7z",
            ".rar",
        ]
        logger.debug(
            "FileFilterProxyModel initialized with allowed extensions"
        )

    def filterAcceptsRow(self, source_row, source_parent):
        """Decide whether a source row is visible through this proxy.

        Args:
            source_row: Row number in the source model.
            source_parent: Parent index of the row in the source model.

        Returns:
            ``True`` for rows the source model reports as directories and
            for files whose name ends with one of ``allowed_extensions``
            (compared case-insensitively); ``False`` otherwise, including
            when no source model is set.
        """
        source_model = self.sourceModel()
        if source_model is None:
            return False

        index = source_model.index(source_row, 0, source_parent)
        if hasattr(source_model, "isDir") and source_model.isDir(index):  # type: ignore
            return True

        if hasattr(source_model, "fileName"):
            # Compare in lower case on both sides so that "SONG.MP3" is
            # accepted just like "song.mp3"; the previous case-sensitive
            # match silently hid files with upper-case extensions.
            suffixes = tuple(ext.lower() for ext in self.allowed_extensions)
            file_name = source_model.fileName(index)  # type: ignore
            return file_name.lower().endswith(suffixes)
        return False
# Enhanced File Scanner with optimizations
class FileScanner(QThread):
    """Thread that lists directories and files with an allowed extension.

    Results are delivered as lists of ``(path, is_dir)`` tuples through
    ``items_found``. ``run`` scans only the starting directory;
    ``request_directory_scan`` scans one further directory on demand and
    ``request_full_scan`` walks the whole tree using a thread pool.

    Signals:
        items_found(list): A batch of ``(path, is_dir)`` tuples.
        scan_complete(): Emitted by ``run`` and ``request_full_scan``.
        progress_update(int): Progress value between 0 and 100.
        directory_scanned(str): Path of a directory that was fully listed.
    """

    items_found = pyqtSignal(list)  # Emits batches of items
    scan_complete = pyqtSignal()
    progress_update = pyqtSignal(int)
    directory_scanned = pyqtSignal(str)  # Signal for lazy loading

    def __init__(self, path, batch_size=500, max_workers=4):
        """
        Initialize the file scanner.

        Args:
            path: Starting path to scan
            batch_size: Number of items to collect before emitting a batch
            max_workers: Maximum number of parallel scanning threads
        """
        super().__init__()
        self.path = path
        self.stop_requested = False
        # Single-directory (non-recursive) listings, keyed by directory path.
        self.cache = {}
        # Full-tree results, keyed by root path. Kept apart from ``cache``
        # so a shallow listing is never returned in place of a full scan.
        self._recursive_cache = {}
        # Directories whose listing completed without being interrupted.
        self.scanned_directories = set()
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.allowed_extensions = {
            ".mid",
            ".midi",
            ".mp3",
            ".wav",
            ".ogg",
            ".flac",
            ".aac",
            ".m4a",
            ".wma",
            ".flp",
            ".als",
            ".logic",
            ".logicx",
            ".ptx",
            ".pts",
            ".cpr",
            ".rpp",
            ".reason",
            ".sng",
            ".ardour",
            ".bwproject",
            ".zip",
            ".7z",
            ".rar",
        }

    def run(self):
        """Thread entry point: list only the starting directory.

        Emits the cached listing when one exists, otherwise scans the
        directory; ``scan_complete`` is emitted in both cases.
        """
        if self.path in self.cache:
            self.items_found.emit(self.cache[self.path])
            self.scan_complete.emit()
            return
        # Only the top level is scanned here (lazy loading).
        self.scan_single_directory(self.path)
        self.scan_complete.emit()

    def scan_directory_recursive(self, path):
        """
        Scan ``path`` and every directory below it.

        Directories are listed in parallel on a thread pool of
        ``max_workers`` threads. Items are emitted through ``items_found``
        in batches of at least ``batch_size`` and ``progress_update``
        receives a rough estimate while the walk is running.

        Args:
            path: Root directory of the walk.

        Returns:
            List of ``(path, is_dir)`` tuples found. The list is partial
            (and is not cached) when ``stop`` was requested during the walk.
        """
        if path in self._recursive_cache:
            return self._recursive_cache[path]

        items = []
        batch = []
        dirs_to_scan = deque([path])
        # Tracks directories queued during THIS walk. The shared
        # ``scanned_directories`` set cannot be used for this: it already
        # contains lazily scanned directories, which would then be skipped
        # together with everything below them.
        visited = set()

        progress_update_interval = 0.2  # seconds
        last_update_time = time.time()
        entries_processed = 0

        # Estimate the total number of items from the size of the root.
        try:
            with os.scandir(path) as entries:
                sample_count = sum(1 for _ in entries)
            # Clamp to 1 so an empty root cannot cause a division by zero
            # in the progress calculation below.
            estimated_total = max(1, sample_count * 10)  # Simple heuristic
        except (PermissionError, OSError):
            estimated_total = 1000  # Fallback estimate

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = {}
            while dirs_to_scan and not self.stop_requested:
                # Submit up to max_workers directories at a time.
                while dirs_to_scan and len(futures) < self.max_workers:
                    dir_path = dirs_to_scan.popleft()
                    if dir_path in visited:
                        continue
                    visited.add(dir_path)
                    future = executor.submit(
                        self.scan_single_directory_helper, dir_path
                    )
                    futures[future] = dir_path

                # Collect every submitted listing before queueing more.
                for future in list(concurrent.futures.as_completed(futures)):
                    dir_path = futures.pop(future)
                    try:
                        dir_items, subdirs = future.result()
                        entries_processed += len(dir_items)

                        items.extend(dir_items)
                        batch.extend(dir_items)
                        dirs_to_scan.extend(subdirs)

                        # A listing interrupted by stop() may be incomplete,
                        # so it is not announced as scanned.
                        if not self.stop_requested:
                            self.scanned_directories.add(dir_path)
                            self.directory_scanned.emit(dir_path)

                        if len(batch) >= self.batch_size:
                            self.items_found.emit(batch)
                            batch = []

                        current_time = time.time()
                        if (
                            current_time - last_update_time
                            > progress_update_interval
                        ):
                            # Capped at 99; 100 is sent once at the end.
                            progress = min(
                                99,
                                int(entries_processed / estimated_total * 100),
                            )
                            self.progress_update.emit(progress)
                            last_update_time = current_time
                    except Exception as e:
                        # Best effort: one failing directory must not abort
                        # the rest of the walk.
                        logger.debug(
                            f"Error scanning directory {dir_path}: {e}"
                        )

        if not self.stop_requested:
            # Emit any remaining items in the final batch.
            if batch:
                self.items_found.emit(batch)
            # Only complete walks are cached; caching an interrupted walk
            # would make later calls return the truncated result.
            self._recursive_cache[path] = items

        self.progress_update.emit(100)  # Final update
        return items

    def scan_single_directory(self, path):
        """
        Scan a single directory without recursion - supports lazy loading.

        Sub-directories are reported but not entered. On success the
        listing is cached, emitted through ``items_found``, the directory
        is added to ``scanned_directories`` and ``directory_scanned`` and
        ``progress_update(100)`` are emitted.

        Args:
            path: Directory to list.

        Returns:
            List of ``(path, is_dir)`` tuples. An empty list when a stop was
            already requested or the directory cannot be read; a partial
            list (neither cached nor emitted) when a stop is requested
            during the listing.
        """
        if self.stop_requested:
            return []
        if path in self.cache:
            items = self.cache[path]
            self.items_found.emit(items)
            return items

        extensions = tuple(self.allowed_extensions)
        items = []
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        break
                    if entry.is_dir():
                        # Directories are listed but not scanned yet
                        # (lazy loading).
                        items.append((entry.path, True))
                    elif entry.is_file() and entry.name.lower().endswith(
                        extensions
                    ):
                        items.append((entry.path, False))
        except PermissionError:
            logger.debug(f"Permission denied: {path}")
            return []
        except OSError as e:
            logger.debug(f"Error accessing {path}: {e}")
            return []

        if self.stop_requested:
            # The listing may be incomplete: do not cache it or mark the
            # directory as scanned, otherwise a later request would be
            # answered with the truncated listing.
            return items

        self.cache[path] = items
        self.items_found.emit(items)
        self.scanned_directories.add(path)
        self.directory_scanned.emit(path)
        self.progress_update.emit(100)  # Show complete for this directory
        return items

    def scan_single_directory_helper(self, path):
        """Helper method for parallel directory scanning.

        Emits no signals and touches no caches, so it can run on a worker
        thread of the pool used by ``scan_directory_recursive``.

        Args:
            path: Directory to list.

        Returns:
            Tuple ``(items, subdirs)``: ``items`` is a list of
            ``(path, is_dir)`` tuples and ``subdirs`` the paths of the
            directories among them. Both hold whatever was collected before
            an access error or a stop request.
        """
        extensions = tuple(self.allowed_extensions)
        items = []
        subdirs = []
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        break
                    if entry.is_dir():
                        items.append((entry.path, True))
                        subdirs.append(entry.path)
                    elif entry.is_file() and entry.name.lower().endswith(
                        extensions
                    ):
                        items.append((entry.path, False))
        except (PermissionError, OSError) as e:
            logger.debug(f"Error accessing {path}: {e}")
        return items, subdirs

    def request_directory_scan(self, path):
        """Request a scan of a specific directory (for lazy loading).

        Returns:
            The listing from ``scan_single_directory``, or ``None`` when the
            directory is already in ``scanned_directories`` (nothing is
            emitted in that case).
        """
        if path in self.scanned_directories:
            return None
        return self.scan_single_directory(path)

    def request_full_scan(self):
        """Request a full recursive scan of all subdirectories.

        Returns:
            The list produced by ``scan_directory_recursive`` for the
            starting path; ``scan_complete`` is emitted before returning.
        """
        items = self.scan_directory_recursive(self.path)
        self.scan_complete.emit()
        return items

    def stop(self):
        """Ask running scans to stop; the flag is checked between entries."""
        self.stop_requested = True
# Main Organizer class
class Organizer:
    """Coordinates scanning, metadata extraction and archive extraction.

    Scan results are collected in ``file_list`` and ``dir_list``. Optional
    callbacks (``on_scan_complete``, ``on_progress_update``,
    ``on_metadata_complete``) can be assigned by the UI. Database work is
    delegated to a ``FireflyDB`` instance held in ``db_manager``.
    """

    # Extensions of extracted archive members treated as audio files.
    _ARCHIVE_AUDIO_EXTENSIONS = (
        ".mp3",
        ".wav",
        ".flac",
        ".m4a",
        ".wma",
        ".mid",
        ".midi",
    )

    def __init__(
        self, use_db=False, db_host="localhost", db_port=6379, db_password=None
    ):
        """Create the organizer and, if requested, connect to the database.

        Args:
            use_db: Whether database storage is enabled; also passed as the
                first argument to ``FireflyDB.connect_to``.
            db_host: Database host passed to ``connect_to``.
            db_port: Database port passed to ``connect_to``.
            db_password: Database password passed to ``connect_to``.
        """
        self.file_list = []
        self.dir_list = []
        self.scanner = None
        self.metadata_extractor = None
        self.archive_extractor = None
        # Extractors started for archive contents; referenced here so they
        # are not garbage-collected while still running.
        self._archive_metadata_extractors = []
        # Callbacks for UI updates
        self.on_scan_complete = None
        self.on_progress_update = None
        self.on_metadata_complete = None
        # Database integration - use FireflyDB from dbman.py
        self.use_db = use_db
        self.db_manager = FireflyDB()
        if use_db:
            self.db_manager.connect_to(use_db, db_host, db_port, db_password)
            self.db = self.db_manager.db
        else:
            self.db = None

    def close(self):
        """Close database connection when done"""
        if hasattr(self, "db_manager"):
            self.db_manager.close()
            self.db = None

    def start_scan(self, path):
        """Start scanning a directory for files.

        Clears the previous results, creates a new ``FileScanner`` for
        ``path``, wires its signals to this object and starts it.
        """
        self.file_list.clear()
        self.dir_list.clear()
        self.scanner = FileScanner(path)
        self.scanner.items_found.connect(self.add_items)
        self.scanner.scan_complete.connect(self.scan_finished)
        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.scanner.progress_update.connect(self.on_progress_update)
        self.scanner.start()

    def add_items(self, items):
        """Sort scanned ``(path, is_dir)`` tuples into the two result lists."""
        for path, is_dir in items:
            if is_dir:
                self.dir_list.append(path)
            else:
                self.file_list.append(path)

    def scan_finished(self):
        """Handle scan completion: log the totals and notify the UI."""
        # Two adjacent f-strings instead of a backslash continuation inside
        # one literal: the continuation glued the words together (or pulled
        # the next line's indentation into the message).
        logger.debug(
            f"Scan complete. Found {len(self.dir_list)} directories "
            f"and {len(self.file_list)} files."
        )
        if self.on_scan_complete:
            self.on_scan_complete()

    def stop_scan(self):
        """Stop the current scan operation and wait for it to finish."""
        if self.scanner:
            self.scanner.stop()
            self.scanner.wait()

    def extract_metadata(self):
        """Extract metadata from audio files using MetadataExtractor from metaextract.py.

        Does nothing when ``file_list`` is empty. When the database is
        enabled but its verification fails, database use is switched off
        and extraction continues without it.
        """
        if not self.file_list:
            logger.debug("No files to extract metadata from")
            return
        # Verify database connection if enabled
        if self.use_db and self.db:
            if not self.db_manager.verify_database_connection():
                logger.debug(
                    "Warning: Database verification failed, continuing without database"
                )
                self.use_db = False
                self.db = None
        # Use MetadataExtractor from metaextract.py
        self.metadata_extractor = MetadataExtractor(self.file_list)
        self.metadata_extractor.metadata_extracted.connect(
            self.process_metadata
        )
        self.metadata_extractor.extraction_complete.connect(
            self.metadata_extraction_complete
        )
        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.metadata_extractor.progress_update.connect(
                self.on_progress_update
            )
        # Set the callback for metadata completion
        self.metadata_extractor.on_metadata_complete = (
            self.on_metadata_complete
        )
        self.metadata_extractor.start()

    def process_metadata(self, metadata):
        """Process extracted metadata using FireflyDB from dbman.py.

        Forwards ``metadata`` to the database manager and records non-empty
        ``artist``, ``album``, ``genre`` and ``year`` values in the
        ``artists``, ``albums``, ``genres`` and ``years`` sets, which are
        created on first use.
        """
        if hasattr(self, "db_manager"):
            self.db_manager.process_metadata(metadata)
        # Also update local sets for UI display
        for key, attr_name in (
            ("artist", "artists"),
            ("album", "albums"),
            ("genre", "genres"),
            ("year", "years"),
        ):
            if key in metadata and metadata[key]:
                if not hasattr(self, attr_name):
                    setattr(self, attr_name, set())
                getattr(self, attr_name).add(metadata[key])

    def metadata_extraction_complete(self):
        """Handle metadata extraction completion by logging the set sizes."""
        logger.debug(
            f"Metadata extraction complete. Artists: {len(getattr(self, 'artists', []))}, "
            f"Albums: {len(getattr(self, 'albums', []))}, Genres: {len(getattr(self, 'genres', []))}, "
            f"Years: {len(getattr(self, 'years', []))}"
        )

    def _keep_extractor_alive(self, extractor):
        """Hold a reference to a started extractor until it has finished."""
        # presumably MetadataExtractor is a QThread (it exposes start() and
        # signals); isFinished() is only used when available -- TODO confirm.
        self._archive_metadata_extractors = [
            running
            for running in self._archive_metadata_extractors
            if not (hasattr(running, "isFinished") and running.isFinished())
        ]
        self._archive_metadata_extractors.append(extractor)

    def extract_archive(self, archive_path, extraction_dir):
        """Extract an archive to specified directory using ArchiveExtractor from archiver.py.

        Extraction is started through ``ArchiveExtractor.start`` and its
        outcome is reported via the extractor's signals. When it completes,
        extracted audio files are appended to ``file_list`` and, if the
        database is enabled, their metadata is extracted as well.

        Args:
            archive_path: Path of the archive file.
            extraction_dir: Existing directory to extract into.

        Returns:
            ``True`` when extraction was started; ``False`` when the archive
            file or the extraction directory does not exist.
        """
        if not os.path.isfile(archive_path):
            logger.debug(f"Error: Archive file {archive_path} does not exist")
            return False
        if not os.path.isdir(extraction_dir):
            logger.debug(
                f"Error: Extraction directory {extraction_dir} does not exist"
            )
            return False
        logger.debug(f"Extracting archive {archive_path} to {extraction_dir}")
        # Create an ArchiveExtractor instance from archiver.py
        self.archive_extractor = ArchiveExtractor(archive_path, extraction_dir)
        if self.on_progress_update:
            self.archive_extractor.extraction_progress.connect(
                self.on_progress_update
            )

        audio_extensions = self._ARCHIVE_AUDIO_EXTENSIONS

        def on_extraction_complete(extracted_files):
            logger.debug(
                f"Archive extraction complete. Extracted {len(extracted_files)} files."
            )
            audio_files = [
                f for f in extracted_files
                if f.lower().endswith(audio_extensions)
            ]
            # Only entries that exist as regular files join the file list.
            self.file_list.extend(
                f for f in audio_files if os.path.isfile(f)
            )
            # Automatically extract metadata from audio files if enabled
            if audio_files and self.use_db:
                logger.debug(
                    f"Found {len(audio_files)} audio files in archive, extracting metadata..."
                )
                temp_extractor = MetadataExtractor(audio_files)
                temp_extractor.metadata_extracted.connect(
                    self.process_metadata
                )
                # Without this reference the extractor would be dropped as
                # soon as this callback returns, while it is still running.
                self._keep_extractor_alive(temp_extractor)
                temp_extractor.start()

        def on_extraction_error(error_message):
            logger.debug(f"Archive extraction error: {error_message}")

        self.archive_extractor.extraction_complete.connect(
            on_extraction_complete
        )
        self.archive_extractor.extraction_error.connect(on_extraction_error)
        self.archive_extractor.start()
        return True

    def extract_archive_to_directory(
        self, archive_path, target_directory=None
    ):
        """Extract an archive to a specified directory or to a subdirectory in the same location.

        Args:
            archive_path: Path of the archive file.
            target_directory: Directory to extract into. When omitted, a
                directory named after the archive (without its extension)
                next to the archive is used. The directory is created if it
                does not exist.

        Returns:
            The result of ``extract_archive``; ``False`` when the archive
            does not exist or the directory cannot be created.
        """
        if not os.path.isfile(archive_path):
            logger.debug(f"Error: Archive file {archive_path} does not exist")
            return False
        # If no target directory is specified, derive one from the archive name
        if not target_directory:
            archive_name = os.path.splitext(os.path.basename(archive_path))[0]
            target_directory = os.path.join(
                os.path.dirname(archive_path), archive_name
            )
        # Create the directory if it doesn't exist
        if not os.path.exists(target_directory):
            try:
                # exist_ok covers the directory appearing between the check
                # above and this call.
                os.makedirs(target_directory, exist_ok=True)
                logger.debug(f"Created directory {target_directory}")
            except OSError as e:
                logger.debug(
                    f"Error creating directory {target_directory}: {e}"
                )
                return False
        return self.extract_archive(archive_path, target_directory)

    def has_metadata_in_db(self, file_path):
        """Check if metadata for a file already exists in the database.

        Returns ``False`` when the database is disabled or the lookup fails.
        """
        if not self.use_db or not self.db:
            return False
        try:
            # Delegate to the db_manager
            return self.db_manager.has_metadata_in_db(file_path)
        except Exception as e:
            logger.debug(f"Error checking metadata in database: {e}")
            return False

    def get_metadata_from_db(self, file_path):
        """Retrieve metadata for a file from the database.

        Returns ``None`` when the database is disabled or the lookup fails.
        """
        if not self.use_db or not self.db:
            return None
        try:
            # Delegate to the db_manager
            return self.db_manager.get_metadata_from_db(file_path)
        except Exception as e:
            logger.debug(f"Error retrieving metadata from database: {e}")
            return None

    def store_metadata(self, metadata):
        """Store audio file metadata in the database.

        Returns the database manager's result, or ``False`` when the
        database is disabled or storing fails.
        """
        if not self.use_db or not self.db:
            logger.debug("Database usage is disabled, not storing metadata")
            return False
        try:
            # Delegate to the db_manager
            return self.db_manager.store_metadata(metadata)
        except Exception as e:
            # exc_info routes the traceback through the logger instead of
            # printing it to stderr regardless of the logging toggle.
            logger.debug(
                f"Error storing metadata in database: {e}", exc_info=True
            )
            return False