File size: 16,015 Bytes
0c0bf8e
31f5268
0c0bf8e
31f5268
0c0bf8e
31f5268
 
 
 
 
 
 
 
 
0c0bf8e
31f5268
0c0bf8e
 
 
 
 
 
 
31f5268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0c0bf8e
31f5268
 
 
 
 
 
0c0bf8e
31f5268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0c0bf8e
31f5268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0c0bf8e
31f5268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0c0bf8e
31f5268
 
 
0c0bf8e
31f5268
 
 
 
0c0bf8e
 
31f5268
 
 
 
 
 
 
 
 
 
 
 
 
0c0bf8e
31f5268
 
 
 
0c0bf8e
 
31f5268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
import gradio as gr
from huggingface_hub import InferenceClient
from GoogleNews import GoogleNews
import logging
import warnings
import textwrap
from tabulate import tabulate
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
from fuzzywuzzy import process
import re

# Ignore UserWarning-category warnings attributed to modules whose name starts
# with "fuzzywuzzy" so they do not clutter the application output.
# NOTE(review): this filter is installed after fuzzywuzzy has already been
# imported above, so warnings raised at import time are not affected — confirm
# whether the filter should be moved ahead of that import.
warnings.filterwarnings("ignore", category=UserWarning, module="fuzzywuzzy")

# Configure the root logger once for the whole app: INFO level and above,
# each record prefixed with a timestamp and its severity.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

