#!/usr/bin/env python3
"""Publish Markdown notes from a source tree into a destination tree.

A note is published when the YAML front matter at the top of the file
contains ``publish: true``.  Images referenced by published notes (Obsidian
``![[image.png]]`` embeds or Markdown ``![alt](image.png)`` images) are copied
into ``<dest>/images`` and the references are rewritten to ``/images/<name>``.
Markdown files and images in the destination that are no longer produced or
referenced are removed.

Usage: run the script with no arguments for a normal sync, or with ``clean``
as the first argument to empty the destination (except ``images``) first.
``SOURCE_DIR`` and ``DEST_DIR`` are read from a ``.env`` file.
"""
import glob
import logging
import os
import re
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Set, Union

try:
    from dotenv import load_dotenv
except ImportError:  # Reported by load_env_vars() instead of failing at import.
    load_dotenv = None

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(f"/tmp/publish_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
    ]
)

# Name of the destination sub-directory that receives copied images.
IMAGES_DIR_NAME = "images"

# ``publish: true|false`` on a single front-matter line (value is case-insensitive).
_PUBLISH_LINE_RE = re.compile(r'\s*publish\s*:\s*((?i:true|false))\b')
# Obsidian embed: ![[image.png]]
_OBSIDIAN_IMAGE_RE = re.compile(r'!\[\[(.*?)\]\]')
# Markdown image: ![alt](image.png)
_MARKDOWN_IMAGE_RE = re.compile(r'!\[.*?\]\((.*?)\)')
# Image reference as written by process_images(): ![alt](/images/name)
_PUBLISHED_IMAGE_RE = re.compile(r'!\[.*?\]\(/images/(.*?)\)')
# References that point at a remote/inline resource rather than a local file.
_REMOTE_REF_RE = re.compile(r'(?:[a-z][a-z0-9+.\-]*://|data:)', re.IGNORECASE)


def load_env_vars() -> tuple[Path, Path]:
    """Load ``SOURCE_DIR`` and ``DEST_DIR`` from the project's ``.env`` file.

    The ``.env`` file is looked up in the parent of the directory that
    contains this script.  Environment variables (``$HOME``) and a leading
    ``~`` inside the configured values are expanded.

    Returns:
        ``(source_dir, dest_dir)`` as ``Path`` objects.

    Exits the process with status 1 (after logging the reason) when
    python-dotenv is unavailable, the ``.env`` file is missing, or either
    variable is unset.
    """
    if load_dotenv is None:
        logging.error("python-dotenv is not installed; cannot load the .env file")
        sys.exit(1)

    project_root = Path(__file__).parent.parent
    env_path = project_root / '.env'

    if not env_path.exists():
        logging.error(f".env file not found at {env_path}")
        sys.exit(1)

    load_dotenv(env_path)

    source_dir = os.getenv('SOURCE_DIR')
    dest_dir = os.getenv('DEST_DIR')

    if not source_dir or not dest_dir:
        logging.error("SOURCE_DIR and DEST_DIR must be set in .env file")
        sys.exit(1)

    # Expand environment variables like $HOME, then a leading "~".
    return (
        Path(os.path.expandvars(source_dir)).expanduser(),
        Path(os.path.expandvars(dest_dir)).expanduser(),
    )


def _parse_publish_flag(content: str) -> Union[bool, None]:
    """Return the ``publish`` flag from the front matter of *content*.

    Only a front-matter block that opens the document (``---`` on the first
    non-blank line) and is closed by another ``---`` line is considered, so a
    ``publish:`` line in the note body is never mistaken for the flag.
    Returns ``None`` when there is no such block or it has no usable flag.
    """
    lines = content.lstrip('\ufeff').lstrip().splitlines()
    if not lines or lines[0].rstrip() != '---':
        return None

    flag = None
    for line in lines[1:]:
        if line.rstrip() == '---':
            return flag
        if flag is None:
            match = _PUBLISH_LINE_RE.match(line)
            if match:
                flag = match.group(1).lower() == 'true'
    # The opening '---' was never closed: not valid front matter.
    return None


def get_publish_status(file_path: Path) -> Union[bool, None]:
    """Check if a markdown file has publish: true/false in its frontmatter.

    Returns ``True``/``False`` for an explicit flag, or ``None`` when the file
    cannot be read or carries no flag (read errors are logged).
    """
    try:
        content = Path(file_path).read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as e:
        logging.error(f"Error reading {file_path}: {e}")
        return None
    return _parse_publish_flag(content)


