notes.velouria.dev/scripts/publish.py

237 lines
8.2 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import shutil
import sys
from pathlib import Path
import re
from typing import Set, Dict, Union
import logging
from datetime import datetime
from dotenv import load_dotenv
# Logging setup: every record is written to the console and to a per-run
# log file under /tmp whose name carries the start timestamp.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(
            f"/tmp/publish_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        ),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def load_env_vars() -> tuple[Path, Path]:
    """Load SOURCE_DIR and DEST_DIR from the ``.env`` file.

    The ``.env`` file is looked up one level above the directory that
    contains this script. Environment variable references (``$HOME``) and
    a leading ``~`` in either value are expanded.

    Returns:
        A ``(source_dir, dest_dir)`` pair of paths.

    Exits the process with status 1, after logging an error, when the
    ``.env`` file is missing or either variable is unset or empty.
    """
    # parent.parent: the directory above the one holding this script.
    base_dir = Path(__file__).parent.parent
    env_path = base_dir / '.env'
    if not env_path.exists():
        logging.error(f".env file not found at {env_path}")
        sys.exit(1)

    load_dotenv(env_path)

    source_dir = os.getenv('SOURCE_DIR')
    dest_dir = os.getenv('DEST_DIR')
    if not source_dir or not dest_dir:
        logging.error("SOURCE_DIR and DEST_DIR must be set in .env file")
        sys.exit(1)

    # expandvars handles $HOME-style references; expanduser additionally
    # handles a leading "~", which expandvars leaves untouched.
    return (
        Path(os.path.expandvars(source_dir)).expanduser(),
        Path(os.path.expandvars(dest_dir)).expanduser(),
    )
# Frontmatter: the text between a "---" line at the very start of the file
# (an optional UTF-8 BOM is tolerated) and the next line that is only "---".
_FRONTMATTER_RE = re.compile(
    r'\A\ufeff?---[ \t]*\n(.*?)^---[ \t]*$', re.DOTALL | re.MULTILINE
)
# A "publish" key at the start of a line (indentation allowed) with a
# boolean value; only the value is matched case-insensitively.
_PUBLISH_RE = re.compile(
    r'^[ \t]*publish:[ \t]*((?i:true|false))\b', re.MULTILINE
)


