"""
ScanOrg101.py - Enhanced file scanning and organization module
"""
# flake8: noqa: E501

import os
import concurrent.futures
from collections import deque
import time

from PyQt6.QtCore import Qt, QThread, QSortFilterProxyModel, pyqtSignal

from dbman import FireflyDB
from metaextract import MetadataExtractor, mutagen  # noqa: F401  (mutagen kept for importers of this module)
from archiver import ArchiveExtractor

# Extensions of extracted archive members that are added to the organizer's
# file list and (when the database is enabled) sent to metadata extraction.
_ARCHIVE_AUDIO_EXTENSIONS = (
    ".mp3",
    ".wav",
    ".flac",
    ".m4a",
    ".wma",
    ".mid",
    ".midi",
)


# Directory Filter Proxy Model
class DirectoryFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that only lets directory rows through."""

    def __init__(self):
        super().__init__()
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)

    def filterAcceptsRow(self, source_row, source_parent):
        """Accept a row only if the source model reports it as a directory.

        Returns False when there is no source model or when the source
        model has no ``isDir`` method.
        """
        source_model = self.sourceModel()
        if source_model is None:
            return False
        index = source_model.index(source_row, 0, source_parent)
        if hasattr(source_model, "isDir"):
            return source_model.isDir(index)  # type: ignore
        return False


# File Filter Proxy Model
class FileFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that shows directories plus files with an allowed extension."""

    def __init__(self):
        super().__init__()
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        self.allowed_extensions = [
            ".zip",
            ".mp3",
            ".wav",
            ".flac",
            ".mid",
            ".midi",
            ".aiff",
            ".aif",
            ".aifc",
            ".au",
            ".snd",
            ".wv",
            ".wma",
            ".m4a",
            ".7z",
            ".rar",
        ]

    def filterAcceptsRow(self, source_row, source_parent):
        """Accept directories and files whose name ends with an allowed extension.

        The extension comparison is case-insensitive (``SONG.MP3`` matches
        ``.mp3``), in line with the filter case sensitivity configured in
        ``__init__`` and with how ``FileScanner`` matches names.
        """
        source_model = self.sourceModel()
        if source_model is None:
            return False
        index = source_model.index(source_row, 0, source_parent)
        if hasattr(source_model, "isDir") and source_model.isDir(index):  # type: ignore
            return True
        if hasattr(source_model, "fileName"):
            file_name = source_model.fileName(index)  # type: ignore
            return file_name.lower().endswith(tuple(self.allowed_extensions))
        return False


