237 lines
8.2 KiB
Python
Executable File
237 lines
8.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import shutil
|
|
import sys
|
|
from pathlib import Path
|
|
import re
|
|
from typing import Set, Dict, Union
|
|
import logging
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
|
|
# Logging setup: every record at INFO level or above is written both to the
# console and to a new, timestamped log file under /tmp (one file per run).
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(
            f"/tmp/publish_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        ),
    ],
)
|
|
|
|
def load_env_vars() -> tuple[Path, Path]:
    """Load SOURCE_DIR and DEST_DIR from the project's .env file.

    The .env file is looked up one level above the directory containing this
    script (``Path(__file__).parent.parent``), not next to the script itself.

    Returns:
        A ``(source_dir, dest_dir)`` tuple. Environment variables such as
        ``$HOME`` and a leading ``~`` are expanded in both values.

    Exits the process with status 1, after logging an error, when the .env
    file is missing or when either variable is unset or empty.
    """
    env_path = Path(__file__).parent.parent / '.env'
    if not env_path.exists():
        logging.error(f".env file not found at {env_path}")
        sys.exit(1)

    load_dotenv(env_path)

    source_dir = os.getenv('SOURCE_DIR')
    dest_dir = os.getenv('DEST_DIR')
    if not source_dir or not dest_dir:
        logging.error("SOURCE_DIR and DEST_DIR must be set in .env file")
        sys.exit(1)

    def _expand(raw: str) -> Path:
        # expandvars handles $HOME-style references; expanduser additionally
        # handles "~", which would otherwise be kept as a literal directory
        # name.
        return Path(os.path.expanduser(os.path.expandvars(raw)))

    return _expand(source_dir), _expand(dest_dir)
|
|
|
|
def get_publish_status(file_path: Path) -> Union[bool, None]:
    """Read the ``publish`` flag from a markdown file's front matter.

    Only the front matter block is inspected: it must open with a ``---``
    line at the very start of the file and it ends at the next ``---`` line.
    A ``publish: true`` that merely appears in the document body (for example
    between two horizontal rules) is ignored, and so are keys that only end
    in ``publish`` such as ``unpublish:``. The value is matched
    case-insensitively, so ``True`` and ``FALSE`` are accepted.

    Args:
        file_path: Markdown file to inspect.

    Returns:
        True or False according to the flag, or None when the file has no
        front matter, the front matter has no boolean ``publish`` key, or the
        file cannot be read (the read error is logged).
    """
    # Keep the try body to the read itself so only I/O and decoding problems
    # are treated as "unreadable file". ValueError covers UnicodeDecodeError.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, ValueError) as e:
        logging.error(f"Error reading {file_path}: {e}")
        return None

    # re.match anchors at the start of the text; the lazy group stops at the
    # first closing "---" line, so the body is never searched.
    front_matter = re.match(
        r'---[ \t]*\n(.*?)\n---[ \t]*(?:\n|\Z)', content, re.DOTALL
    )
    if front_matter is None:
        return None

    flag = re.search(
        r'^[ \t]*publish:\s*(true|false)\b',
        front_matter.group(1),
        re.MULTILINE | re.IGNORECASE,
    )
    if flag is None:
        return None
    return flag.group(1).lower() == 'true'
|
|
|
|
def extract_image_references(content: str) -> Set[str]:
    """Collect the target of every image embed found in ``content``.

    Two syntaxes are recognised: Obsidian embeds (``![[image.png]]``) and
    standard Markdown images (``![alt](path/to/image.png)``).

    Returns:
        The set of captured targets, i.e. the text inside ``[[...]]`` or
        ``(...)``, exactly as written in the document.
    """
    patterns = (
        r'!\[\[(.*?)\]\]',      # Obsidian embed
        r'!\[.*?\]\((.*?)\)',   # Markdown image
    )
    references: Set[str] = set()
    for pattern in patterns:
        references.update(re.findall(pattern, content))
    return references
|
|
|
|
def _resolve_image_file(candidate: Path) -> Union[Path, None]:
    """Return the regular file behind ``candidate``, or None if there is none.

    A symlink is resolved and its target returned; any other path is returned
    unchanged when it is a regular file.
    """
    if candidate.is_symlink():
        resolved = candidate.resolve()
        return resolved if resolved.is_file() else None
    return candidate if candidate.is_file() else None


