import os import concurrent.futures import mutagen from datetime import datetime from PyQt6.QtCore import QThread, pyqtSignal # Metadata Extractor class MetadataExtractor(QThread): metadata_extracted = pyqtSignal(dict) extraction_complete = pyqtSignal() progress_update = pyqtSignal(int) def __init__(self, file_list): super().__init__() self.file_list = file_list self.stop_requested = False self.metadata_cache = {} self.on_metadata_complete = None # Metadata organization self.artists = set() self.albums = set() self.genres = set() self.years = set() def run(self): total_files = len(self.file_list) processed_files = 0 with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for file_path in self.file_list: if self.stop_requested: break if file_path in self.metadata_cache: self.metadata_extracted.emit( self.metadata_cache[file_path] ) processed_files += 1 self.progress_update.emit( int(processed_files / total_files * 100) ) else: futures.append( executor.submit(self.extract_metadata, file_path) ) for future in concurrent.futures.as_completed(futures): if self.stop_requested: break try: metadata = future.result() if metadata: self.metadata_extracted.emit(metadata) except Exception as e: print(f"Error extracting metadata: {e}") processed_files += 1 self.progress_update.emit( int(processed_files / total_files * 100) ) self.extraction_complete.emit() def extract_metadata(self, file_path): try: if not os.path.isfile(file_path): return None # Skip non-audio files if not file_path.lower().endswith( (".mp3", ".wav", ".flac", ".m4a", ".wma", ".mid", ".midi") ): return None print(f"Extracting metadata for {file_path}") audio = mutagen.File(file_path) # type: ignore if not audio: print(f"No metadata found for {file_path}") return None # Get file size in MB file_size_mb = os.path.getsize(file_path) / (1024 * 1024) metadata = { "file_path": file_path, "artist": self._get_tag(audio, "artist", "Unknown Artist"), "album": self._get_tag(audio, "album", "Unknown Album"), "title": self._get_tag( audio, "title", os.path.basename(file_path) ), "genre": self._get_tag(audio, "genre", "Unknown Genre"), "year": self._get_tag(audio, "date", "Unknown Year"), "size_mb": f"{file_size_mb:.2f}", "filename": os.path.basename(file_path), "extension": os.path.splitext(file_path)[1].lower(), "last_modified": str( datetime.fromtimestamp(os.path.getmtime(file_path)) ), "extracted_at": str(datetime.now()), } print(f"Extracted metadata: {metadata}") # Cache the result self.metadata_cache[file_path] = metadata return metadata except Exception as e: print(f"Error processing {file_path}: {e}") import traceback traceback.print_exc() return None def _get_tag(self, audio, tag_name, default_value): """Helper method to safely extract tags from audio files""" try: if tag_name in audio: value = audio[tag_name] if isinstance(value, list) and len(value) > 0: return str(value[0]) return str(value) except Exception: pass return default_value def stop(self): self.stop_requested = True def metadata_extraction_complete(self): """Handle metadata extraction completion""" print( f"Metadata extraction complete. Artists: {len(self.artists)},\ Albums: {len(self.albums)}, Genres: {len(self.genres)},\ Years: {len(self.years)}" ) if self.on_metadata_complete: self.on_metadata_complete()