|
import gradio as gr |
|
import datetime |
|
import requests |
|
import os |
|
from telegram import Bot |
|
from zeep import Client |
|
|
|
|
|
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") |
|
CHAT_ID = "-4620812399" |
|
HF_TOKEN = os.getenv("HF_TOKEN") |
|
|
|
|
|
client = Client("https://www.cbr.ru/DailyInfoWebServ/DailyInfo.asmx?WSDL") |
|
|
|
def get_ruonia(start_date, end_date): |
|
try: |
|
data = client.service.Ruonia(fromDate=start_date.strftime('%Y-%m-%d'), ToDate=end_date.strftime('%Y-%m-%d')) |
|
return data |
|
except Exception as e: |
|
print("Ошибка при получении RUONIA:", e) |
|
return [] |
|
|
|
def summarize_ruonia(data): |
|
summary = [] |
|
total_rate = 0 |
|
total_volume = 0 |
|
|
|
|
|
for entry in data['_value_1']['_value_1']: |
|
try: |
|
|
|
date = entry['ro']['D0'].strftime('%d.%m.%Y') |
|
rate = float(entry['ro']['ruo']) |
|
volume = float(entry['ro']['vol']) |
|
total_rate += rate |
|
total_volume += volume |
|
summary.append(f"📅 {date}: Ставка RUONIA: {rate:.4f}%, Объём: {volume:.2f} млн руб.") |
|
except Exception as e: |
|
print("Ошибка при разборе:", entry, e) |
|
|
|
avg_rate = total_rate / len(data['_value_1']['_value_1']) if data['_value_1']['_value_1'] else 0 |
|
return "\n".join(summary), avg_rate, total_volume |
|
|
|
def get_weather(): |
|
try: |
|
|
|
url = "https://wttr.in/Moscow?format=%C+%t" |
|
response = requests.get(url) |
|
|
|
weather_data = response.text.strip() |
|
temp_celsius = weather_data.split()[1].replace("°C", "") |
|
return f"{weather_data.split()[0]} {temp_celsius}°C" |
|
except: |
|
return "Погода недоступна." |
|
|
|
def get_history_event(date): |
|
|
|
year = date.year |
|
|
|
events = { |
|
2020: "В 2020 году в этот день в России был введен карантин по коронавирусу.", |
|
2010: "В 2010 году в этот день произошел большой землетрясение в Чили." |
|
} |
|
return events.get(year, "Интересных событий в этот день нет.") |
|
|
|
def ai_commentary(avg_rate, weather, historical_event): |
|
|
|
thoughts = ( |
|
f"📊 Средняя ставка RUONIA за последние дни составила примерно {avg_rate:.2f}%. " |
|
f"Текущая погода в Москве: {weather}.\n" |
|
"🌍 В условиях высокой ключевой ставки это может свидетельствовать о продолжающемся давлении на денежный рынок. " |
|
"Это также может повлиять на ипотечные ставки, которые продолжат оставаться высокими для заёмщиков. " |
|
"Цены на электронику могут продолжить расти, так как затраты на производство и логистику остаются высокими.\n" |
|
f"🔍 Историческое событие на этот день: {historical_event}\n" |
|
"💭 Также стоит отметить, что погода в этот день может повлиять на потребительские настроения и экономическую активность. " |
|
"Если погода прохладная, возможно, люди меньше тратят на отдых и больше на отопление и энергоснабжение." |
|
) |
|
return thoughts |
|
|
|
def generate_daily_report(): |
|
today = datetime.date.today() |
|
start_date = today - datetime.timedelta(days=7) |
|
|
|
ruonia_data = get_ruonia(start_date, today) |
|
ruonia_summary, avg_rate, total_volume = summarize_ruonia(ruonia_data) |
|
weather = get_weather() |
|
historical_event = get_history_event(today) |
|
ai_summary = ai_commentary(avg_rate, weather, historical_event) |
|
|
|
full_report = f"🗓 Ежедневный отчёт: {today.strftime('%d.%m.%Y')}\n\n" + ruonia_summary + "\n\n" + ai_summary |
|
|
|
try: |
|
bot = Bot(token=TELEGRAM_TOKEN) |
|
bot.send_message(chat_id=CHAT_ID, text=full_report) |
|
except Exception as e: |
|
print("Ошибка отправки Telegram:", e) |
|
|
|
return full_report |
|
|
|
|
|
with gr.Blocks(title="📰 Ежедневный отчёт RUONIA + Погода") as demo: |
|
gr.Markdown("# 📉 Ежедневная аналитика: RUONIA + Погода") |
|
btn = gr.Button("Сгенерировать отчёт") |
|
out = gr.Textbox(label="Результат") |
|
|
|
btn.click(fn=generate_daily_report, outputs=out) |
|
|
|
if __name__ == "__main__": |
|
demo.launch(server_name="0.0.0.0", server_port=7860) |
|
|