class FinancialAnalyzer:
    """Combine price history, technical indicators and news sentiment for one asset.

    The class talks to three external services: the Yahoo Finance search
    endpoint / ``yfinance`` for market data, Google News for headlines and a
    Hugging Face hosted chat model for sentiment classification.
    """

    # Model id shared by the inference client and every chat-completion call.
    MODEL_ID = "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"

    # Ticker suffixes this class may append; used to avoid double-suffixing.
    _KNOWN_SUFFIXES = (".NS", ".BO")

    def __init__(self):
        """Create the inference client and the technical-analysis settings."""
        self.client = InferenceClient(self.MODEL_ID)
        self.ta_config = {
            'rsi_window': 14,
            'macd_fast': 12,
            'macd_slow': 26,
            'macd_signal': 9,
            'bollinger_window': 20,
            'sma_windows': [20, 50, 200],
            'ema_windows': [12, 26],
            'volatility_window': 30
        }
        logging.info("Initialized Financial Analyzer")

    def resolve_ticker_symbol(self, query: str) -> str:
        """Convert a company name or symbol into a Yahoo Finance ticker.

        The Yahoo search endpoint is queried and the quote whose name is the
        closest fuzzy match to ``query`` (score >= 60) is selected. A ``.NS`` or
        ``.BO`` suffix is appended based on the quote's exchange code (``.NS``
        when the exchange is not recognised) unless the symbol already carries
        one of those suffixes.

        Args:
            query: Free-text company name or symbol.

        Returns:
            The resolved ticker string.

        Raises:
            ValueError: If the search returns no usable quote or no name
                matches well enough.
            Exception: Any network/parsing error is logged and re-raised.
        """
        logging.info(f"Resolving ticker symbol for query: {query}")
        url = "https://query2.finance.yahoo.com/v1/finance/search"
        headers = {"User-Agent": "Mozilla/5.0"}
        params = {"q": query, "quotesCount": 5, "country": "India"}

        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            # Only quotes that actually carry a symbol can be resolved.
            quotes = [quote for quote in (data.get("quotes") or []) if quote.get("symbol")]
            if not quotes:
                raise ValueError(f"No ticker found for: {query}")

            # A key may be present with a None value, so fall back with `or`.
            names = [quote.get("longname") or quote.get("shortname") or "" for quote in quotes]

            match = process.extractOne(query, names)
            if not match or not match[0] or match[1] < 60:
                raise ValueError(f"No matching ticker found for: {query}")
            best_match = match[0]

            best_quote = quotes[names.index(best_match)]
            resolved_ticker = best_quote["symbol"]
            exchange_code = (best_quote.get("exchange") or "").upper()

            exchange_suffix_map = {
                "NSI": ".NS",  # NSE
                "BOM": ".BO",  # BSE
                "BSE": ".BO",
                "NSE": ".NS",
            }
            suffix = exchange_suffix_map.get(exchange_code, ".NS")

            # Never stack a second suffix onto a symbol that already has one
            # (e.g. "XYZ.BO" must not become "XYZ.BO.NS").
            if not resolved_ticker.endswith(self._KNOWN_SUFFIXES):
                resolved_ticker += suffix

            logging.info(f"Resolved ticker symbol: {resolved_ticker}")
            return resolved_ticker

        except Exception as e:
            logging.error(f"Ticker resolution failed: {str(e)}")
            raise

    def fetch_stock_data(self, ticker):
        """Fetch one year of daily history plus derived indicators for ``ticker``.

        Returns:
            On success a dict with keys ``history`` (DataFrame),
            ``current_price`` (last close), ``indicators`` (see
            ``calculate_technical_indicators``) and ``info`` (``stock.info``).
            On any failure, or when no rows come back, a dict with a single
            ``error`` key holding the message; no exception is raised.
        """
        logging.info(f"Fetching stock data for ticker: {ticker}")
        try:
            stock = yf.Ticker(ticker)
            history = stock.history(period="1y", interval="1d")

            if history.empty:
                logging.error(f"No data found for {ticker}")
                return {"error": f"No data found for {ticker}"}

            logging.info(f"Successfully fetched stock data for {ticker}")
            return {
                'history': history,
                'current_price': history['Close'].iloc[-1],
                'indicators': self.calculate_technical_indicators(history),
                'info': stock.info
            }
        except Exception as e:
            logging.error(f"Error fetching stock data: {str(e)}")
            return {"error": str(e)}

    def calculate_technical_indicators(self, history):
        """Compute the latest RSI, MACD and Bollinger Band values.

        Args:
            history: DataFrame with a ``Close`` column, oldest row first.

        Returns:
            Dict with keys ``rsi``, ``macd``, ``macd_signal``,
            ``bollinger_upper`` and ``bollinger_lower``, each the value for
            the last row. Values are NaN when there are fewer rows than the
            corresponding rolling window.
        """
        logging.info("Calculating technical indicators")
        ta = {}
        close = history['Close']

        # RSI: simple-moving-average variant. The first diff is NaN and is
        # mapped to 0 by `where`, so it counts as "no move" in both series.
        delta = close.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        rsi_window = self.ta_config['rsi_window']
        avg_gain = gain.rolling(rsi_window).mean()
        avg_loss = loss.rolling(rsi_window).mean()
        rs = avg_gain / avg_loss  # zero average loss -> +/-inf -> RSI of 100
        ta['rsi'] = (100 - 100 / (1 + rs)).iloc[-1]

        # MACD: fast EMA minus slow EMA, plus an EMA of that difference.
        ema_fast = close.ewm(span=self.ta_config['macd_fast'], adjust=False).mean()
        ema_slow = close.ewm(span=self.ta_config['macd_slow'], adjust=False).mean()
        macd = ema_fast - ema_slow
        signal = macd.ewm(span=self.ta_config['macd_signal'], adjust=False).mean()
        ta['macd'] = macd.iloc[-1]
        ta['macd_signal'] = signal.iloc[-1]

        # Bollinger Bands: rolling mean +/- two rolling standard deviations.
        band_window = self.ta_config['bollinger_window']
        sma = close.rolling(band_window).mean()
        std = close.rolling(band_window).std()
        ta['bollinger_upper'] = (sma + 2 * std).iloc[-1]
        ta['bollinger_lower'] = (sma - 2 * std).iloc[-1]

        logging.info("Technical indicators calculated")
        return ta

    def generate_price_chart(self, history):
        """Build a two-panel matplotlib figure: closing price above, volume below.

        Both panels are drawn with matplotlib directly against the same index
        so the shared x-axis uses one coordinate system; a pandas bar plot
        would switch to categorical positions and emit one tick label per bar.

        Args:
            history: DataFrame with ``Close`` and ``Volume`` columns.

        Returns:
            The figure, or a placeholder figure reading "Chart unavailable"
            if plotting fails.
        """
        logging.info("Generating price chart")
        fig = None
        try:
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6), sharex=True)

            # Price plot
            ax1.plot(history.index, history['Close'], label='Price')
            ax1.set_title('Price Trend')
            ax1.legend()

            # Volume plot
            ax2.bar(history.index, history['Volume'], color='skyblue')
            ax2.set_title('Trading Volume')

            fig.autofmt_xdate()
            fig.tight_layout()
            logging.info("Price chart generated")
            # NOTE: figures created through pyplot stay registered with pyplot
            # until closed; the caller owns the returned figure.
            return fig
        except Exception as e:
            logging.error(f"Chart generation failed: {str(e)}")
            if fig is not None:
                # Do not leak the half-built figure.
                plt.close(fig)
            return self.create_error_plot("Chart unavailable")

    def create_error_plot(self, message):
        """Return a small axis-less figure showing ``message`` in red."""
        fig, ax = plt.subplots(figsize=(10, 2))
        ax.text(0.5, 0.5, message,
                ha='center', va='center',
                fontsize=12, color='red')
        ax.axis('off')
        return fig

    def fetch_articles(self, query):
        """Search Google News for ``query`` and return at most five results.

        Returns an empty list (after logging) if the search fails.
        """
        logging.info(f"Fetching news articles for query: {query}")
        try:
            googlenews = GoogleNews(lang="en")
            googlenews.search(query)
            articles = googlenews.result()
            logging.info(f"Fetched {len(articles)} news articles")
            return articles[:5]  # Limit to 5 articles
        except Exception as e:
            logging.error(f"Error fetching articles: {str(e)}")
            return []

    @staticmethod
    def _parse_sentiment_response(response_text):
        """Extract ``(sentiment, analysis)`` from the model's reply text.

        Any text up to and including the last ``</think>`` tag is discarded
        first, so a draft "SENTIMENT:" line inside a reasoning block cannot
        be mistaken for the answer. Falls back to ``"neutral"`` and
        ``"Sentiment analysis unavailable"`` for parts that cannot be found.
        """
        if "</think>" in response_text:
            response_text = response_text.rsplit("</think>", 1)[1]

        sentiment_match = re.search(
            r"SENTIMENT:\s*(POSITIVE|NEGATIVE|NEUTRAL)", response_text, re.IGNORECASE
        )
        analysis_match = re.search(r"ANALYSIS:\s*(.+)$", response_text, re.DOTALL)

        if sentiment_match:
            sentiment = sentiment_match.group(1).lower()
        else:
            sentiment = "neutral"
            logging.warning(f"Failed to parse sentiment from response: {response_text}")

        analysis = "Sentiment analysis unavailable"
        if analysis_match:
            analysis = analysis_match.group(1).strip()

        return sentiment, analysis

    def analyze_article_sentiment(self, article):
        """Classify one news article's sentiment with the chat model.

        Args:
            article: Mapping describing the article; ``title`` and ``desc``
                are read if present (missing keys are treated as empty).

        Returns:
            A copy of ``article`` with two extra keys: ``sentiment`` (one of
            ``positive``/``negative``/``neutral``) and ``analysis`` (text).
            If the model call fails the sentiment is ``neutral`` and the
            analysis is "Sentiment analysis failed".
        """
        title = article.get('title', '')
        desc = article.get('desc', '')
        logging.info(f"Analyzing sentiment for article: {title}")
        prompt = f"""
        Analyze the sentiment and provide a brief analysis of this news article about a financial asset.
        Respond EXACTLY in this format:
        SENTIMENT: [POSITIVE/NEGATIVE/NEUTRAL]
        ANALYSIS: [2-3 sentence analysis]

        Title: {title}
        Description: {desc}
        """

        try:
            response = self.client.chat.completions.create(
                model=self.MODEL_ID,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
                max_tokens=150
            )

            # `content` may be None; treat that as an empty reply.
            response_text = (response.choices[0].message.content or "").strip()
            sentiment, analysis = self._parse_sentiment_response(response_text)

            logging.info(f"Sentiment analysis complete: {sentiment}")
            return {
                **article,
                "sentiment": sentiment,
                "analysis": analysis
            }

        except Exception as e:
            logging.error(f"Sentiment analysis failed: {str(e)}")
            return {
                **article,
                "sentiment": "neutral",
                "analysis": "Sentiment analysis failed"
            }

    def generate_recommendation(self, articles, stock_data):
        """Derive a BUY/SELL/HOLD style recommendation from sentiment and indicators.

        Args:
            articles: Iterable of mappings; each one's ``sentiment`` key
                (default ``neutral``) is tallied.
            stock_data: Mapping that may contain ``indicators`` (dict with an
                ``rsi`` entry) and ``history`` (DataFrame with ``Close``).
                Missing pieces fall back to neutral values (RSI 50, price
                change 0).

        Returns:
            Dict with keys ``recommendation`` (``HOLD`` unless a rule fires),
            ``reasons`` (list of strings), ``sentiment_distribution`` and
            ``technical_indicators``.
        """
        logging.info("Generating investment recommendation")

        sentiment_scores = {
            'positive': 0,
            'negative': 0,
            'neutral': 0
        }
        for article in articles:
            sentiment = article.get('sentiment', 'neutral')
            if sentiment in sentiment_scores:
                sentiment_scores[sentiment] += 1

        ta = stock_data.get('indicators', {})

        # Latest day-over-day change; 0 when history is missing, empty or
        # too short to yield a number.
        history = stock_data.get('history')
        price_change = 0
        if history is not None and not history.empty:
            price_change = history['Close'].pct_change().iloc[-1]
            if pd.isna(price_change):
                price_change = 0

        recommendation = "HOLD"
        reasons = []
        positive = sentiment_scores['positive']
        negative = sentiment_scores['negative']

        try:
            rsi = ta.get('rsi', 50)
            if rsi < 30 and positive > negative:
                recommendation = "BUY"
                reasons.append("Oversold condition with positive news sentiment")
            elif rsi > 70 and negative > positive:
                recommendation = "SELL"
                reasons.append("Overbought condition with negative news sentiment")
            elif price_change > 0.05 and positive > 3:
                recommendation = "STRONG BUY"
                reasons.append("Strong positive momentum and news sentiment")
            elif price_change < -0.05 and negative > 3:
                recommendation = "STRONG SELL"
                reasons.append("Significant downward pressure and negative news")
        except Exception as e:
            # e.g. a non-numeric RSI; degrade to HOLD rather than failing.
            logging.error(f"Recommendation logic failed: {str(e)}")
            recommendation = "HOLD"
            reasons.append("Analysis incomplete due to data issues")

        logging.info(f"Recommendation generated: {recommendation}")
        return {
            "recommendation": recommendation,
            "reasons": reasons,
            "sentiment_distribution": sentiment_scores,
            "technical_indicators": ta
        }

