Fbrowser/python-src/ScanOrg.py
stan44 565be4e1e7 Migrate Fbrowser to Rust and Tauri desktop app
- Replace the Python/Docker setup with a Rust workspace and Tauri frontend
- Add core crates for archive, audio, MIDI, plugin, and desktop UI layers
- Refresh the app scaffolding, build config, and documentation
2026-03-30 16:18:26 -05:00

187 lines
6.9 KiB
Python

#Path: ScanOrg.py
# Description: A class to scan and organize music files
import concurrent.futures
import threading
import queue
import zipfile
import py7zr
import rarfile
import os
import mutagen
from PyQt6.QtCore import Qt, QSortFilterProxyModel, pyqtSignal
# Directory Filter Proxy Model
class DirectoryFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that keeps only the rows its source model reports as directories."""

    def __init__(self):
        """Configure the inherited filter: key column 0, case-insensitive."""
        super().__init__()
        self.setFilterKeyColumn(0)
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)

    def filterAcceptsRow(self, source_row, source_parent):
        """Return the source model's ``isDir`` answer for column 0 of the row.

        NOTE(review): the source model must provide ``isDir``; presumably it
        is a QFileSystemModel -- confirm against the caller that installs it.
        """
        model = self.sourceModel()
        return model.isDir(model.index(source_row, 0, source_parent))
# File Filter Proxy Model
class FileFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that shows every directory plus files with an allowed extension."""

    def __init__(self):
        """Configure the inherited filter and the list of accepted file extensions."""
        super().__init__()
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        # Kept as a public, mutable list so existing code can still edit it.
        self.allowed_extensions = [
            '.zip', '.mp3', '.wav', '.flac', '.mid', '.midi', '.aiff',
            '.aif', '.aifc', '.au', '.snd', '.wv', '.wma', '.m4a',
        ]

    def filterAcceptsRow(self, source_row, source_parent):
        """Accept directories always, and files whose name ends in an allowed extension.

        The extension comparison ignores case, so ``SONG.MP3`` is accepted
        just like ``song.mp3``; this matches the case-insensitive filter
        configured in ``__init__``.

        NOTE(review): the source model must provide ``isDir`` and
        ``fileName``; presumably it is a QFileSystemModel -- confirm.
        """
        model = self.sourceModel()
        index = model.index(source_row, 0, source_parent)
        if model.isDir(index):
            return True
        # Rebuilt on every call so later edits to allowed_extensions take effect.
        suffixes = tuple(ext.lower() for ext in self.allowed_extensions)
        return model.fileName(index).lower().endswith(suffixes)
# File Scan and Organize
class file_scanner:
    """Recursively collects audio/MIDI file paths below a directory.

    Completed scans are remembered in ``cache`` (keyed by the scanned path)
    and the most recent result is also kept in ``file_list``.
    """

    # Extensions (lower-case) that count as audio/MIDI files.
    AUDIO_EXTENSIONS = (
        '.mp3', '.wav', '.flac', '.mid', '.midi', '.aiff', '.aif',
        '.aifc', '.au', '.snd', '.wv', '.wma', '.m4a',
    )

    def __init__(self):
        self.file_list = []
        self.cache = {}
        # Worker of the most recent background scan, so callers can join() it.
        self._scan_thread = None

    def scan(self, path, background=False):
        """Return the audio files found under ``path`` (searched recursively).

        Args:
            path: Directory to walk.
            background: When False (default) the walk runs in the calling
                thread and the returned list is complete. When True the walk
                runs in a daemon thread; the returned list is filled in place
                while the thread runs, and the thread is kept in
                ``self._scan_thread``.

        Returns:
            A list of file paths. A cached result is returned as a fresh copy.
        """
        cached = self.cache.get(path)
        if cached is not None:
            self.file_list = list(cached)
            return self.file_list

        found = []
        self.file_list = found
        if not background:
            self._walk_into(path, found)
            return found

        # Daemon thread: an unfinished scan must not keep the interpreter alive.
        worker = threading.Thread(
            target=self._walk_into, args=(path, found), daemon=True
        )
        self._scan_thread = worker
        worker.start()
        return found

    def _walk_into(self, path, found):
        """Append every matching file below ``path`` to ``found``; cache on success."""
        errors = []

        def report(error):
            # os.walk ignores listing errors unless an onerror callback is given.
            errors.append(error)
            print(f"Error Scanning Files: {error}")

        # os.walk already descends into sub-directories; walking them again
        # separately would list their files more than once.
        for root, _dirs, files in os.walk(path, onerror=report):
            for name in files:
                if name.lower().endswith(self.AUDIO_EXTENSIONS):
                    found.append(os.path.join(root, name))

        # Only complete scans are cached, so a failed path is retried next time.
        if not errors:
            self.cache[path] = list(found)

    def get_file_list(self):
        """Return the list produced by the most recent scan."""
        return self.file_list

    def clear_file_list(self):
        """Forget the most recent scan result (the cache is left untouched)."""
        self.file_list = []
