# Path: ScanOrg.py
# Description: Classes to scan, extract, and organize music files
import concurrent.futures
import os
import queue
import threading
import zipfile

import mutagen
import py7zr
import rarfile
from PyQt6.QtCore import Qt, QSortFilterProxyModel, pyqtSignal  # noqa: F401 - retained from the original import list

# Audio file suffixes (lower-case) recognised by the scanner and the file view.
AUDIO_EXTENSIONS = (
    '.mp3', '.wav', '.flac', '.mid', '.midi', '.aiff', '.aif',
    '.aifc', '.au', '.snd', '.wv', '.wma', '.m4a',
)

# Placeholder stored when a file carries no value for a requested tag.
UNKNOWN_TAG = 'Unknown'

# Module-level queue that receives one metadata dict per successfully read file.
metadata_queue = queue.Queue()


# Directory Filter Proxy Model
class DirectoryFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that only lets directory rows through."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)

    def filterAcceptsRow(self, source_row, source_parent):
        """Return True when the source row is a directory."""
        model = self.sourceModel()
        index = model.index(source_row, 0, source_parent)
        return model.isDir(index)


# File Filter Proxy Model
class FileFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that shows directories plus files with an allowed suffix."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        # Kept as a mutable list attribute so callers can adjust it.
        self.allowed_extensions = ['.zip', *AUDIO_EXTENSIONS]

    def filterAcceptsRow(self, source_row, source_parent):
        """Accept every directory and any file whose suffix is allowed.

        The suffix comparison ignores case so that e.g. ``SONG.MP3`` is
        shown, matching the case-insensitive filter configured above.
        """
        model = self.sourceModel()
        index = model.index(source_row, 0, source_parent)
        if model.isDir(index):
            return True
        suffixes = tuple(ext.lower() for ext in self.allowed_extensions)
        return model.fileName(index).lower().endswith(suffixes)


# File Scan and Organize
class file_scanner:
    """Recursively collects audio files below a directory on a worker thread."""

    def __init__(self):
        self.file_list = []
        # Maps a scanned root path to its completed list of audio files.
        self.cache = {}
        # Thread of the most recently started scan (None before the first).
        self.scan_thread = None

    def scan(self, path, wait=False):
        """Start scanning *path* for audio files and return the result list.

        Args:
            path: Root directory to walk.
            wait: When True, block until the scan has finished. The default
                (False) returns immediately; the returned list is then
                filled in place by the worker thread, which can be joined
                through ``self.scan_thread``.

        Returns:
            The list of matching file paths. A previously completed scan of
            the same *path* is answered from ``self.cache``.
        """
        cached = self.cache.get(path)
        if cached is not None:
            self.file_list = cached
            return cached

        found = []

        def report_error(error):
            # os.walk silently skips unreadable directories unless an
            # ``onerror`` callback is supplied.
            print(f"Error Scanning Files: {error}")

        def background_scan():
            # os.walk already descends into sub-directories, so a single
            # walk visits every file exactly once.
            for root, _dirs, files in os.walk(path, onerror=report_error):
                found.extend(
                    os.path.join(root, name)
                    for name in files
                    if name.lower().endswith(AUDIO_EXTENSIONS)
                )
            # Cache only once the list is complete.
            self.cache[path] = found

        self.file_list = found
        thread = threading.Thread(target=background_scan)
        self.scan_thread = thread
        thread.start()
        if wait:
            thread.join()
        return found

    def get_file_list(self):
        """Return the list produced by the most recent scan."""
        return self.file_list

    def clear_file_list(self):
        """Forget the current file list (the cache is left untouched)."""
        self.file_list = []


class extractor:
    """Extracts .zip, .rar and .7z archives selected in the file view."""

    @staticmethod
    def _escapes_directory(member_name, directory):
        """Return True if *member_name* would be written outside *directory*."""
        root = os.path.realpath(directory)
        target = os.path.realpath(os.path.join(root, member_name))
        try:
            return os.path.commonpath([root, target]) != root
        except ValueError:
            # Raised e.g. for paths on different drives: treat as escaping.
            return True

    def zipviewer(self, index, file_filter_model, list_model, extraction_directory):
        """Extract the archive behind *index* into *extraction_directory*.

        Args:
            index: Proxy-model index of the selected row.
            file_filter_model: Proxy model used to map *index* to its source.
            list_model: Source model providing ``filePath``.
            extraction_directory: Destination directory; nothing happens
                when it is None or *index* is invalid.

        Files that are not .zip/.rar/.7z are ignored. Extraction failures
        are printed rather than raised.
        """
        if not index.isValid() or extraction_directory is None:
            return

        source_index = file_filter_model.mapToSource(index)
        file_path = list_model.filePath(source_index)
        suffix = os.path.splitext(file_path)[1].lower()

        try:
            if suffix == '.zip':
                # zipfile itself strips absolute paths and ".." components.
                with zipfile.ZipFile(file_path, 'r') as archive:
                    archive.extractall(extraction_directory)
            elif suffix == '.rar':
                with rarfile.RarFile(file_path) as archive:
                    self._extract_checked(
                        archive, archive.namelist(), extraction_directory
                    )
            elif suffix == '.7z':
                with py7zr.SevenZipFile(file_path, mode='r') as archive:
                    self._extract_checked(
                        archive, archive.getnames(), extraction_directory
                    )
        except (
            zipfile.BadZipFile,
            zipfile.LargeZipFile,
            rarfile.Error,
            py7zr.Bad7zFile,
            OSError,
        ) as e:
            print(f"Extraction Error: {e}")

    def _extract_checked(self, archive, member_names, extraction_directory):
        """Extract *archive* unless a member would leave the target directory."""
        for name in member_names:
            if self._escapes_directory(name, extraction_directory):
                # Archives come from arbitrary sources; refuse path traversal.
                raise OSError(f"unsafe path in archive: {name}")
        archive.extractall(path=extraction_directory)


