street-lingo/backend/speech_service.py
import asyncio
import json
import os
import queue
import logging
from typing import AsyncGenerator, Dict, Any, Optional, List
import base64
from google.cloud import speech
from google.cloud import texttospeech
from google.api_core import exceptions
import openai
from config import config
from models import Personality, SCENARIO_PERSONALITIES, GoalItem, Gender

logger = logging.getLogger(__name__)


class SpeechToTextService:
    def __init__(self):
        self.client = speech.SpeechClient()
        # Get encoding from config
        encoding_map = {
            "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
            "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16,
            "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC,
            "MULAW": speech.RecognitionConfig.AudioEncoding.MULAW,
            "AMR": speech.RecognitionConfig.AudioEncoding.AMR,
            "AMR_WB": speech.RecognitionConfig.AudioEncoding.AMR_WB,
            "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS,
            "MP3": speech.RecognitionConfig.AudioEncoding.MP3,
        }
        self.recognition_config = speech.RecognitionConfig(
            encoding=encoding_map.get(config.SPEECH_ENCODING, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS),
            sample_rate_hertz=config.SPEECH_SAMPLE_RATE,
            language_code=config.SPEECH_LANGUAGE_CODE,
            enable_automatic_punctuation=True,
            use_enhanced=True,
            model="latest_long",
        )
        self.streaming_config = speech.StreamingRecognitionConfig(
            config=self.recognition_config,
            interim_results=True,
            single_utterance=False,
        )

    async def transcribe_streaming(self, audio_generator: AsyncGenerator[bytes, None]) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream audio data to Google Cloud Speech-to-Text and yield transcription results."""
        try:
            audio_queue: "queue.Queue[Optional[bytes]]" = queue.Queue()

            async def feed_audio():
                # Bridge the async audio source into a thread-safe queue that the
                # blocking gRPC request iterator can consume.
                async for chunk in audio_generator:
                    audio_queue.put(chunk)
                audio_queue.put(None)  # Sentinel: no more audio

            def request_generator():
                # First request carries the streaming config, then the audio chunks
                yield speech.StreamingRecognizeRequest(streaming_config=self.streaming_config)
                while True:
                    chunk = audio_queue.get()
                    if chunk is None:
                        break
                    yield speech.StreamingRecognizeRequest(audio_content=chunk)

            feeder = asyncio.ensure_future(feed_audio())
            responses = self.client.streaming_recognize(request_generator())
            while True:
                # The response iterator blocks, so advance it off the event loop
                response = await asyncio.to_thread(next, responses, None)
                if response is None:
                    break
                for result in response.results:
                    alternative = result.alternatives[0]
                    yield {
                        "type": "transcription",
                        "transcript": alternative.transcript,
                        "is_final": result.is_final,
                        "confidence": alternative.confidence if result.is_final else 0.0
                    }
            await feeder
        except exceptions.GoogleAPICallError as e:
            yield {
                "type": "error",
                "message": f"Speech recognition error: {str(e)}"
            }
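

# Illustrative usage sketch (not wired into the app here): how a caller might feed
# microphone chunks into transcribe_streaming and consume the results. The chunk
# source `receive_audio_chunk()` is a hypothetical stand-in for whatever transport
# the backend actually uses (e.g. a WebSocket handler).
async def _example_stream_transcription(receive_audio_chunk) -> None:
    stt = SpeechToTextService()

    async def audio_chunks() -> AsyncGenerator[bytes, None]:
        while True:
            chunk = await receive_audio_chunk()  # hypothetical transport call
            if not chunk:
                break
            yield chunk

    async for event in stt.transcribe_streaming(audio_chunks()):
        if event["type"] == "transcription" and event["is_final"]:
            logger.info(f"Final transcript: {event['transcript']}")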


class TextToSpeechService:
    def __init__(self):
        self.client = texttospeech.TextToSpeechClient()
        # Gender mapping for Google TTS
        self.gender_map = {
            "FEMALE": texttospeech.SsmlVoiceGender.FEMALE,
            "MALE": texttospeech.SsmlVoiceGender.MALE,
            "NEUTRAL": texttospeech.SsmlVoiceGender.NEUTRAL,
            "male": texttospeech.SsmlVoiceGender.MALE,
            "female": texttospeech.SsmlVoiceGender.FEMALE,
        }

    def _get_voice_and_audio_config(self, gender: str, character_name: str = None) -> tuple:
        """Get appropriate voice and audio configuration based on gender."""
        tts_gender = self.gender_map.get(gender, texttospeech.SsmlVoiceGender.FEMALE)
        character_voice_map = {
            "Pak Budi": {
                "name": "id-ID-Chirp3-HD-Charon",
                "speaking_rate": 0.95,
                "pitch": None,
                "ssml_gender": texttospeech.SsmlVoiceGender.MALE,
            },
            "Ibu Sari": {
                "name": "id-ID-Chirp3-HD-Kore",
                "speaking_rate": 1.0,
                "pitch": None,
                "ssml_gender": texttospeech.SsmlVoiceGender.FEMALE,
            },
            "Mbak Sari": {
                "name": "id-ID-Chirp3-HD-Zephyr",
                "speaking_rate": 1.1,
                "pitch": None,
                "ssml_gender": texttospeech.SsmlVoiceGender.FEMALE,
            },
            "Adik Kasir": {
                "name": "id-ID-Chirp3-HD-Aoede",
                "speaking_rate": 1.05,
                "pitch": None,
                "ssml_gender": texttospeech.SsmlVoiceGender.FEMALE,
            },
            "Tetangga Ali": {
                "name": "id-ID-Chirp3-HD-Puck",
                "speaking_rate": 1.05,
                "pitch": None,
                "ssml_gender": texttospeech.SsmlVoiceGender.MALE,
            }
        }
        gender_voice_fallback = {
            texttospeech.SsmlVoiceGender.MALE: {
                "name": "id-ID-Chirp3-HD-Fenrir",
                "speaking_rate": 1.0,
                "pitch": None,
                "ssml_gender": texttospeech.SsmlVoiceGender.MALE,
            },
            texttospeech.SsmlVoiceGender.FEMALE: {
                "name": "id-ID-Chirp3-HD-Leda",
                "speaking_rate": 1.0,
                "pitch": None,
                "ssml_gender": texttospeech.SsmlVoiceGender.FEMALE,
            }
        }
        config_set = None
        if character_name and character_name in character_voice_map:
            config_set = character_voice_map[character_name]
            logger.info(f"Using character-specific voice for '{character_name}': {config_set['name']}")
        if not config_set:
            config_set = gender_voice_fallback.get(tts_gender, gender_voice_fallback[texttospeech.SsmlVoiceGender.FEMALE])
            logger.info(f"Using gender fallback voice for {tts_gender}: {config_set['name']}")
        voice = texttospeech.VoiceSelectionParams(
            language_code=config.TTS_LANGUAGE_CODE,
            name=config_set["name"],
            ssml_gender=config_set["ssml_gender"],
        )
        audio_config_params = {
            "audio_encoding": texttospeech.AudioEncoding.LINEAR16,
            "speaking_rate": config_set["speaking_rate"],
            "effects_profile_id": ['handset-class-device'],
        }
        if config_set["pitch"] is not None:
            audio_config_params["pitch"] = config_set["pitch"]
        audio_config = texttospeech.AudioConfig(**audio_config_params)
        return voice, audio_config

    async def synthesize_speech(self, text: str, gender: str = "female", character_name: str = None) -> bytes:
        """Convert text to speech using Google Cloud Text-to-Speech with natural, conversational voice."""
        try:
            logger.info(f"TTS synthesize_speech called with text: '{text}', gender: '{gender}', character: '{character_name}'")
            voice, audio_config = self._get_voice_and_audio_config(gender, character_name)
            logger.info(f"Using voice: {voice.name}, requested gender: '{gender}', mapped TTS gender: {voice.ssml_gender}")
            synthesis_input = texttospeech.SynthesisInput(text=text)
            response = self.client.synthesize_speech(
                input=synthesis_input,
                voice=voice,
                audio_config=audio_config,
            )
            logger.info(f"TTS successful, audio length: {len(response.audio_content)} bytes")
            return response.audio_content
        except exceptions.GoogleAPICallError as e:
            logger.error(f"Text-to-speech error: {str(e)}")
            raise Exception(f"Text-to-speech error: {str(e)}")


class AIConversationService:
    def __init__(self):
        self.client = openai.OpenAI(api_key=config.OPENAI_API_KEY)
        self.model = config.OPENAI_MODEL
        self.current_personality: Optional[Personality] = None
        self.conversation_history: List[Dict[str, str]] = []
        self.goal_progress: List[GoalItem] = []

    def set_personality(self, personality: Personality):
        """Set the current personality for the conversation."""
        self.current_personality = personality
        # Reset conversation history when personality changes
        self.conversation_history = []
        # Initialize goal progress
        self.goal_progress = [GoalItem(**item.dict()) for item in personality.goal_items]

    def reset_conversation(self):
        """Reset the conversation history."""
        self.conversation_history = []
        # Reset goal progress
        if self.current_personality:
            self.goal_progress = [GoalItem(**item.dict()) for item in self.current_personality.goal_items]

    def get_personality_for_scenario(self, scenario: str, character_name: str = None) -> Personality:
        """Get personality based on scenario and optional character name."""
        if scenario in SCENARIO_PERSONALITIES:
            personalities = SCENARIO_PERSONALITIES[scenario]
            if character_name and character_name in personalities:
                return personalities[character_name]
            else:
                # Return first personality if no specific character requested
                return list(personalities.values())[0]
        # Return default personality if scenario not found
        return Personality(
            character_type="generic",
            name="Pak/Bu",
            tone="friendly",
            age_range="middle-aged",
            background="Helpful Indonesian person",
            typical_phrases=["Halo!", "Apa kabar?", "Bisa saya bantu?"],
            response_style="Friendly and helpful",
            location_context="Indonesia",
            is_helpful=True,
            is_talkative=True
        )

    async def check_goal_completion(self, user_message: str, ai_response: str) -> bool:
        """Check if any goals are completed using LLM judge."""
        if not self.goal_progress:
            return False
        goals_completed = False
        # Only check goals that aren't already completed
        incomplete_goals = [g for g in self.goal_progress if not g.completed]
        if not incomplete_goals:
            return False
        logger.info(f"Checking goal completion for user message: '{user_message}'")
        logger.info(f"Incomplete goals: {[g.description for g in incomplete_goals]}")
        conversation_context = ""
        for exchange in self.conversation_history[-3:]:
            conversation_context += f"User: {exchange['user']}\nAI: {exchange['assistant']}\n"
        for goal in incomplete_goals:
            logger.info(f"Checking goal: '{goal.description}'")
            completion_check = await self._judge_goal_completion(
                goal,
                user_message,
                ai_response,
                conversation_context
            )
            if completion_check:
                goal.completed = True
                goals_completed = True
                logger.info(f"✅ Goal completed: {goal.description}")
            else:
                logger.info(f"❌ Goal not completed: {goal.description}")
        return goals_completed

    async def _judge_goal_completion(self, goal: GoalItem, user_message: str, ai_response: str, conversation_context: str) -> bool:
        """Use LLM to judge if a specific goal was completed."""
        try:
            if "order" in goal.description.lower() or "buy" in goal.description.lower():
                judge_prompt = f"""You are a strict judge determining if a specific goal was FULLY completed in a conversation.

GOAL TO CHECK: {goal.description}

RECENT CONVERSATION CONTEXT:
{conversation_context}

LATEST EXCHANGE:
User: {user_message}
AI: {ai_response}

CRITICAL RULES FOR ORDERING GOALS:
1. ONLY return "YES" if the user has COMPLETELY finished this exact goal
2. Return "NO" if the goal is partial, incomplete, or just being discussed
3. For "Order [item]" goals: user must explicitly say they want/order that EXACT item with ALL specifications
4. For drink goals: user must specifically mention wanting/ordering a drink
5. Don't mark as complete just because the AI is asking about it

Answer ONLY "YES" or "NO":"""
            else:
                judge_prompt = f"""You are judging if a conversational goal was completed in a natural small talk scenario.

GOAL TO CHECK: {goal.description}

RECENT CONVERSATION CONTEXT:
{conversation_context}

LATEST EXCHANGE:
User: {user_message}
AI: {ai_response}

RULES FOR SMALL TALK GOALS:
1. Return "YES" if the user has naturally accomplished this conversational goal ANYWHERE in the conversation
2. For "Share something about yourself" goals: Look through the ENTIRE conversation for work, family, hobbies, personal interests, financial situation, dreams, etc.
3. For "Ask follow-up questions" goals: user asks questions to continue conversation
4. For "Exchange greetings" goals: user greets or responds to greetings
5. For "Discuss weather/daily life" goals: user talks about weather, daily activities, current events
6. Goals can be completed through natural conversation flow, not just direct statements
7. IMPORTANT: Check the FULL conversation context, not just the latest exchange

EXAMPLES:
- Goal: "Share something about yourself (work, family, hobbies, etc.)"
- User mentions work: "sibuk banget di kantor sering lembur" → YES (work situation)
- User mentions finances: "nggak punya duit" → YES (personal finance)
- User mentions hobbies: "sukanya ke Afrika" → YES (travel interests)
- User mentions dreams: "Belum pernah mimpi aja dulu sih" → YES (personal aspirations)
- User just greets: "Baik nih" → NO (just greeting, no personal info)
- Goal: "Ask follow-up questions to keep the conversation going"
- User: "Mas Ali suka lari juga gak?" → YES (asking follow-up question)
- User: "Gimana kabar keluarga?" → YES (asking about family)
- User: "Iya" → NO (just responding, not asking)

Be reasonable and natural - small talk goals should be completed through normal conversation.
SCAN THE ENTIRE CONVERSATION, not just the latest message.

Answer ONLY "YES" or "NO":"""
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": judge_prompt}],
                max_tokens=5,
                temperature=0.1,  # Low temperature for consistent judging
            )
            result = response.choices[0].message.content.strip().upper()
            logger.info(f"Goal judge result for '{goal.description}': {result}")
            return result == "YES"
        except Exception as e:
            logger.error(f"Error in goal completion judge: {str(e)}")
            return False

    def are_all_goals_completed(self) -> bool:
        """Check if all goals are completed."""
        return all(goal.completed for goal in self.goal_progress)

    def get_goal_status(self) -> Dict[str, Any]:
        """Get current goal status."""
        return {
            "scenario_goal": self.current_personality.scenario_goal if self.current_personality else "",
            "goal_items": [
                {
                    "id": goal.id,
                    "description": goal.description,
                    "completed": goal.completed
                } for goal in self.goal_progress
            ],
            "all_completed": self.are_all_goals_completed()
        }

    async def get_response(self, user_message: str, context: str = "") -> str:
        """Get AI response to user message using current personality and conversation history."""
        try:
            # Use current personality or default
            if not self.current_personality:
                default_personality = self.get_personality_for_scenario("warung", "pak_budi")
                self.set_personality(default_personality)
            system_prompt = self.current_personality.get_system_prompt(context)
            # Build messages with conversation history
            messages = [{"role": "system", "content": system_prompt}]
            # Add conversation history (keep last 15 exchanges for better chitchat context)
            recent_history = self.conversation_history[-15:] if len(self.conversation_history) > 15 else self.conversation_history
            for exchange in recent_history:
                messages.append({"role": "user", "content": exchange["user"]})
                messages.append({"role": "assistant", "content": exchange["assistant"]})
            # Add current user message
            messages.append({"role": "user", "content": user_message})
            logger.info(f"Sending {len(messages)} messages to AI:")
            for i, msg in enumerate(messages):
                if msg["role"] == "system":
                    logger.info(f"  {i}: SYSTEM (length: {len(msg['content'])})")
                else:
                    logger.info(f"  {i}: {msg['role'].upper()}: '{msg['content']}'")
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                max_tokens=250,
                temperature=0.7,
            )
            ai_response = response.choices[0].message.content
            self.conversation_history.append({
                "user": user_message,
                "assistant": ai_response
            })
            await self.check_goal_completion(user_message, ai_response)
            logger.info(f"Conversation history length: {len(self.conversation_history)}")
            if len(self.conversation_history) > 0:
                logger.info(f"Last exchange - User: '{self.conversation_history[-1]['user']}', AI: '{self.conversation_history[-1]['assistant']}'")
            if self.goal_progress:
                completed_goals = [g.description for g in self.goal_progress if g.completed]
                logger.info(f"Completed goals: {completed_goals}")
                logger.info(f"All goals completed: {self.are_all_goals_completed()}")
            return ai_response
        except Exception as e:
            return f"Maaf, ada error: {str(e)}"


class ConversationFlowService:
    def __init__(self):
        self.stt_service = SpeechToTextService()
        self.tts_service = TextToSpeechService()
        self.ai_service = AIConversationService()

    def set_scenario_personality(self, scenario: str, character_name: str = None):
        """Set the personality based on scenario and character."""
        personality = self.ai_service.get_personality_for_scenario(scenario, character_name)
        if not self.ai_service.current_personality or self.ai_service.current_personality.name != personality.name:
            logger.info(f"Setting new personality: {personality.name}")
            self.ai_service.set_personality(personality)
            logger.info("Goal progress initialized for new personality")
        else:
            logger.info(f"Keeping existing personality: {personality.name}")

    async def process_conversation_flow(self, transcribed_text: str, scenario_context: str = "") -> Dict[str, Any]:
        """Process the complete conversation flow: Text → AI → Speech."""
        try:
            scenario = self.extract_scenario_from_context(scenario_context)
            if scenario:
                self.set_scenario_personality(scenario)
            ai_response = await self.ai_service.get_response(transcribed_text, scenario_context)
            gender = self.ai_service.current_personality.gender.value if self.ai_service.current_personality else "female"
            personality_name = self.ai_service.current_personality.name if self.ai_service.current_personality else "Unknown"
            logger.info(f"Generating TTS for character '{personality_name}' with text: '{ai_response}' and gender: '{gender}'")
            audio_content = await self.tts_service.synthesize_speech(ai_response, gender, personality_name)
            logger.info(f"TTS generation successful, audio length: {len(audio_content)} bytes")
            audio_base64 = base64.b64encode(audio_content).decode('utf-8')
            goal_status = self.ai_service.get_goal_status()
            return {
                "type": "ai_response",
                "text": ai_response,
                "audio": audio_base64,
                "audio_format": "wav",  # TTS uses LINEAR16, which Google returns with a WAV header, not MP3
                "character": self.ai_service.current_personality.name if self.ai_service.current_personality else "Unknown",
                "goal_status": goal_status,
                "conversation_complete": goal_status.get("all_completed", False)
            }
        except Exception as e:
            return {
                "type": "error",
                "message": f"Conversation flow error: {str(e)}"
            }

    def extract_scenario_from_context(self, context: str) -> str:
        """Extract scenario type from context string."""
        logger.info(f"Extracting scenario from context: '{context}'")
        context_lower = context.lower()
        detected_scenario = None
        if "coffee_shop" in context_lower or "coffee" in context_lower:
            detected_scenario = "coffee_shop"
        elif "warung" in context_lower or "nasi goreng" in context_lower:
            detected_scenario = "warung"
        elif "ojek" in context_lower or "mall" in context_lower:
            detected_scenario = "ojek"
        elif "alfamart" in context_lower or "indomie" in context_lower:
            detected_scenario = "alfamart"
        else:
            detected_scenario = "warung"  # Default to warung
        logger.info(f"Detected scenario: '{detected_scenario}'")
        return detected_scenario
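

# Illustrative end-to-end sketch: text in, spoken reply out. This mirrors how a
# caller is expected to use the returned payload; the scenario context string is a
# made-up example. Run with asyncio.run() given valid Google Cloud and OpenAI
# credentials configured in config.
async def _example_full_flow() -> None:
    flow = ConversationFlowService()
    result = await flow.process_conversation_flow(
        "Saya mau pesan nasi goreng, Pak.",
        scenario_context="warung: ordering nasi goreng for lunch",
    )
    if result["type"] == "ai_response":
        logger.info(f"{result['character']}: {result['text']}")
        logger.info(f"Goals complete: {result['conversation_complete']}")
    else:
        logger.error(result["message"])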