"""Threaded audio-metadata extraction built on mutagen and a Qt ``QThread``."""

import concurrent.futures
import logging
import os
from datetime import datetime

import mutagen
from PyQt6.QtCore import QThread, pyqtSignal

# Get the logger
logger = logging.getLogger("fbroswer")


# Metadata Extractor
class MetadataExtractor(QThread):
    """Worker thread that reads tag metadata for a list of file paths.

    Signals:
        metadata_extracted (dict): emitted once for every file whose metadata
            is available (freshly extracted or served from the cache).
        extraction_complete (): emitted when ``run`` finishes, including
            after a stop request.
        progress_update (int): percentage of files processed so far.
    """

    metadata_extracted = pyqtSignal(dict)
    extraction_complete = pyqtSignal()
    progress_update = pyqtSignal(int)

    # Lower-case extensions that are handed to mutagen; anything else is skipped.
    _AUDIO_EXTENSIONS = (".mp3", ".wav", ".flac", ".m4a", ".wma", ".mid", ".midi")

    def __init__(self, file_list):
        """Store the work list and initialise the cache and bookkeeping sets.

        Args:
            file_list: sized collection of file paths to inspect.
        """
        super().__init__()
        self.file_list = file_list
        self.stop_requested = False
        self.metadata_cache = {}
        self.on_metadata_complete = None

        # Metadata organization
        # NOTE(review): nothing in this class adds to these sets; presumably
        # the owner fills them from ``metadata_extracted`` - confirm.
        self.artists = set()
        self.albums = set()
        self.genres = set()
        self.years = set()

        logger.debug("MetadataExtractor initialized with %d files", len(file_list))

    def run(self):
        """Extract metadata for every file, emitting results and progress.

        Cached entries are emitted immediately; the remaining files are
        processed on a thread pool and emitted as they complete.
        ``extraction_complete`` is always emitted at the end.
        """
        total_files = len(self.file_list)
        processed_files = 0

        logger.info("Starting metadata extraction for %d files", total_files)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for file_path in self.file_list:
                if self.stop_requested:
                    logger.debug("Metadata extraction stopped by user request")
                    break

                if file_path in self.metadata_cache:
                    logger.debug("Using cached metadata for %s", file_path)
                    self.metadata_extracted.emit(self.metadata_cache[file_path])
                    processed_files += 1
                    self.progress_update.emit(
                        int(processed_files / total_files * 100)
                    )
                else:
                    futures.append(
                        executor.submit(self.extract_metadata, file_path)
                    )

            for future in concurrent.futures.as_completed(futures):
                if self.stop_requested:
                    logger.debug("Metadata extraction stopped during processing")
                    break

                try:
                    metadata = future.result()
                    if metadata:
                        self.metadata_extracted.emit(metadata)
                except Exception as e:
                    logger.error("Error extracting metadata: %s", e, exc_info=True)

                processed_files += 1
                progress = int(processed_files / total_files * 100)
                logger.debug("Metadata extraction progress: %d%%", progress)
                self.progress_update.emit(progress)

            if self.stop_requested:
                # Leaving the ``with`` block waits for the pool to drain.
                # Cancel everything that has not started yet so a stop request
                # does not still process the whole queue. Jobs already running
                # cannot be cancelled and are simply allowed to finish.
                for future in futures:
                    future.cancel()

        logger.info("Metadata extraction complete")
        self.extraction_complete.emit()

    @staticmethod
    def _build_metadata(file_path, artist, album, title, genre, year):
        """Combine tag values with file-system facts into the metadata dict.

        Raises:
            OSError: if the file size or modification time cannot be read.
        """
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        return {
            "file_path": file_path,
            "artist": artist,
            "album": album,
            "title": title,
            "genre": genre,
            "year": year,
            "size_mb": f"{file_size_mb:.2f}",
            "filename": os.path.basename(file_path),
            "extension": os.path.splitext(file_path)[1].lower(),
            "last_modified": str(
                datetime.fromtimestamp(os.path.getmtime(file_path))
            ),
            "extracted_at": str(datetime.now()),
        }

    def extract_metadata(self, file_path):
        """Return a metadata dict for ``file_path``, or ``None``.

        ``None`` is returned when the path is not a file, the extension is
        not a known audio extension, mutagen yields nothing for the file, or
        an unexpected error occurs. When mutagen fails to read the file, a
        dict with placeholder tag values and an extra ``"error"`` key is
        returned instead (and is not cached).

        Args:
            file_path: path of the file to inspect.
        """
        try:
            if not os.path.isfile(file_path):
                logger.warning("File does not exist: %s", file_path)
                return None

            # Skip non-audio files
            if not file_path.lower().endswith(self._AUDIO_EXTENSIONS):
                logger.debug("Skipping non-audio file: %s", file_path)
                return None

            logger.info("Extracting metadata for %s", file_path)

            try:
                # easy=True exposes MP3/M4A tags under the common names used
                # below ("artist", "album", "title", "genre", "date") instead
                # of format-specific frame/atom keys.
                audio = mutagen.File(file_path, easy=True)  # type: ignore
            except (OSError, mutagen.MutagenError) as e:
                # mutagen reports read/parse failures as MutagenError, so it
                # must be caught here for the fallback to be reachable.
                logger.error("Mutagen error reading file %s: %s", file_path, e)
                # Return basic metadata without audio tags
                fallback = self._build_metadata(
                    file_path,
                    "Unknown Artist",
                    "Unknown Album",
                    os.path.basename(file_path),
                    "Unknown Genre",
                    "Unknown Year",
                )
                fallback["error"] = str(e)
                return fallback

            # NOTE(review): truthiness is kept from the original check; a
            # file object that reports no tags may also be falsy - confirm
            # whether such files should be skipped.
            if not audio:
                logger.warning("No metadata found for %s", file_path)
                return None

            metadata = self._build_metadata(
                file_path,
                self._get_tag(audio, "artist", "Unknown Artist"),
                self._get_tag(audio, "album", "Unknown Album"),
                self._get_tag(audio, "title", os.path.basename(file_path)),
                self._get_tag(audio, "genre", "Unknown Genre"),
                self._get_tag(audio, "date", "Unknown Year"),
            )

            logger.debug("Extracted metadata: %s", metadata)

            # Cache the result
            self.metadata_cache[file_path] = metadata
            return metadata

        except Exception as e:
            # exc_info already records the traceback; no separate print.
            logger.error("Error processing %s: %s", file_path, e, exc_info=True)
            return None

    def _get_tag(self, audio, tag_name, default_value):
        """Helper method to safely extract tags from audio files.

        Returns the first element (as ``str``) when the tag value is a
        non-empty list, ``str(value)`` for non-list values, and
        ``default_value`` when the tag is missing, is an empty list, or the
        lookup raises.
        """
        try:
            if tag_name in audio:
                value = audio[tag_name]
                if isinstance(value, list):
                    # An empty list carries no value; do not return "[]".
                    return str(value[0]) if value else default_value
                return str(value)
        except Exception as e:
            logger.debug("Error extracting tag %s: %s", tag_name, e)
        return default_value

    def stop(self):
        """Request that ``run`` stop at its next check of the stop flag."""
        logger.info("Stopping metadata extraction")
        self.stop_requested = True

    def metadata_extraction_complete(self):
        """Handle metadata extraction completion.

        Logs the sizes of the artist/album/genre/year sets and invokes
        ``on_metadata_complete`` when one has been assigned.
        """
        logger.info(
            "Metadata extraction complete. Artists: %d, "
            "Albums: %d, Genres: %d, "
            "Years: %d",
            len(self.artists),
            len(self.albums),
            len(self.genres),
            len(self.years),
        )
        if self.on_metadata_complete:
            self.on_metadata_complete()