File size: 15,614 Bytes
92beb66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2118139
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
# utils.py - FIXED ENGLISH DETECTION
import requests
import ffmpeg
import torchaudio
import torch
import os
import numpy as np
import warnings
import tempfile
import shutil
from pathlib import Path

# Silence the UserWarning / FutureWarning noise emitted by the ML libraries
warnings.filterwarnings(action="ignore", category=UserWarning)
warnings.filterwarnings(action="ignore", category=FutureWarning)

# All downloaded models live under one local directory
CACHE_DIR = Path("model_cache")
CACHE_DIR.mkdir(exist_ok=True)

# Point the Hugging Face caches at sub-directories of CACHE_DIR
os.environ.update({
    'HUGGINGFACE_HUB_CACHE': str(CACHE_DIR / "huggingface"),
    'TRANSFORMERS_CACHE': str(CACHE_DIR / "transformers"),
})


def download_video(url, output_path=None):
    """Download the video at ``url`` to a local file.

    Args:
        url: Address of the video, fetched with an HTTP GET request.
        output_path: Destination file. When ``None``, a temporary ``.mp4``
            file is created (and not auto-deleted) and used instead.

    Returns:
        The path of the downloaded file, or ``None`` when the request fails
        or the resulting file is empty. In both failure cases the destination
        file is removed via ``cleanup_files``.
    """
    print("πŸ“₯ Downloading video...")

    if output_path is None:
        # Only the file name is needed here; the handle is closed right away
        # and the file is reopened for writing below.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
            output_path = temp_file.name

    try:
        # Send a browser-like User-Agent header with the request.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        # With stream=True the connection stays open until the body is consumed
        # or the response is closed, so the response is used as a context
        # manager to release it even when writing fails midway.
        with requests.get(url, stream=True, headers=headers, timeout=30) as response:
            response.raise_for_status()

            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        downloaded_bytes = os.path.getsize(output_path) if os.path.exists(output_path) else 0
        if downloaded_bytes > 0:
            print(f"βœ… Video downloaded successfully ({downloaded_bytes:,} bytes)")
            return output_path

        print("❌ Downloaded file is empty")
        cleanup_files(output_path)
        return None

    except Exception as e:
        # Best-effort API: report the problem and signal failure with None.
        print(f"❌ Download failed: {e}")
        cleanup_files(output_path)
        return None


def extract_audio(video_path, audio_path=None):
    """Extract the audio track of a video into a WAV file using ffmpeg.

    The output is written with one channel (``ac=1``), a 16000 Hz sample rate
    and the ``pcm_s16le`` codec.

    Args:
        video_path: Path of the source video.
        audio_path: Destination file. When ``None``, a temporary ``.wav``
            file is created (and not auto-deleted) and used instead.

    Returns:
        The path of the extracted audio file, or ``None`` when the video is
        missing, ffmpeg fails, or the output is empty. On failure after the
        destination was chosen, the destination file is removed via
        ``cleanup_files``.
    """
    print("🎡 Extracting audio...")

    if not video_path or not os.path.exists(video_path):
        print("❌ Video file not found")
        return None

    if audio_path is None:
        # Only the file name is needed; ffmpeg overwrites the file itself.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
            audio_path = temp_file.name

    try:
        # stdout/stderr are captured so ffmpeg's stderr is available on
        # ffmpeg.Error; the captured output of a successful run is not needed.
        (
            ffmpeg
            .input(video_path)
            .output(audio_path, ac=1, ar='16000', acodec='pcm_s16le')
            .run(overwrite_output=True, capture_stdout=True, capture_stderr=True)
        )

        extracted_bytes = os.path.getsize(audio_path) if os.path.exists(audio_path) else 0
        if extracted_bytes > 0:
            print(f"βœ… Audio extracted successfully ({extracted_bytes:,} bytes)")
            return audio_path

        print("❌ Audio extraction produced empty file")
        cleanup_files(audio_path)
        return None

    except ffmpeg.Error as e:
        # errors="replace" keeps undecodable ffmpeg output from raising a
        # UnicodeDecodeError inside this handler and escaping the function.
        detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
        print(f"❌ FFmpeg failed: {detail}")
        cleanup_files(audio_path)
        return None
    except Exception as e:
        print(f"❌ Audio extraction error: {e}")
        cleanup_files(audio_path)
        return None