def format_analysis_output(analyzer, articles, stock_data, recommendation):
    """Render the analysis results as HTML fragments plus a chart.

    Args:
        analyzer: Object providing ``generate_price_chart(history)``.
        articles: Iterable of article mappings; ``date``, ``title``,
            ``analysis`` and ``sentiment`` are read with fallbacks.
        stock_data: Mapping with ``history`` and optionally ``info`` and
            ``current_price``.
        recommendation: Mapping with ``recommendation`` (label string) and
            optionally ``reasons`` (list of strings).

    Returns:
        Dict with keys ``news_table``, ``stock_info``, ``recommendation``
        (HTML strings) and ``chart`` (figure). If anything fails, a dict with
        a single ``error`` key instead; no exception is raised.
    """
    # Local import so this function does not depend on the module import list.
    import html

    logging.info("Formatting analysis output")

    sentiment_icons = {'positive': "🟢", 'negative': "🔴"}

    try:
        # News table: one row per article, long text wrapped for readability.
        news_table = [
            [
                article.get('date', 'N/A'),
                textwrap.fill(article.get('title') or 'No title', 40),
                textwrap.fill(article.get('analysis') or 'No analysis', 60),
                sentiment_icons.get(article.get('sentiment'), "⚪"),
            ]
            for article in articles
        ]

        # Stock info with fallback values; externally sourced text is escaped
        # before being embedded in HTML.
        info = stock_data.get('info', {})
        long_name = html.escape(str(info.get('longName', 'N/A')))
        symbol = html.escape(str(info.get('symbol', 'N/A')))
        stock_info = f"""
        <div style="padding: 20px; background: #f8f9fa; border-radius: 10px;">
            <h3>{long_name} ({symbol})</h3>
            <p>Price: ${stock_data.get('current_price', 0):.2f}</p>
            <p>Market Cap: {info.get('marketCap', 'N/A')}</p>
            <p>PE Ratio: {info.get('trailingPE', 'N/A')}</p>
        </div>
        """

        # Recommendation styling, keyed by the full label so that
        # "STRONG BUY" / "STRONG SELL" resolve to their own entries.
        rec_label = recommendation['recommendation']
        background, icon = {
            "BUY": ("#d4edda", "🟢"),
            "STRONG BUY": ("#d4edda", "🟢"),
            "SELL": ("#f8d7da", "🔴"),
            "STRONG SELL": ("#f8d7da", "🔴"),
            "HOLD": ("#fff3cd", "⚪")
        }.get(rec_label, ("#ffffff", "⚪"))

        # An empty reasons list (e.g. a plain HOLD) gets the placeholder too.
        reasons = recommendation.get('reasons') or ['No analysis available']
        reason_items = "".join(f'<li>{html.escape(str(reason))}</li>' for reason in reasons)

        rec_html = f"""
        <div style="padding: 20px; background: {background}; border-radius: 10px;">
            <h2>{icon} Recommendation: {html.escape(str(rec_label))}</h2>
            <ul>
                {reason_items}
            </ul>
        </div>
        """

        # Generate chart
        chart = analyzer.generate_price_chart(stock_data['history'])

        return {
            "news_table": tabulate(news_table, headers=["Date", "Title", "Analysis", "Sentiment"], tablefmt="html"),
            "stock_info": stock_info,
            "recommendation": rec_html,
            "chart": chart
        }

    except Exception as e:
        logging.error(f"Formatting failed: {str(e)}")
        return {
            "error": f"Output formatting failed: {str(e)}"
        }

