refactor: fix image rendering and add logging
This commit is contained in:
parent
9c4e320063
commit
25e80a7b2e
|
@ -39,4 +39,6 @@ media/
|
|||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
Thumbs.db
|
||||
|
||||
output/
|
|
@ -1,5 +1,6 @@
|
|||
import os
|
||||
import random
|
||||
import logging
|
||||
import genanki
|
||||
from datetime import datetime
|
||||
from typing import List, Tuple, Optional
|
||||
|
@ -7,12 +8,21 @@ from .models import GermanWord, UnsplashImage
|
|||
from .clients.llm import AnthropicClient
|
||||
from .clients.unsplash import UnsplashClient
|
||||
|
||||
logger = logging.getLogger(__name__)


class GermanDeckPackage:
    """Bundle an Anki deck with its media files and write it as a package.

    This is a plain wrapper rather than a ``genanki.Package`` subclass: the
    real ``genanki.Package`` is only built when the deck is written.
    """

    def __init__(self, deck, media_files):
        """Store the deck and the media file paths to ship with it.

        Args:
            deck: The deck to package; it is handed to ``genanki.Package``
                when the package is written.
            media_files: Paths of the media files to include in the package.
        """
        # No super().__init__(deck) here: the class has no base, so forwarding
        # ``deck`` to object.__init__ would raise TypeError.
        self.deck = deck
        self.media_files = media_files

    def write_to_file(self, file):
        """Write the deck and its media files to ``file``.

        Args:
            file: Destination passed straight to
                ``genanki.Package.write_to_file``.
        """
        package = genanki.Package([self.deck])
        package.media_files = self.media_files
        logger.info("Writing deck with media files: %s", self.media_files)
        package.write_to_file(file)
