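# Hosting-page residue kept for provenance (not part of the program):
# uploader: IAMTFRMZA; commit message: "Update app.py"; revision 8910d8d (verified).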
import streamlit as st
import os
import time
import re
import requests
import json
from PIL import Image
from io import BytesIO
from urllib.parse import quote
from openai import OpenAI
# ------------------ Authentication ------------------
# Login table consulted by login(): maps an account e-mail to its password.
# Keys are matched exactly as typed (including letter case).
# NOTE(review): every account shares one plaintext password that is committed
# to source control; consider moving credentials to a secret store.
VALID_USERS = dict.fromkeys(
    (
        "andrew@lortechnologies.com",
        "asherS@schlagergroup.com.au",
        "daniel@schlagergroup.com.au",
        "admin@schlagergroup.com.au",
    ),
    "Pass.123",
)
def credentials_match(users, email, password):
    """Return True when *password* is the password stored for *email* in *users*.

    Args:
        users: Mapping of e-mail address to the expected password string.
        email: E-mail address supplied by the visitor.
        password: Password supplied by the visitor.

    Returns:
        bool: True only for a known e-mail whose stored password equals
        *password*; False for unknown e-mails or any mismatch.
    """
    import hmac  # local import so this block does not depend on file-level imports

    # Unknown accounts are compared against an empty string so that the same
    # comparison code runs whether or not the e-mail exists.
    expected = users.get(email, "")
    # compare_digest only accepts ASCII-only str, so compare the UTF-8 bytes;
    # this keeps non-ASCII input from raising TypeError.
    same_password = hmac.compare_digest(
        str(expected).encode("utf-8"), str(password).encode("utf-8")
    )
    return email in users and same_password


def login():
    """Render the login form and mark the session authenticated on success.

    Reads the e-mail and password fields; when the "Login" button is pressed
    and the credentials match VALID_USERS, sets
    ``st.session_state.authenticated`` to True and reruns the script.
    Otherwise an error message is shown.
    """
    st.title("πŸ” Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if st.button("Login"):
        # Surrounding whitespace from copy/paste is not part of an address.
        if credentials_match(VALID_USERS, email.strip(), password):
            st.session_state.authenticated = True
            st.rerun()
        else:
            st.error("❌ Incorrect email or password.")
# Gate the rest of the script: the flag starts out False for a new session,
# and until login() flips it the visitor only ever sees the login form.
is_authenticated = st.session_state.setdefault("authenticated", False)
if not is_authenticated:
    login()
    st.stop()
# ------------------ App Configuration ------------------
# Page-level settings and the heading shown above the tabs. This only runs for
# authenticated sessions because the login gate stops the script earlier.
_page_settings = {
    "page_title": "Schlager Forrestdale DocAIAssist",
    "layout": "wide",
    "initial_sidebar_state": "collapsed",
}
st.set_page_config(**_page_settings)

st.title("πŸ“„ Schlager Forrestdale Document Assistant")
st.caption("Explore City of Armadale construction documents using AI + OCR 🧐")
# ------------------ Load API Key ------------------
# The OpenAI key comes from the environment; without it the app shows an
# error and halts instead of building a client.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    st.error("❌ Missing OPENAI_API_KEY environment variable.")
    st.stop()

# Shared client used by every tab below.
client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Tabs (Reordered) ------------------
# One label per tab, in display order; the containers are unpacked in the
# same order as tab1..tab3.
_tab_labels = [
    "πŸ’¬ Assistant",
    "πŸ“‘ Contract Queries",
    "πŸ“ Visual Queries",
]
tab1, tab2, tab3 = st.tabs(_tab_labels)
# ------------------ Tab 1: General Chat Assistant ------------------
# Free-form chat backed by an OpenAI Assistants thread. The transcript is kept
# in st.session_state.chatbot_messages and the remote thread id in
# st.session_state.chatbot_thread_id so both survive Streamlit reruns.
with tab1:
    CHAT_ASSISTANT_ID = "asst_KsQRedoJUnEeStzfox1o06lO"

    # States in which a run is still progressing. Polling stops on anything
    # else, so "expired", "incomplete" or "requires_action" (which this app
    # never services) cannot keep the loop spinning forever.
    chat_active_states = ("queued", "in_progress", "cancelling")

    if "chatbot_messages" not in st.session_state:
        st.session_state.chatbot_messages = []
    if "chatbot_thread_id" not in st.session_state:
        st.session_state.chatbot_thread_id = None

    with st.sidebar:
        st.header("πŸ’¬ Chat Assistant Options")
        if st.button("🧹 Clear Chat Assistant"):
            st.session_state.chatbot_messages = []
            st.session_state.chatbot_thread_id = None
            st.rerun()

    st.markdown("### πŸ€– General Chat Assistant")
    user_prompt = st.chat_input("Ask anything related to the project or documents...")
    if user_prompt:
        st.session_state.chatbot_messages.append({"role": "user", "content": user_prompt})

    # Render the transcript so far (oldest first).
    # NOTE(review): content is rendered with unsafe_allow_html=True, so any
    # HTML in user or assistant text is interpreted by the browser.
    for msg in st.session_state.chatbot_messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"], unsafe_allow_html=True)

    # A trailing user message means there is a question still to be answered.
    if st.session_state.chatbot_messages and st.session_state.chatbot_messages[-1]["role"] == "user":
        chat_needs_rerun = False
        try:
            if st.session_state.chatbot_thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.chatbot_thread_id = thread.id
                # Replay any earlier transcript into the freshly created thread.
                for m in st.session_state.chatbot_messages[:-1]:
                    client.beta.threads.messages.create(
                        thread_id=st.session_state.chatbot_thread_id,
                        role=m["role"],
                        content=m["content"],
                    )

            client.beta.threads.messages.create(
                thread_id=st.session_state.chatbot_thread_id,
                role="user",
                content=st.session_state.chatbot_messages[-1]["content"],
            )
            run = client.beta.threads.runs.create(
                thread_id=st.session_state.chatbot_thread_id,
                assistant_id=CHAT_ASSISTANT_ID,
            )

            with st.spinner("πŸ€– Assistant replying..."):
                while True:
                    status = client.beta.threads.runs.retrieve(
                        thread_id=st.session_state.chatbot_thread_id,
                        run_id=run.id,
                    )
                    if status.status not in chat_active_states:
                        break
                    time.sleep(1)

            latest_reply = None
            if status.status == "completed":
                messages = client.beta.threads.messages.list(
                    thread_id=st.session_state.chatbot_thread_id
                )
                # Pick the reply produced by *this* run instead of de-duplicating
                # on text: a legitimately repeated answer must still be shown,
                # otherwise the question stays unanswered and is re-submitted
                # on every rerun.
                latest_reply = next(
                    (m for m in messages.data if m.role == "assistant" and m.run_id == run.id),
                    None,
                )

            if latest_reply is None:
                st.error("❌ Assistant failed to respond.")
            else:
                st.session_state.chatbot_messages.append(
                    {"role": "assistant", "content": latest_reply.content[0].text.value.strip()}
                )
                chat_needs_rerun = True
        except Exception as e:
            st.error(f"❌ Chat Assistant Error: {e}")

        # Rerun outside the try block so the rerun signal can never be
        # swallowed by the broad handler above; it redraws the transcript with
        # the new reply included.
        if chat_needs_rerun:
            st.rerun()
# ------------------ Tab 2: Contract Queries ------------------
# Contract Q&A: sidebar shortcuts (keyword search, section jump, canned
# queries) queue a prompt, the assistant answers it, and when the answer holds
# a "Document Reference: <doc>, Page <n>" marker the matching page image is
# shown beside the chat.
with tab2:
    ASSISTANT_ID = "asst_KsQRedoJUnEeStzfox1o06lO"

    # States in which a run is still progressing; any other state ends polling
    # so that "expired", "incomplete" or "requires_action" cannot hang the page.
    contract_active_states = ("queued", "in_progress", "cancelling")

    # Session defaults. The two "contract_last_*" keys remember the selectbox
    # values already acted on (see the sidebar below).
    for state_key, initial_value in (
        ("messages", []),
        ("thread_id", None),
        ("image_url", None),
        ("image_updated", False),
        ("pending_prompt", None),
        ("contract_last_section", None),
        ("contract_last_action", None),
    ):
        if state_key not in st.session_state:
            st.session_state[state_key] = initial_value

    with st.sidebar:
        st.header("ℹ️ Contract Tools")
        if st.button("🧹 Clear Chat"):
            st.session_state.messages = []
            st.session_state.thread_id = None
            st.session_state.image_url = None
            st.session_state.image_updated = False
            st.session_state.pending_prompt = None
            st.rerun()

        show_image = st.toggle("πŸ“‘ Show Page Image", value=True)

        keyword = st.text_input("Search by Keyword", placeholder="e.g. defects, WHS, delay")
        if st.button("πŸ”Ž Search Keyword") and keyword:
            st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"

        section_options = [
            "Select a section...",
            "1. Formal Instrument of Contract",
            "2. Offer and Acceptance",
            "3. Key Personnel",
            "4. Contract Pricing",
            "5. Specifications",
            "6. WHS Policies",
            "7. Penalties and Delays",
            "8. Dispute Resolution",
            "9. Principal Obligations",
        ]
        section = st.selectbox("πŸ“„ Jump to Section", section_options)
        # A selectbox keeps its value across reruns, so queue a prompt only
        # when the selection actually changes; otherwise the same prompt would
        # be re-submitted on every rerun for as long as it stays selected.
        if section != st.session_state.contract_last_section:
            st.session_state.contract_last_section = section
            if section != section_options[0]:
                st.session_state.pending_prompt = f"Summarize or list key points from section: {section}"

        actions = [
            "Select an action...",
            "List all contractual obligations",
            "Summarize payment terms",
            "List WHS responsibilities",
            "Find delay-related penalties",
            "Extract dispute resolution steps",
        ]
        action = st.selectbox("βš™οΈ Common Queries", actions)
        # Same change-detection as for the section selectbox above.
        if action != st.session_state.contract_last_action:
            st.session_state.contract_last_action = action
            if action != actions[0]:
                st.session_state.pending_prompt = action

    chat_col, image_col = st.columns([2, 1])

    with chat_col:
        st.markdown("### 🧠 Ask a Document-Specific Question")
        user_input = st.chat_input("Example: What is the defects liability period?")
        if user_input:
            st.session_state.messages.append({"role": "user", "content": user_input})
        elif st.session_state.pending_prompt:
            # Consume the prompt queued by one of the sidebar shortcuts.
            st.session_state.messages.append(
                {"role": "user", "content": st.session_state.pending_prompt}
            )
            st.session_state.pending_prompt = None

        # A trailing user message means there is a question still to be answered.
        if st.session_state.messages and st.session_state.messages[-1]["role"] == "user":
            contract_needs_rerun = False
            try:
                if st.session_state.thread_id is None:
                    thread = client.beta.threads.create()
                    st.session_state.thread_id = thread.id

                client.beta.threads.messages.create(
                    thread_id=st.session_state.thread_id,
                    role="user",
                    content=st.session_state.messages[-1]["content"],
                )
                run = client.beta.threads.runs.create(
                    thread_id=st.session_state.thread_id,
                    assistant_id=ASSISTANT_ID,
                )

                with st.spinner("πŸ€– Thinking..."):
                    while True:
                        status = client.beta.threads.runs.retrieve(
                            thread_id=st.session_state.thread_id,
                            run_id=run.id,
                        )
                        if status.status not in contract_active_states:
                            break
                        time.sleep(1)

                reply_message = None
                if status.status == "completed":
                    messages = client.beta.threads.messages.list(
                        thread_id=st.session_state.thread_id
                    )
                    # Take the reply created by this run. Matching on run id
                    # replaces the earlier substring de-duplication, which
                    # could discard a genuine reply that merely overlapped
                    # with an older one.
                    reply_message = next(
                        (m for m in messages.data if m.role == "assistant" and m.run_id == run.id),
                        None,
                    )

                if reply_message is None:
                    # No rerun here: rerunning would erase this error and
                    # immediately resubmit the same prompt.
                    st.error("❌ Assistant failed.")
                else:
                    reply = reply_message.content[0].text.value.strip()
                    st.session_state.messages.append({"role": "assistant", "content": reply})

                    # Derive the page-image URL from the reference marker, if any.
                    match = re.search(r'Document Reference:\s*(.*?),\s*Page\s*(\d+)', reply)
                    if match:
                        doc, page = match.group(1).strip(), int(match.group(2))
                        folder = quote(doc)
                        img_url = f"https://raw.githubusercontent.com/AndrewLORTech/c2ozschlaegerforrestdale/main/{folder}/{folder}_page_{page:04d}.png"
                        st.session_state.image_url = img_url
                        # NOTE(review): image_updated is written here but not
                        # read anywhere in the visible source.
                        st.session_state.image_updated = True
                    contract_needs_rerun = True
            except Exception as e:
                st.error(f"❌ Error: {e}")

            # Rerun outside the try block so the rerun signal can never be
            # swallowed by the broad handler above.
            if contract_needs_rerun:
                st.rerun()

        # Show messages newest-first.
        for msg in reversed(st.session_state.messages):
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"], unsafe_allow_html=True)

    with image_col:
        if show_image and st.session_state.image_url:
            try:
                # A timeout keeps a stalled download from freezing the whole page.
                r = requests.get(st.session_state.image_url, timeout=10)
                r.raise_for_status()
                img = Image.open(BytesIO(r.content))
                st.image(img, caption="πŸ“„ OCR Page Image", use_container_width=True)
            except Exception as e:
                st.error(f"πŸ–ΌοΈ Image failed: {e}")
# ------------------ Tab 3: Visual Queries ------------------
# Drawing search: the assistant is expected to answer with a JSON list of
# drawing records (optionally wrapped in a ```json fence). Records are shown
# as a filterable, paginated thumbnail grid; when the answer is not such a
# list, the plain chat transcript is shown instead.
with tab3:
    ASSISTANT_ID = "asst_DjvuWBc7tCvMbAhY7n1em4BZ"

    # States in which a run is still progressing; any other state ends polling
    # so that "expired", "incomplete" or "requires_action" cannot hang the page.
    tech_active_states = ("queued", "in_progress", "cancelling")

    if "tech_messages" not in st.session_state:
        st.session_state.tech_messages = []
    if "tech_thread_id" not in st.session_state:
        st.session_state.tech_thread_id = None
    if "tech_results" not in st.session_state:
        st.session_state.tech_results = []
    # Guarded on its own key so the preview state never depends on whether
    # tech_results happens to exist already.
    if "tech_lightbox" not in st.session_state:
        st.session_state.tech_lightbox = None

    tech_input = st.chat_input("Ask about plans, drawings or components")
    if tech_input:
        st.session_state.tech_messages.append({"role": "user", "content": tech_input})

    # A trailing user message means there is a question still to be answered.
    if st.session_state.tech_messages and st.session_state.tech_messages[-1]["role"] == "user":
        try:
            if st.session_state.tech_thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.tech_thread_id = thread.id

            client.beta.threads.messages.create(
                thread_id=st.session_state.tech_thread_id,
                role="user",
                content=st.session_state.tech_messages[-1]["content"],
            )
            run = client.beta.threads.runs.create(
                thread_id=st.session_state.tech_thread_id,
                assistant_id=ASSISTANT_ID,
            )

            with st.spinner("πŸ” Searching technical drawings..."):
                while True:
                    run_status = client.beta.threads.runs.retrieve(
                        thread_id=st.session_state.tech_thread_id,
                        run_id=run.id,
                    )
                    if run_status.status not in tech_active_states:
                        break
                    time.sleep(1)

            if run_status.status == "completed":
                messages = client.beta.threads.messages.list(
                    thread_id=st.session_state.tech_thread_id
                )
                # Use the reply created by this run. Walking the listing in
                # reverse and stopping at the first assistant message returned
                # the oldest answer in the thread, so every later query showed
                # the first query's results.
                tech_reply = next(
                    (m for m in messages.data if m.role == "assistant" and m.run_id == run.id),
                    None,
                )
                if tech_reply is not None:
                    content = tech_reply.content[0].text.value
                    st.session_state.tech_messages.append({"role": "assistant", "content": content})
                    try:
                        # Strip a surrounding ```json fence before parsing.
                        parsed = json.loads(content.strip("`json "))
                    except ValueError:
                        # Not JSON (json.JSONDecodeError is a ValueError):
                        # treat as a prose answer with no drawing records.
                        parsed = []
                    # The grid below calls .get() on each record, so keep only
                    # dict entries of a top-level list.
                    if isinstance(parsed, list):
                        st.session_state.tech_results = [r for r in parsed if isinstance(r, dict)]
                    else:
                        st.session_state.tech_results = []
            else:
                st.error(f"❌ Technical Assistant Error: run ended with status '{run_status.status}'")
        except Exception as e:
            st.error(f"❌ Technical Assistant Error: {e}")

    with st.expander("πŸ”§ Options (Filter + Pagination)", expanded=False):
        # key=str keeps sorting well-defined even if a record carries a
        # non-string discipline (e.g. null in the JSON).
        disciplines = sorted(
            {d.get("discipline", "") for d in st.session_state.tech_results}, key=str
        )
        selected = st.selectbox("🌍 Filter by discipline", ["All"] + disciplines)
        page_size = 8
        page = st.number_input("Page", min_value=1, step=1, value=1)

    if st.session_state.tech_results:
        st.subheader("πŸ“‚ Results")
        results = [
            r for r in st.session_state.tech_results
            if selected == "All" or r.get("discipline") == selected
        ]
        paged = results[(page - 1) * page_size : page * page_size]

        cols = st.columns(4)
        for i, item in enumerate(paged):
            # .get() so a record missing a field renders blank instead of
            # raising KeyError and taking the whole page down.
            drawing_number = item.get("drawing_number", "")
            discipline = item.get("discipline", "")
            with cols[i % 4]:
                st.markdown(f"**πŸ“ {drawing_number} ({discipline})**")
                st.caption(item.get("summary", ""))
                image_urls = item.get("images", [])
                if not image_urls:
                    st.warning("⚠️ No image available.")
                else:
                    url = image_urls[0]
                    st.caption(f"πŸ”— Image URL: {url}")
                    try:
                        st.image(url, caption=f"{drawing_number} - Page 1", use_container_width=True)
                    except Exception as e:
                        st.error(f"❌ Could not load image: {e}")
                    if st.button("πŸ–ΌοΈ View Drawing Details", key=f"thumb_{i}"):
                        st.session_state.tech_lightbox = url

        # Enlarged preview of the drawing chosen above, with a way to dismiss it.
        if st.session_state.tech_lightbox:
            st.image(
                st.session_state.tech_lightbox,
                caption="πŸ” Enlarged Drawing Preview",
                use_container_width=True,
            )
            if st.button("❌ Close Preview"):
                st.session_state.tech_lightbox = None
                st.rerun()
    else:
        # No structured results: fall back to the plain chat transcript.
        for msg in st.session_state.tech_messages:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"], unsafe_allow_html=True)