def get_publish_status(file_path: Path) -> Union[bool, None]:
    """Return the ``publish`` flag from a markdown file's frontmatter.

    Only the frontmatter block at the very top of the file is inspected, so
    a ``publish: true`` that merely appears in the note body (or after a
    later ``---`` rule) is ignored, as is a key that only ends in
    ``publish`` such as ``unpublish``.

    Args:
        file_path: Markdown file to inspect.

    Returns:
        ``True`` or ``False`` when the frontmatter contains
        ``publish: true`` / ``publish: false``; ``None`` when there is no
        frontmatter, no such key, or the file cannot be read (the read
        error is logged).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, UnicodeError) as e:
        logging.error(f"Error reading {file_path}: {e}")
        return None

    frontmatter = _FRONTMATTER_RE.match(content)
    if frontmatter is None:
        return None

    flag = _PUBLISH_RE.search(frontmatter.group(1))
    if flag is None:
        return None
    return flag.group(1).lower() == 'true'
def extract_image_references(content: str) -> Set[str]:
    """Collect the targets of all image references in markdown content.

    Two syntaxes are recognised: Obsidian embeds (``![[image.png]]``) and
    standard Markdown images (``![alt](image.png)``).

    Args:
        content: Markdown text to scan.

    Returns:
        The set of referenced targets, exactly as written in the text.
    """
    # Obsidian embeds: capture everything between "![[" and "]]".
    obsidian_images = set(re.findall(r'!\[\[(.*?)\]\]', content))
    # Markdown images: capture the link destination. The (?!\[) lookahead
    # stops an Obsidian embed from being taken as the start of a Markdown
    # image, which would otherwise turn the target of a later ordinary link
    # on the same line ("![[a.png]] ... [text](page.md)") into an "image".
    markdown_images = set(re.findall(r'!\[(?!\[).*?\]\((.*?)\)', content))
    return obsidian_images | markdown_images
def _existing_file(path: Path) -> Union[Path, None]:
    """Return *path* if it is usable as a file, else ``None``.

    A symlink is returned as its resolved target when that target is a
    regular file; a regular file is returned unchanged.
    """
    if path.is_symlink():
        target = path.resolve()
        return target if target.is_file() else None
    return path if path.is_file() else None


def find_image_in_source(image_ref: str, source_dir: Path) -> Union[Path, None]:
    """Locate the file an image reference points to inside *source_dir*.

    Lookup order:
      1. ``source_dir / image_ref`` taken as an exact relative path;
      2. any file anywhere under *source_dir* with the same file name;
      3. the same search with spaces in the file name replaced by hyphens.

    Args:
        image_ref: Reference text as it appears in the markdown.
        source_dir: Root directory to search.

    Returns:
        Path of the first match (a symlink is returned as its resolved
        target), or ``None`` when nothing matches, the reference has no
        file name, or the reference is a URL.
    """
    import glob  # local import: only needed to escape glob metacharacters

    image_name = Path(image_ref).name
    # A reference without a file name cannot be searched for (an empty
    # rglob pattern is rejected), and a URL such as "https://..." does not
    # name a file in the source tree even if a local file shares its name.
    if not image_name or '://' in image_ref:
        return None

    # 1. Exact path relative to source_dir.
    found = _existing_file(source_dir / image_ref)
    if found is not None:
        return found

    # 2. and 3. Search by file name, then by the hyphenated variant. The
    # second walk is skipped when hyphenation does not change the name.
    candidates = [image_name]
    hyphenated = image_name.replace(' ', '-')
    if hyphenated != image_name:
        candidates.append(hyphenated)

    for name in candidates:
        # Escape the name so characters like "[" or "*" in a file name are
        # matched literally instead of being read as a glob pattern.
        for path in source_dir.rglob(glob.escape(name)):
            found = _existing_file(path)
            if found is not None:
                return found
    return None
def process_images(content: str, source_dir: Path, dest_dir: Path) -> str:
    """Copy referenced images into ``dest_dir/images`` and rewrite the links.

    Every image reference found in *content* is looked up under
    *source_dir*. A found image is copied to ``dest_dir/images/<name>``
    unless a file with that name is already there, and references to it are
    rewritten to ``/images/<name>``. Obsidian embeds (``![[ref]]``) are
    converted to Markdown image syntax with the file stem as alt text.
    References whose image cannot be found are logged and left unchanged.

    Args:
        content: Markdown text of one note.
        source_dir: Root directory searched for image files.
        dest_dir: Publish directory; images go into its ``images`` folder.

    Returns:
        The content with rewritten image references.
    """
    images_dir = dest_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    # Sorted only to make processing (and log) order deterministic.
    for img_ref in sorted(extract_image_references(content)):
        img_path = find_image_in_source(img_ref, source_dir)
        if img_path is None:
            logging.warning(f"Image not found: {img_ref}")
            continue

        # Copy once; an existing file of the same name is not overwritten.
        dest_image = images_dir / img_path.name
        if not dest_image.exists():
            shutil.copy2(img_path, dest_image)
            logging.info(f"Copied image: {img_path.name}")

        published_url = f'/images/{img_path.name}'
        # Rewrite both syntaxes for this reference. Deciding the syntax from
        # whether "![[" occurs anywhere in the document would leave
        # Markdown-style images untouched in notes that mix both styles.
        content = content.replace(
            f'![[{img_ref}]]', f'![{img_path.stem}]({published_url})'
        )
        # Anchoring on "](" limits the rewrite to link/image destinations
        # rather than any parenthesised occurrence of the reference text.
        content = content.replace(f']({img_ref})', f']({published_url})')

    return content
def clean_destination(dest_dir: Path):
    """Empty *dest_dir*, keeping only its ``images`` directory.

    Regular files and symlinks are unlinked; real directories other than
    ``images`` are removed recursively. A symlink is always removed as a
    link (its target is left alone), because ``shutil.rmtree`` refuses to
    operate on a symlinked directory.

    Args:
        dest_dir: Directory to clean; it must exist.
    """
    for item in dest_dir.iterdir():
        is_link = item.is_symlink()
        if item.is_dir():
            if item.name == "images":
                # Copied images are kept across cleans.
                continue
            if is_link:
                item.unlink()
            else:
                shutil.rmtree(item)
        elif is_link or item.is_file():
            # Includes broken symlinks, which are neither file nor directory.
            item.unlink()
    logging.info(f"Cleaned destination directory: {dest_dir}")
def _remove_stale_markdown(dest_dir: Path, files_to_keep: Set[Path]) -> None:
    """Delete markdown files under *dest_dir* that are not in *files_to_keep*."""
    # Materialise the listing first so nothing is deleted while the tree is
    # still being walked.
    for dest_file in list(dest_dir.rglob("*.md")):
        if dest_file.is_dir():
            # A directory whose name ends in ".md" cannot be unlinked.
            continue
        if dest_file not in files_to_keep:
            dest_file.unlink()
            logging.info(f"Removed old file: {dest_file.relative_to(dest_dir)}")


def _remove_unused_images(dest_dir: Path, images_dir: Path) -> None:
    """Delete files in *images_dir* that no markdown file in *dest_dir* links to."""
    used_images: Set[str] = set()
    for dest_file in dest_dir.rglob("*.md"):
        if dest_file.is_dir():
            continue
        with open(dest_file, 'r', encoding='utf-8') as f:
            content = f.read()
        # Collect the file names used in ![alt](/images/<name>) links.
        used_images.update(re.findall(r'!\[.*?\]\(/images/(.*?)\)', content))

    for image_file in list(images_dir.iterdir()):
        if image_file.is_dir() and not image_file.is_symlink():
            # unlink() cannot remove a directory; leave subfolders alone.
            continue
        if image_file.name not in used_images:
            image_file.unlink()
            logging.info(f"Removed unused image: {image_file.name}")


def sync_files(source_dir: Path, dest_dir: Path, clean: bool = False):
    """Publish markdown notes from *source_dir* into *dest_dir*.

    Steps:
      1. Optionally clean the destination (``clean=True``).
      2. For every ``*.md`` under *source_dir* whose frontmatter sets
         ``publish``: copy it (with images processed and links rewritten)
         when true, or remove its destination copy when false. Files
         without a publish flag are skipped.
      3. Remove destination markdown files that were not written in this
         run.
      4. Remove files in ``dest_dir/images`` that no remaining markdown
         file references.

    Args:
        source_dir: Root of the notes to publish.
        dest_dir: Publish directory; created if missing.
        clean: When true, empty the destination before syncing.
    """
    # Make sure the destination exists before it is listed or written to.
    dest_dir.mkdir(parents=True, exist_ok=True)
    if clean:
        clean_destination(dest_dir)

    images_dir = dest_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    # Destination files written in this run; everything else is stale.
    files_to_keep: Set[Path] = set()

    for source_file in source_dir.rglob("*.md"):
        # Read the publish flag from the link target when the note is a symlink.
        actual_file = source_file.resolve() if source_file.is_symlink() else source_file
        publish_status = get_publish_status(actual_file)
        if publish_status is None:
            continue

        rel_path = source_file.relative_to(source_dir)
        dest_file = dest_dir / rel_path

        if not publish_status:
            # Explicitly unpublished: drop any previously published copy.
            if dest_file.exists():
                dest_file.unlink()
                logging.info(f"Removed: {rel_path}")
            continue

        dest_file.parent.mkdir(parents=True, exist_ok=True)
        with open(source_file, 'r', encoding='utf-8') as f:
            content = f.read()
        content = process_images(content, source_dir, dest_dir)
        with open(dest_file, 'w', encoding='utf-8') as f:
            f.write(content)
        logging.info(f"Processed: {rel_path}")
        files_to_keep.add(dest_file)

    _remove_stale_markdown(dest_dir, files_to_keep)
    _remove_unused_images(dest_dir, images_dir)
def main():
    """Run the publish sync from the command line.

    Passing ``clean`` (any letter case) as the first argument empties the
    destination before syncing. The process exits with status 1 when the
    configuration cannot be loaded, the source directory is unusable, or
    any error occurs during the sync.
    """
    clean = len(sys.argv) > 1 and sys.argv[1].lower() == 'clean'

    try:
        source_dir, dest_dir = load_env_vars()

        # is_dir() rather than exists(): a path that points at a regular
        # file would yield no notes, and the sync's cleanup of unpublished
        # files would then empty the destination.
        if not source_dir.is_dir():
            logging.error(f"Source directory does not exist: {source_dir}")
            sys.exit(1)

        dest_dir.mkdir(parents=True, exist_ok=True)

        sync_files(source_dir, dest_dir, clean)
        logging.info("Sync completed successfully")
    except Exception as e:
        # exc_info keeps the traceback in the log; sys.exit() above raises
        # SystemExit, which is not an Exception and so is not caught here.
        logging.error(f"An error occurred: {e}", exc_info=True)
        sys.exit(1)
# Run the sync only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()