652 lines
21 KiB
Python
652 lines
21 KiB
Python
"""
|
|
ScanOrg101.py - Enhanced file scanning and organization module
|
|
"""
|
|
|
|
# flake8: noqa: E501
|
|
import concurrent.futures
import os
import time
import zipfile
from collections import deque

import mutagen
import py7zr
import rarfile # typed: ignore
from PyQt6.QtCore import Qt, QThread, QSortFilterProxyModel, pyqtSignal
|
|
|
|
|
|
# Directory Filter Proxy Model
|
|
class DirectoryFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that lets through only rows the source reports as directories.

    The source model is expected to offer an ``isDir(index)`` method
    (presumably a QFileSystemModel - confirm against the caller). Sources
    without that method have every row rejected.
    """

    def __init__(self):
        """Configure case-insensitive filtering on column 0."""
        super().__init__()
        self.setFilterKeyColumn(0)
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)

    def filterAcceptsRow(self, source_row, source_parent):
        """Return True when the row at ``source_row`` is a directory.

        Args:
            source_row: Row number inside ``source_parent`` in the source model.
            source_parent: Parent index of the row in the source model.

        Returns:
            The source model's ``isDir`` answer for the row, or False when no
            source model is attached or it has no ``isDir`` method.
        """
        model = self.sourceModel()
        if model is None:
            return False

        row_index = model.index(source_row, 0, source_parent)
        return model.isDir(row_index) if hasattr(model, "isDir") else False  # type: ignore
|
|
|
|
|
|
# File Filter Proxy Model
|
|
class FileFilterProxyModel(QSortFilterProxyModel):
    """Proxy model that shows directories plus files with an allowed extension.

    Directories are always accepted. Files are accepted when their name ends
    with one of ``allowed_extensions``; the comparison ignores case, matching
    the case-insensitive filter configured in ``__init__``.
    """

    def __init__(self):
        """Configure case-insensitive filtering on column 0 and the extension list."""
        super().__init__()
        self.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.setFilterKeyColumn(0)
        # Kept as a plain list attribute so callers can inspect or extend it.
        self.allowed_extensions = [
            ".zip",
            ".mp3",
            ".wav",
            ".flac",
            ".mid",
            ".midi",
            ".aiff",
            ".aif",
            ".aifc",
            ".au",
            ".snd",
            ".wv",
            ".wma",
            ".m4a",
            ".7z",
            ".rar",
        ]

    def filterAcceptsRow(self, source_row, source_parent):
        """Decide whether a source row is visible through this proxy.

        Args:
            source_row: Row number inside ``source_parent`` in the source model.
            source_parent: Parent index of the row in the source model.

        Returns:
            True for directories and for files whose name ends with an allowed
            extension (case-insensitively); False otherwise, including when no
            source model is attached.
        """
        source_model = self.sourceModel()
        if source_model is None:
            return False

        index = source_model.index(source_row, 0, source_parent)

        if hasattr(source_model, "isDir") and source_model.isDir(index):  # type: ignore
            return True

        if hasattr(source_model, "fileName"):
            # Lower-case both sides so "SONG.MP3" matches ".mp3".
            suffixes = tuple(ext.lower() for ext in self.allowed_extensions)
            file_name = source_model.fileName(index).lower()  # type: ignore
            return file_name.endswith(suffixes)

        return False
|
|
|
|
|
|
# Enhanced File Scanner with optimizations
|
|
class FileScanner(QThread):
    """Background directory scanner that reports matching files and folders.

    ``run()`` lists only the top level of ``path`` (lazy loading). Deeper
    levels are listed on demand through ``request_directory_scan`` or all at
    once through ``request_full_scan``. Every reported item is a
    ``(path, is_dir)`` tuple.
    """

    items_found = pyqtSignal(list)  # Emits batches of (path, is_dir) tuples
    scan_complete = pyqtSignal()
    progress_update = pyqtSignal(int)  # Percentage in the range 0-100
    directory_scanned = pyqtSignal(str)  # Path of a directory that was just listed

    def __init__(self, path, batch_size=500, max_workers=4):
        """
        Initialize the file scanner with performance optimizations

        Args:
            path: Starting path to scan
            batch_size: Number of items to collect before emitting a batch
            max_workers: Maximum number of parallel scanning threads
        """
        super().__init__()
        self.path = path
        self.stop_requested = False
        # Single-level listings, keyed by directory path.
        self.cache = {}
        # Full-tree listings, keyed by root path. Kept apart from self.cache
        # so a shallow listing is never mistaken for a recursive result.
        self._recursive_cache = {}
        # Directories whose items have already been listed and published.
        self.scanned_directories = set()
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.allowed_extensions = {
            ".mid",
            ".midi",
            ".mp3",
            ".wav",
            ".ogg",
            ".flac",
            ".aac",
            ".m4a",
            ".wma",
            ".flp",
            ".als",
            ".logic",
            ".logicx",
            ".ptx",
            ".pts",
            ".cpr",
            ".rpp",
            ".reason",
            ".sng",
            ".ardour",
            ".bwproject",
            ".zip",
            ".7z",
            ".rar",
        }

    def run(self):
        """Main thread run method - only scans the root path initially"""
        # Check cache first
        if self.path in self.cache:
            self.items_found.emit(self.cache[self.path])
            self.scan_complete.emit()
            return

        # Only scan the top level directory initially (lazy loading)
        self.scan_single_directory(self.path)
        self.scan_complete.emit()

    def scan_directory_recursive(self, path):
        """
        Recursively scan a directory - used when explicitly requesting
        a full scan of all subdirectories

        Directories are listed in parallel by a thread pool. Items belonging
        to directories that were already published (for example by the lazy
        top-level scan) are included in the returned list but are not emitted
        through ``items_found`` a second time.

        Args:
            path: Root directory of the traversal.

        Returns:
            List of ``(path, is_dir)`` tuples for everything found below
            ``path``. The list is partial when a stop was requested; partial
            results are not cached.
        """
        if path in self._recursive_cache:
            return self._recursive_cache[path]

        items = []
        batch = []
        dirs_to_scan = deque([path])
        # Resolved paths already queued in this traversal; resolving symlinks
        # keeps a link that points back up the tree from looping forever.
        visited = set()

        # For progress estimation
        progress_update_interval = 0.2  # seconds
        last_update_time = time.monotonic()
        entries_processed = 0

        # Estimate total number of items
        try:
            with os.scandir(path) as entries:
                sample_count = sum(1 for _ in entries)
            # Simple heuristic; never below 1 so the division further down
            # cannot fail for an empty root directory.
            estimated_total = max(1, sample_count * 10)
        except OSError:
            estimated_total = 1000  # Fallback estimate

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = {}

            while dirs_to_scan and not self.stop_requested:
                # Process directories in parallel
                while dirs_to_scan and len(futures) < self.max_workers:
                    dir_path = dirs_to_scan.popleft()
                    real_path = os.path.realpath(dir_path)
                    if real_path in visited:
                        continue
                    visited.add(real_path)
                    future = executor.submit(
                        self.scan_single_directory_helper, dir_path
                    )
                    futures[future] = dir_path

                # Process completed directories
                for future in concurrent.futures.as_completed(list(futures)):
                    dir_path = futures.pop(future)

                    if self.stop_requested:
                        # Listings finished after a stop may be incomplete.
                        break

                    try:
                        dir_items, subdirs = future.result()
                    except Exception as e:
                        print(f"Error scanning directory {dir_path}: {e}")
                        continue

                    entries_processed += len(dir_items)

                    # Add results to our list and subdirectories to our queue
                    items.extend(dir_items)
                    dirs_to_scan.extend(subdirs)

                    # Publish and mark the directory only once
                    if dir_path not in self.scanned_directories:
                        batch.extend(dir_items)
                        self.scanned_directories.add(dir_path)
                        self.directory_scanned.emit(dir_path)

                    # Emit batch if it's full
                    if len(batch) >= self.batch_size:
                        self.items_found.emit(batch)
                        batch = []

                    # Update progress periodically
                    current_time = time.monotonic()
                    if current_time - last_update_time > progress_update_interval:
                        # Simple progress estimation, capped below 100 until done
                        progress = min(
                            99,
                            int(entries_processed / estimated_total * 100),
                        )
                        self.progress_update.emit(progress)
                        last_update_time = current_time

        if not self.stop_requested:
            # Emit any remaining items in the final batch
            if batch:
                self.items_found.emit(batch)
            # Only a completed traversal is worth caching
            self._recursive_cache[path] = items

        self.progress_update.emit(100)  # Final update
        return items

    def scan_single_directory(self, path):
        """
        Scan a single directory without recursion - supports lazy loading

        Args:
            path: Directory to list.

        Returns:
            List of ``(path, is_dir)`` tuples. Returns an empty list when a
            stop was already requested or the directory cannot be read, and
            the partial listing (neither cached nor emitted) when a stop
            arrives while listing.
        """
        if self.stop_requested:
            return []

        if path in self.cache:
            items = self.cache[path]
            self.items_found.emit(items)
            return items

        # Built once per directory instead of once per entry.
        suffixes = tuple(self.allowed_extensions)
        items = []
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        # An interrupted listing is incomplete; caching it or
                        # marking the directory as scanned would hide the
                        # remaining entries from later scans.
                        return items

                    if entry.is_dir():
                        # For directories, just add them to the list
                        # but don't scan them yet (lazy loading)
                        items.append((entry.path, True))
                    elif entry.is_file() and entry.name.lower().endswith(suffixes):
                        items.append((entry.path, False))
        except PermissionError:
            print(f"Permission denied: {path}")
            return []
        except OSError as e:
            print(f"Error accessing {path}: {e}")
            return []

        # Store in cache and emit
        self.cache[path] = items
        self.items_found.emit(items)
        self.scanned_directories.add(path)
        self.directory_scanned.emit(path)
        self.progress_update.emit(100)  # Show complete for this directory
        return items

    def scan_single_directory_helper(self, path):
        """Helper method for parallel directory scanning

        Unlike ``scan_single_directory`` this emits no signals and touches no
        cache, so it can run inside worker threads.

        Args:
            path: Directory to list.

        Returns:
            Tuple ``(items, subdirs)``: ``items`` holds ``(path, is_dir)``
            tuples for every directory and allowed file, ``subdirs`` holds
            the directory paths only. Both are empty or partial when the
            directory cannot be read or a stop was requested.
        """
        items = []
        subdirs = []
        suffixes = tuple(self.allowed_extensions)

        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.stop_requested:
                        break

                    if entry.is_dir():
                        items.append((entry.path, True))
                        subdirs.append(entry.path)
                    elif entry.is_file() and entry.name.lower().endswith(suffixes):
                        items.append((entry.path, False))
        except OSError as e:  # PermissionError is a subclass of OSError
            print(f"Error accessing {path}: {e}")

        return items, subdirs

    def request_directory_scan(self, path):
        """Request a scan of a specific directory (for lazy loading)

        Returns:
            The listing produced by ``scan_single_directory``, or None when
            the directory was already scanned and nothing was done.
        """
        if path in self.scanned_directories:
            return None

        return self.scan_single_directory(path)

    def request_full_scan(self):
        """Request a full recursive scan of all subdirectories

        Returns:
            The list returned by ``scan_directory_recursive`` for ``self.path``.
        """
        items = self.scan_directory_recursive(self.path)
        self.scan_complete.emit()
        return items

    def stop(self):
        """Ask running scans to finish early; the flag is never reset."""
        self.stop_requested = True
|
|
|
|
|
|
# Metadata Extractor
|
|
class MetadataExtractor(QThread):
    """Background thread that reads tag metadata from audio files with mutagen.

    Each successfully read file is reported through ``metadata_extracted`` as
    a dict with the keys ``file_path``, ``artist``, ``album``, ``title``,
    ``genre`` and ``year``.
    """

    metadata_extracted = pyqtSignal(dict)
    extraction_complete = pyqtSignal()
    progress_update = pyqtSignal(int)  # Percentage in the range 0-100

    # Files with any other extension are skipped without opening them.
    _AUDIO_EXTENSIONS = (".mp3", ".wav", ".flac", ".m4a", ".wma", ".mid", ".midi")

    def __init__(self, file_list):
        """
        Args:
            file_list: Paths of the files to inspect.
        """
        super().__init__()
        self.file_list = file_list
        self.stop_requested = False
        self.metadata_cache = {}

    def run(self):
        """Extract metadata for every file, emitting results and progress.

        Cached entries are emitted immediately; the remaining files are read
        in a thread pool. ``extraction_complete`` is emitted at the end, also
        after a stop request.
        """
        # Work on a snapshot: the caller may keep mutating the list it passed
        # in, and the progress total must match what is actually iterated.
        files = list(self.file_list)
        total_files = len(files)
        processed_files = 0

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for file_path in files:
                if self.stop_requested:
                    break
                if file_path in self.metadata_cache:
                    self.metadata_extracted.emit(self.metadata_cache[file_path])
                    processed_files += 1
                    self.progress_update.emit(
                        int(processed_files / total_files * 100)
                    )
                else:
                    futures.append(
                        executor.submit(self.extract_metadata, file_path)
                    )

            try:
                for future in concurrent.futures.as_completed(futures):
                    if self.stop_requested:
                        break
                    try:
                        metadata = future.result()
                        if metadata:
                            self.metadata_extracted.emit(metadata)
                    except Exception as e:
                        print(f"Error extracting metadata: {e}")

                    processed_files += 1
                    self.progress_update.emit(
                        int(processed_files / total_files * 100)
                    )
            finally:
                if self.stop_requested:
                    # Leaving the executor block waits for submitted work, so
                    # drop everything that has not started yet to make the
                    # stop take effect promptly.
                    for pending in futures:
                        pending.cancel()

        self.extraction_complete.emit()

    def extract_metadata(self, file_path):
        """Read the tags of one file.

        Args:
            file_path: Path of the file to inspect.

        Returns:
            Metadata dict (also stored in ``metadata_cache``), or None when
            the path is not a file, has an unsupported extension, is not
            recognised by mutagen, or reading fails.
        """
        try:
            if not os.path.isfile(file_path):
                return None

            # Skip non-audio files
            if not file_path.lower().endswith(self._AUDIO_EXTENSIONS):
                return None

            # easy=True asks mutagen for its "easy" wrappers where they exist
            # (e.g. EasyMP3), which expose the plain "artist"/"album"/"title"
            # key names looked up below instead of raw ID3 frame names.
            audio = mutagen.File(file_path, easy=True)  # type: ignore
            # Compare with None explicitly: mutagen returns None for files it
            # does not recognise, whereas a recognised file without any tags
            # is an empty mapping and should still get the default values.
            if audio is None:
                return None

            metadata = {
                "file_path": file_path,
                "artist": self._get_tag(audio, "artist", "Unknown Artist"),
                "album": self._get_tag(audio, "album", "Unknown Album"),
                "title": self._get_tag(
                    audio, "title", os.path.basename(file_path)
                ),
                "genre": self._get_tag(audio, "genre", "Unknown Genre"),
                "year": self._get_tag(audio, "date", "Unknown Year"),
            }

            # Cache the result
            self.metadata_cache[file_path] = metadata
            return metadata
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return None

    def _get_tag(self, audio, tag_name, default_value):
        """Helper method to safely extract tags from audio files

        Args:
            audio: Mapping-like object returned by ``mutagen.File``.
            tag_name: Key to look up.
            default_value: Returned when the tag is missing, empty or unreadable.

        Returns:
            The tag as a string; for list values, the first element.
        """
        try:
            if tag_name not in audio:
                return default_value
            value = audio[tag_name]
        except Exception:
            # Deliberate best effort: one unreadable tag must not discard the
            # rest of the file's metadata.
            return default_value

        if isinstance(value, list):
            # An empty list carries no information; do not report "[]".
            return str(value[0]) if value else default_value
        return str(value)

    def stop(self):
        """Ask the extraction to finish early."""
        self.stop_requested = True
|
|
|
|
|
|
# Archive Extractor not fully tested or implemented
|
|
class ArchiveExtractor(QThread):
    """Background thread that unpacks ZIP, 7z and RAR archives.

    ``archive_path`` may be a single path or an iterable of paths. When an
    iterable is given, entries that are not ``.zip``/``.7z``/``.rar`` files
    are skipped, so a mixed file list can be passed as is.
    """

    extraction_progress = pyqtSignal(int)  # Percentage in the range 0-100
    extraction_complete = pyqtSignal(list)  # Emits list of extracted files
    extraction_error = pyqtSignal(str)

    def __init__(self, archive_path, extraction_dir):
        """
        Args:
            archive_path: Path of one archive, or an iterable of paths.
            extraction_dir: Directory to unpack into. When None, ``run()``
                unpacks each archive into a folder next to it, named after
                the archive without its extension.
        """
        super().__init__()
        self.archive_path = archive_path
        self.extraction_dir = extraction_dir
        self.stop_requested = False
        # (index of the archive being unpacked, number of archives); used to
        # scale per-member progress into one overall percentage.
        self._progress_window = (0, 1)

    def run(self):
        """Unpack the configured archive(s) and emit the extracted paths.

        A single path with an unsupported extension emits ``extraction_error``
        and nothing else. Otherwise ``extraction_complete`` is emitted with
        all paths extracted so far, also after a stop request.
        """
        try:
            if isinstance(self.archive_path, (str, os.PathLike)):
                archives = [os.fspath(self.archive_path)]
                if self._extractor_for(archives[0]) is None:
                    self.extraction_error.emit(
                        f"Unsupported archive format: {self.archive_path}"
                    )
                    return
            else:
                candidates = [os.fspath(p) for p in self.archive_path]
                archives = [
                    p for p in candidates if self._extractor_for(p) is not None
                ]

            extracted_files = []
            for position, archive in enumerate(archives):
                if self.stop_requested:
                    break
                self._progress_window = (position, len(archives))

                target_dir = self.extraction_dir
                if target_dir is None:
                    # No destination given: unpack beside the archive.
                    target_dir = os.path.splitext(archive)[0]
                os.makedirs(target_dir, exist_ok=True)

                extract = self._extractor_for(archive)
                extracted_files.extend(extract(archive, target_dir))

            self.extraction_complete.emit(extracted_files)
        except Exception as e:
            self.extraction_error.emit(f"Extraction error: {str(e)}")

    def _extractor_for(self, archive_path):
        """Return the extraction method matching the file extension, or None."""
        lowered = archive_path.lower()
        if lowered.endswith(".zip"):
            return self._extract_zip
        if lowered.endswith(".7z"):
            return self._extract_7z
        if lowered.endswith(".rar"):
            return self._extract_rar
        return None

    def _emit_progress(self, done, total):
        """Emit overall progress after ``done`` of ``total`` members of the current archive."""
        position, count = self._progress_window
        self.extraction_progress.emit(
            int((position + done / total) / count * 100)
        )

    @staticmethod
    def _destination_inside(extraction_dir, member):
        """Return True when ``member`` would be written inside ``extraction_dir``.

        Guards against archive members such as ``../../x`` or absolute paths.
        """
        base = os.path.realpath(extraction_dir)
        target = os.path.realpath(os.path.join(base, member))
        try:
            return os.path.commonpath([base, target]) == base
        except ValueError:
            # Raised e.g. for paths on different drives: treat as outside.
            return False

    def _extract_zip(self, archive_path=None, extraction_dir=None):
        """Unpack a ZIP archive member by member.

        Args:
            archive_path: Archive to read; defaults to ``self.archive_path``.
            extraction_dir: Destination; defaults to ``self.extraction_dir``.

        Returns:
            Paths created on disk, as reported by ``ZipFile.extract``.
        """
        if archive_path is None:
            archive_path = self.archive_path
        if extraction_dir is None:
            extraction_dir = self.extraction_dir

        extracted_files = []
        try:
            with zipfile.ZipFile(archive_path, "r") as zip_ref:
                file_list = zip_ref.namelist()
                total_files = len(file_list)

                for i, file in enumerate(file_list):
                    if self.stop_requested:
                        break
                    # ZipFile.extract sanitises the member name and returns
                    # the path it actually created, so report that path
                    # rather than re-joining the raw member name.
                    extracted_files.append(
                        zip_ref.extract(file, extraction_dir)
                    )
                    self._emit_progress(i + 1, total_files)
        except Exception as e:
            self.extraction_error.emit(f"ZIP extraction error: {str(e)}")

        return extracted_files

    def _extract_7z(self, archive_path=None, extraction_dir=None):
        """Unpack a 7z archive member by member.

        Args:
            archive_path: Archive to read; defaults to ``self.archive_path``.
            extraction_dir: Destination; defaults to ``self.extraction_dir``.

        Returns:
            Expected paths of the extracted members.
        """
        if archive_path is None:
            archive_path = self.archive_path
        if extraction_dir is None:
            extraction_dir = self.extraction_dir

        extracted_files = []
        try:
            with py7zr.SevenZipFile(archive_path, mode="r") as z:
                file_list = z.getnames()
                total_files = len(file_list)

                for i, file in enumerate(file_list):
                    if self.stop_requested:
                        break
                    if self._destination_inside(extraction_dir, file):
                        z.extract(extraction_dir, [file])
                        # py7zr reads the archive as a stream; it has to be
                        # rewound before the next extract() call.
                        z.reset()
                        extracted_files.append(
                            os.path.join(extraction_dir, file)
                        )
                    else:
                        self.extraction_error.emit(
                            f"Skipped unsafe archive member: {file}"
                        )
                    self._emit_progress(i + 1, total_files)
        except Exception as e:
            self.extraction_error.emit(f"7Z extraction error: {str(e)}")

        return extracted_files

    def _extract_rar(self, archive_path=None, extraction_dir=None):
        """Unpack a RAR archive member by member.

        Args:
            archive_path: Archive to read; defaults to ``self.archive_path``.
            extraction_dir: Destination; defaults to ``self.extraction_dir``.

        Returns:
            Expected paths of the extracted members.
        """
        if archive_path is None:
            archive_path = self.archive_path
        if extraction_dir is None:
            extraction_dir = self.extraction_dir

        extracted_files = []
        try:
            with rarfile.RarFile(archive_path) as rf:
                file_list = rf.namelist()
                total_files = len(file_list)

                for i, file in enumerate(file_list):
                    if self.stop_requested:
                        break
                    if self._destination_inside(extraction_dir, file):
                        rf.extract(file, extraction_dir)
                        extracted_files.append(
                            os.path.join(extraction_dir, file)
                        )
                    else:
                        self.extraction_error.emit(
                            f"Skipped unsafe archive member: {file}"
                        )
                    self._emit_progress(i + 1, total_files)
        except Exception as e:
            self.extraction_error.emit(f"RAR extraction error: {str(e)}")

        return extracted_files

    def stop(self):
        """Ask the extraction to finish after the current member."""
        self.stop_requested = True
|
|
|
|
|
|
# Main Organizer class
|
|
class Organizer:
    """Coordinates scanning, metadata extraction and archive extraction.

    Results are collected in ``file_list``/``dir_list`` and in the
    ``artists``/``albums``/``genres``/``years`` sets. The optional
    ``on_*`` attributes are plain callables invoked for UI updates.
    """

    def __init__(self):
        self.file_list = []
        self.dir_list = []
        self.scanner = None
        self.metadata_extractor = None
        self.archive_extractor = None

        # Metadata organization
        self.artists = set()
        self.albums = set()
        self.genres = set()
        self.years = set()

        # Signals for UI updates
        self.on_scan_complete = None
        self.on_progress_update = None
        self.on_metadata_complete = None

    @staticmethod
    def _stop_thread(thread):
        """Stop a worker (if any) and block until its thread has finished."""
        if thread:
            thread.stop()
            thread.wait()

    def start_scan(self, path):
        """Start scanning a directory for files"""
        # Replacing self.scanner while the previous thread is still running
        # would drop the last reference to a live QThread, so finish it first.
        self.stop_scan()

        self.file_list.clear()
        self.dir_list.clear()

        self.scanner = FileScanner(path)
        self.scanner.items_found.connect(self.add_items)
        self.scanner.scan_complete.connect(self.scan_finished)

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.scanner.progress_update.connect(self.on_progress_update)

        self.scanner.start()

    def add_items(self, items):
        """Process items found during scanning

        Args:
            items: Iterable of ``(path, is_dir)`` tuples.
        """
        for path, is_dir in items:
            if is_dir:
                self.dir_list.append(path)
            else:
                self.file_list.append(path)

    def scan_finished(self):
        """Handle scan completion"""
        # Adjacent literals instead of a backslash continuation inside the
        # string, which pulled the next line's indentation into the message.
        print(
            f"Scan complete. Found {len(self.dir_list)} directories "
            f"and {len(self.file_list)} files."
        )
        if self.on_scan_complete:
            self.on_scan_complete()

    def stop_scan(self):
        """Stop the current scan operation"""
        self._stop_thread(self.scanner)

    def extract_metadata(self):
        """Extract metadata from audio files"""
        if not self.file_list:
            print("No files to extract metadata from")
            return

        # Finish a previous extraction before its thread object is replaced.
        self._stop_thread(self.metadata_extractor)

        self.metadata_extractor = MetadataExtractor(self.file_list)
        self.metadata_extractor.metadata_extracted.connect(
            self.process_metadata
        )
        self.metadata_extractor.extraction_complete.connect(
            self.metadata_extraction_complete
        )

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.metadata_extractor.progress_update.connect(
                self.on_progress_update
            )

        self.metadata_extractor.start()

    def process_metadata(self, metadata):
        """Process extracted metadata

        Args:
            metadata: Dict that may hold ``artist``, ``album``, ``genre`` and
                ``year``; missing or empty values are ignored.
        """
        collections_by_key = (
            ("artist", self.artists),
            ("album", self.albums),
            ("genre", self.genres),
            ("year", self.years),
        )
        for key, collection in collections_by_key:
            value = metadata.get(key)
            if value:
                collection.add(value)

    def metadata_extraction_complete(self):
        """Handle metadata extraction completion"""
        print(
            f"Metadata extraction complete. Artists: {len(self.artists)}, "
            f"Albums: {len(self.albums)}, Genres: {len(self.genres)}, "
            f"Years: {len(self.years)}"
        )
        if self.on_metadata_complete:
            self.on_metadata_complete()

    def extract_archives(self):
        """Extract archives"""
        if not self.file_list:
            print("No files to extract archives from")
            return

        # Finish a previous extraction before its thread object is replaced.
        self.stop_extraction()

        # NOTE(review): this hands the complete file list and no extraction
        # directory to ArchiveExtractor - confirm the extractor supports that.
        self.archive_extractor = ArchiveExtractor(
            self.file_list, extraction_dir=None
        )
        self.archive_extractor.extraction_complete.connect(
            self.archive_extraction_complete
        )
        # Surface failures instead of dropping them silently.
        self.archive_extractor.extraction_error.connect(
            self.archive_extraction_failed
        )

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.archive_extractor.extraction_progress.connect(
                self.on_progress_update
            )

        self.archive_extractor.start()

    def archive_extraction_complete(self, extracted_files=None):
        """Handle archive extraction completion

        Args:
            extracted_files: Optional list of extracted paths delivered by the
                extractor's signal; currently unused.
        """
        print("Archive extraction complete.")

        if self.on_progress_update:
            self.on_progress_update(100)

    def archive_extraction_failed(self, message):
        """Report an error message emitted by the archive extractor."""
        print(f"Archive extraction failed: {message}")

    def stop_extraction(self):
        """Stop the current extraction operation"""
        self._stop_thread(self.archive_extractor)
|