"""FastAPI backend for a language-learning app (Indonesian / German).

Provides REST endpoints for response checking, scenario listing, AI-generated
suggestions, translation, and conversation feedback, plus WebSocket endpoints
for streaming speech recognition and text-to-speech.
"""

import base64
import difflib
import json
import logging
import re
import time
import uuid
from typing import Any, Dict, List

from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from google.cloud import speech
import openai

from languages.indonesian.services import IndonesianConversationFlowService
from languages.german.services import GermanConversationFlowService
from config import config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()
config.validate()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Temporarily allow all origins for debugging
    allow_credentials=False,  # Set to False when using allow_origins=["*"]
    allow_methods=["*"],
    allow_headers=["*"],
)

# Language-specific services; keys double as the supported language codes
# for the REST and WebSocket endpoints.
language_services = {
    "indonesian": IndonesianConversationFlowService(),
    "german": GermanConversationFlowService()
}


class ResponseCheck(BaseModel):
    # User's attempt at the expected phrase, plus scenario context.
    user_response: str
    expected_response: str
    scenario: str


class ResponseResult(BaseModel):
    is_correct: bool
    feedback: str
    similarity: float


class TranslationRequest(BaseModel):
    text: str
    source_language: str
    target_language: str


class TranslationResult(BaseModel):
    translation: str
    source_text: str


class SuggestionRequest(BaseModel):
    language: str
    scenario: str
    conversation_history: List[Dict[str, str]]


class SuggestionResponse(BaseModel):
    intro: str
    suggestions: List[Dict[str, str]]


class ConversationFeedbackRequest(BaseModel):
    language: str
    scenario: str
    conversation_history: List[Dict[str, str]]


class ConversationFeedbackResponse(BaseModel):
    encouragement: str
    suggestions: List[Dict[str, str]]
    examples: List[Dict[str, str]]


def normalize_text(text: str) -> str:
    """Lowercase, strip punctuation, and collapse whitespace for fuzzy matching."""
    text = text.lower().strip()
    text = re.sub(r"[^\w\s]", "", text)
    text = re.sub(r"\s+", " ", text)
    return text


def calculate_similarity(text1: str, text2: str) -> float:
    """Return a 0.0-1.0 similarity ratio between two normalized strings."""
    normalized1 = normalize_text(text1)
    normalized2 = normalize_text(text2)
    return difflib.SequenceMatcher(None, normalized1, normalized2).ratio()


def generate_feedback(
    user_response: str, expected_response: str, similarity: float, scenario: str
) -> str:
    """Map a similarity score to a tiered feedback message.

    NOTE(review): the top-tier message says "Excellent Indonesian!" even though
    this helper is also reachable for other languages — confirm intended.
    `user_response` and `scenario` are currently unused but kept for interface
    stability.
    """
    if similarity >= 0.9:
        return "Perfect! Excellent Indonesian!"
    elif similarity >= 0.7:
        return "Great job! That's correct!"
    elif similarity >= 0.5:
        return f"Good attempt! Try: '{expected_response}'"
    elif similarity >= 0.3:
        return f"Close, but try again. Expected: '{expected_response}'"
    else:
        return f"Not quite right. The correct answer is: '{expected_response}'"


