import os import pandas as pd import googlemaps import re import tweepy import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) from dotenv import load_dotenv from googleapiclient.discovery import build ## Youtube from sqlalchemy import create_engine import time load_dotenv() YOUTUBE_API_KEY = os.environ.get("YOUTUBE_API_KEY") MAPS_API_KEY = os.environ.get("MAPS_API_KEY") TWITTER_API_KEY = os.environ.get("TWITTER_API_KEY") TWITTER_API_SECRET = os.environ.get("TWITTER_API_SECRET") TWITTER_ACCESS_TOKEN = os.environ.get("TWITTER_ACCESS_TOKEN") TWITTER_ACCESS_SECRET = os.environ.get("TWITTER_ACCESS_SECRET") TWITTER_BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN") GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") REDDIT_CLIENT_ID = os.environ.get("REDDIT_CLIENT_ID") REDDIT_CLIENT_SECRET = os.environ.get("REDDIT_CLIENT_SECRET") REDDIT_USER_AGENT = os.environ.get("REDDIT_USER_AGENT") DETECT_LANGUAGE_API_KEY = os.environ.get("DETECT_LANGUAGE_API_KEY") OPEN_AI_API_KEY = os.getenv("OPEN_AI_API_KEY") OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "iris-gemini-chat") EMBEDDING_DIMENSION = 768 # Para models/embedding-001 de Gemini SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_KEY = os.getenv("SUPABASE_KEY") SUPABASE_KEY_PSQL = os.getenv("SUPABASE_KEY_PSQL") SUPABASE_URL_PSQL = os.getenv("SUPABASE_URL_PSQL") SUPABASE_USER_PSQL = os.getenv("SUPABASE_USER") def _maps_comments(maps_query: str, radius:int, lat:int, long: int) -> pd.DataFrame: """ Adapta la lógica de searchMapsCommentsQuery del prompt original. Busca lugares cercanos por palabra clave y obtiene sus comentarios. """ if not MAPS_API_KEY: return pd.DataFrame({'Error': ["Clave API de Google Maps no configurada."]}) gmaps = googlemaps.Client(MAPS_API_KEY) comments_list = [] try: # 1. Buscar lugares cercanos por la palabra clave # geocoder.ip('me') no está disponible globalmente en app.py, # se asume una ubicación por defecto o se podría integrar un servicio de geocodificación. # Para simplificar, usaremos una ubicación central de CDMX o se podría hacer configurable. # O se podría usar el input.maps_query() si es una dirección. # Si maps_query es una URL con place_id, se prioriza. place_id = None if maps_query.startswith("http") and "place_id" in maps_query: match = re.search(r'place_id=([^&]+)', maps_query) if match: place_id = match.group(1) if place_id: # Si se proporciona un place_id, obtener directamente los detalles de ese lugar places_data = gmaps.place(place_id=place_id, fields=['name', 'rating', 'review', 'formatted_address', 'geometry', 'place_id'], language='es') if places_data.get('result'): result_data = places_data['result'] placeName = result_data.get('name', 'N/A') lat = result_data['geometry']['location']['lat'] if result_data.get('geometry') and result_data['geometry'].get('location') else None long = result_data['geometry']['location']['lng'] if result_data.get('geometry') and result_data['geometry'].get('location') else None reviews_data = result_data.get('reviews', []) current_place_comments = [] for review in reviews_data: current_place_comments.append({ 'author': review.get('author_name', 'N/A'), 'comment': review.get('text', ''), 'rating': review.get('rating', 'N/A'), 'lat': lat, 'long': long, 'place_name': placeName, 'place_id': place_id, 'query': maps_query, 'source': 'Google Maps' }) if current_place_comments: comments_list.append(pd.DataFrame(current_place_comments)) else: # Si no hay place_id, realizar una búsqueda de lugares cercanos # Usar una ubicación por defecto (ej. Ciudad de México) si geocoder.ip('me') no es viable # O se podría hacer que el usuario ingrese una ubicación de referencia if not lat: default_location = (19.4326, -99.1332) # Lat/Long de CDMX else: default_location = (lat, long) searchPlaces = gmaps.places_nearby(keyword=maps_query, location=default_location, radius=radius) # Radio de 5km searchPlacesResults = searchPlaces['results'] for place in searchPlacesResults: placeName = place.get('name', None) # Filtrar por nombre para asegurar relevancia, como en el prompt original if placeName and re.search(re.escape(maps_query.lower()), placeName.lower(), re.IGNORECASE): placesData = gmaps.place(place_id=place.get('place_id'), fields=['name', 'rating', 'review', 'formatted_address', 'geometry', 'place_id'], language='es') current_place_comments = [] if placesData.get('result'): result_data = placesData['result'] lat = result_data['geometry']['location']['lat'] if result_data.get('geometry') and result_data['geometry'].get('location') else None long = result_data['geometry']['location']['lng'] if result_data.get('geometry') and result_data['geometry'].get('location') else None reviews_data = result_data.get('reviews', []) if reviews_data: for review in reviews_data: current_place_comments.append({ 'author': review.get('author_name', 'N/A'), 'comment': review.get('text', ''), 'rating': review.get('rating', 'N/A'), 'lat': lat, 'long': long, 'place_name': placeName, 'place_id': place.get('place_id', None), 'query': maps_query, 'source': 'Google Maps' }) if current_place_comments: comments_list.append(pd.DataFrame(current_place_comments)) except Exception as e: logger.error(f"Error al obtener comentarios de Google Maps para '{maps_query}': {e}", exc_info=True) return pd.DataFrame({"Error": [f"Error al obtener comentarios de Google Maps: {e}"]}) if comments_list: final_df = pd.concat(comments_list, ignore_index=True) final_df['origin'] = 'maps' # Asegurar la columna 'origin' return final_df return pd.DataFrame({'Mensaje': [f"No se encontraron reviews para '{maps_query}'."]}) def _get_youtube_channels_and_comments(query: str, max_results_comments: int = 10) -> pd.DataFrame: """ Adapta la lógica de searchYoutubeQuery del prompt original. Busca canales de YouTube por query, obtiene videos del canal más relevante y luego los comentarios de esos videos. """ if not YOUTUBE_API_KEY: return pd.DataFrame({'Error': ["Clave API de YouTube no configurada."]}) youtube_service = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY) comments_list = [] try: # 1. Buscar canales de YouTube por la query request_channels = youtube_service.search().list( q=query, type='channel', part='snippet', maxResults=10 # Buscar hasta 5 canales ) response_channels = request_channels.execute() canales_similares = [] for item in response_channels.get('items', []): channel_title_search = item['snippet']['title'].lower() if re.search(re.escape(query.lower()), channel_title_search, re.IGNORECASE): canales_similares.append({ 'channelId': item['id']['channelId'], 'title': item['snippet']['title'] }) if not canales_similares: logger.warning(f"No se encontraron canales similares a '{query}'.") return pd.DataFrame(columns=['author', 'comment', 'published_at', 'video_id', 'video_title', 'query', 'source', 'channelId', 'official_channel', 'channel_title', 'origin']) # Tomar el primer canal similar encontrado selected_channel = canales_similares[0] channel_id = selected_channel['channelId'] channel_title = selected_channel['title'] # Determinar si el canal es oficial (ej. verificado o vinculado) testStatus = youtube_service.channels().list(part='status', id=channel_id).execute() is_official = testStatus.get('items', [{}])[0].get('status', {}).get('isLinked', False) # 2. Buscar videos del canal seleccionado request_videos = youtube_service.search().list( channelId=channel_id, part='snippet', maxResults=20, # Obtener hasta 20 videos order='date' ) response_videos = request_videos.execute() for video in response_videos.get('items', []): video_id = video['id'].get('videoId') video_title = video['snippet']['title'] if video_id: comments = _get_comments_from_video_helper(video_id, youtube_service, max_results_comments) if comments: df_comments = pd.DataFrame(comments) df_comments['video_id'] = video_id df_comments['video_title'] = video_title df_comments['query'] = query df_comments['source'] = 'YouTube' df_comments['channelId'] = channel_id df_comments['official_channel'] = is_official df_comments['channel_title'] = channel_title comments_list.append(df_comments) except Exception as e: logger.error(f"Error en _get_youtube_channels_and_comments para '{query}': {e}", exc_info=True) return pd.DataFrame({"Error": [f"Error al obtener datos de YouTube: {e}"]}) if comments_list: final_df = pd.concat(comments_list, ignore_index=True) final_df['origin'] = 'youtube' # Asegurar la columna 'origin' return final_df return pd.DataFrame(columns=['author', 'comment', 'published_at', 'video_id', 'video_title', 'query', 'source', 'channelId', 'official_channel', 'channel_title', 'origin']) def _get_comments_from_video_helper(videoId: str, youtube_service, max_results: int = 20) : """ Función auxiliar para obtener comentarios de un video, usada por _get_youtube_channels_and_comments. """ comments = [] request = youtube_service.commentThreads().list( part='snippet', videoId=videoId, textFormat='plainText', maxResults=max_results ) while request: response = request.execute() for item in response.get('items', []): comment_info = { 'author': item['snippet']['topLevelComment']['snippet']['authorDisplayName'], 'comment': item['snippet']['topLevelComment']['snippet']['textDisplay'], 'published_at': item['snippet']['topLevelComment']['snippet']['publishedAt'] } comments.append(comment_info) request = youtube_service.commentThreads().list_next(request, response) return comments def _get_youtube_comments(video_url_or_id: str) -> pd.DataFrame: """ Obtiene comentarios de un video de YouTube específico por su URL o ID. """ if not YOUTUBE_API_KEY: return pd.DataFrame({'Error': ["Clave API de YouTube no configurada."]}) youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY) video_id = None # Extrae el ID del video de varios formatos de URL if "v=" in video_url_or_id: video_id = video_url_or_id.split("v=")[-1].split("&")[0] elif "youtu.be/" in video_url_or_id: video_id = video_url_or_id.split("youtu.be/")[-1].split("?")[0] elif len(video_url_or_id) == 11 and video_url_or_id.isalnum(): # Asume que es un ID si tiene 11 caracteres alfanuméricos video_id = video_url_or_id if not video_id: return pd.DataFrame({'Error': ["URL/ID de YouTube no válido."]}) try: comments = [] # Realiza la llamada a la API para obtener los hilos de comentarios response = youtube.commentThreads().list( part='snippet', videoId=video_id, textFormat='plainText', maxResults=10 # Obtiene los 10 comentarios principales ).execute() # Procesa la respuesta for item in response.get('items', []): comment_snippet = item['snippet']['topLevelComment']['snippet'] comments.append({ 'author': comment_snippet['authorDisplayName'], 'comment': comment_snippet['textDisplay'], 'video_id': video_id, 'published_at': comment_snippet['publishedAt'] }) if not comments: return pd.DataFrame({'Mensaje': [f"No se encontraron comentarios para el video de YouTube {video_id}."]}) # Convierte la lista de comentarios a un DataFrame de pandas df = pd.DataFrame(comments) df['origin'] = "youtube" # Añade la columna de origen para análisis posteriores return df except Exception as e: logger.error(f"Error al obtener comentarios de YouTube para '{video_url_or_id}': {e}", exc_info=True) return pd.DataFrame({"Error": [f"Error al obtener comentarios de YouTube: {e}"]}) def _get_tweets_from_twitter_api(query_or_url: str) -> pd.DataFrame: """ Obtiene tweets de la API de Twitter (X) basándose en una consulta. Soporta búsquedas por hashtag, usuario, URL de tweet o ID de tweet. Args: query_or_url (str): La consulta de búsqueda (ej. "#hashtag", "@usuario", URL de tweet como "https://x.com/user/status/ID", o ID numérico de tweet). Returns: pd.DataFrame: Un DataFrame de pandas con los tweets encontrados y sus metadatos, incluyendo columnas 'query' y 'source' para el análisis comparativo. Devuelve un DataFrame con un mensaje de error o vacío si no se encuentran datos. """ if not TWITTER_BEARER_TOKEN: return pd.DataFrame({"Error": ["Bearer Token de Twitter no configurado."]}) client = tweepy.Client(bearer_token=TWITTER_BEARER_TOKEN) tweets_list = [] response_data = None tweet_fields = ['created_at', 'public_metrics', 'author_id', 'conversation_id', 'in_reply_to_user_id', 'lang'] expansions = ['author_id', 'in_reply_to_user_id'] user_fields = ['username', 'name', 'profile_image_url', 'verified'] max_results_count = 10 user_profile_match = re.search(r'(?:https?://(?:www\.)?(?:twitter|x)\.com/)([a-zA-Z0-9_]+)(?:/?(?:\?.*)?(?:#.*)?)?$', query_or_url) if user_profile_match: username_to_search = user_profile_match.group(1) logger.info(f'Detectada URL de perfil. Buscando tweets del usuario: @{username_to_search}') user_lookup = client.get_user(username=username_to_search, user_fields=['id']) if user_lookup.data: user_id = user_lookup.data.id response_data = client.get_users_tweets(id=user_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=10) else: return pd.DataFrame({"Error": [f'Usuario de Twitter no encontrado a partir de la URL: {query_or_url}']}) try: # Verifica si es una URL o ID de tweet if ("x.com/" in query_or_url or "twitter.com/" in query_or_url) and "/status/" in query_or_url: match = re.search(r'.*/status/(\d+)', query_or_url) if match: tweet_id = match.group(1) logger.info(f'Buscando tweet por ID: {tweet_id}') response_data = client.get_tweet(id=tweet_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields) if response_data.data: response_data.data = [response_data.data] # Envuelve el tweet individual en una lista para consistencia else: return pd.DataFrame({"Mensaje": [f"No se encontró el tweet con ID: {tweet_id}"]}) else: return pd.DataFrame({"Error": ["No se pudo extraer el ID del tweet de la URL."]}) elif query_or_url.startswith("#"): logger.info(f'Buscando tweets por hashtag: {query_or_url}') response_data = client.search_recent_tweets(query=query_or_url, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=max_results_count) elif query_or_url.startswith("@"): username_to_search = query_or_url.lstrip('@') logger.info(f'Buscando tweets del usuario: {username_to_search}') user_lookup = client.get_user(username=username_to_search, user_fields=['id']) if user_lookup.data: user_id = user_lookup.data.id response_data = client.get_users_tweets(id=user_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=max_results_count) else: return pd.DataFrame({"Error": [f'Usuario de Twitter no encontrado: {username_to_search}']}) elif query_or_url.isdigit(): # Si es solo un ID de tweet numérico tweet_id = query_or_url logger.info(f'Buscando tweet por ID numérico: {tweet_id}') response_data = client.get_tweet(id=tweet_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields) if response_data.data: response_data.data = [response_data.data] # Consistencia else: return pd.DataFrame({"Mensaje": [f"No se encontró el tweet con ID: {tweet_id}"]}) else: logger.warning(f"Consulta de Twitter no válida: '{query_or_url}'.") return pd.DataFrame({"Error": ["Consulta de Twitter no válida (formato @usuario, #hashtag, ID o URL de tweet)"]}) if response_data and response_data.data: users_data = {u["id"]: u for u in response_data.includes.get('users', [])} if response_data.includes else {} for tweet_obj in response_data.data: author_info = users_data.get(tweet_obj.author_id, {}) metrics = tweet_obj.public_metrics if tweet_obj.public_metrics else {} tweets_list.append({ 'tweet_id': tweet_obj.id, 'text': tweet_obj.text, 'author_id': tweet_obj.author_id, 'username': author_info.get("username", "N/A"), 'author_name': author_info.get("name", "N/A"), 'author_verified': author_info.get("verified", False), 'created_at': tweet_obj.created_at, 'like_count': metrics.get('like_count', 0), 'retweet_count': metrics.get('retweet_count', 0), 'reply_count': metrics.get('reply_count', 0), 'quote_count': metrics.get('quote_count', 0), 'impression_count': metrics.get('impression_count', 0), 'conversation_id': tweet_obj.conversation_id, 'in_reply_to_user_id': tweet_obj.in_reply_to_user_id, 'lang': tweet_obj.lang ,'query': query_or_url, # Añadir la query original para el contexto de comparación 'source': 'Twitter' # Añadir la fuente para el análisis comparativo }) if not tweets_list: return pd.DataFrame({'Mensaje': [f"No se encontraron tweets para la consulta '{query_or_url}' o la respuesta no contenía datos."]}) df = pd.DataFrame(tweets_list) df['origin'] = "twitter" # Columna 'origin' para identificar la plataforma en análisis posteriores return df except tweepy.TweepyException as e: error_message = str(e) if hasattr(e, 'api_errors') and e.api_errors and isinstance(e.api_errors, list) and e.api_errors[0]: api_error = e.api_errors[0] if isinstance(api_error, dict): error_message = api_error.get('detail', error_message) if 'title' in api_error: error_message = f"{api_error['title']}: {error_message}" elif hasattr(api_error, 'message'): error_message = api_error.message elif hasattr(e, 'response') and e.response is not None: try: error_details = e.response.json(); error_message = error_details.get('detail', error_details.get('title', str(e))) except ValueError: error_message = e.response.text if e.response.text else str(e) logger.error(f"TweepyException para '{query_or_url}': {error_message}", exc_info=True) return pd.DataFrame({"Error": [f"Error de API de Twitter: {error_message}"]}) except Exception as e: logger.error(f"Error general en _get_tweets_from_twitter_api para '{query_or_url}': {e}", exc_info=True) return pd.DataFrame({"Error": [f"Error general al obtener tweets: {e}"]}) # --- Twitter API Account Rotation Setup --- # Estas variables y funciones se inicializan una vez al inicio de la aplicación # y gestionan la rotación de cuentas de Twitter (X) para evitar límites de tasa. def getXAccounts(): """ Obtiene las credenciales de las cuentas de Twitter (X) desde la base de datos. Asume que las variables de entorno para la conexión a Supabase están configuradas. """ USER = 'postgres.rdqtsoydvgxdbbvhnmlk' PASSWORD = os.getenv("SUPABASE_KEY_PSQL") HOST = os.getenv("SUPABASE_URL_PSQL") PORT = 5432 DBNAME = 'postgres' query_sql = """ select xa.* from iris_scraper.x_accounts xa """ try: engine = create_engine(f'postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}/{DBNAME}') dfFb = pd.read_sql(query_sql, engine) return dfFb except Exception as e: logger.error(f"Error al conectar a la base de datos o al obtener cuentas de X: {e}") return pd.DataFrame() # Retorna un DataFrame vacío en caso de error #xaccountsdf = getXAccounts() """ # Asegura que el DataFrame no esté vacío antes de procesar if not xaccountsdf.empty: # Asegura que las columnas coincidan con el orden esperado de la consulta SQL xaccountsdf.columns = ['name', 'bearer_token', 'api_key', 'api_secret', 'access_token', 'access_token_secret', 'created_at', 'id'] xaccountsdf['calls_made'] = 0 # Inicializa el contador de llamadas para cada cuenta twitter_accounts = xaccountsdf.to_dict(orient='records') else: logger.warning("No se pudieron cargar las cuentas de Twitter (X) desde la base de datos. Las funciones de Twitter podrían no operar.") twitter_accounts = [] # Lista vacía si no hay cuentas """ twitter_accounts = None current_account_index = 0 MAX_CALLS_PER_ACCOUNT = 100 # Límite de llamadas por cuenta antes de rotar def _switch_to_next_twitter_account(): """Pasa a la siguiente cuenta de API en la lista.""" global current_account_index if not twitter_accounts: logger.warning("No hay cuentas de Twitter configuradas para rotar.") return previous_index = current_account_index current_account_index = (current_account_index + 1) % len(twitter_accounts) logger.info(f"Cambiando de cuenta de API de Twitter: de '{twitter_accounts[previous_index]['name']}' a '{twitter_accounts[current_account_index]['name']}'") def _get_current_twitter_clients(): """ Obtiene los clientes de Tweepy (v1.1 y v2) para la cuenta activa. Maneja la rotación de cuentas si la actual está agotada. """ global current_account_index if not twitter_accounts: logger.error("No hay cuentas de Twitter configuradas.") return None, None account = twitter_accounts[current_account_index] # Si la cuenta actual ha alcanzado su límite, intenta cambiar a la siguiente if account['calls_made'] >= MAX_CALLS_PER_ACCOUNT: logger.info(f"La cuenta '{account['name']}' ha alcanzado su límite de {MAX_CALLS_PER_ACCOUNT} llamadas.") _switch_to_next_twitter_account() account = twitter_accounts[current_account_index] # Obtiene la nueva cuenta # Si la nueva cuenta también está agotada (y no es la única), se registrará una advertencia if account['calls_made'] >= MAX_CALLS_PER_ACCOUNT and len(twitter_accounts) > 1: logger.warning(f"La cuenta '{account['name']}' a la que se cambió también está agotada. Esto puede indicar que todas las cuentas están agotadas o que el límite es muy bajo.") try: # auth_v1 y api_v1 se mantienen por consistencia con el prompt original, # aunque client_v2 es el que se usa para la mayoría de las operaciones modernas. auth_v1 = tweepy.OAuth1UserHandler( account['api_key'], account['api_secret'], account['access_token'], account['access_token_secret'] ) api_v1 = tweepy.API(auth_v1) client_v2 = tweepy.Client(bearer_token=account['bearer_token']) logger.info(f"Usando cuenta de API: '{account['name']}'. Llamadas realizadas: {account['calls_made']}") return api_v1, client_v2 except Exception as e: logger.error(f"Error de autenticación con la cuenta '{account['name']}': {e}") # Si la autenticación falla, se cambia a la siguiente cuenta y se reintenta (manejado por el bucle de reintentos) _switch_to_next_twitter_account() return None, None def _get_tweets_from_twitter_api(query_or_url: str, max_results: int = 10) -> pd.DataFrame: """ Obtiene tweets de la API de Twitter (X) basándose en una consulta, utilizando un sistema de rotación de cuentas para manejar límites de tasa. Args: query_or_url (str): La consulta de búsqueda (ej. "#hashtag", "@usuario", URL de tweet como "https://x.com/user/status/ID", o ID numérico de tweet). max_results (int): Número máximo de resultados a devolver (máx. 100 por llamada para search_recent_tweets). Returns: pd.DataFrame: Un DataFrame de pandas con los tweets encontrados y sus metadatos, incluyendo columnas 'query' y 'source' para el análisis comparativo. Devuelve un DataFrame con un mensaje de error o vacío si no se encuentran datos. """ if not twitter_accounts: logger.error("No hay cuentas de Twitter configuradas. No se pueden obtener tweets.") return pd.DataFrame({"Error": ["No hay cuentas de Twitter (X) configuradas."]}) tweets_list = [] MAX_RETRIES = len(twitter_accounts) # Intentar con cada cuenta una vez retries = 0 while retries < MAX_RETRIES: _api_v1, client_v2 = _get_current_twitter_clients() # Obtiene el cliente de la API con rotación if not client_v2: logger.error(f"No se pudo obtener un cliente de Twitter (X) válido en el intento {retries + 1}.") retries += 1 time.sleep(1) # Pequeña pausa antes de reintentar continue try: response_data = None tweet_fields = ['created_at', 'public_metrics', 'author_id', 'conversation_id', 'in_reply_to_user_id', 'lang'] expansions = ['author_id', 'in_reply_to_user_id'] user_fields = ['username', 'name', 'profile_image_url', 'verified'] # Lógica para diferentes tipos de consulta if ("x.com/" in query_or_url or "twitter.com/" in query_or_url) and "/status/" in query_or_url: match = re.search(r'.*/status/(\d+)', query_or_url) if match: tweet_id = match.group(1) logger.info(f'Buscando tweet por ID: {tweet_id}') response_data = client_v2.get_tweet(id=tweet_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields) if response_data.data: response_data.data = [response_data.data] # Envuelve el tweet individual en una lista para consistencia else: return pd.DataFrame({"Error": ["No se pudo extraer el ID del tweet de la URL."]}) elif query_or_url.startswith("#"): logger.info(f'Buscando tweets por hashtag: {query_or_url}') response_data = client_v2.search_recent_tweets(query=query_or_url, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=min(max_results, 100)) elif query_or_url.startswith("@"): username_to_search = query_or_url.lstrip('@') logger.info(f'Buscando tweets del usuario: {username_to_search}') #user_lookup = client_v2.get_user(username=username_to_search, user_fields=['id']) user_lookup = client_v2.get_user(username=username_to_search, user_fields=['id', 'name', 'username']) if user_lookup.data: user_id = user_lookup.data.id #response_data = client_v2.get_users_tweets(id=user_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=min(max_results, 100)) response_data = client_v2.get_users_tweets(id=user_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=min(max_results, 100)) else: return pd.DataFrame({"Error": [f'Usuario de Twitter no encontrado: {username_to_search}']}) elif query_or_url.isdigit(): # Si es solo un ID de tweet numérico tweet_id = query_or_url logger.info(f'Buscando tweet por ID numérico: {tweet_id}') response_data = client_v2.get_tweet(id=tweet_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields) if response_data.data: response_data.data = [response_data.data] # Consistencia else: # Este es el caso de búsqueda general (ej. "Elektra", "Coppel") logger.info(f"Consulta general detectada: '{query_or_url}'. Intentando buscar usuario similar.") # Busca el usuario más relevante basado en la consulta general logger.info(f"Paso 1: Buscando usuario exacto '@{query_or_url}' con API v2.") #user_lookup_v2 = client_v2.get_user(username=query_or_url, user_fields=user_fields) user_lookup_v2 = client_v2.get_user(username=query_or_url, user_fields=user_fields) if user_lookup_v2.data: user_id = user_lookup_v2.data.id logger.info(f"Usuario exacto encontrado {query_or_url}") response_data = client_v2.get_users_tweets(id=user_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=min(max_results, 10)) else: # Paso 2: Si no hay usuario exacto, buscar usuarios similares (API v1.1 es necesaria para esto). logger.info(f"Paso 2: No se encontró usuario exacto. Buscando usuarios similares con API v1.1.") # La búsqueda de usuarios por un texto general solo está disponible en la API v1.1 users_lookup_v1 = _api_v1.search_users(q=query_or_url, count=1) if users_lookup_v1: most_relevant_user = users_lookup_v1[0] user_id = most_relevant_user.id logger.info(f"Usuario más relevante encontrado: '{most_relevant_user.screen_name}' (ID: {user_id}). Obteniendo sus tweets.") response_data = client_v2.get_users_tweets(id=user_id, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=min(max_results, 10)) else: # Paso 3: Si no se encuentra ningún usuario, buscar tweets con el texto. logger.warning(f"Paso 3: No se encontraron usuarios. Buscando tweets que contengan '{query_or_url}'.") response_data = client_v2.search_recent_tweets(query=query_or_url, tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields, max_results=min(max_results, 10)) #else: # logger.warning(f"Consulta de Twitter no válida: '{query_or_url}'.") # return pd.DataFrame({"Error": ["Consulta de Twitter no válida (formato @usuario, #hashtag, ID o URL de tweet)"]}) # Incrementa el contador de llamadas para la cuenta actual twitter_accounts[current_account_index]['calls_made'] += 1 if response_data and response_data.data: users_data = {u["id"]: u for u in response_data.includes.get('users', [])} if response_data.includes else {} for tweet_obj in response_data.data: author_info = users_data.get(tweet_obj.author_id, {}) metrics = tweet_obj.public_metrics if tweet_obj.public_metrics else {} tweets_list.append({ 'tweet_id': tweet_obj.id, 'text': tweet_obj.text, 'author_id': tweet_obj.author_id, 'username': author_info.get("username", "N/A"), 'author_name': author_info.get("name", "N/A"), 'author_verified': author_info.get("verified", False), 'created_at': tweet_obj.created_at, 'like_count': metrics.get('like_count', 0), 'retweet_count': metrics.get('retweet_count', 0), 'reply_count': metrics.get('reply_count', 0), 'quote_count': metrics.get('quote_count', 0), 'impression_count': metrics.get('impression_count', 0), 'conversation_id': tweet_obj.conversation_id, 'in_reply_to_user_id': tweet_obj.in_reply_to_user_id, 'lang': tweet_obj.lang ,'query': query_or_url, # Añadir la query original para el contexto de comparación 'source': 'Twitter' # Añadir la fuente para el análisis comparativo }) df = pd.DataFrame(tweets_list) df['origin'] = "twitter" # Columna 'origin' para identificar la plataforma en análisis posteriores return df else: # Si no hay datos, pero no hay error, significa que no hay resultados para la consulta return pd.DataFrame({'Mensaje': [f"No se encontraron tweets para la consulta '{query_or_url}'."]}) except tweepy.TooManyRequests: logger.warning(f"Límite de tasa excedido para la cuenta '{twitter_accounts[current_account_index]['name']}'. Cambiando de cuenta...") _switch_to_next_twitter_account() retries += 1 time.sleep(1) # Pequeña pausa antes de reintentar except tweepy.TweepyException as e: error_message = str(e) # Intenta extraer un mensaje de error más específico de la excepción de Tweepy if hasattr(e, 'api_errors') and e.api_errors and isinstance(e.api_errors, list) and e.api_errors[0]: api_error = e.api_errors[0] if isinstance(api_error, dict): error_message = api_error.get('detail', error_message) if 'title' in api_error: error_message = f"{api_error['title']}: {error_message}" elif hasattr(api_error, 'message'): error_message = api_error.message elif hasattr(e, 'response') and e.response is not None: try: error_details = e.response.json(); error_message = error_details.get('detail', error_details.get('title', str(e))) except ValueError: error_message = e.response.text if e.response.text else str(e) logger.error(f"TweepyException para '{query_or_url}' con cuenta '{twitter_accounts[current_account_index]['name']}': {error_message}", exc_info=True) _switch_to_next_twitter_account() # Cambia de cuenta también en otros errores de Tweepy retries += 1 time.sleep(1) except Exception as e: logger.error(f"Error general en _get_tweets_from_twitter_api para '{query_or_url}' con cuenta '{twitter_accounts[current_account_index]['name']}': {e}", exc_info=True) _switch_to_next_twitter_account() # Cambia de cuenta en errores generales retries += 1 time.sleep(1) logger.error(f"Falló la obtención de tweets para '{query_or_url}' después de {MAX_RETRIES} intentos con todas las cuentas disponibles.") return pd.DataFrame({"Error": [f"Falló la obtención de tweets después de múltiples intentos para '{query_or_url}'."]})