File size: 3,407 Bytes
44cd698
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import json
import os
import time

import requests
from aworld.core.common import Observation, ActionResult
from typing_extensions import Any

from aworld.runners.callback.decorator import reg_callback

from aworld.logs.util import logger

@reg_callback("gen_video_server__video_tasks")
def gen_video(actionResult:ActionResult) -> ActionResult:
    try:
        if not actionResult or not actionResult.content:
            return actionResult
        content = json.loads(actionResult.content)
        task_id = content.get("video_id")
        if not task_id:
            return actionResult
        gen_video_item(task_id)

        return actionResult

    except Exception as e:
        logger.warning(f"Exception gen_video occurred: {e}")
        return actionResult

def gen_video_item(task_id:str) -> Any:
    if not task_id:
        return None
    try:
        from dotenv import load_dotenv
        load_dotenv()
        api_key = os.getenv('DASHSCOPE_API_KEY')
        query_base_url = os.getenv('DASHSCOPE_QUERY_BASE_URL', '')
        # Step 2: Poll for results
        max_attempts = int(os.getenv('DASHSCOPE_VIDEO_RETRY_TIMES', 10))  # Increased default retries for video
        wait_time = int(os.getenv('DASHSCOPE_VIDEO_SLEEP_TIME', 5))  # Increased default wait time for video
        query_url = f"{query_base_url}{task_id}"

        for attempt in range(max_attempts):
            # Wait before polling
            time.sleep(wait_time)
            logger.info(f"Polling attempt {attempt + 1}/{max_attempts}...")

            # Poll for results
            query_response = requests.get(query_url, headers={'Authorization': f'Bearer {api_key}'})

            if query_response.status_code != 200:
                logger.info(f"Poll request failed with status code {query_response.status_code}")
                continue

            try:
                query_result = query_response.json()
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse response as JSON: {e}")
                continue

            # Check task status
            task_status = query_result.get("output", {}).get("task_status")

            if task_status == "SUCCEEDED":
                # Extract video URL
                video_url = query_result.get("output", {}).get("video_url")

                if video_url:
                    # Return as array of objects with video_url for consistency with image API
                    return json.dumps({"video_url": video_url})
                else:
                    logger.info("Video URL not found in the response")
                    return None
            elif task_status in ["PENDING", "RUNNING"]:
                # If still running, continue to next polling attempt
                logger.info(f"gen_video_item Task status: {task_status}, continuing to next poll...")
                continue
            elif task_status == "FAILED":
                logger.warning("Task failed")
                return None
            else:
                # Any other status, return None
                logger.warning(f"Unexpected status: {task_status}")
                return None

        # If we get here, polling timed out
        logger.warning("Polling timed out after maximum attempts")
        return None
    except Exception as e:
        logger.warning(f"Exception gen_video_item occurred: {e}")
        return None