class extractor:
    """Unpacks an archive selected in the file view into a target directory."""

    def zipviewer(self, index, file_filter_model, list_model, extraction_directory):
        """Extract the archive behind ``index`` into ``extraction_directory``.

        Args:
            index: Proxy-model index of the selected row; ignored when invalid.
            file_filter_model: Proxy model used to map ``index`` to the source.
            list_model: Source model that provides ``filePath`` for the index.
            extraction_directory: Destination directory; ``None`` disables
                extraction.

        Returns:
            None. Files that are not ``.zip``/``.7z``/``.rar`` (compared
            without regard to case) are ignored. Extraction failures are
            printed as ``Extraction Error: ...`` and not raised.
        """
        if not index.isValid() or extraction_directory is None:
            return
        source_index = file_filter_model.mapToSource(index)
        file_path = list_model.filePath(source_index)
        lowered = file_path.lower()
        try:
            if lowered.endswith('.zip'):
                with zipfile.ZipFile(file_path, 'r') as archive:
                    # extractall() writes every member, like the former
                    # per-member extract() loop.
                    archive.extractall(extraction_directory)
            elif lowered.endswith('.7z'):
                self._extract_7z(file_path, extraction_directory)
            elif lowered.endswith('.rar'):
                self._extract_rar(file_path, extraction_directory)
        except (zipfile.BadZipFile, zipfile.LargeZipFile, OSError) as e:
            print(f"Extraction Error: {e}")
        return

    @staticmethod
    def _extract_7z(file_path, extraction_directory):
        """Extract a 7z archive with py7zr; archive errors are printed, not raised."""
        try:
            with py7zr.SevenZipFile(file_path, mode='r') as archive:
                archive.extractall(path=extraction_directory)
        except py7zr.exceptions.ArchiveError as e:
            print(f"Extraction Error: {e}")

    @staticmethod
    def _extract_rar(file_path, extraction_directory):
        """Extract a RAR archive with rarfile; rarfile errors are printed, not raised."""
        try:
            with rarfile.RarFile(file_path) as archive:
                archive.extractall(path=extraction_directory)
        except rarfile.Error as e:
            print(f"Extraction Error: {e}")
# Module-level queue; organizer.organize() puts one metadata dict on it per
# successfully read file.
metadata_queue = queue.Queue()