def extract_image_references(content: str) -> Set[str]:
    """Extract image references from markdown content.

    Returns the raw text found inside ``![[...]]`` embeds and inside the
    parentheses of ``![alt](...)`` images.
    """
    obsidian_images = set(_OBSIDIAN_IMAGE_RE.findall(content))
    markdown_images = set(_MARKDOWN_IMAGE_RE.findall(content))
    return obsidian_images | markdown_images


def _as_regular_file(path: Path) -> Union[Path, None]:
    """Return *path* (resolved if it is a symlink) when it is a file, else ``None``."""
    try:
        if not path.is_file():
            return None
        return path.resolve() if path.is_symlink() else path
    except OSError:
        # e.g. a reference too long to be a valid file name
        return None


def find_image_in_source(image_ref: str, source_dir: Path) -> Union[Path, None]:
    """Find an image file in the source directory.

    Lookup order: the reference as a path relative to *source_dir*, then a
    recursive search for its file name, then a recursive search for the file
    name with spaces replaced by hyphens.  Symlinks are returned resolved.

    Returns:
        The path of the first match, or ``None`` when nothing matches.
    """
    found = _as_regular_file(source_dir / image_ref)
    if found is not None:
        return found

    image_name = Path(image_ref).name
    if image_name in ('', '..'):
        # Nothing meaningful to search for (and rglob rejects an empty pattern).
        return None

    candidates = [image_name]
    hyphenated = image_name.replace(' ', '-')
    if hyphenated != image_name:
        candidates.append(hyphenated)

    for candidate in candidates:
        # Escape glob metacharacters so names like "shot [1].png" match literally.
        for path in source_dir.rglob(glob.escape(candidate)):
            found = _as_regular_file(path)
            if found is not None:
                return found

    return None


def _image_needs_copy(source_image: Path, dest_image: Path) -> bool:
    """Return True when *dest_image* is missing or differs from *source_image*."""
    if not dest_image.exists():
        return True
    src_stat = source_image.stat()
    dst_stat = dest_image.stat()
    # copy2 preserves mtime, so an unchanged image compares equal on both.
    return src_stat.st_size != dst_stat.st_size or src_stat.st_mtime > dst_stat.st_mtime


def process_images(content: str, source_dir: Path, dest_dir: Path) -> str:
    """Process and copy images, updating references in content.

    Every local image reference that can be found under *source_dir* is
    copied to ``dest_dir/images`` and rewritten to ``/images/<file name>``;
    Obsidian embeds are converted to Markdown image syntax.  Remote
    references (``scheme://`` or ``data:``) are left untouched, and
    references that cannot be found are logged and left as-is.

    Returns:
        The content with rewritten image references.
    """
    images_dir = dest_dir / IMAGES_DIR_NAME
    images_dir.mkdir(parents=True, exist_ok=True)

    # Sorted so the order of replacements does not depend on set ordering.
    for img_ref in sorted(extract_image_references(content)):
        if _REMOTE_REF_RE.match(img_ref):
            continue

        # Obsidian allows "image.png|300" (size/alias); only the part before
        # the pipe names the file.
        target = img_ref.split('|', 1)[0].strip()
        img_path = find_image_in_source(target, source_dir) if target else None
        if img_path is None:
            logging.warning(f"Image not found: {img_ref}")
            continue

        dest_image = images_dir / img_path.name
        if _image_needs_copy(img_path, dest_image):
            shutil.copy2(img_path, dest_image)
            logging.info(f"Copied image: {img_path.name}")

        published_url = f'/images/{img_path.name}'
        # Rewrite both syntaxes; each replace is a no-op when the form is absent.
        content = content.replace(f'![[{img_ref}]]', f'![{img_path.stem}]({published_url})')
        content = content.replace(f']({img_ref})', f']({published_url})')

    return content


def clean_destination(dest_dir: Path):
    """Clean the destination directory.

    Removes every entry directly inside *dest_dir* except the ``images``
    directory.  Does nothing when *dest_dir* is not a directory.
    """
    if not dest_dir.is_dir():
        return

    for item in list(dest_dir.iterdir()):
        if item.is_symlink() or item.is_file():
            # Symlinks are unlinked, never followed (rmtree refuses them).
            item.unlink()
        elif item.is_dir() and item.name != IMAGES_DIR_NAME:
            shutil.rmtree(item)
    logging.info(f"Cleaned destination directory: {dest_dir}")


