# Source code for record_shelf.report_generator

"""
Report generator for music collection data
"""

import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import discogs_client
import pandas as pd
from tqdm import tqdm

from .config import Config


class ReportGenerator:
    """Generates custom reports from Discogs collection data.

    The generator wraps a ``discogs_client.Client`` built from the supplied
    configuration, flattens collection entries into plain dictionaries and
    renders them as Excel, CSV or HTML reports via pandas.
    """

    # Preferred column order of the generated report.
    _COLUMN_ORDER = (
        "category",
        "artist",
        "title",
        "label",
        "catalog_number",
        "format",
        "year",
        "genre",
        "style",
        "country",
        "discogs_id",
        "master_id",
        "rating",
        "notes",
    )

    # Output formats understood by ``create_report``.
    _SUPPORTED_FORMATS = ("xlsx", "csv", "html")

    # Name of the worksheet holding the complete collection.
    _SUMMARY_SHEET = "Collection"

    # Excel limits worksheet names to 31 characters ...
    _MAX_SHEET_NAME_LENGTH = 31

    # ... and rejects these characters inside them.
    _SHEET_NAME_TABLE = str.maketrans(dict.fromkeys("[]:*?/\\", "_"))

    def __init__(self, config: "Config"):
        """Store the configuration and create the Discogs API client.

        Args:
            config: Project configuration; ``user_agent`` and ``token`` are
                passed to the Discogs client, ``rate_limit_delay`` is read
                later while fetching data.
        """
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Initialize Discogs client
        self.client = discogs_client.Client(
            user_agent=config.user_agent, token=config.token
        )

    def get_user_categories(self, username: str) -> List[str]:
        """Get the sorted list of category (collection folder) names for a user.

        Args:
            username: Discogs user name whose folders are listed.

        Returns:
            Folder names in ascending order.

        Raises:
            Exception: Any error raised while talking to the API is logged
                and re-raised unchanged.
        """
        try:
            user = self.client.user(username)
            categories = []
            for folder in user.collection_folders:
                categories.append(folder.name)
                time.sleep(self.config.rate_limit_delay)
            return sorted(categories)
        except Exception as e:
            self.logger.error(f"Error fetching categories for {username}: {e}")
            raise

    def fetch_collection_data(
        self, username: str, category_filter: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Fetch collection data for a user.

        Args:
            username: Discogs user name whose collection is read.
            category_filter: When given, only the folder with exactly this
                name is processed.

        Returns:
            One dictionary per release, sorted by category and then
            case-insensitively by artist and title.

        Raises:
            Exception: Errors outside the per-release handling are logged
                and re-raised unchanged.
        """
        try:
            user = self.client.user(username)
            collection_items: List[Dict[str, Any]] = []

            for folder in tqdm(user.collection_folders, desc="Processing folders"):
                # Skip if filtering by category and this isn't the target category
                if category_filter and folder.name != category_filter:
                    continue

                self.logger.info(f"Processing folder: {folder.name}")

                # NOTE(review): with python3-discogs-client the entries of
                # ``folder.releases`` may be collection-item wrappers that keep
                # the release metadata under ``.release`` - confirm against
                # the client documentation.
                for release in tqdm(
                    folder.releases, desc=f"Processing {folder.name}", leave=False
                ):
                    try:
                        collection_items.append(
                            self._extract_release_data(release, folder.name)
                        )
                        # Rate limiting
                        time.sleep(self.config.rate_limit_delay)
                    except Exception as e:
                        # Read the id defensively so a broken release object
                        # cannot raise again from inside the handler.
                        release_id = getattr(release, "id", "unknown")
                        self.logger.warning(
                            f"Error processing release {release_id}: {e}"
                        )
                        continue

            # Sort by category, then alphabetically by artist/title
            collection_items.sort(key=self._sort_key)
            return collection_items
        except Exception as e:
            self.logger.error(f"Error fetching collection for {username}: {e}")
            raise

    @staticmethod
    def _sort_key(item: Dict[str, Any]) -> Any:
        """Build the (category, artist, title) sort key for one report row.

        Values are coerced to text first so that ``None`` or numeric fields
        cannot abort the sort.
        """
        return (
            str(item.get("category") or ""),
            str(item.get("artist") or "").lower(),
            str(item.get("title") or "").lower(),
        )

    @staticmethod
    def _read_field(obj: Any, name: str, default: Any = "") -> Any:
        """Read ``name`` from a mapping, an attribute, or a ``data`` mapping.

        Lookup order: dictionary key, object attribute, key of the object's
        ``data`` dictionary (if it has one); ``default`` otherwise.
        """
        if isinstance(obj, dict):
            return obj.get(name, default)
        if hasattr(obj, name):
            return getattr(obj, name)
        # presumably API objects keep the raw response fields in ``.data``;
        # verify against the Discogs client in use.
        raw = getattr(obj, "data", None)
        if isinstance(raw, dict):
            return raw.get(name, default)
        return default

    @staticmethod
    def _empty_record(category_name: str) -> Dict[str, Any]:
        """Return the placeholder row used when a release cannot be read."""
        return {
            "category": category_name,
            "artist": "Unknown",
            "title": "Unknown",
            "label": "",
            "catalog_number": "",
            "format": "",
            "year": "",
            "genre": "",
            "style": "",
            "country": "",
            "discogs_id": "",
            "master_id": "",
            "rating": "",
            "notes": "",
        }

    def _extract_release_data(self, release: Any, category_name: str) -> Dict[str, Any]:
        """Extract relevant data from a release object.

        Args:
            release: Object exposing the release attributes read below.
            category_name: Name of the folder the release belongs to.

        Returns:
            A flat dictionary with one entry per report column. If reading
            the release fails, a placeholder row is returned instead.
        """
        try:
            # Get basic release info
            master_release = getattr(release, "master", None)

            return {
                "category": category_name,
                "artist": self._get_artist_name(release),
                "title": getattr(release, "title", ""),
                "label": self._get_label_name(release),
                "catalog_number": self._get_catalog_number(release),
                "format": self._get_format_info(release),
                "year": getattr(release, "year", ""),
                "genre": self._get_genres(release),
                "style": self._get_styles(release),
                "country": getattr(release, "country", ""),
                "discogs_id": getattr(release, "id", ""),
                "master_id": (
                    getattr(master_release, "id", "") if master_release else ""
                ),
                "rating": getattr(release, "rating", ""),
                "notes": getattr(release, "notes", ""),
            }
        except Exception as e:
            self.logger.warning(f"Error extracting release data: {e}")
            return self._empty_record(category_name)

    def _get_artist_name(self, release: Any) -> str:
        """Extract the comma-separated artist names from a release.

        Returns ``"Unknown Artist"`` when no artists are present or they
        cannot be read.
        """
        try:
            artists = getattr(release, "artists", [])
            if artists:
                return ", ".join([artist.name for artist in artists])
            return "Unknown Artist"
        except Exception as exc:
            self.logger.debug("Could not read artists: %s", exc)
            return "Unknown Artist"

    def _get_label_name(self, release: Any) -> str:
        """Extract the comma-separated label names from a release.

        Returns an empty string when no labels are present or they cannot
        be read.
        """
        try:
            labels = getattr(release, "labels", [])
            if labels:
                return ", ".join([label.name for label in labels])
            return ""
        except Exception as exc:
            self.logger.debug("Could not read labels: %s", exc)
            return ""

    def _get_catalog_number(self, release: Any) -> str:
        """Extract the comma-separated catalog numbers from a release.

        Each label is consulted through ``_read_field``, so dictionary
        entries and objects carrying a ``data`` dictionary are supported in
        addition to a plain ``catno`` attribute. Empty values are skipped.
        """
        try:
            labels = getattr(release, "labels", [])
            if not labels:
                return ""
            cat_nums = [self._read_field(label, "catno", "") for label in labels]
            return ", ".join(filter(None, cat_nums))
        except Exception as exc:
            self.logger.debug("Could not read catalog numbers: %s", exc)
            return ""

    def _get_format_info(self, release: Any) -> str:
        """Extract format information from a release.

        Every format contributes its name, followed by its descriptions in
        parentheses when there are any. Format entries may be dictionaries
        or objects with ``name`` / ``descriptions`` attributes.
        """
        try:
            formats = getattr(release, "formats", [])
            if not formats:
                return ""
            format_info = []
            for fmt in formats:
                fmt_name = self._read_field(fmt, "name", "") or ""
                descriptions = self._read_field(fmt, "descriptions", [])
                if descriptions:
                    fmt_name += f" ({', '.join(descriptions)})"
                format_info.append(fmt_name)
            return ", ".join(format_info)
        except Exception as exc:
            self.logger.debug("Could not read formats: %s", exc)
            return ""

    def _get_genres(self, release: Any) -> str:
        """Extract the comma-separated genres from a release."""
        try:
            genres = getattr(release, "genres", [])
            return ", ".join(genres) if genres else ""
        except Exception as exc:
            self.logger.debug("Could not read genres: %s", exc)
            return ""

    def _get_styles(self, release: Any) -> str:
        """Extract the comma-separated styles from a release."""
        try:
            styles = getattr(release, "styles", [])
            return ", ".join(styles) if styles else ""
        except Exception as exc:
            self.logger.debug("Could not read styles: %s", exc)
            return ""

    @staticmethod
    def _unique_sheet_name(raw_name: Any, used: set) -> str:
        """Turn ``raw_name`` into a valid, not yet used worksheet name.

        Forbidden characters are replaced by underscores, the name is cut to
        31 characters and a numeric suffix is appended when the name is
        already taken. Names are compared case-insensitively and the chosen
        name is recorded in ``used``.

        Args:
            raw_name: Desired worksheet name.
            used: Case-folded names already in use; updated in place.

        Returns:
            The worksheet name to use.
        """
        limit = ReportGenerator._MAX_SHEET_NAME_LENGTH
        cleaned = str(raw_name).translate(ReportGenerator._SHEET_NAME_TABLE)
        cleaned = cleaned.strip("'").strip() or "Sheet"

        candidate = cleaned[:limit]
        counter = 1
        while candidate.casefold() in used:
            counter += 1
            tag = f" ({counter})"
            # Shorten the base so the suffix still fits into the limit.
            candidate = cleaned[: limit - len(tag)] + tag
        used.add(candidate.casefold())
        return candidate

    def create_report(
        self, data: List[Dict[str, Any]], output_path: str, format_type: str = "xlsx"
    ) -> None:
        """Create a report from the collection data.

        Args:
            data: Rows as produced by ``fetch_collection_data``.
            output_path: Destination file.
            format_type: One of ``"xlsx"``, ``"csv"`` or ``"html"``.

        Raises:
            ValueError: If ``data`` is empty or ``format_type`` is not
                supported.
        """
        if not data:
            raise ValueError("No data to generate report")
        if format_type not in self._SUPPORTED_FORMATS:
            # Fail loudly instead of reporting success without writing a file.
            raise ValueError(
                f"Unsupported report format: {format_type!r} "
                f"(expected one of: {', '.join(self._SUPPORTED_FORMATS)})"
            )

        # Create DataFrame
        df = pd.DataFrame(data)

        # Reorder columns for better readability; only include columns that
        # exist in the data
        existing_columns = [col for col in self._COLUMN_ORDER if col in df.columns]
        df = df[existing_columns]

        # Save based on format
        output_file_path = Path(output_path)

        if format_type == "xlsx":
            with pd.ExcelWriter(output_file_path, engine="openpyxl") as writer:
                df.to_excel(writer, sheet_name=self._SUMMARY_SHEET, index=False)

                # Create separate sheets for each category. The summary sheet
                # name is reserved so a category cannot write into it.
                if "category" in df.columns:
                    used_names = {self._SUMMARY_SHEET.casefold()}
                    for category in df["category"].unique():
                        category_data = df[df["category"] == category]
                        sheet_name = self._unique_sheet_name(category, used_names)
                        category_data.to_excel(
                            writer, sheet_name=sheet_name, index=False
                        )
        elif format_type == "csv":
            df.to_csv(output_file_path, index=False)
        else:
            # Cell values come from an external service; escape them so
            # markup inside notes or titles is not interpreted by browsers.
            df.to_html(output_file_path, index=False, escape=True)

        self.logger.info(f"Report saved to {output_file_path}")

    @staticmethod
    def _year_sort_key(year: Any) -> Any:
        """Sort key placing numeric years first, everything else afterwards."""
        try:
            return (0, float(year), "")
        except (TypeError, ValueError):
            return (1, 0.0, str(year))

    def generate_summary_stats(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate summary statistics for the collection.

        Args:
            data: Rows as produced by ``fetch_collection_data``.

        Returns:
            An empty dictionary for empty input, otherwise a dictionary with
            the keys ``total_items``, ``categories``, ``items_per_category``,
            ``top_artists``, ``top_labels``, ``formats`` and ``years``.
            ``years`` is ordered with numeric years ascending, followed by
            non-numeric placeholders.
        """
        if not data:
            return {}

        df = pd.DataFrame(data)

        # The year column can mix integers with the "" placeholder; sorting
        # the index directly would compare int with str and fail.
        year_counts = df["year"].value_counts()
        years = {
            year: int(count)
            for year, count in sorted(
                year_counts.items(), key=lambda pair: self._year_sort_key(pair[0])
            )
        }

        return {
            "total_items": len(df),
            "categories": df["category"].unique().tolist(),
            "items_per_category": df["category"].value_counts().to_dict(),
            "top_artists": df["artist"].value_counts().head(10).to_dict(),
            "top_labels": df["label"].value_counts().head(10).to_dict(),
            "formats": df["format"].value_counts().to_dict(),
            "years": years,
        }