import json import os import time import requests from aworld.core.common import Observation, ActionResult from typing_extensions import Any from aworld.runners.callback.decorator import reg_callback from aworld.logs.util import logger @reg_callback("gen_video_server__video_tasks") def gen_video(actionResult:ActionResult) -> ActionResult: try: if not actionResult or not actionResult.content: return actionResult content = json.loads(actionResult.content) task_id = content.get("video_id") if not task_id: return actionResult gen_video_item(task_id) return actionResult except Exception as e: logger.warning(f"Exception gen_video occurred: {e}") return actionResult def gen_video_item(task_id:str) -> Any: if not task_id: return None try: from dotenv import load_dotenv load_dotenv() api_key = os.getenv('DASHSCOPE_API_KEY') query_base_url = os.getenv('DASHSCOPE_QUERY_BASE_URL', '') # Step 2: Poll for results max_attempts = int(os.getenv('DASHSCOPE_VIDEO_RETRY_TIMES', 10)) # Increased default retries for video wait_time = int(os.getenv('DASHSCOPE_VIDEO_SLEEP_TIME', 5)) # Increased default wait time for video query_url = f"{query_base_url}{task_id}" for attempt in range(max_attempts): # Wait before polling time.sleep(wait_time) logger.info(f"Polling attempt {attempt + 1}/{max_attempts}...") # Poll for results query_response = requests.get(query_url, headers={'Authorization': f'Bearer {api_key}'}) if query_response.status_code != 200: logger.info(f"Poll request failed with status code {query_response.status_code}") continue try: query_result = query_response.json() except json.JSONDecodeError as e: logger.warning(f"Failed to parse response as JSON: {e}") continue # Check task status task_status = query_result.get("output", {}).get("task_status") if task_status == "SUCCEEDED": # Extract video URL video_url = query_result.get("output", {}).get("video_url") if video_url: # Return as array of objects with video_url for consistency with image API return json.dumps({"video_url": video_url}) else: logger.info("Video URL not found in the response") return None elif task_status in ["PENDING", "RUNNING"]: # If still running, continue to next polling attempt logger.info(f"gen_video_item Task status: {task_status}, continuing to next poll...") continue elif task_status == "FAILED": logger.warning("Task failed") return None else: # Any other status, return None logger.warning(f"Unexpected status: {task_status}") return None # If we get here, polling timed out logger.warning("Polling timed out after maximum attempts") return None except Exception as e: logger.warning(f"Exception gen_video_item occurred: {e}") return None