# utils.py - các hàm tiện ích chung cho toàn bộ ứng dụng import pandas as pd import numpy as np import pandas_ta as ta import scipy.signal import mplfinance as mpf import matplotlib.pyplot as plt from datetime import datetime import os from groq import Groq # Đường dẫn thư mục dữ liệu tài chính DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__))) # Đặt API key Groq os.environ["GROQ_API_KEY"] = "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" client = Groq(api_key=os.environ.get("GROQ_API_KEY")) # Tìm và thay thế tên model Groq AI GROQ_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct" def read_csv_safely(path): if not os.path.exists(path): return None try: return pd.read_csv(path) except Exception as e: print(f"[utils] Lỗi đọc file {path}: {e}") return None def save_csv_safely(df, path): try: df.to_csv(path, index=False) except Exception as e: print(f"[utils] Lỗi ghi file {path}: {e}") def get_today_str(): return datetime.now().strftime('%Y-%m-%d') def detect_candlestick_patterns(df): """ Detects various candlestick patterns in the given dataframe using pandas-ta. """ import pandas_ta as ta # Ensure that the necessary columns are present if not all(col in df.columns for col in ['open', 'high', 'low', 'close']): print("Missing OHLC data") return {} patterns = {} # Hammer (đảo chiều tăng) if hasattr(ta, 'cdl_hammer'): patterns['hammer'] = ta.cdl_hammer(df['open'], df['high'], df['low'], df['close']) else: patterns['hammer'] = None # Shooting Star (đảo chiều giảm) if hasattr(ta, 'cdl_shootingstar'): patterns['shooting_star'] = ta.cdl_shootingstar(df['open'], df['high'], df['low'], df['close']) else: patterns['shooting_star'] = None # Engulfing (Bullish/Bearish) if hasattr(ta, 'cdl_engulfing'): engulfing = ta.cdl_engulfing(df['open'], df['high'], df['low'], df['close']) patterns['bullish_engulfing'] = (engulfing == 100).astype(int) patterns['bearish_engulfing'] = (engulfing == -100).astype(int) else: patterns['bullish_engulfing'] = None patterns['bearish_engulfing'] = None return patterns def calculate_fibonacci_levels(df, period=250): """ Calculates Fibonacci retracement levels based on the most recent significant swing high and low. """ df_period = df.tail(period) if df_period.empty: return None, None, None # Find the index of the highest high and lowest low in the period idx_high = df_period['high'].idxmax() idx_low = df_period['low'].idxmin() # Determine if the trend is up (low then high) or down (high then low) if idx_low < idx_high: # Uptrend: swing low to swing high swing_low = df_period.loc[idx_low]['low'] swing_high = df_period.loc[idx_high]['high'] is_uptrend = True else: # Downtrend: swing high to swing low swing_high = df_period.loc[idx_high]['high'] swing_low = df_period.loc[idx_low]['low'] is_uptrend = False if pd.isna(swing_high) or pd.isna(swing_low): return None, None, None diff = swing_high - swing_low # Standard Fibonacci retracement levels fibo_ratios = [0.236, 0.382, 0.5, 0.618, 0.786] levels = {} if is_uptrend: # Retracement levels are below the swing high for ratio in fibo_ratios: levels[f'{ratio*100:.1f}%'] = round(swing_high - diff * ratio, 2) else: # Retracement levels are above the swing low for ratio in fibo_ratios: levels[f'{ratio*100:.1f}%'] = round(swing_low + diff * ratio, 2) # Add extension levels fibo_ext_ratios = [1.272, 1.618] if is_uptrend: for ratio in fibo_ext_ratios: levels[f'Ext {ratio*100:.1f}%'] = round(swing_high + diff * (ratio - 1), 2) else: for ratio in fibo_ext_ratios: levels[f'Ext {ratio*100:.1f}%'] = round(swing_low - diff * (ratio - 1), 2) return levels, swing_low, swing_high def calculate_money_flow(df): """ Calculates money flow for each day and compares it to the 20-day average. """ df['money_flow'] = df['volume'] * df['close'] df['money_flow_20d_avg'] = df['money_flow'].rolling(window=20).mean() return df def find_double_top_bottom(df): import scipy.signal peaks, _ = scipy.signal.find_peaks(df['close']) troughs, _ = scipy.signal.find_peaks(-df['close']) double_tops = [] double_bottoms = [] for i in range(1, len(peaks)): if peaks[i] - peaks[i-1] < 10: double_tops.append((str(df['time'].iloc[peaks[i-1]]), str(df['time'].iloc[peaks[i]]))) for i in range(1, len(troughs)): if troughs[i] - troughs[i-1] < 10: double_bottoms.append((str(df['time'].iloc[troughs[i-1]]), str(df['time'].iloc[troughs[i]]))) return double_tops, double_bottoms def detect_w_double_bottom(df, min_distance=5, max_distance=40, tolerance=0.03): import scipy.signal closes = df['close'].values troughs, _ = scipy.signal.find_peaks(-closes) results = [] for i in range(len(troughs)-1): idx1, idx2 = troughs[i], troughs[i+1] if min_distance <= idx2 - idx1 <= max_distance: val1, val2 = closes[idx1], closes[idx2] if abs(val1-val2)/max(val1, val2) <= tolerance: mid_idx = idx1 + (idx2-idx1)//2 peak_between = closes[idx1:idx2+1].max() if peak_between > max(val1, val2) * 1.05: results.append((str(df['time'].iloc[idx1]), str(df['time'].iloc[idx2]))) return results def detect_m_double_top(df, min_distance=5, max_distance=40, tolerance=0.03): import scipy.signal closes = df['close'].values peaks, _ = scipy.signal.find_peaks(closes) results = [] for i in range(len(peaks)-1): idx1, idx2 = peaks[i], peaks[i+1] if min_distance <= idx2 - idx1 <= max_distance: val1, val2 = closes[idx1], closes[idx2] if abs(val1-val2)/max(val1, val2) <= tolerance: mid_idx = idx1 + (idx2-idx1)//2 trough_between = closes[idx1:idx2+1].min() if trough_between < min(val1, val2) * 0.95: results.append((str(df['time'].iloc[idx1]), str(df['time'].iloc[idx2]))) return results def detect_cup_and_handle(df, window=40, min_cup_depth=0.08, min_handle_depth=0.03): closes = df['close'].values n = len(closes) results = [] for i in range(window, n - window): left = i - window right = i cup_bottom_idx = left + closes[left:right].argmin() cup_bottom = closes[cup_bottom_idx] cup_left = closes[left] cup_right = closes[right] cup_top = max(cup_left, cup_right) cup_depth = (cup_top - cup_bottom) / cup_top if cup_top > 0 else 0 if cup_depth < min_cup_depth: continue handle_start = right handle_end = min(n-1, handle_start + window//2) handle_min_idx = handle_start + closes[handle_start:handle_end].argmin() handle_min = closes[handle_min_idx] handle_depth = (cup_top - handle_min) / cup_top if cup_top > 0 else 0 if handle_depth < min_handle_depth: continue if closes[handle_end-1] > cup_top: results.append({ 'cup_start': str(df['time'].iloc[left]), 'cup_bottom': str(df['time'].iloc[cup_bottom_idx]), 'cup_end': str(df['time'].iloc[right]), 'handle_start': str(df['time'].iloc[handle_start]), 'handle_end': str(df['time'].iloc[handle_end-1]) }) return results def plot_candlestick_with_fibo_patterns(df, fibonacci_levels, pattern_results, symbol, chart_path, double_tops=None, double_bottoms=None, cup_handle_patterns=None, w_double_bottoms=None, m_double_tops=None, color_map=None): import mplfinance as mpf import numpy as np import logging # Remove unused imports # import matplotlib.pyplot as plt # import pandas as pd # Parameter Validation required_columns = ['open', 'high', 'low', 'close', 'volume', 'time'] if not all(col in df.columns for col in required_columns): raise ValueError(f"DataFrame must contain columns: {required_columns}") df_plot = df.copy() df_plot['time'] = pd.to_datetime(df_plot['time']) df_plot.set_index('time', inplace=True) df_plot = df_plot[['open', 'high', 'low', 'close', 'volume']] fibo_lines = [] if fibonacci_levels: for level in fibonacci_levels: fibo_lines.append(mpf.make_addplot([level]*len(df_plot), color='purple', linestyle='--')) pattern_markers = [] # Use default color map if none is provided default_color_map = { 'hammer': 'red', 'shooting_star': 'blue', 'bullish_engulfing': 'green', 'bearish_engulfing': 'orange' } color_map = color_map or default_color_map for pattern, pattern_data in pattern_results.items(): if pattern_data is not None: indices = df_plot.index[df[pattern] != 0] if pattern in df else [] if len(indices) > 0: marker_color = color_map.get(pattern, 'black') marker_vals = [df_plot['close'].loc[idx] if idx in indices else None for idx in df_plot.index] pattern_markers.append( mpf.make_addplot( marker_vals, type='scatter', markersize=40, marker='o', color=marker_color, alpha=0.8, secondary_y=False # Ensure markers are plotted on the primary y-axis ) ) if double_tops: top_dates = [pd.to_datetime(t[1]) for t in double_tops if pd.to_datetime(t[1]) in df_plot.index] marker_vals = [float(df_plot['high'].loc[date]) if date in top_dates else np.nan for date in df_plot.index] if any(marker_vals): pattern_markers.append( mpf.make_addplot( marker_vals, type='scatter', markersize=80, marker='^', color='magenta', alpha=0.9, secondary_y=False, label=""#data['label'] + ': ' + ', '.join([str(date.date()) for date in top_dates]) if top_dates else None ) ) cup_handle_lines = [] if cup_handle_patterns: for pattern in cup_handle_patterns: try: cup_x = [pd.to_datetime(pattern['cup_start']), pd.to_datetime(pattern['cup_bottom']), pd.to_datetime(pattern['cup_end'])] cup_y = [df_plot['close'].loc[cup_x[0]], df_plot['close'].loc[cup_x[1]], df_plot['close'].loc[cup_x[2]]] cup_handle_lines.append( mpf.make_addplot( [cup_y[0] if date == cup_x[0] else cup_y[1] if date == cup_x[1] else cup_y[2] if date == cup_x[2] else np.nan for date in df_plot.index], color='blue', width=2, secondary_y=False, label='Cup: ' + ', '.join([str(x.date()) for x in cup_x]) ) ) handle_x = [pd.to_datetime(pattern['handle_start']), pd.to_datetime(pattern['handle_end'])] handle_y = [df_plot['close'].loc[handle_x[0]], df_plot['close'].loc[handle_x[1]]] cup_handle_lines.append( mpf.make_addplot( [handle_y[0] if date == handle_x[0] else handle_y[1] if date == handle_x[1] else np.nan for date in df_plot.index], color='green', width=2, secondary_y=False, label='Handle: ' + ', '.join([str(x.date()) for x in handle_x]) ) ) marker_vals = [df_plot['close'].loc[cup_x[1]] if date == cup_x[1] else np.nan for date in df_plot.index] cup_handle_lines.append( mpf.make_addplot( marker_vals, type='scatter', markersize=120, marker='o', color='red', alpha=0.5, secondary_y=False, label='Cup Bottom: ' + str(cup_x[1].date()) ) ) except Exception as e: logging.exception("Error plotting cup and handle pattern") continue addplots = [] if fibo_lines: addplots.extend([ap for ap in fibo_lines if ap is not None]) if pattern_markers: addplots.extend([ap for ap in pattern_markers if ap is not None]) if cup_handle_lines: addplots.extend([ap for ap in cup_handle_lines if ap is not None]) plot_kwargs = dict( type='candle', style='yahoo', volume=True, title=f'Biểu đồ nến, Fibonacci, mẫu hình nến và Cup & Handle: {symbol}', ylabel='Giá', ylabel_lower='Khối lượng', returnfig=True, figsize=(12, 8) ) #if addplots: # plot_kwargs['addplot'] = addplots fig, axlist = mpf.plot(df_plot, **plot_kwargs) fig.savefig(chart_path) plt.close(fig) def get_financial_valuation(stock): if stock is None or not hasattr(stock, 'finance') or stock.finance is None: return {'error': 'Không có dữ liệu tài chính cho mã này hoặc API trả về lỗi.'} try: # Lấy bảng ratio (tỷ số tài chính) ratio = stock.finance.ratio(period='year', lang='vi', dropna=True) bs = stock.finance.balance_sheet(period='year', lang='vi', dropna=True) is_ = stock.finance.income_statement(period='year', lang='vi', dropna=True) # Kiểm tra tồn tại cột 'year' và không rỗng for df, name in zip([ratio, bs, is_], ['ratio', 'balance_sheet', 'income_statement']): if not isinstance(df, pd.DataFrame) or df.empty or 'year' not in df.columns: return {'error': f'Dữ liệu {name} không hợp lệ hoặc thiếu cột year.'} # Lấy năm gần nhất có đủ dữ liệu years = set(ratio['year']).intersection(bs['year']).intersection(is_['year']) if not years: return {'error': 'Không đủ dữ liệu tài chính để định giá.'} latest_year = max(years) # Lấy dữ liệu năm gần nhất ratio_row = ratio[ratio['year'] == latest_year].iloc[0] bs_row = bs[bs['year'] == latest_year].iloc[0] is_row = is_[is_['year'] == latest_year].iloc[0] # Giá đóng cửa gần nhất close_price = stock.quote.history(start=f"{latest_year}-01-01", end=f"{latest_year}-12-31", interval='1D')['close'].iloc[-1] # Lấy chỉ số từ ratio nếu có pe = ratio_row.get('P/E', None) pb = ratio_row.get('P/B', None) roe = ratio_row.get('ROE (%)', None) eps = ratio_row.get('EPS', None) bvps = ratio_row.get('BVPS', None) # Fallback tự tính nếu thiếu equity = bs_row.get('Vốn chủ sở hữu', None) net_income = is_row.get('Lợi nhuận sau thuế', None) shares = bs_row.get('Vốn chủ sở hữu', None) if shares is None or shares == 0: shares = bs_row.get('Vốn góp của chủ sở hữu', None) if roe is None and net_income and equity and equity != 0: roe = round(net_income / equity * 100, 2) if pe is None and net_income and shares and shares != 0: pe = round(close_price * shares / net_income, 2) if pb is None and equity and shares and shares != 0: pb = round(close_price * shares / equity, 2) if eps is None and net_income and shares and shares != 0: eps = round(net_income / shares, 2) if bvps is None and equity and shares and shares != 0: bvps = round(equity / shares, 2) # Định giá nội tại đơn giản theo phương pháp chiết khấu ROE/PB intrinsic_value = None if roe and pb and pb != 0 and equity: intrinsic_value = round((roe / 100) * equity / pb, 2) return { 'year': latest_year, 'close_price': close_price, 'pe': pe, 'pb': pb, 'roe': roe, 'eps': eps, 'bvps': bvps, 'intrinsic_value': intrinsic_value, 'revenue': is_row.get('Doanh thu thuần', None), 'net_income': net_income, 'equity': equity } except Exception as e: return {'error': f'Lỗi khi lấy dữ liệu tài chính: {e}'} def calculate_dcf_valuation(fcf, growth_rate, wacc, years=5, terminal_growth=0.03): """ Tính giá trị nội tại theo phương pháp DCF (Discounted Cash Flow). fcf: Free Cash Flow năm gần nhất growth_rate: tốc độ tăng trưởng FCF dự kiến (vd: 0.1 = 10%) wacc: Weighted Average Cost of Capital (tỷ lệ chiết khấu) years: số năm dự báo terminal_growth: tốc độ tăng trưởng dài hạn (vd: 0.03 = 3%) """ import numpy_financial as npf cash_flows = [] for i in range(1, years+1): cash_flows.append(fcf * (1 + growth_rate) ** i) terminal_value = cash_flows[-1] * (1 + terminal_growth) / (wacc - terminal_growth) cash_flows[-1] += terminal_value dcf_value = npf.npv(wacc, cash_flows) return dcf_value def calculate_ddm_valuation(dividend, growth_rate, required_return): """ Định giá theo mô hình chiết khấu cổ tức (DDM) với tăng trưởng không đổi. """ if required_return <= growth_rate: return None return dividend * (1 + growth_rate) / (required_return - growth_rate) def calculate_nav(equity, debt=0): """ Định giá theo giá trị tài sản ròng (NAV). """ return equity - debt def calculate_residual_income(net_income, equity, cost_of_equity): """ Tính Residual Income = Lợi nhuận sau thuế - chi phí vốn chủ sở hữu """ return net_income - cost_of_equity * equity def calculate_eva(net_income, equity, debt, cost_of_equity, cost_of_debt): """ EVA = Lợi nhuận sau thuế - chi phí tổng vốn (vốn chủ + nợ) """ return net_income - (cost_of_equity * equity + cost_of_debt * debt) def safe_float(val, default=0.0): try: return float(val) except (TypeError, ValueError): return default def analyze_financial_csv_with_groq(csv_content, user_question=None): prompt = ( "Bạn là chuyên gia tài chính. Hãy phân tích dữ liệu tài chính sau (dưới dạng CSV):\n" f"{csv_content}\n" "1. Tóm tắt các điểm nổi bật: doanh thu, lợi nhuận, tăng trưởng, rủi ro.\n" "2. Phát hiện xu hướng, bất thường, cảnh báo sớm nếu có.\n" "3. Đưa ra nhận định, gợi ý chiến lược đầu tư.\n" "4. Định giá cổ phiếu theo các phương pháp: P/E, P/B, Book Value, DDM, DCF (nếu đủ dữ liệu).\n" "5. Thống kê quý nào trong năm thường có doanh thu và lợi nhuận sau thuế cao nhất (nếu có dữ liệu quý).\n" "\nTrả lời toàn bộ bằng tiếng Việt." ) if user_question: prompt += f"\n6. Trả lời câu hỏi: {user_question}\n" chat_completion = client.chat.completions.create( messages=[{"role": "user", "content": prompt}], model=GROQ_MODEL, ) return chat_completion.choices[0].message.content def fetch_vietstock_news(limit=15): """ Fetches the latest financial news from Vietstock. """ import requests from bs4 import BeautifulSoup try: url = "https://vietstock.vn/chung-khoan.htm" headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'} res = requests.get(url, headers=headers, timeout=10) res.raise_for_status() soup = BeautifulSoup(res.content, 'html.parser') news_items = [] articles = soup.find_all('div', class_='article-content', limit=limit) for article in articles: title_tag = article.find('a', class_='channel-title') if title_tag: title = title_tag.text.strip() link = "https://vietstock.vn" + title_tag['href'] news_items.append({'title': title, 'link': link}) return news_items except Exception as e: print(f"Error fetching Vietstock news: {e}") return [] def analyze_news_with_groq(news_items): """ Analyzes a list of news headlines with Groq AI and returns a summary. """ if not news_items: return "Không có tin tức để phân tích." headlines = "- " + "\n- ".join([item['title'] for item in news_items]) prompt = ( "Bạn là một chuyên gia phân tích thị trường chứng khoán Việt Nam. " "Dưới đây là các tiêu đề tin tức mới nhất:\n" f"{headlines}\n\n" "Dựa vào các tin tức này, hãy:\n" "1. Tóm tắt các xu hướng chính của thị trường (tích cực, tiêu cực, trung lập).\n" "2. Nhận định các ngành hoặc nhóm cổ phiếu nào có thể bị ảnh hưởng.\n" "3. Đưa ra một bình luận ngắn gọn về tâm lý thị trường hiện tại.\n" "Trình bày súc tích, chuyên nghiệp và hoàn toàn bằng tiếng Việt." ) try: chat_completion = client.chat.completions.create( messages=[{"role": "user", "content": prompt}], model=GROQ_MODEL, max_tokens=500, temperature=0.3, ) return chat_completion.choices[0].message.content except Exception as e: print(f"Error analyzing news with Groq: {e}") return "Lỗi khi phân tích tin tức với AI." # Thêm các hàm tiện ích khác nếu cần