# Enhanced File Scanner with optimizations
class FileScanner(QThread):
    """Thread that lists directories and reports matching entries via signals.

    Items are ``(path, is_dir)`` tuples. ``run`` only lists the root
    directory; deeper levels are listed on request (lazy loading) or by an
    explicit full recursive scan.
    """

    items_found = pyqtSignal(list)  # Now emits batches of items
    scan_complete = pyqtSignal()
    progress_update = pyqtSignal(int)
    directory_scanned = pyqtSignal(str)  # New signal for lazy loading

    def __init__(self, path, batch_size=500, max_workers=4):
        """
        Initialize the file scanner with performance optimizations

        Args:
            path: Starting path to scan
            batch_size: Number of items to collect before emitting a batch
            max_workers: Maximum number of parallel scanning threads
        """
        super().__init__()
        self.path = path
        self.stop_requested = False
        # Shallow (single-directory) listings, keyed by directory path.
        self.cache = {}
        # Completed recursive listings, keyed by root path. Kept apart from
        # ``cache`` so a shallow listing is never mistaken for a full scan.
        self._recursive_cache = {}
        # Track which directories have been scanned
        self.scanned_directories = set()
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.allowed_extensions = {
            ".mid",
            ".midi",
            ".mp3",
            ".wav",
            ".ogg",
            ".flac",
            ".aac",
            ".m4a",
            ".wma",
            ".flp",
            ".als",
            ".logic",
            ".logicx",
            ".ptx",
            ".pts",
            ".cpr",
            ".rpp",
            ".reason",
            ".sng",
            ".ardour",
            ".bwproject",
            ".zip",
            ".7z",
            ".rar",
        }

    def run(self):
        """Main thread run method - only scans the root path initially"""
        # Check cache first
        if self.path in self.cache:
            self.items_found.emit(self.cache[self.path])
            self.scan_complete.emit()
            return

        # Only scan the top level directory initially (lazy loading)
        self.scan_single_directory(self.path)
        self.scan_complete.emit()

    def scan_directory_recursive(self, path):
        """
        Recursively scan a directory - used when explicitly requesting
        a full scan of all subdirectories

        Directories are listed in parallel on a thread pool of
        ``max_workers`` threads. Newly listed items are emitted through
        ``items_found`` in batches of at least ``batch_size``. Directories
        that a lazy scan has already listed (and emitted) are taken from the
        shallow cache: their items are included in the returned list and
        their subdirectories are still traversed, but they are not emitted
        a second time.

        Args:
            path: Root directory of the scan.

        Returns:
            List of ``(path, is_dir)`` tuples for everything found. If the
            scan is stopped early the partial list is returned but not
            cached.
        """
        if path in self._recursive_cache:
            return self._recursive_cache[path]

        items = []
        batch = []
        dirs_to_scan = deque([path])  # type: ignore
        visited = set()  # guards against queueing the same path twice in this scan

        # For progress estimation
        progress_update_interval = 0.2  # seconds
        last_update_time = time.time()  # type: ignore
        entries_processed = 0

        # Estimate total number of items (simple heuristic). The iterator is
        # closed via ``with`` and the estimate is kept >= 1 so the progress
        # division below can never divide by zero for an empty root.
        try:
            with os.scandir(path) as root_entries:
                sample_count = sum(1 for _ in root_entries)
            estimated_total = max(1, sample_count * 10)
        except (PermissionError, OSError):
            estimated_total = 1000  # Fallback estimate

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = {}
            while dirs_to_scan and not self.stop_requested:
                # Process directories in parallel
                while dirs_to_scan and len(futures) < self.max_workers:
                    dir_path = dirs_to_scan.popleft()
                    if dir_path in visited:
                        continue
                    visited.add(dir_path)

                    if (
                        dir_path in self.scanned_directories
                        and dir_path in self.cache
                    ):
                        # Already listed and emitted by a lazy scan: reuse
                        # the listing instead of skipping the whole subtree.
                        cached_items = self.cache[dir_path]
                        items.extend(cached_items)
                        entries_processed += len(cached_items)
                        dirs_to_scan.extend(
                            item_path
                            for item_path, is_dir in cached_items
                            if is_dir
                        )
                        continue

                    futures[
                        executor.submit(
                            self.scan_single_directory_helper, dir_path
                        )
                    ] = dir_path

                # Process completed directories as they finish. A snapshot of
                # the keys is passed because entries are popped while iterating.
                for future in concurrent.futures.as_completed(list(futures)):
                    dir_path = futures.pop(future)
                    try:
                        dir_items, subdirs = future.result()
                        entries_processed += len(dir_items)

                        # Add results to our list
                        items.extend(dir_items)
                        batch.extend(dir_items)

                        # Add subdirectories to our queue
                        dirs_to_scan.extend(subdirs)

                        # Mark directory as scanned
                        self.scanned_directories.add(dir_path)
                        self.directory_scanned.emit(dir_path)

                        # Emit batch if it's full
                        if len(batch) >= self.batch_size:
                            self.items_found.emit(batch)
                            batch = []

                        # Update progress periodically
                        current_time = time.time()  # type: ignore
                        if (
                            current_time - last_update_time
                            > progress_update_interval
                        ):
                            # Simple progress estimation
                            progress = min(
                                99,
                                int(entries_processed / estimated_total * 100),
                            )
                            self.progress_update.emit(progress)
                            last_update_time = current_time

                    except Exception as e:
                        print(f"Error scanning directory {dir_path}: {e}")

        if not self.stop_requested:
            # Emit any remaining items in the final batch
            if batch:
                self.items_found.emit(batch)
            # Only a scan that ran to completion is cached; a stopped scan
            # holds a partial list that must not be served as the full result.
            self._recursive_cache[path] = items

        self.progress_update.emit(100)  # Final update
        return items

    def scan_single_directory(self, path):
        """
        Scan a single directory without recursion - supports lazy loading

        Subdirectories are reported as ``(path, True)`` but not descended
        into; files are reported as ``(path, False)`` when their lowercased
        name ends with one of ``allowed_extensions``.

        Returns:
            The list of items. An empty list if a stop was already requested
            or the directory could not be read. If a stop is requested while
            listing, the partial list is returned without being cached,
            emitted, or marked as scanned.
        """
        if self.stop_requested:
            return []

        if path in self.cache:
            items = self.cache[path]
            self.items_found.emit(items)
            return items

        # Built once per scan rather than once per directory entry.
        suffixes = tuple(self.allowed_extensions)

        try:
            items = []
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        # Incomplete listing: do not record it as the
                        # directory's contents.
                        return items

                    if entry.is_dir():
                        # For directories, just add them to the list
                        # but don't scan them yet (lazy loading)
                        items.append((entry.path, True))
                    elif entry.is_file() and entry.name.lower().endswith(
                        suffixes
                    ):
                        items.append((entry.path, False))

            # Store in cache and emit
            self.cache[path] = items
            self.items_found.emit(items)
            self.scanned_directories.add(path)
            self.directory_scanned.emit(path)
            self.progress_update.emit(100)  # Show complete for this directory

            return items

        except PermissionError:
            print(f"Permission denied: {path}")
            return []
        except OSError as e:
            print(f"Error accessing {path}: {e}")
            return []

    def scan_single_directory_helper(self, path):
        """Helper method for parallel directory scanning

        Lists one directory without emitting signals or touching the cache.

        Returns:
            ``(items, subdirs)`` where ``items`` is a list of
            ``(path, is_dir)`` tuples and ``subdirs`` the paths of the
            directories among them. Whatever was collected before an access
            error or stop request is returned.
        """
        items = []
        subdirs = []
        suffixes = tuple(self.allowed_extensions)

        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        break

                    if entry.is_dir():
                        items.append((entry.path, True))
                        subdirs.append(entry.path)
                    elif entry.is_file() and entry.name.lower().endswith(
                        suffixes
                    ):
                        items.append((entry.path, False))
        except (PermissionError, OSError) as e:
            print(f"Error accessing {path}: {e}")

        return items, subdirs

    def request_directory_scan(self, path):
        """Request a scan of a specific directory (for lazy loading)

        Returns None when the directory has already been scanned, otherwise
        the result of ``scan_single_directory``.
        """
        if path in self.scanned_directories:
            return

        items = self.scan_single_directory(path)
        return items

    def request_full_scan(self):
        """Request a full recursive scan of all subdirectories

        Emits ``scan_complete`` afterwards and returns the collected items.
        """
        items = self.scan_directory_recursive(self.path)
        self.scan_complete.emit()
        return items

    def stop(self):
        """Ask any scan in progress to stop at its next check."""
        self.stop_requested = True


