File size: 5,506 Bytes
a33458e
 
6c6cf17
 
a33458e
 
 
6c6cf17
 
 
 
a33458e
 
 
 
 
 
6c6cf17
 
 
 
a33458e
 
 
 
6c6cf17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a33458e
 
 
6c6cf17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a33458e
 
 
6c6cf17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import os
import sys
import logging
import shutil
from datetime import datetime
from typing import List, Dict, Any

# Configure logging: set up the root logger at INFO level at import time and
# create the module-level logger used by every helper below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def sanitize_filename(filename: str) -> str:
    """Return *filename* with unsafe characters replaced and its length capped.

    Each of the characters ``< > : " / \\ | ? *`` is replaced with an
    underscore. Names longer than 200 characters are shortened by trimming
    the base name while keeping the extension, so the result never exceeds
    200 characters.

    Args:
        filename: The raw file name to clean up.

    Returns:
        The sanitized file name, at most 200 characters long.
    """
    max_length = 200
    # Historical cap on the base name; kept so names with short extensions
    # are truncated exactly as before.
    max_base_length = 195

    # Replace every invalid character with an underscore in a single pass.
    invalid_chars = '<>:"/\\|?*'
    filename = filename.translate(str.maketrans(invalid_chars, '_' * len(invalid_chars)))

    # Limit filename length to avoid issues
    if len(filename) > max_length:
        base, ext = os.path.splitext(filename)
        if len(ext) >= max_length:
            # The "extension" alone already fills the limit, so it cannot be
            # preserved; fall back to a plain truncation.
            filename = filename[:max_length]
        else:
            # Shrink the base further when the extension is longer than the
            # 5 characters the fixed 195-character cap leaves room for.
            keep = min(max_base_length, max_length - len(ext))
            filename = base[:keep] + ext
    return filename

def get_document_path(filename: str) -> str:
    """Build a timestamped path under which an uploaded document can be stored.

    The preferred location is ``data/documents`` three directory levels above
    this file. If that directory cannot be created or written to, a
    ``documents`` directory under the system temp location is used instead.
    The file name is sanitized and suffixed with a ``YYYYMMDDHHMMSS``
    timestamp before the extension.

    Args:
        filename: Original name of the uploaded document.

    Returns:
        The full path for the document. If anything unexpected fails, a
        generic ``doc_<timestamp>`` path in the temp directory is returned.
    """
    try:
        # Get the documents directory
        docs_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            'data',
            'documents',
        )

        # Creating the directory and probing write access share one guard, so
        # that a failure of either falls back to the alternative location
        # (previously a makedirs failure skipped it and lost the file name).
        try:
            # Create the directory if it doesn't exist
            os.makedirs(docs_dir, exist_ok=True)

            # Test file to check write permissions
            test_file = os.path.join(docs_dir, '.test_write_access')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
        except Exception as e:
            logger.warning(f"Document directory may not be writable: {e}")
            # Try alternative location
            docs_dir = '/tmp/documents' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'documents')
            os.makedirs(docs_dir, exist_ok=True)

        # Sanitize the filename
        filename = sanitize_filename(filename)

        # Add a timestamp so repeated uploads of the same name get distinct
        # paths (resolution is one second).
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        base, ext = os.path.splitext(filename)
        unique_filename = f"{base}_{timestamp}{ext}"

        filepath = os.path.join(docs_dir, unique_filename)
        logger.info(f"Document will be stored at: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error getting document path: {e}")
        # Fallback to a simple path in /tmp or temp directory
        fallback_dir = '/tmp' if os.name != 'nt' else os.environ.get('TEMP', 'C:\\Temp')
        os.makedirs(fallback_dir, exist_ok=True)
        return os.path.join(fallback_dir, f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}")

