street-lingo/backend/main.py

818 lines
36 KiB
Python

import difflib
import re
import json
import base64
import logging
import time
from typing import Dict, Any, List
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from google.cloud import speech
import openai
from languages.indonesian.services import IndonesianConversationFlowService
from languages.german.services import GermanConversationFlowService
from config import config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# Fail fast at startup if required settings (API keys, host/port) are missing.
config.validate()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Temporarily allow all origins for debugging
    allow_credentials=False,  # Set to False when using allow_origins=["*"]
    allow_methods=["*"],
    allow_headers=["*"],
)
# Language-specific services: shared template instances keyed by language id.
# The speech WebSocket clones a fresh instance per connection (see below).
language_services = {
    "indonesian": IndonesianConversationFlowService(),
    "german": GermanConversationFlowService()
}
class ResponseCheck(BaseModel):
    """Request body for POST /api/check-response."""
    user_response: str  # what the learner actually said/typed
    expected_response: str  # the target phrase to compare against
    scenario: str  # scenario id, passed through to feedback generation
class ResponseResult(BaseModel):
    """Response body for POST /api/check-response."""
    is_correct: bool  # True when similarity >= 0.7
    feedback: str  # human-readable coaching message
    similarity: float  # 0.0..1.0 SequenceMatcher ratio on normalized text
class TranslationRequest(BaseModel):
    """Request body for POST /api/translate."""
    text: str  # text to translate
    source_language: str  # language of `text`
    target_language: str  # language to translate into
class TranslationResult(BaseModel):
    """Response body for POST /api/translate."""
    translation: str  # translated text
    source_text: str  # original input echoed back
class SuggestionRequest(BaseModel):
    """Request body for POST /api/suggestions."""
    language: str  # "german" or anything else (treated as indonesian)
    scenario: str  # scenario id used to contextualize the prompt
    conversation_history: List[Dict[str, str]]  # items with "type" and "text" keys
class SuggestionResponse(BaseModel):
    """Response body for POST /api/suggestions."""
    intro: str  # short lead-in message
    suggestions: List[Dict[str, str]]  # each has "<lang>_text" and "english_meaning"
class ConversationFeedbackRequest(BaseModel):
    """Request body for POST /api/conversation-feedback."""
    language: str  # "german" or anything else (treated as indonesian)
    scenario: str  # scenario id used to contextualize the prompt
    conversation_history: List[Dict[str, str]]  # items with "type" and "text" keys
class ConversationFeedbackResponse(BaseModel):
    """Response body for POST /api/conversation-feedback."""
    encouragement: str  # positive, motivating summary
    suggestions: List[Dict[str, str]]  # each has "category" and "tip"
    examples: List[Dict[str, str]]  # each has "original", "improved", "reason"
def normalize_text(text: str) -> str:
    """Canonicalize a phrase for fuzzy comparison.

    Lower-cases, trims surrounding whitespace, drops punctuation,
    and collapses internal whitespace runs to single spaces.
    """
    lowered = text.lower().strip()
    without_punct = re.sub(r"[^\w\s]", "", lowered)
    return re.sub(r"\s+", " ", without_punct)
def calculate_similarity(text1: str, text2: str) -> float:
    """Return a 0.0-1.0 similarity ratio between two normalized phrases.

    Both inputs are lower-cased, stripped of punctuation, and
    whitespace-collapsed before being compared with difflib.
    """
    def _canonical(raw: str) -> str:
        # Same normalization as normalize_text(), inlined here.
        cleaned = re.sub(r"[^\w\s]", "", raw.lower().strip())
        return re.sub(r"\s+", " ", cleaned)

    matcher = difflib.SequenceMatcher(None, _canonical(text1), _canonical(text2))
    return matcher.ratio()