|
||||
|
||||
class CardGenerator:
|
||||
def __init__(self, llm_client: AnthropicClient, unsplash_client: UnsplashClient):
|
||||
self.llm_client = llm_client
|
||||
|
@ -46,7 +56,7 @@ class CardGenerator:
|
|||
<div style="font-size: 24px;">{{German_Word}}</div>
|
||||
<div style="font-size: 18px;">Part of speech: {{Part_of_Speech}}</div>
|
||||
<div style="font-size: 14px;">Source: {{Source}}</div>
|
||||
{{#Image}}<div><img src="{{text:Image}}" style="max-width: 300px; max-height: 200px;"></div>{{/Image}}
|
||||
{{Image}}
|
||||
''',
|
||||
'afmt': '''
|
||||
{{FrontSide}}
|
||||
|
@ -79,29 +89,65 @@ class CardGenerator:
|
|||
'''
|
||||
)
|
||||
|
||||
def _load_processed_words(self) -> set:
    """Return the words recorded in the processed-words tracking file.

    Reads ``output/processed_words.txt`` when it exists, one word per line,
    ignoring lines that are empty after stripping. Returns an empty set when
    the file is absent.
    """
    tracking_path = os.path.join("output", "processed_words.txt")
    if not os.path.exists(tracking_path):
        return set()

    seen = set()
    with open(tracking_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            if entry:
                seen.add(entry)
    logger.info("Found %d previously processed words", len(seen))
    return seen
|
||||
|
||||
def _add_to_processed_words(self, word: str):
    """Append ``word`` as a new line to the processed-words tracking file.

    The ``output`` directory is created first if it does not exist yet.
    """
    target_dir = "output"
    os.makedirs(target_dir, exist_ok=True)
    tracking_path = os.path.join(target_dir, "processed_words.txt")
    with open(tracking_path, 'a', encoding='utf-8') as handle:
        handle.write(f"{word}\n")
    logger.debug("Added %s to processed words", word)
|
||||
|
||||
def create_deck(self, word_list: List[Tuple[str, str]], deck_name: str = "German Vocabulary") -> Tuple[genanki.Deck, List[str]]:
|
||||
"""Create an Anki deck from a list of words"""
|
||||
deck_id = random.randrange(1 << 30, 1 << 31)
|
||||
deck = genanki.Deck(deck_id, deck_name)
|
||||
media_files = []
|
||||
|
||||
# Load previously processed words
|
||||
processed_words = self._load_processed_words()
|
||||
|
||||
for word, source in word_list:
|
||||
# Skip if word has been processed before
|
||||
if word in processed_words:
|
||||
logger.info("Skipping %s - already exists in previous deck", word)
|
||||
continue
|
||||
try:
|
||||
card_info_dict = self.llm_client.get_card_info(word, source)
|
||||
card_info = GermanWord(**card_info_dict)
|
||||
|
||||
image_filename = f"{word.lower().replace(' ', '_')}.jpg"
|
||||
|
||||
# Get image
|
||||
image_filename = f"media/{word.lower().replace(' ', '_')}.jpg"
|
||||
# Create output directory if it doesn't exist
|
||||
output_dir = "output"
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
media_dir = "media"
|
||||
os.makedirs(media_dir, exist_ok=True)
|
||||
|
||||
# Save image directly to media directory
|
||||
image_path = os.path.join(media_dir, image_filename)
|
||||
image = self.unsplash_client.get_image(
|
||||
card_info.image_search_term,
|
||||
image_filename
|
||||
image_path
|
||||
)
|
||||
|
||||
if image and image.local_path:
|
||||
if image and image.local_path and os.path.exists(image.local_path):
|
||||
# Use the full path for the media file
|
||||
media_files.append(image.local_path)
|
||||
image_filename = os.path.basename(image.local_path)
|
||||
logger.info("Added image: %s", image.local_path)
|
||||
else:
|
||||
image_filename = ""
|
||||
image_path = ""
|
||||
logger.warning("No image for: %s", word)
|
||||
|
||||
# Create note
|
||||
note = genanki.Note(
|
||||
|
@ -117,15 +163,17 @@ class CardGenerator:
|
|||
card_info.sentence_translation,
|
||||
card_info.usage_notes,
|
||||
card_info.related_words,
|
||||
image_filename,
|
||||
f'<img src="{os.path.basename(image.local_path) if image and image.local_path else ""}" />',
|
||||
image.photographer if image else "",
|
||||
' '.join(card_info.tags)
|
||||
]
|
||||
)
|
||||
deck.add_note(note)
|
||||
print(f"Added card for: {word}")
|
||||
# Track the word after successfully creating the note
|
||||
self._add_to_processed_words(word)
|
||||
logger.info("Added card for: %s", word)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error creating card for {word}: {str(e)}")
|
||||
logger.error("Error creating card for %s: %s", word, str(e))
|
||||
|
||||
return deck, media_files
|
||||
|
|
|
@ -2,6 +2,7 @@ from typing import Optional
|
|||
import requests
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
import os
|
||||
|
||||
from ..models import Settings, UnsplashImage
|
||||
from anki_generator.settings import get_settings, Settings
|
||||
|
@ -11,7 +12,7 @@ class UnsplashClient:
|
|||
self.settings = settings or get_settings()
|
||||
self.api_key = self.settings.unsplash_api_key.get_secret_value()
|
||||
|
||||
def get_image(self, search_term: str, save_path: str) -> Optional[UnsplashImage]:
|
||||
def get_image(self, search_term: str, filename: str) -> Optional[UnsplashImage]:
|
||||
"""Fetch and save an image from Unsplash"""
|
||||
url = "https://api.unsplash.com/search/photos"
|
||||
headers = {"Authorization": f"Client-ID {self.api_key}"}
|
||||
|
@ -37,15 +38,18 @@ class UnsplashClient:
|
|||
img_response = requests.get(image_url)
|
||||
img_response.raise_for_status()
|
||||
|
||||
# Create directory if it doesn't exist
|
||||
os.makedirs(os.path.dirname(filename) or '.', exist_ok=True)
|
||||
|
||||
# Process and save image
|
||||
img = Image.open(BytesIO(img_response.content))
|
||||
img.thumbnail((800, 600))
|
||||
img.save(save_path, "JPEG", quality=85)
|
||||
img.save(filename, "JPEG", quality=85)
|
||||
|
||||
return UnsplashImage(
|
||||
url=image_url,
|
||||
photographer=photographer,
|
||||
local_path=save_path
|
||||
local_path=filename
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
|
|
@ -1,24 +1,29 @@
|
|||
import os
|
||||
import argparse
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import List, Tuple
|
||||
import shutil
|
||||
import glob
|
||||
from typing import List, Tuple, Optional
|
||||
from pathlib import Path
|
||||
|
||||
from anki_generator.clients.llm import AnthropicClient
|
||||
from anki_generator.clients.unsplash import UnsplashClient
|
||||
from anki_generator.card_generator import CardGenerator, GermanDeckPackage
|
||||
|
||||
logger = logging.getLogger(__name__)


def read_word_list(file_path: str) -> List[Tuple[str, str]]:
    """Read ``word,source`` pairs from a UTF-8 text file.

    Each non-blank line is split at its first comma, so the source part may
    itself contain commas. Surrounding whitespace is stripped from both parts.

    Args:
        file_path: Path of the word list to read.

    Returns:
        A list of ``(word, source)`` tuples in file order.

    Raises:
        ValueError: If a non-blank line contains no comma.
    """
    words: List[Tuple[str, str]] = []
    logger.info("Reading file: %s", file_path)
    with open(file_path, 'r', encoding='utf-8') as f:
        for line_number, raw_line in enumerate(f, start=1):
            entry = raw_line.strip()
            if not entry:
                # Blank lines carry no entry; skip them instead of failing
                # on the two-name unpack below.
                continue
            logger.debug("Processing line: %s", entry)
            # Expected format: word,source
            word, separator, source = entry.partition(',')
            if not separator:
                # Same exception type as the bare unpack would raise, but
                # with enough context to locate the offending line.
                raise ValueError(
                    f"{file_path}:{line_number}: expected 'word,source', got {entry!r}"
                )
            word, source = word.strip(), source.strip()
            words.append((word, source))
            logger.debug("Added word: %s with source: %s", word, source)
    return words
|
||||
|
||||
def main():
|
||||
|
@ -29,35 +34,41 @@ def main():
|
|||
|
||||
args = parser.parse_args()
|
||||
|
||||
print("Loading program")
|
||||
logger.info("Starting Anki card generation")
|
||||
|
||||
# Create media directory
|
||||
os.makedirs('media', exist_ok=True)
|
||||
try:
|
||||
llm_client = AnthropicClient()
|
||||
unsplash_client = UnsplashClient()
|
||||
|
||||
# Initialize clients
|
||||
llm_client = AnthropicClient()
|
||||
unsplash_client = UnsplashClient()
|
||||
generator = CardGenerator(llm_client, unsplash_client)
|
||||
|
||||
# Initialize card generator
|
||||
generator = CardGenerator(llm_client, unsplash_client)
|
||||
words = read_word_list(args.input)
|
||||
|
||||
# Read word list
|
||||
words = read_word_list(args.input)
|
||||
deck, media_files = generator.create_deck(words, args.deck_name)
|
||||
|
||||
# Generate deck
|
||||
deck, media_files = generator.create_deck(words, args.deck_name)
|
||||
output_dir = "output"
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Generate output filename if not provided
|
||||
if not args.output:
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
output_file = f"german_vocab_{timestamp}.apkg"
|
||||
else:
|
||||
output_file = args.output
|
||||
if not args.output:
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
output_file = os.path.join(output_dir, f"german_vocab_{timestamp}.apkg")
|
||||
else:
|
||||
output_file = os.path.join(output_dir, args.output)
|
||||
|
||||
# Save deck
|
||||
package = GermanDeckPackage(deck, media_files)
|
||||
package.write_to_file(output_file)
|
||||
print(f"Deck saved as: {output_file}")
|
||||
media_paths = []
|
||||
for media_file in media_files:
|
||||
media_path = os.path.join(output_dir, os.path.basename(media_file))
|
||||
if os.path.exists(media_file) and media_file != media_path:
|
||||
shutil.copy2(media_file, media_path)
|
||||
media_paths.append(media_path)
|
||||
|
||||
package = GermanDeckPackage(deck, media_paths)
|
||||
package.write_to_file(output_file)
|
||||
logger.info("Deck and media files saved in: %s", output_dir)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error: %s", str(e))
|
||||
raise
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
@ -1,8 +1,17 @@
|
|||
import logging
|
||||
from typing import Optional
|
||||
from functools import lru_cache
|
||||
from pydantic import SecretStr
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
def setup_logging():
    """Set up root logging at INFO level with a timestamped record format."""
    options = {
        "level": logging.INFO,
        "format": '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        "datefmt": '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**options)
|
||||
|
||||
class Settings(BaseSettings):
|
||||
"""Application settings that are loaded from environment variables."""
|
||||
|
||||
|
@ -44,4 +53,5 @@ def get_settings() -> Settings:
|
|||
Returns:
|
||||
Settings: Application settings instance
|
||||
"""
|
||||
return Settings()
|
||||
setup_logging()
|
||||
return Settings()
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
Katze,Duolingo
|
||||
Hund,Book: German Made Simple
|
||||
Apfel,Language Exchange
|
||||
Brot,DW Learn German
|
||||
Wasser,Memrise
|
||||
Bauch,Babbel
|
||||
Kopf,Babbel
|
||||
Rücken,Babbel
|
||||
|
|
|
Loading…
Reference in New Issue