# Main Organizer class
class Organizer:
    """Coordinates scanning, metadata extraction, archive extraction and DB storage."""

    def __init__(
        self, use_db=False, db_host="localhost", db_port=6379, db_password=None
    ):
        """Create the organizer and, if requested, connect the database manager.

        Args:
            use_db: When true, ``FireflyDB.connect_to`` is called with the
                connection settings below and ``self.db`` is taken from the
                manager; otherwise ``self.db`` is None.
            db_host: Host passed to ``connect_to``.
            db_port: Port passed to ``connect_to``.
            db_password: Password passed to ``connect_to``.
        """
        self.file_list = []
        self.dir_list = []
        self.scanner = None
        self.metadata_extractor = None
        self.archive_extractor = None
        # Extractors started for archive contents. References are kept here
        # because a thread object that is garbage-collected while still
        # running can abort the process (assumes MetadataExtractor is a
        # QThread-like object - TODO confirm against metaextract.py).
        self._archive_metadata_extractors = []

        # Distinct values seen in processed metadata (for UI display)
        self.artists = set()
        self.albums = set()
        self.genres = set()
        self.years = set()

        # Signals for UI updates
        self.on_scan_complete = None
        self.on_progress_update = None
        self.on_metadata_complete = None

        # Database integration - use FireflyDB from dbman.py
        self.use_db = use_db
        self.db_manager = FireflyDB()

        if use_db:
            self.db_manager.connect_to(use_db, db_host, db_port, db_password)
            self.db = self.db_manager.db
        else:
            self.db = None

    def close(self):
        """Close database connection when done"""
        if hasattr(self, "db_manager"):
            self.db_manager.close()
            self.db = None

    def start_scan(self, path):
        """Start scanning a directory for files

        Any scan still in progress is stopped and waited for first, so the
        previous scanner thread is never dropped while it is running.
        """
        self.stop_scan()

        self.file_list.clear()
        self.dir_list.clear()
        self.scanner = FileScanner(path)
        self.scanner.items_found.connect(self.add_items)
        self.scanner.scan_complete.connect(self.scan_finished)

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.scanner.progress_update.connect(self.on_progress_update)

        self.scanner.start()

    def add_items(self, items):
        """Process items found during scanning

        Args:
            items: Iterable of ``(path, is_dir)`` tuples; directories go to
                ``dir_list`` and files to ``file_list``.
        """
        for path, is_dir in items:
            if is_dir:
                self.dir_list.append(path)
            else:
                self.file_list.append(path)

    def scan_finished(self):
        """Handle scan completion"""
        # Adjacent literals instead of a backslash continuation, which would
        # embed the source indentation in the printed message.
        print(
            f"Scan complete. Found {len(self.dir_list)} directories "
            f"and {len(self.file_list)} files."
        )
        if self.on_scan_complete:
            self.on_scan_complete()

    def stop_scan(self):
        """Stop the current scan operation"""
        if self.scanner:
            self.scanner.stop()
            self.scanner.wait()

    def extract_metadata(self):
        """Extract metadata from audio files using MetadataExtractor from metaextract.py"""
        if not self.file_list:
            print("No files to extract metadata from")
            return

        # Verify database connection if enabled
        if self.use_db and self.db:
            if not self.db_manager.verify_database_connection():
                print(
                    "Warning: Database verification failed, continuing without database"
                )
                self.use_db = False
                self.db = None

        # Use MetadataExtractor from metaextract.py
        self.metadata_extractor = MetadataExtractor(self.file_list)
        self.metadata_extractor.metadata_extracted.connect(
            self.process_metadata
        )
        self.metadata_extractor.extraction_complete.connect(
            self.metadata_extraction_complete
        )

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.metadata_extractor.progress_update.connect(
                self.on_progress_update
            )

        # Set the callback for metadata completion
        self.metadata_extractor.on_metadata_complete = (
            self.on_metadata_complete
        )

        self.metadata_extractor.start()

    def process_metadata(self, metadata):
        """Process extracted metadata using FireflyDB from dbman.py

        Forwards the metadata to the database manager and records non-empty
        ``artist``, ``album``, ``genre`` and ``year`` values in the
        corresponding sets on this object.
        """
        # Use the database manager to process metadata
        if hasattr(self, "db_manager"):
            self.db_manager.process_metadata(metadata)

        # Also update local sets for UI display
        for key, seen in (
            ("artist", self.artists),
            ("album", self.albums),
            ("genre", self.genres),
            ("year", self.years),
        ):
            if key in metadata and metadata[key]:
                seen.add(metadata[key])

    def metadata_extraction_complete(self):
        """Handle metadata extraction completion"""
        print(
            f"Metadata extraction complete. Artists: {len(self.artists)}, "
            f"Albums: {len(self.albums)}, Genres: {len(self.genres)}, "
            f"Years: {len(self.years)}"
        )

    def extract_archive(self, archive_path, extraction_dir):
        """Extract an archive to specified directory using ArchiveExtractor from archiver.py

        Extraction is started on the extractor object and reports back via
        its signals. On completion, extracted files with a supported audio
        extension are appended to ``file_list`` and, if the database is
        enabled, sent to a metadata extractor.

        Returns:
            False if the archive file or the extraction directory does not
            exist, True once extraction has been started.
        """
        if not os.path.isfile(archive_path):
            print(f"Error: Archive file {archive_path} does not exist")
            return False

        if not os.path.isdir(extraction_dir):
            print(
                f"Error: Extraction directory {extraction_dir} does not exist"
            )
            return False

        print(f"Extracting archive {archive_path} to {extraction_dir}")

        # Create an ArchiveExtractor instance from archiver.py
        self.archive_extractor = ArchiveExtractor(archive_path, extraction_dir)

        # Connect signals
        if self.on_progress_update:
            self.archive_extractor.extraction_progress.connect(
                self.on_progress_update
            )

        # Define completion handler
        def on_extraction_complete(extracted_files):
            print(
                f"Archive extraction complete. Extracted {len(extracted_files)} files."
            )

            audio_files = [
                f
                for f in extracted_files
                if f.lower().endswith(_ARCHIVE_AUDIO_EXTENSIONS)
            ]

            # Add extracted files to our file list if they match our criteria
            self.file_list.extend(
                f for f in audio_files if os.path.isfile(f)
            )

            # Automatically extract metadata from audio files if enabled
            if audio_files and self.use_db:
                print(
                    f"Found {len(audio_files)} audio files in archive, extracting metadata..."
                )
                temp_extractor = MetadataExtractor(audio_files)
                temp_extractor.metadata_extracted.connect(
                    self.process_metadata
                )
                # Hold a reference for the lifetime of the organizer; a local
                # variable alone would let the extractor be collected as soon
                # as this handler returns, while it is still working.
                self._archive_metadata_extractors.append(temp_extractor)
                temp_extractor.start()

        # Connect completion signal
        self.archive_extractor.extraction_complete.connect(
            on_extraction_complete
        )

        # Define error handler
        def on_extraction_error(error_message):
            print(f"Archive extraction error: {error_message}")

        # Connect error signal
        self.archive_extractor.extraction_error.connect(on_extraction_error)

        # Start extraction
        self.archive_extractor.start()

        return True

    def extract_archive_to_directory(
        self, archive_path, target_directory=None
    ):
        """Extract an archive to a specified directory or to a subdirectory in the same location

        Args:
            archive_path: Path of the archive file.
            target_directory: Destination directory. When omitted, a
                directory named after the archive (without its extension)
                next to the archive is used. It is created if missing.

        Returns:
            False if the archive does not exist or the directory cannot be
            created, otherwise the result of ``extract_archive``.
        """
        if not os.path.isfile(archive_path):
            print(f"Error: Archive file {archive_path} does not exist")
            return False

        # If no target directory is specified, create one based on the archive name
        if not target_directory:
            archive_name = os.path.splitext(os.path.basename(archive_path))[0]
            target_directory = os.path.join(
                os.path.dirname(archive_path), archive_name
            )

        # Create the directory if it doesn't exist
        if not os.path.exists(target_directory):
            try:
                # exist_ok avoids a spurious failure if the directory appears
                # between the check above and this call.
                os.makedirs(target_directory, exist_ok=True)
                print(f"Created directory {target_directory}")
            except OSError as e:
                print(f"Error creating directory {target_directory}: {e}")
                return False

        return self.extract_archive(archive_path, target_directory)

    def has_metadata_in_db(self, file_path):
        """Check if metadata for a file already exists in the database

        Returns False when the database is disabled or the lookup raises.
        """
        if not self.use_db or not self.db:
            return False

        try:
            # Delegate to the db_manager
            return self.db_manager.has_metadata_in_db(file_path)
        except Exception as e:
            print(f"Error checking metadata in database: {e}")
            return False

    def get_metadata_from_db(self, file_path):
        """Retrieve metadata for a file from the database

        Returns None when the database is disabled or the lookup raises.
        """
        if not self.use_db or not self.db:
            return None

        try:
            # Delegate to the db_manager
            return self.db_manager.get_metadata_from_db(file_path)
        except Exception as e:
            print(f"Error retrieving metadata from database: {e}")
            return None

    def store_metadata(self, metadata):
        """Store audio file metadata in the database

        Returns False when the database is disabled or storing raises (the
        traceback is printed in that case); otherwise the db_manager result.
        """
        if not self.use_db or not self.db:
            print("Database usage is disabled, not storing metadata")
            return False

        try:
            # Delegate to the db_manager
            return self.db_manager.store_metadata(metadata)
        except Exception as e:
            print(f"Error storing metadata in database: {e}")
            import traceback

            traceback.print_exc()
            return False