import os import logging import zipfile import py7zr import rarfile # typed: ignore from PyQt6.QtCore import QThread, pyqtSignal # Get the logger logger = logging.getLogger("fbroswer") # Archive Extractor not fully tested or implemented class ArchiveExtractor(QThread): extraction_progress = pyqtSignal(int) extraction_complete = pyqtSignal(list) # Emits list of extracted files extraction_error = pyqtSignal(str) def __init__(self, archive_path, extraction_dir): super().__init__() self.archive_path = archive_path self.extraction_dir = extraction_dir self.stop_requested = False self.file_list = [] # Signals for UI updates self.on_scan_complete = None self.on_progress_update = None self.on_metadata_complete = None def run(self): try: extracted_files = [] if self.archive_path.lower().endswith(".zip"): extracted_files = self._extract_zip() elif self.archive_path.lower().endswith(".7z"): extracted_files = self._extract_7z() elif self.archive_path.lower().endswith(".rar"): extracted_files = self._extract_rar() else: self.extraction_error.emit( f"Unsupported archive format: {self.archive_path}" ) return self.extraction_complete.emit(extracted_files) except Exception as e: self.extraction_error.emit(f"Extraction error: {str(e)}") def _extract_zip(self): extracted_files = [] try: with zipfile.ZipFile(self.archive_path, "r") as zip_ref: file_list = zip_ref.namelist() total_files = len(file_list) for i, file in enumerate(file_list): if self.stop_requested: break zip_ref.extract(file, self.extraction_dir) extracted_files.append( os.path.join(self.extraction_dir, file) ) self.extraction_progress.emit( int((i + 1) / total_files * 100) ) except Exception as e: self.extraction_error.emit(f"ZIP extraction error: {str(e)}") return extracted_files def _extract_7z(self): extracted_files = [] try: with py7zr.SevenZipFile(self.archive_path, mode="r") as z: file_list = z.getnames() total_files = len(file_list) for i, file in enumerate(file_list): if self.stop_requested: break z.extract(self.extraction_dir, [file]) extracted_files.append( os.path.join(self.extraction_dir, file) ) self.extraction_progress.emit( int((i + 1) / total_files * 100) ) except Exception as e: self.extraction_error.emit(f"7Z extraction error: {str(e)}") return extracted_files def _extract_rar(self): extracted_files = [] try: with rarfile.RarFile(self.archive_path) as rf: file_list = rf.namelist() total_files = len(file_list) for i, file in enumerate(file_list): if self.stop_requested: break rf.extract(file, self.extraction_dir) extracted_files.append( os.path.join(self.extraction_dir, file) ) self.extraction_progress.emit( int((i + 1) / total_files * 100) ) except Exception as e: self.extraction_error.emit(f"RAR extraction error: {str(e)}") return extracted_files def stop(self): self.stop_requested = True def extract_archives(self): """Extract archives""" if not self.file_list: logger.debug("No files to extract archives from") return self.archive_extractor = ArchiveExtractor( self.file_list, extraction_dir=None ) self.archive_extractor.extraction_complete.connect( self.archive_extraction_complete ) # Connect progress signal if handler exists if self.on_progress_update: self.archive_extractor.extraction_progress.connect( self.on_progress_update ) self.archive_extractor.start() def archive_extraction_complete(self): """Handle archive extraction completion""" logger.debug("Archive extraction complete.") if self.on_progress_update: self.on_progress_update(100) def stop_extraction(self): """Stop the current extraction operation""" if self.archive_extractor: self.archive_extractor.stop() self.archive_extractor.wait()