File size: 14,130 Bytes
dfbb2da
 
 
 
b4562f5
 
 
dfbb2da
 
 
 
 
 
 
 
b4562f5
 
 
 
 
 
 
 
 
dfbb2da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b4562f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dfbb2da
 
b4562f5
dfbb2da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b4562f5
 
 
 
dfbb2da
 
 
 
 
 
 
 
 
b4562f5
dfbb2da
 
 
 
 
b4562f5
 
 
 
 
 
 
 
 
 
 
 
dfbb2da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
Transcript Service for YouTube Videos

This service extracts transcripts from YouTube videos using multiple methods:
1. First, try youtube_transcript_api (works well on cloud platforms)
2. Then try yt-dlp subtitle extraction
3. If no subtitles available, fallback to audio extraction + Whisper transcription

The fallback uses the SpeechToTextService for local Whisper transcription.
"""

import re
import os
import tempfile
import logging
from typing import Optional, Tuple, List

# Try to import youtube_transcript_api (more reliable for cloud deployments)
try:
    from youtube_transcript_api import YouTubeTranscriptApi
    from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
    HAS_YOUTUBE_TRANSCRIPT_API = True
except ImportError:
    HAS_YOUTUBE_TRANSCRIPT_API = False

import yt_dlp

# Configure logging: request INFO-level output on the root logger and create
# this module's named logger, which every method below logs through.
# NOTE(review): this runs at import time, so importing the module touches the
# process-wide logging configuration - confirm that is intended for library use.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TranscriptService:
    """
    Service for extracting transcripts from YouTube videos.

    Supports three methods, tried in this order by ``get_video_transcript``:
    1. youtube_transcript_api lookup (fast, no ML models)
    2. yt-dlp subtitle extraction (fast, no ML models)
    3. Audio transcription via Whisper (slower, requires SpeechToTextService)
    """

    # Language codes tried, in priority order, when choosing a transcript.
    _PREFERRED_LANGS = ('en', 'en-IN', 'hi', 'ta', 'te', 'kn', 'ml', 'gu', 'bn', 'mr', 'pa', 'ur')

    # Primary language subtag -> three-letter code reported in results.
    # Regional variants ("en-IN", "hi-IN", "ur-PK", ...) resolve through their
    # primary subtag, so both extraction paths share one table.
    _LANG_MAP = {
        "en": "eng",
        "hi": "hin",
        "ta": "tam",
        "te": "tel",
        "kn": "kan",
        "ml": "mal",
        "gu": "guj",
        "bn": "ben",
        "mr": "mar",
        "pa": "pan",
        "ur": "urd",
    }

    # Transcripts shorter than this many characters are treated as unusable.
    _MIN_TRANSCRIPT_CHARS = 50

    # Captures the 11-character ID that follows "v=", a "/" or "youtu.be/".
    # NOTE(review): the bare "/" alternative is permissive - any URL with an
    # 11-character path segment is accepted. Kept as-is for compatibility.
    _VIDEO_ID_RE = re.compile(r"(?:v=|\/|youtu\.be\/)([0-9A-Za-z_-]{11})")

    # WebVTT block keywords whose content is metadata rather than cue text.
    _VTT_NON_CUE_BLOCKS = ("WEBVTT", "NOTE", "STYLE", "REGION")

    def __init__(self):
        """Initialize the transcript service."""
        self._speech_to_text = None  # Lazy-loaded

    def _get_speech_to_text_service(self):
        """Lazy-load the SpeechToTextService to avoid loading Whisper unless needed."""
        if self._speech_to_text is None:
            # Imported here so the dependency is only loaded on the fallback path.
            from services.speech_to_text import SpeechToTextService
            self._speech_to_text = SpeechToTextService()
        return self._speech_to_text

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _primary_subtag(code: str) -> str:
        """Return the lower-cased primary subtag of a language code ("en-IN" -> "en")."""
        return re.split(r"[-_]", code, maxsplit=1)[0].lower()

    @classmethod
    def _normalize_language(cls, code: str) -> str:
        """
        Map a subtitle language code to the three-letter code used in results.

        Args:
            code: Language code such as "en", "en-IN" or "hi"

        Returns:
            The mapped three-letter code, or ``code`` unchanged if unmapped
        """
        return cls._LANG_MAP.get(cls._primary_subtag(code), code)

    @staticmethod
    def _build_result(text: str, language: str, source: str) -> dict:
        """Assemble the result dictionary shared by all extraction methods."""
        return {
            "transcript": text,
            "language": language,
            "source": source,
            "word_count": len(text.split())
        }

    @staticmethod
    def _snippet_text(entry) -> str:
        """
        Return the text of one fetched transcript entry.

        Older youtube_transcript_api releases yield plain dicts; newer ones
        yield snippet objects exposing a ``text`` attribute.
        """
        if isinstance(entry, dict):
            return entry["text"]
        return entry.text

    # ------------------------------------------------------------------
    # Public helpers
    # ------------------------------------------------------------------

    def extract_video_id(self, url: str) -> str:
        """
        Extract video ID from YouTube URL.

        Args:
            url: YouTube URL in various formats

        Returns:
            11-character video ID

        Raises:
            ValueError: If URL is invalid (including non-string input)
        """
        if not isinstance(url, str):
            # Keep the documented contract instead of leaking a TypeError from re.
            raise ValueError("Invalid YouTube URL")
        match = self._VIDEO_ID_RE.search(url)
        if match:
            return match.group(1)
        raise ValueError("Invalid YouTube URL")

    def clean_autogen_transcript(self, text: str) -> str:
        """
        Clean auto-generated YouTube captions.

        Removes:
        - <c>...</c> tags, including class-tagged ones like <c.colorE5E5E5>
        - Timestamps like <00:00:06.480>
        - Multiple spaces

        Args:
            text: Raw VTT subtitle text

        Returns:
            Cleaned transcript text
        """
        # Remove <c>, </c> and <c.someClass> tags (the tag text itself is kept)
        text = re.sub(r"</?c(?:\.[^>]*)?>", "", text)

        # Remove inline timestamps like <00:00:06.480>
        text = re.sub(r"<\d{2}:\d{2}:\d{2}\.\d{3}>", "", text)

        # Collapse runs of whitespace into single spaces
        return re.sub(r"\s+", " ", text).strip()

    # ------------------------------------------------------------------
    # Method 1: youtube_transcript_api
    # ------------------------------------------------------------------

    @staticmethod
    def _list_transcripts(video_id: str):
        """
        List the transcripts available for a video.

        Uses the class-level ``list_transcripts`` call when the installed
        youtube_transcript_api still provides it, otherwise the instance-based
        ``list`` method of newer releases.
        """
        legacy_list = getattr(YouTubeTranscriptApi, "list_transcripts", None)
        if legacy_list is not None:
            return legacy_list(video_id)
        return YouTubeTranscriptApi().list(video_id)

    def _select_transcript(self, transcript_list):
        """
        Pick a transcript: manual first, then auto-generated, then any available.

        Returns:
            The chosen transcript object, or None if the list is empty
        """
        preferred = list(self._PREFERRED_LANGS)
        finders = (
            transcript_list.find_manually_created_transcript,
            transcript_list.find_generated_transcript,
        )
        for find in finders:
            try:
                # The finder walks the codes in priority order itself.
                return find(preferred)
            except NoTranscriptFound:
                continue
        # No preferred language matched: take the first transcript of any language.
        return next(iter(transcript_list), None)

    def get_transcript_api(self, video_id: str) -> Optional[dict]:
        """
        Get transcript using youtube_transcript_api (works better on cloud platforms).

        Args:
            video_id: YouTube video ID

        Returns:
            Dictionary with transcript, language, source and word_count,
            or None if not available. Never raises for lookup failures.
        """
        if not HAS_YOUTUBE_TRANSCRIPT_API:
            logger.info("youtube_transcript_api not installed, skipping...")
            return None

        try:
            transcript = self._select_transcript(self._list_transcripts(video_id))
            if transcript is None:
                logger.info("No transcript found for this video")
                return None

            # Fetch the actual transcript and flatten it to one string
            full_text = " ".join(
                self._snippet_text(entry) for entry in transcript.fetch()
            )
            clean_text = self.clean_autogen_transcript(full_text)

            if len(clean_text) < self._MIN_TRANSCRIPT_CHARS:
                logger.info("Transcript too short")
                return None

            normalized_lang = self._normalize_language(transcript.language_code)
            logger.info("Transcript fetched via API (language: %s)", normalized_lang)
            return self._build_result(clean_text, normalized_lang, "youtube_api")

        except TranscriptsDisabled:
            logger.info("Transcripts are disabled for this video")
            return None
        except NoTranscriptFound:
            logger.info("No transcript found for this video")
            return None
        except Exception as e:
            # Best-effort method: any other failure just means "try the next one".
            logger.warning("youtube_transcript_api failed: %s", e)
            return None

    # ------------------------------------------------------------------
    # Method 2: yt-dlp subtitles
    # ------------------------------------------------------------------

    @classmethod
    def _pick_subtitle_file(cls, file_names, video_id: str,
                            lang: str = "en") -> Optional[Tuple[str, str]]:
        """
        Choose the subtitle file to read from a directory listing.

        Files are named "<videoId>.<lang>.vtt". An exact match for ``lang`` wins,
        then a match on the primary subtag, then the first candidate in sorted
        order (so the choice does not depend on directory listing order).

        Args:
            file_names: File names found in the download directory
            video_id: ID the subtitle file names start with
            lang: Preferred language code

        Returns:
            (file_name, language_code) or None if no matching .vtt file exists.
            The language is "eng" when the file name carries no language part.
        """
        candidates: List[Tuple[str, str]] = []
        for name in sorted(file_names):
            if name.startswith(video_id) and name.endswith(".vtt"):
                parts = name.split(".")
                candidates.append((name, parts[-2] if len(parts) >= 3 else "eng"))
        if not candidates:
            return None

        wanted = (lang or "").lower()
        for candidate in candidates:
            if candidate[1].lower() == wanted:
                return candidate
        wanted_primary = cls._primary_subtag(wanted)
        for candidate in candidates:
            if cls._primary_subtag(candidate[1]) == wanted_primary:
                return candidate
        return candidates[0]

    def _parse_vtt(self, lines) -> str:
        """
        Turn the lines of a WebVTT file into plain transcript text.

        Skips the WEBVTT header block (including "Kind:"/"Language:" lines),
        NOTE/STYLE/REGION blocks, cue timing lines and numeric cue identifiers.
        Each remaining line is cleaned, and a line identical to the one kept
        just before it is dropped: auto-generated captions repeat the previous
        line in every cue, which would otherwise duplicate the whole transcript.

        Args:
            lines: Iterable of raw lines (an open text file works)

        Returns:
            Cleaned cue text joined with single spaces
        """
        kept: List[str] = []
        at_block_start = True
        in_skipped_block = False

        for raw in lines:
            text = raw.strip().lstrip("\ufeff")
            if not text:
                # Only a truly empty line separates blocks; whitespace-only
                # lines occur inside auto-generated cue payloads.
                if not raw.strip("\r\n"):
                    at_block_start = True
                    in_skipped_block = False
                continue

            first_in_block, at_block_start = at_block_start, False
            if in_skipped_block:
                continue
            if first_in_block and text.startswith(self._VTT_NON_CUE_BLOCKS):
                in_skipped_block = True
                continue
            if "-->" in text:
                continue  # cue timing line
            if first_in_block and text.isdecimal():
                continue  # numeric cue identifier

            cue_text = self.clean_autogen_transcript(text)
            if cue_text and (not kept or kept[-1] != cue_text):
                kept.append(cue_text)

        return " ".join(kept)

    def get_subtitles(self, url: str, lang: str = "en") -> Optional[dict]:
        """
        Try to get existing subtitles from YouTube using yt-dlp.

        Args:
            url: YouTube video URL
            lang: Preferred language code (default: "en"); used to choose
                among the subtitle files yt-dlp wrote

        Returns:
            Dictionary with transcript, language, source and word_count,
            or None if no usable subtitles. Never raises for download failures.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            ydl_opts = {
                "skip_download": True,
                "writesubtitles": True,
                "writeautomaticsub": True,
                "subtitlesformat": "vtt",
                "outtmpl": os.path.join(temp_dir, "%(id)s.%(ext)s"),
                "quiet": True,
            }

            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    # One pass: with skip_download set this writes the subtitle
                    # files only, and also returns the video metadata.
                    info = ydl.extract_info(url, download=True)

                picked = self._pick_subtitle_file(
                    os.listdir(temp_dir), info["id"], lang
                )
                if picked is None:
                    logger.info("No subtitle file found")
                    return None
                file_name, detected_lang = picked

                # Read and clean the VTT file
                with open(os.path.join(temp_dir, file_name), "r", encoding="utf-8") as f:
                    clean_text = self._parse_vtt(f)

                if len(clean_text) < self._MIN_TRANSCRIPT_CHARS:
                    logger.info("Extracted subtitles too short")
                    return None

                normalized_lang = self._normalize_language(detected_lang)
                logger.info(
                    "Subtitles extracted successfully (language: %s)", normalized_lang
                )
                return self._build_result(clean_text, normalized_lang, "subtitles")

            except Exception as e:
                # Best-effort method: report and let the caller try the fallback.
                logger.warning("Subtitle extraction failed: %s", e)
                return None

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    def get_video_transcript(self, url: str, use_whisper_fallback: bool = True) -> dict:
        """
        Get transcript from a YouTube video.

        Tries multiple methods in order:
        1. youtube_transcript_api (works best on cloud platforms)
        2. yt-dlp subtitle extraction
        3. Whisper transcription (fallback)

        Args:
            url: YouTube video URL
            use_whisper_fallback: Whether to use Whisper if no subtitles (default: True)

        Returns:
            Dictionary with:
                - transcript: The transcript text
                - language: Detected/extracted language code
                - source: "youtube_api", "subtitles", or "whisper"
                - word_count: Number of words

        Raises:
            ValueError: If no video ID can be extracted from the URL
            Exception: If transcript cannot be obtained
        """
        # Extract video ID for API-based methods
        video_id = self.extract_video_id(url)

        # Method 1: Try youtube_transcript_api first (best for cloud platforms)
        logger.info("Attempting to get transcript via YouTube API...")
        result = self.get_transcript_api(video_id)
        if result:
            return result

        # Method 2: Try yt-dlp subtitle extraction
        logger.info("Attempting to get subtitles via yt-dlp...")
        result = self.get_subtitles(url)
        if result:
            return result

        if not use_whisper_fallback:
            raise Exception("No subtitles available and Whisper fallback is disabled")

        # Method 3: Fallback to Whisper transcription
        logger.info("No subtitles found. Falling back to Whisper transcription...")
        try:
            stt_service = self._get_speech_to_text_service()
            whisper_result = stt_service.transcribe_youtube_video(url)

            return {
                "transcript": whisper_result["text"],
                "language": whisper_result["language"],
                "source": "whisper",
                "word_count": whisper_result["word_count"]
            }
        except Exception as e:
            logger.error("Whisper transcription failed: %s", e)
            # Chain the cause so the original traceback is not lost.
            raise Exception(f"Could not retrieve transcript: {str(e)}") from e

    def get_video_transcript_legacy(self, url: str, lang: str = "en") -> str:
        """
        Legacy method for backward compatibility.
        Returns only the transcript text (no language info).

        Args:
            url: YouTube video URL
            lang: Accepted for backward compatibility; not used by the lookup

        Returns:
            The transcript text

        Raises:
            Exception: If transcript cannot be obtained
        """
        result = self.get_video_transcript(url, use_whisper_fallback=True)
        return result["transcript"]