def generate_feedback(
    user_response: str, expected_response: str, similarity: float, scenario: str
) -> str:
    """Map a similarity score to an encouraging feedback message.

    Args:
        user_response: The learner's actual phrase (currently unused;
            kept for interface stability and future per-word feedback).
        expected_response: The target phrase, echoed back in hints.
        similarity: 0.0-1.0 ratio from calculate_similarity().
        scenario: Scenario id (currently unused; kept for interface stability).

    Returns:
        A feedback string; anything below the 0.7 correctness threshold
        includes the expected phrase as a hint.

    Fix: the top-tier message previously said "Excellent Indonesian!" even
    though this endpoint also serves German learners; it is now language-neutral.
    """
    if similarity >= 0.9:
        return "Perfect! Excellent work!"
    elif similarity >= 0.7:
        return "Great job! That's correct!"
    elif similarity >= 0.5:
        return f"Good attempt! Try: '{expected_response}'"
    elif similarity >= 0.3:
        return f"Close, but try again. Expected: '{expected_response}'"
    else:
        return f"Not quite right. The correct answer is: '{expected_response}'"
@app.post("/api/check-response", response_model=ResponseResult)
async def check_response(request: ResponseCheck) -> ResponseResult:
    """Score a learner's reply against the expected phrase.

    Computes a fuzzy similarity, marks the answer correct at >= 0.7,
    and attaches a coaching message. Any internal failure surfaces
    as HTTP 500.
    """
    try:
        score = calculate_similarity(
            request.user_response, request.expected_response
        )
        message = generate_feedback(
            request.user_response,
            request.expected_response,
            score,
            request.scenario,
        )
        return ResponseResult(
            is_correct=score >= 0.7,
            feedback=message,
            similarity=score,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/scenarios/{language}")
async def get_scenarios(language: str) -> dict:
    """Get scenarios for a specific language (indonesian or german)

    Builds a dict keyed by scenario id. Each entry describes the scenario,
    its default character, goal items, helpful phrases, and the full roster
    of selectable characters. Raises HTTP 400 for unknown languages.
    """
    # Imported lazily so only the requested language package is loaded.
    if language == "indonesian":
        from languages.indonesian.models import SCENARIO_PERSONALITIES
        # Indonesian phrase objects expose their text under "indonesian".
        native_key = "indonesian"
    elif language == "german":
        from languages.german.models import SCENARIO_PERSONALITIES
        # German phrase objects expose their text under "native".
        native_key = "native"
    else:
        raise HTTPException(status_code=400, detail="Unsupported language")
    scenarios = {}
    for scenario_id, personalities in SCENARIO_PERSONALITIES.items():
        # The first personality listed acts as the scenario's default character.
        default_personality = list(personalities.values())[0]
        scenarios[scenario_id] = {
            "id": scenario_id,
            "title": default_personality.scenario_title,
            "description": default_personality.scenario_description,
            "challenge": default_personality.scenario_challenge,
            "goal": default_personality.scenario_goal,
            "character": default_personality.name,
            "character_background": default_personality.background,
            "character_gender": default_personality.gender.value,
            "location": default_personality.location_context,
            "language": language,
            "goal_items": [
                {
                    "id": item.id,
                    "description": item.description,
                    "completed": False
                } for item in default_personality.goal_items
            ],
            "helpful_phrases": [
                {
                    # Older phrase models only have .indonesian; fall back to it
                    # when the newer .native attribute is absent.
                    native_key: phrase.native if hasattr(phrase, 'native') else phrase.indonesian,
                    "english": phrase.english
                } for phrase in default_personality.helpful_phrases
            ],
            "available_characters": [
                {
                    "id": char_id,
                    "name": char.name,
                    "background": char.background,
                    "tone": char.tone.value,
                    "gender": char.gender.value
                } for char_id, char in personalities.items()
            ]
        }
    return scenarios
@app.get("/api/scenarios")
async def get_all_scenarios() -> dict:
    """Return every scenario for every supported language, keyed by language."""
    return {
        lang: await get_scenarios(lang)
        for lang in ("indonesian", "german")
    }
@app.post("/api/suggestions", response_model=SuggestionResponse)
async def generate_suggestions(request: SuggestionRequest) -> SuggestionResponse:
    """Generate contextual language suggestions based on conversation history.

    Asks the OpenAI chat model for three next-phrase suggestions grounded in
    the last few conversation turns. Falls back to canned suggestions when the
    model returns unparseable JSON, and to a second, simpler canned set when
    the API call itself fails — this endpoint never raises to the client.

    Fixes: removed redundant function-local `import json` (already imported at
    module level), removed the unused loop index and the unused
    `native_language` variable, and replaced the conditional expression
    repeated three times as a dict key in the outer fallback with a single
    computed key.
    """
    logger.info(f"Received suggestions request: language={request.language}, scenario={request.scenario}")
    try:
        client = openai.OpenAI(api_key=config.OPENAI_API_KEY)
        # Only the last 4 turns are included to keep the prompt short.
        conversation_context = ""
        for msg in request.conversation_history[-4:]:
            conversation_context += f"{msg['type'].capitalize()}: {msg['text']}\n"
        # Determine target language and context
        if request.language == "german":
            target_language = "German"
            scenario_prompt = f"in a {request.scenario} scenario in Germany"
        else:
            target_language = "Indonesian"
            scenario_prompt = f"in a {request.scenario} scenario in Indonesia"
        suggestion_prompt = f"""You are a helpful language learning assistant. Based on the conversation history below, suggest 3 useful phrases the user might want to say next in {target_language}.
Conversation context {scenario_prompt}:
{conversation_context}
Provide suggestions as a JSON object with:
- "intro": A brief encouraging message about what they might want to say next
- "suggestions": Array of 3 objects, each with:
- "{target_language.lower()}_text": The phrase in {target_language}
- "english_meaning": The English translation/meaning
Make the suggestions contextual, natural, and progressively helpful for the conversation. Focus on practical phrases they might actually need.
Example format:
{{
"intro": "Here are some phrases you might find useful:",
"suggestions": [
{{
"{target_language.lower()}_text": "Example phrase",
"english_meaning": "English translation"
}}
]
}}"""
        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": f"You are a helpful {target_language} language learning assistant. Always respond with valid JSON."},
                {"role": "user", "content": suggestion_prompt}
            ],
            max_tokens=500,
            temperature=0.7
        )
        suggestion_json = response.choices[0].message.content.strip()
        logger.info(f"AI suggestion response: {suggestion_json}")
        try:
            # Strip optional Markdown code fences the model sometimes wraps JSON in.
            cleaned_json = suggestion_json.strip()
            if cleaned_json.startswith('```json'):
                cleaned_json = cleaned_json[7:-3].strip()
            elif cleaned_json.startswith('```'):
                cleaned_json = cleaned_json[3:-3].strip()
            suggestion_data = json.loads(cleaned_json)
            return SuggestionResponse(
                intro=suggestion_data.get("intro", "Here are some helpful phrases:"),
                suggestions=suggestion_data.get("suggestions", [])
            )
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)} for content: {cleaned_json}")
            # Fallback if JSON parsing fails
            text_key = f"{target_language.lower()}_text"
            fallback_suggestions = [
                {
                    text_key: "Excuse me, can you help me?",
                    "english_meaning": "A polite way to ask for assistance"
                },
                {
                    text_key: "Thank you very much",
                    "english_meaning": "Express gratitude"
                },
                {
                    text_key: "I don't understand",
                    "english_meaning": "When you need clarification"
                }
            ]
            return SuggestionResponse(
                intro="Here are some helpful phrases:",
                suggestions=fallback_suggestions
            )
    except Exception as e:
        logger.error(f"Suggestion generation error: {str(e)}")
        # Return fallback suggestions instead of raising an error
        text_key = "german_text" if request.language == "german" else "indonesian_text"
        return SuggestionResponse(
            intro="Here are some helpful phrases:",
            suggestions=[
                {text_key: "Hello", "english_meaning": "A basic greeting"},
                {text_key: "Thank you", "english_meaning": "Express gratitude"},
                {text_key: "Please", "english_meaning": "Polite request"}
            ]
        )