def copy_uploaded_file(source_path: str, destination_path: str) -> bool:
    """Copy an uploaded file to its destination, reporting success as a bool.

    ``shutil.copy2`` is tried first. If it fails, the file contents are copied
    through plain file handles as a fallback. Errors are logged rather than
    raised.

    Args:
        source_path: Path of the file to copy.
        destination_path: Path the file should be copied to.

    Returns:
        True if the destination holds the file contents afterwards (including
        the case where source and destination are already the same file),
        False if both copy methods failed.
    """
    try:
        shutil.copy2(source_path, destination_path)
        logger.info(f"File copied from {source_path} to {destination_path}")
        return True
    except shutil.SameFileError:
        # Nothing to copy. This must not reach the fallback below: opening the
        # destination with 'wb' would truncate the only copy of the file.
        logger.warning(f"Source and destination are the same file: {source_path}")
        return True
    except Exception as e:
        logger.error(f"Error copying file: {e}")
        # Try alternate approach
        try:
            with open(source_path, 'rb') as src, open(destination_path, 'wb') as dst:
                # Stream in chunks instead of loading the whole file in memory.
                shutil.copyfileobj(src, dst)
            logger.info("File copied using alternate method")
            return True
        except Exception as e2:
            logger.error(f"All methods of copying file failed: {e2}")
            return False

def format_sources(sources: List[Dict[str, Any]]) -> str:
    """Render source documents as a numbered, newline-separated list.

    Each entry is shown as ``"<n>. <file_name> "``, followed by
    ``"(Page <page>) "`` when the entry has a truthy ``page`` value. Entries
    without a ``file_name`` key are labelled ``Unknown Source``.

    Returns ``"No sources found."`` for an empty or missing list, and
    ``"Error displaying sources."`` if formatting raises.
    """
    def _render(position: int, entry: Dict[str, Any]) -> str:
        # One display line for a single source entry.
        label = entry.get('file_name', 'Unknown Source')
        page_note = f"(Page {entry['page']}) " if entry.get('page') else ""
        return f"{position}. {label} {page_note}"

    try:
        if not sources:
            return "No sources found."
        return "\n".join(
            _render(position, entry)
            for position, entry in enumerate(sources, start=1)
        )
    except Exception as e:
        logger.error(f"Error formatting sources: {e}")
        return "Error displaying sources."

def save_conversation(question: str, answer: str, sources: List[Dict[str, Any]]) -> str:
    """Save a question/answer exchange and its sources to a text file.

    The file is written to ``data/conversations`` three directory levels above
    this file, or to a ``conversations`` directory under the system temp
    location if that directory cannot be created. The file name is
    ``<YYYYMMDDHHMMSS>_<slug>.txt``, where the slug is built from the first
    five words of the question.

    Args:
        question: The user's question; may be empty or None.
        answer: The answer text to record.
        sources: Source entries, formatted with ``format_sources``.

    Returns:
        The path of the written file, or an empty string if saving failed.
    """
    try:
        # Create a directory for conversations
        conv_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            'data',
            'conversations',
        )
        try:
            os.makedirs(conv_dir, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create conversation directory: {e}")
            # Use alternative directory
            conv_dir = '/tmp/conversations' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'conversations')
            os.makedirs(conv_dir, exist_ok=True)

        # Create a filename based on the timestamp and first few words of the question
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        words = (question or "").split()[:5]
        # A missing, empty, or whitespace-only question has no words; use a
        # placeholder so the file name never ends in a bare underscore.
        question_slug = "_".join(words).lower() if words else "empty_question"
        question_slug = sanitize_filename(question_slug)
        filename = f"{timestamp}_{question_slug}.txt"

        # Format the conversation
        formatted_sources = format_sources(sources)
        content = f"Question: {question}\n\nAnswer: {answer}\n\nSources:\n{formatted_sources}\n"

        # Save the conversation. UTF-8 is set explicitly so non-ASCII text is
        # written reliably regardless of the platform's default encoding.
        filepath = os.path.join(conv_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        logger.info(f"Conversation saved to {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error saving conversation: {e}")
        return ""