Dmtlant's picture
Update app.py
d6e9a0c verified
import gradio as gr
import datetime
import requests
import os
from telegram import Bot
from zeep import Client
# Настройки
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
CHAT_ID = "-4620812399"
HF_TOKEN = os.getenv("HF_TOKEN")
# Подключение к RUONIA через SOAP
client = Client("https://www.cbr.ru/DailyInfoWebServ/DailyInfo.asmx?WSDL")
def get_ruonia(start_date, end_date):
try:
data = client.service.Ruonia(fromDate=start_date.strftime('%Y-%m-%d'), ToDate=end_date.strftime('%Y-%m-%d'))
return data
except Exception as e:
print("Ошибка при получении RUONIA:", e)
return []
def summarize_ruonia(data):
summary = []
total_rate = 0
total_volume = 0
# Доступ к данным через два уровня '_value_1'
for entry in data['_value_1']['_value_1']:
try:
# Доступ к 'ro' внутри каждого элемента
date = entry['ro']['D0'].strftime('%d.%m.%Y')
rate = float(entry['ro']['ruo'])
volume = float(entry['ro']['vol'])
total_rate += rate
total_volume += volume
summary.append(f"📅 {date}: Ставка RUONIA: {rate:.4f}%, Объём: {volume:.2f} млн руб.")
except Exception as e:
print("Ошибка при разборе:", entry, e)
avg_rate = total_rate / len(data['_value_1']['_value_1']) if data['_value_1']['_value_1'] else 0
return "\n".join(summary), avg_rate, total_volume
def get_weather():
try:
# Получение погоды в Цельсиях
url = "https://wttr.in/Moscow?format=%C+%t"
response = requests.get(url)
# Погода в Цельсиях и температуре
weather_data = response.text.strip()
temp_celsius = weather_data.split()[1].replace("°C", "")
return f"{weather_data.split()[0]} {temp_celsius}°C"
except:
return "Погода недоступна."
def get_history_event(date):
# Пример запроса исторического события на этот день
year = date.year
# Здесь можно использовать API для исторических событий или статические данные
events = {
2020: "В 2020 году в этот день в России был введен карантин по коронавирусу.",
2010: "В 2010 году в этот день произошел большой землетрясение в Чили."
}
return events.get(year, "Интересных событий в этот день нет.")
def ai_commentary(avg_rate, weather, historical_event):
# Размышления бота
thoughts = (
f"📊 Средняя ставка RUONIA за последние дни составила примерно {avg_rate:.2f}%. "
f"Текущая погода в Москве: {weather}.\n"
"🌍 В условиях высокой ключевой ставки это может свидетельствовать о продолжающемся давлении на денежный рынок. "
"Это также может повлиять на ипотечные ставки, которые продолжат оставаться высокими для заёмщиков. "
"Цены на электронику могут продолжить расти, так как затраты на производство и логистику остаются высокими.\n"
f"🔍 Историческое событие на этот день: {historical_event}\n"
"💭 Также стоит отметить, что погода в этот день может повлиять на потребительские настроения и экономическую активность. "
"Если погода прохладная, возможно, люди меньше тратят на отдых и больше на отопление и энергоснабжение."
)
return thoughts
def generate_daily_report():
today = datetime.date.today()
start_date = today - datetime.timedelta(days=7)
ruonia_data = get_ruonia(start_date, today)
ruonia_summary, avg_rate, total_volume = summarize_ruonia(ruonia_data)
weather = get_weather()
historical_event = get_history_event(today)
ai_summary = ai_commentary(avg_rate, weather, historical_event)
full_report = f"🗓 Ежедневный отчёт: {today.strftime('%d.%m.%Y')}\n\n" + ruonia_summary + "\n\n" + ai_summary
try:
bot = Bot(token=TELEGRAM_TOKEN)
bot.send_message(chat_id=CHAT_ID, text=full_report)
except Exception as e:
print("Ошибка отправки Telegram:", e)
return full_report
# Интерфейс Gradio
with gr.Blocks(title="📰 Ежедневный отчёт RUONIA + Погода") as demo:
gr.Markdown("# 📉 Ежедневная аналитика: RUONIA + Погода")
btn = gr.Button("Сгенерировать отчёт")
out = gr.Textbox(label="Результат")
btn.click(fn=generate_daily_report, outputs=out)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)