def find_image_in_source(image_ref: str, source_dir: Path) -> Union[Path, None]:
    """Locate the file an image reference points to inside ``source_dir``.

    Lookup order:
      1. ``image_ref`` taken as a path relative to ``source_dir``;
      2. a recursive search of ``source_dir`` for the reference's file name;
      3. the same search with spaces in the file name replaced by hyphens.

    Args:
        image_ref: Reference text as written in the markdown document.
        source_dir: Root directory to search.

    Returns:
        Path of the first matching regular file (the resolved target when the
        match is a symlink), or None when nothing matches or the reference
        has no file name component.
    """
    import glob  # local import: only needed to escape glob metacharacters

    # NOTE(review): an absolute image_ref replaces source_dir in the join
    # below, so it can resolve to a file outside source_dir.
    exact = _resolve_image_file(source_dir / image_ref)
    if exact is not None:
        return exact

    image_name = Path(image_ref).name
    if not image_name:
        # Nothing to search for; rglob('') would raise ValueError.
        return None

    candidate_names = [image_name]
    hyphenated = image_name.replace(' ', '-')
    if hyphenated != image_name:
        # Skip the second tree walk when it would repeat the first one.
        candidate_names.append(hyphenated)

    for name in candidate_names:
        # rglob treats its argument as a pattern; escape it so file names
        # containing "[", "*" or "?" are matched literally.
        for path in source_dir.rglob(glob.escape(name)):
            found = _resolve_image_file(path)
            if found is not None:
                return found

    return None
|
|
|
|
def process_images(content: str, source_dir: Path, dest_dir: Path) -> str:
    """Copy the images a document references and rewrite the references.

    Every image reference found in ``content`` is looked up under
    ``source_dir``. Images that are found are copied into
    ``dest_dir / "images"`` (a file already present there under the same name
    is not overwritten) and the reference is rewritten to
    ``/images/<file name>``:

      * Obsidian embeds ``![[ref]]`` become ``![<file name>](/images/<file name>)``;
      * Markdown images ``![alt](ref)`` keep their alt text and only have the
        target replaced.

    Both rewrites are applied per reference, so documents mixing the two
    syntaxes are handled. References whose image cannot be found are left
    untouched and a warning is logged.

    Args:
        content: Markdown text to process.
        source_dir: Root directory searched for the image files.
        dest_dir: Publication root; images go into its ``images`` directory.

    Returns:
        The content with resolved image references rewritten.
    """
    images_dir = dest_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    for img_ref in extract_image_references(content):
        img_path = find_image_in_source(img_ref, source_dir)
        if img_path is None:
            logging.warning(f"Image not found: {img_ref}")
            continue

        dest_image = images_dir / img_path.name
        if not dest_image.exists():
            shutil.copy2(img_path, dest_image)
            logging.info(f"Copied image: {img_path.name}")

        published_url = f'/images/{img_path.name}'

        # Obsidian embed -> Markdown image, using the file name as alt text.
        content = content.replace(
            f'![[{img_ref}]]', f'![{img_path.name}]({published_url})'
        )

        # Markdown image: keep the "![alt]" part verbatim and swap the target.
        # A function replacement avoids backslash/group-reference
        # interpretation of the file name.
        content = re.sub(
            r'(!\[.*?\])\(' + re.escape(img_ref) + r'\)',
            lambda m: f'{m.group(1)}({published_url})',
            content,
        )

    return content
|
|
|
|
def clean_destination(dest_dir: Path):
    """Empty the destination directory, keeping only its ``images`` entry.

    Files and symlinks directly inside ``dest_dir`` are deleted and
    sub-directories are removed recursively. The entry named ``images`` is
    left in place when it is a directory.

    Args:
        dest_dir: Directory to clean; it must already exist.
    """
    for item in dest_dir.iterdir():
        if item.is_dir():
            if item.name == "images":
                continue
            if item.is_symlink():
                # shutil.rmtree refuses to operate on a symlink; drop the
                # link itself and leave its target alone.
                item.unlink()
            else:
                shutil.rmtree(item)
        elif item.is_file() or item.is_symlink():
            # The is_symlink() arm also catches dangling links, for which
            # is_file() is False.
            item.unlink()
    logging.info(f"Cleaned destination directory: {dest_dir}")
