"""
ScanOrg101.py - Enhanced file scanning and organization module
"""

# flake8: noqa: E501

import concurrent.futures
import os
import time
import zipfile
from collections import deque

import mutagen
import py7zr
import rarfile  # type: ignore
from PyQt6.QtCore import Qt, QThread, QSortFilterProxyModel, pyqtSignal

# Archive formats ArchiveExtractor knows how to unpack.
ARCHIVE_EXTENSIONS = (".zip", ".7z", ".rar")


# Directory Filter Proxy Model
class DirectoryFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that only accepts rows the source model reports as directories."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)

    def filterAcceptsRow(self, source_row, source_parent):
        """Return True only when the row is a directory in the source model."""
        source_model = self.sourceModel()
        if source_model is None:
            return False

        index = source_model.index(source_row, 0, source_parent)
        # Only models exposing isDir() can be filtered; anything else is
        # rejected wholesale.
        if hasattr(source_model, "isDir"):
            return source_model.isDir(index)  # type: ignore
        return False


# File Filter Proxy Model
class FileFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that accepts directories plus files with an allowed extension."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        # Lower-case suffixes; file names are lower-cased before matching.
        self.allowed_extensions = [
            ".zip",
            ".mp3",
            ".wav",
            ".flac",
            ".mid",
            ".midi",
            ".aiff",
            ".aif",
            ".aifc",
            ".au",
            ".snd",
            ".wv",
            ".wma",
            ".m4a",
            ".7z",
            ".rar",
        ]

    def filterAcceptsRow(self, source_row, source_parent):
        """Return True for directories and for files whose suffix is allowed."""
        source_model = self.sourceModel()
        if source_model is None:
            return False

        index = source_model.index(source_row, 0, source_parent)
        if hasattr(source_model, "isDir") and source_model.isDir(index):  # type: ignore
            return True
        if hasattr(source_model, "fileName"):
            # Lower-case the name so "SONG.MP3" matches ".mp3", the same way
            # FileScanner matches extensions.
            file_name = source_model.fileName(index).lower()  # type: ignore
            return file_name.endswith(tuple(self.allowed_extensions))
        return False