@app.post("/api/translate", response_model=TranslationResult)
async def translate_text(request: TranslationRequest) -> TranslationResult:
    """Translate text between the request's source and target languages.

    Fix: the original prompt hard-coded Indonesian -> English and silently
    ignored the request's `source_language` / `target_language` fields; the
    prompt and system message now honor them, which also makes the endpoint
    usable for German. Behavior for existing Indonesian->English callers is
    unchanged apart from prompt wording.

    Raises:
        HTTPException: 500 when the OpenAI call fails.
    """
    try:
        client = openai.OpenAI(api_key=config.OPENAI_API_KEY)
        # Capitalize so lowercase ids like "indonesian" read naturally in the prompt.
        source = request.source_language.capitalize()
        target = request.target_language.capitalize()
        translation_prompt = f"""Translate the following {source} text to natural, conversational {target}.
Keep the tone and style appropriate for casual conversation.
{source} text: "{request.text}"
Provide only the {target} translation, nothing else."""
        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": f"You are a professional {source} to {target} translator. Provide natural, conversational translations."},
                {"role": "user", "content": translation_prompt}
            ],
            max_tokens=200,
            temperature=0.3  # low temperature keeps the translation faithful
        )
        translation = response.choices[0].message.content.strip()
        return TranslationResult(
            translation=translation,
            source_text=request.text
        )
    except Exception as e:
        logger.error(f"Translation error: {str(e)}")
        # Chain the original exception for easier debugging.
        raise HTTPException(status_code=500, detail=f"Translation failed: {str(e)}") from e
