import gradio as gr import pandas as pd from datasets import load_dataset import random from typing import Dict, Any, List import json # Load the dataset def load_community_alignment_dataset(): """Load the Facebook Community Alignment Dataset""" try: dataset = load_dataset("facebook/community-alignment-dataset") print(f"Dataset loaded successfully. Available splits: {list(dataset.keys())}") for split_name, split_data in dataset.items(): print(f"Split '{split_name}': {len(split_data)} items") return dataset except Exception as e: print(f"Error loading dataset: {e}") return None # Initialize dataset dataset = load_community_alignment_dataset() def format_conversation_turn(turn_data: Dict[str, Any], turn_number: int) -> str: """Format a conversation turn for display""" if not turn_data: return "" prompt = turn_data.get('prompt', '') responses = turn_data.get('responses', '') preferred = turn_data.get('preferred_response', '') formatted = f"**Turn {turn_number}**\n\n" formatted += f"**Prompt:** {prompt}\n\n" if responses: formatted += "**Responses:**\n" formatted += responses.replace('# Response ', '**Response ').replace(':\n', ':**\n') formatted += "\n" if preferred: formatted += f"**Preferred Response:** {preferred.upper()}\n" return formatted def get_conversation_data(conversation_id: int) -> Dict[str, Any]: """Get conversation data by ID""" if not dataset: return None try: # Search for conversation in the dataset for split in dataset.keys(): split_data = dataset[split] for i in range(len(split_data)): item = split_data[i] if item.get('conversation_id') == conversation_id: return item return None except Exception as e: print(f"Error getting conversation data: {e}") return None def format_annotator_info(item: Dict[str, Any]) -> str: """Format annotator information""" info = "**Annotator Information:**\n\n" demographics = [ ('Age', 'annotator_age'), ('Gender', 'annotator_gender'), ('Education', 'annotator_education_level'), ('Political', 'annotator_political'), ('Ethnicity', 'annotator_ethnicity'), ('Country', 'annotator_country') ] for label, key in demographics: value = item.get(key, 'N/A') if value and value != 'None': info += f"**{label}:** {value}\n" return info def display_conversation(conversation_id: int) -> tuple: """Display a conversation by ID""" if not dataset: return "Dataset not loaded", "", "", "" item = get_conversation_data(conversation_id) if not item: return f"Conversation ID {conversation_id} not found", "", "", "" # Format conversation turns conversation_text = "" # First turn if item.get('first_turn_prompt'): first_turn = { 'prompt': item['first_turn_prompt'], 'responses': item['first_turn_responses'], 'preferred_response': item['first_turn_preferred_response'] } conversation_text += format_conversation_turn(first_turn, 1) + "\n" # Second turn if item.get('second_turn_prompt'): second_turn = { 'prompt': item['second_turn_prompt'], 'responses': item['second_turn_responses'], 'preferred_response': item['second_turn_preferred_response'] } conversation_text += format_conversation_turn(second_turn, 2) + "\n" # Third turn if item.get('third_turn_prompt'): third_turn = { 'prompt': item['third_turn_prompt'], 'responses': item['third_turn_responses'], 'preferred_response': item['third_turn_preferred_response'] } conversation_text += format_conversation_turn(third_turn, 3) + "\n" # Fourth turn if item.get('fourth_turn_prompt'): fourth_turn = { 'prompt': item['fourth_turn_prompt'], 'responses': item['fourth_turn_responses'], 'preferred_response': item['fourth_turn_preferred_response'] } conversation_text += format_conversation_turn(fourth_turn, 4) + "\n" # Annotator information annotator_info = format_annotator_info(item) # Metadata metadata = f"**Metadata:**\n\n" metadata += f"**Conversation ID:** {item.get('conversation_id', 'N/A')}\n" metadata += f"**Language:** {item.get('assigned_lang', 'N/A')}\n" metadata += f"**Annotator ID:** {item.get('annotator_id', 'N/A')}\n" metadata += f"**In Balanced Subset:** {item.get('in_balanced_subset', 'N/A')}\n" metadata += f"**In Balanced Subset 10:** {item.get('in_balanced_subset_10', 'N/A')}\n" metadata += f"**Is Pregenerated First Prompt:** {item.get('is_pregenerated_first_prompt', 'N/A')}\n" # Raw JSON for debugging raw_json = json.dumps(item, indent=2) return conversation_text, annotator_info, metadata, raw_json def get_random_conversation() -> int: """Get a random conversation ID""" if not dataset: return 0 try: # Get a random split split = random.choice(list(dataset.keys())) split_data = dataset[split] # Get a random index random_index = random.randint(0, len(split_data) - 1) item = split_data[random_index] return item.get('conversation_id', 0) except Exception as e: print(f"Error getting random conversation: {e}") # Fallback: return a default conversation ID return 1061830552573006 # The ID from your example def get_dataset_stats() -> str: """Get dataset statistics""" if not dataset: return "Dataset not loaded" stats = "**Dataset Statistics:**\n\n" for split_name, split_data in dataset.items(): stats += f"**{split_name}:** {len(split_data)} conversations\n" # Sample some metadata if 'train' in dataset and len(dataset['train']) > 0: sample_item = dataset['train'][0] stats += f"\n**Sample Fields:**\n" for key in list(sample_item.keys())[:10]: # Show first 10 fields stats += f"- {key}\n" return stats def search_conversations(query: str, field: str) -> str: """Search conversations by field""" if not dataset or not query: return "Please provide a search query" results = [] query_lower = query.lower() try: for split_name, split_data in dataset.items(): # Limit search to first 100 items per split for i in range(min(100, len(split_data))): item = split_data[i] if field in item and item[field]: field_value = str(item[field]).lower() if query_lower in field_value: results.append({ 'conversation_id': item.get('conversation_id'), 'split': split_name, 'field_value': str(item[field])[:100] + "..." if len(str(item[field])) > 100 else str(item[field]) }) except Exception as e: return f"Error during search: {e}" if not results: return f"No results found for '{query}' in field '{field}'" result_text = f"**Search Results for '{query}' in '{field}':**\n\n" for i, result in enumerate(results[:10]): # Limit to 10 results result_text += f"{i+1}. **Conversation ID:** {result['conversation_id']} (Split: {result['split']})\n" result_text += f" **Value:** {result['field_value']}\n\n" return result_text # Create the Gradio interface def create_interface(): with gr.Blocks(title="Facebook Community Alignment Dataset Viewer", theme=gr.themes.Soft()) as demo: gr.Markdown("# 🤖 Facebook Community Alignment Dataset Viewer") gr.Markdown("Explore conversations, responses, and annotations from the Facebook Community Alignment Dataset.") with gr.Tabs(): # Tab 1: Conversation Viewer with gr.Tab("Conversation Viewer"): with gr.Row(): with gr.Column(scale=1): conversation_id_input = gr.Number( label="Conversation ID", value=get_random_conversation(), interactive=True ) random_btn = gr.Button("🎲 Random Conversation", variant="secondary") load_btn = gr.Button("🔍 Load Conversation", variant="primary") with gr.Column(scale=3): conversation_display = gr.Markdown(label="Conversation") annotator_display = gr.Markdown(label="Annotator Information") metadata_display = gr.Markdown(label="Metadata") raw_json_display = gr.Code(label="Raw JSON", language="json") # Connect buttons random_btn.click( fn=get_random_conversation, outputs=conversation_id_input ) load_btn.click( fn=display_conversation, inputs=conversation_id_input, outputs=[conversation_display, annotator_display, metadata_display, raw_json_display] ) conversation_id_input.submit( fn=display_conversation, inputs=conversation_id_input, outputs=[conversation_display, annotator_display, metadata_display, raw_json_display] ) # Tab 2: Dataset Statistics with gr.Tab("Dataset Statistics"): stats_btn = gr.Button("📊 Load Statistics", variant="primary") stats_display = gr.Markdown(label="Dataset Statistics") stats_btn.click( fn=get_dataset_stats, outputs=stats_display ) # Tab 3: Search with gr.Tab("Search Conversations"): with gr.Row(): with gr.Column(scale=1): search_query = gr.Textbox( label="Search Query", placeholder="Enter search term...", interactive=True ) search_field = gr.Dropdown( label="Search Field", choices=[ "first_turn_prompt", "second_turn_prompt", "third_turn_prompt", "annotator_country", "annotator_age", "annotator_gender", "assigned_lang" ], value="first_turn_prompt", interactive=True ) search_btn = gr.Button("🔍 Search", variant="primary") with gr.Column(scale=2): search_results = gr.Markdown(label="Search Results") search_btn.click( fn=search_conversations, inputs=[search_query, search_field], outputs=search_results ) search_query.submit( fn=search_conversations, inputs=[search_query, search_field], outputs=search_results ) # Tab 4: About with gr.Tab("About"): gr.Markdown(""" ## About the Facebook Community Alignment Dataset This dataset contains conversations with multiple response options and human annotations indicating which responses are preferred by different demographic groups. ### Dataset Structure: - **Conversations**: Multi-turn dialogues with prompts and multiple response options - **Annotations**: Human preferences for different response options - **Demographics**: Annotator information including age, gender, education, political views, ethnicity, and country ### Key Features: - Multi-turn conversations (up to 4 turns) - 4 response options per turn (A, B, C, D) - Human preference annotations - Diverse annotator demographics - Balanced subsets for analysis ### Use Cases: - Studying response preferences across demographics - Training models to generate community-aligned responses - Analyzing conversation dynamics - Understanding cultural and demographic differences in communication preferences ### Citation: If you use this dataset, please cite the original Facebook research paper. """) # Auto-load a random conversation on startup demo.load( fn=display_conversation, inputs=conversation_id_input, outputs=[conversation_display, annotator_display, metadata_display, raw_json_display] ) return demo # Create and launch the app if __name__ == "__main__": demo = create_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )