Harsh Upadhyay
made the relative imports work.
718633d
# All sqlite3 and local DB logic will be removed and replaced with SQLAlchemy/Postgres in the next step.
# This file will be refactored to use SQLAlchemy models and sessions.
from sqlalchemy import create_engine, Column, Integer, String, Text, Float, ForeignKey, DateTime, LargeBinary
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.sql import func
import os
from sqlalchemy.exc import IntegrityError
from werkzeug.security import check_password_hash, generate_password_hash
from dotenv import load_dotenv
import re
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), '..', '.env'))
# SQLAlchemy setup
DATABASE_URL = os.environ.get('DATABASE_URL')
if not DATABASE_URL or DATABASE_URL.strip() == "":
raise ValueError("DATABASE_URL is not set or is empty. Please set it as an environment variable or in your .env file for NeonDB.")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# User model
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, nullable=False, index=True)
email = Column(String, unique=True, nullable=False, index=True)
password_hash = Column(String, nullable=False)
phone = Column(String)
company = Column(String)
created_at = Column(DateTime(timezone=True), server_default=func.now())
documents = relationship('Document', back_populates='user')
question_answers = relationship('QuestionAnswer', back_populates='user')
# Document model
class Document(Base):
__tablename__ = 'documents'
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
full_text = Column(Text)
summary = Column(Text)
clauses = Column(Text)
features = Column(Text)
context_analysis = Column(Text)
file_data = Column(LargeBinary) # Store file content in DB
file_size = Column(Integer) # Add this
upload_time = Column(DateTime(timezone=True), server_default=func.now())
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship('User', back_populates='documents')
question_answers = relationship('QuestionAnswer', back_populates='document')
# QuestionAnswer model
class QuestionAnswer(Base):
__tablename__ = 'question_answers'
id = Column(Integer, primary_key=True, index=True)
document_id = Column(Integer, ForeignKey('documents.id'), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
question = Column(Text, nullable=False)
answer = Column(Text, nullable=False)
score = Column(Float, default=0.0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
document = relationship('Document', back_populates='question_answers')
user = relationship('User', back_populates='question_answers')
# Create tables if they don't exist
Base.metadata.create_all(bind=engine)
def get_db_session():
return SessionLocal()
# --- Document CRUD ---
def save_document(title, full_text, summary, clauses, features, context_analysis, file_data, user_id):
session = get_db_session()
try:
doc = Document(
title=title,
full_text=full_text,
summary=summary,
clauses=str(clauses),
features=str(features),
context_analysis=str(context_analysis),
file_data=file_data,
file_size=len(file_data) if file_data else 0, # Store file size
user_id=user_id
)
session.add(doc)
session.commit()
return doc.id
except Exception as e:
session.rollback()
raise
finally:
session.close()
def get_all_documents(user_id=None):
session = get_db_session()
try:
query = session.query(Document)
if user_id is not None:
query = query.filter(Document.user_id == user_id)
documents = query.order_by(Document.upload_time.desc()).all()
result = []
for doc in documents:
d = doc.__dict__.copy()
d.pop('_sa_instance_state', None)
d.pop('file_data', None) # Don't return file data in list
# Do NOT pop 'summary'; keep it in the result
# file_size is included
result.append(d)
return result
finally:
session.close()
def get_document_by_id(doc_id, user_id=None):
session = get_db_session()
try:
query = session.query(Document).filter(Document.id == doc_id)
if user_id is not None:
query = query.filter(Document.user_id == user_id)
doc = query.first()
if doc:
d = doc.__dict__.copy()
d.pop('_sa_instance_state', None)
# Don't return file_data by default
d.pop('file_data', None)
return d
return None
finally:
session.close()
def delete_document(doc_id):
session = get_db_session()
try:
doc = session.query(Document).filter(Document.id == doc_id).first()
if doc:
session.delete(doc)
session.commit()
return True
finally:
session.close()
def search_documents(query, search_type='all'):
session = get_db_session()
try:
results = []
if query.isdigit():
docs = session.query(Document).filter(Document.id == int(query)).all()
else:
docs = session.query(Document).filter(Document.title.ilike(f'%{query}%')).order_by(Document.id.desc()).all()
for doc in docs:
results.append({
"id": doc.id,
"title": doc.title,
"summary": doc.summary or "",
"upload_time": doc.upload_time,
"match_score": 1.0
})
return results
finally:
session.close()
# --- Q&A ---
def search_questions_answers(query, user_id=None):
session = get_db_session()
try:
q = session.query(QuestionAnswer)
if user_id is not None:
q = q.filter(QuestionAnswer.user_id == user_id)
q = q.filter((QuestionAnswer.question.ilike(f'%{query}%')) | (QuestionAnswer.answer.ilike(f'%{query}%')))
q = q.order_by(QuestionAnswer.created_at.desc())
results = []
for row in q.all():
results.append({
'id': row.id,
'document_id': row.document_id,
'question': row.question,
'answer': row.answer,
'created_at': row.created_at.isoformat() if row.created_at else None,
})
return results
finally:
session.close()
def clean_answer(answer):
# Remove patterns like (3), extra spaces, and leading/trailing punctuation
answer = re.sub(r'\(\d+\)', '', answer)
answer = re.sub(r'\s+', ' ', answer)
answer = answer.strip(' ,.;:')
return answer
def save_question_answer(document_id, user_id, question, answer, score):
score = float(score) # Convert np.float64 to Python float
answer = clean_answer(answer) # Clean up answer format
session = get_db_session()
try:
qa = QuestionAnswer(
document_id=document_id,
user_id=user_id,
question=question,
answer=answer,
score=score
)
session.add(qa)
session.commit()
except Exception as e:
session.rollback()
raise
finally:
session.close()
# --- User Profile ---
def get_user_profile(username):
session = get_db_session()
try:
user = session.query(User).filter(User.username == username).first()
if user:
return {
'username': user.username,
'email': user.email,
'phone': user.phone,
'company': user.company
}
return None
finally:
session.close()
def update_user_profile(username, email, phone, company):
session = get_db_session()
try:
user = session.query(User).filter(User.username == username).first()
if user:
user.email = email
user.phone = phone
user.company = company
session.commit()
return True
return False
finally:
session.close()
def change_user_password(username, current_password, new_password):
session = get_db_session()
try:
user = session.query(User).filter(User.username == username).first()
if not user:
return False, 'User not found'
if not check_password_hash(user.password_hash, current_password):
return False, 'Current password is incorrect'
user.password_hash = generate_password_hash(new_password)
session.commit()
return True, 'Password updated successfully'
finally:
session.close()