"""Main FastAPI application for Gentle Momentum Reader."""
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from typing import Dict, List
|
|
from urllib.parse import urlparse
|
|
|
|
import nltk
|
|
import requests
|
|
import textstat
|
|
from bs4 import BeautifulSoup
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
from readability import Document
|
|
|
|
# Make sure the NLTK sentence-tokenizer data is available before first use.
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt")

try:
    nltk.data.find("tokenizers/punkt_tab")
except LookupError:
    nltk.download("punkt_tab")

app = FastAPI(title="Gentle Momentum Reader")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class TextRequest(BaseModel):
    """Request model for text analysis."""

    text: str
    base_speed: float = 200.0  # words per minute


class URLRequest(BaseModel):
    """Request model for URL article extraction."""

    url: str

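# How pacing works (summary of the logic below): every word and sentence
# contributes a small fractional speed adjustment (e.g. -0.05 = 5% faster,
# +0.10 = 10% slower). Per-sentence adjustments are averaged over the whole
# text and applied once as final_wpm = base_wpm * (1 + average_adjustment),
# alongside a textstat-based difficulty suggestion.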
class PaceCalculator:
    """Calculate optimal reading pace based on text analysis."""

    def __init__(self, base_wpm: float = 200.0):
        self.base_wpm = float(base_wpm)  # Ensure it's always a float
        self.complex_words = set()

    def analyze_word_complexity(self, word: str) -> float:
        """Analyze individual word complexity."""
        word_clean = re.sub(r'[^\w]', '', word.lower())

        if len(word_clean) <= 3:
            return -0.05  # Simple words 5% faster
        elif len(word_clean) >= 8:
            return 0.10  # Complex words 10% slower
        elif word_clean in ["the", "and", "for", "are", "but", "not", "you", "all", "can", "had", "her", "was", "one", "our", "out", "day", "get", "has", "him", "his", "how", "its", "new", "now", "old", "see", "two", "way", "who", "boy", "did", "man", "run", "say", "she", "too", "use"]:
            return -0.05  # Common words faster

        # Check for technical terms (contains digits, or capital letters mid-word)
        if re.search(r'\d', word) or re.search(r'[A-Z]', word[1:]):
            return 0.10

        return 0.0

    def analyze_sentence_complexity(self, sentence: str) -> float:
        """Analyze sentence complexity."""
        words = sentence.split()
        word_count = len(words)

        if word_count > 20:
            return 0.05  # Long sentences 5% slower
        elif word_count < 8:
            return -0.05  # Short sentences 5% faster

        return 0.0

    def analyze_content_type(self, text: str) -> float:
        """Analyze content type for base adjustment."""
        # Simple keyword heuristics for content type
        technical_keywords = ["algorithm", "function", "variable", "parameter", "implementation", "configuration"]
        academic_keywords = ["research", "study", "analysis", "methodology", "hypothesis", "conclusion"]

        text_lower = text.lower()

        if any(keyword in text_lower for keyword in technical_keywords):
            return 0.15  # Technical content 15% slower
        elif any(keyword in text_lower for keyword in academic_keywords):
            return 0.10  # Academic content 10% slower

        return 0.0

    def get_suggested_wpm(self, text: str) -> Dict[str, float]:
        """Suggest optimal WPM based on text difficulty using textstat."""
        # Get various readability scores
        flesch_reading_ease = textstat.flesch_reading_ease(text)
        flesch_kincaid_grade = textstat.flesch_kincaid_grade(text)
        gunning_fog = textstat.gunning_fog(text)

        # Base WPM suggestions by reading level; an average adult reads
        # roughly 200-250 WPM for normal text.
        if flesch_reading_ease >= 90:  # Very Easy (5th grade)
            suggested_wpm = 280
            difficulty = "Very Easy"
        elif flesch_reading_ease >= 80:  # Easy (6th grade)
            suggested_wpm = 250
            difficulty = "Easy"
        elif flesch_reading_ease >= 70:  # Fairly Easy (7th grade)
            suggested_wpm = 220
            difficulty = "Fairly Easy"
        elif flesch_reading_ease >= 60:  # Standard (8th-9th grade)
            suggested_wpm = 200
            difficulty = "Standard"
        elif flesch_reading_ease >= 50:  # Fairly Difficult (10th-12th grade)
            suggested_wpm = 180
            difficulty = "Fairly Difficult"
        elif flesch_reading_ease >= 30:  # Difficult (college level)
            suggested_wpm = 160
            difficulty = "Difficult"
        else:  # Very Difficult (graduate level)
            suggested_wpm = 140
            difficulty = "Very Difficult"

        # Adjust based on Gunning Fog (sentence complexity)
        if gunning_fog > 16:  # Graduate level
            suggested_wpm *= 0.85
        elif gunning_fog > 13:  # College level
            suggested_wpm *= 0.9
        elif gunning_fog < 8:  # Easy reading
            suggested_wpm *= 1.1

        return {
            "suggested_wpm": round(suggested_wpm),
            "difficulty_level": difficulty,
            "flesch_reading_ease": round(flesch_reading_ease, 1),
            "flesch_kincaid_grade": round(flesch_kincaid_grade, 1),
            "gunning_fog": round(gunning_fog, 1),
            "reading_time_minutes": round(len(text.split()) / suggested_wpm, 1)
        }

    def calculate_reading_speed(self, text: str) -> Dict[str, float]:
        """Calculate reading speed adjustments for text."""
        sentences = nltk.sent_tokenize(text)

        total_adjustment = 0.0
        sentence_adjustments = []

        content_type_adjustment = self.analyze_content_type(text)

        for sentence in sentences:
            sentence_adj = self.analyze_sentence_complexity(sentence)
            words = sentence.split()

            word_adjustments = []
            for word in words:
                word_adj = self.analyze_word_complexity(word)
                word_adjustments.append(word_adj)

            avg_word_adj = sum(word_adjustments) / len(word_adjustments) if word_adjustments else 0.0
            final_sentence_adj = sentence_adj + avg_word_adj + content_type_adjustment

            sentence_adjustments.append({
                "text": sentence,
                "adjustment": final_sentence_adj,
                "word_count": len(words)
            })

            total_adjustment += final_sentence_adj

        avg_adjustment = total_adjustment / len(sentences) if sentences else 0.0
        final_wpm = self.base_wpm * (1 + avg_adjustment)

        # Get text difficulty suggestion
        suggestion = self.get_suggested_wpm(text)

        return {
            "base_wpm": self.base_wpm,
            "adjustment_factor": avg_adjustment,
            "final_wpm": final_wpm,
            "sentences": sentence_adjustments,
            "text_difficulty": suggestion
        }

class ReadingSession:
    """Manage individual reading session state."""

    def __init__(self, websocket: WebSocket, text: str, analysis: Dict):
        self.websocket = websocket
        self.words = text.split()
        self.analysis = analysis
        self.current_word_index = -1
        self.is_paused = False
        self.is_stopped = False
        self.words_per_minute = analysis["final_wpm"]
        self.seconds_per_word = 60.0 / self.words_per_minute
        self.session_task = None

class ConnectionManager:
    """Manage WebSocket connections and reading sessions."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.sessions: Dict[WebSocket, ReadingSession] = {}

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        if websocket in self.sessions:
            session = self.sessions[websocket]
            session.is_stopped = True
            if session.session_task:
                session.session_task.cancel()
            del self.sessions[websocket]


manager = ConnectionManager()

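# Article extraction strategy (as implemented below): fetch the page, try
# readability-lxml first, fall back to BeautifulSoup with common article
# selectors, and finally fall back to collecting all <p> tags on the page.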
def extract_article_from_url(url: str) -> Dict[str, str]:
    """Extract article content from URL using readability-lxml."""
    # Add a protocol if missing, then validate; validating first would reject
    # scheme-less URLs like "example.com/article" before they could be fixed.
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    # Validate URL
    parsed = urlparse(url)
    if not parsed.scheme or not parsed.netloc:
        raise ValueError("Invalid URL format")

    try:
        # Fetch the page
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        # Method 1: Try readability-lxml first (most reliable for articles)
        doc = Document(response.text)
        title = doc.title()
        content_html = doc.summary()

        if content_html:
            # Parse the extracted HTML to get clean text
            soup = BeautifulSoup(content_html, 'html.parser')

            # Get all paragraph text
            paragraphs = soup.find_all(['p', 'div', 'span'])
            text_parts = []

            for p in paragraphs:
                p_text = p.get_text().strip()
                if len(p_text) > 20:  # Filter out short snippets
                    text_parts.append(p_text)

            text = '\n\n'.join(text_parts)

            if text and len(text.strip()) > 100:
                return {
                    "title": title or "Article",
                    "text": clean_article_text(text),
                    "url": url,
                    "method": "readability"
                }

        # Method 2: Fallback to manual BeautifulSoup extraction
        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove unwanted elements
        for element in soup.find_all(['script', 'style', 'nav', 'header', 'footer', 'aside', 'advertisement']):
            element.decompose()

        # Try common article selectors
        article_selectors = [
            'article',
            '[role="main"]',
            '.article-content',
            '.post-content',
            '.entry-content',
            '.content',
            '#content',
            '.story-body',
            '.article-body'
        ]

        title = ""
        text = ""

        # Extract title
        title_selectors = ['h1', 'title', '.headline', '.article-title']
        for selector in title_selectors:
            title_elem = soup.select_one(selector)
            if title_elem:
                title = title_elem.get_text().strip()
                break

        # Extract article text
        for selector in article_selectors:
            article_elem = soup.select_one(selector)
            if article_elem:
                # Get all paragraph text
                paragraphs = article_elem.find_all(['p', 'div'], recursive=True)
                text_parts = []

                for p in paragraphs:
                    p_text = p.get_text().strip()
                    if len(p_text) > 20:  # Filter out short snippets
                        text_parts.append(p_text)

                text = '\n\n'.join(text_parts)
                break

        # Fallback: get all paragraph text from the page
        if not text or len(text.strip()) < 100:
            paragraphs = soup.find_all('p')
            text_parts = []

            for p in paragraphs:
                p_text = p.get_text().strip()
                if len(p_text) > 30:  # Longer threshold for fallback
                    text_parts.append(p_text)

            text = '\n\n'.join(text_parts[:20])  # Limit to first 20 paragraphs

        if text and len(text.strip()) > 100:
            return {
                "title": title or "Article",
                "text": clean_article_text(text),
                "url": url,
                "method": "beautifulsoup"
            }

    except Exception as e:
        raise ValueError(f"Failed to extract article: {str(e)}")

    raise ValueError("Could not extract readable content from this URL")

def clean_article_text(text: str) -> str:
    """Clean and format extracted article text."""
    # Collapse runs of spaces/tabs, but keep newlines so the line-based
    # cleanup below still has paragraph breaks to work with
    text = re.sub(r'[ \t]+', ' ', text)

    # Remove excessive line breaks
    text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)

    # Remove common web artifacts
    text = re.sub(r'(Advertisement|ADVERTISEMENT|Subscribe|Newsletter)', '', text, flags=re.IGNORECASE)

    # Fix sentence spacing
    text = re.sub(r'\.(\w)', r'. \1', text)

    # Remove lines that are too short (likely navigation/metadata)
    lines = text.split('\n')
    cleaned_lines = []

    for line in lines:
        line = line.strip()
        if len(line) > 20 and not re.match(r'^(By |Share |Tweet |Email)', line):
            cleaned_lines.append(line)

    return '\n\n'.join(cleaned_lines).strip()

@app.post("/analyze-text")
|
|
async def analyze_text(request: TextRequest):
|
|
"""Analyze text and return reading pace calculations."""
|
|
calculator = PaceCalculator(request.base_speed)
|
|
analysis = calculator.calculate_reading_speed(request.text)
|
|
|
|
return {
|
|
"analysis": analysis,
|
|
"word_count": len(request.text.split()),
|
|
"estimated_time_minutes": len(request.text.split()) / analysis["final_wpm"]
|
|
}
|
|
|
|
|
|
@app.post("/extract-article")
|
|
async def extract_article(request: URLRequest):
|
|
"""Extract article content from URL."""
|
|
try:
|
|
result = extract_article_from_url(request.url)
|
|
word_count = len(result["text"].split())
|
|
|
|
return {
|
|
"title": result["title"],
|
|
"text": result["text"],
|
|
"url": result["url"],
|
|
"word_count": word_count,
|
|
"extraction_method": result["method"],
|
|
"success": True
|
|
}
|
|
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
|
|
|
|
|
@app.websocket("/ws/reading-session")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time reading sessions."""
    await manager.connect(websocket)

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            if message["type"] == "start_reading":
                text = message["text"]
                base_speed = float(message.get("base_speed", 200.0))

                calculator = PaceCalculator(base_speed)
                analysis = calculator.calculate_reading_speed(text)

                # Create session
                session = ReadingSession(websocket, text, analysis)
                manager.sessions[websocket] = session

                # Send analysis back
                await websocket.send_text(json.dumps({
                    "type": "analysis_complete",
                    "analysis": analysis,
                    "total_words": len(session.words)
                }))

                # Start reading session as a background task
                session.session_task = asyncio.create_task(
                    start_reading_session(session)
                )

            elif message["type"] == "pause":
                if websocket in manager.sessions:
                    session = manager.sessions[websocket]
                    session.is_paused = True
                    await websocket.send_text(json.dumps({
                        "type": "paused"
                    }))

            elif message["type"] == "resume":
                if websocket in manager.sessions:
                    session = manager.sessions[websocket]
                    session.is_paused = False
                    await websocket.send_text(json.dumps({
                        "type": "resumed"
                    }))

            elif message["type"] == "stop":
                if websocket in manager.sessions:
                    session = manager.sessions[websocket]
                    session.is_stopped = True
                    if session.session_task:
                        session.session_task.cancel()
                    await websocket.send_text(json.dumps({
                        "type": "stopped"
                    }))
                break

    except WebSocketDisconnect:
        pass
    finally:
        # Release the session and connection whether the client disconnected
        # or sent "stop"; otherwise stale entries linger in the manager.
        manager.disconnect(websocket)

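# WebSocket message protocol (as handled above): the client sends
# {"type": "start_reading" | "pause" | "resume" | "stop", ...}; the server
# replies with "analysis_complete", "paused", "resumed", "stopped", and
# streams "word_reveal" messages followed by "session_complete".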
async def start_reading_session(session: ReadingSession):
    """Start the reading session with progressive text reveal."""
    try:
        for i, word in enumerate(session.words):
            if session.is_stopped:
                break

            # Wait while paused
            while session.is_paused and not session.is_stopped:
                await asyncio.sleep(0.1)

            if session.is_stopped:
                break

            session.current_word_index = i

            # Send word reveal - this reveals the text progressively
            await session.websocket.send_text(json.dumps({
                "type": "word_reveal",
                "word_index": i,
                "word": word,
                "revealed_words": session.words[:i + 1],  # All words up to current
                "progress": (i + 1) / len(session.words)
            }))

            # Wait for the calculated per-word interval
            await asyncio.sleep(session.seconds_per_word)

        # Session complete
        if not session.is_stopped:
            await session.websocket.send_text(json.dumps({
                "type": "session_complete"
            }))

    except (WebSocketDisconnect, asyncio.CancelledError):
        pass

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
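# Example usage (a sketch; assumes this module is saved as main.py and the
# server is running locally on port 8000):
#   uvicorn main:app --reload
#   curl -X POST http://localhost:8000/analyze-text \
#        -H "Content-Type: application/json" \
#        -d '{"text": "A short paragraph to pace.", "base_speed": 220}'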