#Path: ScanOrg.py # Description: A class to scan and organize music files import concurrent.futures import threading import queue import zipfile import py7zr import rarfile import os import mutagen from PyQt5.QtCore import Qt, QSortFilterProxyModel, QAbstractTableModel, QModelIndex, QVariant, QAbstractItemModel, QFileInfo, QDir, QMimeDatabase, QMimeData, QUrl, QItemSelectionModel, QItemSelection, QItemSelectionRange, QObject, QThread, QTimer, QEventLoop, QCoreApplication, QUrl, pyqtSignal # Directory Filter Proxy Model class DirectoryFilterProxyModel(QSortFilterProxyModel): def __init__(self): super().__init__() self.setFilterCaseSensitivity(Qt.CaseInsensitive) self.setFilterKeyColumn(0) def filterAcceptsRow(self, source_row, source_parent): index = self.sourceModel().index(source_row, 0, source_parent) return self.sourceModel().isDir(index) # File Filter Proxy Model class FileFilterProxyModel(QSortFilterProxyModel): def __init__(self): super().__init__() self.setFilterCaseSensitivity(Qt.CaseInsensitive) self.setFilterKeyColumn(0) self.allowed_extensions = ['.zip', '.mp3', '.wav', '.flac', '.mid', '.midi', '.aiff', '.aif', '.aifc', '.au', '.snd', '.wv', '.wma', '.m4a'] def filterAcceptsRow(self, source_row, source_parent): index = self.sourceModel().index(source_row, 0, source_parent) if self.sourceModel().isDir(index): return True else: return self.sourceModel().fileName(index).endswith(tuple(self.allowed_extensions)) # File Scan and Organize class file_scanner: def __init__(self): self.file_list = [] self.cache = {} def scan(self, path): def background_scan(self, path): if path in self.cache: return self.cache[path] file_list = [] dirs_queue = queue.Queue() dirs_queue.put(path) while not dirs_queue.empty(): current_path = dirs_queue.get() try: for root, dirs, files in os.walk(current_path): for dir in dirs: dirs_queue.put(os.path.join(root, dir)) for file in files: if file.endswith(('.mp3', '.wav', '.flac', '.mid', '.midi', '.aiff', '.aif', '.aifc', '.au', '.snd', '.wv', '.wma', '.m4a')): file_list.append(os.path.join(root, file)) self.cache[current_path] = file_list except (IOError, PermissionError, FileNotFoundError, OSError) as e: print(f"Error Scanning Files: {e}") return file_list file_list = [] thread = threading.Thread(target=background_scan, args=(path, file_list)) thread.start() return file_list def get_file_list(self): return self.file_list def clear_file_list(self): self.file_list = [] class Extractor: def zipviewer(self, index, file_filter_model, list_model, extraction_directory): if index.isValid() and extraction_directory is not None: index = file_filter_model.mapToSource(index) file_path = list_model.filePath(index) try: if file_path.endswith(".zip"): with zipfile.ZipFile(file_path, 'r') as zip_ref: self._extract_files(zip_ref, extraction_directory) elif file_path.endswith(".rar"): with rarfile.RarFile(file_path, 'r') as rar_ref: self._extract_files(rar_ref, extraction_directory) elif file_path.endswith(".7z"): with py7zr.SevenZipFile(file_path, 'r') as sevenzip_ref: self._extract_files(sevenzip_ref, extraction_directory) else: print(f"Unsupported file format: {file_path}") except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: print(f"ZIP Extraction Error: {e}") except (rarfile.RarFileException, rarfile.NotRARFile) as e: print(f"RAR Extraction Error: {e}") except py7zr.exceptions.SevenZipException as e: print(f"7z Extraction Error: {e}") except OSError as e: print(f"Extraction Error: {e}") def _extract_files(self, archive_ref, extraction_directory): for filename in archive_ref.namelist(): destination = os.path.join(extraction_directory, filename) if os.path.isfile(destination): print(f"File already exists: {destination}") else: os.makedirs(os.path.dirname(destination), exist_ok=True) with open(destination, 'wb') as file: file.write(archive_ref.read(filename)) print(f"Extracted: {filename}") class organizer: global metadata_queue metadata_queue = queue.Queue() def __init__(self): self.file_list = [] self.artist_list = [] self.album_list = [] self.genre_list = [] self.year_list = [] self.file_scanner = file_scanner() self.file_info_cache = {} def scan(self, path): if path in self.file_scanner.cache: self.file_list = self.file_scanner.cache[path] else: self.file_list = self.file_scanner.scan(path) def get_file_list(self): return self.file_list def clear_file_list(self): self.file_list = [] def get_artist_list(self): return self.artist_list def get_album_list(self): return self.album_list def get_genre_list(self): return self.genre_list def get_year_list(self): return self.year_list def clear_artist_list(self): self.artist_list = [] def clear_album_list(self): self.album_list = [] def clear_genre_list(self): self.genre_list = [] def clear_year_list(self): self.year_list = [] def organize(self): results_queue = queue.Queue() metadata = pyqtSignal(dict) with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for file in self.file_list: futures.append(executor.submit(self.get_file_info, file, results_queue)) for future in concurrent.futures.as_completed(futures): try: metadata = future.result() if metadata['artist'] not in self.artist_list: self.artist_list.append(metadata['artist']) if metadata['album'] not in self.album_list: self.album_list.append(metadata['album']) if metadata['genre'] not in self.genre_list: self.genre_list.append(metadata['genre']) if metadata['year'] not in self.year_list: self.year_list.append(metadata['year']) except mutagen.mp3.HeaderNotFoundError: print('Error: ' + file) continue while not metadata_queue.put(metadata): pass def get_file_info(self, file, results_queue): try: audio = mutagen.File(file) artist = audio['artist'][0] album = audio['album'][0] genre = audio['genre'][0] year = audio['date'][0] if artist not in self.artist_list: self.artist_list.append(artist) if album not in self.album_list: self.album_list.append(album) if genre not in self.genre_list: self.genre_list.append(genre) if year not in self.year_list: self.year_list.append(year) metadata = { 'artist': artist, 'album': album, 'genre': genre, 'year': year } self.metadata_extracted.emit(metadata) except Exception as e: results_queue.put(None) print('Error: ' + file) if os.path.splitext(file)[1] == ('.mp3', '.wav', '.flac', '.m4a', '.wma', 'mid', '.midi'): self.organize_audio() audio = mutagen.File(file)