Spaces:
Sleeping
Sleeping
import logging | |
from abc import ABC | |
from datetime import datetime | |
from typing import Dict, Any, Union, List | |
from pydantic import Field | |
from aworld.config import AgentConfig | |
from aworld.core.common import Observation, ActionModel | |
from aworld.output import MessageOutput, WorkSpace, ArtifactType, SearchOutput | |
from examples.debate.agent.base import DebateSpeech | |
from examples.debate.agent.prompts import user_assignment_system_prompt, summary_system_prompt, summary_debate_prompt | |
from examples.debate.agent.stream_output_agent import StreamOutputAgent | |
def truncate_content(raw_content, char_limit):
    """Return ``raw_content`` cut to at most ``char_limit`` characters.

    ``None`` is treated as the empty string. Content longer than
    ``char_limit`` is sliced to its first ``char_limit`` characters and the
    marker ``"... [truncated]"`` is appended; shorter content is returned
    unchanged.
    """
    text = '' if raw_content is None else raw_content
    if len(text) <= char_limit:
        return text
    return text[:char_limit] + "... [truncated]"
class ModeratorAgent(StreamOutputAgent, ABC):
    """Moderator of a debate.

    ``async_policy`` asks the LLM for the two opposing opinions on a topic and
    wraps the answer in an opening ``DebateSpeech``; ``summary_speech`` builds
    a closing speech from the memory history and the search results stored in
    the workspace.
    """

    stance: str = "moderator"
    # Debate topic; stored by ``async_policy`` and read by ``summary_speech``.
    topic: str = Field(default=None)
    # Both opinions are assigned inside the ``after_speech_call`` callback that
    # ``async_policy`` hands to ``DebateSpeech.convert_to_parts``.
    affirmative_opinion: str = Field(default=None)
    negative_opinion: str = Field(default=None)

    def __init__(self, conf: AgentConfig,
                 **kwargs
                 ):
        """Initialise the agent from ``conf``.

        NOTE(review): ``kwargs`` is accepted but not forwarded to the parent
        constructor — confirm this is intentional.
        """
        super().__init__(conf)

    async def async_policy(self, observation: Observation, info: Union[Dict[str, Any], None] = None,
                           **kwargs) -> Union[List[ActionModel], None]:
        """Produce the moderator's opening speech for a debate topic.

        Args:
            observation: Its ``content`` is used as the debate topic.
            info: Unused here. Defaults to ``None`` rather than a shared
                mutable ``{}``.
            **kwargs: Unused here.

        Returns:
            A single-element list with an ``ActionModel`` whose ``policy_info``
            is the moderator's ``DebateSpeech``.
        """
        # step 1: params
        topic = observation.content
        # summary_speech() formats its prompt with self.topic, so the topic has
        # to be remembered on the instance and not only in the speech metadata.
        self.topic = topic

        # step 2: gen opinions
        output = await self.gen_opinions(topic)

        # step 3: gen speech
        moderator_speech = DebateSpeech.from_dict({
            "content": "",
            "round": 0,
            "type": "speech",
            "stance": "moderator",
            "name": self.name(),
        })

        async def after_speech_call(message_output_response):
            # Callback handed to convert_to_parts; it receives the response
            # and is expected to expose ``positive_opinion`` and
            # ``negative_opinion`` through ``.get``.
            logging.info("moderator: after_speech_call")
            opinions = message_output_response
            self.affirmative_opinion = opinions.get("positive_opinion")
            self.negative_opinion = opinions.get("negative_opinion")
            moderator_speech.metadata = {
                "topic": topic,
                "affirmative_opinion": self.affirmative_opinion,
                "negative_opinion": self.negative_opinion,
            }
            moderator_speech.finished = True

        await moderator_speech.convert_to_parts(output, after_speech_call)

        action = ActionModel(
            policy_info=moderator_speech
        )
        return [action]

    async def gen_opinions(self, topic) -> MessageOutput:
        """Ask the LLM for the opinions to be debated on ``topic``.

        The user prompt is ``self.agent_prompt`` formatted with the topic and
        the current local time (hour resolution); the call is made with
        ``json_parse=True``.
        """
        current_time = datetime.now().strftime("%Y-%m-%d-%H")
        human_prompt = self.agent_prompt.format(topic=topic,
                                                current_time=current_time,
                                                )
        messages = [
            {"role": "system", "content": user_assignment_system_prompt},
            {"role": "user", "content": human_prompt}
        ]
        output = await self.async_call_llm(messages, json_parse=True)
        return output

    async def summary_speech(self) -> DebateSpeech:
        """Build the moderator's closing summary speech.

        The prompt combines the stored topic and opinions with the formatted
        memory history and the formatted search results.

        Returns:
            A ``DebateSpeech`` of type ``"summary"`` fed from the LLM output.
        """
        chat_history = await self.get_formated_history()
        print(f"chat_history is \n {chat_history}")

        search_results_content_history = await self.get_formated_search_results_content_history()
        print(f"search_results_content_history is \n {search_results_content_history}")

        human_prompt = summary_debate_prompt.format(topic=self.topic,
                                                    opinion=self.affirmative_opinion,
                                                    oppose_opinion=self.negative_opinion,
                                                    chat_history=chat_history,
                                                    search_results_content_history=search_results_content_history
                                                    )
        messages = [
            {"role": "system", "content": summary_system_prompt},
            {"role": "user", "content": human_prompt}
        ]
        output = await self.async_call_llm(messages, json_parse=False)

        moderator_speech = DebateSpeech.from_dict({
            "content": "",
            "round": 0,
            "type": "summary",
            "stance": "moderator",
            "name": self.name(),
        })

        async def after_speech_call(message_output_response):
            # Nothing to extract from the summary; just mark the speech done.
            moderator_speech.finished = True

        await moderator_speech.convert_to_parts(output, after_speech_call)
        return moderator_speech

    async def get_formated_history(self):
        """Return every memory item as one ``speaker (round N): content`` line.

        Each item's ``metadata`` must contain ``speaker`` and ``round``.
        """
        return "\n".join(
            f"{item.metadata['speaker']} (round {item.metadata['round']}): {item.content}"
            for item in self.memory.get_all()
        )

    async def get_formated_search_results_content_history(self):
        """Return the workspace's web-page search results, one line per hit.

        Returns an empty string when no workspace is attached, so the summary
        prompt never receives the literal text ``None``.
        """
        # ``workspace`` is only assigned in set_workspace() within this class,
        # so tolerate it being absent as well as falsy.
        workspace = getattr(self, "workspace", None)
        if not workspace:
            return ""

        search_results = workspace.list_artifacts(ArtifactType.WEB_PAGES)
        materials = []
        for search_result in search_results:
            # Artifacts whose content is not a SearchOutput are skipped.
            if isinstance(search_result.content, SearchOutput):
                for item in search_result.content.results:
                    materials.append(
                        f"{search_result.metadata['user']} (round {search_result.metadata['round']}): {search_result.content.query}: url: {item.url}, title: {item.title}, description: {item.content}")
        return "\n".join(materials)

    def set_workspace(self, workspace: WorkSpace):
        """Attach the workspace that search results are read from."""
        self.workspace = workspace