def analyze_asset(asset_input):
    """Run the full analysis pipeline for one user-supplied asset name.

    Steps: resolve the ticker, fetch market data, fetch and classify news,
    build a recommendation, then format everything for display.

    Args:
        asset_input: Company name or symbol typed by the user.

    Returns:
        The dict produced by ``format_analysis_output`` on success, or a dict
        with a single ``error`` key describing the failure. Never raises.
    """
    logging.info(f"Analyzing asset: {asset_input}")

    # Reject blank input up front instead of sending an empty search query.
    if not isinstance(asset_input, str) or not asset_input.strip():
        logging.error("Analysis failed: no asset name provided")
        return {"error": "Analysis failed: Please enter a stock name or symbol"}
    asset_input = asset_input.strip()

    try:
        # Constructed inside the try so a client set-up failure is reported
        # through the same error dict as every other step.
        analyzer = FinancialAnalyzer()

        # Resolve ticker symbol
        ticker = analyzer.resolve_ticker_symbol(asset_input)

        # Fetch data
        stock_data = analyzer.fetch_stock_data(ticker)
        if 'error' in stock_data:
            raise ValueError(stock_data['error'])

        articles = analyzer.fetch_articles(asset_input)
        analyzed_articles = [analyzer.analyze_article_sentiment(a) for a in articles]

        # Generate recommendation
        recommendation = analyzer.generate_recommendation(analyzed_articles, stock_data)

        # Format output
        results = format_analysis_output(analyzer, analyzed_articles, stock_data, recommendation)
        logging.info(f"Analysis complete for asset: {asset_input}")
        return results

    except Exception as e:
        logging.error(f"Analysis failed: {str(e)}")
        return {"error": f"Analysis failed: {str(e)}"}

