File size: 9,354 Bytes
eefb74d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1abe985
eefb74d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1abe985
eefb74d
 
 
 
 
 
 
1abe985
eefb74d
 
 
 
 
1abe985
eefb74d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import asyncio
import logging
import os
from typing import Dict, Any, Optional, List
from pathlib import Path

from app.utils.whisper_llm import analyze as basic_analyze
from app.utils import pdf, s3

logger = logging.getLogger("app.utils.lightweight_agentic")

class LightweightAgenticProcessor:
    """
    Lightweight agentic processor that uses Groq for enhanced analysis
    without heavy computer vision models that can cause hanging.

    Pipeline: basic Whisper/LLM analysis -> optional Groq text analysis ->
    markdown report -> PDF -> FAISS embedding storage.
    """

    def __init__(self, enable_enhanced_analysis: bool = True, groq_api_key: Optional[str] = None):
        """
        Args:
            enable_enhanced_analysis: when False, only the basic pipeline runs.
            groq_api_key: explicit Groq key; falls back to the GROQ_API_KEY
                environment variable when not provided.
        """
        self.enable_enhanced_analysis = enable_enhanced_analysis
        self.groq_api_key = groq_api_key or os.getenv("GROQ_API_KEY")
        # Reserved for memoizing analyses; not populated anywhere yet.
        self.analysis_cache: Dict[str, Any] = {}

    async def process_video_lightweight(self, video_url: str, user_id: int, db) -> Dict[str, Any]:
        """
        Process video with lightweight agentic capabilities using only Groq.

        Returns a dict with "success": True and the analysis artifacts, or
        "success": False plus whatever partial results were produced before
        the failure ("fallback_transcription"/"fallback_summary", or None).
        """
        # Initialize up front so the except-branch can report partial results
        # without resorting to a `'name' in locals()` check.
        basic_transcription: Optional[str] = None
        basic_summary: Optional[str] = None
        try:
            logger.info(f"Starting lightweight agentic video processing for user {user_id}")

            # Step 1: Basic processing (existing functionality)
            basic_transcription, basic_summary = await basic_analyze(video_url, user_id, db)

            # Step 2: Enhanced analysis with Groq only (no heavy CV models).
            # Skipped when disabled or no API key is available.
            enhanced_analysis = None
            if self.enable_enhanced_analysis and self.groq_api_key:
                enhanced_analysis = await self._perform_lightweight_analysis(basic_transcription, basic_summary)

            # Step 3: Generate comprehensive report
            comprehensive_report = await self._generate_lightweight_report(
                basic_transcription,
                basic_summary,
                enhanced_analysis
            )

            # Step 4: Create enhanced PDF
            enhanced_pdf_bytes = await self._create_enhanced_pdf(comprehensive_report)

            # Step 5: Store enhanced vector embeddings (best-effort; logs on error)
            await self._store_enhanced_embeddings(user_id, comprehensive_report, enhanced_analysis)

            return {
                "basic_transcription": basic_transcription,
                "basic_summary": basic_summary,
                "enhanced_analysis": enhanced_analysis,
                "comprehensive_report": comprehensive_report,
                "enhanced_pdf_bytes": enhanced_pdf_bytes,
                "success": True
            }

        except Exception as e:
            logger.error(f"Lightweight agentic processing failed: {e}")
            return {
                "success": False,
                "error": str(e),
                # None when the failure happened before Step 1 completed.
                "fallback_transcription": basic_transcription,
                "fallback_summary": basic_summary
            }

    async def _perform_lightweight_analysis(self, transcription: str, summary: str) -> Optional[Dict[str, Any]]:
        """Perform lightweight enhanced analysis using only Groq.

        Returns a dict with the raw analysis text plus placeholder
        topic/sentiment fields, or None on any failure (caller treats
        None as "enhanced analysis unavailable").
        """
        try:
            # Imported lazily so the module loads even when langchain_groq
            # is not installed and enhanced analysis is disabled.
            from langchain_groq import ChatGroq

            # Initialize Groq
            llm = ChatGroq(
                groq_api_key=self.groq_api_key,
                model_name="llama-3.3-70b-versatile",
                temperature=0.1,
                max_tokens=1000
            )

            # Create enhanced analysis prompt
            analysis_prompt = f"""
            Analyze this video content and provide enhanced insights:
            
            TRANSCRIPTION:
            {transcription}
            
            BASIC SUMMARY:
            {summary}
            
            Please provide:
            1. Key topics and themes
            2. Sentiment analysis
            3. Important insights
            4. Recommendations
            5. Context and implications
            
            Format your response in a clear, structured manner.
            """

            # Get enhanced analysis
            response = await llm.ainvoke(analysis_prompt)
            enhanced_analysis = response.content

            return {
                "enhanced_analysis": enhanced_analysis,
                # TODO(review): topics/sentiment are hard-coded placeholders,
                # not derived from the LLM response — replace with real
                # extraction before relying on them downstream.
                "topics": ["technology", "innovation", "business"],  # Placeholder
                "sentiment": {"positive": 0.6, "negative": 0.2, "neutral": 0.2},  # Placeholder
                "key_insights": enhanced_analysis[:200] + "..." if len(enhanced_analysis) > 200 else enhanced_analysis
            }

        except Exception as e:
            logger.error(f"Lightweight analysis failed: {e}")
            return None

    async def _generate_lightweight_report(self, transcription: str, summary: str,
                                         enhanced_analysis: Optional[Dict[str, Any]]) -> str:
        """Render the markdown report.

        Uses the enhanced template when Groq analysis succeeded, otherwise a
        basic template that notes enhanced analysis was unavailable.
        """
        if enhanced_analysis:
            return f"""
# 📹 Video Analysis Report (Enhanced with Groq)

## 🎵 Audio Transcription
{transcription}

## 📝 Basic Summary
{summary}

## 🤖 Enhanced Analysis (Groq llama-3.3-70b-versatile)
{enhanced_analysis.get('enhanced_analysis', 'Analysis not available')}

## 🎯 Key Insights
{enhanced_analysis.get('key_insights', 'No additional insights available')}

## 📊 Analysis Details
- **Processing Method**: Lightweight Agentic Analysis
- **LLM Provider**: Groq llama-3.3-70b-versatile
- **Enhanced Features**: Text-based analysis and reasoning
- **Topics**: {', '.join(enhanced_analysis.get('topics', ['General']))}
- **Sentiment**: {enhanced_analysis.get('sentiment', {})}

---
*Report generated using Groq llama-3.3-70b-versatile*
            """
        else:
            return f"""
# 📹 Video Analysis Report

## 🎵 Audio Transcription
{transcription}

## 📝 Summary
{summary}

## 📊 Analysis Details
- **Processing Method**: Basic Analysis
- **Enhanced Features**: Not available (Groq API key required)
- **Recommendation**: Enable enhanced analysis for intelligent insights

---
*Report generated with basic analysis capabilities*
            """

    async def _create_enhanced_pdf(self, report_content: str) -> bytes:
        """Render the report to PDF bytes via the shared pdf helper.

        Retries once on failure (the previous code labeled this a "basic
        PDF" fallback, but it was an identical call — it is simply a retry;
        a second failure propagates to the caller).
        """
        try:
            return pdf.generate(report_content, "Enhanced Analysis Report")
        except Exception as e:
            logger.error(f"Enhanced PDF generation failed: {e}")
            return pdf.generate(report_content, "Enhanced Analysis Report")

    async def _store_enhanced_embeddings(self, user_id: int, report_content: str,
                                        enhanced_analysis: Optional[Dict[str, Any]]):
        """Store the report in the user's FAISS store with analysis metadata.

        Best-effort: any failure is logged and swallowed so embedding
        problems never abort the main processing pipeline.
        """
        try:
            # Imported lazily: these are heavyweight optional dependencies.
            from langchain_openai import OpenAIEmbeddings
            from langchain_core.documents import Document
            from langchain_community.vectorstores import FAISS

            embeddings = OpenAIEmbeddings()

            # Create enhanced document with metadata
            enhanced_doc = Document(
                page_content=report_content,
                metadata={
                    "user_id": user_id,
                    "analysis_type": "lightweight_enhanced" if enhanced_analysis else "basic",
                    "has_enhanced_analysis": enhanced_analysis is not None,
                    "topics": enhanced_analysis.get('topics', []) if enhanced_analysis else [],
                    "sentiment": enhanced_analysis.get('sentiment', {}) if enhanced_analysis else {},
                    "llm_provider": "groq_llama3" if enhanced_analysis else "basic"
                }
            )

            # Store in user's vector database (append to existing index if present)
            user_vector_path = f"vector_store/user_{user_id}"
            os.makedirs(user_vector_path, exist_ok=True)

            if os.path.exists(os.path.join(user_vector_path, "index.faiss")):
                # allow_dangerous_deserialization: index files are produced
                # locally by this app, not untrusted input.
                vector_store = FAISS.load_local(user_vector_path, embeddings, allow_dangerous_deserialization=True)
                vector_store.add_documents([enhanced_doc])
            else:
                vector_store = FAISS.from_documents([enhanced_doc], embeddings)

            vector_store.save_local(user_vector_path)
            logger.info(f"Enhanced embeddings stored for user {user_id}")

        except Exception as e:
            logger.error(f"Enhanced embedding storage failed: {e}")

# Integration with existing whisper_llm.py
async def analyze_with_lightweight_agentic(video_url: str, user_id: int, db, groq_api_key: Optional[str] = None) -> tuple:
    """
    Lightweight version of the analyze function with agentic capabilities using Groq.

    Args:
        video_url: location of the video to process.
        user_id: owner of the analysis (used for per-user vector storage).
        db: database handle passed through to the underlying analyzers.
        groq_api_key: optional explicit Groq key; the processor falls back to
            the GROQ_API_KEY environment variable when omitted.

    Returns:
        (transcription, report) — the comprehensive agentic report when
        processing succeeds, otherwise the plain basic-analysis result.
    """
    processor = LightweightAgenticProcessor(enable_enhanced_analysis=True, groq_api_key=groq_api_key)

    result = await processor.process_video_lightweight(video_url, user_id, db)

    if result["success"]:
        return result["basic_transcription"], result["comprehensive_report"]

    # Fallback to basic analysis when the agentic pipeline fails.
    logger.warning("Lightweight agentic analysis failed, falling back to basic analysis")
    return await basic_analyze(video_url, user_id, db)