@app.post("/api/check-response", response_model=ResponseResult)
async def check_response(request: ResponseCheck) -> ResponseResult:
    """Check user response against expected response."""
    try:
        similarity = calculate_similarity(request.user_response, request.expected_response)
        # 0.7 is the "correct" threshold, matching generate_feedback's tiers.
        is_correct = similarity >= 0.7
        feedback = generate_feedback(
            request.user_response,
            request.expected_response,
            similarity,
            request.scenario,
        )
        return ResponseResult(
            is_correct=is_correct,
            feedback=feedback,
            similarity=similarity,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/scenarios/{language}")
async def get_scenarios(language: str) -> dict:
    """Get scenarios for a specific language (indonesian or german)"""
    if language == "indonesian":
        from languages.indonesian.models import SCENARIO_PERSONALITIES
        native_key = "indonesian"
    elif language == "german":
        from languages.german.models import SCENARIO_PERSONALITIES
        native_key = "native"
    else:
        raise HTTPException(status_code=400, detail="Unsupported language")

    scenarios = {}
    for scenario_id, personalities in SCENARIO_PERSONALITIES.items():
        # First personality in the mapping is treated as the scenario default.
        default_personality = list(personalities.values())[0]
        scenarios[scenario_id] = {
            "id": scenario_id,
            "title": default_personality.scenario_title,
            "description": default_personality.scenario_description,
            "challenge": default_personality.scenario_challenge,
            "goal": default_personality.scenario_goal,
            "character": default_personality.name,
            "character_background": default_personality.background,
            "character_gender": default_personality.gender.value,
            "location": default_personality.location_context,
            "language": language,
            "goal_items": [
                {
                    "id": item.id,
                    "description": item.description,
                    "completed": False
                }
                for item in default_personality.goal_items
            ],
            "helpful_phrases": [
                {
                    # Older Indonesian phrase models expose `.indonesian`
                    # instead of the generic `.native` attribute.
                    native_key: phrase.native if hasattr(phrase, 'native') else phrase.indonesian,
                    "english": phrase.english
                }
                for phrase in default_personality.helpful_phrases
            ],
            "available_characters": [
                {
                    "id": char_id,
                    "name": char.name,
                    "background": char.background,
                    "tone": char.tone.value,
                    "gender": char.gender.value
                }
                for char_id, char in personalities.items()
            ]
        }
    return scenarios


@app.get("/api/scenarios")
async def get_all_scenarios() -> dict:
    """Get all available scenarios for all languages"""
    all_scenarios = {}
    # Get Indonesian scenarios
    indonesian_scenarios = await get_scenarios("indonesian")
    all_scenarios["indonesian"] = indonesian_scenarios
    # Get German scenarios
    german_scenarios = await get_scenarios("german")
    all_scenarios["german"] = german_scenarios
    return all_scenarios


@app.post("/api/suggestions", response_model=SuggestionResponse)
async def generate_suggestions(request: SuggestionRequest) -> SuggestionResponse:
    """Generate contextual language suggestions based on conversation history."""
    logger.info(f"Received suggestions request: language={request.language}, scenario={request.scenario}")
    try:
        client = openai.OpenAI(api_key=config.OPENAI_API_KEY)

        # Get recent conversation context (last 4 turns only, to keep the prompt short)
        conversation_context = ""
        for i, msg in enumerate(request.conversation_history[-4:]):
            conversation_context += f"{msg['type'].capitalize()}: {msg['text']}\n"

        # Determine target language and context
        if request.language == "german":
            target_language = "German"
            native_language = "English"
            scenario_prompt = f"in a {request.scenario} scenario in Germany"
        else:
            target_language = "Indonesian"
            native_language = "English"
            scenario_prompt = f"in a {request.scenario} scenario in Indonesia"

        suggestion_prompt = f"""You are a helpful language learning assistant. Based on the conversation history below, suggest 3 useful phrases the user might want to say next in {target_language}.

Conversation context {scenario_prompt}:
{conversation_context}

Provide suggestions as a JSON object with:
- "intro": A brief encouraging message about what they might want to say next
- "suggestions": Array of 3 objects, each with:
  - "{target_language.lower()}_text": The phrase in {target_language}
  - "english_meaning": The English translation/meaning

Make the suggestions contextual, natural, and progressively helpful for the conversation. Focus on practical phrases they might actually need.

Example format:
{{
  "intro": "Here are some phrases you might find useful:",
  "suggestions": [
    {{
      "{target_language.lower()}_text": "Example phrase",
      "english_meaning": "English translation"
    }}
  ]
}}"""

        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": f"You are a helpful {target_language} language learning assistant. Always respond with valid JSON."},
                {"role": "user", "content": suggestion_prompt}
            ],
            max_tokens=500,
            temperature=0.7
        )

        suggestion_json = response.choices[0].message.content.strip()
        logger.info(f"AI suggestion response: {suggestion_json}")

        # Parse JSON response (json is imported at module level)
        try:
            # Clean up the JSON response to handle potential formatting issues
            # (models often wrap JSON in markdown code fences).
            cleaned_json = suggestion_json.strip()
            if cleaned_json.startswith('```json'):
                cleaned_json = cleaned_json[7:-3].strip()
            elif cleaned_json.startswith('```'):
                cleaned_json = cleaned_json[3:-3].strip()

            suggestion_data = json.loads(cleaned_json)
            return SuggestionResponse(
                intro=suggestion_data.get("intro", "Here are some helpful phrases:"),
                suggestions=suggestion_data.get("suggestions", [])
            )
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)} for content: {cleaned_json}")
            # Fallback if JSON parsing fails
            text_key = f"{target_language.lower()}_text"
            fallback_suggestions = [
                {
                    text_key: "Excuse me, can you help me?",
                    "english_meaning": "A polite way to ask for assistance"
                },
                {
                    text_key: "Thank you very much",
                    "english_meaning": "Express gratitude"
                },
                {
                    text_key: "I don't understand",
                    "english_meaning": "When you need clarification"
                }
            ]
            return SuggestionResponse(
                intro="Here are some helpful phrases:",
                suggestions=fallback_suggestions
            )
    except Exception as e:
        logger.error(f"Suggestion generation error: {str(e)}")
        # Return fallback suggestions instead of raising an error
        text_key = "german_text" if request.language == "german" else "indonesian_text"
        return SuggestionResponse(
            intro="Here are some helpful phrases:",
            suggestions=[
                {
                    text_key: "Hello",
                    "english_meaning": "A basic greeting"
                },
                {
                    text_key: "Thank you",
                    "english_meaning": "Express gratitude"
                },
                {
                    text_key: "Please",
                    "english_meaning": "Polite request"
                }
            ]
        )