def main():
    """Build the Gradio UI, wire the Analyze button and launch the app.

    The layout is a text box plus button, and three tabs (news sentiment,
    technical analysis, recommendation). Clicking the button runs
    ``analyze_asset`` and fills the four output components.
    """
    # Local import so this function does not depend on the module import list.
    import html

    with gr.Blocks(theme=gr.themes.Default()) as app:
        gr.Markdown("# Advanced Stock Analysis Suite")

        with gr.Row():
            asset_input = gr.Textbox(label="Stock/Company Name", placeholder="Enter stock name or symbol...")
            analyze_btn = gr.Button("Analyze", variant="primary")

        with gr.Tabs():
            with gr.Tab("News Sentiment"):
                news_table = gr.HTML(label="News Analysis")

            with gr.Tab("Technical Analysis"):
                stock_info = gr.HTML()
                price_chart = gr.Plot()

            with gr.Tab("Recommendation"):
                recommendation = gr.HTML()

        @analyze_btn.click(inputs=[asset_input], outputs=[news_table, stock_info, price_chart, recommendation])
        def update_analysis(asset):
            """Return values for (news_table, stock_info, price_chart, recommendation)."""
            logging.info(f"Update analysis triggered for asset: {asset}")
            results = analyze_asset(asset)
            if 'error' in results:
                logging.error(f"Error in analysis: {results['error']}")
                # The message can echo user input, so escape it before
                # embedding it in HTML.
                error_html = f"<div style='color: red'>{html.escape(str(results['error']))}</div>"
                # The plot output cannot display an HTML string; clear it
                # with None and show the message in the three HTML outputs.
                return [error_html, error_html, None, error_html]
            logging.info(f"Analysis results returned for asset: {asset}")
            return [
                results["news_table"],
                results["stock_info"],
                results["chart"],
                results["recommendation"]
            ]

    app.launch()

# Start the Gradio app only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()