Fbrowser/ScanOrg101.py
2025-04-06 20:00:26 -05:00

652 lines
21 KiB
Python

"""
ScanOrg101.py - Enhanced file scanning and organization module
"""
# flake8: noqa: E501
import concurrent.futures
import os
import time
import zipfile
from collections import deque

import mutagen
import py7zr
import rarfile # typed: ignore
from PyQt6.QtCore import Qt, QThread, QSortFilterProxyModel, pyqtSignal
# Directory Filter Proxy Model
class DirectoryFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that only lets directory rows through.

    A row is accepted when the source model exposes an ``isDir`` method and
    that method reports the row's column-0 index as a directory.
    """

    def __init__(self):
        """Configure case-insensitive filtering on column 0."""
        super().__init__()
        self.setFilterKeyColumn(0)
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)

    def filterAcceptsRow(self, source_row, source_parent):
        """Return whether the given source row should be shown.

        Args:
            source_row: Row number in the source model.
            source_parent: Parent index of the row in the source model.

        Returns:
            The result of ``isDir`` for the row, or False when there is no
            source model or the source model has no ``isDir`` method.
        """
        model = self.sourceModel()
        if model is None:
            return False

        row_index = model.index(source_row, 0, source_parent)
        if not hasattr(model, "isDir"):
            return False
        return model.isDir(row_index)  # type: ignore
# File Filter Proxy Model
class FileFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that shows directories plus files with allowed extensions.

    The extension list is kept in the public ``allowed_extensions`` attribute
    so it can be adjusted after construction.
    """

    def __init__(self):
        """Configure case-insensitive filtering on column 0 and the defaults."""
        super().__init__()
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        self.allowed_extensions = [
            ".zip",
            ".mp3",
            ".wav",
            ".flac",
            ".mid",
            ".midi",
            ".aiff",
            ".aif",
            ".aifc",
            ".au",
            ".snd",
            ".wv",
            ".wma",
            ".m4a",
            ".7z",
            ".rar",
        ]

    def filterAcceptsRow(self, source_row, source_parent):
        """Return whether the given source row should be shown.

        Args:
            source_row: Row number in the source model.
            source_parent: Parent index of the row in the source model.

        Returns:
            True for directories and for files whose name ends with one of
            ``allowed_extensions`` (compared case-insensitively); False
            otherwise, including when there is no source model.
        """
        source_model = self.sourceModel()
        if source_model is None:
            return False

        index = source_model.index(source_row, 0, source_parent)
        if hasattr(source_model, "isDir") and source_model.isDir(index):  # type: ignore
            return True

        if hasattr(source_model, "fileName"):
            # Lower-case both sides so "SONG.MP3" matches ".mp3"; FileScanner
            # in this module lower-cases names the same way.
            suffixes = tuple(ext.lower() for ext in self.allowed_extensions)
            file_name = source_model.fileName(index)  # type: ignore
            return file_name.lower().endswith(suffixes)
        return False