@app.post("/api/conversation-feedback", response_model=ConversationFeedbackResponse)
async def generate_conversation_feedback(request: ConversationFeedbackRequest) -> ConversationFeedbackResponse:
    """Generate encouraging feedback and suggestions for completed conversation.

    Sends the full conversation transcript to the OpenAI chat model with a
    language-specific rubric and parses the JSON reply. Falls back to a
    generic encouraging response on JSON parse failure or any API error —
    this endpoint never raises to the client.
    """
    logger.info(f"Received feedback request: language={request.language}, scenario={request.scenario}")
    try:
        client = openai.OpenAI(api_key=config.OPENAI_API_KEY)
        # Build conversation history
        conversation_context = ""
        user_messages = []  # NOTE(review): collected but never read below — confirm before deleting
        for msg in request.conversation_history:
            if msg.get('type') == 'user':
                user_messages.append(msg['text'])
            conversation_context += f"{msg.get('type', 'unknown').capitalize()}: {msg.get('text', '')}\n"
        # Determine target language and feedback context
        if request.language == "german":
            target_language = "German"
            language_specific_feedback = """
Focus on common German language learning areas:
- Article usage (der, die, das)
- Verb conjugation and word order
- Formal vs informal language (Sie vs du)
- Separable verbs
- Common German expressions and idioms
"""
        else:
            target_language = "Indonesian"
            language_specific_feedback = """
Focus on common Indonesian language learning areas:
- Formal vs informal language (using proper pronouns)
- Sentence structure and word order
- Common Indonesian expressions
- Politeness levels and cultural context
"""
        feedback_prompt = f"""You are an encouraging {target_language} language teacher. A student has just finished a conversation practice session in a {request.scenario} scenario.
Here's their conversation:
{conversation_context}
{language_specific_feedback}
Provide helpful, encouraging feedback as a JSON object with:
- "encouragement": A positive, motivating message about their effort (2-3 sentences)
- "suggestions": Array of 2-3 objects with:
- "category": Area of improvement (e.g., "Pronunciation", "Grammar", "Vocabulary")
- "tip": Specific, actionable advice
- "examples": Array of 1-2 objects with:
- "original": Something they actually said (from the conversation)
- "improved": A better way to say it
- "reason": Brief explanation of why it's better
Make it encouraging and supportive, focusing on growth rather than criticism. If they did well, focus on areas to sound more natural or confident.
Example format:
{{
"encouragement": "You did a great job engaging in this conversation! Your effort to communicate is really paying off.",
"suggestions": [
{{
"category": "Vocabulary",
"tip": "Try using more common everyday words to sound more natural"
}}
],
"examples": [
{{
"original": "I want to purchase this item",
"improved": "I'd like to buy this",
"reason": "Sounds more natural and conversational"
}}
]
}}"""
        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": f"You are an encouraging {target_language} language teacher. Always respond with valid JSON and be supportive."},
                {"role": "user", "content": feedback_prompt}
            ],
            max_tokens=600,
            temperature=0.7
        )
        feedback_json = response.choices[0].message.content.strip()
        logger.info(f"AI feedback response: {feedback_json}")
        # Parse JSON response
        try:
            # Strip optional Markdown code fences the model sometimes wraps JSON in.
            cleaned_json = feedback_json.strip()
            if cleaned_json.startswith('```json'):
                cleaned_json = cleaned_json[7:-3].strip()
            elif cleaned_json.startswith('```'):
                cleaned_json = cleaned_json[3:-3].strip()
            feedback_data = json.loads(cleaned_json)
            return ConversationFeedbackResponse(
                encouragement=feedback_data.get("encouragement", "Great job practicing! Every conversation helps you improve."),
                suggestions=feedback_data.get("suggestions", []),
                examples=feedback_data.get("examples", [])
            )
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)} for content: {cleaned_json}")
            # Fallback response
            return ConversationFeedbackResponse(
                encouragement="Great job practicing! Every conversation helps you improve.",
                suggestions=[
                    {
                        "category": "Practice",
                        "tip": "Keep practicing regular conversations to build confidence"
                    }
                ],
                examples=[]
            )
    except Exception as e:
        logger.error(f"Feedback generation error: {str(e)}")
        # Return encouraging fallback
        return ConversationFeedbackResponse(
            encouragement="Great job practicing! Every conversation helps you improve.",
            suggestions=[
                {
                    "category": "Practice",
                    "tip": "Keep practicing regular conversations to build confidence"
                }
            ],
            examples=[]
        )