def is_english_language(language_code):
    """Return True when a detected language label denotes English.

    The label is lower-cased and split into tokens, so both bare codes
    ("en", "en-us", "eng") and combined labels such as "en: English" are
    recognised. Matching is done on whole tokens rather than substrings:
    a substring test would accept "non-english" (returned by the Whisper and
    fallback detectors in this module), "fr: french" or "bn: bengali" because
    they merely contain "en" / "english".

    Args:
        language_code: Label produced by a language detector. Any value is
            accepted; it is converted with ``str()``.

    Returns:
        True for English labels, False otherwise (including empty / None).
    """
    if not language_code:
        return False

    language_code = str(language_code).lower().strip()

    # Language codes that mean English on their own.
    english_codes = {'en', 'eng'}
    # Full names that detectors may return instead of a code.
    english_names = {'english', 'american', 'british', 'australian'}

    # Split on label punctuation but keep hyphens, so "en-us" stays a single
    # regional code and "non-english" stays a single (non-matching) token.
    normalized = language_code.replace('_', '-')
    for separator in ":,;()/[]'\"":
        normalized = normalized.replace(separator, ' ')
    tokens = normalized.split()

    def _is_english_token(token):
        # Regional variants look like "en-us", "en-gb", ...
        return (
            token in english_codes
            or token in english_names
            or token.startswith(('en-', 'eng-'))
        )

    if any(_is_english_token(token) for token in tokens):
        if len(tokens) == 1:
            print(f"βœ… Detected English: {language_code}")
        else:
            print(f"βœ… Detected English variant: {language_code}")
        return True

    print(f"❌ Not English: {language_code}")
    return False


def detect_language_speechbrain(audio_path):
    """Method 1: Language detection using SpeechBrain VoxLingua107.

    Loads the ``speechbrain/lang-id-voxlingua107-ecapa`` classifier (cached
    under ``CACHE_DIR``) and classifies the given audio file.

    Args:
        audio_path: Path of the audio file to classify.

    Returns:
        A ``(language, confidence)`` tuple: the lower-cased label reported by
        the model and a confidence percentage in the range 0-100.

    Raises:
        Exception: Any error from importing SpeechBrain, loading the model or
            classifying the file is logged and re-raised so the caller can
            try another detection method.
    """
    print("🌍 Method 1: Using SpeechBrain language detection...")

    try:
        from speechbrain.pretrained import EncoderClassifier

        print("πŸ“¦ Loading language detection model...")
        language_id = EncoderClassifier.from_hparams(
            source="speechbrain/lang-id-voxlingua107-ecapa",
            savedir=str(CACHE_DIR / "lang-id-voxlingua107-ecapa")
        )
        print("βœ… Language detection model loaded")

        print("πŸ” Detecting language...")
        _out_prob, score, _index, text_lab = language_id.classify_file(audio_path)

        if torch.is_tensor(score):
            raw_score = float(score.max().item())
        else:
            raw_score = float(np.max(score))

        # The model card for this classifier describes the score as a
        # log-likelihood (<= 0) whose linear value is obtained with exp().
        # Multiplying the raw score by 100 would give a negative "confidence".
        # NOTE(review): a positive score cannot be a log-likelihood, so it is
        # treated as an already-linear probability - confirm against the
        # installed SpeechBrain version.
        likelihood = float(np.exp(raw_score)) if raw_score <= 0 else raw_score
        confidence = min(max(likelihood, 0.0), 1.0) * 100

        language = str(text_lab[0]) if isinstance(text_lab, (list, tuple)) else str(text_lab)

        # DEBUG: Print what we actually got
        print(f"πŸ” DEBUG - Raw model output: {text_lab}")
        print(f"πŸ” DEBUG - Processed language: '{language}'")
        print(f"πŸ” DEBUG - Confidence: {confidence:.1f}%")

        print(f"🌍 Language detected: {language} ({confidence:.1f}%)")
        return language.lower(), confidence

    except Exception as e:
        print(f"❌ SpeechBrain language detection failed: {e}")
        # Bare raise keeps the original traceback intact.
        raise