# Enhanced File Scanner with optimizations
class FileScanner(QThread):
    """Thread that lists directories and files with allowed extensions.

    ``run`` lists only the starting directory (lazy loading). Individual
    directories can be listed later with ``request_directory_scan`` and the
    whole tree with ``request_full_scan``. Results are reported as lists of
    ``(path, is_dir)`` tuples through ``items_found``.
    """

    items_found = pyqtSignal(list)  # Batches of (path, is_dir) tuples
    scan_complete = pyqtSignal()
    progress_update = pyqtSignal(int)  # Percentage, 0-100
    directory_scanned = pyqtSignal(str)  # Path of a directory just listed

    def __init__(self, path, batch_size=500, max_workers=4):
        """Initialize the file scanner.

        Args:
            path: Starting path to scan.
            batch_size: Number of items to collect before emitting a batch
                during a recursive scan.
            max_workers: Maximum number of parallel scanning threads used by
                a recursive scan.
        """
        super().__init__()
        self.path = path
        self.stop_requested = False
        # Complete single-directory listings, keyed by directory path.
        self.cache = {}
        # Complete recursive results, keyed by root path. Kept apart from
        # ``cache`` so a shallow listing of a directory is never returned as
        # if it were the result of a full recursive scan of it.
        self._recursive_cache = {}
        # Directories whose listing has been completed by this scanner.
        self.scanned_directories = set()
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.allowed_extensions = {
            ".mid",
            ".midi",
            ".mp3",
            ".wav",
            ".ogg",
            ".flac",
            ".aac",
            ".m4a",
            ".wma",
            ".flp",
            ".als",
            ".logic",
            ".logicx",
            ".ptx",
            ".pts",
            ".cpr",
            ".rpp",
            ".reason",
            ".sng",
            ".ardour",
            ".bwproject",
            ".zip",
            ".7z",
            ".rar",
        }

    def run(self):
        """Thread entry point: list only the starting directory."""
        if self.path in self.cache:
            self.items_found.emit(self.cache[self.path])
        else:
            # Only the top level is listed here; deeper levels are loaded
            # on request.
            self.scan_single_directory(self.path)
        self.scan_complete.emit()

    def _iter_entries(self, path):
        """Yield ``(entry_path, is_dir)`` for the relevant entries of *path*.

        Directories are always yielded; files only when their lower-cased
        name ends with one of ``allowed_extensions``. Iteration ends early
        once ``stop_requested`` is set. ``OSError`` raised by ``os.scandir``
        or by the entry checks propagates to the caller.
        """
        # Build the suffix tuple once per directory, not once per entry.
        suffixes = tuple(self.allowed_extensions)
        with os.scandir(path) as entries:
            for entry in entries:
                if self.stop_requested:
                    return
                if entry.is_dir():
                    yield entry.path, True
                elif entry.is_file() and entry.name.lower().endswith(suffixes):
                    yield entry.path, False

    def scan_directory_recursive(self, path):
        """Scan *path* and every directory below it.

        Directories are listed in parallel on a thread pool. Newly found
        items are emitted through ``items_found`` in batches of roughly
        ``batch_size``. Directories that already have a complete listing in
        ``cache`` are not listed or emitted again, but their cached items are
        included in the return value and their subdirectories are still
        visited.

        Args:
            path: Root directory of the scan.

        Returns:
            List of ``(path, is_dir)`` tuples for everything found. The list
            is partial when the scan was stopped; partial results are not
            cached.
        """
        if path in self._recursive_cache:
            return self._recursive_cache[path]

        items = []
        batch = []
        dirs_to_scan = deque([path])
        # Directories already handled by *this* scan. A local set is used so
        # directories listed earlier (e.g. the root, by run()) are still
        # descended into.
        visited = set()

        progress_update_interval = 0.2  # seconds
        last_update_time = time.time()
        entries_processed = 0

        # Rough estimate of the total item count, used only for progress.
        try:
            with os.scandir(path) as entries:
                sample_count = sum(1 for _ in entries)
            # Never 0, so the progress division below cannot fail.
            estimated_total = max(1, sample_count * 10)  # Simple heuristic
        except OSError:
            estimated_total = 1000  # Fallback estimate

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = {}
            while dirs_to_scan and not self.stop_requested:
                # Hand out up to max_workers directories at a time.
                while dirs_to_scan and len(futures) < self.max_workers:
                    dir_path = dirs_to_scan.popleft()
                    if dir_path in visited:
                        continue
                    visited.add(dir_path)

                    cached = self.cache.get(dir_path)
                    if cached is not None:
                        # Already listed and emitted by a single-directory
                        # scan: reuse the listing without re-emitting it.
                        entries_processed += len(cached)
                        items.extend(cached)
                        dirs_to_scan.extend(
                            entry_path
                            for entry_path, is_dir in cached
                            if is_dir
                        )
                        continue

                    future = executor.submit(
                        self.scan_single_directory_helper, dir_path
                    )
                    futures[future] = dir_path

                # Handle results as each listing finishes.
                for future in concurrent.futures.as_completed(list(futures)):
                    dir_path = futures.pop(future)
                    try:
                        dir_items, subdirs = future.result()
                    except Exception as e:
                        print(f"Error scanning directory {dir_path}: {e}")
                        continue

                    entries_processed += len(dir_items)
                    items.extend(dir_items)
                    batch.extend(dir_items)
                    dirs_to_scan.extend(subdirs)

                    self.scanned_directories.add(dir_path)
                    self.directory_scanned.emit(dir_path)

                    if len(batch) >= self.batch_size:
                        self.items_found.emit(batch)
                        batch = []

                    current_time = time.time()
                    if current_time - last_update_time > progress_update_interval:
                        # Capped at 99: 100 is reserved for the final update.
                        progress = min(
                            99,
                            int(entries_processed / estimated_total * 100),
                        )
                        self.progress_update.emit(progress)
                        last_update_time = current_time

        if not self.stop_requested:
            # Flush the last batch and remember the result only for a scan
            # that ran to completion.
            if batch:
                self.items_found.emit(batch)
            self._recursive_cache[path] = items

        self.progress_update.emit(100)  # Final update
        return items

    def scan_single_directory(self, path):
        """List a single directory without recursion (lazy loading).

        Emits the listing through ``items_found``. A completed listing is
        cached, recorded in ``scanned_directories`` and announced through
        ``directory_scanned`` and ``progress_update(100)``. A listing cut
        short by ``stop()`` is emitted but neither cached nor recorded.

        Args:
            path: Directory to list.

        Returns:
            List of ``(path, is_dir)`` tuples; empty when a stop was already
            requested or the directory cannot be read.
        """
        if self.stop_requested:
            return []

        if path in self.cache:
            items = self.cache[path]
            self.items_found.emit(items)
            return items

        try:
            items = list(self._iter_entries(path))
        except PermissionError:
            print(f"Permission denied: {path}")
            return []
        except OSError as e:
            print(f"Error accessing {path}: {e}")
            return []

        self.items_found.emit(items)
        if self.stop_requested:
            # Interrupted part-way through: do not record a partial listing
            # as the directory's complete contents.
            return items

        self.cache[path] = items
        self.scanned_directories.add(path)
        self.directory_scanned.emit(path)
        self.progress_update.emit(100)  # Show complete for this directory
        return items

    def scan_single_directory_helper(self, path):
        """List one directory for the recursive scan; emits no signals.

        Args:
            path: Directory to list.

        Returns:
            Tuple ``(items, subdirs)``: ``items`` is a list of
            ``(path, is_dir)`` tuples and ``subdirs`` the paths of the
            directories among them. On an access error the error is printed
            and whatever was collected before it is returned.
        """
        items = []
        try:
            for item in self._iter_entries(path):
                items.append(item)
        except OSError as e:  # Includes PermissionError
            print(f"Error accessing {path}: {e}")
        subdirs = [entry_path for entry_path, is_dir in items if is_dir]
        return items, subdirs

    def request_directory_scan(self, path):
        """Request a scan of a specific directory (for lazy loading).

        Returns:
            The directory's items, or None when the directory has already
            been scanned.
        """
        if path in self.scanned_directories:
            return None
        return self.scan_single_directory(path)

    def request_full_scan(self):
        """Recursively scan the starting path, then emit ``scan_complete``.

        Returns:
            The list returned by ``scan_directory_recursive``.
        """
        items = self.scan_directory_recursive(self.path)
        self.scan_complete.emit()
        return items

    def stop(self):
        """Ask any scan in progress to stop at its next check."""
        self.stop_requested = True
