# career-conversation/community_contributions/elchanio-76/elchanio_wk1_lab2_llm_parallel_evaluation.py
import json
import os
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from anthropic import Anthropic
from botocore import client as botocore_client
from dotenv import load_dotenv
from openai import OpenAI

# The IPython display helpers are only needed when running in a notebook:
# from IPython.display import Markdown, display

# This exercise builds on week 1, lab 2 of the Agentic AI course.
# It implements two patterns:
#   1. Agent parallelization with ThreadPoolExecutor
#   2. A combined "LLM as a judge" evaluation
# All of the judging models rank the anonymized responses, and the
# rankings are averaged into a final leaderboard.
# This can eat up a lot of tokens, so be careful running it multiple times;
# the number of tokens is deliberately left unlimited.
# Modify setup_environment() and the models dictionaries in main()
# to match your environment and taste.
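# Worked example of the averaging step (illustrative numbers only):
# if three judges return ["C1", "C2"], ["C2", "C1"], and ["C1", "C2"],
# then C1 collects positions [1, 2, 1] -> average 1.33 and C2 [2, 1, 2] -> 1.67,
# so C1 wins, since a lower average position is better.
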
def setup_environment():
    """
    Set up the environment by initializing the Bedrock, Anthropic,
    and OpenAI-compatible (OpenAI, Google, XAI, Ollama) clients.

    Returns:
        Dictionary mapping provider names to initialized clients
    """
    try:
        load_dotenv(override=True)
    except Exception as e:
        print(f"\U0000274C Warning: Could not load .env file: {e}")
    try:
        bedrock_api_key = os.environ["AWS_BEARER_TOKEN_BEDROCK"]
    except KeyError:
        bedrock_api_key = None
        print("\U0000274C Warning: AWS_BEARER_TOKEN_BEDROCK not found in environment")
    openai_api_key = os.getenv("OPENAI_API_KEY")
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    google_api_key = os.getenv("GEMINI_API_KEY")
    xai_api_key = os.getenv("XAI_API_KEY")
    clients = {}
    if bedrock_api_key:
        try:
            print("Bedrock API key loaded successfully. Initializing runtime client")
            bedrock_client = boto3.client(
                service_name="bedrock-runtime", region_name="us-east-1"
            )
            clients.update({"bedrock": bedrock_client})
        except Exception as e:
            print(f"\U0000274C Error initializing Bedrock client: {e}")
    if anthropic_api_key:
        try:
            print("Anthropic API key loaded successfully. Initializing client")
            anthropic_client = Anthropic(api_key=anthropic_api_key)
            clients.update({"anthropic": anthropic_client})
        except Exception as e:
            print(f"\U0000274C Error initializing Anthropic client: {e}")
    if openai_api_key:
        try:
            print("OpenAI API key loaded successfully. Initializing client")
            openai_client = OpenAI(api_key=openai_api_key)
            clients.update({"openai": openai_client})
        except Exception as e:
            print(f"\U0000274C Error initializing OpenAI client: {e}")
    if google_api_key:
        try:
            print("Google API key loaded successfully. Initializing client")
            google_client = OpenAI(
                api_key=google_api_key,
                base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
            )
            clients.update({"google": google_client})
        except Exception as e:
            print(f"\U0000274C Error initializing Google client: {e}")
    if xai_api_key:
        try:
            print("XAI API key loaded successfully. Initializing client")
            xai_client = OpenAI(
                api_key=xai_api_key, base_url="https://api.x.ai/v1"
            )
            clients.update({"xai": xai_client})
        except Exception as e:
            print(f"\U0000274C Error initializing XAI client: {e}")
    # Ollama runs locally, so no real API key is required
    try:
        ollama_client = OpenAI(
            api_key="ollama", base_url="http://localhost:11434/v1"
        )
        clients.update({"ollama": ollama_client})
    except Exception as e:
        print(f"\U0000274C Error initializing Ollama client: {e}")
    return clients

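# Illustrative shape of the returned mapping (which keys appear depends on the
# API keys present in your environment):
#   {"bedrock": <botocore client>, "anthropic": Anthropic(...),
#    "openai": OpenAI(...), ..., "ollama": OpenAI(base_url="http://localhost:11434/v1")}
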
def call_openai(client, prompt, model="gpt-5-nano", **kwargs):
    """
    Call the OpenAI API with the given prompt and model.
    """
    try:
        messages = [{"role": "user", "content": prompt}]
        response = client.chat.completions.create(
            model=model, messages=messages, **kwargs
        )
        text = response.choices[0].message.content
        return text
    except Exception as e:
        print(f"\U0000274C Error calling OpenAI API with model {model}: {e}")
        raise

def call_anthropic(client, prompt, model="claude-3-5-haiku-latest", **kwargs):
    """
    Call the Anthropic API with the given prompt and model.
    """
    try:
        message = client.messages.create(
            model=model,
            max_tokens=1024,  # the Anthropic API requires an explicit token cap
            messages=[
                {
                    "role": "user",
                    "content": prompt,
                }
            ],
            **kwargs,
        )
        return message.content[0].text
    except Exception as e:
        print(f"\U0000274C Error calling Anthropic API with model {model}: {e}")
        raise

def call_bedrock(client, prompt, model="us.amazon.nova-micro-v1:0", **kwargs):
    """
    Call the Bedrock Converse API with the given prompt and model.
    """
    try:
        messages = [{"role": "user", "content": [{"text": prompt}]}]
        response = client.converse(modelId=model, messages=messages, **kwargs)
        return response["output"]["message"]["content"][0]["text"]
    except Exception as e:
        print(f"\U0000274C Error calling Bedrock API with model {model}: {e}")
        raise

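# All three call_* helpers share one shape, e.g. (hypothetical prompts):
#   call_openai(clients["openai"], "Say hi", model="gpt-5-nano")
#   call_anthropic(clients["anthropic"], "Say hi")
#   call_bedrock(clients["bedrock"], "Say hi")
# which is what lets call_single_model below dispatch purely on client type.
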
def call_single_model(provider, model, client, prompt):
    """Call a single model and return the response."""
    try:
        if isinstance(client, OpenAI):
            print(f"-> \U0001f9e0 Asking {model} on {provider} using OpenAI API... \U0001f9e0")
            response = call_openai(client, prompt, model=model)
        elif isinstance(client, Anthropic):
            print(f"-> \U0001f9e0 Asking {model} on {provider} using Anthropic API... \U0001f9e0")
            response = call_anthropic(client, prompt, model=model)
        elif isinstance(client, botocore_client.BaseClient):
            print(f"-> \U0001f9e0 Asking {model} on {provider} using Bedrock API... \U0001f9e0")
            response = call_bedrock(client, prompt, model=model)
        else:
            raise ValueError(f"\U0000274C Unknown client type for model {model}")
        return model, response
    except Exception as e:
        print(f"\U0000274C Error calling model {model} on {provider}: {e}")
        return model, f"Error: {str(e)}"

def call_models(clients, prompt, models):
    """
    Call the models in parallel and return the responses.
    """
    responses = {}
    try:
        with ThreadPoolExecutor(max_workers=len(models)) as executor:
            futures = []
            for provider, model in models.items():
                if provider in clients:
                    client = clients[provider]
                    future = executor.submit(
                        call_single_model, provider, model, client, prompt
                    )
                    futures.append(future)
                else:
                    print(f"Warning: No client found for provider {provider}")
                    responses[model] = f"Error: No client available for {provider}"
            for future in as_completed(futures):
                try:
                    model, response = future.result()
                    responses[model] = response
                    print(f"\U00002705 {model} completed responding! \U00002705")
                except Exception as e:
                    print(f"\U0000274C Error processing future result: {e}")
    except Exception as e:
        print(f"\U0000274C Error in parallel model execution: {e}")
        raise
    return responses

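# Illustrative call (assumes the matching client was initialized):
#   call_models(clients, "What is 2 + 2?", {"openai": "gpt-5-nano"})
#   -> {"gpt-5-nano": "..."}
# Responses are keyed by model name, which is how the judges stay anonymous.
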
def extract_json_response(text):
    """
    Extract the judge's JSON verdict from a raw model response.
    Returns the parsed dict, or None if no valid JSON is found.
    """
    # Find JSON that starts with {"results"
    pattern = r'(\{"results".*?\})'
    match = re.search(pattern, text, re.DOTALL)
    if match:
        json_str = match.group(1)
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # Fall back to brace counting to find the complete JSON object
            return extract_complete_json(text)
    return None

def extract_complete_json(text):
    """
    Brace-counting fallback that handles nested objects which the
    non-greedy regex in extract_json_response cuts short.
    """
    # The judge prompt mandates a top-level {"results": [...]} object
    start_idx = text.find('{"results"')
    if start_idx == -1:
        return None
    bracket_count = 0
    for i, char in enumerate(text[start_idx:], start_idx):
        if char == '{':
            bracket_count += 1
        elif char == '}':
            bracket_count -= 1
            if bracket_count == 0:
                json_str = text[start_idx:i + 1]
                try:
                    return json.loads(json_str)
                except json.JSONDecodeError:
                    # Keep scanning in case a later closing brace
                    # completes a valid object
                    continue
    return None

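# Illustrative example (made-up judge output):
#   extract_json_response('My verdict: {"results": ["C3", "C1", "C2"]} Done.')
#   -> {"results": ["C3", "C1", "C2"]}
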
def main():
    """Main function"""
    print("Demonstrate the parallelization pattern by calling multiple LLMs")
    print("=" * 50)
    # Set up the environment
    print("Setting up the environment...")
    try:
        clients = setup_environment()
        if not clients:
            print("Error: No clients were successfully initialized")
            return
        print(f"Initialized {len(clients)} clients:")
        print(clients)
        print("\n" + "=" * 50)
    except Exception as e:
        print(f"Error during client initialization: {e}")
        import traceback
        traceback.print_exc()
        return
    # Flow:
    # 1. Ask a model to define a question.
    # 2. Ask the 6 models in parallel to answer the question
    # 3. Aggregate answers
    # 4. Ask each judging model to evaluate the answers
    # 5. Calculate average rank from model evaluations
    # 6. Print results

    # 1. Ask a model to define a question.
    print("STEP 1: Asking a model to define a question...")
request = """Please come up with a challenging, nuanced question that\ | |
I can ask a number of LLMs to evaluate their intelligence. """ | |
request += ( | |
"Answer only with the question, without any explanation or preamble." | |
) | |
print("Request: " + request) | |
question_model = "gpt-oss:20b" | |
print("\U0001f9e0 Asking model: " + question_model + " \U0001f9e0") | |
try: | |
if "ollama" not in clients: | |
print("\U0000274C Error: Ollama client not available") | |
return | |
question = call_openai(clients["ollama"], request, model=question_model) | |
print("-" * 50) | |
print("Question: " + question) | |
print("-" * 50) | |
except Exception as e: | |
print(f"\U0000274C Error generating question: {e}") | |
return | |
    # 2. Ask the 6 models in parallel to answer the question.
    # Define the model names in a dictionary
    print("=" * 50 + "\nSTEP 2: Ask the models...")
    models = {
        # "bedrock": "us.amazon.nova-lite-v1:0",
        "bedrock": "us.meta.llama3-3-70b-instruct-v1:0",
        "anthropic": "claude-3-7-sonnet-latest",
        "openai": "gpt-5-mini",
        "google": "gemini-2.5-flash",
        "xai": "grok-3-mini",
        "ollama": "gpt-oss:20b",
    }
    try:
        answers = call_models(clients, question, models)
        if not answers:
            print("\U0000274C Error: No answers received from models")
            return
    except Exception as e:
        print(f"\U0000274C Error getting model answers: {e}")
        return
    # 3. Aggregate answers
    print("STEP 3: Aggregating answers...")
    try:
        answers_list = list(answers.values())
        competitors = list(answers.keys())
        print("... And the competitors are:")
        for index, competitor in enumerate(competitors, 1):
            print(f"Competitor C{index}: {competitor}")
        together = ""
        for index, answer in enumerate(answers_list):
            together += f"# Response from competitor 'C{index + 1}'\n\n"
            together += answer + "\n\n" + "-" * 50 + "\n\n"
    except Exception as e:
        print(f"\U0000274C Error aggregating answers: {e}")
        return
    # 4. Ask each judging model to evaluate the answers
    print("=" * 50 + "\nSTEP 4: Evaluating answers...")
    # Create evaluation prompt
    judge = f"""You are an expert evaluator of LLMs in a competition.
You are judging a competition between {len(competitors)} competitors.
Competitors are identified by an id such as 'C1', 'C2', etc.
Each competitor has been given this question:

{question}

Your job is to evaluate each response for clarity and strength of argument,
and rank them in order of best to worst. Think about your evaluation.
Respond with JSON in the following format:
{{"results": ["best competitor id", "second best competitor id", "third best competitor id", ...]}}

Here are the responses from each competitor:

{together}

Now respond with the JSON, and only the JSON, with the ranked
order of the competitors, nothing else.
Do not include markdown formatting or code blocks."""
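    # A compliant judge reply then looks like (illustrative ordering):
    #   {"results": ["C4", "C1", "C6", "C2", "C3", "C5"]}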
    # Write the evaluation prompt to a file for inspection
    try:
        print("Writing evaluation prompt to file 'evaluation_prompt.txt'")
        with open("evaluation_prompt.txt", "w") as f:
            f.write(judge)
    except Exception as e:
        print(f"\U0000274C Error writing evaluation prompt to file: {e}")
    judging_models = {
        "bedrock": "us.amazon.nova-pro-v1:0",
        "anthropic": "claude-sonnet-4-20250514",
        "openai": "o3-mini",
        "google": "gemini-2.5-pro",
    }
    try:
        print("\U00002696" * 5 + " JUDGEMENT TIME! " + "\U00002696" * 5)
        evaluations = call_models(clients, judge, judging_models)
        if not evaluations:
            print("\U0000274C Error: No evaluations received from judging models")
            return
    except Exception as e:
        print(f"\U0000274C Error getting model evaluations: {e}")
        return
    # 5. Calculate average rank from model evaluations
    print("=" * 42 + "\nSTEP 5: Calculating average rank from model evaluations...")
    rankings = []
    for model, evaluation in evaluations.items():
        try:
            parsed = extract_json_response(evaluation)
            # extract_json_response returns None (rather than raising)
            # when no valid JSON can be found in the response
            if parsed is None:
                print(f"\U0000274C Error parsing JSON response for model {model}\nResponse: {evaluation}")
                rankings.append([])
                continue
            rankings.append(parsed["results"])
        except Exception as e:
            print(f"\U0000274C Unexpected error processing evaluation for model {model}: {e}")
            rankings.append([])
    print(rankings)
    try:
        # Collect all rankings for each contestant
        contestant_rankings = defaultdict(list)
        for judge_ranking in rankings:
            for position, contestant in enumerate(judge_ranking, 1):
                contestant_rankings[contestant].append(position)
        # Calculate average rankings
        average_rankings = {contestant: sum(ranks) / len(ranks)
                            for contestant, ranks in contestant_rankings.items() if ranks}
        # print(average_rankings)
        if not average_rankings:
            print("\U0000274C Error: No valid rankings to process")
            return
        # Sort by average (ascending - lowest average = best rank)
        sorted_results = sorted(average_rankings.items(), key=lambda x: x[1])
        # print(sorted_results)
        # 6. Present the results by competitor
        print("Final Rankings:\n" + "=" * 42)
        for rank, (competitor, average) in enumerate(sorted_results, 1):
            try:
                # Map the anonymous id 'C1', 'C2', ... back to the model name
                competitor_name = competitors[int(competitor.lower().strip('c')) - 1]
                print(f"\U0001F3C6 Rank: {rank} ---- Model: {competitor_name} ---- Average rank: {average:.2f} \U0001F3C6")
            except (ValueError, IndexError) as e:
                print(f"\U0000274C Error processing competitor {competitor}: {e}")
        print("=" * 42)
        print("Done!")
    except Exception as e:
        print(f"\U0000274C Error calculating final rankings: {e}")

if __name__ == "__main__":
    main()