def detect_language_whisper(audio_path):
    """Method 2: Guess whether speech is English from a Whisper transcription.

    Loads ``openai/whisper-base`` (cached under ``CACHE_DIR``), generates a
    short transcription (generation is capped at ``max_length=30``) and counts
    how many common English words occur in it as whole words.

    Args:
        audio_path: Path of the audio file; loaded with librosa as 16 kHz mono.

    Returns:
        A ``(language, confidence)`` tuple:
        ``("unknown", 50.0)`` when the transcription is empty,
        ``("en", c)`` with ``c`` between 89.0 and 95.0 when at least two
        indicator words are present, otherwise ``("non-english", 70.0)``.

    Raises:
        Exception: Any error from the imports, model loading, audio loading
            or generation is logged and re-raised so the caller can fall back.
    """
    print("🌍 Method 2: Using Whisper language detection...")

    try:
        import re

        from transformers import WhisperProcessor, WhisperForConditionalGeneration
        import librosa

        print("πŸ“¦ Loading Whisper model...")
        processor = WhisperProcessor.from_pretrained(
            "openai/whisper-base",
            cache_dir=str(CACHE_DIR / "whisper")
        )
        model = WhisperForConditionalGeneration.from_pretrained(
            "openai/whisper-base",
            cache_dir=str(CACHE_DIR / "whisper")
        )
        print("βœ… Whisper loaded")

        # Load audio
        audio, _ = librosa.load(audio_path, sr=16000, mono=True)

        # Process audio
        input_features = processor(audio, sampling_rate=16000, return_tensors="pt").input_features

        # Generate with language detection
        print("πŸ” Detecting language with Whisper...")
        predicted_ids = model.generate(input_features, max_length=30)
        transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

        print(f"πŸ” DEBUG - Whisper transcription: '{transcription}'")

        # Simple heuristic based on transcription
        if not transcription.strip():
            return "unknown", 50.0

        # Count indicator words that occur as whole words. A substring test
        # would count 'i', 'is', 'me' or 'we' inside almost any text (for
        # example "i" in "bonjour merci"), making nearly everything "English".
        english_indicators = ['the', 'and', 'is', 'are', 'was', 'were', 'have', 'has', 'this', 'that', 'you', 'i', 'me', 'we', 'they']
        spoken_words = set(re.findall(r"[a-z]+", transcription.lower()))
        english_count = sum(1 for word in english_indicators if word in spoken_words)

        print(f"πŸ” DEBUG - English words found: {english_count}")

        if english_count >= 2:
            return "en", min(85.0 + english_count * 2, 95.0)
        return "non-english", 70.0

    except Exception as e:
        print(f"❌ Whisper language detection failed: {e}")
        # Bare raise keeps the original traceback intact.
        raise


def detect_language_fallback(audio_path):
    """Fallback: crude acoustic heuristic used when the model-based detectors fail.

    Computes the tempo, the mean spectral centroid and the variance of 13
    MFCCs with librosa and adds up a score from three fixed value ranges.
    This is a rough heuristic, not real language identification.

    Args:
        audio_path: Path of the audio file; loaded with librosa as 16 kHz mono.

    Returns:
        A ``(language, confidence)`` tuple: ``("en", c)`` with ``c`` capped at
        80 when the score reaches 50, ``("non-english", 60)`` otherwise, and
        ``("unknown", 40)`` when anything fails. This function never raises
        for ordinary errors.
    """
    print("🌍 Fallback: Using acoustic analysis for language detection...")

    try:
        import librosa

        # Load audio
        audio, sr = librosa.load(audio_path, sr=16000, mono=True)

        # Extract basic features
        tempo, _ = librosa.beat.beat_track(y=audio, sr=sr)
        # Depending on the librosa version the tempo may come back as a NumPy
        # array instead of a float; an array cannot be formatted with ':.1f',
        # which would push every call into the "unknown" branch below.
        tempo = float(np.atleast_1d(tempo).ravel()[0])
        spectral_centroids = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
        avg_spectral = float(np.mean(spectral_centroids))
        mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
        mfcc_var = float(np.var(mfccs))

        print(f"πŸ” DEBUG - Acoustic features: tempo={tempo:.1f}, spectral={avg_spectral:.1f}, mfcc_var={mfcc_var:.1f}")

        # Basic heuristic for English detection
        english_score = 0

        if 90 < tempo < 150:
            english_score += 30
        if 1200 < avg_spectral < 2500:
            english_score += 25
        if 50 < mfcc_var < 200:
            english_score += 25

        print(f"πŸ” DEBUG - English score: {english_score}")

        if english_score >= 50:
            return "en", min(english_score + 20, 80)
        return "non-english", 60

    except Exception as e:
        # Last line of defence in the detection chain: report and return a
        # low-confidence "unknown" instead of raising.
        print(f"❌ Fallback language detection failed: {e}")
        return "unknown", 40


