Minor bug fixes. The known stutter bug in the database/meta systems appears to be caused by the delay in connecting to the database. Future work: 1. Integrate firefly.dll as a server module. 2. Ensure full database functionality. 3. Add a toggle to use the database or not. (By default we check for firefly; if we don't find it, we default to the Python systems. If found, we automatically use firefly (so maybe on toggles).) 4. Investigate and fix the stutter bug.
154 lines
5.0 KiB
Python
154 lines
5.0 KiB
Python
import os
|
|
import logging
|
|
import zipfile
|
|
import py7zr
|
|
import rarfile # typed: ignore
|
|
|
|
from PyQt6.QtCore import QThread, pyqtSignal
|
|
|
|
|
|
# Module-level logger used by this file, looked up by the name "fbroswer".
# NOTE(review): the name looks like a typo of "fbrowser" -- confirm against the
# rest of the project before renaming, because the name selects the logger.
|
|
logger = logging.getLogger("fbroswer")
|
|
|
|
|
|
# Archive Extractor not fully tested or implemented
|
|
class ArchiveExtractor(QThread):
    """Unpack a ZIP, 7z or RAR archive from a ``QThread``.

    ``run()`` chooses the extraction routine from the archive's file
    extension, extracts the members one at a time and reports through three
    signals:

    * ``extraction_progress(int)`` -- percentage of members processed so far.
    * ``extraction_complete(list)`` -- ``extraction_dir`` joined with the name
      of every member that was extracted.
    * ``extraction_error(str)`` -- human-readable failure message.

    A run that fails emits ``extraction_error`` only; ``extraction_complete``
    is emitted when the run finished or was cut short by ``stop()``.
    """

    extraction_progress = pyqtSignal(int)
    extraction_complete = pyqtSignal(list)  # Emits list of extracted files
    extraction_error = pyqtSignal(str)

    def __init__(self, archive_path, extraction_dir):
        """Store the archive location and the directory to extract into.

        Args:
            archive_path: Path of the archive; its extension (``.zip``,
                ``.7z`` or ``.rar``, case-insensitive) selects the format.
            extraction_dir: Directory the members are extracted into.
        """
        super().__init__()
        self.archive_path = archive_path
        self.extraction_dir = extraction_dir
        self.stop_requested = False
        self.file_list = []

        # Set when a format helper reports a failure, so run() does not
        # announce completion for an extraction that actually failed.
        self._extraction_failed = False

        # Child extractor created by extract_archives(). Initialised here so
        # stop_extraction() is safe to call before any extraction started.
        self.archive_extractor = None

        # Optional callbacks for UI updates (plain callables, not Qt signals)
        self.on_scan_complete = None
        self.on_progress_update = None
        self.on_metadata_complete = None

    def run(self):
        """Extract the archive and emit the completion or error signal."""
        self._extraction_failed = False
        try:
            # os.fspath accepts str as well as pathlib.Path-like objects.
            archive_name = os.fspath(self.archive_path).lower()

            handlers = (
                (".zip", self._extract_zip),
                (".7z", self._extract_7z),
                (".rar", self._extract_rar),
            )
            for suffix, handler in handlers:
                if archive_name.endswith(suffix):
                    extracted_files = handler()
                    break
            else:
                self.extraction_error.emit(
                    f"Unsupported archive format: {self.archive_path}"
                )
                return

            # The helper already emitted extraction_error on failure; do not
            # follow it with a contradictory "complete" signal.
            if not self._extraction_failed:
                self.extraction_complete.emit(extracted_files)
        except Exception as e:
            logger.exception("Archive extraction failed")
            self.extraction_error.emit(f"Extraction error: {str(e)}")

    def _report_failure(self, message):
        """Record, log and emit a failure; call from inside ``except``."""
        self._extraction_failed = True
        logger.exception(message)
        self.extraction_error.emit(message)

    def _extract_members(self, member_names, extract_member, extracted_files):
        """Extract members one by one, honouring ``stop_requested``.

        Args:
            member_names: Names of the archive members, in extraction order.
            extract_member: Callable taking one member name and extracting
                it into ``self.extraction_dir``.
            extracted_files: List that receives the path of every member
                extracted so far; filled in place so the caller still has
                the partial result if ``extract_member`` raises.
        """
        total_files = len(member_names)
        for position, member in enumerate(member_names, start=1):
            if self.stop_requested:
                break
            extract_member(member)
            extracted_files.append(os.path.join(self.extraction_dir, member))
            # Integer arithmetic: float division truncates e.g. 29/100 to 28.
            self.extraction_progress.emit(position * 100 // total_files)

    def _extract_zip(self):
        """Extract a ZIP archive; return the paths extracted so far."""
        extracted_files = []
        try:
            with zipfile.ZipFile(self.archive_path, "r") as zip_ref:
                self._extract_members(
                    zip_ref.namelist(),
                    lambda member: zip_ref.extract(member, self.extraction_dir),
                    extracted_files,
                )
        except Exception as e:
            self._report_failure(f"ZIP extraction error: {str(e)}")

        return extracted_files

    def _extract_7z(self):
        """Extract a 7z archive; return the paths extracted so far."""
        extracted_files = []
        try:
            with py7zr.SevenZipFile(self.archive_path, mode="r") as z:

                def extract_member(member):
                    z.extract(self.extraction_dir, [member])
                    # py7zr leaves the archive object exhausted after an
                    # extract call; reset() rewinds it for the next member.
                    z.reset()

                self._extract_members(
                    z.getnames(), extract_member, extracted_files
                )
        except Exception as e:
            self._report_failure(f"7Z extraction error: {str(e)}")

        return extracted_files

    def _extract_rar(self):
        """Extract a RAR archive; return the paths extracted so far."""
        extracted_files = []
        try:
            with rarfile.RarFile(self.archive_path) as rf:
                self._extract_members(
                    rf.namelist(),
                    lambda member: rf.extract(member, self.extraction_dir),
                    extracted_files,
                )
        except Exception as e:
            self._report_failure(f"RAR extraction error: {str(e)}")

        return extracted_files

    def stop(self):
        """Ask the extraction loop to stop before the next member."""
        self.stop_requested = True

    def extract_archives(self):
        """Start a child extractor for ``self.file_list``.

        Does nothing when ``file_list`` is empty.

        NOTE(review): the whole ``file_list`` is passed as the child's
        ``archive_path`` and ``extraction_dir`` is ``None``; the child's
        ``run()`` expects a single path, so as written it reports an error
        instead of extracting. Confirm the intended arguments.
        """
        if not self.file_list:
            logger.debug("No files to extract archives from")
            return

        self.archive_extractor = ArchiveExtractor(
            self.file_list, extraction_dir=None
        )
        self.archive_extractor.extraction_complete.connect(
            self.archive_extraction_complete
        )
        # Forward child failures through this object's own error signal so
        # they are not silently dropped.
        self.archive_extractor.extraction_error.connect(self.extraction_error)

        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.archive_extractor.extraction_progress.connect(
                self.on_progress_update
            )

        self.archive_extractor.start()

    def archive_extraction_complete(self):
        """Handle archive extraction completion"""
        logger.debug("Archive extraction complete.")

        if self.on_progress_update:
            self.on_progress_update(100)

    def stop_extraction(self):
        """Stop the current extraction operation"""
        extractor = self.archive_extractor
        if extractor is not None:
            extractor.stop()
            # Block until the child thread has actually finished.
            extractor.wait()
|