# Metadata Extractor
class MetadataExtractor(QThread):
    """Thread that reads tag metadata for a list of files with mutagen.

    One dict per readable file is emitted through ``metadata_extracted``;
    ``progress_update`` carries a 0-100 percentage and
    ``extraction_complete`` is emitted when ``run`` ends.
    """

    metadata_extracted = pyqtSignal(dict)
    extraction_complete = pyqtSignal()
    progress_update = pyqtSignal(int)

    # Only files with these extensions are handed to mutagen.
    AUDIO_EXTENSIONS = (
        ".mp3",
        ".wav",
        ".flac",
        ".m4a",
        ".wma",
        ".mid",
        ".midi",
    )

    def __init__(self, file_list):
        """Store the files to process.

        Args:
            file_list: Paths of the files to read metadata from.
        """
        super().__init__()
        self.file_list = file_list
        self.stop_requested = False
        # Extracted metadata dicts, keyed by file path.
        self.metadata_cache = {}

    def run(self):
        """Extract metadata for every file, using a thread pool."""
        total_files = len(self.file_list)
        processed_files = 0

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for file_path in self.file_list:
                if self.stop_requested:
                    break
                if file_path in self.metadata_cache:
                    self.metadata_extracted.emit(
                        self.metadata_cache[file_path]
                    )
                    processed_files += 1
                    self._emit_progress(processed_files, total_files)
                else:
                    futures.append(
                        executor.submit(self.extract_metadata, file_path)
                    )

            for future in concurrent.futures.as_completed(futures):
                if self.stop_requested:
                    # Drop work that has not started yet so leaving the
                    # ``with`` block does not wait for the whole backlog.
                    for pending in futures:
                        pending.cancel()
                    break
                try:
                    metadata = future.result()
                    if metadata:
                        self.metadata_extracted.emit(metadata)
                except Exception as e:
                    print(f"Error extracting metadata: {e}")
                processed_files += 1
                self._emit_progress(processed_files, total_files)

        self.extraction_complete.emit()

    def _emit_progress(self, processed_files, total_files):
        """Emit the share of processed files as an integer percentage."""
        self.progress_update.emit(int(processed_files / total_files * 100))

    def extract_metadata(self, file_path):
        """Read the tags of one file.

        Args:
            file_path: Path of the file to inspect.

        Returns:
            Dict with the keys ``file_path``, ``artist``, ``album``,
            ``title``, ``genre`` and ``year`` (missing tags are replaced by
            placeholder strings, the title by the file's base name), or None
            when the path is not a file, has an unsupported extension,
            mutagen returns no file object for it, or an error occurs.
        """
        try:
            if not os.path.isfile(file_path):
                return None

            # Skip non-audio files
            if not file_path.lower().endswith(self.AUDIO_EXTENSIONS):
                return None

            # easy=True requests mutagen's "easy" tag interface, which
            # exposes plain keys such as "artist" and "album" for ID3/MP4
            # files instead of format-specific frame names.
            audio = mutagen.File(file_path, easy=True)  # type: ignore
            # Compare against None explicitly: a file object without any
            # tags is empty and therefore falsy, but should still yield the
            # placeholder values below.
            if audio is None:
                return None

            metadata = {
                "file_path": file_path,
                "artist": self._get_tag(audio, "artist", "Unknown Artist"),
                "album": self._get_tag(audio, "album", "Unknown Album"),
                "title": self._get_tag(
                    audio, "title", os.path.basename(file_path)
                ),
                "genre": self._get_tag(audio, "genre", "Unknown Genre"),
                "year": self._get_tag(audio, "date", "Unknown Year"),
            }

            # Cache the result
            self.metadata_cache[file_path] = metadata
            return metadata
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return None

    def _get_tag(self, audio, tag_name, default_value):
        """Return one tag as a string, or *default_value* if unavailable.

        Args:
            audio: Mapping-like object holding the file's tags.
            tag_name: Key to look up.
            default_value: Value returned when the tag is absent, is an
                empty list, or cannot be read.
        """
        try:
            if tag_name not in audio:
                return default_value
            value = audio[tag_name]
            if isinstance(value, list):
                # An empty list means "no value"; never return "[]".
                return str(value[0]) if value else default_value
            return str(value)
        except Exception:
            # Best effort: a tag that cannot be read is treated as missing.
            return default_value

    def stop(self):
        """Ask the extraction to stop at its next check."""
        self.stop_requested = True
