File size: 5,410 Bytes
5c588b1 d0cf919 5c588b1 d0cf919 5c588b1 9308276 5c588b1 710af7b 5c588b1 4a5151c 9308276 4a5151c 5c588b1 9308276 4a5151c 5c588b1 9308276 4a5151c 9308276 5c588b1 bcf6023 5c588b1 bcf6023 5c588b1 bcf6023 9308276 5fef689 9308276 d0cf919 d6e9a0c 5c588b1 d6e9a0c 5c588b1 9308276 d6e9a0c 5c588b1 bcf6023 d6e9a0c 5c588b1 d6e9a0c 9308276 5c588b1 9308276 5c588b1 9308276 d6e9a0c 9308276 5c588b1 9308276 5c588b1 4a5151c 5c588b1 4a5151c 5c588b1 4a5151c 9308276 5c588b1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import gradio as gr
import datetime
import requests
import os
from telegram import Bot
from zeep import Client
# Настройки
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
CHAT_ID = "-4620812399"
HF_TOKEN = os.getenv("HF_TOKEN")
# Подключение к RUONIA через SOAP
client = Client("https://www.cbr.ru/DailyInfoWebServ/DailyInfo.asmx?WSDL")
def get_ruonia(start_date, end_date):
try:
data = client.service.Ruonia(fromDate=start_date.strftime('%Y-%m-%d'), ToDate=end_date.strftime('%Y-%m-%d'))
return data
except Exception as e:
print("Ошибка при получении RUONIA:", e)
return []
def summarize_ruonia(data):
summary = []
total_rate = 0
total_volume = 0
# Доступ к данным через два уровня '_value_1'
for entry in data['_value_1']['_value_1']:
try:
# Доступ к 'ro' внутри каждого элемента
date = entry['ro']['D0'].strftime('%d.%m.%Y')
rate = float(entry['ro']['ruo'])
volume = float(entry['ro']['vol'])
total_rate += rate
total_volume += volume
summary.append(f"📅 {date}: Ставка RUONIA: {rate:.4f}%, Объём: {volume:.2f} млн руб.")
except Exception as e:
print("Ошибка при разборе:", entry, e)
avg_rate = total_rate / len(data['_value_1']['_value_1']) if data['_value_1']['_value_1'] else 0
return "\n".join(summary), avg_rate, total_volume
def get_weather():
try:
# Получение погоды в Цельсиях
url = "https://wttr.in/Moscow?format=%C+%t"
response = requests.get(url)
# Погода в Цельсиях и температуре
weather_data = response.text.strip()
temp_celsius = weather_data.split()[1].replace("°C", "")
return f"{weather_data.split()[0]} {temp_celsius}°C"
except:
return "Погода недоступна."
def get_history_event(date):
# Пример запроса исторического события на этот день
year = date.year
# Здесь можно использовать API для исторических событий или статические данные
events = {
2020: "В 2020 году в этот день в России был введен карантин по коронавирусу.",
2010: "В 2010 году в этот день произошел большой землетрясение в Чили."
}
return events.get(year, "Интересных событий в этот день нет.")
def ai_commentary(avg_rate, weather, historical_event):
# Размышления бота
thoughts = (
f"📊 Средняя ставка RUONIA за последние дни составила примерно {avg_rate:.2f}%. "
f"Текущая погода в Москве: {weather}.\n"
"🌍 В условиях высокой ключевой ставки это может свидетельствовать о продолжающемся давлении на денежный рынок. "
"Это также может повлиять на ипотечные ставки, которые продолжат оставаться высокими для заёмщиков. "
"Цены на электронику могут продолжить расти, так как затраты на производство и логистику остаются высокими.\n"
f"🔍 Историческое событие на этот день: {historical_event}\n"
"💭 Также стоит отметить, что погода в этот день может повлиять на потребительские настроения и экономическую активность. "
"Если погода прохладная, возможно, люди меньше тратят на отдых и больше на отопление и энергоснабжение."
)
return thoughts
def generate_daily_report():
today = datetime.date.today()
start_date = today - datetime.timedelta(days=7)
ruonia_data = get_ruonia(start_date, today)
ruonia_summary, avg_rate, total_volume = summarize_ruonia(ruonia_data)
weather = get_weather()
historical_event = get_history_event(today)
ai_summary = ai_commentary(avg_rate, weather, historical_event)
full_report = f"🗓 Ежедневный отчёт: {today.strftime('%d.%m.%Y')}\n\n" + ruonia_summary + "\n\n" + ai_summary
try:
bot = Bot(token=TELEGRAM_TOKEN)
bot.send_message(chat_id=CHAT_ID, text=full_report)
except Exception as e:
print("Ошибка отправки Telegram:", e)
return full_report
# Интерфейс Gradio
with gr.Blocks(title="📰 Ежедневный отчёт RUONIA + Погода") as demo:
gr.Markdown("# 📉 Ежедневная аналитика: RUONIA + Погода")
btn = gr.Button("Сгенерировать отчёт")
out = gr.Textbox(label="Результат")
btn.click(fn=generate_daily_report, outputs=out)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)
|