def detect_language(audio_path):
    """Detect the spoken language, trying each detector in order of preference.

    SpeechBrain is tried first, then Whisper; if both raise, the acoustic
    fallback detector supplies the result.

    Args:
        audio_path: Path of the audio file to analyse.

    Returns:
        The ``(language, confidence)`` tuple of the first detector that
        succeeds.

    Raises:
        ValueError: If ``audio_path`` is empty or does not exist.
    """
    print(f"🌍 Starting language detection: {audio_path}")

    audio_available = bool(audio_path) and os.path.exists(audio_path)
    if not audio_available:
        raise ValueError(f"Audio file not found: {audio_path}")

    # Preferred detectors, most accurate first; each one raises on failure.
    preferred_detectors = (
        ("SpeechBrain", detect_language_speechbrain),
        ("Whisper", detect_language_whisper),
    )
    for label, detector in preferred_detectors:
        try:
            return detector(audio_path)
        except Exception as failure:
            print(f"⚠️ {label} language detection failed: {str(failure)[:100]}...")

    # Neither model-based detector worked
    print("πŸ”„ Using fallback language detection...")
    return detect_language_fallback(audio_path)


def classify_english_accent_speechbrain(audio_path):
    """English accent detection using SpeechBrain ECAPA-TDNN.

    Loads the ``Jzuluaga/accent-id-commonaccent_ecapa`` classifier (cached
    under ``CACHE_DIR``), classifies the audio file and maps the model's
    label to a readable accent name.

    Args:
        audio_path: Path of the audio file to classify.

    Returns:
        A ``(accent, confidence)`` tuple. ``confidence`` is a percentage
        capped at 95.0 and rounded to one decimal. When classification fails
        for any reason the result is ``("Unknown", 0.0)``; a failure is never
        disguised as a made-up accent.
    """
    print("🎯 Using SpeechBrain for English accent detection...")

    try:
        from speechbrain.pretrained import EncoderClassifier

        print("πŸ“¦ Loading English accent classifier...")
        classifier = EncoderClassifier.from_hparams(
            source="Jzuluaga/accent-id-commonaccent_ecapa",
            savedir=str(CACHE_DIR / "accent-id-commonaccent_ecapa")
        )
        print("βœ… Accent model loaded successfully")

        print("πŸ” Classifying English accent...")
        _out_prob, score, _index, text_lab = classifier.classify_file(audio_path)

        # NOTE(review): the score is used as a 0-1 value and scaled to a
        # percentage - confirm against the output scale of this model.
        if torch.is_tensor(score):
            confidence = float(score.max().item()) * 100
        else:
            confidence = float(np.max(score)) * 100

        accent = text_lab[0] if isinstance(text_lab, list) else str(text_lab)

        # DEBUG
        print(f"πŸ” DEBUG - Accent raw output: {text_lab}")
        print(f"πŸ” DEBUG - Processed accent: '{accent}'")

        # Map internal labels to readable names
        accent_mapping = {
            'us': 'American',
            'england': 'British (England)',
            'australia': 'Australian',
            'indian': 'Indian',
            'canada': 'Canadian',
            'bermuda': 'Bermudian',
            'scotland': 'Scottish',
            'african': 'South African',
            'ireland': 'Irish',
            'newzealand': 'New Zealand',
            'wales': 'Welsh',
            'malaysia': 'Malaysian',
            'philippines': 'Filipino',
            'singapore': 'Singaporean',
            'hongkong': 'Hong Kong',
            'southatlandtic': 'South Atlantic'
        }

        # Labels without a mapping are shown title-cased as returned.
        readable_accent = accent_mapping.get(accent.lower(), accent.title())
        confidence = min(confidence, 95.0)

        print(f"🎯 English accent: {readable_accent} ({confidence:.1f}%)")
        return readable_accent, round(confidence, 1)

    except Exception as e:
        print(f"❌ English accent detection failed: {e}")
        # Returning a randomly chosen accent with a fixed confidence would
        # present invented data as a classification result; report the
        # failure explicitly while keeping the (str, float) return shape.
        return "Unknown", 0.0


