This commit introduces database integration using FireflyDB for storing and retrieving audio file metadata. - Integrated FireflyDB for persistent storage of metadata. - Added methods to check for existing metadata in the database and retrieve it. - Modified the Organizer class to use FireflyDB for processing metadata. - Added auto-scanning and metadata extraction upon directory opening in Fbrowser. - Created archiver.py and metaextract.py to house the ArchiveExtractor and MetadataExtractor classes respectively. - Added .gitignore entries for Firefly related files. - Added Mock MIT License, Contributor License Agreement, and Pro Edition License Agreement files.
149 lines
4.9 KiB
Python
149 lines
4.9 KiB
Python
import os
|
|
import zipfile
|
|
import py7zr
|
|
import rarfile # typed: ignore
|
|
|
|
from PyQt6.QtCore import QThread, pyqtSignal
|
|
|
|
|
|
# Archive Extractor not fully tested or implemented
|
|
class ArchiveExtractor(QThread):
|
|
extraction_progress = pyqtSignal(int)
|
|
extraction_complete = pyqtSignal(list) # Emits list of extracted files
|
|
extraction_error = pyqtSignal(str)
|
|
|
|
def __init__(self, archive_path, extraction_dir):
|
|
super().__init__()
|
|
self.archive_path = archive_path
|
|
self.extraction_dir = extraction_dir
|
|
self.stop_requested = False
|
|
self.file_list = []
|
|
|
|
# Signals for UI updates
|
|
self.on_scan_complete = None
|
|
self.on_progress_update = None
|
|
self.on_metadata_complete = None
|
|
|
|
def run(self):
|
|
try:
|
|
extracted_files = []
|
|
|
|
if self.archive_path.lower().endswith(".zip"):
|
|
extracted_files = self._extract_zip()
|
|
elif self.archive_path.lower().endswith(".7z"):
|
|
extracted_files = self._extract_7z()
|
|
elif self.archive_path.lower().endswith(".rar"):
|
|
extracted_files = self._extract_rar()
|
|
else:
|
|
self.extraction_error.emit(
|
|
f"Unsupported archive format: {self.archive_path}"
|
|
)
|
|
return
|
|
|
|
self.extraction_complete.emit(extracted_files)
|
|
except Exception as e:
|
|
self.extraction_error.emit(f"Extraction error: {str(e)}")
|
|
|
|
def _extract_zip(self):
|
|
extracted_files = []
|
|
try:
|
|
with zipfile.ZipFile(self.archive_path, "r") as zip_ref:
|
|
file_list = zip_ref.namelist()
|
|
total_files = len(file_list)
|
|
|
|
for i, file in enumerate(file_list):
|
|
if self.stop_requested:
|
|
break
|
|
zip_ref.extract(file, self.extraction_dir)
|
|
extracted_files.append(
|
|
os.path.join(self.extraction_dir, file)
|
|
)
|
|
self.extraction_progress.emit(
|
|
int((i + 1) / total_files * 100)
|
|
)
|
|
except Exception as e:
|
|
self.extraction_error.emit(f"ZIP extraction error: {str(e)}")
|
|
|
|
return extracted_files
|
|
|
|
def _extract_7z(self):
|
|
extracted_files = []
|
|
try:
|
|
with py7zr.SevenZipFile(self.archive_path, mode="r") as z:
|
|
file_list = z.getnames()
|
|
total_files = len(file_list)
|
|
|
|
for i, file in enumerate(file_list):
|
|
if self.stop_requested:
|
|
break
|
|
z.extract(self.extraction_dir, [file])
|
|
extracted_files.append(
|
|
os.path.join(self.extraction_dir, file)
|
|
)
|
|
self.extraction_progress.emit(
|
|
int((i + 1) / total_files * 100)
|
|
)
|
|
except Exception as e:
|
|
self.extraction_error.emit(f"7Z extraction error: {str(e)}")
|
|
|
|
return extracted_files
|
|
|
|
def _extract_rar(self):
|
|
extracted_files = []
|
|
try:
|
|
with rarfile.RarFile(self.archive_path) as rf:
|
|
file_list = rf.namelist()
|
|
total_files = len(file_list)
|
|
|
|
for i, file in enumerate(file_list):
|
|
if self.stop_requested:
|
|
break
|
|
rf.extract(file, self.extraction_dir)
|
|
extracted_files.append(
|
|
os.path.join(self.extraction_dir, file)
|
|
)
|
|
self.extraction_progress.emit(
|
|
int((i + 1) / total_files * 100)
|
|
)
|
|
except Exception as e:
|
|
self.extraction_error.emit(f"RAR extraction error: {str(e)}")
|
|
|
|
return extracted_files
|
|
|
|
def stop(self):
|
|
self.stop_requested = True
|
|
|
|
def extract_archives(self):
|
|
"""Extract archives"""
|
|
if not self.file_list:
|
|
print("No files to extract archives from")
|
|
return
|
|
|
|
self.archive_extractor = ArchiveExtractor(
|
|
self.file_list, extraction_dir=None
|
|
)
|
|
self.archive_extractor.extraction_complete.connect(
|
|
self.archive_extraction_complete
|
|
)
|
|
|
|
# Connect progress signal if handler exists
|
|
if self.on_progress_update:
|
|
self.archive_extractor.extraction_progress.connect(
|
|
self.on_progress_update
|
|
)
|
|
|
|
self.archive_extractor.start()
|
|
|
|
def archive_extraction_complete(self):
|
|
"""Handle archive extraction completion"""
|
|
print("Archive extraction complete.")
|
|
|
|
if self.on_progress_update:
|
|
self.on_progress_update(100)
|
|
|
|
def stop_extraction(self):
|
|
"""Stop the current extraction operation"""
|
|
if self.archive_extractor:
|
|
self.archive_extractor.stop()
|
|
self.archive_extractor.wait()
|