class organizer:
    """Scans a directory for audio files and collects their distinct tag values."""

    # Value recorded when a file does not carry a given tag.
    UNKNOWN_TAG = 'Unknown'

    def __init__(self):
        self.file_list = []
        self.artist_list = []
        self.album_list = []
        self.genre_list = []
        self.year_list = []
        self.file_scanner = file_scanner()
        # Not read or written by any method of this class.
        self.file_info_cache = {}

    def scan(self, path):
        """Load ``file_list`` for ``path`` from the scanner's cache or a new scan."""
        if path in self.file_scanner.cache:
            self.file_list = self.file_scanner.cache[path]
        else:
            self.file_list = self.file_scanner.scan(path)

    def get_file_list(self):
        return self.file_list

    def clear_file_list(self):
        self.file_list = []

    def get_artist_list(self):
        return self.artist_list

    def get_album_list(self):
        return self.album_list

    def get_genre_list(self):
        return self.genre_list

    def get_year_list(self):
        return self.year_list

    def clear_artist_list(self):
        self.artist_list = []

    def clear_album_list(self):
        self.album_list = []

    def clear_genre_list(self):
        self.genre_list = []

    def clear_year_list(self):
        self.year_list = []

    def organize(self):
        """Read the tags of every file in ``file_list`` and record new values.

        Files are read concurrently through ``get_file_info``. Results are
        consumed in submission order on the calling thread, so the artist,
        album, genre and year lists are only modified here and their order
        follows ``file_list``. Each metadata dict is also put once on the
        module-level ``metadata_queue``. Files that could not be read are
        skipped.
        """
        results_queue = queue.Queue()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            pending = [
                (file, executor.submit(self.get_file_info, file, results_queue))
                for file in self.file_list
            ]
            for file, future in pending:
                try:
                    metadata = future.result()
                except Exception:
                    # Worker boundary: one unreadable file must not abort the
                    # whole run; report the file that actually failed.
                    print('Error: ' + file)
                    continue
                if metadata is None:
                    # get_file_info has already reported the failure.
                    continue
                self._add_unique(self.artist_list, metadata['artist'])
                self._add_unique(self.album_list, metadata['album'])
                self._add_unique(self.genre_list, metadata['genre'])
                self._add_unique(self.year_list, metadata['year'])
                # Queue.put() returns None, so it must not be used as a loop
                # condition; publish each result exactly once.
                metadata_queue.put(metadata)

    def get_file_info(self, file, results_queue):
        """Return the ``artist``/``album``/``genre``/``year`` tags of ``file``.

        Args:
            file: Path of the audio file to read.
            results_queue: Queue that receives ``None`` when the file cannot
                be read (nothing is put on it for a successful read).

        Returns:
            A dict with the keys ``artist``, ``album``, ``genre`` and ``year``
            (the first value of each tag, or ``UNKNOWN_TAG`` when the tag is
            absent), or ``None`` when the file cannot be read.

        This method does not modify the artist/album/genre/year lists;
        ``organize`` does that on its own thread.
        """
        try:
            # easy=True asks mutagen for its "easy" wrappers where available,
            # which expose tags under plain names such as 'artist' and 'album'.
            audio = mutagen.File(file, easy=True)
        except Exception:
            # Best effort, as before: any read failure marks the file as bad.
            audio = None
        if audio is None:
            # mutagen.File() returns None for file types it cannot identify.
            results_queue.put(None)
            print('Error: ' + file)
            return None

        metadata = {
            'artist': self._first_tag(audio, 'artist'),
            'album': self._first_tag(audio, 'album'),
            'genre': self._first_tag(audio, 'genre'),
            'year': self._first_tag(audio, 'date'),
        }
        # This class defines no ``metadata_extracted`` signal; emit only when
        # a subclass or mixin provides one.
        signal = getattr(self, 'metadata_extracted', None)
        if signal is not None:
            signal.emit(metadata)
        return metadata

    def _first_tag(self, audio, key):
        """Return the first value stored under ``key`` or ``UNKNOWN_TAG``."""
        try:
            values = audio[key]
        except KeyError:
            return self.UNKNOWN_TAG
        return values[0] if values else self.UNKNOWN_TAG

    @staticmethod
    def _add_unique(values, value):
        """Append ``value`` to ``values`` unless it is already present."""
        if value not in values:
            values.append(value)