import os DEMO_MODE = False MEMORY_STORAGE_TYPE = "RAM" HF_DATASET_MEMORY_REPO = "broadfield-dev/ai-brain" HF_DATASET_RULES_REPO = "broadfield-dev/ai-rules" os.environ['STORAGE_BACKEND'] = MEMORY_STORAGE_TYPE if MEMORY_STORAGE_TYPE == "HF_DATASET": os.environ['HF_MEMORY_DATASET_REPO'] = HF_DATASET_MEMORY_REPO os.environ['HF_RULES_DATASET_REPO'] = HF_DATASET_RULES_REPO import json import re import logging from datetime import datetime from dotenv import load_dotenv import gradio as gr import time import tempfile import xml.etree.ElementTree as ET from PIL import Image, ImageDraw from model_logic import ( get_available_providers, get_model_display_names_for_provider, get_default_model_display_name_for_provider, call_model_stream, MODELS_BY_PROVIDER ) from memory_logic import ( initialize_memory_system, add_memory_entry, retrieve_memories_semantic, get_all_memories_cached, clear_all_memory_data_backend, add_rule_entry, retrieve_rules_semantic, remove_rule_entry, get_all_rules_cached, clear_all_rules_data_backend, save_faiss_indices_to_disk, STORAGE_BACKEND as MEMORY_STORAGE_BACKEND, SQLITE_DB_PATH as MEMORY_SQLITE_PATH, HF_MEMORY_DATASET_REPO as MEMORY_HF_MEM_REPO, HF_RULES_DATASET_REPO as MEMORY_HF_RULES_REPO, load_rules_from_file, load_memories_from_file, process_rules_from_text_blob, import_kb_from_kv_dict ) from websearch_logic import scrape_url, search_and_scrape_duckduckgo, search_and_scrape_google from image_kb_logic import ( set_pil_image_format_to_png, extract_data_from_image, decrypt_data, InvalidTag, parse_kv_string_to_dict, convert_kb_to_kv_string, generate_brain_carrier_image, draw_key_list_dropdown_overlay, encrypt_data, embed_data_in_image, _get_font, PREFERRED_FONTS, ) from prompts import ( DEFAULT_SYSTEM_PROMPT, METRIC_GENERATION_SYSTEM_PROMPT, METRIC_GENERATION_USER_PROMPT_TEMPLATE, PLAN_GENERATION_SYSTEM_PROMPT, PLAN_GENERATION_USER_PROMPT_TEMPLATE, INSIGHT_GENERATION_SYSTEM_PROMPT, INSIGHT_GENERATION_USER_PROMPT_TEMPLATE ) from gradio_client import Client load_dotenv() logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s') logger = logging.getLogger(__name__) for lib_name in ["urllib3", "requests", "huggingface_hub", "PIL.PngImagePlugin", "matplotlib", "gradio_client.client", "multipart.multipart", "httpx", "sentence_transformers", "faiss", "datasets"]: if logging.getLogger(lib_name): logging.getLogger(lib_name).setLevel(logging.WARNING) WEB_SEARCH_ENABLED = os.getenv("WEB_SEARCH_ENABLED", "true").lower() == "true" MAX_HISTORY_TURNS = int(os.getenv("MAX_HISTORY_TURNS", 7)) current_chat_session_history = [] LOAD_RULES_FILE = os.getenv("LOAD_RULES_FILE") LOAD_MEMORIES_FILE = os.getenv("LOAD_MEMORIES_FILE") logger.info(f"App Config: WebSearch={WEB_SEARCH_ENABLED}, MemoryBackend={MEMORY_STORAGE_BACKEND}") logger.info(f"Startup loading: Rules from {LOAD_RULES_FILE or 'None'}, Memories from {LOAD_MEMORIES_FILE or 'None'}") def format_insights_for_prompt(retrieved_insights_list: list[str]) -> tuple[str, list[dict]]: if not retrieved_insights_list: return "No specific guiding principles or learned insights retrieved.", [] parsed = [] for text in retrieved_insights_list: match = re.match(r"\[(CORE_RULE|RESPONSE_PRINCIPLE|BEHAVIORAL_ADJUSTMENT|GENERAL_LEARNING)\|([\d\.]+?)\](.*)", text.strip(), re.DOTALL | re.IGNORECASE) if match: parsed.append({"type": match.group(1).upper().replace(" ", "_"), "score": match.group(2), "text": match.group(3).strip(), "original": text.strip()}) else: parsed.append({"type": "GENERAL_LEARNING", "score": "0.5", "text": text.strip(), "original": text.strip()}) try: parsed.sort(key=lambda x: float(x["score"]) if x["score"].replace('.', '', 1).isdigit() else -1.0, reverse=True) except ValueError: logger.warning("FORMAT_INSIGHTS: Sort error due to invalid score format.") grouped = {"CORE_RULE": [], "RESPONSE_PRINCIPLE": [], "BEHAVIORAL_ADJUSTMENT": [], "GENERAL_LEARNING": []} for p_item in parsed: grouped.get(p_item["type"], grouped["GENERAL_LEARNING"]).append(f"- (Score: {p_item['score']}) {p_item['text']}") sections = [f"{k.replace('_', ' ').title()}:\n" + "\n".join(v) for k, v in grouped.items() if v] return "\n\n".join(sections) if sections else "No guiding principles retrieved.", parsed def generate_interaction_metrics(user_input: str, bot_response: str, provider: str, model_display_name: str, api_key_override: str = None) -> dict: metric_start_time = time.time() logger.info(f"Generating metrics with: {provider}/{model_display_name}") metric_prompt_content = METRIC_GENERATION_USER_PROMPT_TEMPLATE.format(user_input=user_input, bot_response=bot_response) metric_messages = [{"role": "system", "content": METRIC_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": metric_prompt_content}] try: metrics_provider_final, metrics_model_display_final = provider, model_display_name metrics_model_env = os.getenv("METRICS_MODEL") if metrics_model_env and "/" in metrics_model_env: m_prov, m_id = metrics_model_env.split('/', 1) m_disp_name = next((dn for dn, mid in MODELS_BY_PROVIDER.get(m_prov.lower(), {}).get("models", {}).items() if mid == m_id), None) if m_disp_name: metrics_provider_final, metrics_model_display_final = m_prov, m_disp_name else: logger.warning(f"METRICS_MODEL '{metrics_model_env}' not found, using interaction model.") response_chunks = list(call_model_stream(provider=metrics_provider_final, model_display_name=metrics_model_display_final, messages=metric_messages, api_key_override=api_key_override, temperature=0.05, max_tokens=200)) resp_str = "".join(response_chunks).strip() json_match = re.search(r"```json\s*(\{.*?\})\s*```", resp_str, re.DOTALL | re.IGNORECASE) or re.search(r"(\{.*?\})", resp_str, re.DOTALL) if json_match: metrics_data = json.loads(json_match.group(1)) else: logger.warning(f"METRICS_GEN: Non-JSON response from {metrics_provider_final}/{metrics_model_display_final}: '{resp_str}'") return {"takeaway": "N/A", "response_success_score": 0.5, "future_confidence_score": 0.5, "error": "metrics format error"} parsed_metrics = {"takeaway": metrics_data.get("takeaway", "N/A"), "response_success_score": float(metrics_data.get("response_success_score", 0.5)), "future_confidence_score": float(metrics_data.get("future_confidence_score", 0.5)), "error": metrics_data.get("error")} logger.info(f"METRICS_GEN: Generated in {time.time() - metric_start_time:.2f}s. Data: {parsed_metrics}") return parsed_metrics except Exception as e: logger.error(f"METRICS_GEN Error: {e}", exc_info=False) return {"takeaway": "N/A", "response_success_score": 0.5, "future_confidence_score": 0.5, "error": str(e)} def _generate_action_plan( original_query: str, provider_name: str, model_display_name: str, ui_api_key_override: str | None, chat_history: list[dict] ) -> dict: history_str = "\n".join([f"{msg['role']}: {msg['content'][:150]}" for msg in chat_history[-4:]]) plan_user_prompt = PLAN_GENERATION_USER_PROMPT_TEMPLATE.format(history_str=history_str, original_query=original_query) plan_messages = [{"role": "system", "content": PLAN_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": plan_user_prompt}] try: response_chunks = list(call_model_stream( provider=provider_name, model_display_name=model_display_name, messages=plan_messages, api_key_override=ui_api_key_override, temperature=0.0, max_tokens=1000 )) resp_str = "".join(response_chunks).strip() json_match = re.search(r"\{.*\}", resp_str, re.DOTALL) if json_match: plan_data = json.loads(json_match.group(0)) return plan_data except Exception as e: logger.error(f"PLAN_GEN: Failed to generate or parse action plan: {e}") return { "action_type": "multi_step_plan", "plan": [ {"tool": "web_search", "task": original_query}, {"tool": "respond", "task": "Synthesize all information from the scratchpad and provide a comprehensive final answer to the user."} ] } def process_user_interaction_gradio( user_input: str, max_research_steps: int, provider_name: str, model_display_name: str, chat_history: list[dict], custom_system_prompt: str = None, ui_api_key_override: str = None, ): process_start_time = time.time() request_id = os.urandom(4).hex() logger.info(f"PUI_GRADIO [{request_id}] Start. User: '{user_input[:50]}...' Max Steps: {max_research_steps}") yield "status", "[Deciding on an action plan...]" action_plan_data = _generate_action_plan(user_input, provider_name, model_display_name, ui_api_key_override, chat_history) action_type = action_plan_data.get("action_type") if action_type == "fast_response": yield "status", "[Executing fast response...]" yield "plan", [{"tool": "fast_response", "task": action_plan_data.get("reason", "Direct answer.")}] now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') final_sys_prompt = custom_system_prompt or DEFAULT_SYSTEM_PROMPT final_sys_prompt = f"Current Date/Time: {now_str}.\n\n" + final_sys_prompt messages_for_llm = [{"role": "system", "content": final_sys_prompt}] + chat_history + [{"role": "user", "content": user_input}] streamed_response = "" try: for chunk in call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=messages_for_llm, api_key_override=ui_api_key_override, temperature=0.7, max_tokens=3000): streamed_response += chunk yield "response_chunk", chunk except Exception as e: streamed_response = f"\n\n(Error during fast response: {str(e)[:150]})" yield "response_chunk", streamed_response final_bot_text = streamed_response.strip() yield "final_response", {"response": final_bot_text} return plan = action_plan_data.get("plan", []) if not plan: plan = [{"tool": "web_search", "task": user_input}, {"tool": "respond", "task": "Synthesize a response."}] yield "plan", plan research_scratchpad = "" now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') for i, step_action in enumerate(plan): tool = step_action.get("tool") task = step_action.get("task") if tool == 'respond': break if i + 1 > max_research_steps: research_scratchpad += f"\n\n---NOTE: Maximum research step budget of {max_research_steps} reached. Proceeding to final response.---\n" logger.warning(f"PUI_GRADIO [{request_id}]: Max research steps ({max_research_steps}) reached.") break task_for_display = str(task) if isinstance(task, dict) else task yield "status", f"[Executing Step {i+1}/{len(plan)-1}: {tool} -> {task_for_display[:70]}...]" step_findings = f"Step {i+1} ({tool}: '{task_for_display[:1000]}'): " if tool == 'web_search': try: web_results = search_and_scrape_duckduckgo(task, num_results=5) scraped_content = "\n".join([f"Source:\nURL:{r.get('url','N/A')}\nContent:\n{(r.get('content') or r.get('error') or 'N/A')[:1500]}\n---" for r in web_results]) if web_results else "No results found." synthesis_prompt = f"Relevant web content for the task '{task}':\n\n{scraped_content}\n\nConcisely summarize the findings from the content." summary = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": synthesis_prompt}], api_key_override=ui_api_key_override, temperature=0.1, max_tokens=400))) step_findings += summary except Exception as e: step_findings += f"Error during web search: {e}" elif tool == 'web_scrape': try: web_results = scrape_url(task) scraped_content = "\n".join([f"Source:\nURL:{r.get('url','N/A')}\nContent:\n{(r.get('content') or r.get('error') or 'N/A')[:1500]}\n---" for r in web_results]) if web_results else "No results found." synthesis_prompt = f"Relevant web content for the task '{task}':\n\n{scraped_content}\n\nConcisely summarize the findings from the content." summary = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": synthesis_prompt}], api_key_override=ui_api_key_override, temperature=0.1, max_tokens=400))) step_findings += summary except Exception as e: step_findings += f"Error during web scrape: {e}" elif tool == 'gradio_view_api': try: client = Client(task) api_info = client.view_api(all_endpoints=True) summary = str(api_info) if summary and summary.strip(): step_findings += f"Successfully retrieved API endpoints for space '{task}':\n{summary}" else: step_findings += f"Could not retrieve valid API endpoint information for space '{task}'." except Exception as e: error_message = f"Error viewing Gradio API for space '{task}': {e}" logger.error(f"GRADIO_VIEW_API_TOOL Error: {e}\nTask was: {task}", exc_info=True) step_findings += error_message elif tool == 'gradio_client': try: if isinstance(task, str): try: params = json.loads(task) except json.JSONDecodeError: json_match = re.search(r"\{.*\}", task, re.DOTALL) if json_match: params = json.loads(json_match.group(0)) else: raise ValueError("Task is not a valid JSON string or does not contain a JSON object.") elif isinstance(task, dict): params = task else: raise TypeError(f"Unsupported task type for gradio_client: {type(task)}") space_id = params.get("space_id") api_name = params.get("api_name") parameters = params.get("parameters", {}) if not space_id or not api_name: raise ValueError("Missing 'space_id' or 'api_name' in task JSON.") if not isinstance(parameters, dict): raise TypeError("The 'parameters' field in the task must be a JSON object (dictionary).") client = Client(space_id) result = client.predict(**parameters, api_name=api_name) if isinstance(result, (str, int, float, bool)): result_str = str(result) elif isinstance(result, (dict, list)): result_str = json.dumps(result, indent=2) else: result_str = f"Received result of type {type(result)}." step_findings += f"Successfully called Gradio API {api_name} on space {space_id}. Result:\n{result_str}" except Exception as e: error_message = f"Error during Gradio Client operation: {e}" logger.error(f"GRADIO_CLIENT_TOOL Error: {e}\nTask was: {task}", exc_info=True) step_findings += error_message elif tool == 'memory_search': try: retrieved_mems = retrieve_memories_semantic(task, k=3) if retrieved_mems: memory_context = "\n".join([f"- User: {m.get('user_input','')} -> AI: {m.get('bot_response','')} (Takeaway: {m.get('metrics',{}).get('takeaway','N/A')})" for m in retrieved_mems]) step_findings += f"Found relevant memories:\n{memory_context}" else: step_findings += "No relevant memories found." except Exception as e: step_findings += f"Error during memory search: {e}" elif tool == 'think': try: think_prompt = f"Original Query: '{user_input}'\n\nResearch Scratchpad:\n```\n{research_scratchpad}\n```\n\nMy current thinking task is: '{task}'. Based on the scratchpad, what is the conclusion of this thinking step?" thought = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": think_prompt}], api_key_override=ui_api_key_override, temperature=0.3, max_tokens=500))) step_findings += f"Conclusion: {thought}" except Exception as e: step_findings += f"Error during thinking step: {e}" else: step_findings += "Unknown tool specified in plan." research_scratchpad += f"\n\n---\n{step_findings}\n---" yield "step_result", {"step": i + 1, "tool": tool, "task": task_for_display, "result": step_findings} yield "status", "[Synthesizing final report...]" final_sys_prompt = custom_system_prompt or DEFAULT_SYSTEM_PROMPT final_sys_prompt += f"\n\nCurrent Date/Time: {now_str}. You have just completed a research plan. Synthesize the information in the 'Research Scratchpad' into a final, comprehensive answer. Cite sources by including URLs if available." final_user_prompt = f"Original user query: \"{user_input}\"\n\nResearch Scratchpad:\n```\n{research_scratchpad}\n```\n\nNow, provide the final, synthesized answer to the user." final_messages = [{"role": "system", "content": final_sys_prompt}, {"role": "user", "content": final_user_prompt}] streamed_response = "" try: for chunk in call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=final_messages, api_key_override=ui_api_key_override, temperature=0.6, max_tokens=3000): streamed_response += chunk yield "response_chunk", chunk except Exception as e: error_msg = f"\n\n(Error during final synthesis: {str(e)[:150]})" streamed_response += error_msg yield "response_chunk", error_msg final_bot_text = streamed_response.strip() or "(No response or error during synthesis.)" logger.info(f"PUI_GRADIO [{request_id}]: Finished. Total: {time.time() - process_start_time:.2f}s. Resp len: {len(final_bot_text)}") yield "final_response", {"response": final_bot_text} def perform_post_interaction_learning(user_input: str, bot_response: str, provider: str, model_disp_name: str, api_key_override: str = None): task_id = os.urandom(4).hex() logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: START User='{user_input[:40]}...', Bot='{bot_response[:40]}...'") learning_start_time = time.time() significant_learnings_summary = [] try: metrics = generate_interaction_metrics(user_input, bot_response, provider, model_disp_name, api_key_override) logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Metrics: {metrics}") add_memory_entry(user_input, metrics, bot_response) summary = f"User:\"{user_input}\"\nAI:\"{bot_response}\"\nMetrics(takeaway):{metrics.get('takeaway','N/A')},Success:{metrics.get('response_success_score','N/A')}" existing_rules_ctx = "\n".join([f"- \"{r}\"" for r in retrieve_rules_semantic(f"{summary}\n{user_input}", k=10)]) or "No existing rules context." insight_user_prompt = INSIGHT_GENERATION_USER_PROMPT_TEMPLATE.format(summary=summary, existing_rules_ctx=existing_rules_ctx) insight_msgs = [{"role":"system", "content":INSIGHT_GENERATION_SYSTEM_PROMPT}, {"role":"user", "content":insight_user_prompt}] insight_prov, insight_model_disp = provider, model_disp_name insight_env_model = os.getenv("INSIGHT_MODEL_OVERRIDE") if insight_env_model and "/" in insight_env_model: i_p, i_id = insight_env_model.split('/', 1) i_d_n = next((dn for dn, mid in MODELS_BY_PROVIDER.get(i_p.lower(), {}).get("models", {}).items() if mid == i_id), None) if i_d_n: insight_prov, insight_model_disp = i_p, i_d_n logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Generating insights with {insight_prov}/{insight_model_disp} (expecting XML)") raw_ops_xml_full = "".join(list(call_model_stream(provider=insight_prov, model_display_name=insight_model_disp, messages=insight_msgs, api_key_override=api_key_override, temperature=0.0, max_tokens=3500))).strip() ops_data_list, processed_count = [], 0 xml_match = re.search(r"```xml\s*(.*)\s*```", raw_ops_xml_full, re.DOTALL | re.IGNORECASE) or \ re.search(r"(.*)", raw_ops_xml_full, re.DOTALL | re.IGNORECASE) if xml_match: xml_content_str = xml_match.group(1) try: root = ET.fromstring(xml_content_str) if root.tag == "operations_list": for op_element in root.findall("operation"): action_el = op_element.find("action") insight_el = op_element.find("insight") old_insight_el = op_element.find("old_insight_to_replace") action = action_el.text.strip().lower() if action_el is not None and action_el.text else None insight_text = insight_el.text.strip() if insight_el is not None and insight_el.text else None old_insight_text = old_insight_el.text.strip() if old_insight_el is not None and old_insight_el.text else None if action and insight_text: ops_data_list.append({ "action": action, "insight": insight_text, "old_insight_to_replace": old_insight_text }) else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Skipped XML operation due to missing action or insight text. Action: {action}, Insight: {insight_text}") else: logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: XML root tag is not . Found: {root.tag}. XML content:\n{xml_content_str}") except ET.ParseError as e: logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: XML parsing error: {e}. XML content that failed:\n{xml_content_str}") except Exception as e_xml_proc: logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: Error processing parsed XML: {e_xml_proc}. XML content:\n{xml_content_str}") else: logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: No XML structure found in LLM output. Full raw output:\n{raw_ops_xml_full}") if ops_data_list: logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: LLM provided {len(ops_data_list)} insight ops from XML.") for op_idx, op_data in enumerate(ops_data_list): action = op_data["action"] insight_text = op_data["insight"] old_insight = op_data["old_insight_to_replace"] if not re.match(r"\[(CORE_RULE|RESPONSE_PRINCIPLE|BEHAVIORAL_ADJUSTMENT|GENERAL_LEARNING)\|([\d\.]+?)\]", insight_text, re.I|re.DOTALL): logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx}: Skipped op due to invalid insight_text format from XML: '{insight_text[:100]}...'") continue if action == "add": success, status_msg = add_rule_entry(insight_text) if success: processed_count +=1 if insight_text.upper().startswith("[CORE_RULE"): significant_learnings_summary.append(f"New Core Rule Added: {insight_text}") else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (add from XML): Failed to add rule '{insight_text[:50]}...'. Status: {status_msg}") elif action == "update": if old_insight and old_insight != insight_text: remove_success = remove_rule_entry(old_insight) if not remove_success: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (update from XML): Failed to remove old rule '{old_insight[:50]}...' before adding new.") success, status_msg = add_rule_entry(insight_text) if success: processed_count +=1 if insight_text.upper().startswith("[CORE_RULE"): significant_learnings_summary.append(f"Core Rule Updated to: {insight_text}") else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (update from XML): Failed to add/update rule '{insight_text[:50]}...'. Status: {status_msg}") else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx}: Skipped op due to unknown action '{action}' from XML.") if significant_learnings_summary: learning_digest = "SYSTEM CORE LEARNING DIGEST:\n" + "\n".join(significant_learnings_summary) system_metrics = { "takeaway": "Core knowledge refined.", "response_success_score": 1.0, "future_confidence_score": 1.0, "type": "SYSTEM_REFLECTION" } add_memory_entry( user_input="SYSTEM_INTERNAL_REFLECTION_TRIGGER", metrics=system_metrics, bot_response=learning_digest ) logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Added CORE_LEARNING_DIGEST to memories: {learning_digest[:100]}...") logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Processed {processed_count} insight ops out of {len(ops_data_list)} received from XML.") else: logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: No valid insight operations derived from LLM's XML output.") except Exception as e: logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: CRITICAL ERROR in learning task: {e}", exc_info=True) logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: END. Total: {time.time() - learning_start_time:.2f}s") def handle_gradio_chat_submit(user_msg_txt: str, max_research_steps: int, gr_hist_list: list, sel_prov_name: str, sel_model_disp_name: str, ui_api_key: str|None, cust_sys_prompt: str): global current_chat_session_history cleared_input, updated_gr_hist, status_txt = "", list(gr_hist_list), "Initializing..." updated_rules_text = ui_refresh_rules_display_fn() updated_mems_json = ui_refresh_memories_display_fn() log_html_output = gr.HTML("