@app.get("/api/health")
async def health_check() -> dict:
    """Liveness probe endpoint; always reports healthy when reachable."""
    status_payload = {"status": "healthy"}
    return status_payload
# Per-connection conversation services, keyed by the UUID session id created
# in the speech WebSocket handler.
# NOTE(review): entries are never removed on disconnect, so this grows
# unboundedly over the process lifetime — confirm whether cleanup is needed.
session_services: Dict[str, Any] = {}
@app.websocket("/ws/speech/{language}")
async def websocket_speech_endpoint(websocket: WebSocket, language: str):
    """Bidirectional speech pipeline for a single conversation session.

    Client sends JSON text frames with a "type" field:
      - "audio_start": begin buffering audio chunks.
      - "audio_chunk": base64-encoded audio; every 8th chunk is sent to
        Google STT for an interim transcription.
      - "audio_end": finalize transcription and request an AI reply.
      - "conversation_reset": clear the session's conversation history.
      - "text_message": typed input that bypasses STT entirely.
      - "initial_greeting": ask the AI character to open the conversation.

    Replies are JSON frames of type "transcription", "recording_timeout",
    "error", or whatever the conversation service returns for AI responses.
    Each connection gets its own service instance so conversation state is
    never shared between concurrent clients.
    """
    await websocket.accept()
    logger.info(f"WebSocket client connected for language: {language}")
    # Validate language
    if language not in language_services:
        await websocket.close(code=1008, reason="Unsupported language")
        return
    audio_buffer = bytearray()
    min_audio_length = 48000  # NOTE(review): never read — confirm before deleting
    is_recording = False
    chunk_count = 0
    latest_transcript = ""  # best interim transcript seen so far (confidence > 0.6)
    recording_start_time = None
    max_recording_duration = 60  # 60 seconds max (increased to give more time after suggestions)
    transcript_repeat_count = 0  # NOTE(review): never read — high_confidence_count is used instead
    last_transcript = ""
    high_confidence_count = 0
    import uuid
    session_id = str(uuid.uuid4())
    # Fresh per-connection instance of the language's conversation service.
    session_conversation_service = language_services[language].__class__()  # Create new instance
    # NOTE(review): session_services entries are never deleted on disconnect.
    session_services[session_id] = session_conversation_service
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            logger.info(f"Received message type: {message['type']}")
            if message["type"] == "audio_start":
                is_recording = True
                audio_buffer.clear()
                chunk_count = 0
                latest_transcript = ""
                recording_start_time = time.time()
                logger.info("Started recording session")
            elif message["type"] == "conversation_reset":
                session_conversation_service.ai_service.reset_conversation()
                logger.info("Conversation history reset")
            elif message["type"] == "audio_chunk":
                if is_recording:
                    # Check for recording timeout
                    if recording_start_time and time.time() - recording_start_time > max_recording_duration:
                        logger.warning("Recording timeout reached, auto-stopping")
                        # Send timeout notification to frontend
                        timeout_notification = {
                            "type": "recording_timeout",
                            "message": "Recording stopped due to timeout"
                        }
                        await websocket.send_text(json.dumps(timeout_notification))
                        # Force audio_end processing
                        message = {"type": "audio_end", "scenario_context": message.get("scenario_context", "")}
                        # NOTE(review): the original comment claimed this "falls
                        # through to audio_end processing", but the elif chain has
                        # already matched "audio_chunk" this iteration and `message`
                        # is overwritten by the next receive_text(), so the forced
                        # audio_end is never actually processed. Confirm and fix.
                    else:
                        audio_data = base64.b64decode(message["audio"])
                        logger.info(f"Received audio chunk: {len(audio_data)} bytes")
                        audio_buffer.extend(audio_data)
                        logger.info(f"Audio buffer size: {len(audio_buffer)} bytes")
                        # Process chunk for real-time transcription
                        chunk_count += 1
                        try:
                            # Only process every 8th chunk to reduce log spam and API calls
                            if chunk_count % 8 == 0 and len(audio_buffer) >= 19200:  # ~0.4 seconds of audio at 48kHz
                                recognition_audio = speech.RecognitionAudio(content=bytes(audio_buffer))
                                # Synchronous recognize over the entire buffer so far.
                                response = session_conversation_service.stt_service.client.recognize(
                                    config=session_conversation_service.stt_service.recognition_config,
                                    audio=recognition_audio
                                )
                                if response.results:
                                    transcript = response.results[0].alternatives[0].transcript
                                    confidence = response.results[0].alternatives[0].confidence
                                    # Store transcript if confidence is reasonable (lowered for speed)
                                    if confidence > 0.6:
                                        latest_transcript = transcript  # Store latest transcript
                                    # Check for repeated high-confidence transcripts
                                    if confidence > 0.9:
                                        if transcript == last_transcript:
                                            high_confidence_count += 1
                                            logger.info(f"Repeated high confidence transcript #{high_confidence_count}: '{transcript}' (confidence: {confidence})")
                                            # If we've seen the same high-confidence transcript 4+ times, auto-stop
                                            if high_confidence_count >= 4:
                                                logger.info("Auto-stopping recording due to repeated high-confidence transcript")
                                                is_recording = False
                                                # Send final processing message
                                                # NOTE(review): final_message is built but never used.
                                                final_message = {"type": "audio_end", "scenario_context": message.get("scenario_context", "")}
                                                # Process immediately without waiting for more chunks
                                                await websocket.send_text(json.dumps({
                                                    "type": "transcription",
                                                    "transcript": transcript,
                                                    "is_final": True,
                                                    "confidence": confidence
                                                }))
                                                # Process AI response
                                                logger.info("Getting AI response...")
                                                ai_response = await session_conversation_service.process_conversation_flow_fast(
                                                    transcript,
                                                    message.get("scenario_context", "")
                                                )
                                                logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                                                await websocket.send_text(json.dumps(ai_response))
                                                audio_buffer.clear()
                                                logger.info("Recording session ended due to repeated transcript")
                                                continue  # Continue to next message
                                        else:
                                            high_confidence_count = 1
                                            last_transcript = transcript
                                            logger.info(f"High confidence transcript ready: '{transcript}' (confidence: {confidence})")
                                    else:
                                        high_confidence_count = 0
                                        last_transcript = ""
                                    # Interim (non-final) transcription for live UI updates.
                                    transcription_result = {
                                        "type": "transcription",
                                        "transcript": transcript,
                                        "is_final": False,
                                        "confidence": confidence
                                    }
                                    await websocket.send_text(json.dumps(transcription_result))
                                    # Only log interim transcriptions occasionally to reduce spam
                                    if chunk_count % 16 == 0:
                                        logger.info(f"Interim transcription: '{transcript}' (confidence: {confidence})")
                                else:
                                    # No STT result yet — keep the UI in "Listening..." state.
                                    transcription_result = {
                                        "type": "transcription",
                                        "transcript": "Listening...",
                                        "is_final": False,
                                        "confidence": 0.0
                                    }
                                    await websocket.send_text(json.dumps(transcription_result))
                        except Exception as e:
                            # Only log transcription errors occasionally to reduce spam
                            if chunk_count % 16 == 0:
                                logger.error(f"Real-time transcription error: {str(e)}")
                            transcription_result = {
                                "type": "transcription",
                                "transcript": "Listening...",
                                "is_final": False,
                                "confidence": 0.0
                            }
                            await websocket.send_text(json.dumps(transcription_result))
                else:
                    # Reduce logging for non-recording chunks
                    if chunk_count % 32 == 0:
                        logger.info("Received audio chunk but not in recording mode")
            elif message["type"] == "audio_end":
                is_recording = False
                final_transcript = ""
                # Use latest interim transcript if available for faster response
                logger.info(f"Checking latest_transcript: '{latest_transcript}'")
                if latest_transcript.strip():
                    final_transcript = latest_transcript
                    logger.info(f"Using latest interim transcript: '{final_transcript}'")
                    # Send final transcription immediately
                    transcription_result = {
                        "type": "transcription",
                        "transcript": final_transcript,
                        "is_final": True,
                        "confidence": 0.8  # Reasonable confidence for interim result
                    }
                    await websocket.send_text(json.dumps(transcription_result))
                    # Process AI response with faster flow
                    logger.info("Getting AI response...")
                    ai_response = await session_conversation_service.process_conversation_flow_fast(
                        final_transcript,
                        message.get("scenario_context", "")
                    )
                    logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                    await websocket.send_text(json.dumps(ai_response))
                    # Clear buffer
                    audio_buffer.clear()
                    logger.info("Recording session ended, ready for next session")
                elif len(audio_buffer) > 0:
                    # Fallback to full transcription if no interim results
                    logger.info(f"Processing final audio buffer: {len(audio_buffer)} bytes")
                    try:
                        recognition_audio = speech.RecognitionAudio(content=bytes(audio_buffer))
                        response = session_conversation_service.stt_service.client.recognize(
                            config=session_conversation_service.stt_service.recognition_config,
                            audio=recognition_audio
                        )
                        if response.results:
                            transcript = response.results[0].alternatives[0].transcript
                            confidence = response.results[0].alternatives[0].confidence
                            logger.info(f"Final transcription: '{transcript}' (confidence: {confidence})")
                            transcription_result = {
                                "type": "transcription",
                                "transcript": transcript,
                                "is_final": True,
                                "confidence": confidence
                            }
                            await websocket.send_text(json.dumps(transcription_result))
                            logger.info("Getting AI response...")
                            # Slower, full conversation flow (vs. the _fast variant above).
                            ai_response = await session_conversation_service.process_conversation_flow(
                                transcript,
                                message.get("scenario_context", "")
                            )
                            logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                            await websocket.send_text(json.dumps(ai_response))
                        else:
                            logger.info("No transcription results from Google Speech")
                            # Send empty final transcription so UI knows recording ended
                            transcription_result = {
                                "type": "transcription",
                                "transcript": "",
                                "is_final": True,
                                "confidence": 0.0
                            }
                            await websocket.send_text(json.dumps(transcription_result))
                        audio_buffer.clear()
                        logger.info("Recording session ended, ready for next session")
                    except Exception as e:
                        logger.error(f"Final speech recognition error: {str(e)}")
                        # Send empty final transcription so UI knows recording ended
                        transcription_result = {
                            "type": "transcription",
                            "transcript": "",
                            "is_final": True,
                            "confidence": 0.0
                        }
                        await websocket.send_text(json.dumps(transcription_result))
                        error_result = {
                            "type": "error",
                            "message": f"Speech recognition error: {str(e)}"
                        }
                        await websocket.send_text(json.dumps(error_result))
                        audio_buffer.clear()
                else:
                    logger.info("No audio data to process")
                    # Send empty final transcription so UI knows recording ended
                    transcription_result = {
                        "type": "transcription",
                        "transcript": "",
                        "is_final": True,
                        "confidence": 0.0
                    }
                    await websocket.send_text(json.dumps(transcription_result))
            elif message["type"] == "text_message":
                logger.info(f"Processing text message: '{message['text']}'")
                ai_response = await session_conversation_service.process_conversation_flow(
                    message["text"],
                    message.get("scenario_context", "")
                )
                logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                await websocket.send_text(json.dumps(ai_response))
            elif message["type"] == "initial_greeting":
                logger.info("Processing initial greeting request")
                ai_response = await session_conversation_service.generate_initial_greeting(
                    message.get("scenario_context", "")
                )
                logger.info(f"Initial greeting: {ai_response.get('text', 'No text')}")
                await websocket.send_text(json.dumps(ai_response))
    except WebSocketDisconnect:
        logger.info("WebSocket client disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
        error_message = {
            "type": "error",
            "message": f"WebSocket error: {str(e)}"
        }
        # NOTE(review): this send can itself raise if the socket is already closed.
        await websocket.send_text(json.dumps(error_message))
@app.websocket("/ws/tts")
async def websocket_tts_endpoint(websocket: WebSocket):
    """WebSocket endpoint for text-to-speech streaming.

    Accepts JSON frames of type "synthesize" with a "text" field and replies
    with a base64-encoded MP3 payload ({"type": "audio", ...}). Per-message
    synthesis failures are reported as {"type": "error", ...} frames without
    closing the connection.

    Fixes: replaced the stray print() with the module logger (consistent with
    every other handler in this file) and added logging of per-message TTS
    failures, which were previously sent to the client but never logged.
    """
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            if message["type"] == "synthesize":
                try:
                    # Use the default TTS service for this endpoint
                    tts_service = language_services["indonesian"].tts_service
                    audio_content = await tts_service.synthesize_speech(message["text"])
                    audio_base64 = base64.b64encode(audio_content).decode('utf-8')
                    response = {
                        "type": "audio",
                        "audio": audio_base64,
                        "format": "mp3"
                    }
                    await websocket.send_text(json.dumps(response))
                except Exception as e:
                    logger.error(f"TTS error: {str(e)}")
                    error_response = {
                        "type": "error",
                        "message": f"TTS error: {str(e)}"
                    }
                    await websocket.send_text(json.dumps(error_response))
    except WebSocketDisconnect:
        logger.info("TTS client disconnected")
    except Exception as e:
        logger.error(f"TTS WebSocket error: {str(e)}")
        error_message = {
            "type": "error",
            "message": f"TTS WebSocket error: {str(e)}"
        }
        await websocket.send_text(json.dumps(error_message))
# Local development entry point; in production the app is typically served
# by an external ASGI runner instead.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host=config.HOST, port=config.PORT, log_level="debug" if config.DEBUG else "info")