Duibonduil's picture
Upload 7 files
d0c79e9 verified
import logging
from abc import ABC
from typing import Dict, Any, Union, List, Literal, Optional
from datetime import datetime
import uuid
from aworld.models.model_response import ToolCall
from examples.debate.agent.base import DebateSpeech
from examples.debate.agent.prompts import user_assignment_prompt, user_assignment_system_prompt, affirmative_few_shots, \
negative_few_shots, \
user_debate_prompt
from examples.debate.agent.search.search_engine import SearchEngine
from examples.debate.agent.search.tavily_search_engine import TavilySearchEngine
from examples.debate.agent.stream_output_agent import StreamOutputAgent
from aworld.config import AgentConfig
from aworld.core.common import Observation, ActionModel
from aworld.output import SearchOutput, SearchItem, MessageOutput
from aworld.output.artifact import ArtifactType
def truncate_content(raw_content, char_limit):
if raw_content is None:
raw_content = ''
if len(raw_content) > char_limit:
raw_content = raw_content[:char_limit] + "... [truncated]"
return raw_content
class DebateAgent(StreamOutputAgent, ABC):
stance: Literal["affirmative", "negative"]
def __init__(self, name: str, stance: Literal["affirmative", "negative"], conf: AgentConfig, search_engine: Optional[SearchEngine] = TavilySearchEngine()):
conf.name = name
super().__init__(conf)
self.steps = 0
self.stance = stance
self.search_engine = search_engine
async def speech(self, topic: str, opinion: str,oppose_opinion: str, round: int, speech_history: list[DebateSpeech]) -> DebateSpeech:
observation = Observation(content=self.get_latest_speech(speech_history).content if self.get_latest_speech(speech_history) else "")
info = {
"topic": topic,
"round": round,
"opinion": opinion,
"oppose_opinion": oppose_opinion,
"history": speech_history
}
actions = await self.async_policy(observation, info)
return actions[0].policy_info
async def async_policy(self, observation: Observation, info: Dict[str, Any] = {}, **kwargs) -> Union[
List[ActionModel], None]:
## step 1: params
opponent_claim = observation.content
round = info["round"]
opinion = info["opinion"]
oppose_opinion = info["oppose_opinion"]
topic = info["topic"]
history: list[DebateSpeech] = info["history"]
#Event.emit("xxx")
## step2: gen keywords
keywords = await self.gen_keywords(topic, opinion, oppose_opinion, opponent_claim, history)
logging.info(f"gen keywords = {keywords}")
## step3:search_webpages
search_results = await self.search_webpages(keywords, max_results=5)
for search_result in search_results:
logging.info(f"keyword#{search_result['query']}-> result size is {len(search_result['results'])}")
search_item = {
"query": search_result.get("query", ""),
"results": [SearchItem(title=result["title"],url=result["url"], content=result['content'], raw_content=result['raw_content'], metadata={}) for result in search_result["results"]],
"origin_tool_call": ToolCall.from_dict({
"id": f"call_search",
"type": "function",
"function": {
"name": "search",
"arguments": keywords
}
})
}
search_output = SearchOutput.from_dict(search_item)
await self.workspace.create_artifact(
artifact_type=ArtifactType.WEB_PAGES,
artifact_id=str(uuid.uuid4()),
content=search_output,
metadata={
"query": search_output.query,
"user": self.name(),
"round": info["round"],
"opinion": info["opinion"],
"oppose_opinion": info["oppose_opinion"],
"topic": info["topic"],
"tags": [f"user#{self.name()}",f"Rounds#{info['round']}"]
}
)
## step4 gen result
user_response = await self.gen_statement(topic, opinion, oppose_opinion, opponent_claim, history, search_results)
logging.info(f"user_response is {user_response}")
## step3: gen speech
speech = DebateSpeech.from_dict({
"round": round,
"type": "speech",
"stance": self.stance,
"name": self.name(),
})
async def after_speech_call(message_output_response):
logging.info(f"{self.stance}#{self.name()}: after_speech_call")
speech.metadata = {}
speech.content = message_output_response
speech.finished = True
await speech.convert_to_parts(user_response, after_speech_call)
action = ActionModel(
policy_info=speech
)
return [action]
async def gen_keywords(self, topic, opinion, oppose_opinion, last_oppose_speech_content, history):
current_time = datetime.now().strftime("%Y-%m-%d-%H")
human_prompt = user_assignment_prompt.format(topic=topic,
opinion=opinion,
oppose_opinion=oppose_opinion,
last_oppose_speech_content=last_oppose_speech_content,
current_time = current_time,
limit=2
)
messages = [{'role': 'system', 'content': user_assignment_system_prompt},
{'role': 'user', 'content': human_prompt}]
output = await self.async_call_llm(messages)
response = await output.get_finished_response()
return response.split(",")
async def search_webpages(self, keywords, max_results):
return await self.search_engine.async_batch_search(queries=keywords, max_results=max_results)
async def gen_statement(self, topic, opinion, oppose_opinion, opponent_claim, history, search_results) -> MessageOutput:
search_results_content = ""
for search_result in search_results:
search_results_content += f"SearchQuery: {search_result['query']}"
search_results_content += "\n\n".join([truncate_content(s['content'], 1000) for s in search_result['results']])
unique_history = history
# if len(history) >= 2:
# for i in range(len(history)):
# # Check if the current element is the same as the next one
# if i == len(history) - 1 or history[i] != history[i+1]:
# # Add the current element to the result list
# unique_history.append(history[i])
affirmative_chat_history = ""
negative_chat_history = ""
if len(unique_history) >= 2:
if self.stance == "affirmative":
for speech in unique_history[:-1]:
if speech.stance == "affirmative":
affirmative_chat_history = affirmative_chat_history + "You: " + speech.content + "\n"
elif speech.stance == "negative":
affirmative_chat_history = affirmative_chat_history + "Your Opponent: " + speech.content + "\n"
elif self.stance == "negative":
for speech in unique_history[:-1]:
if speech.stance == "negative":
negative_chat_history = negative_chat_history + "You: " + speech.content + "\n"
elif speech.stance == "affirmative":
negative_chat_history = negative_chat_history + "Your Opponent: " + speech.content + "\n"
few_shots = ""
chat_history = ""
if self.stance == "affirmative":
chat_history = affirmative_chat_history
few_shots = affirmative_few_shots
elif self.stance == "negative":
chat_history = negative_chat_history
few_shots = negative_few_shots
human_prompt = user_debate_prompt.format(topic=topic,
opinion=opinion,
oppose_opinion=oppose_opinion,
last_oppose_speech_content=opponent_claim,
search_results_content=search_results_content,
chat_history = chat_history,
few_shots = few_shots
)
messages = [{'role': 'system', 'content': user_assignment_system_prompt},
{'role': 'user', 'content': human_prompt}]
return await self.async_call_llm(messages)
def get_latest_speech(self, history: list[DebateSpeech]):
"""
get the latest speech from history
"""
if len(history) == 0:
return None
return history[-1]
def set_workspace(self, workspace):
self.workspace = workspace