import logging from abc import ABC from datetime import datetime from typing import Dict, Any, Union, List from pydantic import Field from aworld.config import AgentConfig from aworld.core.common import Observation, ActionModel from aworld.output import MessageOutput, WorkSpace, ArtifactType, SearchOutput from examples.debate.agent.base import DebateSpeech from examples.debate.agent.prompts import user_assignment_system_prompt, summary_system_prompt, summary_debate_prompt from examples.debate.agent.stream_output_agent import StreamOutputAgent def truncate_content(raw_content, char_limit): if raw_content is None: raw_content = '' if len(raw_content) > char_limit: raw_content = raw_content[:char_limit] + "... [truncated]" return raw_content class ModeratorAgent(StreamOutputAgent, ABC): stance: str = "moderator" topic: str = Field(default=None) affirmative_opinion: str = Field(default=None) negative_opinion: str = Field(default=None) def __init__(self, conf: AgentConfig, **kwargs ): super().__init__(conf) async def async_policy(self, observation: Observation, info: Dict[str, Any] = {}, **kwargs) -> Union[ List[ActionModel], None]: ## step 1: params topic = observation.content ## step2: gen opinions output = await self.gen_opinions(topic) ## step3: gen speech moderator_speech = DebateSpeech.from_dict({ "content": "", "round": 0, "type": "speech", "stance": "moderator", "name": self.name(), }) async def after_speech_call(message_output_response): logging.info("moderator: after_speech_call") opinions = message_output_response self.affirmative_opinion = opinions.get("positive_opinion") self.negative_opinion = opinions.get("negative_opinion") moderator_speech.metadata = { "topic": topic, "affirmative_opinion": self.affirmative_opinion, "negative_opinion": self.negative_opinion, } moderator_speech.finished = True await moderator_speech.convert_to_parts(output, after_speech_call) action = ActionModel( policy_info=moderator_speech ) return [action] async def gen_opinions(self, topic) -> MessageOutput: current_time = datetime.now().strftime("%Y-%m-%d-%H") human_prompt = self.agent_prompt.format(topic=topic, current_time=current_time, ) messages = [ {"role": "system", "content": user_assignment_system_prompt}, {"role": "user", "content": human_prompt} ] output = await self.async_call_llm(messages, json_parse=True) return output async def summary_speech(self) -> DebateSpeech: chat_history = await self.get_formated_history() print(f"chat_history is \n {chat_history}") search_results_content_history = await self.get_formated_search_results_content_history() print(f"search_results_content_history is \n {search_results_content_history}") human_prompt = summary_debate_prompt.format(topic=self.topic, opinion=self.affirmative_opinion, oppose_opinion=self.negative_opinion, chat_history=chat_history, search_results_content_history=search_results_content_history ) messages = [ {"role": "system", "content": summary_system_prompt}, {"role": "user", "content": human_prompt} ] output = await self.async_call_llm(messages, json_parse=False) moderator_speech = DebateSpeech.from_dict({ "content": "", "round": 0, "type": "summary", "stance": "moderator", "name": self.name(), }) async def after_speech_call(message_output_response): moderator_speech.finished = True await moderator_speech.convert_to_parts(output, after_speech_call) return moderator_speech async def get_formated_history(self): formated = [] for item in self.memory.get_all(): formated.append(f"{item.metadata['speaker']} (round {item.metadata['round']}): {item.content}") return "\n".join(formated) async def get_formated_search_results_content_history(self): if not self.workspace: return search_results = self.workspace.list_artifacts(ArtifactType.WEB_PAGES) materials = [] for search_result in search_results: if isinstance(search_result.content, SearchOutput): for item in search_result.content.results: materials.append( f"{search_result.metadata['user']} (round {search_result.metadata['round']}): {search_result.content.query}: url: {item.url}, title: {item.title}, description: {item.content}") return "\n".join(materials) def set_workspace(self, workspace: WorkSpace): self.workspace = workspace