# Enhanced File Scanner with optimizations
class FileScanner(QThread):
    """
    Background directory scanner.

    Items are reported as ``(path, is_dir)`` tuples.
    """

    items_found = pyqtSignal(list)  # Emits batches of (path, is_dir) items
    scan_complete = pyqtSignal()
    progress_update = pyqtSignal(int)
    directory_scanned = pyqtSignal(str)  # Emitted once per listed directory

    def __init__(self, path, batch_size=500, max_workers=4):
        """
        Initialize the file scanner with performance optimizations

        Args:
            path: Starting path to scan
            batch_size: Number of items to collect before emitting a batch
            max_workers: Maximum number of parallel scanning threads
        """
        super().__init__()
        self.path = path
        self.stop_requested = False
        # Single-directory (non-recursive) listings keyed by directory path.
        self.cache = {}
        # Full recursive results keyed by root path. Kept apart from
        # ``cache`` so a shallow listing is never mistaken for a full scan.
        self.recursive_cache = {}
        # Directories whose listing has already been emitted.
        self.scanned_directories = set()
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.allowed_extensions = {
            ".mid",
            ".midi",
            ".mp3",
            ".wav",
            ".ogg",
            ".flac",
            ".aac",
            ".m4a",
            ".wma",
            ".flp",
            ".als",
            ".logic",
            ".logicx",
            ".ptx",
            ".pts",
            ".cpr",
            ".rpp",
            ".reason",
            ".sng",
            ".ardour",
            ".bwproject",
            ".zip",
            ".7z",
            ".rar",
        }

    def run(self):
        """Main thread run method - only scans the root path initially"""
        # Check cache first
        if self.path in self.cache:
            self.items_found.emit(self.cache[self.path])
            self.scan_complete.emit()
            return

        # Only scan the top level directory initially (lazy loading)
        self.scan_single_directory(self.path)
        self.scan_complete.emit()

    def scan_directory_recursive(self, path):
        """
        Recursively scan a directory - used when explicitly requesting
        a full scan of all subdirectories

        Directories are listed in parallel, up to ``max_workers`` at a time.
        Items belonging to directories that were already reported by an
        earlier scan are included in the return value but are not emitted
        again through ``items_found``.

        Args:
            path: Root directory of the scan

        Returns:
            List of ``(path, is_dir)`` tuples for everything found. When the
            scan is stopped early the partial list is returned and is not
            cached.
        """
        if path in self.recursive_cache:
            return self.recursive_cache[path]

        items = []
        batch = []
        dirs_to_scan = deque([path])
        # Local to this call: the lazy scan may already have listed the root,
        # and that must not stop the recursive walk from descending into it.
        visited = set()

        progress_update_interval = 0.2  # seconds
        last_update_time = time.time()
        entries_processed = 0

        # Estimate total number of items (simple heuristic). Clamp to 1 so an
        # empty root cannot cause a division by zero below.
        try:
            with os.scandir(path) as entries:
                sample_count = sum(1 for _ in entries)
            estimated_total = max(1, sample_count * 10)
        except OSError:
            estimated_total = 1000  # Fallback estimate

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            while dirs_to_scan and not self.stop_requested:
                # Submit up to max_workers directories for parallel listing
                futures = {}
                while dirs_to_scan and len(futures) < self.max_workers:
                    dir_path = dirs_to_scan.popleft()
                    if dir_path in visited:
                        continue
                    visited.add(dir_path)
                    futures[
                        executor.submit(
                            self.scan_single_directory_helper, dir_path
                        )
                    ] = dir_path

                # Process completed directories
                for future in concurrent.futures.as_completed(futures):
                    if self.stop_requested:
                        # Listings finished after a stop may be truncated.
                        break
                    dir_path = futures[future]
                    try:
                        dir_items, subdirs = future.result()
                    except Exception as e:
                        print(f"Error scanning directory {dir_path}: {e}")
                        continue

                    entries_processed += len(dir_items)
                    items.extend(dir_items)
                    dirs_to_scan.extend(subdirs)

                    if dir_path not in self.scanned_directories:
                        # First time this directory is reported
                        batch.extend(dir_items)
                        self.cache.setdefault(dir_path, dir_items)
                        self.scanned_directories.add(dir_path)
                        self.directory_scanned.emit(dir_path)

                    # Emit batch if it's full
                    if len(batch) >= self.batch_size:
                        self.items_found.emit(batch)
                        batch = []

                    # Update progress periodically
                    current_time = time.time()
                    if (
                        current_time - last_update_time
                        > progress_update_interval
                    ):
                        progress = min(
                            99,
                            int(entries_processed / estimated_total * 100),
                        )
                        self.progress_update.emit(progress)
                        last_update_time = current_time

        if self.stop_requested:
            # Partial result: hand it back but do not cache it as complete.
            return items

        # Emit any remaining items in the final batch
        if batch:
            self.items_found.emit(batch)

        self.recursive_cache[path] = items
        self.progress_update.emit(100)  # Final update
        return items

    def scan_single_directory(self, path):
        """
        Scan a single directory without recursion - supports lazy loading

        Args:
            path: Directory to list

        Returns:
            List of ``(path, is_dir)`` tuples, or an empty list when a stop
            was requested before the scan started or the directory could not
            be read.
        """
        if self.stop_requested:
            return []

        if path in self.cache:
            items = self.cache[path]
            self.items_found.emit(items)
            return items

        # Built once per call rather than once per directory entry.
        extensions = tuple(self.allowed_extensions)

        try:
            items = []
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        break

                    if entry.is_dir():
                        # For directories, just add them to the list
                        # but don't scan them yet (lazy loading)
                        items.append((entry.path, True))
                    elif entry.is_file() and entry.name.lower().endswith(
                        extensions
                    ):
                        items.append((entry.path, False))

            # Store in cache and emit
            self.cache[path] = items
            self.items_found.emit(items)
            self.scanned_directories.add(path)
            self.directory_scanned.emit(path)
            self.progress_update.emit(100)  # Show complete for this directory
            return items
        except PermissionError:
            print(f"Permission denied: {path}")
            return []
        except OSError as e:
            print(f"Error accessing {path}: {e}")
            return []

    def scan_single_directory_helper(self, path):
        """
        Helper method for parallel directory scanning

        Unlike scan_single_directory this emits no signals and touches no
        caches.

        Args:
            path: Directory to list

        Returns:
            Tuple ``(items, subdirs)``: the ``(path, is_dir)`` entries found
            and the paths of the sub-directories among them. Whatever was
            collected before an error is still returned.
        """
        items = []
        subdirs = []
        extensions = tuple(self.allowed_extensions)

        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        break

                    if entry.is_dir():
                        items.append((entry.path, True))
                        subdirs.append(entry.path)
                    elif entry.is_file() and entry.name.lower().endswith(
                        extensions
                    ):
                        items.append((entry.path, False))
        except OSError as e:
            print(f"Error accessing {path}: {e}")

        return items, subdirs

    def request_directory_scan(self, path):
        """
        Request a scan of a specific directory (for lazy loading)

        Returns:
            The directory listing, or None when the directory was already
            scanned.
        """
        if path in self.scanned_directories:
            return None

        return self.scan_single_directory(path)

    def request_full_scan(self):
        """
        Request a full recursive scan of all subdirectories

        The scan is executed by a direct call, i.e. in the thread of the
        caller, and ``scan_complete`` is emitted when it returns.
        """
        items = self.scan_directory_recursive(self.path)
        self.scan_complete.emit()
        return items

    def stop(self):
        """Ask any running scan to finish as soon as possible."""
        self.stop_requested = True


