import logging
from abc import ABC
from typing import Dict, Any, Union, List, Literal, Optional
from datetime import datetime
import uuid

from aworld.models.model_response import ToolCall
from examples.debate.agent.base import DebateSpeech
from examples.debate.agent.prompts import (
    user_assignment_prompt,
    user_assignment_system_prompt,
    affirmative_few_shots,
    negative_few_shots,
    user_debate_prompt,
)
from examples.debate.agent.search.search_engine import SearchEngine
from examples.debate.agent.search.tavily_search_engine import TavilySearchEngine
from examples.debate.agent.stream_output_agent import StreamOutputAgent
from aworld.config import AgentConfig
from aworld.core.common import Observation, ActionModel
from aworld.output import SearchOutput, SearchItem, MessageOutput
from aworld.output.artifact import ArtifactType


def truncate_content(raw_content: Optional[str], char_limit: int) -> str:
    """Return *raw_content* cut to at most *char_limit* characters.

    ``None`` is treated as an empty string. When the text is longer than
    ``char_limit`` it is cut and the marker ``"... [truncated]"`` is appended
    (the marker itself is not counted against the limit).
    """
    if raw_content is None:
        return ''
    if len(raw_content) > char_limit:
        return raw_content[:char_limit] + "... [truncated]"
    return raw_content


