# street-lingo/backend/main.py

import difflib
import re
import json
import base64
import logging
import time
from typing import Dict, Any, List
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from google.cloud import speech
import openai
from languages.indonesian.services import IndonesianConversationFlowService
from languages.german.services import GermanConversationFlowService
from config import config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
config.validate()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Temporarily allow all origins for debugging
    allow_credentials=False,  # Set to False when using allow_origins=["*"]
    allow_methods=["*"],
    allow_headers=["*"],
)

# Language-specific services
language_services = {
    "indonesian": IndonesianConversationFlowService(),
    "german": GermanConversationFlowService()
}


class ResponseCheck(BaseModel):
    user_response: str
    expected_response: str
    scenario: str


class ResponseResult(BaseModel):
    is_correct: bool
    feedback: str
    similarity: float


class TranslationRequest(BaseModel):
    text: str
    source_language: str
    target_language: str


class TranslationResult(BaseModel):
    translation: str
    source_text: str


class SuggestionRequest(BaseModel):
    language: str
    scenario: str
    conversation_history: List[Dict[str, str]]


class SuggestionResponse(BaseModel):
    intro: str
    suggestions: List[Dict[str, str]]


class ConversationFeedbackRequest(BaseModel):
    language: str
    scenario: str
    conversation_history: List[Dict[str, str]]


class ConversationFeedbackResponse(BaseModel):
    encouragement: str
    suggestions: List[Dict[str, str]]
    examples: List[Dict[str, str]]


def normalize_text(text: str) -> str:
    text = text.lower().strip()
    text = re.sub(r"[^\w\s]", "", text)
    text = re.sub(r"\s+", " ", text)
    return text


def calculate_similarity(text1: str, text2: str) -> float:
    normalized1 = normalize_text(text1)
    normalized2 = normalize_text(text2)
    return difflib.SequenceMatcher(None, normalized1, normalized2).ratio()
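

# Feedback tiers are keyed off difflib's SequenceMatcher ratio (0.0-1.0); 0.7 is
# also the cut-off check_response uses to mark an answer as correct. The wording
# below is Indonesian-specific even though the endpoint also serves German.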
def generate_feedback(
    user_response: str, expected_response: str, similarity: float, scenario: str
) -> str:
    if similarity >= 0.9:
        return "Perfect! Excellent Indonesian!"
    elif similarity >= 0.7:
        return "Great job! That's correct!"
    elif similarity >= 0.5:
        return f"Good attempt! Try: '{expected_response}'"
    elif similarity >= 0.3:
        return f"Close, but try again. Expected: '{expected_response}'"
    else:
        return f"Not quite right. The correct answer is: '{expected_response}'"
@app.post("/api/check-response", response_model=ResponseResult)
async def check_response(request: ResponseCheck) -> ResponseResult:
"""Check user response against expected response."""
try:
similarity = calculate_similarity(request.user_response, request.expected_response)
is_correct = similarity >= 0.7
feedback = generate_feedback(
request.user_response,
request.expected_response,
similarity,
request.scenario,
)
return ResponseResult(
is_correct=is_correct,
feedback=feedback,
similarity=similarity,
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/scenarios/{language}")
async def get_scenarios(language: str) -> dict:
"""Get scenarios for a specific language (indonesian or german)"""
if language == "indonesian":
from languages.indonesian.models import SCENARIO_PERSONALITIES
native_key = "indonesian"
elif language == "german":
from languages.german.models import SCENARIO_PERSONALITIES
native_key = "native"
else:
raise HTTPException(status_code=400, detail="Unsupported language")
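    # native_key is the JSON field name the frontend reads the target-language
    # phrase from: Indonesian scenarios expose "indonesian", German ones "native".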
    scenarios = {}
    for scenario_id, personalities in SCENARIO_PERSONALITIES.items():
        default_personality = list(personalities.values())[0]
        scenarios[scenario_id] = {
            "id": scenario_id,
            "title": default_personality.scenario_title,
            "description": default_personality.scenario_description,
            "challenge": default_personality.scenario_challenge,
            "goal": default_personality.scenario_goal,
            "character": default_personality.name,
            "character_background": default_personality.background,
            "character_gender": default_personality.gender.value,
            "location": default_personality.location_context,
            "language": language,
            "goal_items": [
                {
                    "id": item.id,
                    "description": item.description,
                    "completed": False
                } for item in default_personality.goal_items
            ],
            "helpful_phrases": [
                {
                    native_key: phrase.native if hasattr(phrase, 'native') else phrase.indonesian,
                    "english": phrase.english
                } for phrase in default_personality.helpful_phrases
            ],
            "available_characters": [
                {
                    "id": char_id,
                    "name": char.name,
                    "background": char.background,
                    "tone": char.tone.value,
                    "gender": char.gender.value
                } for char_id, char in personalities.items()
            ]
        }
    return scenarios
@app.get("/api/scenarios")
async def get_all_scenarios() -> dict:
"""Get all available scenarios for all languages"""
all_scenarios = {}
# Get Indonesian scenarios
indonesian_scenarios = await get_scenarios("indonesian")
all_scenarios["indonesian"] = indonesian_scenarios
# Get German scenarios
german_scenarios = await get_scenarios("german")
all_scenarios["german"] = german_scenarios
return all_scenarios
@app.post("/api/suggestions", response_model=SuggestionResponse)
async def generate_suggestions(request: SuggestionRequest) -> SuggestionResponse:
"""Generate contextual language suggestions based on conversation history."""
logger.info(f"Received suggestions request: language={request.language}, scenario={request.scenario}")
try:
client = openai.OpenAI(api_key=config.OPENAI_API_KEY)
# Get recent conversation context
conversation_context = ""
for i, msg in enumerate(request.conversation_history[-4:]):
conversation_context += f"{msg['type'].capitalize()}: {msg['text']}\n"
# Determine target language and context
if request.language == "german":
target_language = "German"
native_language = "English"
scenario_prompt = f"in a {request.scenario} scenario in Germany"
else:
target_language = "Indonesian"
native_language = "English"
scenario_prompt = f"in a {request.scenario} scenario in Indonesia"
suggestion_prompt = f"""You are a helpful language learning assistant. Based on the conversation history below, suggest 3 useful phrases the user might want to say next in {target_language}.
Conversation context {scenario_prompt}:
{conversation_context}
Provide suggestions as a JSON object with:
- "intro": A brief encouraging message about what they might want to say next
- "suggestions": Array of 3 objects, each with:
- "{target_language.lower()}_text": The phrase in {target_language}
- "english_meaning": The English translation/meaning
Make the suggestions contextual, natural, and progressively helpful for the conversation. Focus on practical phrases they might actually need.
Example format:
{{
"intro": "Here are some phrases you might find useful:",
"suggestions": [
{{
"{target_language.lower()}_text": "Example phrase",
"english_meaning": "English translation"
}}
]
}}"""
        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": f"You are a helpful {target_language} language learning assistant. Always respond with valid JSON."},
                {"role": "user", "content": suggestion_prompt}
            ],
            max_tokens=500,
            temperature=0.7
        )
        suggestion_json = response.choices[0].message.content.strip()
        logger.info(f"AI suggestion response: {suggestion_json}")
        # Parse JSON response (json is already imported at module level)
        try:
            # Clean up the JSON response to handle potential formatting issues
            cleaned_json = suggestion_json.strip()
            if cleaned_json.startswith('```json'):
                cleaned_json = cleaned_json[7:-3].strip()
            elif cleaned_json.startswith('```'):
                cleaned_json = cleaned_json[3:-3].strip()
            suggestion_data = json.loads(cleaned_json)
            return SuggestionResponse(
                intro=suggestion_data.get("intro", "Here are some helpful phrases:"),
                suggestions=suggestion_data.get("suggestions", [])
            )
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)} for content: {cleaned_json}")
            # Fallback if JSON parsing fails
            text_key = f"{target_language.lower()}_text"
            fallback_suggestions = [
                {
                    text_key: "Excuse me, can you help me?",
                    "english_meaning": "A polite way to ask for assistance"
                },
                {
                    text_key: "Thank you very much",
                    "english_meaning": "Express gratitude"
                },
                {
                    text_key: "I don't understand",
                    "english_meaning": "When you need clarification"
                }
            ]
            return SuggestionResponse(
                intro="Here are some helpful phrases:",
                suggestions=fallback_suggestions
            )
    except Exception as e:
        logger.error(f"Suggestion generation error: {str(e)}")
        # Return fallback suggestions instead of raising an error
        text_key = "german_text" if request.language == "german" else "indonesian_text"
        return SuggestionResponse(
            intro="Here are some helpful phrases:",
            suggestions=[
                {text_key: "Hello", "english_meaning": "A basic greeting"},
                {text_key: "Thank you", "english_meaning": "Express gratitude"},
                {text_key: "Please", "english_meaning": "Polite request"}
            ]
        )
@app.post("/api/translate", response_model=TranslationResult)
async def translate_text(request: TranslationRequest) -> TranslationResult:
try:
client = openai.OpenAI(api_key=config.OPENAI_API_KEY)
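        # NOTE: this endpoint currently always translates Indonesian -> English;
        # request.source_language and request.target_language are accepted but unused.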
translation_prompt = f"""Translate the following Indonesian text to natural, conversational English.
Keep the tone and style appropriate for casual conversation.
Indonesian text: "{request.text}"
Provide only the English translation, nothing else."""
        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": "You are a professional Indonesian to English translator. Provide natural, conversational translations."},
                {"role": "user", "content": translation_prompt}
            ],
            max_tokens=200,
            temperature=0.3
        )
        translation = response.choices[0].message.content.strip()
        return TranslationResult(
            translation=translation,
            source_text=request.text
        )
    except Exception as e:
        logger.error(f"Translation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Translation failed: {str(e)}") from e
@app.post("/api/conversation-feedback", response_model=ConversationFeedbackResponse)
async def generate_conversation_feedback(request: ConversationFeedbackRequest) -> ConversationFeedbackResponse:
"""Generate encouraging feedback and suggestions for completed conversation."""
logger.info(f"Received feedback request: language={request.language}, scenario={request.scenario}")
try:
client = openai.OpenAI(api_key=config.OPENAI_API_KEY)
# Build conversation history
conversation_context = ""
user_messages = []
for msg in request.conversation_history:
if msg.get('type') == 'user':
user_messages.append(msg['text'])
conversation_context += f"{msg.get('type', 'unknown').capitalize()}: {msg.get('text', '')}\n"
# Determine target language and feedback context
if request.language == "german":
target_language = "German"
language_specific_feedback = """
Focus on common German language learning areas:
- Article usage (der, die, das)
- Verb conjugation and word order
- Formal vs informal language (Sie vs du)
- Separable verbs
- Common German expressions and idioms
"""
        else:
            target_language = "Indonesian"
            language_specific_feedback = """
Focus on common Indonesian language learning areas:
- Using everyday, natural Indonesian words and expressions
- Sounding more natural and conversational (not textbook formal)
- Common Indonesian idioms and colloquial expressions
- Sentence structure and word order
- Building confidence in casual conversation
"""
feedback_prompt = f"""You are an encouraging {target_language} language teacher. A student has just finished a conversation practice session in a {request.scenario} scenario.
Here's their conversation:
{conversation_context}
{language_specific_feedback}
MANDATORY ANALYSIS: Before providing feedback, carefully examine each thing the student said for language issues. Look for:
1. Unnatural phrasing or word choices
2. Grammar mistakes or awkward constructions
3. Word order problems
4. Missing words that would make meaning clearer
5. Overly formal or informal expressions for the context
If you find ANY of these issues in their actual speech, you MUST provide specific suggestions and examples. Do not give empty suggestions/examples arrays unless their language was genuinely perfect.
Provide helpful, encouraging feedback as a JSON object with:
- "encouragement": A positive, motivating message about their effort (2-3 sentences)
- "suggestions": Array of 0-3 objects with:
- "category": Area of improvement (e.g., "Pronunciation", "Grammar", "Vocabulary")
- "tip": Specific, actionable advice based ONLY on what they actually said in the conversation
- "examples": Array of 0-2 objects with:
- "original": Something they actually said (from the conversation)
- "improved": A better way to say it
- "reason": Brief explanation of why it's better
CRITICAL REQUIREMENT: You MUST analyze the student's actual words and phrases for improvement opportunities. Common Indonesian learner issues to look for:
1. **Word Order**: "ayam indomi" should be "indomie ayam" (flavor comes after product)
2. **Phrasing**: "beli ayam indomi satu sama mau minum" is awkward - should be "mau beli indomie ayam sama minum"
3. **Missing Words**: "mau stroberi ultra milk" missing "yang" (mau yang stroberi)
4. **Unclear Intent**: "saya beli minum" should be "saya mau beli minum" (clearer intention)
Do NOT give empty suggestions/examples unless the conversation was genuinely flawless. If there are language issues (which there usually are), provide specific, helpful corrections.
Make it encouraging and supportive, focusing on growth rather than criticism. If they did well, focus on areas to sound more natural or confident.
For Indonesian specifically:
- Focus on everyday conversational language rather than formal politeness
- Emphasize natural, casual expressions that locals actually use
- Include specific word examples in your tips
- Avoid focusing on formal grammar rules - prioritize natural communication
Example format for good conversation (no meaningful improvements needed):
{{
"encouragement": "Fantastic job in your conversation practice! You really engaged well and made your choices clear. Keep up the great work, and your confidence will only grow!",
"suggestions": [],
"examples": []
}}
Example format for conversation with meaningful improvements:
{{
"encouragement": "You did a great job engaging in this conversation! Your effort to communicate is really paying off.",
"suggestions": [
{{
"category": "Word Order",
"tip": "In Indonesian, the product name comes first, then the flavor - so 'indomie ayam' instead of 'ayam indomi'"
}},
{{
"category": "Phrasing",
"tip": "When expressing what you want to buy, use 'mau beli' to be clearer about your intention"
}}
],
"examples": [
{{
"original": "beli ayam indomi satu sama mau minum",
"improved": "mau beli indomie ayam satu sama minum",
"reason": "Better word order and clearer intention - 'mau beli' shows you want to buy"
}},
{{
"original": "mau stroberi ultra milk",
"improved": "mau ultra milk yang stroberi",
"reason": "Adding 'yang' makes it clearer which flavor you want"
}}
]
}}
IMPORTANT: Analyze the student's actual phrases from the conversation above and provide specific corrections for any unnatural or incorrect expressions. Don't give empty arrays unless their Indonesian was perfect."""
        response = client.chat.completions.create(
            model=config.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": f"You are an encouraging {target_language} language teacher. Always respond with valid JSON and be supportive."},
                {"role": "user", "content": feedback_prompt}
            ],
            max_tokens=600,
            temperature=0.7
        )
        feedback_json = response.choices[0].message.content.strip()
        logger.info(f"AI feedback response: {feedback_json}")
        # Parse JSON response
        try:
            # Clean up the JSON response
            cleaned_json = feedback_json.strip()
            if cleaned_json.startswith('```json'):
                cleaned_json = cleaned_json[7:-3].strip()
            elif cleaned_json.startswith('```'):
                cleaned_json = cleaned_json[3:-3].strip()
            feedback_data = json.loads(cleaned_json)
            return ConversationFeedbackResponse(
                encouragement=feedback_data.get("encouragement", "Great job practicing! Every conversation helps you improve."),
                suggestions=feedback_data.get("suggestions", []),
                examples=feedback_data.get("examples", [])
            )
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)} for content: {cleaned_json}")
            # Fallback response
            return ConversationFeedbackResponse(
                encouragement="Great job practicing! Every conversation helps you improve.",
                suggestions=[
                    {
                        "category": "Practice",
                        "tip": "Keep practicing regular conversations to build confidence"
                    }
                ],
                examples=[]
            )
    except Exception as e:
        logger.error(f"Feedback generation error: {str(e)}")
        # Return encouraging fallback
        return ConversationFeedbackResponse(
            encouragement="Great job practicing! Every conversation helps you improve.",
            suggestions=[
                {
                    "category": "Practice",
                    "tip": "Keep practicing regular conversations to build confidence"
                }
            ],
            examples=[]
        )
@app.get("/api/health")
async def health_check() -> dict:
return {"status": "healthy"}
session_services: Dict[str, Any] = {}
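

# Speech WebSocket message protocol, as handled below. Incoming "type" values:
# "audio_start", "audio_chunk", "audio_end", "conversation_reset", "text_message",
# "initial_greeting". Outgoing: "transcription", "recording_timeout", "error",
# plus the AI response payload returned by the conversation service.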
@app.websocket("/ws/speech/{language}")
async def websocket_speech_endpoint(websocket: WebSocket, language: str):
await websocket.accept()
logger.info(f"WebSocket client connected for language: {language}")
# Validate language
if language not in language_services:
await websocket.close(code=1008, reason="Unsupported language")
return
audio_buffer = bytearray()
min_audio_length = 48000
is_recording = False
chunk_count = 0
latest_transcript = ""
recording_start_time = None
max_recording_duration = 60 # 60 seconds max (increased to give more time after suggestions)
transcript_repeat_count = 0
last_transcript = ""
high_confidence_count = 0
session_processed = False # Flag to prevent duplicate processing
import uuid
session_id = str(uuid.uuid4())
session_conversation_service = language_services[language].__class__() # Create new instance
session_services[session_id] = session_conversation_service
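    # Main receive loop: each JSON message is dispatched on its "type" field.
    # A recording ends on an explicit "audio_end", after the 60 s timeout, or once
    # the same high-confidence interim transcript has been seen three times in a row.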
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            logger.info(f"Received message type: {message['type']}")
            if message["type"] == "audio_start":
                is_recording = True
                audio_buffer.clear()
                chunk_count = 0
                latest_transcript = ""
                recording_start_time = time.time()
                session_processed = False  # Reset processing flag for new session
                logger.info("Started recording session")
            elif message["type"] == "conversation_reset":
                session_conversation_service.ai_service.reset_conversation()
                logger.info("Conversation history reset")
elif message["type"] == "audio_chunk":
if is_recording:
# Check for recording timeout
if recording_start_time and time.time() - recording_start_time > max_recording_duration and not session_processed:
logger.warning("Recording timeout reached, auto-stopping")
is_recording = False
session_processed = True # Mark as processed to prevent duplicates
# Send timeout notification to frontend
timeout_notification = {
"type": "recording_timeout",
"message": "Recording stopped due to timeout"
}
await websocket.send_text(json.dumps(timeout_notification))
# Process final transcript if available
if latest_transcript.strip():
transcription_result = {
"type": "transcription",
"transcript": latest_transcript,
"is_final": True,
"confidence": 0.8
}
await websocket.send_text(json.dumps(transcription_result))
# Process AI response
logger.info("Getting AI response after timeout...")
ai_response = await session_conversation_service.process_conversation_flow_fast(
latest_transcript,
message.get("scenario_context", "")
)
logger.info(f"AI response: {ai_response.get('text', 'No text')}")
await websocket.send_text(json.dumps(ai_response))
audio_buffer.clear()
continue # Skip further processing for this message
else:
audio_data = base64.b64decode(message["audio"])
logger.info(f"Received audio chunk: {len(audio_data)} bytes")
audio_buffer.extend(audio_data)
logger.info(f"Audio buffer size: {len(audio_buffer)} bytes")
# Process chunk for real-time transcription
chunk_count += 1
try:
# Only process every 6th chunk for faster response (reduced from 8th)
if chunk_count % 6 == 0 and len(audio_buffer) >= 14400: # ~0.3 seconds of audio at 48kHz
recognition_audio = speech.RecognitionAudio(content=bytes(audio_buffer))
response = session_conversation_service.stt_service.client.recognize(
config=session_conversation_service.stt_service.recognition_config,
audio=recognition_audio
)
if response.results:
transcript = response.results[0].alternatives[0].transcript
confidence = response.results[0].alternatives[0].confidence
# Store transcript if confidence is reasonable (lowered for speed)
if confidence > 0.4: # Lowered threshold for faster processing
latest_transcript = transcript # Store latest transcript
# Check for repeated high-confidence transcripts
if confidence > 0.9:
if transcript == last_transcript:
high_confidence_count += 1
logger.info(f"Repeated high confidence transcript #{high_confidence_count}: '{transcript}' (confidence: {confidence})")
# If we've seen the same high-confidence transcript 3+ times, auto-stop (reduced from 4)
if high_confidence_count >= 3 and not session_processed:
logger.info("Auto-stopping recording due to repeated high-confidence transcript")
is_recording = False
session_processed = True # Mark as processed to prevent duplicates
# Send final processing message
await websocket.send_text(json.dumps({
"type": "transcription",
"transcript": transcript,
"is_final": True,
"confidence": confidence
}))
# Process AI response
logger.info("Getting AI response...")
ai_response = await session_conversation_service.process_conversation_flow_fast(
transcript,
message.get("scenario_context", "")
)
logger.info(f"AI response: {ai_response.get('text', 'No text')}")
await websocket.send_text(json.dumps(ai_response))
audio_buffer.clear()
logger.info("Recording session ended due to repeated transcript")
continue # Continue to next message
else:
high_confidence_count = 1
last_transcript = transcript
logger.info(f"High confidence transcript ready: '{transcript}' (confidence: {confidence})")
else:
high_confidence_count = 0
last_transcript = ""
transcription_result = {
"type": "transcription",
"transcript": transcript,
"is_final": False,
"confidence": confidence
}
await websocket.send_text(json.dumps(transcription_result))
# Only log interim transcriptions occasionally to reduce spam
if chunk_count % 16 == 0:
logger.info(f"Interim transcription: '{transcript}' (confidence: {confidence})")
else:
transcription_result = {
"type": "transcription",
"transcript": "Listening...",
"is_final": False,
"confidence": 0.0
}
await websocket.send_text(json.dumps(transcription_result))
except Exception as e:
# Only log transcription errors occasionally to reduce spam
if chunk_count % 16 == 0:
logger.error(f"Real-time transcription error: {str(e)}")
transcription_result = {
"type": "transcription",
"transcript": "Listening...",
"is_final": False,
"confidence": 0.0
}
await websocket.send_text(json.dumps(transcription_result))
else:
# Reduce logging for non-recording chunks
if chunk_count % 32 == 0:
logger.info("Received audio chunk but not in recording mode")
elif message["type"] == "audio_end":
is_recording = False
# Check if this session was already processed (e.g., by auto-stop logic)
if session_processed:
logger.info("Audio session already processed, skipping duplicate processing")
continue
final_transcript = ""
# Use latest interim transcript if available for faster response
logger.info(f"Checking latest_transcript: '{latest_transcript}'")
if latest_transcript.strip() and len(latest_transcript.strip()) > 3: # More aggressive check
final_transcript = latest_transcript
logger.info(f"Using latest interim transcript: '{final_transcript}'")
session_processed = True # Mark as processed to prevent duplicates
# Send final transcription immediately - no "Processing..." delay
transcription_result = {
"type": "transcription",
"transcript": final_transcript,
"is_final": True,
"confidence": 0.8 # Reasonable confidence for interim result
}
await websocket.send_text(json.dumps(transcription_result))
# Process AI response with faster flow - start immediately
logger.info("Getting AI response immediately...")
ai_response = await session_conversation_service.process_conversation_flow_fast(
final_transcript,
message.get("scenario_context", "")
)
logger.info(f"AI response: {ai_response.get('text', 'No text')}")
await websocket.send_text(json.dumps(ai_response))
# Clear buffer
audio_buffer.clear()
logger.info("Recording session ended, ready for next session")
elif len(audio_buffer) > 0:
# Fallback to full transcription if no interim results
logger.info(f"Processing final audio buffer: {len(audio_buffer)} bytes")
session_processed = True # Mark as processed to prevent duplicates
try:
recognition_audio = speech.RecognitionAudio(content=bytes(audio_buffer))
response = session_conversation_service.stt_service.client.recognize(
config=session_conversation_service.stt_service.recognition_config,
audio=recognition_audio
)
if response.results:
transcript = response.results[0].alternatives[0].transcript
confidence = response.results[0].alternatives[0].confidence
logger.info(f"Final transcription: '{transcript}' (confidence: {confidence})")
transcription_result = {
"type": "transcription",
"transcript": transcript,
"is_final": True,
"confidence": confidence
}
await websocket.send_text(json.dumps(transcription_result))
logger.info("Getting AI response...")
ai_response = await session_conversation_service.process_conversation_flow(
transcript,
message.get("scenario_context", "")
)
logger.info(f"AI response: {ai_response.get('text', 'No text')}")
await websocket.send_text(json.dumps(ai_response))
else:
logger.info("No transcription results from Google Speech")
# Send empty final transcription so UI knows recording ended
transcription_result = {
"type": "transcription",
"transcript": "",
"is_final": True,
"confidence": 0.0
}
await websocket.send_text(json.dumps(transcription_result))
audio_buffer.clear()
logger.info("Recording session ended, ready for next session")
except Exception as e:
logger.error(f"Final speech recognition error: {str(e)}")
# Send empty final transcription so UI knows recording ended
transcription_result = {
"type": "transcription",
"transcript": "",
"is_final": True,
"confidence": 0.0
}
await websocket.send_text(json.dumps(transcription_result))
error_result = {
"type": "error",
"message": f"Speech recognition error: {str(e)}"
}
await websocket.send_text(json.dumps(error_result))
audio_buffer.clear()
else:
logger.info("No audio data to process")
# Send empty final transcription so UI knows recording ended
transcription_result = {
"type": "transcription",
"transcript": "",
"is_final": True,
"confidence": 0.0
}
await websocket.send_text(json.dumps(transcription_result))
elif message["type"] == "text_message":
logger.info(f"Processing text message: '{message['text']}'")
ai_response = await session_conversation_service.process_conversation_flow(
message["text"],
message.get("scenario_context", "")
)
logger.info(f"AI response: {ai_response.get('text', 'No text')}")
await websocket.send_text(json.dumps(ai_response))
elif message["type"] == "initial_greeting":
logger.info("Processing initial greeting request")
ai_response = await session_conversation_service.generate_initial_greeting(
message.get("scenario_context", "")
)
logger.info(f"Initial greeting: {ai_response.get('text', 'No text')}")
await websocket.send_text(json.dumps(ai_response))
    except WebSocketDisconnect:
        logger.info("WebSocket client disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
        error_message = {
            "type": "error",
            "message": f"WebSocket error: {str(e)}"
        }
        await websocket.send_text(json.dumps(error_message))
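

# NOTE: the TTS endpoint below always uses the Indonesian service's TTS client,
# regardless of which language the client is practising; the "synthesize" message
# carries only the text to speak.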
@app.websocket("/ws/tts")
async def websocket_tts_endpoint(websocket: WebSocket):
"""WebSocket endpoint for text-to-speech streaming."""
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "synthesize":
try:
# Use the default TTS service for this endpoint
tts_service = language_services["indonesian"].tts_service
audio_content = await tts_service.synthesize_speech(message["text"])
audio_base64 = base64.b64encode(audio_content).decode('utf-8')
response = {
"type": "audio",
"audio": audio_base64,
"format": "mp3"
}
await websocket.send_text(json.dumps(response))
except Exception as e:
error_response = {
"type": "error",
"message": f"TTS error: {str(e)}"
}
await websocket.send_text(json.dumps(error_response))
except WebSocketDisconnect:
print("TTS client disconnected")
except Exception as e:
error_message = {
"type": "error",
"message": f"TTS WebSocket error: {str(e)}"
}
await websocket.send_text(json.dumps(error_message))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host=config.HOST, port=config.PORT, log_level="debug" if config.DEBUG else "info")