class organizer:
    """Reads tags from scanned files and keeps de-duplicated summary lists."""

    # (metadata dict key, mutagen "easy" tag name)
    _TAG_FIELDS = (
        ('artist', 'artist'),
        ('album', 'album'),
        ('genre', 'genre'),
        ('year', 'date'),
    )

    def __init__(self):
        self.file_list = []
        self.artist_list = []
        self.album_list = []
        self.genre_list = []
        self.year_list = []
        self.file_scanner = file_scanner()
        self.file_info_cache = {}
        # get_file_info runs on pool threads; guards the summary lists.
        self._lists_lock = threading.Lock()

    def scan(self, path):
        """Scan *path* for audio files (cached results are reused).

        The scan runs in the background, so the list may still be filling
        when this returns; join ``self.file_scanner.scan_thread`` to wait.
        """
        self.file_list = self.file_scanner.scan(path)

    def get_file_list(self):
        return self.file_list

    def clear_file_list(self):
        self.file_list = []

    def get_artist_list(self):
        return self.artist_list

    def get_album_list(self):
        return self.album_list

    def get_genre_list(self):
        return self.genre_list

    def get_year_list(self):
        return self.year_list

    def clear_artist_list(self):
        self.artist_list = []

    def clear_album_list(self):
        self.album_list = []

    def clear_genre_list(self):
        self.genre_list = []

    def clear_year_list(self):
        self.year_list = []

    def organize(self):
        """Read tags of every file in ``file_list`` using a thread pool.

        Each successfully read file contributes its values to the artist /
        album / genre / year lists and has its metadata dict put on the
        module-level ``metadata_queue`` exactly once. Unreadable files are
        skipped.
        """
        results_queue = queue.Queue()
        # Snapshot: a background scan may still be appending to file_list.
        files = list(self.file_list)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.get_file_info, path, results_queue)
                for path in files
            ]
            for future in concurrent.futures.as_completed(futures):
                metadata = future.result()
                if metadata is None:
                    continue
                metadata_queue.put(metadata)

    def get_file_info(self, file, results_queue):
        """Read the tags of *file* and record them in the summary lists.

        Args:
            file: Path of the audio file.
            results_queue: Queue that receives ``None`` when the file cannot
                be read.

        Returns:
            A dict with keys ``artist``, ``album``, ``genre`` and ``year``
            (missing tags become ``UNKNOWN_TAG``), or ``None`` on failure.
        """
        try:
            # easy=True exposes uniform keys such as 'artist' for ID3/MP4.
            audio = mutagen.File(file, easy=True)
        except (mutagen.MutagenError, OSError):
            audio = None

        if audio is None:
            # Unreadable file or a type mutagen does not recognise.
            results_queue.put(None)
            print('Error: ' + file)
            return None

        metadata = {
            key: self._first_tag(audio, tag) for key, tag in self._TAG_FIELDS
        }

        with self._lists_lock:
            buckets = (
                (metadata['artist'], self.artist_list),
                (metadata['album'], self.album_list),
                (metadata['genre'], self.genre_list),
                (metadata['year'], self.year_list),
            )
            for value, bucket in buckets:
                if value not in bucket:
                    bucket.append(value)

        # This class declares no Qt signal itself; emit only when a
        # subclass provides ``metadata_extracted``.
        signal = getattr(self, 'metadata_extracted', None)
        if signal is not None:
            signal.emit(metadata)
        return metadata

    @staticmethod
    def _first_tag(audio, tag):
        """Return the first value of *tag* in *audio*, or ``UNKNOWN_TAG``."""
        try:
            values = audio[tag]
        except KeyError:
            return UNKNOWN_TAG
        return values[0] if values else UNKNOWN_TAG