class DebateAgent(StreamOutputAgent, ABC):
    """Debate participant that searches the web and then asks the LLM for a speech.

    Each turn runs: keyword generation -> web search -> publishing the search
    results as workspace artifacts -> statement generation -> wrapping the
    statement in a ``DebateSpeech``.

    NOTE(review): ``self.workspace`` is only assigned in ``set_workspace``
    within this file; ``async_policy`` needs it, so call ``set_workspace``
    first unless the base class provides one — confirm.
    """

    stance: Literal["affirmative", "negative"]

    def __init__(self, name: str, stance: Literal["affirmative", "negative"], conf: AgentConfig,
                 search_engine: Optional[SearchEngine] = None):
        """Create a debate agent.

        Args:
            name: Agent name; written onto ``conf.name`` before the base
                class is initialised.
            stance: Side of the debate this agent argues for.
            conf: Agent configuration handed to the base class.
            search_engine: Engine used by ``search_webpages``. When omitted
                (or ``None``) a fresh ``TavilySearchEngine`` is created for
                this agent.
        """
        conf.name = name
        super().__init__(conf)
        self.steps = 0
        self.stance = stance
        # Build the default engine per instance. A call in the signature
        # default would run once at import time and be shared by all agents.
        self.search_engine = search_engine if search_engine is not None else TavilySearchEngine()

    async def speech(self, topic: str, opinion: str, oppose_opinion: str, round: int,
                     speech_history: list[DebateSpeech]) -> DebateSpeech:
        """Produce this agent's speech for the given round.

        The content of the most recent speech in ``speech_history`` (empty
        string when there is none) is used as the observation.

        Returns:
            The ``policy_info`` of the first action returned by
            ``async_policy``, which this class sets to a ``DebateSpeech``.
        """
        latest = self.get_latest_speech(speech_history)
        observation = Observation(content=latest.content if latest else "")
        info = {
            "topic": topic,
            "round": round,
            "opinion": opinion,
            "oppose_opinion": oppose_opinion,
            "history": speech_history,
        }
        actions = await self.async_policy(observation, info)
        return actions[0].policy_info

    async def async_policy(self, observation: Observation, info: Optional[Dict[str, Any]] = None,
                           **kwargs) -> Union[List[ActionModel], None]:
        """Run one debate turn and return a single action carrying the speech.

        Args:
            observation: Its ``content`` is used as the opponent's latest claim.
            info: Must contain ``"round"``, ``"opinion"``, ``"oppose_opinion"``,
                ``"topic"`` and ``"history"``.

        Raises:
            KeyError: If a required key is missing from ``info``.
        """
        # A literal ``{}`` default would be one dict shared across all calls.
        if info is None:
            info = {}

        # Step 1: unpack parameters.
        opponent_claim = observation.content
        debate_round = info["round"]
        opinion = info["opinion"]
        oppose_opinion = info["oppose_opinion"]
        topic = info["topic"]
        history: list[DebateSpeech] = info["history"]

        # Step 2: generate search keywords.
        keywords = await self.gen_keywords(topic, opinion, oppose_opinion, opponent_claim, history)
        logging.info(f"gen keywords = {keywords}")

        # Step 3: search the web and publish every result set to the workspace.
        search_results = await self.search_webpages(keywords, max_results=5)
        await self._publish_search_results(search_results, keywords, info)

        # Step 4: generate the statement from the search results.
        user_response = await self.gen_statement(topic, opinion, oppose_opinion,
                                                 opponent_claim, history, search_results)
        logging.info(f"user_response is {user_response}")

        # Step 5: wrap the statement in a speech object.
        speech = DebateSpeech.from_dict({
            "round": debate_round,
            "type": "speech",
            "stance": self.stance,
            "name": self.name(),
        })

        async def after_speech_call(message_output_response):
            # Passed to convert_to_parts as a callback; fills in the final
            # content and marks the speech as finished.
            logging.info(f"{self.stance}#{self.name()}: after_speech_call")
            speech.metadata = {}
            speech.content = message_output_response
            speech.finished = True

        await speech.convert_to_parts(user_response, after_speech_call)

        return [ActionModel(policy_info=speech)]

    async def _publish_search_results(self, search_results, keywords, info: Dict[str, Any]) -> None:
        """Store each search result set as a WEB_PAGES artifact in the workspace."""
        for search_result in search_results:
            logging.info(f"keyword#{search_result['query']}-> result size is {len(search_result['results'])}")
            search_item = {
                "query": search_result.get("query", ""),
                "results": [
                    SearchItem(title=result["title"], url=result["url"], content=result['content'],
                               raw_content=result['raw_content'], metadata={})
                    for result in search_result["results"]
                ],
                "origin_tool_call": ToolCall.from_dict({
                    "id": "call_search",
                    "type": "function",
                    "function": {
                        "name": "search",
                        "arguments": keywords
                    }
                })
            }
            search_output = SearchOutput.from_dict(search_item)
            await self.workspace.create_artifact(
                artifact_type=ArtifactType.WEB_PAGES,
                artifact_id=str(uuid.uuid4()),
                content=search_output,
                metadata={
                    "query": search_output.query,
                    "user": self.name(),
                    "round": info["round"],
                    "opinion": info["opinion"],
                    "oppose_opinion": info["oppose_opinion"],
                    "topic": info["topic"],
                    "tags": [f"user#{self.name()}", f"Rounds#{info['round']}"]
                }
            )

    async def gen_keywords(self, topic, opinion, oppose_opinion, last_oppose_speech_content, history):
        """Ask the LLM for search keywords and return them as a list of strings.

        The finished LLM response is split on commas; surrounding whitespace
        is stripped and empty entries are dropped, so a reply such as
        ``"a, b,"`` yields ``["a", "b"]``. ``history`` is accepted but not
        used here.
        """
        current_time = datetime.now().strftime("%Y-%m-%d-%H")
        human_prompt = user_assignment_prompt.format(topic=topic,
                                                     opinion=opinion,
                                                     oppose_opinion=oppose_opinion,
                                                     last_oppose_speech_content=last_oppose_speech_content,
                                                     current_time=current_time,
                                                     limit=2)

        messages = [{'role': 'system', 'content': user_assignment_system_prompt},
                    {'role': 'user', 'content': human_prompt}]

        output = await self.async_call_llm(messages)
        response = await output.get_finished_response()

        stripped = (keyword.strip() for keyword in response.split(","))
        return [keyword for keyword in stripped if keyword]

    async def search_webpages(self, keywords, max_results):
        """Run a batch search for ``keywords`` on the configured search engine."""
        return await self.search_engine.async_batch_search(queries=keywords, max_results=max_results)

    async def gen_statement(self, topic, opinion, oppose_opinion, opponent_claim, history,
                            search_results) -> MessageOutput:
        """Build the debate prompt and return the result of the LLM call.

        Args:
            opponent_claim: Passed to the prompt as the opponent's last speech.
            history: Earlier speeches. All but the last one are rendered as
                chat history; the last is left out (presumably because it
                already arrives as ``opponent_claim`` — confirm against callers).
            search_results: Items with a ``'query'`` and a ``'results'`` list
                whose entries carry ``'content'``.
        """
        # One section per query: a header line, then the snippets (each capped
        # at 1000 characters). Sections are separated by blank lines so a
        # query header never runs into the neighbouring snippet text.
        sections = []
        for search_result in search_results:
            snippets = "\n\n".join(truncate_content(item['content'], 1000)
                                   for item in search_result['results'])
            sections.append(f"SearchQuery: {search_result['query']}\n{snippets}")
        search_results_content = "\n\n".join(sections)

        few_shots_by_stance = {
            "affirmative": affirmative_few_shots,
            "negative": negative_few_shots,
        }
        few_shots = few_shots_by_stance.get(self.stance, "")

        # History is rendered from this agent's point of view: own speeches
        # are "You", the other side's are "Your Opponent". Speeches with an
        # unrecognised stance are skipped.
        chat_history = ""
        if self.stance in few_shots_by_stance and len(history) >= 2:
            rendered = []
            for past_speech in history[:-1]:
                if past_speech.stance == self.stance:
                    speaker = "You: "
                elif past_speech.stance in few_shots_by_stance:
                    speaker = "Your Opponent: "
                else:
                    continue
                rendered.append(speaker + past_speech.content + "\n")
            chat_history = "".join(rendered)

        human_prompt = user_debate_prompt.format(topic=topic,
                                                 opinion=opinion,
                                                 oppose_opinion=oppose_opinion,
                                                 last_oppose_speech_content=opponent_claim,
                                                 search_results_content=search_results_content,
                                                 chat_history=chat_history,
                                                 few_shots=few_shots)

        messages = [{'role': 'system', 'content': user_assignment_system_prompt},
                    {'role': 'user', 'content': human_prompt}]

        return await self.async_call_llm(messages)

    def get_latest_speech(self, history: list[DebateSpeech]):
        """Return the last speech in ``history``, or ``None`` when it is empty."""
        return history[-1] if history else None

    def set_workspace(self, workspace):
        """Attach the workspace that ``async_policy`` writes search artifacts to."""
        self.workspace = workspace