Research Log will appear here.

") final_report_tb = gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True) dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False) if not user_msg_txt.strip(): status_txt = "Error: Empty message." updated_gr_hist.append((user_msg_txt or "(Empty)", status_txt)) yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) return updated_gr_hist.append((user_msg_txt, "Thinking... See Research Log below for progress.")) yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) internal_hist = list(current_chat_session_history) final_bot_resp_acc = "" temp_dl_file_path = None try: processor_gen = process_user_interaction_gradio( user_input=user_msg_txt, max_research_steps=max_research_steps, provider_name=sel_prov_name, model_display_name=sel_model_disp_name, chat_history=internal_hist, custom_system_prompt=cust_sys_prompt.strip() or None, ui_api_key_override=ui_api_key.strip() if ui_api_key else None ) curr_bot_disp_msg = "" full_plan = [] log_html_parts = [] for upd_type, upd_data in processor_gen: if upd_type == "status": status_txt = upd_data if "Deciding" in status_txt or "Executing" in status_txt: log_html_output = gr.HTML(f"

{status_txt}

") elif upd_type == "plan": full_plan = upd_data log_html_parts = ["

Action Plan

    "] for i, step in enumerate(full_plan): log_html_parts.append(f'
  1. {step.get("tool")}: {step.get("task")} (Pending)
  2. ') log_html_parts.append("