@app.post("/api/translate", response_model=TranslationResult)
async def translate_text(request: TranslationRequest) -> TranslationResult:
    """Translate text via the OpenAI chat API.

    NOTE(review): the prompt is hard-coded Indonesian->English even though the
    request carries `source_language`/`target_language` — confirm whether other
    directions should be supported.
    """
    try:
        client = openai.OpenAI(api_key=config.OPENAI_API_KEY)

        translation_prompt = f"""Translate the following Indonesian text to natural, conversational English. Keep the tone and style appropriate for casual conversation.

Indonesian text: "{request.text}"

Provide only the English translation, nothing else."""

        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": "You are a professional Indonesian to English translator. Provide natural, conversational translations."},
                {"role": "user", "content": translation_prompt}
            ],
            max_tokens=200,
            temperature=0.3
        )

        translation = response.choices[0].message.content.strip()
        return TranslationResult(
            translation=translation,
            source_text=request.text
        )
    except Exception as e:
        logger.error(f"Translation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Translation failed: {str(e)}")


@app.post("/api/conversation-feedback", response_model=ConversationFeedbackResponse)
async def generate_conversation_feedback(request: ConversationFeedbackRequest) -> ConversationFeedbackResponse:
    """Generate encouraging feedback and suggestions for completed conversation."""
    logger.info(f"Received feedback request: language={request.language}, scenario={request.scenario}")
    try:
        client = openai.OpenAI(api_key=config.OPENAI_API_KEY)

        # Build conversation history
        conversation_context = ""
        user_messages = []
        for msg in request.conversation_history:
            if msg.get('type') == 'user':
                user_messages.append(msg['text'])
            conversation_context += f"{msg.get('type', 'unknown').capitalize()}: {msg.get('text', '')}\n"

        # Determine target language and feedback context
        if request.language == "german":
            target_language = "German"
            language_specific_feedback = """
Focus on common German language learning areas:
- Article usage (der, die, das)
- Verb conjugation and word order
- Formal vs informal language (Sie vs du)
- Separable verbs
- Common German expressions and idioms
"""
        else:
            target_language = "Indonesian"
            language_specific_feedback = """
Focus on common Indonesian language learning areas:
- Formal vs informal language (using proper pronouns)
- Sentence structure and word order
- Common Indonesian expressions
- Politeness levels and cultural context
"""

        feedback_prompt = f"""You are an encouraging {target_language} language teacher. A student has just finished a conversation practice session in a {request.scenario} scenario.

Here's their conversation:
{conversation_context}

{language_specific_feedback}

Provide helpful, encouraging feedback as a JSON object with:
- "encouragement": A positive, motivating message about their effort (2-3 sentences)
- "suggestions": Array of 2-3 objects with:
  - "category": Area of improvement (e.g., "Pronunciation", "Grammar", "Vocabulary")
  - "tip": Specific, actionable advice
- "examples": Array of 1-2 objects with:
  - "original": Something they actually said (from the conversation)
  - "improved": A better way to say it
  - "reason": Brief explanation of why it's better

Make it encouraging and supportive, focusing on growth rather than criticism. If they did well, focus on areas to sound more natural or confident.

Example format:
{{
  "encouragement": "You did a great job engaging in this conversation! Your effort to communicate is really paying off.",
  "suggestions": [
    {{
      "category": "Vocabulary",
      "tip": "Try using more common everyday words to sound more natural"
    }}
  ],
  "examples": [
    {{
      "original": "I want to purchase this item",
      "improved": "I'd like to buy this",
      "reason": "Sounds more natural and conversational"
    }}
  ]
}}"""

        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": f"You are an encouraging {target_language} language teacher. Always respond with valid JSON and be supportive."},
                {"role": "user", "content": feedback_prompt}
            ],
            max_tokens=600,
            temperature=0.7
        )

        feedback_json = response.choices[0].message.content.strip()
        logger.info(f"AI feedback response: {feedback_json}")

        # Parse JSON response
        try:
            # Clean up the JSON response (strip markdown code fences if present)
            cleaned_json = feedback_json.strip()
            if cleaned_json.startswith('```json'):
                cleaned_json = cleaned_json[7:-3].strip()
            elif cleaned_json.startswith('```'):
                cleaned_json = cleaned_json[3:-3].strip()

            feedback_data = json.loads(cleaned_json)
            return ConversationFeedbackResponse(
                encouragement=feedback_data.get("encouragement", "Great job practicing! Every conversation helps you improve."),
                suggestions=feedback_data.get("suggestions", []),
                examples=feedback_data.get("examples", [])
            )
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)} for content: {cleaned_json}")
            # Fallback response
            return ConversationFeedbackResponse(
                encouragement="Great job practicing! Every conversation helps you improve.",
                suggestions=[
                    {
                        "category": "Practice",
                        "tip": "Keep practicing regular conversations to build confidence"
                    }
                ],
                examples=[]
            )
    except Exception as e:
        logger.error(f"Feedback generation error: {str(e)}")
        # Return encouraging fallback
        return ConversationFeedbackResponse(
            encouragement="Great job practicing! Every conversation helps you improve.",
            suggestions=[
                {
                    "category": "Practice",
                    "tip": "Keep practicing regular conversations to build confidence"
                }
            ],
            examples=[]
        )