|
|
|
|
def _remove_stale_markdown(dest_dir: Path, files_to_keep: set) -> None:
    """Delete every ``*.md`` file under ``dest_dir`` not in ``files_to_keep``."""
    # Snapshot the listing first so files are not deleted while rglob is
    # still walking the tree.
    for dest_file in list(dest_dir.rglob("*.md")):
        if dest_file not in files_to_keep:
            dest_file.unlink()
            logging.info(f"Removed old file: {dest_file.relative_to(dest_dir)}")


def _remove_unused_images(dest_dir: Path, images_dir: Path) -> None:
    """Delete files in ``images_dir`` that no published markdown file uses."""
    used_images = set()
    for dest_file in dest_dir.rglob("*.md"):
        with open(dest_file, 'r', encoding='utf-8') as f:
            content = f.read()
        # Collect the file names referenced as ![...](/images/<name>).
        used_images.update(re.findall(r'!\[.*?\]\(/images/(.*?)\)', content))

    for image_file in list(images_dir.iterdir()):
        if image_file.is_dir() and not image_file.is_symlink():
            # unlink() cannot remove a directory; leave sub-directories alone
            # instead of aborting the whole sync.
            continue
        if image_file.name not in used_images:
            image_file.unlink()
            logging.info(f"Removed unused image: {image_file.name}")


def sync_files(source_dir: Path, dest_dir: Path, clean: bool = False):
    """Publish markdown files from ``source_dir`` into ``dest_dir``.

    Steps:
      1. When ``clean`` is true, empty ``dest_dir`` first via
         ``clean_destination``.
      2. For every ``*.md`` file under ``source_dir`` with a readable publish
         status: if it is true, copy the file to the same relative path under
         ``dest_dir`` with its images processed by ``process_images``; if it
         is false, delete any previously published copy. Files without a
         publish status are skipped.
      3. Delete ``*.md`` files under ``dest_dir`` that were not written in
         step 2.
      4. Delete files in ``dest_dir / "images"`` that no remaining markdown
         file references.

    Args:
        source_dir: Root of the markdown sources.
        dest_dir: Publication root; must already exist.
        clean: Whether to clean the destination before syncing.
    """
    if clean:
        clean_destination(dest_dir)

    images_dir = dest_dir / "images"
    images_dir.mkdir(exist_ok=True)

    # Destination paths written during this run; anything else is stale.
    files_to_keep = set()

    for source_file in source_dir.rglob("*.md"):
        # Read the status through the link target when the source is a symlink.
        actual_file = source_file.resolve() if source_file.is_symlink() else source_file
        publish_status = get_publish_status(actual_file)
        if publish_status is None:
            continue

        rel_path = source_file.relative_to(source_dir)
        dest_file = dest_dir / rel_path

        if not publish_status:
            # publish: false -> withdraw a previously published copy.
            if dest_file.exists():
                dest_file.unlink()
                logging.info(f"Removed: {rel_path}")
            continue

        dest_file.parent.mkdir(parents=True, exist_ok=True)

        with open(source_file, 'r', encoding='utf-8') as f:
            content = f.read()

        content = process_images(content, source_dir, dest_dir)

        with open(dest_file, 'w', encoding='utf-8') as f:
            f.write(content)

        logging.info(f"Processed: {rel_path}")
        files_to_keep.add(dest_file)

    _remove_stale_markdown(dest_dir, files_to_keep)
    _remove_unused_images(dest_dir, images_dir)
|
|
|
|
def main():
    """Run the publish sync from the command line.

    Passing ``clean`` (case-insensitive) as the first command-line argument
    empties the destination before syncing.

    Exits with status 1 when the configuration cannot be loaded, when the
    source directory is missing or is not a directory, or when any unexpected
    error occurs during the sync.
    """
    clean = len(sys.argv) > 1 and sys.argv[1].lower() == 'clean'

    try:
        source_dir, dest_dir = load_env_vars()

        # is_dir() rather than exists(): a source path that is not a directory
        # yields no markdown files, and the sync would then prune every
        # published file from the destination.
        if not source_dir.is_dir():
            logging.error(
                f"Source directory does not exist or is not a directory: {source_dir}"
            )
            sys.exit(1)

        dest_dir.mkdir(parents=True, exist_ok=True)

        sync_files(source_dir, dest_dir, clean)

        logging.info("Sync completed successfully")

    except Exception as e:
        # Top-level boundary: record the traceback as well as the message.
        # SystemExit is not an Exception, so the sys.exit(1) calls above pass
        # through this handler untouched.
        logging.exception(f"An error occurred: {e}")
        sys.exit(1)
|
|
|
|
# Script entry point: run the sync only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|