Fbrowser/archiver.py
Stan44 98ee9ce50a feat: Implement database integration
This commit introduces database integration using FireflyDB for storing and retrieving audio file metadata.

- Integrated FireflyDB for persistent storage of metadata.
- Added methods to check for existing metadata in the database and retrieve it.
- Modified the Organizer class to use FireflyDB for processing metadata.
- Added auto-scanning and metadata extraction upon directory opening in Fbrowser.
- Created archiver.py and metaextract.py to house the ArchiveExtractor and MetadataExtractor classes respectively.
- Added .gitignore entries for Firefly related files.
- Added Mock MIT License, Contributor License Agreement, and Pro Edition License Agreement files.
2025-04-11 22:30:39 -05:00

149 lines
4.9 KiB
Python

import os
import zipfile
import py7zr
import rarfile # type: ignore
from PyQt6.QtCore import QThread, pyqtSignal
# NOTE: ArchiveExtractor is not yet fully implemented or tested.
class ArchiveExtractor(QThread):
    """Worker thread that unpacks ZIP, 7z and RAR archives.

    ``archive_path`` is normally a single path; an iterable of paths is also
    accepted, in which case the archives are extracted one after another into
    the same ``extraction_dir``.

    Signals:
        extraction_progress(int): percentage (0-100) of the archive currently
            being extracted.
        extraction_complete(list): paths of the extracted entries.
        extraction_error(str): human-readable error message.
    """

    extraction_progress = pyqtSignal(int)
    extraction_complete = pyqtSignal(list)  # Emits list of extracted files
    extraction_error = pyqtSignal(str)

    # Archive suffix -> name of the method that handles it.
    _EXTRACTORS = (
        (".zip", "_extract_zip"),
        (".7z", "_extract_7z"),
        (".rar", "_extract_rar"),
    )

    def __init__(self, archive_path, extraction_dir):
        """Store the archive location(s) and the destination directory."""
        super().__init__()
        self.archive_path = archive_path
        self.extraction_dir = extraction_dir
        self.stop_requested = False
        self.file_list = []
        # Child extractor started by extract_archives(); initialised here so
        # stop_extraction() is safe to call before any extraction was started.
        self.archive_extractor = None
        # Optional UI callbacks, assigned by the owner of this object.
        # Only on_progress_update is used inside this class.
        self.on_scan_complete = None
        self.on_progress_update = None
        self.on_metadata_complete = None

    def run(self):
        """Thread entry point: extract every configured archive.

        Emits ``extraction_complete`` with all extracted paths on success.
        An unsupported suffix emits ``extraction_error`` and aborts without
        emitting ``extraction_complete``.
        """
        try:
            extracted_files = []
            for path in self._archive_paths():
                if self.stop_requested:
                    break
                extractor = self._extractor_for(path)
                if extractor is None:
                    self.extraction_error.emit(
                        f"Unsupported archive format: {path}"
                    )
                    return
                extracted_files.extend(extractor(path))
            self.extraction_complete.emit(extracted_files)
        except Exception as e:
            # Top-level thread boundary: report instead of killing the thread.
            self.extraction_error.emit(f"Extraction error: {str(e)}")

    def _archive_paths(self):
        """Return ``archive_path`` normalised to a list of paths."""
        if isinstance(self.archive_path, (str, os.PathLike)):
            return [self.archive_path]
        return list(self.archive_path)

    def _extractor_for(self, path):
        """Return the bound extraction method for *path*'s suffix, or None."""
        name = os.fspath(path).lower()
        for suffix, method_name in self._EXTRACTORS:
            if name.endswith(suffix):
                return getattr(self, method_name)
        return None

    def _extract_members(self, members, extract_one, extracted_files):
        """Extract *members* one by one, emitting progress after each.

        Results are appended to the caller's *extracted_files* list (rather
        than returned) so entries extracted before a failure are preserved
        when ``extract_one`` raises.
        """
        total = len(members)
        for done, member in enumerate(members, start=1):
            if self.stop_requested:
                break
            extracted_files.append(extract_one(member))
            # Integer arithmetic: int(29 / 100 * 100) would truncate to 28.
            self.extraction_progress.emit(done * 100 // total)

    def _extract_zip(self, archive_path=None):
        """Extract a ZIP archive; return the paths that were written.

        Errors are reported through ``extraction_error``; whatever was
        extracted before the failure is still returned.
        """
        if archive_path is None:
            archive_path = self.archive_path
        extracted_files = []
        try:
            with zipfile.ZipFile(archive_path, "r") as zip_ref:

                def extract_one(member):
                    # extract() sanitises the member name and returns the
                    # path it actually created, which may differ from a
                    # naive join of directory and member name.
                    return zip_ref.extract(member, self.extraction_dir)

                self._extract_members(
                    zip_ref.namelist(), extract_one, extracted_files
                )
        except Exception as e:
            self.extraction_error.emit(f"ZIP extraction error: {str(e)}")
        return extracted_files

    def _extract_7z(self, archive_path=None):
        """Extract a 7z archive; return the paths of the extracted entries.

        Errors are reported through ``extraction_error``; whatever was
        extracted before the failure is still returned.
        """
        if archive_path is None:
            archive_path = self.archive_path
        extracted_files = []
        try:
            with py7zr.SevenZipFile(archive_path, mode="r") as z:

                def extract_one(member):
                    z.extract(path=self.extraction_dir, targets=[member])
                    # py7zr needs reset() before extract() can be called
                    # again on the same opened archive.
                    z.reset()
                    return os.path.join(self.extraction_dir, member)

                self._extract_members(
                    z.getnames(), extract_one, extracted_files
                )
        except Exception as e:
            self.extraction_error.emit(f"7Z extraction error: {str(e)}")
        return extracted_files

    def _extract_rar(self, archive_path=None):
        """Extract a RAR archive; return the paths of the extracted entries.

        Errors are reported through ``extraction_error``; whatever was
        extracted before the failure is still returned.
        """
        if archive_path is None:
            archive_path = self.archive_path
        extracted_files = []
        try:
            with rarfile.RarFile(archive_path) as rf:

                def extract_one(member):
                    rf.extract(member, self.extraction_dir)
                    return os.path.join(self.extraction_dir, member)

                self._extract_members(
                    rf.namelist(), extract_one, extracted_files
                )
        except Exception as e:
            self.extraction_error.emit(f"RAR extraction error: {str(e)}")
        return extracted_files

    def stop(self):
        """Ask the running extraction to stop after the current entry."""
        self.stop_requested = True

    def extract_archives(self):
        """Extract every supported archive listed in ``file_list``.

        Starts a child ``ArchiveExtractor`` thread that unpacks the archives
        into this object's ``extraction_dir``.
        """
        if not self.file_list:
            print("No files to extract archives from")
            return

        # NOTE(review): assumes file_list entries are filesystem paths;
        # anything else is ignored here -- confirm against the code that
        # populates file_list.
        archives = [
            path
            for path in self.file_list
            if isinstance(path, (str, os.PathLike))
            and self._extractor_for(path) is not None
        ]
        if not archives:
            print("No supported archives found in file list")
            return

        self.archive_extractor = ArchiveExtractor(
            archives, extraction_dir=self.extraction_dir
        )
        self.archive_extractor.extraction_complete.connect(
            self.archive_extraction_complete
        )
        # Forward child errors so they are not silently dropped.
        self.archive_extractor.extraction_error.connect(self.extraction_error)
        # Connect progress signal if handler exists
        if self.on_progress_update:
            self.archive_extractor.extraction_progress.connect(
                self.on_progress_update
            )
        self.archive_extractor.start()

    def archive_extraction_complete(self, extracted_files=None):
        """Handle archive extraction completion.

        ``extracted_files`` is the list emitted by ``extraction_complete``;
        it is accepted for signal compatibility and currently unused.
        """
        print("Archive extraction complete.")
        if self.on_progress_update:
            self.on_progress_update(100)

    def stop_extraction(self):
        """Stop the current extraction operation and wait for it to end."""
        if self.archive_extractor is not None:
            self.archive_extractor.stop()
            self.archive_extractor.wait()