Log

") log_html_output = gr.HTML("".join(log_html_parts)) elif upd_type == "step_result": step_num = upd_data["step"] sanitized_result = upd_data["result"].replace('<', '<').replace('>', '>').replace('\n', '
') log_html_parts[step_num] = f'
  • {upd_data.get("tool")}: {upd_data.get("task")} (Done)
  • ' log_html_parts.append(f'
    {sanitized_result}
    ') next_step_index_in_list = step_num + 1 if next_step_index_in_list < len(full_plan) + 1: next_step_action = full_plan[step_num] if next_step_action.get("tool") != "respond": log_html_parts[next_step_index_in_list] = f'
  • {next_step_action.get("tool")}: {next_step_action.get("task")} (In Progress...)
  • ' log_html_output = gr.HTML("".join(log_html_parts)) elif upd_type == "response_chunk": curr_bot_disp_msg += upd_data if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt: updated_gr_hist[-1] = (user_msg_txt, curr_bot_disp_msg) elif upd_type == "final_response": final_bot_resp_acc = upd_data["response"] status_txt = "Response generated. Processing learning..." if not curr_bot_disp_msg and final_bot_resp_acc: curr_bot_disp_msg = final_bot_resp_acc if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt: updated_gr_hist[-1] = (user_msg_txt, curr_bot_disp_msg or "(No text)") final_report_tb = gr.Textbox(value=curr_bot_disp_msg, interactive=True, show_copy_button=True) if curr_bot_disp_msg and not curr_bot_disp_msg.startswith("Error:"): try: with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".md", encoding='utf-8') as tmpfile: tmpfile.write(curr_bot_disp_msg) temp_dl_file_path = tmpfile.name dl_report_btn = gr.DownloadButton(value=temp_dl_file_path, visible=True, interactive=True) except Exception as e: logger.error(f"Error creating temp file for download: {e}", exc_info=False) dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False, label="Download Error") else: dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False) yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) if upd_type == "final_response": break except Exception as e: logger.error(f"Chat handler error during main processing: {e}", exc_info=True) status_txt = f"Error: {str(e)[:100]}" error_message_for_chat = f"Sorry, an error occurred: {str(e)[:100]}" if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt: updated_gr_hist[-1] = (user_msg_txt, error_message_for_chat) final_report_tb = gr.Textbox(value=error_message_for_chat, interactive=True) dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False) log_html_output = gr.HTML(f'

    Error processing request.

    ') current_rules_text_on_error = ui_refresh_rules_display_fn() current_mems_json_on_error = ui_refresh_memories_display_fn() yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_on_error, current_mems_json_on_error) if temp_dl_file_path and os.path.exists(temp_dl_file_path): try: os.unlink(temp_dl_file_path) except Exception as e_unlink: logger.error(f"Error deleting temp download file {temp_dl_file_path} after error: {e_unlink}") return if final_bot_resp_acc and not final_bot_resp_acc.startswith("Error:"): current_chat_session_history.extend([{"role": "user", "content": user_msg_txt}, {"role": "assistant", "content": final_bot_resp_acc}]) status_txt = "[Performing post-interaction learning...]" current_rules_text_before_learn = ui_refresh_rules_display_fn() current_mems_json_before_learn = ui_refresh_memories_display_fn() yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_before_learn, current_mems_json_before_learn) try: perform_post_interaction_learning( user_input=user_msg_txt, bot_response=final_bot_resp_acc, provider=sel_prov_name, model_disp_name=sel_model_disp_name, api_key_override=ui_api_key.strip() if ui_api_key else None ) status_txt = "Response & Learning Complete." except Exception as e_learn: logger.error(f"Error during post-interaction learning: {e_learn}", exc_info=True) status_txt = "Response complete. Error during learning." else: status_txt = "Processing finished; no valid response or error occurred." updated_rules_text = ui_refresh_rules_display_fn() updated_mems_json = ui_refresh_memories_display_fn() yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) if temp_dl_file_path and os.path.exists(temp_dl_file_path): try: os.unlink(temp_dl_file_path) except Exception as e_unlink: logger.error(f"Error deleting temp download file {temp_dl_file_path}: {e_unlink}") def ui_refresh_rules_display_fn(): return "\n\n---\n\n".join(get_all_rules_cached()) or "No rules found." def ui_refresh_memories_display_fn(): return get_all_memories_cached() or [] def ui_download_rules_action_fn(): rules_content = "\n\n---\n\n".join(get_all_rules_cached()) if not rules_content.strip(): gr.Warning("No rules to download.") return gr.DownloadButton(value=None, interactive=False, label="No Rules") try: with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding='utf-8') as tmpfile: tmpfile.write(rules_content) return tmpfile.name except Exception as e: logger.error(f"Error creating rules download file: {e}") gr.Error(f"Failed to prepare rules for download: {e}") return gr.DownloadButton(value=None, interactive=False, label="Error") def ui_upload_rules_action_fn(uploaded_file_obj, progress=gr.Progress()): if not uploaded_file_obj: return "No file provided for rules upload." try: with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f: content = f.read() except Exception as e_read: return f"Error reading file: {e_read}" if not content.strip(): return "Uploaded rules file is empty." added_count, skipped_count, error_count = 0,0,0 potential_rules = [] file_name_lower = uploaded_file_obj.name.lower() if file_name_lower.endswith(".txt"): potential_rules = content.split("\n\n---\n\n") if len(potential_rules) == 1 and "\n" in content: potential_rules = [r.strip() for r in content.splitlines() if r.strip()] elif file_name_lower.endswith(".jsonl"): for line_num, line in enumerate(content.splitlines()): line = line.strip() if line: try: rule_text_in_json_string = json.loads(line) if isinstance(rule_text_in_json_string, str): potential_rules.append(rule_text_in_json_string) else: logger.warning(f"Rule Upload (JSONL): Line {line_num+1} did not contain a string value. Got: {type(rule_text_in_json_string)}") error_count +=1 except json.JSONDecodeError: logger.warning(f"Rule Upload (JSONL): Line {line_num+1} failed to parse as JSON: {line[:100]}") error_count +=1 else: return "Unsupported file type for rules. Please use .txt or .jsonl." valid_potential_rules = [r.strip() for r in potential_rules if r.strip()] total_to_process = len(valid_potential_rules) if total_to_process == 0 and error_count == 0: return "No valid rules found in file to process." elif total_to_process == 0 and error_count > 0: return f"No valid rules found to process. Encountered {error_count} parsing/format errors." progress(0, desc="Starting rules upload...") for idx, rule_text in enumerate(valid_potential_rules): success, status_msg = add_rule_entry(rule_text) if success: added_count += 1 elif status_msg == "duplicate": skipped_count += 1 else: error_count += 1 progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} rules...") msg = f"Rules Upload: Total valid rule segments processed: {total_to_process}. Added: {added_count}, Skipped (duplicates): {skipped_count}, Errors (parsing/add): {error_count}." logger.info(msg); return msg def ui_download_memories_action_fn(): memories = get_all_memories_cached() if not memories: gr.Warning("No memories to download.") return gr.DownloadButton(value=None, interactive=False, label="No Memories") jsonl_content = "" for mem_dict in memories: try: jsonl_content += json.dumps(mem_dict) + "\n" except Exception as e: logger.error(f"Error serializing memory for download: {mem_dict}, Error: {e}") if not jsonl_content.strip(): gr.Warning("No valid memories to serialize for download.") return gr.DownloadButton(value=None, interactive=False, label="No Data") try: with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".jsonl", encoding='utf-8') as tmpfile: tmpfile.write(jsonl_content) return tmpfile.name except Exception as e: logger.error(f"Error creating memories download file: {e}") gr.Error(f"Failed to prepare memories for download: {e}") return gr.DownloadButton(value=None, interactive=False, label="Error") def ui_upload_memories_action_fn(uploaded_file_obj, progress=gr.Progress()): if not uploaded_file_obj: return "No file provided for memories upload." try: with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f: content = f.read() except Exception as e_read: return f"Error reading file: {e_read}" if not content.strip(): return "Uploaded memories file is empty." added_count, format_error_count, save_error_count = 0,0,0 memory_objects_to_process = [] file_ext = os.path.splitext(uploaded_file_obj.name.lower())[1] if file_ext == ".json": try: parsed_json = json.loads(content) if isinstance(parsed_json, list): memory_objects_to_process = parsed_json elif isinstance(parsed_json, dict): memory_objects_to_process = [parsed_json] else: logger.warning(f"Memories Upload (.json): File content is not a JSON list or object. Type: {type(parsed_json)}"); format_error_count = 1 except json.JSONDecodeError as e: logger.warning(f"Memories Upload (.json): Invalid JSON file. Error: {e}"); format_error_count = 1 elif file_ext == ".jsonl": for line_num, line in enumerate(content.splitlines()): line = line.strip() if line: try: memory_objects_to_process.append(json.loads(line)) except json.JSONDecodeError: logger.warning(f"Memories Upload (.jsonl): Line {line_num+1} parse error: {line[:100]}"); format_error_count += 1 else: return "Unsupported file type for memories. Please use .json or .jsonl." if not memory_objects_to_process and format_error_count > 0 : return f"Memories Upload: File parsing failed. Found {format_error_count} format errors and no processable objects." elif not memory_objects_to_process: return "No valid memory objects found in the uploaded file." total_to_process = len(memory_objects_to_process) if total_to_process == 0: return "No memory objects to process (after parsing)." progress(0, desc="Starting memories upload...") for idx, mem_data in enumerate(memory_objects_to_process): if isinstance(mem_data, dict) and all(k in mem_data for k in ["user_input", "bot_response", "metrics"]): success, _ = add_memory_entry(mem_data["user_input"], mem_data["metrics"], mem_data["bot_response"]) if success: added_count += 1 else: save_error_count += 1 else: logger.warning(f"Memories Upload: Skipped invalid memory object structure: {str(mem_data)[:100]}"); format_error_count += 1 progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} memories...") msg = f"Memories Upload: Processed {total_to_process} objects. Added: {added_count}, Format/Structure Errors: {format_error_count}, Save Errors: {save_error_count}." logger.info(msg); return msg def save_edited_rules_action_fn(edited_rules_text: str, progress=gr.Progress()): if DEMO_MODE: gr.Warning("Saving edited rules is disabled in Demo Mode.") return "Saving edited rules is disabled in Demo Mode." if not edited_rules_text.strip(): return "No rules text to save." stats = process_rules_from_text_blob(edited_rules_text, progress) return f"Editor Save: Added: {stats['added']}, Skipped (duplicates): {stats['skipped']}, Errors/Invalid: {stats['errors']} from {stats['total']} unique rules in text." def ui_upload_kb_from_image_fn(uploaded_image_filepath: str, password: str, progress=gr.Progress()): if DEMO_MODE: gr.Warning("Uploading is disabled in Demo Mode.") return "Upload disabled in Demo Mode." if not uploaded_image_filepath: return "No image file provided or pasted." progress(0, desc="Loading and standardizing image...") try: img_temp = Image.open(uploaded_image_filepath) img = set_pil_image_format_to_png(img_temp) except Exception as e: logger.error(f"KB ImgUL: Open/Standardize fail: {e}") return f"Error: Could not open or process image file: {e}" progress(0.2, desc="Extracting data from image...") try: extracted_bytes = extract_data_from_image(img) if not extracted_bytes: return "No data found embedded in the image." except ValueError as e: logger.error(f"KB ImgUL: Extract fail: {e}") return f"Error extracting data: {e}" except Exception as e: logger.error(f"KB ImgUL: Extract error: {e}", exc_info=True) return f"Unexpected extraction error: {e}" kv_string = "" try: if extracted_bytes[:20].decode('utf-8', errors='ignore').strip().startswith("# iLearn"): kv_string = extracted_bytes.decode('utf-8') progress(0.4, desc="Parsing data...") elif password and password.strip(): progress(0.3, desc="Attempting decryption...") kv_string = decrypt_data(extracted_bytes, password.strip()).decode('utf-8') progress(0.4, desc="Parsing decrypted data...") else: return "Data appears encrypted, but no password was provided." except (UnicodeDecodeError, InvalidTag, ValueError) as e: if "decryption" in str(e).lower() or isinstance(e, InvalidTag): return f"Decryption Failed. Check password or file integrity. Details: {e}" return "Data is binary and requires a password for decryption." except Exception as e: logger.error(f"KB ImgUL: Decrypt/Parse error: {e}", exc_info=True) return f"Unexpected error during decryption or parsing: {e}" if not kv_string: return "Could not get data from image (after potential decryption)." try: kv_dict = parse_kv_string_to_dict(kv_string) except Exception as e: logger.error(f"KB ImgUL: Parse fail: {e}") return f"Error parsing data: {e}" if not kv_dict: return "Parsed data is empty." stats = import_kb_from_kv_dict(kv_dict, progress) msg = f"Upload Complete. Rules - Add: {stats['rules_added']}, Skip: {stats['rules_skipped']}, Err: {stats['rules_errors']}. Mems - Add: {stats['mems_added']}, Err: {stats['mems_errors']}." logger.info(f"Image KB Upload: {msg}") return msg def app_load_fn(): logger.info("App loading. Initializing systems...") initialize_memory_system() logger.info("Memory system initialized.") rules_added, rules_skipped, rules_errors = load_rules_from_file(LOAD_RULES_FILE) rules_load_msg = f"Rules: Added {rules_added}, Skipped {rules_skipped}, Errors {rules_errors} from {LOAD_RULES_FILE or 'None'}." logger.info(rules_load_msg) mems_added, mems_format_errors, mems_save_errors = load_memories_from_file(LOAD_MEMORIES_FILE) mems_load_msg = f"Memories: Added {mems_added}, Format Errors {mems_format_errors}, Save Errors {mems_save_errors} from {LOAD_MEMORIES_FILE or 'None'}." logger.info(mems_load_msg) final_status = f"AI Systems Initialized. {rules_load_msg} {mems_load_msg} Ready." rules_on_load, mems_on_load = ui_refresh_rules_display_fn(), ui_refresh_memories_display_fn() return (final_status, rules_on_load, mems_on_load, gr.HTML("

    Research Log will appear here.

    "), gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True), gr.DownloadButton(interactive=False, value=None, visible=False)) placeholder_filename = "placeholder_image.png" try: if not os.path.exists(placeholder_filename): img = Image.new('RGB', (200, 100), color='darkblue') draw = Image.Draw(img) try: font = _get_font(PREFERRED_FONTS, 14) draw.text((10, 45), "Placeholder KB Image", font=font, fill='white') except Exception: draw.text((10, 45), "Placeholder", fill='white') img.save(placeholder_filename) logger.info(f"Created '{placeholder_filename}' for Gradio examples.") except Exception as e: logger.error(f"Could not create placeholder image. The examples may not load correctly. Error: {e}") def ui_create_kb_image_fn(password: str, content_to_include: list, progress=gr.Progress()): include_rules = "Include Rules" in content_to_include include_memories = "Include Memories" in content_to_include if not include_rules and not include_memories: gr.Warning("Nothing selected to save.") return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "Nothing selected to save." progress(0.1, desc="Fetching knowledge base...") rules = get_all_rules_cached() if include_rules else [] memories = get_all_memories_cached() if include_memories else [] if not rules and not memories: gr.Warning("Knowledge base is empty or selected content is empty.") return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "No content to save." progress(0.2, desc="Serializing data...") kv_string = convert_kb_to_kv_string(rules, memories, include_rules, include_memories) data_bytes = kv_string.encode('utf-8') if password and password.strip(): progress(0.3, desc="Encrypting data...") try: data_bytes = encrypt_data(data_bytes, password.strip()) except Exception as e: logger.error(f"KB ImgDL: Encrypt failed: {e}") return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}" progress(0.5, desc="Generating carrier image...") carrier_image = generate_brain_carrier_image(w=800, h=800) progress(0.6, desc="Adding visual overlay...") keys_for_overlay = [] if include_rules: keys_for_overlay.append(f"Rule Count: {len(rules)}") if include_memories: keys_for_overlay.append(f"Memory Count: {len(memories)}") title_overlay = "Encrypted Knowledge Base" if password and password.strip() else "iLearn Knowledge Base" image_with_overlay = draw_key_list_dropdown_overlay(carrier_image, keys=keys_for_overlay, title=title_overlay) try: progress(0.8, desc="Embedding data into final image...") final_image_with_data = embed_data_in_image(image_with_overlay, data_bytes) except ValueError as e: logger.error(f"KB ImgDL: Embed failed: {e}") return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}" progress(0.9, desc="Preparing final image and download file...") try: with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmpfile: final_image_with_data.save(tmpfile, format="PNG") tmp_path = tmpfile.name progress(1.0, desc="Image created!") return gr.update(value=tmp_path, visible=True), gr.update(value=tmp_path, visible=True), "Success! Image created." except Exception as e: logger.error(f"KB ImgDL: Save failed: {e}") return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}" def ui_load_from_sources_fn(image_filepath: str, rules_file_obj: object, mems_file_obj: object, password: str, progress=gr.Progress()): if image_filepath: progress(0.1, desc="Image source detected. Starting image processing...") return ui_upload_kb_from_image_fn(image_filepath, password, progress) if rules_file_obj: progress(0.1, desc="Rules file detected. Starting rules import...") return ui_upload_rules_action_fn(rules_file_obj, progress) if mems_file_obj: progress(0.1, desc="Memories file detected. Starting memories import...") return ui_upload_memories_action_fn(mems_file_obj, progress) return "No file or image uploaded. Please provide a source file to load." with gr.Blocks(theme=gr.themes.Soft(), css=".gr-button { margin: 5px; } .gr-textbox, .gr-text-area, .gr-dropdown, .gr-json { border-radius: 8px; } .gr-group { border: 1px solid #e0e0e0; border-radius: 8px; padding: 10px; } .gr-row { gap: 10px; } .gr-tab { border-radius: 8px; } .status-text { font-size: 0.9em; color: #555; } .gr-json { max-height: 400px; overflow-y: auto; }") as demo: gr.Markdown(f"# 🤖 iLearn: An Autonomous Learning Agent {'(DEMO MODE)' if DEMO_MODE else ''}", elem_classes=["header"]) is_sqlite, is_hf_dataset = (MEMORY_STORAGE_BACKEND == "SQLITE"), (MEMORY_STORAGE_BACKEND == "HF_DATASET") with gr.Row(variant="compact"): agent_stat_tb = gr.Textbox(label="Agent Status", value="Initializing systems...", interactive=False, elem_classes=["status-text"], scale=4) with gr.Column(scale=1, min_width=150): memory_backend_info_tb = gr.Textbox(label="Memory Backend", value=MEMORY_STORAGE_BACKEND, interactive=False, elem_classes=["status-text"]) sqlite_path_display = gr.Textbox(label="SQLite Path", value=MEMORY_SQLITE_PATH, interactive=False, visible=is_sqlite, elem_classes=["status-text"]) hf_repos_display = gr.Textbox(label="HF Repos", value=f"M: {MEMORY_HF_MEM_REPO}, R: {MEMORY_HF_RULES_REPO}", interactive=False, visible=is_hf_dataset, elem_classes=["status-text"]) with gr.Sidebar(): gr.Markdown("## ⚙️ Configuration") with gr.Group(): gr.Markdown("### AI Model Settings") api_key_tb = gr.Textbox(label="AI Provider API Key (Override)", type="password", placeholder="Uses .env if blank") available_providers = get_available_providers(); default_provider = available_providers[0] if "groq" not in available_providers else "groq" prov_sel_dd = gr.Dropdown(label="AI Provider", choices=available_providers, value=default_provider, interactive=True) default_model_display = get_default_model_display_name_for_provider(default_provider) if default_provider else None model_sel_dd = gr.Dropdown(label="AI Model", choices=get_model_display_names_for_provider(default_provider) if default_provider else [], value=default_model_display, interactive=True) research_steps_slider = gr.Slider(label="Max Research Steps", minimum=1, maximum=10, step=1, value=3, interactive=True) with gr.Group(): gr.Markdown("### System Prompt"); sys_prompt_tb = gr.Textbox(label="System Prompt Base", lines=8, value=DEFAULT_SYSTEM_PROMPT, interactive=True) with gr.Tabs(): with gr.TabItem("💬 Chat & Research"): with gr.Row(): with gr.Column(scale=3): gr.Markdown("### AI Chat Interface") main_chat_disp = gr.Chatbot(label=None, height=450, bubble_full_width=False,avatar_images=(None, "https://raw.githubusercontent.com/gradio-app/gradio/main/guides/assets/logo.png"), show_copy_button=True, render_markdown=True, sanitize_html=True) with gr.Row(variant="compact"): user_msg_tb = gr.Textbox(show_label=False, placeholder="Ask your research question...", scale=7, lines=1, max_lines=3) send_btn = gr.Button("Send", variant="primary", scale=1, min_width=100) with gr.Accordion("📝 Detailed Response & Research Log", open=True): research_log_html = gr.HTML(label="Research Log", value="

    Waiting for a new task to begin...

    ") fmt_report_tb = gr.Textbox(label="Full AI Response", lines=8, interactive=True, show_copy_button=True) dl_report_btn = gr.DownloadButton("Download Report", value=None, interactive=False, visible=False) with gr.TabItem("🧠 Knowledge Base"): with gr.Tabs(): with gr.TabItem("🎛️ System"): gr.Markdown("View and directly manage the current rules and memories in the system.") with gr.Row(equal_height=False, variant='compact'): with gr.Column(): gr.Markdown("### 📜 Current Rules") rules_disp_ta = gr.TextArea(label=None, lines=15, placeholder="Rules will appear here.", interactive=True) save_edited_rules_btn = gr.Button("💾 Save Edited Rules", variant="primary", interactive=not DEMO_MODE) clear_rules_btn = gr.Button("🗑️ Clear All Rules", variant="stop", visible=not DEMO_MODE) with gr.Column(): gr.Markdown("### 📚 Current Memories") mems_disp_json = gr.JSON(label=None, value=[], scale=1) clear_mems_btn = gr.Button("🗑️ Clear All Memories", variant="stop", visible=not DEMO_MODE) with gr.TabItem("💾 Save KB"): gr.Markdown("Export the current knowledge base as text files or as a single, portable PNG image.") with gr.Row(): rules_stat_tb = gr.Textbox(label="Rules Status", interactive=False, lines=1, elem_classes=["status-text"]) mems_stat_tb = gr.Textbox(label="Memories Status", interactive=False, lines=1, elem_classes=["status-text"]) with gr.Row(): with gr.Column(): gr.Markdown("### Text File Export") dl_rules_btn = gr.DownloadButton("⬇️ Download Rules (.txt)", value=None) dl_mems_btn = gr.DownloadButton("⬇️ Download Memories (.jsonl)", value=None) gr.Row() if MEMORY_STORAGE_BACKEND == "RAM": save_faiss_sidebar_btn = gr.Button("Save FAISS Indices", variant="secondary") with gr.Column(): gr.Markdown("### Image Export") with gr.Group(): save_kb_password_tb = gr.Textbox(label="Password (optional for encryption)", type="password") save_kb_include_cbg = gr.CheckboxGroup(label="Content to Include", choices=["Include Rules", "Include Memories"], value=["Include Rules", "Include Memories"]) create_kb_img_btn = gr.Button("✨ Create KB Image", variant="secondary") kb_image_display_output = gr.Image(label="Generated Image (Right-click to copy)", type="filepath", visible=False) kb_image_download_output = gr.DownloadButton("⬇️ Download Image File", visible=False) with gr.TabItem("📂 Load KB"): gr.Markdown("Import rules, memories, or a full KB from local files or a portable PNG image.") load_status_tb = gr.Textbox(label="Load Operation Status", interactive=False, lines=2) load_kb_password_tb = gr.Textbox(label="Password (for decrypting images)", type="password") with gr.Group(): gr.Markdown("#### Sources (Priority: Image > Rules File > Memories File)") with gr.Row(): upload_kb_img_fobj = gr.Image(label="1. Image Source", type="filepath", sources=["upload", "clipboard"], interactive=not DEMO_MODE) upload_rules_fobj = gr.File(label="2. Rules File Source (.txt/.jsonl)", file_types=[".txt", ".jsonl"], interactive=not DEMO_MODE) upload_mems_fobj = gr.File(label="3. Memories File Source (.json/.jsonl)", file_types=[".jsonl", ".json"], interactive=not DEMO_MODE) load_master_btn = gr.Button("⬆️ Load from Sources", variant="primary", interactive=not DEMO_MODE) gr.Examples( examples=[ ["https://huggingface.co/spaces/Agents-MCP-Hackathon/iLearn/resolve/main/evolutions/e0.01.01.png", ""], ["https://huggingface.co/spaces/Agents-MCP-Hackathon/iLearn/resolve/main/evolutions/e0.01.011.png", ""], ["https://huggingface.co/spaces/Agents-MCP-Hackathon/iLearn/resolve/main/evolutions/e0.01.012.png", ""], ], inputs=[upload_kb_img_fobj, load_kb_password_tb], label="Click an Example to Load Data" ) def dyn_upd_model_dd(sel_prov_dyn: str): models_dyn = get_model_display_names_for_provider(sel_prov_dyn); def_model_dyn = get_default_model_display_name_for_provider(sel_prov_dyn) return gr.Dropdown(choices=models_dyn, value=def_model_dyn, interactive=True) prov_sel_dd.change(fn=dyn_upd_model_dd, inputs=prov_sel_dd, outputs=model_sel_dd) chat_ins = [user_msg_tb, research_steps_slider, main_chat_disp, prov_sel_dd, model_sel_dd, api_key_tb, sys_prompt_tb] chat_outs = [user_msg_tb, main_chat_disp, agent_stat_tb, research_log_html, fmt_report_tb, dl_report_btn, rules_disp_ta, mems_disp_json] chat_event_args = {"fn": handle_gradio_chat_submit, "inputs": chat_ins, "outputs": chat_outs} send_btn.click(**chat_event_args); user_msg_tb.submit(**chat_event_args) save_edited_rules_btn.click(fn=save_edited_rules_action_fn, inputs=[rules_disp_ta], outputs=[rules_stat_tb], show_progress="full").then(fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta, show_progress=False) clear_rules_btn.click(fn=lambda: ("All rules cleared." if clear_all_rules_data_backend() else "Error clearing rules."), outputs=rules_stat_tb, show_progress=False).then(fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta, show_progress=False) clear_mems_btn.click(fn=lambda: ("All memories cleared." if clear_all_memory_data_backend() else "Error clearing memories."), outputs=mems_stat_tb, show_progress=False).then(fn=ui_refresh_memories_display_fn, outputs=mems_disp_json, show_progress=False) dl_rules_btn.click(fn=ui_download_rules_action_fn, inputs=None, outputs=dl_rules_btn, show_progress=False) dl_mems_btn.click(fn=ui_download_memories_action_fn, inputs=None, outputs=dl_mems_btn, show_progress=False) create_kb_img_btn.click( fn=ui_create_kb_image_fn, inputs=[save_kb_password_tb, save_kb_include_cbg], outputs=[kb_image_display_output, kb_image_download_output, load_status_tb], show_progress="full" ) load_master_btn.click( fn=ui_load_from_sources_fn, inputs=[upload_kb_img_fobj, upload_rules_fobj, upload_mems_fobj, load_kb_password_tb], outputs=[load_status_tb], show_progress="full" ).then( fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta ).then( fn=ui_refresh_memories_display_fn, outputs=mems_disp_json ) if MEMORY_STORAGE_BACKEND == "RAM" and 'save_faiss_sidebar_btn' in locals(): def save_faiss_action_with_feedback_sidebar_fn(): try: save_faiss_indices_to_disk(); gr.Info("Attempted to save FAISS indices to disk.") except Exception as e: logger.error(f"Error saving FAISS indices: {e}", exc_info=True); gr.Error(f"Error saving FAISS indices: {e}") save_faiss_sidebar_btn.click(fn=save_faiss_action_with_feedback_sidebar_fn, inputs=None, outputs=None, show_progress=False) app_load_outputs = [agent_stat_tb, rules_disp_ta, mems_disp_json, research_log_html, fmt_report_tb, dl_report_btn] demo.load(fn=app_load_fn, inputs=None, outputs=app_load_outputs, show_progress="full") if __name__ == "__main__": logger.info(f"Starting Gradio AI Research Mega Agent (v9.1 - Correct 1-Click JS Download, Memory: {MEMORY_STORAGE_BACKEND})...") app_port = int(os.getenv("GRADIO_PORT", 7860)) app_server = os.getenv("GRADIO_SERVER_NAME", "127.0.0.1") app_debug = os.getenv("GRADIO_DEBUG", "False").lower() == "false" app_share = os.getenv("GRADIO_SHARE", "False").lower() == "true" logger.info(f"Launching Gradio server: http://{app_server}:{app_port}. Debug: {app_debug}, Share: {app_share}") demo.queue().launch(server_name=app_server, server_port=app_port, debug=app_debug, share=app_share, mcp_server=True, max_threads=40) logger.info("Gradio application shut down.")