# Archive Extractor not fully tested or implemented
class ArchiveExtractor(QThread):
    """Thread that extracts a single ZIP, 7z or RAR archive member by member.

    Progress is reported through ``extraction_progress`` (0-100), failures
    through ``extraction_error`` and the paths of the extracted members
    through ``extraction_complete``.
    """

    extraction_progress = pyqtSignal(int)
    extraction_complete = pyqtSignal(list)  # Emits list of extracted files
    extraction_error = pyqtSignal(str)

    def __init__(self, archive_path, extraction_dir):
        """Store the archive to extract and its destination.

        Args:
            archive_path: Path of the archive file.
            extraction_dir: Directory to extract into. None selects the
                current working directory.
        """
        super().__init__()
        self.archive_path = archive_path
        self.extraction_dir = extraction_dir
        self.stop_requested = False

    def run(self):
        """Pick the extractor matching the archive's extension and run it."""
        handlers = (
            (".zip", self._extract_zip),
            (".7z", self._extract_7z),
            (".rar", self._extract_rar),
        )
        try:
            lowered_path = self.archive_path.lower()
            for suffix, handler in handlers:
                if lowered_path.endswith(suffix):
                    self.extraction_complete.emit(handler())
                    return
            self.extraction_error.emit(
                f"Unsupported archive format: {self.archive_path}"
            )
        except Exception as e:
            self.extraction_error.emit(f"Extraction error: {str(e)}")

    def _target_dir(self):
        """Return the destination directory, defaulting to the working dir."""
        # os.path.join(None, name) raises TypeError, so resolve None here.
        if self.extraction_dir is None:
            return os.getcwd()
        return self.extraction_dir

    def _extract_members(self, names, extract_one, target_dir, extracted_files):
        """Extract *names* one at a time, reporting progress after each.

        Args:
            names: Member names to extract.
            extract_one: Callable that extracts the member with a given name.
            target_dir: Directory used to build the reported output paths.
            extracted_files: List that receives the path of each extracted
                member; filled in place so the caller keeps the partial
                result if ``extract_one`` raises.
        """
        total_files = len(names)
        for position, name in enumerate(names, start=1):
            if self.stop_requested:
                break
            extract_one(name)
            extracted_files.append(os.path.join(target_dir, name))
            self.extraction_progress.emit(int(position / total_files * 100))

    def _extract_zip(self):
        """Extract a ZIP archive; return the paths extracted so far."""
        extracted_files = []
        target_dir = self._target_dir()
        try:
            with zipfile.ZipFile(self.archive_path, "r") as zip_ref:

                def extract_one(name):
                    zip_ref.extract(name, target_dir)

                self._extract_members(
                    zip_ref.namelist(), extract_one, target_dir, extracted_files
                )
        except Exception as e:
            self.extraction_error.emit(f"ZIP extraction error: {str(e)}")
        return extracted_files

    def _extract_7z(self):
        """Extract a 7z archive; return the paths extracted so far."""
        extracted_files = []
        target_dir = self._target_dir()
        try:
            with py7zr.SevenZipFile(self.archive_path, mode="r") as z:

                def extract_one(name):
                    z.extract(target_dir, [name])
                    # py7zr provides reset() for reading or extracting again
                    # from an archive object that has already been read.
                    z.reset()

                self._extract_members(
                    z.getnames(), extract_one, target_dir, extracted_files
                )
        except Exception as e:
            self.extraction_error.emit(f"7Z extraction error: {str(e)}")
        return extracted_files

    def _extract_rar(self):
        """Extract a RAR archive; return the paths extracted so far."""
        extracted_files = []
        target_dir = self._target_dir()
        try:
            with rarfile.RarFile(self.archive_path) as rf:

                def extract_one(name):
                    rf.extract(name, target_dir)

                self._extract_members(
                    rf.namelist(), extract_one, target_dir, extracted_files
                )
        except Exception as e:
            self.extraction_error.emit(f"RAR extraction error: {str(e)}")
        return extracted_files

    def stop(self):
        """Ask the extraction to stop before the next member."""
        self.stop_requested = True