# Metadata Extractor
class MetadataExtractor(QThread):
    """Background worker that reads tag metadata from audio files."""

    metadata_extracted = pyqtSignal(dict)
    extraction_complete = pyqtSignal()
    progress_update = pyqtSignal(int)

    # Files with any other suffix are skipped without being opened.
    AUDIO_EXTENSIONS = (
        ".mp3",
        ".wav",
        ".flac",
        ".m4a",
        ".wma",
        ".mid",
        ".midi",
    )

    def __init__(self, file_list):
        super().__init__()
        self.file_list = file_list
        self.stop_requested = False
        self.metadata_cache = {}

    def run(self):
        """Extract metadata for every file, emitting results as they finish."""
        # Work on a snapshot so the totals stay consistent even if the
        # caller mutates the list it passed in.
        file_list = list(self.file_list)
        total_files = len(file_list)
        processed_files = 0

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for file_path in file_list:
                if self.stop_requested:
                    break

                if file_path in self.metadata_cache:
                    self.metadata_extracted.emit(
                        self.metadata_cache[file_path]
                    )
                    processed_files += 1
                    self.progress_update.emit(
                        int(processed_files / total_files * 100)
                    )
                else:
                    futures.append(
                        executor.submit(self.extract_metadata, file_path)
                    )

            for future in concurrent.futures.as_completed(futures):
                if self.stop_requested:
                    # Drop work that has not started yet so leaving the
                    # executor does not wait for the whole backlog.
                    for pending in futures:
                        pending.cancel()
                    break

                try:
                    metadata = future.result()
                    if metadata:
                        self.metadata_extracted.emit(metadata)
                except Exception as e:
                    print(f"Error extracting metadata: {e}")

                processed_files += 1
                self.progress_update.emit(
                    int(processed_files / total_files * 100)
                )

        self.extraction_complete.emit()

    def extract_metadata(self, file_path):
        """
        Read the tags of a single file

        Args:
            file_path: Path of the file to inspect

        Returns:
            Dict with ``file_path``, ``artist``, ``album``, ``title``,
            ``genre`` and ``year`` keys, or None when the path is not a
            file, has an unsupported suffix, or cannot be parsed.
        """
        try:
            if not os.path.isfile(file_path):
                return None

            # Skip non-audio files
            if not file_path.lower().endswith(self.AUDIO_EXTENSIONS):
                return None

            # easy=True makes mutagen expose MP3/MP4 tags under the plain
            # "artist"/"album"/... keys looked up below.
            audio = mutagen.File(file_path, easy=True)  # type: ignore
            # Compare against None explicitly: a recognised file without any
            # tags is falsy but should still get the default values.
            if audio is None:
                return None

            metadata = {
                "file_path": file_path,
                "artist": self._get_tag(audio, "artist", "Unknown Artist"),
                "album": self._get_tag(audio, "album", "Unknown Album"),
                "title": self._get_tag(
                    audio, "title", os.path.basename(file_path)
                ),
                "genre": self._get_tag(audio, "genre", "Unknown Genre"),
                "year": self._get_tag(audio, "date", "Unknown Year"),
            }

            # Cache the result
            self.metadata_cache[file_path] = metadata
            return metadata
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return None

    def _get_tag(self, audio, tag_name, default_value):
        """
        Helper method to safely extract tags from audio files

        Returns the first value of the tag as a string, or ``default_value``
        when the tag is missing, empty or unreadable.
        """
        try:
            if tag_name in audio:
                value = audio[tag_name]
                if isinstance(value, list):
                    # An empty list means "no value", not the text "[]".
                    return str(value[0]) if value else default_value
                return str(value)
        except Exception:
            # Deliberate best effort: a malformed tag falls back to the
            # default instead of failing the whole file.
            pass
        return default_value

    def stop(self):
        """Ask the extraction to finish as soon as possible."""
        self.stop_requested = True


