Fbrowser/metaextract.py
Stan44 a69391b1d8 toggle logging added (--debug-on) to enable logging
minor bug fixes.
known stutter bug in database/meta systems
appears to be the delay in connecting to the database.

future work:
1. intergrate firefly.dll as server module.
2. ensure full database funcctionality.
3. add toggle to use database or not. (by default we check for firefly if we don't find we default to python systems. if found we automatically use firefly.(so maybe on toggles))
4. investigate and fix the stutter bug.
2025-04-11 23:59:50 -05:00

187 lines
6.7 KiB
Python

import os
import logging
import concurrent.futures
import mutagen
from datetime import datetime
from PyQt6.QtCore import QThread, pyqtSignal
# Module-level logger used by every log call in this file.
# NOTE(review): the logger name is spelled "fbroswer"; confirm it matches the
# name used wherever logging is configured before "correcting" it.
logger = logging.getLogger("fbroswer")
# Metadata Extractor
class MetadataExtractor(QThread):
    """Worker thread that extracts audio metadata for a list of files.

    Files are processed on a thread pool. One ``metadata_extracted`` signal
    is emitted per file that yields metadata, ``progress_update`` carries the
    percentage of files handled so far, and ``extraction_complete`` is emitted
    once ``run`` finishes (including when it was stopped early).
    """

    # Lower-case file extensions that are handed to mutagen; any other file
    # is skipped by ``extract_metadata``.
    AUDIO_EXTENSIONS = (".mp3", ".wav", ".flac", ".m4a", ".wma", ".mid", ".midi")

    metadata_extracted = pyqtSignal(dict)
    extraction_complete = pyqtSignal()
    progress_update = pyqtSignal(int)

    def __init__(self, file_list):
        """Store the files to process and initialise empty state.

        Args:
            file_list: Sized iterable of file paths to extract metadata from.
        """
        super().__init__()
        self.file_list = file_list
        self.stop_requested = False
        # Maps file path -> metadata dict for files already extracted.
        self.metadata_cache = {}
        # Optional zero-argument callback invoked by
        # ``metadata_extraction_complete``.
        self.on_metadata_complete = None
        # Metadata organization.
        # NOTE(review): nothing in this class adds to these sets; presumably
        # they are filled by whoever consumes ``metadata_extracted``.
        self.artists = set()
        self.albums = set()
        self.genres = set()
        self.years = set()
        logger.debug(
            f"MetadataExtractor initialized with {len(file_list)} files"
        )

    def run(self):
        """Extract metadata for every file, emitting results and progress."""
        total_files = len(self.file_list)
        processed_files = 0
        logger.info(f"Starting metadata extraction for {total_files} files")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for file_path in self.file_list:
                if self.stop_requested:
                    logger.debug("Metadata extraction stopped by user request")
                    # Without this, leaving the ``with`` block would still
                    # wait for every job that was already queued.
                    self._cancel_pending(futures)
                    break
                if file_path in self.metadata_cache:
                    logger.debug(f"Using cached metadata for {file_path}")
                    self.metadata_extracted.emit(
                        self.metadata_cache[file_path]
                    )
                    processed_files += 1
                    self.progress_update.emit(
                        int(processed_files / total_files * 100)
                    )
                else:
                    futures.append(
                        executor.submit(self.extract_metadata, file_path)
                    )
            for future in concurrent.futures.as_completed(futures):
                if self.stop_requested:
                    logger.debug(
                        "Metadata extraction stopped during processing"
                    )
                    # Drop queued work so executor shutdown only has to wait
                    # for the jobs that are already running.
                    self._cancel_pending(futures)
                    break
                try:
                    metadata = future.result()
                    if metadata:
                        self.metadata_extracted.emit(metadata)
                except Exception as e:
                    logger.error(
                        f"Error extracting metadata: {e}", exc_info=True
                    )
                processed_files += 1
                progress = int(processed_files / total_files * 100)
                logger.debug(f"Metadata extraction progress: {progress}%")
                self.progress_update.emit(progress)
        logger.info("Metadata extraction complete")
        self.extraction_complete.emit()

    @staticmethod
    def _cancel_pending(futures):
        """Cancel every future in *futures* that has not started running."""
        for pending in futures:
            pending.cancel()

    @staticmethod
    def _basic_metadata(file_path):
        """Return the metadata dict for *file_path* with placeholder tags.

        Only file-system information is filled in; the tag fields carry the
        "Unknown ..." defaults (and the file name as title).
        """
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        return {
            "file_path": file_path,
            "artist": "Unknown Artist",
            "album": "Unknown Album",
            "title": os.path.basename(file_path),
            "genre": "Unknown Genre",
            "year": "Unknown Year",
            "size_mb": f"{file_size_mb:.2f}",
            "filename": os.path.basename(file_path),
            "extension": os.path.splitext(file_path)[1].lower(),
            "last_modified": str(
                datetime.fromtimestamp(os.path.getmtime(file_path))
            ),
            "extracted_at": str(datetime.now()),
        }

    def extract_metadata(self, file_path):
        """Read tags and file information for a single file.

        Args:
            file_path: Path of the file to inspect.

        Returns:
            A metadata dict, or ``None`` when the path is not a file, has a
            non-audio extension, is not recognised by mutagen, or an
            unexpected error occurs. When mutagen fails to read the file the
            dict holds placeholder tags plus an ``"error"`` entry.
        """
        try:
            if not os.path.isfile(file_path):
                logger.warning(f"File does not exist: {file_path}")
                return None
            # Skip non-audio files
            if not file_path.lower().endswith(self.AUDIO_EXTENSIONS):
                logger.debug(f"Skipping non-audio file: {file_path}")
                return None
            logger.info(f"Extracting metadata for {file_path}")
            try:
                # easy=True makes mutagen expose MP3/MP4 tags under the
                # plain names looked up below ("artist", "album", ...);
                # without it those formats use frame/atom keys and every
                # lookup would fall back to the default.
                # NOTE(review): WAV/WMA still use format-specific keys.
                audio = mutagen.File(file_path, easy=True)  # type: ignore
            except (OSError, mutagen.MutagenError) as e:
                # mutagen reports unreadable/corrupt files as MutagenError,
                # which is not an OSError, so both are handled here.
                logger.error(f"Mutagen error reading file {file_path}: {e}")
                # Return basic metadata without audio tags
                metadata = self._basic_metadata(file_path)
                metadata["error"] = str(e)
                return metadata
            # Compare against None explicitly: a recognised file with zero
            # tags is falsy but should still be reported with defaults.
            if audio is None:
                logger.warning(f"No metadata found for {file_path}")
                return None
            metadata = self._basic_metadata(file_path)
            # Overwriting existing keys keeps the dict's key order.
            metadata.update(
                artist=self._get_tag(audio, "artist", metadata["artist"]),
                album=self._get_tag(audio, "album", metadata["album"]),
                title=self._get_tag(audio, "title", metadata["title"]),
                genre=self._get_tag(audio, "genre", metadata["genre"]),
                year=self._get_tag(audio, "date", metadata["year"]),
            )
            logger.debug(f"Extracted metadata: {metadata}")
            # Cache the result
            self.metadata_cache[file_path] = metadata
            return metadata
        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}", exc_info=True)
            import traceback

            traceback.print_exc()
            return None

    def _get_tag(self, audio, tag_name, default_value):
        """Helper method to safely extract tags from audio files.

        Args:
            audio: Mapping-like tag container returned by mutagen.
            tag_name: Key to look up.
            default_value: Returned when the key is missing, its value is an
                empty list, or the lookup raises.

        Returns:
            The tag as a string (first element for list values), or
            *default_value*.
        """
        try:
            if tag_name in audio:
                value = audio[tag_name]
                if isinstance(value, list):
                    # An empty list carries no usable value.
                    return str(value[0]) if value else default_value
                return str(value)
        except Exception as e:
            logger.debug(f"Error extracting tag {tag_name}: {e}")
        return default_value

    def stop(self):
        """Ask ``run`` to stop; queued work is cancelled at its next check."""
        logger.info("Stopping metadata extraction")
        self.stop_requested = True

    def metadata_extraction_complete(self):
        """Handle metadata extraction completion"""
        logger.info(
            f"Metadata extraction complete. Artists: {len(self.artists)}, "
            f"Albums: {len(self.albums)}, Genres: {len(self.genres)}, "
            f"Years: {len(self.years)}"
        )
        if self.on_metadata_complete:
            self.on_metadata_complete()