# Main Organizer class
class Organizer:
    """Coordinates scanning, metadata extraction and archive extraction.

    Scan results are collected in ``file_list`` and ``dir_list``; tag values
    are collected in the ``artists``, ``albums``, ``genres`` and ``years``
    sets. Optional callbacks (``on_scan_complete``, ``on_progress_update``,
    ``on_metadata_complete``) let a UI follow the work.
    """

    # Extensions that ArchiveExtractor knows how to handle.
    ARCHIVE_EXTENSIONS = (".zip", ".7z", ".rar")

    def __init__(self):
        """Create an organizer with empty results and no workers."""
        self.file_list = []
        self.dir_list = []
        self.scanner = None
        self.metadata_extractor = None
        self.archive_extractor = None

        # Metadata organization
        self.artists = set()
        self.albums = set()
        self.genres = set()
        self.years = set()

        # Callbacks for UI updates
        self.on_scan_complete = None
        self.on_progress_update = None
        self.on_metadata_complete = None

        # Archive extraction state: archives still waiting, the destination
        # requested by the caller, and every extractor started so far.
        self._pending_archives = []
        self._archive_extraction_dir = None
        self._archive_extractors = []

    def start_scan(self, path):
        """Start scanning a directory for files.

        Args:
            path: Directory handed to a new FileScanner.
        """
        self.file_list.clear()
        self.dir_list.clear()

        self.scanner = FileScanner(path)
        self.scanner.items_found.connect(self.add_items)
        self.scanner.scan_complete.connect(self.scan_finished)
        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.scanner.progress_update.connect(self.on_progress_update)
        self.scanner.start()

    def add_items(self, items):
        """Sort scanned items into ``dir_list`` and ``file_list``.

        Args:
            items: Iterable of ``(path, is_dir)`` tuples.
        """
        for path, is_dir in items:
            target = self.dir_list if is_dir else self.file_list
            target.append(path)

    def scan_finished(self):
        """Report the scan totals and call ``on_scan_complete`` if set."""
        print(
            f"Scan complete. Found {len(self.dir_list)} directories "
            f"and {len(self.file_list)} files."
        )
        if self.on_scan_complete:
            self.on_scan_complete()

    def stop_scan(self):
        """Stop the current scan operation and wait for it to end."""
        if self.scanner:
            self.scanner.stop()
            self.scanner.wait()

    def extract_metadata(self):
        """Start extracting metadata for the files in ``file_list``."""
        if not self.file_list:
            print("No files to extract metadata from")
            return

        self.metadata_extractor = MetadataExtractor(self.file_list)
        self.metadata_extractor.metadata_extracted.connect(
            self.process_metadata
        )
        self.metadata_extractor.extraction_complete.connect(
            self.metadata_extraction_complete
        )
        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.metadata_extractor.progress_update.connect(
                self.on_progress_update
            )
        self.metadata_extractor.start()

    def process_metadata(self, metadata):
        """Record the non-empty tag values of one metadata dict.

        Args:
            metadata: Dict that may contain ``artist``, ``album``, ``genre``
                and ``year`` entries.
        """
        collections_by_key = (
            ("artist", self.artists),
            ("album", self.albums),
            ("genre", self.genres),
            ("year", self.years),
        )
        for key, values in collections_by_key:
            value = metadata.get(key)
            if value:
                values.add(value)

    def metadata_extraction_complete(self):
        """Report the metadata totals and call ``on_metadata_complete``."""
        print(
            f"Metadata extraction complete. Artists: {len(self.artists)}, "
            f"Albums: {len(self.albums)}, Genres: {len(self.genres)}, "
            f"Years: {len(self.years)}"
        )
        if self.on_metadata_complete:
            self.on_metadata_complete()

    def extract_archives(self, extraction_dir=None):
        """Extract every archive found in ``file_list``, one after another.

        ArchiveExtractor takes a single archive path, so the archives are
        queued and each gets its own extractor.

        Args:
            extraction_dir: Directory to extract all archives into. When
                None, each archive is extracted into a directory next to it
                named after the archive without its extension.
        """
        archives = [
            path
            for path in self.file_list
            if path.lower().endswith(self.ARCHIVE_EXTENSIONS)
        ]
        if not archives:
            print("No files to extract archives from")
            return

        # Forget extractors that have finished; keep any still running.
        self._archive_extractors = [
            extractor
            for extractor in self._archive_extractors
            if extractor.isRunning()
        ]
        self._archive_extraction_dir = extraction_dir
        self._pending_archives = archives
        if not self._start_next_archive():
            self._report_archives_done()

    def _start_next_archive(self):
        """Start extracting the next queued archive.

        Archives whose destination directory cannot be created are skipped.

        Returns:
            True when an extractor was started, False when the queue is
            empty.
        """
        while self._pending_archives:
            archive_path = self._pending_archives.pop(0)
            target_dir = (
                self._archive_extraction_dir
                or os.path.splitext(archive_path)[0]
            )
            try:
                os.makedirs(target_dir, exist_ok=True)
            except OSError as e:
                print(f"Cannot create extraction directory {target_dir}: {e}")
                continue

            extractor = ArchiveExtractor(archive_path, target_dir)
            extractor.extraction_complete.connect(
                self.archive_extraction_complete
            )
            # Connect progress signal if handler exists
            if self.on_progress_update:
                extractor.extraction_progress.connect(self.on_progress_update)

            # Keep a reference to every extractor so the previous thread
            # object is not released while it may still be finishing.
            self._archive_extractors.append(extractor)
            self.archive_extractor = extractor
            extractor.start()
            return True
        return False

    def _report_archives_done(self):
        """Print the completion message and report 100% progress."""
        print("Archive extraction complete.")
        if self.on_progress_update:
            self.on_progress_update(100)

    def archive_extraction_complete(self):
        """Handle one finished archive: continue the queue or report done."""
        if self._start_next_archive():
            return
        self._report_archives_done()

    def stop_extraction(self):
        """Stop the current extraction and drop the archives still queued."""
        self._pending_archives = []
        if self.archive_extractor:
            self.archive_extractor.stop()
            self.archive_extractor.wait()