Spaces:
Sleeping
Sleeping
File size: 5,450 Bytes
d0c79e9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
import logging
from abc import ABC
from datetime import datetime
from typing import Dict, Any, Union, List
from pydantic import Field
from aworld.config import AgentConfig
from aworld.core.common import Observation, ActionModel
from aworld.output import MessageOutput, WorkSpace, ArtifactType, SearchOutput
from examples.debate.agent.base import DebateSpeech
from examples.debate.agent.prompts import user_assignment_system_prompt, summary_system_prompt, summary_debate_prompt
from examples.debate.agent.stream_output_agent import StreamOutputAgent
def truncate_content(raw_content, char_limit):
    """Return ``raw_content`` shortened to at most ``char_limit`` characters.

    When the text is longer than the limit, the first ``char_limit``
    characters are kept and the marker ``"... [truncated]"`` is appended
    (the marker itself is not counted against the limit).

    Args:
        raw_content: Text to shorten. ``None`` is treated as an empty string.
        char_limit: Maximum number of characters of the original text to
            keep. Negative values are treated as ``0``.

    Returns:
        The original text if it fits within the limit, otherwise the
        truncated text followed by the truncation marker.
    """
    text = "" if raw_content is None else raw_content
    # A negative limit would otherwise be interpreted by the slice below as
    # "drop characters from the end", producing a misleading result.
    limit = max(char_limit, 0)
    if len(text) <= limit:
        return text
    return text[:limit] + "... [truncated]"
class ModeratorAgent(StreamOutputAgent, ABC):
    """Moderator of a debate.

    Given a topic, the moderator asks the LLM for the two sides' opinions
    (``async_policy``) and, later, for a summary of the debate built from the
    agent memory and the search artifacts stored in the workspace
    (``summary_speech``).
    """

    stance: str = "moderator"
    # Debate topic; recorded by async_policy() and read by summary_speech().
    topic: str = Field(default=None)
    # Opinions of both sides; filled in from the LLM response in async_policy().
    affirmative_opinion: str = Field(default=None)
    negative_opinion: str = Field(default=None)

    def __init__(self, conf: AgentConfig,
                 **kwargs
                 ):
        """Initialize the agent from ``conf``.

        NOTE(review): ``kwargs`` is accepted but not forwarded to the parent
        initializer - confirm whether that is intentional.
        """
        super().__init__(conf)

    async def async_policy(self, observation: Observation, info: Union[Dict[str, Any], None] = None, **kwargs) -> Union[
        List[ActionModel], None]:
        """Open the debate: generate both sides' opinions for the topic.

        Args:
            observation: Its ``content`` is used as the debate topic.
            info: Unused by this method.
            **kwargs: Unused by this method.

        Returns:
            A single-element list with an ``ActionModel`` whose
            ``policy_info`` is the moderator's opening ``DebateSpeech``.
        """
        ## step 1: params
        topic = observation.content
        # Keep the topic on the instance: summary_speech() formats its prompt
        # with self.topic, which nothing else in this class assigns.
        self.topic = topic

        ## step2: gen opinions
        output = await self.gen_opinions(topic)

        ## step3: gen speech
        moderator_speech = DebateSpeech.from_dict({
            "content": "",
            "round": 0,
            "type": "speech",
            "stance": "moderator",
            "name": self.name(),
        })

        async def after_speech_call(message_output_response):
            # The response is read as a mapping carrying the
            # "positive_opinion" / "negative_opinion" keys.
            logging.info("moderator: after_speech_call")
            opinions = message_output_response
            self.affirmative_opinion = opinions.get("positive_opinion")
            self.negative_opinion = opinions.get("negative_opinion")
            moderator_speech.metadata = {
                "topic": topic,
                "affirmative_opinion": self.affirmative_opinion,
                "negative_opinion": self.negative_opinion,
            }
            moderator_speech.finished = True

        # presumably convert_to_parts consumes the LLM output and invokes the
        # callback with the parsed response - confirm against DebateSpeech.
        await moderator_speech.convert_to_parts(output, after_speech_call)

        action = ActionModel(
            policy_info=moderator_speech
        )

        return [action]

    async def gen_opinions(self, topic) -> MessageOutput:
        """Ask the LLM (with JSON parsing enabled) for opinions on ``topic``.

        The user prompt is ``self.agent_prompt`` formatted with the topic and
        the current local time at hour resolution (``%Y-%m-%d-%H``).
        """
        current_time = datetime.now().strftime("%Y-%m-%d-%H")
        human_prompt = self.agent_prompt.format(topic=topic,
                                                current_time=current_time,
                                                )

        messages = [
            {"role": "system", "content": user_assignment_system_prompt},
            {"role": "user", "content": human_prompt}
        ]

        return await self.async_call_llm(messages, json_parse=True)

    async def summary_speech(self) -> DebateSpeech:
        """Produce the moderator's summary speech for the debate.

        The prompt combines the topic, both opinions, the formatted chat
        history and the formatted search results.

        Returns:
            A ``DebateSpeech`` of type ``"summary"``; its ``finished`` flag is
            set by the callback passed to ``convert_to_parts``.
        """
        chat_history = await self.get_formated_history()
        print(f"chat_history is \n {chat_history}")

        search_results_content_history = await self.get_formated_search_results_content_history()
        print(f"search_results_content_history is \n {search_results_content_history}")

        human_prompt = summary_debate_prompt.format(topic=self.topic,
                                                    opinion=self.affirmative_opinion,
                                                    oppose_opinion=self.negative_opinion,
                                                    chat_history=chat_history,
                                                    search_results_content_history=search_results_content_history
                                                    )

        messages = [
            {"role": "system", "content": summary_system_prompt},
            {"role": "user", "content": human_prompt}
        ]

        output = await self.async_call_llm(messages, json_parse=False)

        moderator_speech = DebateSpeech.from_dict({
            "content": "",
            "round": 0,
            "type": "summary",
            "stance": "moderator",
            "name": self.name(),
        })

        async def after_speech_call(message_output_response):
            # Nothing to extract from the summary response; just mark the
            # speech as complete.
            moderator_speech.finished = True

        await moderator_speech.convert_to_parts(output, after_speech_call)
        return moderator_speech

    async def get_formated_history(self):
        """Return every memory item as ``"<speaker> (round <n>): <content>"``, one per line.

        Each item's ``metadata`` must contain the ``speaker`` and ``round``
        keys; a missing key raises ``KeyError``.
        """
        return "\n".join(
            f"{item.metadata['speaker']} (round {item.metadata['round']}): {item.content}"
            for item in self.memory.get_all()
        )

    async def get_formated_search_results_content_history(self):
        """Return the workspace's web-page search results as text, one result per line.

        Only artifacts whose ``content`` is a ``SearchOutput`` contribute.

        Returns:
            The joined lines, or an empty string when no workspace is set.
        """
        # set_workspace() may never have been called, in which case the
        # attribute might not exist at all. Return "" rather than None so the
        # summary prompt does not end up containing the literal text "None".
        workspace = getattr(self, "workspace", None)
        if not workspace:
            return ""

        search_results = workspace.list_artifacts(ArtifactType.WEB_PAGES)
        materials = []
        for search_result in search_results:
            if isinstance(search_result.content, SearchOutput):
                for item in search_result.content.results:
                    materials.append(
                        f"{search_result.metadata['user']} (round {search_result.metadata['round']}): {search_result.content.query}: url: {item.url}, title: {item.title}, description: {item.content}")

        return "\n".join(materials)

    def set_workspace(self, workspace: WorkSpace):
        """Attach the workspace whose artifacts are read when building the summary."""
        self.workspace = workspace
|