Spaces:
Runtime error
Runtime error
File size: 2,813 Bytes
eebf941 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import imaplib
import email
import os
from dotenv import load_dotenv
from pathlib import Path
# Load the variables defined in the .env file.
load_dotenv()

# Mailbox credentials; each is None when its variable is not set.
EMAIL = os.environ.get("NOURA_EMAIL")
PASSWORD = os.environ.get("NOURA_PASSWORD")

# Folder where attachments are saved; created at import time if missing.
ATTACHMENTS_DIR = Path("attachments")
os.makedirs(ATTACHMENTS_DIR, exist_ok=True)
class EmailChecker:
    """Small IMAP client that reads the most recent message of a mailbox.

    Credentials are taken from the module-level ``EMAIL`` and ``PASSWORD``
    values. Typical use is ``connect()``, ``get_latest_email_info()``, then
    ``close()``; the instance can also be used as a context manager, which
    connects on entry and closes on exit.
    """

    def __init__(self, server="imap.gmail.com", mailbox="inbox"):
        """Store the connection settings; no network I/O happens here.

        Args:
            server: Host name of the IMAP-over-SSL server.
            mailbox: Name of the mailbox selected after login.
        """
        self.server = server
        self.mailbox = mailbox
        self.connection = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    @staticmethod
    def _decode_header_value(value):
        """Return *value* with RFC 2047 encoded-words decoded to text.

        Falls back to ``str(value)`` when the header cannot be decoded.
        """
        # Local imports: the file's import block only has ``import email``,
        # which does not guarantee these submodules are bound.
        from email.errors import HeaderParseError
        from email.header import decode_header, make_header

        try:
            return str(make_header(decode_header(value)))
        except (HeaderParseError, LookupError, UnicodeError, ValueError):
            return str(value)

    def connect(self):
        """Open an SSL connection, log in and select the mailbox.

        Raises:
            ConnectionError: If credentials are missing, or if connecting,
                logging in or selecting the mailbox fails.
        """
        if not EMAIL or not PASSWORD:
            raise ConnectionError(
                "فشل الاتصال بالبريد: لم يتم ضبط NOURA_EMAIL أو NOURA_PASSWORD"
            )

        conn = None
        try:
            conn = imaplib.IMAP4_SSL(self.server)
            conn.login(EMAIL, PASSWORD)
            # select() reports a missing mailbox through its status code
            # instead of raising, so check it explicitly.
            status, _ = conn.select(self.mailbox)
            if status != "OK":
                raise imaplib.IMAP4.error(
                    f"cannot select mailbox {self.mailbox!r}"
                )
        except Exception as e:
            if conn is not None:
                # Best-effort cleanup so a failed login does not leak the
                # socket; the original error is what gets reported.
                try:
                    conn.logout()
                except Exception:
                    pass
            raise ConnectionError(f"فشل الاتصال بالبريد: {e}") from e

        self.connection = conn

    def get_email_content(self, msg):
        """Return the concatenated inline ``text/plain`` parts of *msg*.

        Each part is decoded with its declared charset (UTF-8 when none is
        declared or the declared one is unknown); undecodable bytes are
        dropped. Parts carrying a ``Content-Disposition`` header are skipped.

        Returns:
            The stripped text, or a fixed placeholder message when the
            message has no usable plain-text content.
        """
        chunks = []
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            if part.get("Content-Disposition"):
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            charset = part.get_content_charset() or "utf-8"
            try:
                chunks.append(payload.decode(charset, errors="ignore"))
            except LookupError:
                # Unknown charset label in the message: fall back to UTF-8.
                chunks.append(payload.decode("utf-8", errors="ignore"))

        text = "".join(chunks).strip()
        return text or "لا يوجد محتوى نصي متاح."

    def save_attachments(self, msg, directory=None):
        """Write every attachment of *msg* to disk and return the paths.

        The file name comes from the (untrusted) message, so only its final
        path component is used; names that reduce to nothing, ``.`` or
        ``..`` are skipped, as are parts without a decodable payload.

        Args:
            msg: Parsed email message.
            directory: Target folder; defaults to ``ATTACHMENTS_DIR``.

        Returns:
            List of saved file paths as strings (empty when nothing saved).
        """
        target_dir = Path(ATTACHMENTS_DIR if directory is None else directory)
        saved_files = []
        for part in msg.walk():
            if part.get_content_disposition() != "attachment":
                continue
            filename = part.get_filename()
            if not filename:
                continue

            decoded_name = self._decode_header_value(filename)
            # Normalise Windows separators, then keep only the base name so
            # "../x" or "/etc/x" cannot escape the target directory.
            safe_name = os.path.basename(
                decoded_name.replace("\\", "/").replace("\x00", "")
            )
            if safe_name in ("", ".", ".."):
                continue

            payload = part.get_payload(decode=True)
            if payload is None:
                # Multipart attachments have no single byte payload.
                continue

            target_dir.mkdir(parents=True, exist_ok=True)
            filepath = target_dir / safe_name
            filepath.write_bytes(payload)
            saved_files.append(str(filepath))
        return saved_files

    def get_latest_email_info(self):
        """Fetch the newest message of the selected mailbox.

        Returns:
            A dict that always has a ``"status"`` key. On success it also
            has ``"subject"``, ``"content"`` and ``"attachments"`` (a list of
            saved paths, or a single placeholder entry when there are none).
            Failures are reported through ``"status"`` rather than raised.
        """
        if self.connection is None:
            return {"status": "خطأ في جلب الرسائل: لا يوجد اتصال نشط"}

        try:
            status, messages = self.connection.search(None, "ALL")
            if status != "OK":
                raise imaplib.IMAP4.error(f"SEARCH returned {status}")
            ids = messages[0].split() if messages and messages[0] else []
            if not ids:
                return {"status": "لا توجد رسائل"}

            # search() lists ids in ascending order; the last is the newest.
            status, data = self.connection.fetch(ids[-1], "(RFC822)")
            if status != "OK" or not data or not isinstance(data[0], tuple):
                raise imaplib.IMAP4.error("FETCH returned no message data")

            msg = email.message_from_bytes(data[0][1])
            raw_subject = msg["subject"]
            if raw_subject:
                subject = self._decode_header_value(raw_subject)
            else:
                subject = "بدون عنوان"
            content = self.get_email_content(msg)
            attachments = self.save_attachments(msg)
            return {
                "status": "نجح",
                "subject": subject,
                "content": content,
                "attachments": attachments if attachments else ["لا توجد مرفقات"],
            }
        except Exception as e:
            # Boundary of the public API: callers expect a status dict.
            return {"status": f"خطأ في جلب الرسائل: {e}"}

    def close(self):
        """Log out if connected; errors during logout are ignored."""
        if self.connection is None:
            return
        try:
            self.connection.logout()
        except Exception:
            # Deliberate best-effort: the session is being discarded anyway.
            pass
        finally:
            self.connection = None
|