def analyze_speech(audio_path):
    """
    Run the full pipeline: language detection, then accent detection for English.

    Returns a 5-tuple
    (is_english: bool, language: str, accent: str, lang_confidence: float, accent_confidence: float).
    For non-English speech the tuple is (False, detected language, None,
    lang_confidence, None); for English it is (True, "English", accent,
    lang_confidence, accent_confidence).

    Raises ValueError when audio_path is empty or the file does not exist.
    """
    print(f"🎀 Starting complete speech analysis: {audio_path}")

    audio_available = bool(audio_path) and os.path.exists(audio_path)
    if not audio_available:
        raise ValueError(f"Audio file not found: {audio_path}")

    separator = "=" * 50

    # Step 1: which language is being spoken?
    print(f"\n{separator}")
    print("STEP 1: LANGUAGE DETECTION")
    print(separator)

    detected_language, detected_confidence = detect_language(audio_path)
    english = is_english_language(detected_language)

    print("\nπŸ” DEBUG - Final language check:")
    print(f"   - Detected language: '{detected_language}'")
    print(f"   - Is English: {english}")
    print(f"   - Confidence: {detected_confidence:.1f}%")

    if english:
        # Step 2: only English speech gets an accent classification
        print("\nβœ… Language is English! Proceeding to accent detection...")
        print(f"\n{separator}")
        print("STEP 2: ENGLISH ACCENT DETECTION")
        print(separator)

        accent_name, accent_score = classify_english_accent_speechbrain(audio_path)

        print("\n🎯 FINAL RESULT:")
        print(f"   Language: English ({detected_confidence:.1f}% confidence)")
        print(f"   English Accent: {accent_name} ({accent_score:.1f}% confidence)")

        return True, "English", accent_name, detected_confidence, accent_score

    print("\n❌ RESULT: Speaker is NOT speaking English")
    print(f"   Detected language: {detected_language}")
    print(f"   Confidence: {detected_confidence:.1f}%")
    return False, detected_language, None, detected_confidence, None


def cleanup_files(*file_paths):
    """Delete each given file if it exists; failures are reported, not raised."""
    for target in file_paths:
        try:
            # Skip empty entries (None, "") and paths that are already gone
            if not target or not os.path.exists(target):
                continue
            os.remove(target)
            print(f"πŸ—‘οΈ Cleaned up: {target}")
        except Exception as problem:
            print(f"⚠️ Failed to cleanup {target}: {problem}")


def cleanup_cache():
    """Remove the whole model cache directory, reporting (not raising) failures."""
    try:
        if not CACHE_DIR.exists():
            return
        shutil.rmtree(CACHE_DIR)
        print("πŸ—‘οΈ Cleaned up model cache directory")
    except Exception as problem:
        print(f"⚠️ Failed to cleanup cache: {problem}")


# Legacy function for backward compatibility
def classify_accent(audio_path):
    """Legacy two-value wrapper around ``analyze_speech``.

    Returns ``(accent, accent_confidence)`` for English speech, otherwise
    ``("Not English (detected: <language>)", language_confidence)``.
    """
    english, language, accent, lang_conf, accent_conf = analyze_speech(audio_path)

    if english:
        return accent, accent_conf
    return f"Not English (detected: {language})", lang_conf