slow-reader/backend/main.py

"""Main FastAPI application for Gentle Momentum Reader."""
import asyncio
import json
import re
from typing import Dict, List
from urllib.parse import urlparse
import nltk
import requests
import textstat
from bs4 import BeautifulSoup
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from readability import Document
# Ensure the NLTK sentence-tokenizer data is available; the download only
# happens on first run and requires network access.
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt")

try:
    nltk.data.find("tokenizers/punkt_tab")
except LookupError:
    nltk.download("punkt_tab")

app = FastAPI(title="Gentle Momentum Reader")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class TextRequest(BaseModel):
    """Request model for text analysis."""
    text: str
    base_speed: float = 200.0  # words per minute


class URLRequest(BaseModel):
    """Request model for URL article extraction."""
    url: str

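# Illustrative request bodies for the HTTP endpoints defined below:
#   POST /analyze-text    {"text": "Some passage to read.", "base_speed": 220}
#   POST /extract-article {"url": "https://example.com/some-article"}
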
class PaceCalculator:
    """Calculate optimal reading pace based on text analysis."""

    # Common function words that readers process quickly. Every entry is
    # three letters or fewer, so the length check below already covers them;
    # they are kept explicit to document the intent.
    COMMON_WORDS = {
        "the", "and", "for", "are", "but", "not", "you", "all", "can",
        "had", "her", "was", "one", "our", "out", "day", "get", "has",
        "him", "his", "how", "its", "new", "now", "old", "see", "two",
        "way", "who", "boy", "did", "man", "run", "say", "she", "too",
        "use",
    }

    def __init__(self, base_wpm: float = 200.0):
        self.base_wpm = float(base_wpm)  # Ensure it's always a float
        self.complex_words = set()

    def analyze_word_complexity(self, word: str) -> float:
        """Analyze individual word complexity."""
        word_clean = re.sub(r'[^\w]', '', word.lower())
        if len(word_clean) <= 3 or word_clean in self.COMMON_WORDS:
            return -0.05  # Simple/common words 5% faster
        if len(word_clean) >= 8:
            return 0.10  # Complex words 10% slower
        # Check for technical terms (contains digits or mid-word capitals)
        if re.search(r'\d', word) or re.search(r'[A-Z]', word[1:]):
            return 0.10
        return 0.0
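    # Illustrative values (hypothetical inputs): "cat" -> -0.05 (short),
    # "configuration" -> +0.10 (long), "iPhone" -> +0.10 (mid-word capital),
    # "table" -> 0.0.
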
    def analyze_sentence_complexity(self, sentence: str) -> float:
        """Analyze sentence complexity."""
        words = sentence.split()
        word_count = len(words)
        if word_count > 20:
            return 0.05  # Long sentences 5% slower
        elif word_count < 8:
            return -0.05  # Short sentences 5% faster
        return 0.0

    def analyze_content_type(self, text: str) -> float:
        """Analyze content type for base adjustment."""
        # Simple keyword heuristics for content type
        technical_keywords = ["algorithm", "function", "variable", "parameter", "implementation", "configuration"]
        academic_keywords = ["research", "study", "analysis", "methodology", "hypothesis", "conclusion"]
        text_lower = text.lower()
        if any(keyword in text_lower for keyword in technical_keywords):
            return 0.15  # Technical content 15% slower
        elif any(keyword in text_lower for keyword in academic_keywords):
            return 0.10  # Academic content 10% slower
        return 0.0

    def get_suggested_wpm(self, text: str) -> Dict[str, float]:
        """Suggest optimal WPM based on text difficulty using textstat."""
        # Get various readability scores
        flesch_reading_ease = textstat.flesch_reading_ease(text)
        flesch_kincaid_grade = textstat.flesch_kincaid_grade(text)
        gunning_fog = textstat.gunning_fog(text)

        # Base WPM suggestions by reading level; an average adult reads
        # roughly 200-250 WPM for ordinary text.
        if flesch_reading_ease >= 90:  # Very Easy (5th grade)
            suggested_wpm = 280
            difficulty = "Very Easy"
        elif flesch_reading_ease >= 80:  # Easy (6th grade)
            suggested_wpm = 250
            difficulty = "Easy"
        elif flesch_reading_ease >= 70:  # Fairly Easy (7th grade)
            suggested_wpm = 220
            difficulty = "Fairly Easy"
        elif flesch_reading_ease >= 60:  # Standard (8th-9th grade)
            suggested_wpm = 200
            difficulty = "Standard"
        elif flesch_reading_ease >= 50:  # Fairly Difficult (10th-12th grade)
            suggested_wpm = 180
            difficulty = "Fairly Difficult"
        elif flesch_reading_ease >= 30:  # Difficult (college level)
            suggested_wpm = 160
            difficulty = "Difficult"
        else:  # Very Difficult (graduate level)
            suggested_wpm = 140
            difficulty = "Very Difficult"

        # Adjust based on Gunning Fog (sentence complexity)
        if gunning_fog > 16:  # Graduate level
            suggested_wpm *= 0.85
        elif gunning_fog > 13:  # College level
            suggested_wpm *= 0.9
        elif gunning_fog < 8:  # Easy reading
            suggested_wpm *= 1.1

        return {
            "suggested_wpm": round(suggested_wpm),
            "difficulty_level": difficulty,
            "flesch_reading_ease": round(flesch_reading_ease, 1),
            "flesch_kincaid_grade": round(flesch_kincaid_grade, 1),
            "gunning_fog": round(gunning_fog, 1),
            "reading_time_minutes": round(len(text.split()) / suggested_wpm, 1)
        }
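    # Illustrative use (exact numbers depend on textstat's scoring):
    #   PaceCalculator().get_suggested_wpm("Some passage...") returns keys
    #   suggested_wpm, difficulty_level, flesch_reading_ease,
    #   flesch_kincaid_grade, gunning_fog, and reading_time_minutes.
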
    def calculate_reading_speed(self, text: str) -> Dict[str, float]:
        """Calculate reading speed adjustments for text."""
        sentences = nltk.sent_tokenize(text)
        total_adjustment = 0.0
        sentence_adjustments = []
        content_type_adjustment = self.analyze_content_type(text)

        for sentence in sentences:
            sentence_adj = self.analyze_sentence_complexity(sentence)
            words = sentence.split()
            word_adjustments = [self.analyze_word_complexity(word) for word in words]
            avg_word_adj = sum(word_adjustments) / len(word_adjustments) if word_adjustments else 0.0
            final_sentence_adj = sentence_adj + avg_word_adj + content_type_adjustment
            sentence_adjustments.append({
                "text": sentence,
                "adjustment": final_sentence_adj,
                "word_count": len(words)
            })
            total_adjustment += final_sentence_adj

        avg_adjustment = total_adjustment / len(sentences) if sentences else 0.0
        # Positive adjustments mean "read slower" throughout this class, so
        # they must reduce the effective WPM, not raise it.
        final_wpm = self.base_wpm * (1 - avg_adjustment)

        # Get text difficulty suggestion
        suggestion = self.get_suggested_wpm(text)

        return {
            "base_wpm": self.base_wpm,
            "adjustment_factor": avg_adjustment,
            "final_wpm": final_wpm,
            "sentences": sentence_adjustments,
            "text_difficulty": suggestion
        }
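    # Worked example: base_wpm=200 with avg_adjustment=+0.10 (harder text)
    # yields final_wpm = 200 * (1 - 0.10) = 180 WPM.
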
class ReadingSession:
    """Manage individual reading session state."""

    def __init__(self, websocket: WebSocket, text: str, analysis: Dict):
        self.websocket = websocket
        self.words = text.split()
        self.analysis = analysis
        self.current_word_index = -1
        self.is_paused = False
        self.is_stopped = False
        # Clamp to at least 1 WPM so seconds_per_word cannot divide by zero
        self.words_per_minute = max(analysis["final_wpm"], 1.0)
        self.seconds_per_word = 60.0 / self.words_per_minute
        self.session_task = None

class ConnectionManager:
    """Manage WebSocket connections and reading sessions."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.sessions: Dict[WebSocket, ReadingSession] = {}

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        if websocket in self.sessions:
            session = self.sessions[websocket]
            session.is_stopped = True
            if session.session_task:
                session.session_task.cancel()
            del self.sessions[websocket]


manager = ConnectionManager()

def extract_article_from_url(url: str) -> Dict[str, str]:
    """Extract article content from a URL using readability-lxml."""
    # Add a protocol if missing *before* validating; otherwise bare domains
    # like "example.com/story" would always fail the scheme check.
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    parsed = urlparse(url)
    if not parsed.scheme or not parsed.netloc:
        raise ValueError("Invalid URL format")

    try:
        # Fetch the page
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        # Method 1: try readability-lxml first (most reliable for articles)
        doc = Document(response.text)
        title = doc.title()
        content_html = doc.summary()

        if content_html:
            # Parse the extracted HTML to get clean text
            soup = BeautifulSoup(content_html, 'html.parser')
            paragraphs = soup.find_all(['p', 'div', 'span'])
            text_parts = []
            for p in paragraphs:
                p_text = p.get_text().strip()
                if len(p_text) > 20:  # Filter out short snippets
                    text_parts.append(p_text)
            text = '\n\n'.join(text_parts)

            if text and len(text.strip()) > 100:
                return {
                    "title": title or "Article",
                    "text": clean_article_text(text),
                    "url": url,
                    "method": "readability"
                }

        # Method 2: fall back to manual BeautifulSoup extraction
        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove unwanted elements
        for element in soup.find_all(['script', 'style', 'nav', 'header', 'footer', 'aside', 'advertisement']):
            element.decompose()

        # Try common article selectors
        article_selectors = [
            'article',
            '[role="main"]',
            '.article-content',
            '.post-content',
            '.entry-content',
            '.content',
            '#content',
            '.story-body',
            '.article-body'
        ]

        title = ""
        text = ""

        # Extract title
        title_selectors = ['h1', 'title', '.headline', '.article-title']
        for selector in title_selectors:
            title_elem = soup.select_one(selector)
            if title_elem:
                title = title_elem.get_text().strip()
                break

        # Extract article text
        for selector in article_selectors:
            article_elem = soup.select_one(selector)
            if article_elem:
                paragraphs = article_elem.find_all(['p', 'div'], recursive=True)
                text_parts = []
                for p in paragraphs:
                    p_text = p.get_text().strip()
                    if len(p_text) > 20:  # Filter out short snippets
                        text_parts.append(p_text)
                text = '\n\n'.join(text_parts)
                break

        # Last resort: take paragraph text from anywhere on the page
        if not text or len(text.strip()) < 100:
            paragraphs = soup.find_all('p')
            text_parts = []
            for p in paragraphs:
                p_text = p.get_text().strip()
                if len(p_text) > 30:  # Longer threshold for the fallback
                    text_parts.append(p_text)
            text = '\n\n'.join(text_parts[:20])  # Limit to first 20 paragraphs

        if text and len(text.strip()) > 100:
            return {
                "title": title or "Article",
                "text": clean_article_text(text),
                "url": url,
                "method": "beautifulsoup"
            }
    except Exception as e:
        raise ValueError(f"Failed to extract article: {str(e)}")

    raise ValueError("Could not extract readable content from this URL")
def clean_article_text(text: str) -> str:
    """Clean and format extracted article text."""
    # Collapse runs of spaces/tabs but keep newlines: the paragraph handling
    # below relies on line boundaries surviving this step.
    text = re.sub(r'[ \t]+', ' ', text)
    # Collapse excessive blank lines
    text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
    # Remove common web artifacts (IGNORECASE covers all capitalizations)
    text = re.sub(r'(advertisement|subscribe|newsletter)', '', text, flags=re.IGNORECASE)
    # Fix sentence spacing; match letters only so decimals like "3.5" survive
    text = re.sub(r'\.([A-Za-z])', r'. \1', text)
    # Drop lines that are too short (likely navigation/metadata)
    lines = text.split('\n')
    cleaned_lines = []
    for line in lines:
        line = line.strip()
        if len(line) > 20 and not re.match(r'^(By |Share |Tweet |Email)', line):
            cleaned_lines.append(line)
    return '\n\n'.join(cleaned_lines).strip()

@app.post("/analyze-text")
async def analyze_text(request: TextRequest):
"""Analyze text and return reading pace calculations."""
calculator = PaceCalculator(request.base_speed)
analysis = calculator.calculate_reading_speed(request.text)
return {
"analysis": analysis,
"word_count": len(request.text.split()),
"estimated_time_minutes": len(request.text.split()) / analysis["final_wpm"]
}
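# Illustrative request (assumes the server is running on localhost:8000):
#   curl -X POST http://localhost:8000/analyze-text \
#     -H "Content-Type: application/json" \
#     -d '{"text": "A short passage to analyze.", "base_speed": 200}'
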
@app.post("/extract-article")
async def extract_article(request: URLRequest):
"""Extract article content from URL."""
try:
result = extract_article_from_url(request.url)
word_count = len(result["text"].split())
return {
"title": result["title"],
"text": result["text"],
"url": result["url"],
"word_count": word_count,
"extraction_method": result["method"],
"success": True
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
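# Illustrative request (assumes the server is running on localhost:8000):
#   curl -X POST http://localhost:8000/extract-article \
#     -H "Content-Type: application/json" \
#     -d '{"url": "https://example.com/some-article"}'
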
@app.websocket("/ws/reading-session")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time reading sessions."""
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "start_reading":
text = message["text"]
base_speed = float(message.get("base_speed", 200.0))
calculator = PaceCalculator(base_speed)
analysis = calculator.calculate_reading_speed(text)
# Create session
session = ReadingSession(websocket, text, analysis)
manager.sessions[websocket] = session
# Send analysis back
await websocket.send_text(json.dumps({
"type": "analysis_complete",
"analysis": analysis,
"total_words": len(session.words)
}))
# Start reading session as background task
session.session_task = asyncio.create_task(
start_reading_session(session)
)
elif message["type"] == "pause":
if websocket in manager.sessions:
session = manager.sessions[websocket]
session.is_paused = True
await websocket.send_text(json.dumps({
"type": "paused"
}))
elif message["type"] == "resume":
if websocket in manager.sessions:
session = manager.sessions[websocket]
session.is_paused = False
await websocket.send_text(json.dumps({
"type": "resumed"
}))
elif message["type"] == "stop":
if websocket in manager.sessions:
session = manager.sessions[websocket]
session.is_stopped = True
if session.session_task:
session.session_task.cancel()
await websocket.send_text(json.dumps({
"type": "stopped"
}))
break
except WebSocketDisconnect:
manager.disconnect(websocket)
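# Illustrative client -> server messages for the endpoint above:
#   {"type": "start_reading", "text": "Some passage.", "base_speed": 200}
#   {"type": "pause"}   {"type": "resume"}   {"type": "stop"}
# The server replies with "analysis_complete", then streams "word_reveal"
# messages (word_index, word, revealed_words, progress) and finishes with
# "session_complete", or "paused"/"resumed"/"stopped" as appropriate.
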
async def start_reading_session(session: ReadingSession):
    """Start the reading session with progressive text reveal."""
    try:
        for i, word in enumerate(session.words):
            if session.is_stopped:
                break

            # Wait while paused
            while session.is_paused and not session.is_stopped:
                await asyncio.sleep(0.1)
            if session.is_stopped:
                break

            session.current_word_index = i

            # Send word reveal - this reveals the text progressively
            await session.websocket.send_text(json.dumps({
                "type": "word_reveal",
                "word_index": i,
                "word": word,
                "revealed_words": session.words[:i + 1],  # All words up to current
                "progress": (i + 1) / len(session.words)
            }))

            # Wait for the calculated per-word delay
            await asyncio.sleep(session.seconds_per_word)

        # Session complete
        if not session.is_stopped:
            await session.websocket.send_text(json.dumps({
                "type": "session_complete"
            }))
    except (WebSocketDisconnect, asyncio.CancelledError):
        pass

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
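
# For local development one could instead run with auto-reload, e.g.:
#   uvicorn main:app --reload
# (this assumes the file is named main.py, as the path above suggests).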