def sync_files(source_dir: Path, dest_dir: Path, clean: bool = False):
    """Synchronize files between source and destination directories.

    Publishes every ``*.md`` file under *source_dir* flagged ``publish: true``
    (with images processed), then removes Markdown files in *dest_dir* that
    were not written by this run and images in ``dest_dir/images`` that no
    published file references.

    Args:
        source_dir: Root of the notes to publish.
        dest_dir: Root of the published output.
        clean: When true, empty *dest_dir* (except ``images``) first.

    Raises:
        ValueError: If *dest_dir* is the same as, or contains, *source_dir*;
            the cleanup pass would otherwise delete source notes.
    """
    source_real = source_dir.resolve()
    dest_real = dest_dir.resolve()
    if source_real == dest_real or dest_real in source_real.parents:
        raise ValueError(
            f"Destination {dest_dir} must not be the same as or contain the source {source_dir}"
        )

    # When the destination lives inside the source tree, remember where, so
    # previously published files are not picked up as sources again.
    try:
        nested_dest_parts = dest_real.relative_to(source_real).parts
    except ValueError:
        nested_dest_parts = None

    if clean:
        clean_destination(dest_dir)

    images_dir = dest_dir / IMAGES_DIR_NAME
    images_dir.mkdir(parents=True, exist_ok=True)

    # Destination files written by this run.
    files_to_keep = set()

    for source_file in source_dir.rglob("*.md"):
        rel_path = source_file.relative_to(source_dir)
        if nested_dest_parts and rel_path.parts[:len(nested_dest_parts)] == nested_dest_parts:
            continue

        # Resolve symbolic links
        actual_file = source_file.resolve() if source_file.is_symlink() else source_file

        publish_status = get_publish_status(actual_file)
        if publish_status is None:
            continue

        dest_file = dest_dir / rel_path

        if publish_status:
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            content = source_file.read_text(encoding='utf-8')
            content = process_images(content, source_dir, dest_dir)
            dest_file.write_text(content, encoding='utf-8')
            logging.info(f"Processed: {rel_path}")
            files_to_keep.add(dest_file)
        elif dest_file.exists():
            # Remove if publish is false and file exists
            dest_file.unlink()
            logging.info(f"Removed: {rel_path}")

    # Clean up old files in destination that weren't in source.  The listing
    # is materialized first so entries are not deleted under a live iterator.
    for old_file in list(dest_dir.rglob("*.md")):
        if old_file in files_to_keep:
            continue
        if old_file.is_dir() and not old_file.is_symlink():
            continue
        old_file.unlink()
        logging.info(f"Removed old file: {old_file.relative_to(dest_dir)}")

    # Collect image names still referenced by the published files.
    used_images = set()
    for published_file in dest_dir.rglob("*.md"):
        if not published_file.is_file():
            continue
        published_text = published_file.read_text(encoding='utf-8')
        used_images.update(_PUBLISHED_IMAGE_RE.findall(published_text))

    # Remove unused images
    for image_file in list(images_dir.iterdir()):
        if image_file.name in used_images or image_file.is_dir():
            continue
        image_file.unlink()
        logging.info(f"Removed unused image: {image_file.name}")


def main():
    """Main function.

    Reads the configuration, validates the source directory, creates the
    destination if needed and runs the sync.  Pass ``clean`` as the first
    command-line argument to empty the destination first.  Exits with status
    1 on any error.
    """
    clean = len(sys.argv) > 1 and sys.argv[1].lower() == 'clean'

    try:
        source_dir, dest_dir = load_env_vars()

        if not source_dir.is_dir():
            logging.error(f"Source directory does not exist or is not a directory: {source_dir}")
            sys.exit(1)

        # Create destination directory if it doesn't exist
        dest_dir.mkdir(parents=True, exist_ok=True)

        sync_files(source_dir, dest_dir, clean)
        logging.info("Sync completed successfully")

    except Exception as e:
        # Top-level boundary: record the traceback, then fail the process.
        logging.exception(f"An error occurred: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()