@app.get("/api/health")
async def health_check() -> dict:
    """Liveness probe."""
    return {"status": "healthy"}


# Per-connection conversation services, keyed by a generated session id.
session_services: Dict[str, Any] = {}


@app.websocket("/ws/speech/{language}")
async def websocket_speech_endpoint(websocket: WebSocket, language: str):
    """Streaming speech endpoint: receives base64 audio chunks, emits interim
    and final transcriptions plus AI conversation responses.

    Message types handled: audio_start, conversation_reset, audio_chunk,
    audio_end, text_message, initial_greeting.
    """
    await websocket.accept()
    logger.info(f"WebSocket client connected for language: {language}")

    # Validate language
    if language not in language_services:
        await websocket.close(code=1008, reason="Unsupported language")
        return

    audio_buffer = bytearray()
    is_recording = False
    chunk_count = 0
    latest_transcript = ""
    recording_start_time = None
    max_recording_duration = 60  # 60 seconds max (increased to give more time after suggestions)
    last_transcript = ""
    high_confidence_count = 0

    session_id = str(uuid.uuid4())
    # Create a fresh service instance so each connection has isolated state.
    session_conversation_service = language_services[language].__class__()
    session_services[session_id] = session_conversation_service

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            logger.info(f"Received message type: {message['type']}")

            if message["type"] == "audio_start":
                is_recording = True
                audio_buffer.clear()
                chunk_count = 0
                latest_transcript = ""
                recording_start_time = time.time()
                logger.info("Started recording session")

            elif message["type"] == "conversation_reset":
                session_conversation_service.ai_service.reset_conversation()
                logger.info("Conversation history reset")

            elif message["type"] == "audio_chunk":
                if is_recording:
                    # Check for recording timeout
                    if recording_start_time and time.time() - recording_start_time > max_recording_duration:
                        logger.warning("Recording timeout reached, auto-stopping")
                        # Send timeout notification to frontend
                        timeout_notification = {
                            "type": "recording_timeout",
                            "message": "Recording stopped due to timeout"
                        }
                        await websocket.send_text(json.dumps(timeout_notification))
                        # Rewrite the message as audio_end; the standalone
                        # `if` below (not part of this elif chain) picks it up
                        # so the buffered audio is finalized this iteration.
                        message = {"type": "audio_end", "scenario_context": message.get("scenario_context", "")}
                    else:
                        audio_data = base64.b64decode(message["audio"])
                        logger.info(f"Received audio chunk: {len(audio_data)} bytes")
                        audio_buffer.extend(audio_data)
                        logger.info(f"Audio buffer size: {len(audio_buffer)} bytes")

                        # Process chunk for real-time transcription
                        chunk_count += 1
                        try:
                            # Only process every 8th chunk to reduce log spam and API calls
                            if chunk_count % 8 == 0 and len(audio_buffer) >= 19200:  # ~0.4 seconds of audio at 48kHz
                                recognition_audio = speech.RecognitionAudio(content=bytes(audio_buffer))
                                response = session_conversation_service.stt_service.client.recognize(
                                    config=session_conversation_service.stt_service.recognition_config,
                                    audio=recognition_audio
                                )
                                if response.results:
                                    transcript = response.results[0].alternatives[0].transcript
                                    confidence = response.results[0].alternatives[0].confidence
                                    # Store transcript if confidence is reasonable (lowered for speed)
                                    if confidence > 0.6:
                                        latest_transcript = transcript  # Store latest transcript
                                    # Check for repeated high-confidence transcripts
                                    if confidence > 0.9:
                                        if transcript == last_transcript:
                                            high_confidence_count += 1
                                            logger.info(f"Repeated high confidence transcript #{high_confidence_count}: '{transcript}' (confidence: {confidence})")
                                            # If we've seen the same high-confidence transcript 4+ times, auto-stop
                                            if high_confidence_count >= 4:
                                                logger.info("Auto-stopping recording due to repeated high-confidence transcript")
                                                is_recording = False
                                                # Process immediately without waiting for more chunks
                                                await websocket.send_text(json.dumps({
                                                    "type": "transcription",
                                                    "transcript": transcript,
                                                    "is_final": True,
                                                    "confidence": confidence
                                                }))
                                                # Process AI response
                                                logger.info("Getting AI response...")
                                                ai_response = await session_conversation_service.process_conversation_flow_fast(
                                                    transcript, message.get("scenario_context", "")
                                                )
                                                logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                                                await websocket.send_text(json.dumps(ai_response))
                                                audio_buffer.clear()
                                                logger.info("Recording session ended due to repeated transcript")
                                                continue  # Continue to next message
                                        else:
                                            high_confidence_count = 1
                                            last_transcript = transcript
                                            logger.info(f"High confidence transcript ready: '{transcript}' (confidence: {confidence})")
                                    else:
                                        high_confidence_count = 0
                                        last_transcript = ""
                                    transcription_result = {
                                        "type": "transcription",
                                        "transcript": transcript,
                                        "is_final": False,
                                        "confidence": confidence
                                    }
                                    await websocket.send_text(json.dumps(transcription_result))
                                    # Only log interim transcriptions occasionally to reduce spam
                                    if chunk_count % 16 == 0:
                                        logger.info(f"Interim transcription: '{transcript}' (confidence: {confidence})")
                                else:
                                    transcription_result = {
                                        "type": "transcription",
                                        "transcript": "Listening...",
                                        "is_final": False,
                                        "confidence": 0.0
                                    }
                                    await websocket.send_text(json.dumps(transcription_result))
                        except Exception as e:
                            # Only log transcription errors occasionally to reduce spam
                            if chunk_count % 16 == 0:
                                logger.error(f"Real-time transcription error: {str(e)}")
                            transcription_result = {
                                "type": "transcription",
                                "transcript": "Listening...",
                                "is_final": False,
                                "confidence": 0.0
                            }
                            await websocket.send_text(json.dumps(transcription_result))
                else:
                    # Reduce logging for non-recording chunks
                    if chunk_count % 32 == 0:
                        logger.info("Received audio chunk but not in recording mode")

            # Deliberately a standalone `if` (not elif) so that the timeout
            # path above, which rewrites `message` to audio_end, actually
            # reaches this processing in the same loop iteration.
            if message["type"] == "audio_end":
                is_recording = False
                final_transcript = ""
                # Use latest interim transcript if available for faster response
                logger.info(f"Checking latest_transcript: '{latest_transcript}'")
                if latest_transcript.strip():
                    final_transcript = latest_transcript
                    logger.info(f"Using latest interim transcript: '{final_transcript}'")
                    # Send final transcription immediately
                    transcription_result = {
                        "type": "transcription",
                        "transcript": final_transcript,
                        "is_final": True,
                        "confidence": 0.8  # Reasonable confidence for interim result
                    }
                    await websocket.send_text(json.dumps(transcription_result))
                    # Process AI response with faster flow
                    logger.info("Getting AI response...")
                    ai_response = await session_conversation_service.process_conversation_flow_fast(
                        final_transcript, message.get("scenario_context", "")
                    )
                    logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                    await websocket.send_text(json.dumps(ai_response))
                    # Clear buffer
                    audio_buffer.clear()
                    logger.info("Recording session ended, ready for next session")
                elif len(audio_buffer) > 0:
                    # Fallback to full transcription if no interim results
                    logger.info(f"Processing final audio buffer: {len(audio_buffer)} bytes")
                    try:
                        recognition_audio = speech.RecognitionAudio(content=bytes(audio_buffer))
                        response = session_conversation_service.stt_service.client.recognize(
                            config=session_conversation_service.stt_service.recognition_config,
                            audio=recognition_audio
                        )
                        if response.results:
                            transcript = response.results[0].alternatives[0].transcript
                            confidence = response.results[0].alternatives[0].confidence
                            logger.info(f"Final transcription: '{transcript}' (confidence: {confidence})")
                            transcription_result = {
                                "type": "transcription",
                                "transcript": transcript,
                                "is_final": True,
                                "confidence": confidence
                            }
                            await websocket.send_text(json.dumps(transcription_result))
                            logger.info("Getting AI response...")
                            ai_response = await session_conversation_service.process_conversation_flow(
                                transcript, message.get("scenario_context", "")
                            )
                            logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                            await websocket.send_text(json.dumps(ai_response))
                        else:
                            logger.info("No transcription results from Google Speech")
                            # Send empty final transcription so UI knows recording ended
                            transcription_result = {
                                "type": "transcription",
                                "transcript": "",
                                "is_final": True,
                                "confidence": 0.0
                            }
                            await websocket.send_text(json.dumps(transcription_result))
                        audio_buffer.clear()
                        logger.info("Recording session ended, ready for next session")
                    except Exception as e:
                        logger.error(f"Final speech recognition error: {str(e)}")
                        # Send empty final transcription so UI knows recording ended
                        transcription_result = {
                            "type": "transcription",
                            "transcript": "",
                            "is_final": True,
                            "confidence": 0.0
                        }
                        await websocket.send_text(json.dumps(transcription_result))
                        error_result = {
                            "type": "error",
                            "message": f"Speech recognition error: {str(e)}"
                        }
                        await websocket.send_text(json.dumps(error_result))
                        audio_buffer.clear()
                else:
                    logger.info("No audio data to process")
                    # Send empty final transcription so UI knows recording ended
                    transcription_result = {
                        "type": "transcription",
                        "transcript": "",
                        "is_final": True,
                        "confidence": 0.0
                    }
                    await websocket.send_text(json.dumps(transcription_result))

            elif message["type"] == "text_message":
                logger.info(f"Processing text message: '{message['text']}'")
                ai_response = await session_conversation_service.process_conversation_flow(
                    message["text"], message.get("scenario_context", "")
                )
                logger.info(f"AI response: {ai_response.get('text', 'No text')}")
                await websocket.send_text(json.dumps(ai_response))

            elif message["type"] == "initial_greeting":
                logger.info("Processing initial greeting request")
                ai_response = await session_conversation_service.generate_initial_greeting(
                    message.get("scenario_context", "")
                )
                logger.info(f"Initial greeting: {ai_response.get('text', 'No text')}")
                await websocket.send_text(json.dumps(ai_response))

    except WebSocketDisconnect:
        logger.info("WebSocket client disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
        error_message = {
            "type": "error",
            "message": f"WebSocket error: {str(e)}"
        }
        try:
            await websocket.send_text(json.dumps(error_message))
        except Exception:
            # Socket may already be closed; nothing more we can do.
            logger.info("Could not deliver error message; connection closed")
    finally:
        # Drop the per-connection service so session_services doesn't grow
        # without bound across reconnects.
        session_services.pop(session_id, None)


@app.websocket("/ws/tts")
async def websocket_tts_endpoint(websocket: WebSocket):
    """WebSocket endpoint for text-to-speech streaming."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            if message["type"] == "synthesize":
                try:
                    # Use the default TTS service for this endpoint
                    tts_service = language_services["indonesian"].tts_service
                    audio_content = await tts_service.synthesize_speech(message["text"])
                    audio_base64 = base64.b64encode(audio_content).decode('utf-8')
                    response = {
                        "type": "audio",
                        "audio": audio_base64,
                        "format": "mp3"
                    }
                    await websocket.send_text(json.dumps(response))
                except Exception as e:
                    error_response = {
                        "type": "error",
                        "message": f"TTS error: {str(e)}"
                    }
                    await websocket.send_text(json.dumps(error_response))
    except WebSocketDisconnect:
        logger.info("TTS client disconnected")
    except Exception as e:
        error_message = {
            "type": "error",
            "message": f"TTS WebSocket error: {str(e)}"
        }
        try:
            await websocket.send_text(json.dumps(error_message))
        except Exception:
            logger.info("Could not deliver TTS error message; connection closed")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host=config.HOST, port=config.PORT, log_level="debug" if config.DEBUG else "info")