Spaces:
Sleeping
Sleeping
import json | |
import os | |
import time | |
import requests | |
from aworld.core.common import Observation, ActionResult | |
from typing_extensions import Any | |
from aworld.runners.callback.decorator import reg_callback | |
from aworld.logs.util import logger | |
def gen_video(actionResult:ActionResult) -> ActionResult:
    """Trigger result polling for the video task referenced by an action result.

    The ``content`` of ``actionResult`` is parsed as JSON; when it carries a
    truthy ``"video_id"`` entry, ``gen_video_item`` is called with that id.
    The value returned by ``gen_video_item`` is not used here.

    Args:
        actionResult: Action result whose ``content`` is expected to hold a
            JSON document. May be falsy or have empty content.

    Returns:
        The very same ``actionResult`` object that was passed in, on every
        path (including when an exception was caught and logged).
    """
    try:
        # Only look inside the payload when there is something to parse.
        if actionResult and actionResult.content:
            payload = json.loads(actionResult.content)
            video_id = payload.get("video_id")
            if video_id:
                gen_video_item(video_id)
    except Exception as e:
        # Best-effort hook: any failure is logged and the input is passed through.
        logger.warning(f"Exception gen_video occurred: {e}")
    return actionResult
def gen_video_item(task_id:str) -> Any:
    """Poll the task-query endpoint until the video task reaches a final state.

    Configuration is read from the environment after ``load_dotenv()``:

    * ``DASHSCOPE_API_KEY``: bearer token sent in the ``Authorization`` header.
    * ``DASHSCOPE_QUERY_BASE_URL``: prefix to which ``task_id`` is appended.
    * ``DASHSCOPE_VIDEO_RETRY_TIMES``: maximum number of polls (default 10).
    * ``DASHSCOPE_VIDEO_SLEEP_TIME``: seconds slept before each poll (default 5).
    * ``DASHSCOPE_VIDEO_REQUEST_TIMEOUT``: per-request timeout in seconds
      (default 30).

    Args:
        task_id: Identifier of the video task to query.

    Returns:
        A JSON string of the form ``{"video_url": "<url>"}`` when the task
        reports ``SUCCEEDED`` together with a video URL. ``None`` in every
        other case: empty ``task_id``, failed or unexpected task status,
        missing URL, polling exhausted, or any unexpected exception (which is
        logged, never raised).
    """
    if not task_id:
        return None
    try:
        from dotenv import load_dotenv
        load_dotenv()
        api_key = os.getenv('DASHSCOPE_API_KEY')
        query_base_url = os.getenv('DASHSCOPE_QUERY_BASE_URL', '')
        max_attempts = int(os.getenv('DASHSCOPE_VIDEO_RETRY_TIMES', 10))
        wait_time = int(os.getenv('DASHSCOPE_VIDEO_SLEEP_TIME', 5))
        # Without a timeout a stalled connection would block the loop forever.
        request_timeout = float(os.getenv('DASHSCOPE_VIDEO_REQUEST_TIMEOUT', 30))
        query_url = f"{query_base_url}{task_id}"
        headers = {'Authorization': f'Bearer {api_key}'}
        for attempt in range(max_attempts):
            # Wait before polling
            time.sleep(wait_time)
            logger.info(f"Polling attempt {attempt + 1}/{max_attempts}...")
            try:
                query_response = requests.get(query_url, headers=headers, timeout=request_timeout)
            except (requests.ConnectionError, requests.Timeout) as e:
                # Transient network trouble is retried like a non-200 response.
                # Other request errors (e.g. a malformed URL) cannot recover by
                # retrying and fall through to the outer handler.
                logger.warning(f"Poll request failed: {e}")
                continue
            if query_response.status_code != 200:
                logger.info(f"Poll request failed with status code {query_response.status_code}")
                continue
            try:
                query_result = query_response.json()
            except ValueError as e:
                # ValueError is the common base of every JSON decode error that
                # Response.json() may raise (stdlib json, simplejson, requests).
                logger.warning(f"Failed to parse response as JSON: {e}")
                continue
            # Tolerate a missing, null or non-dict "output" section; it is then
            # reported below as an unexpected status.
            output = query_result.get("output") if isinstance(query_result, dict) else None
            if not isinstance(output, dict):
                output = {}
            task_status = output.get("task_status")
            if task_status == "SUCCEEDED":
                video_url = output.get("video_url")
                if video_url:
                    # Serialized as a JSON object holding the single video URL.
                    return json.dumps({"video_url": video_url})
                logger.info("Video URL not found in the response")
                return None
            if task_status in ("PENDING", "RUNNING"):
                # Still in progress: go on to the next polling attempt.
                logger.info(f"gen_video_item Task status: {task_status}, continuing to next poll...")
                continue
            if task_status == "FAILED":
                logger.warning("Task failed")
                return None
            # Any other status is treated as final and unsuccessful.
            logger.warning(f"Unexpected status: {task_status}")
            return None
        # Every attempt was used without reaching a final state.
        logger.warning("Polling timed out after maximum attempts")
        return None
    except Exception as e:
        logger.warning(f"Exception gen_video_item occurred: {e}")
        return None