# Archive Extractor not fully tested or implemented
class ArchiveExtractor(QThread):
    """Background worker that unpacks ZIP, 7z and RAR archives."""

    extraction_progress = pyqtSignal(int)
    extraction_complete = pyqtSignal(list)  # Emits list of extracted files
    extraction_error = pyqtSignal(str)

    def __init__(self, archive_path, extraction_dir):
        """
        Args:
            archive_path: Path of one archive, or an iterable of archive paths
            extraction_dir: Destination directory. When None, each archive is
                unpacked into a directory named after the archive without
                its extension.
        """
        super().__init__()
        self.archive_path = archive_path
        self.extraction_dir = extraction_dir
        self.stop_requested = False

    def run(self):
        """Unpack every requested archive and report the extracted paths."""
        try:
            if isinstance(self.archive_path, (str, os.PathLike)):
                archives = [self.archive_path]
            else:
                archives = list(self.archive_path)

            extracted_files = []
            any_supported = False
            for archive in archives:
                if self.stop_requested:
                    break

                archive = os.fspath(archive)
                extractor = self._extractor_for(archive)
                if extractor is None:
                    self.extraction_error.emit(
                        f"Unsupported archive format: {archive}"
                    )
                    continue

                any_supported = True
                extracted_files.extend(
                    extractor(archive, self._destination_for(archive))
                )

            # Nothing extractable at all: the error signals above are the
            # only outcome, no completion is announced.
            if archives and not any_supported:
                return

            self.extraction_complete.emit(extracted_files)

        except Exception as e:
            self.extraction_error.emit(f"Extraction error: {str(e)}")

    def _extractor_for(self, archive_path):
        """Return the extraction method matching the suffix, or None."""
        lowered = archive_path.lower()
        if lowered.endswith(".zip"):
            return self._extract_zip
        if lowered.endswith(".7z"):
            return self._extract_7z
        if lowered.endswith(".rar"):
            return self._extract_rar
        return None

    def _destination_for(self, archive_path):
        """Return the directory into which *archive_path* is unpacked."""
        if self.extraction_dir is not None:
            return self.extraction_dir
        return os.path.splitext(archive_path)[0]

    def _extract_zip(self, archive_path=None, extraction_dir=None):
        """Unpack a ZIP archive member by member; return the created paths."""
        if archive_path is None:
            archive_path = self.archive_path
        if extraction_dir is None:
            extraction_dir = self._destination_for(archive_path)

        extracted_files = []
        try:
            with zipfile.ZipFile(archive_path, "r") as zip_ref:
                file_list = zip_ref.namelist()
                total_files = len(file_list)

                for i, member in enumerate(file_list):
                    if self.stop_requested:
                        break

                    # extract() returns the sanitised path it actually
                    # created, which may differ from a naive join.
                    extracted_files.append(
                        zip_ref.extract(member, extraction_dir)
                    )
                    self.extraction_progress.emit(
                        int((i + 1) / total_files * 100)
                    )
        except Exception as e:
            self.extraction_error.emit(f"ZIP extraction error: {str(e)}")

        return extracted_files

    def _extract_7z(self, archive_path=None, extraction_dir=None):
        """Unpack a 7z archive member by member; return the target paths."""
        if archive_path is None:
            archive_path = self.archive_path
        if extraction_dir is None:
            extraction_dir = self._destination_for(archive_path)

        extracted_files = []
        try:
            with py7zr.SevenZipFile(archive_path, mode="r") as z:
                file_list = z.getnames()
                total_files = len(file_list)

                for i, member in enumerate(file_list):
                    if self.stop_requested:
                        break

                    if i:
                        # The archive object is exhausted after each
                        # extract(); reset() rewinds it for the next one.
                        z.reset()
                    z.extract(extraction_dir, [member])
                    extracted_files.append(
                        os.path.join(extraction_dir, member)
                    )
                    self.extraction_progress.emit(
                        int((i + 1) / total_files * 100)
                    )
        except Exception as e:
            self.extraction_error.emit(f"7Z extraction error: {str(e)}")

        return extracted_files

    def _extract_rar(self, archive_path=None, extraction_dir=None):
        """Unpack a RAR archive member by member; return the target paths."""
        if archive_path is None:
            archive_path = self.archive_path
        if extraction_dir is None:
            extraction_dir = self._destination_for(archive_path)

        extracted_files = []
        try:
            with rarfile.RarFile(archive_path) as rf:
                file_list = rf.namelist()
                total_files = len(file_list)

                for i, member in enumerate(file_list):
                    if self.stop_requested:
                        break

                    rf.extract(member, extraction_dir)
                    extracted_files.append(
                        os.path.join(extraction_dir, member)
                    )
                    self.extraction_progress.emit(
                        int((i + 1) / total_files * 100)
                    )
        except Exception as e:
            self.extraction_error.emit(f"RAR extraction error: {str(e)}")

        return extracted_files

    def stop(self):
        """Ask the extraction to finish as soon as possible."""
        self.stop_requested = True


