Research Log will appear here.")
    final_report_tb = gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True)
    dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False)

    if not user_msg_txt.strip():
        status_txt = "Error: Empty message."
        updated_gr_hist.append((user_msg_txt or "(Empty)", status_txt))
        yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json)
        return

    updated_gr_hist.append((user_msg_txt, "Thinking... See Research Log below for progress."))
    yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json)

    internal_hist = list(current_chat_session_history)
    final_bot_resp_acc = ""
    temp_dl_file_path = None
    try:
        processor_gen = process_user_interaction_gradio(
            user_input=user_msg_txt,
            max_research_steps=max_research_steps,
            provider_name=sel_prov_name,
            model_display_name=sel_model_disp_name,
            chat_history=internal_hist,
            custom_system_prompt=cust_sys_prompt.strip() or None,
            ui_api_key_override=ui_api_key.strip() if ui_api_key else None,
        )
        curr_bot_disp_msg = ""
        full_plan = []
        log_html_parts = []
        for upd_type, upd_data in processor_gen:
            if upd_type == "status":
                status_txt = upd_data
                if "Deciding" in status_txt or "Executing" in status_txt:
                    log_html_output = gr.HTML(f"{status_txt}")
            elif upd_type == "plan":
                full_plan = upd_data
                log_html_parts = []
                ...
    except Exception as e:
        status_txt = f"Error: {e}"
        log_html_output = gr.HTML('Error processing request.')
        current_rules_text_on_error = ui_refresh_rules_display_fn()
        current_mems_json_on_error = ui_refresh_memories_display_fn()
        yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_on_error, current_mems_json_on_error)
        if temp_dl_file_path and os.path.exists(temp_dl_file_path):
            try:
                os.unlink(temp_dl_file_path)
            except Exception as e_unlink:
                logger.error(f"Error deleting temp download file {temp_dl_file_path} after error: {e_unlink}")
        return

    if final_bot_resp_acc and not final_bot_resp_acc.startswith("Error:"):
        current_chat_session_history.extend([
            {"role": "user", "content": user_msg_txt},
            {"role": "assistant", "content": final_bot_resp_acc},
        ])
        status_txt = "[Performing post-interaction learning...]"
        current_rules_text_before_learn = ui_refresh_rules_display_fn()
        current_mems_json_before_learn = ui_refresh_memories_display_fn()
        yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_before_learn, current_mems_json_before_learn)
        try:
            perform_post_interaction_learning(
                user_input=user_msg_txt,
                bot_response=final_bot_resp_acc,
                provider=sel_prov_name,
                model_disp_name=sel_model_disp_name,
                api_key_override=ui_api_key.strip() if ui_api_key else None,
            )
            status_txt = "Response & Learning Complete."
        except Exception as e_learn:
            logger.error(f"Error during post-interaction learning: {e_learn}", exc_info=True)
            status_txt = "Response complete. Error during learning."
    else:
        status_txt = "Processing finished; no valid response or error occurred."

    updated_rules_text = ui_refresh_rules_display_fn()
    updated_mems_json = ui_refresh_memories_display_fn()
    yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json)

    if temp_dl_file_path and os.path.exists(temp_dl_file_path):
        try:
            os.unlink(temp_dl_file_path)
        except Exception as e_unlink:
            logger.error(f"Error deleting temp download file {temp_dl_file_path}: {e_unlink}")


def ui_refresh_rules_display_fn():
    return "\n\n---\n\n".join(get_all_rules_cached()) or "No rules found."


def ui_refresh_memories_display_fn():
    return get_all_memories_cached() or []


def ui_download_rules_action_fn():
    rules_content = "\n\n---\n\n".join(get_all_rules_cached())
    if not rules_content.strip():
        gr.Warning("No rules to download.")
        return gr.DownloadButton(value=None, interactive=False, label="No Rules")
    try:
        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding='utf-8') as tmpfile:
            tmpfile.write(rules_content)
            return tmpfile.name
    except Exception as e:
        logger.error(f"Error creating rules download file: {e}")
        gr.Error(f"Failed to prepare rules for download: {e}")
        return gr.DownloadButton(value=None, interactive=False, label="Error")


def ui_upload_rules_action_fn(uploaded_file_obj, progress=gr.Progress()):
    if not uploaded_file_obj:
        return "No file provided for rules upload."
    try:
        with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f:
            content = f.read()
    except Exception as e_read:
        return f"Error reading file: {e_read}"
    if not content.strip():
        return "Uploaded rules file is empty."
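    # Two input formats are handled below: a .txt file with rules separated by "\n\n---\n\n"
    # (falling back to one rule per line), and a .jsonl file where each line is a JSON-encoded string.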
    added_count, skipped_count, error_count = 0, 0, 0
    potential_rules = []
    file_name_lower = uploaded_file_obj.name.lower()
    if file_name_lower.endswith(".txt"):
        potential_rules = content.split("\n\n---\n\n")
        if len(potential_rules) == 1 and "\n" in content:
            potential_rules = [r.strip() for r in content.splitlines() if r.strip()]
    elif file_name_lower.endswith(".jsonl"):
        for line_num, line in enumerate(content.splitlines()):
            line = line.strip()
            if line:
                try:
                    rule_text_in_json_string = json.loads(line)
                    if isinstance(rule_text_in_json_string, str):
                        potential_rules.append(rule_text_in_json_string)
                    else:
                        logger.warning(f"Rule Upload (JSONL): Line {line_num+1} did not contain a string value. Got: {type(rule_text_in_json_string)}")
                        error_count += 1
                except json.JSONDecodeError:
                    logger.warning(f"Rule Upload (JSONL): Line {line_num+1} failed to parse as JSON: {line[:100]}")
                    error_count += 1
    else:
        return "Unsupported file type for rules. Please use .txt or .jsonl."

    valid_potential_rules = [r.strip() for r in potential_rules if r.strip()]
    total_to_process = len(valid_potential_rules)
    if total_to_process == 0 and error_count == 0:
        return "No valid rules found in file to process."
    elif total_to_process == 0 and error_count > 0:
        return f"No valid rules found to process. Encountered {error_count} parsing/format errors."

    progress(0, desc="Starting rules upload...")
    for idx, rule_text in enumerate(valid_potential_rules):
        success, status_msg = add_rule_entry(rule_text)
        if success:
            added_count += 1
        elif status_msg == "duplicate":
            skipped_count += 1
        else:
            error_count += 1
        progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} rules...")

    msg = f"Rules Upload: Total valid rule segments processed: {total_to_process}. Added: {added_count}, Skipped (duplicates): {skipped_count}, Errors (parsing/add): {error_count}."
    logger.info(msg)
    return msg


def ui_download_memories_action_fn():
    memories = get_all_memories_cached()
    if not memories:
        gr.Warning("No memories to download.")
        return gr.DownloadButton(value=None, interactive=False, label="No Memories")
    jsonl_content = ""
    for mem_dict in memories:
        try:
            jsonl_content += json.dumps(mem_dict) + "\n"
        except Exception as e:
            logger.error(f"Error serializing memory for download: {mem_dict}, Error: {e}")
    if not jsonl_content.strip():
        gr.Warning("No valid memories to serialize for download.")
        return gr.DownloadButton(value=None, interactive=False, label="No Data")
    try:
        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".jsonl", encoding='utf-8') as tmpfile:
            tmpfile.write(jsonl_content)
            return tmpfile.name
    except Exception as e:
        logger.error(f"Error creating memories download file: {e}")
        gr.Error(f"Failed to prepare memories for download: {e}")
        return gr.DownloadButton(value=None, interactive=False, label="Error")


def ui_upload_memories_action_fn(uploaded_file_obj, progress=gr.Progress()):
    if not uploaded_file_obj:
        return "No file provided for memories upload."
    try:
        with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f:
            content = f.read()
    except Exception as e_read:
        return f"Error reading file: {e_read}"
    if not content.strip():
        return "Uploaded memories file is empty."
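    # Accepted formats, handled below: a .json file containing one object or a list of objects,
    # or a .jsonl file with one JSON object per line. Each object must provide
    # "user_input", "bot_response" and "metrics" before it is saved via add_memory_entry.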
    added_count, format_error_count, save_error_count = 0, 0, 0
    memory_objects_to_process = []
    file_ext = os.path.splitext(uploaded_file_obj.name.lower())[1]
    if file_ext == ".json":
        try:
            parsed_json = json.loads(content)
            if isinstance(parsed_json, list):
                memory_objects_to_process = parsed_json
            elif isinstance(parsed_json, dict):
                memory_objects_to_process = [parsed_json]
            else:
                logger.warning(f"Memories Upload (.json): File content is not a JSON list or object. Type: {type(parsed_json)}")
                format_error_count = 1
        except json.JSONDecodeError as e:
            logger.warning(f"Memories Upload (.json): Invalid JSON file. Error: {e}")
            format_error_count = 1
    elif file_ext == ".jsonl":
        for line_num, line in enumerate(content.splitlines()):
            line = line.strip()
            if line:
                try:
                    memory_objects_to_process.append(json.loads(line))
                except json.JSONDecodeError:
                    logger.warning(f"Memories Upload (.jsonl): Line {line_num+1} parse error: {line[:100]}")
                    format_error_count += 1
    else:
        return "Unsupported file type for memories. Please use .json or .jsonl."

    if not memory_objects_to_process and format_error_count > 0:
        return f"Memories Upload: File parsing failed. Found {format_error_count} format errors and no processable objects."
    elif not memory_objects_to_process:
        return "No valid memory objects found in the uploaded file."
    total_to_process = len(memory_objects_to_process)
    if total_to_process == 0:
        return "No memory objects to process (after parsing)."

    progress(0, desc="Starting memories upload...")
    for idx, mem_data in enumerate(memory_objects_to_process):
        if isinstance(mem_data, dict) and all(k in mem_data for k in ["user_input", "bot_response", "metrics"]):
            success, _ = add_memory_entry(mem_data["user_input"], mem_data["metrics"], mem_data["bot_response"])
            if success:
                added_count += 1
            else:
                save_error_count += 1
        else:
            logger.warning(f"Memories Upload: Skipped invalid memory object structure: {str(mem_data)[:100]}")
            format_error_count += 1
        progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} memories...")

    msg = f"Memories Upload: Processed {total_to_process} objects. Added: {added_count}, Format/Structure Errors: {format_error_count}, Save Errors: {save_error_count}."
    logger.info(msg)
    return msg


def save_edited_rules_action_fn(edited_rules_text: str, progress=gr.Progress()):
    if DEMO_MODE:
        gr.Warning("Saving edited rules is disabled in Demo Mode.")
        return "Saving edited rules is disabled in Demo Mode."
    if not edited_rules_text.strip():
        return "No rules text to save."
    stats = process_rules_from_text_blob(edited_rules_text, progress)
    return f"Editor Save: Added: {stats['added']}, Skipped (duplicates): {stats['skipped']}, Errors/Invalid: {stats['errors']} from {stats['total']} unique rules in text."


def ui_upload_kb_from_image_fn(uploaded_image_filepath: str, password: str, progress=gr.Progress()):
    if DEMO_MODE:
        gr.Warning("Uploading is disabled in Demo Mode.")
        return "Upload disabled in Demo Mode."
    if not uploaded_image_filepath:
        return "No image file provided or pasted."
    progress(0, desc="Loading and standardizing image...")
    try:
        img_temp = Image.open(uploaded_image_filepath)
        img = set_pil_image_format_to_png(img_temp)
    except Exception as e:
        logger.error(f"KB ImgUL: Open/Standardize fail: {e}")
        return f"Error: Could not open or process image file: {e}"
    progress(0.2, desc="Extracting data from image...")
    try:
        extracted_bytes = extract_data_from_image(img)
        if not extracted_bytes:
            return "No data found embedded in the image."
    except ValueError as e:
        logger.error(f"KB ImgUL: Extract fail: {e}")
        return f"Error extracting data: {e}"
    except Exception as e:
        logger.error(f"KB ImgUL: Extract error: {e}", exc_info=True)
        return f"Unexpected extraction error: {e}"

    kv_string = ""
    try:
        if extracted_bytes[:20].decode('utf-8', errors='ignore').strip().startswith("# iLearn"):
            kv_string = extracted_bytes.decode('utf-8')
            progress(0.4, desc="Parsing data...")
        elif password and password.strip():
            progress(0.3, desc="Attempting decryption...")
            kv_string = decrypt_data(extracted_bytes, password.strip()).decode('utf-8')
            progress(0.4, desc="Parsing decrypted data...")
        else:
            return "Data appears encrypted, but no password was provided."
    except (UnicodeDecodeError, InvalidTag, ValueError) as e:
        if "decryption" in str(e).lower() or isinstance(e, InvalidTag):
            return f"Decryption Failed. Check password or file integrity. Details: {e}"
        return "Data is binary and requires a password for decryption."
    except Exception as e:
        logger.error(f"KB ImgUL: Decrypt/Parse error: {e}", exc_info=True)
        return f"Unexpected error during decryption or parsing: {e}"

    if not kv_string:
        return "Could not get data from image (after potential decryption)."
    try:
        kv_dict = parse_kv_string_to_dict(kv_string)
    except Exception as e:
        logger.error(f"KB ImgUL: Parse fail: {e}")
        return f"Error parsing data: {e}"
    if not kv_dict:
        return "Parsed data is empty."
    stats = import_kb_from_kv_dict(kv_dict, progress)
    msg = f"Upload Complete. Rules - Add: {stats['rules_added']}, Skip: {stats['rules_skipped']}, Err: {stats['rules_errors']}. Mems - Add: {stats['mems_added']}, Err: {stats['mems_errors']}."
    logger.info(f"Image KB Upload: {msg}")
    return msg


def app_load_fn():
    logger.info("App loading. Initializing systems...")
    initialize_memory_system()
    logger.info("Memory system initialized.")
    rules_added, rules_skipped, rules_errors = load_rules_from_file(LOAD_RULES_FILE)
    rules_load_msg = f"Rules: Added {rules_added}, Skipped {rules_skipped}, Errors {rules_errors} from {LOAD_RULES_FILE or 'None'}."
    logger.info(rules_load_msg)
    mems_added, mems_format_errors, mems_save_errors = load_memories_from_file(LOAD_MEMORIES_FILE)
    mems_load_msg = f"Memories: Added {mems_added}, Format Errors {mems_format_errors}, Save Errors {mems_save_errors} from {LOAD_MEMORIES_FILE or 'None'}."
    logger.info(mems_load_msg)
    final_status = f"AI Systems Initialized. {rules_load_msg} {mems_load_msg} Ready."
    rules_on_load, mems_on_load = ui_refresh_rules_display_fn(), ui_refresh_memories_display_fn()
    return (final_status, rules_on_load, mems_on_load,
            gr.HTML("Research Log will appear here."),
            gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True),
            gr.DownloadButton(interactive=False, value=None, visible=False))


placeholder_filename = "placeholder_image.png"
try:
    if not os.path.exists(placeholder_filename):
        img = Image.new('RGB', (200, 100), color='darkblue')
        draw = ImageDraw.Draw(img)  # assumes "from PIL import ImageDraw" at module top; PIL's Image module has no Draw()
        try:
            font = _get_font(PREFERRED_FONTS, 14)
            draw.text((10, 45), "Placeholder KB Image", font=font, fill='white')
        except Exception:
            draw.text((10, 45), "Placeholder", fill='white')
        img.save(placeholder_filename)
        logger.info(f"Created '{placeholder_filename}' for Gradio examples.")
except Exception as e:
    logger.error(f"Could not create placeholder image. The examples may not load correctly. Error: {e}")


def ui_create_kb_image_fn(password: str, content_to_include: list, progress=gr.Progress()):
    include_rules = "Include Rules" in content_to_include
    include_memories = "Include Memories" in content_to_include
    if not include_rules and not include_memories:
        gr.Warning("Nothing selected to save.")
        return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "Nothing selected to save."
    progress(0.1, desc="Fetching knowledge base...")
    rules = get_all_rules_cached() if include_rules else []
    memories = get_all_memories_cached() if include_memories else []
    if not rules and not memories:
        gr.Warning("Knowledge base is empty or selected content is empty.")
        return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "No content to save."
    progress(0.2, desc="Serializing data...")
    kv_string = convert_kb_to_kv_string(rules, memories, include_rules, include_memories)
    data_bytes = kv_string.encode('utf-8')
    if password and password.strip():
        progress(0.3, desc="Encrypting data...")
        try:
            data_bytes = encrypt_data(data_bytes, password.strip())
        except Exception as e:
            logger.error(f"KB ImgDL: Encrypt failed: {e}")
            return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}"
    progress(0.5, desc="Generating carrier image...")
    carrier_image = generate_brain_carrier_image(w=800, h=800)
    progress(0.6, desc="Adding visual overlay...")
    keys_for_overlay = []
    if include_rules:
        keys_for_overlay.append(f"Rule Count: {len(rules)}")
    if include_memories:
        keys_for_overlay.append(f"Memory Count: {len(memories)}")
    title_overlay = "Encrypted Knowledge Base" if password and password.strip() else "iLearn Knowledge Base"
    image_with_overlay = draw_key_list_dropdown_overlay(carrier_image, keys=keys_for_overlay, title=title_overlay)
    try:
        progress(0.8, desc="Embedding data into final image...")
        final_image_with_data = embed_data_in_image(image_with_overlay, data_bytes)
    except ValueError as e:
        logger.error(f"KB ImgDL: Embed failed: {e}")
        return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}"
    progress(0.9, desc="Preparing final image and download file...")
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmpfile:
            final_image_with_data.save(tmpfile, format="PNG")
            tmp_path = tmpfile.name
        progress(1.0, desc="Image created!")
        return gr.update(value=tmp_path, visible=True), gr.update(value=tmp_path, visible=True), "Success! Image created."
    except Exception as e:
        logger.error(f"KB ImgDL: Save failed: {e}")
        return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}"


def ui_load_from_sources_fn(image_filepath: str, rules_file_obj: object, mems_file_obj: object, password: str, progress=gr.Progress()):
    if image_filepath:
        progress(0.1, desc="Image source detected. Starting image processing...")
        return ui_upload_kb_from_image_fn(image_filepath, password, progress)
    if rules_file_obj:
        progress(0.1, desc="Rules file detected. Starting rules import...")
        return ui_upload_rules_action_fn(rules_file_obj, progress)
    if mems_file_obj:
        progress(0.1, desc="Memories file detected. Starting memories import...")
        return ui_upload_memories_action_fn(mems_file_obj, progress)
    return "No file or image uploaded. Please provide a source file to load."


with gr.Blocks(theme=gr.themes.Soft(), css=".gr-button { margin: 5px; } .gr-textbox, .gr-text-area, .gr-dropdown, .gr-json { border-radius: 8px; } .gr-group { border: 1px solid #e0e0e0; border-radius: 8px; padding: 10px; } .gr-row { gap: 10px; } .gr-tab { border-radius: 8px; } .status-text { font-size: 0.9em; color: #555; } .gr-json { max-height: 400px; overflow-y: auto; }") as demo:
    gr.Markdown(f"# 🤖 iLearn: An Autonomous Learning Agent {'(DEMO MODE)' if DEMO_MODE else ''}", elem_classes=["header"])
    is_sqlite, is_hf_dataset = (MEMORY_STORAGE_BACKEND == "SQLITE"), (MEMORY_STORAGE_BACKEND == "HF_DATASET")

    with gr.Row(variant="compact"):
        agent_stat_tb = gr.Textbox(label="Agent Status", value="Initializing systems...", interactive=False, elem_classes=["status-text"], scale=4)
        with gr.Column(scale=1, min_width=150):
            memory_backend_info_tb = gr.Textbox(label="Memory Backend", value=MEMORY_STORAGE_BACKEND, interactive=False, elem_classes=["status-text"])
            sqlite_path_display = gr.Textbox(label="SQLite Path", value=MEMORY_SQLITE_PATH, interactive=False, visible=is_sqlite, elem_classes=["status-text"])
            hf_repos_display = gr.Textbox(label="HF Repos", value=f"M: {MEMORY_HF_MEM_REPO}, R: {MEMORY_HF_RULES_REPO}", interactive=False, visible=is_hf_dataset, elem_classes=["status-text"])

    with gr.Sidebar():
        gr.Markdown("## ⚙️ Configuration")
        with gr.Group():
            gr.Markdown("### AI Model Settings")
            api_key_tb = gr.Textbox(label="AI Provider API Key (Override)", type="password", placeholder="Uses .env if blank")
            available_providers = get_available_providers()
            default_provider = available_providers[0] if "groq" not in available_providers else "groq"
            prov_sel_dd = gr.Dropdown(label="AI Provider", choices=available_providers, value=default_provider, interactive=True)
            default_model_display = get_default_model_display_name_for_provider(default_provider) if default_provider else None
            model_sel_dd = gr.Dropdown(label="AI Model", choices=get_model_display_names_for_provider(default_provider) if default_provider else [], value=default_model_display, interactive=True)
            research_steps_slider = gr.Slider(label="Max Research Steps", minimum=1, maximum=10, step=1, value=3, interactive=True)
        with gr.Group():
            gr.Markdown("### System Prompt")
            sys_prompt_tb = gr.Textbox(label="System Prompt Base", lines=8, value=DEFAULT_SYSTEM_PROMPT, interactive=True)

    with gr.Tabs():
        with gr.TabItem("💬 Chat & Research"):
            with gr.Row():
                with gr.Column(scale=3):
                    gr.Markdown("### AI Chat Interface")
                    main_chat_disp = gr.Chatbot(label=None, height=450, bubble_full_width=False, avatar_images=(None, "https://raw.githubusercontent.com/gradio-app/gradio/main/guides/assets/logo.png"), show_copy_button=True, render_markdown=True, sanitize_html=True)
                    with gr.Row(variant="compact"):
                        user_msg_tb = gr.Textbox(show_label=False, placeholder="Ask your research question...", scale=7, lines=1, max_lines=3)
                        send_btn = gr.Button("Send", variant="primary", scale=1, min_width=100)
                    with gr.Accordion("📝 Detailed Response & Research Log", open=True):
                        research_log_html = gr.HTML(label="Research Log",
value="Waiting for a new task to begin...