# Main Organizer class
class Organizer:
    """Coordinates scanning, metadata extraction and archive extraction."""

    def __init__(self):
        self.file_list = []
        self.dir_list = []
        self.scanner = None
        self.metadata_extractor = None
        self.archive_extractor = None

        # Metadata organization
        self.artists = set()
        self.albums = set()
        self.genres = set()
        self.years = set()

        # Optional callbacks for UI updates
        self.on_scan_complete = None
        self.on_progress_update = None
        self.on_metadata_complete = None

    @staticmethod
    def _halt(worker):
        """Stop *worker* (if any) and wait until its thread has finished."""
        if worker is not None:
            worker.stop()
            worker.wait()

    def start_scan(self, path):
        """Start scanning a directory for files"""
        # A still-running previous scanner must be finished before the
        # reference to it is dropped.
        self._halt(self.scanner)

        self.file_list.clear()
        self.dir_list.clear()

        self.scanner = FileScanner(path)
        self.scanner.items_found.connect(self.add_items)
        self.scanner.scan_complete.connect(self.scan_finished)

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.scanner.progress_update.connect(self.on_progress_update)

        self.scanner.start()

    def add_items(self, items):
        """Process items found during scanning"""
        for path, is_dir in items:
            if is_dir:
                self.dir_list.append(path)
            else:
                self.file_list.append(path)

    def scan_finished(self):
        """Handle scan completion"""
        print(
            f"Scan complete. Found {len(self.dir_list)} directories "
            f"and {len(self.file_list)} files."
        )
        if self.on_scan_complete:
            self.on_scan_complete()

    def stop_scan(self):
        """Stop the current scan operation"""
        self._halt(self.scanner)

    def extract_metadata(self):
        """Extract metadata from audio files"""
        if not self.file_list:
            print("No files to extract metadata from")
            return

        self._halt(self.metadata_extractor)

        # Hand over a copy: file_list is cleared and refilled by later scans.
        self.metadata_extractor = MetadataExtractor(list(self.file_list))
        self.metadata_extractor.metadata_extracted.connect(
            self.process_metadata
        )
        self.metadata_extractor.extraction_complete.connect(
            self.metadata_extraction_complete
        )

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.metadata_extractor.progress_update.connect(
                self.on_progress_update
            )

        self.metadata_extractor.start()

    def process_metadata(self, metadata):
        """Process extracted metadata"""
        for key, bucket in (
            ("artist", self.artists),
            ("album", self.albums),
            ("genre", self.genres),
            ("year", self.years),
        ):
            value = metadata.get(key)
            if value:
                bucket.add(value)

    def metadata_extraction_complete(self):
        """Handle metadata extraction completion"""
        print(
            f"Metadata extraction complete. Artists: {len(self.artists)}, "
            f"Albums: {len(self.albums)}, Genres: {len(self.genres)}, "
            f"Years: {len(self.years)}"
        )
        if self.on_metadata_complete:
            self.on_metadata_complete()

    def extract_archives(self, extraction_dir=None):
        """
        Extract archives

        Args:
            extraction_dir: Destination directory for all archives. When
                None, each archive is unpacked into a directory named after
                the archive without its extension.
        """
        archives = [
            path
            for path in self.file_list
            if path.lower().endswith(ARCHIVE_EXTENSIONS)
        ]
        if not archives:
            print("No files to extract archives from")
            return

        self._halt(self.archive_extractor)

        self.archive_extractor = ArchiveExtractor(
            archives, extraction_dir=extraction_dir
        )
        self.archive_extractor.extraction_complete.connect(
            self.archive_extraction_complete
        )

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.archive_extractor.extraction_progress.connect(
                self.on_progress_update
            )

        self.archive_extractor.start()

    def archive_extraction_complete(self, extracted_files=None):
        """
        Handle archive extraction completion

        Args:
            extracted_files: Paths reported by the extractor; currently only
                accepted so the slot matches the signal's signature.
        """
        print("Archive extraction complete.")
        if self.on_progress_update:
            self.on_progress_update(100)

    def stop_extraction(self):
        """Stop the current extraction operation"""
        self._halt(self.archive_extractor)