"""Client helpers for the PexelDance HTTP API.

The module uploads a local video, submits a processing job for it, polls the
job until it finishes and downloads the first resulting item to a temporary
file.  ``processing`` is the end-to-end entry point; the other public
functions wrap one API call each.

The ``API_KEY`` environment variable must be set before this module is
imported; importing fails with ``KeyError`` otherwise.
"""

import json
import os
import random
import shutil
import tempfile
import time

import requests
from qiniu import put_file

API_KEY = os.environ['API_KEY']

headers = {
    "User-Agent": "PexelDance Sdk/1.0",
    'Authorization': 'Bearer %s' % API_KEY,
    'Content-Type': 'application/json'
}

# Seconds to wait for a server to connect / send data before giving up.
# Without a timeout a stalled connection would block the caller forever.
REQUEST_TIMEOUT = 60

# Hosts that get_endpoint() picks from at random.
_API_HOSTS = (
    "https://api-01.pexeldance.com",
    "https://api-02.pexeldance.com",
    "https://api-03.pexeldance.com",
    "https://api-04.pexeldance.com",
    "https://api-05.pexeldance.com",
    "https://api-06.pexeldance.com",
    "https://api-07.pexeldance.com",
    "https://api-08.pexeldance.com",
    "https://api-09.pexeldance.com",
    "https://api-10.pexeldance.com",
    # Disabled hosts, kept for reference:
    # "https://api-11.pexeldance.com",
    # "https://api-12.pexeldance.com",
    # "https://api-13.pexeldance.com",
    # "https://api-14.pexeldance.com",
    # "https://api-15.pexeldance.com",
    # "https://api-16.pexeldance.com",
    # "https://api-17.pexeldance.com",
    # "https://api-18.pexeldance.com",
    # "https://api-19.pexeldance.com",
    # "https://api-20.pexeldance.com",
)

# Extension sent to the token endpoint when the file name has none.
_DEFAULT_EXTENSION = "mp4"

# Job submission is retried this many times, sleeping between attempts.
_SUBMIT_ATTEMPTS = 1000
_SUBMIT_RETRY_DELAY = 8  # seconds

# Delay between two job status polls.
_POLL_INTERVAL = 10  # seconds

_DOWNLOAD_CHUNK_SIZE = 1024 * 1024  # bytes


def get_endpoint():
    """Return the base URL of a randomly chosen API host."""
    return random.choice(_API_HOSTS)


def _request_json(method, path, payload):
    """Send ``payload`` as a JSON body to ``path`` on a random host.

    Returns the decoded JSON response for a 2xx status, or ``None`` when the
    status is outside the 2xx range or the body is not valid JSON.  Network
    errors and timeouts raised by ``requests`` propagate to the caller.
    """
    url = get_endpoint() + path
    body = json.dumps(payload).encode("utf-8")
    response = requests.request(
        method, url, headers=headers, data=body, timeout=REQUEST_TIMEOUT
    )
    if not 200 <= response.status_code < 300:
        return None
    try:
        return response.json()
    except ValueError:
        # A 2xx reply without a JSON body is treated like a failed request,
        # so callers only ever have to handle "decoded JSON or None".
        return None


def get_upload_token(extension="mp4"):
    """Request an upload token for a file with the given extension.

    Returns the decoded JSON response, or ``None`` if the request failed.
    """
    return _request_json("POST", "/api/upload/token", {
        "file_ext": extension,
    })


def upload_file(local_file, file_key, token):
    """Upload ``local_file`` under ``file_key`` using qiniu's ``put_file``.

    Returns the first value returned by ``put_file`` when the reported status
    code is 200, otherwise ``None``.
    """
    ret, info = put_file(token, file_key, local_file)
    if info.status_code == 200:
        return ret
    return None


def confirm_upload(key, name, hash_, fsize, bucket):
    """Report a finished upload to the API.

    Returns the decoded JSON response, or ``None`` if the request failed.
    """
    return _request_json("POST", "/api/upload/confirm", {
        "key": key,
        "name": name,
        "hash": hash_,
        "fsize": fsize,
        "bucket": bucket,
    })


def upload_video(file):
    """Upload a local video file and register it with the API.

    Returns the ``data`` field of the confirmation response, or ``False`` when
    the file does not exist or any step of the upload fails.
    """
    if not os.path.exists(file):
        return False

    # splitext only looks at the final path component, so a dot in a
    # directory name is never mistaken for an extension.
    ext = os.path.splitext(file)[1].lstrip(".") or _DEFAULT_EXTENSION

    token_res = get_upload_token(ext)
    if not token_res:
        return False
    upload_data = token_res.get('data')
    if not upload_data:
        return False
    upload_token = upload_data.get('token')
    if not upload_token:
        return False
    bucket_key = upload_data.get('bucket_key')
    if not bucket_key:
        return False

    ret = upload_file(file, bucket_key, upload_token)
    if not ret:
        return False

    upload_res = confirm_upload(
        ret.get('key'),
        upload_data.get('c_string'),
        ret.get('hash'),
        ret.get('fsize'),
        ret.get('bucket'),
    )
    if not upload_res:
        return False
    return upload_res.get('data')


def generate(item_id, style_id, width, height, prompt, seed=11111):
    """Submit a processing job for an uploaded item.

    ``style_id`` is sent as the job ``name``.  ``seed`` defaults to the value
    that was previously hard-coded.  ``unit`` and ``use_credit_type`` are
    fixed values sent with every job.

    Returns the decoded JSON response, or ``None`` if the request failed.
    """
    return _request_json("POST", "/api/job/common", {
        "user_content": {
            "item_id": item_id,
            "seed": seed,
            "width": width,
            "height": height,
            "prompt": prompt
        },
        "unit": 3,
        "use_credit_type": "credit",
        "name": style_id
    })


def get_job_status(job_id):
    """Fetch the status record of a job.

    Returns the decoded JSON response, or ``None`` if the request failed.
    """
    # The job id is sent in the request body of a GET request.
    return _request_json("GET", "/api/job", {
        "id": job_id
    })


def get_item_url(_id):
    """Request download information for a single item.

    Returns the decoded JSON response, or ``None`` if the request failed.
    """
    return _request_json("POST", "/api/download_item", {
        "key_id": [
            _id
        ]
    })


def get_result(job_id):
    """Fetch the items produced by a job.

    Returns the decoded JSON response, or ``None`` if the request failed.
    """
    # The job id is sent in the request body of a GET request.
    return _request_json("GET", "/api/item/job_id", {
        "job_id": job_id
    })


def download_file(url):
    """Download ``url`` into a new temporary file.

    Returns the path of the temporary file, or ``None`` when the server does
    not answer with status 200.  The caller owns the file and is responsible
    for deleting it.  If writing fails, the partial file is removed and the
    exception is re-raised.
    """
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
        if response.status_code != 200:
            return None

        tmpfile = tempfile.NamedTemporaryFile(delete=False)
        try:
            with tmpfile:
                # iter_content applies any Content-Encoding, unlike
                # response.raw which yields the bytes as sent on the wire.
                for chunk in response.iter_content(
                    chunk_size=_DOWNLOAD_CHUNK_SIZE
                ):
                    tmpfile.write(chunk)
        except Exception:
            os.remove(tmpfile.name)
            raise
        return tmpfile.name


def _submit_job(video_id, style_id, width, height, prompt):
    """Submit a job, retrying until the API returns a job id.

    Raises ``RuntimeError`` when no job id was obtained after
    ``_SUBMIT_ATTEMPTS`` attempts.
    """
    for _ in range(_SUBMIT_ATTEMPTS):
        generate_result = generate(video_id, style_id, width, height, prompt)
        job_info = generate_result.get('data') if generate_result else None
        job_id = job_info.get("job_id") if job_info else None
        if job_id:
            return job_id
        # Sleep on every kind of failure so a malformed response cannot turn
        # the retry loop into a tight loop against the API.
        time.sleep(_SUBMIT_RETRY_DELAY)
    raise RuntimeError("too many requests")


def _wait_for_job(job_id):
    """Poll a job until it ends; return ``True`` only if it finished.

    Returns ``False`` when the job reports ``failed`` or when its status
    cannot be read.
    """
    while True:
        status_info = get_job_status(job_id)
        job = status_info.get('data') if status_info else None
        if job is None:
            return False
        status = job.get('status')
        if status == "finished":
            return True
        if status == "failed":
            return False
        time.sleep(_POLL_INTERVAL)


def _first_result_url(job_id):
    """Return the first signed URL of the job's first item, or ``None``."""
    result = get_result(job_id)
    if not result:
        return None
    res_data = result.get('data')
    if not res_data:
        return None
    items = res_data.get('list')
    if not items:
        return None
    item_id = items[0].get('id')
    if not item_id:
        return None

    item_res = get_item_url(item_id)
    if not item_res:
        return None
    item_data = item_res.get('data')
    if not item_data:
        return None
    signed_urls = item_data.get('signed_urls')
    if not signed_urls:
        return None
    return signed_urls[0]


def processing(in_video, width, height, style_id, transfer_duration, prompt):
    """Upload a video, run a job on it and download the result.

    ``transfer_duration`` is accepted for compatibility with existing callers
    but is not used.

    Returns the path of a temporary file holding the downloaded result, or
    ``None`` when the upload fails, the job fails, or no result can be
    retrieved.

    Raises ``RuntimeError`` when the job could not be submitted after all
    retry attempts.
    """
    video_res = upload_video(in_video)
    if not video_res:
        return None
    video_id = video_res.get('id')
    if not video_id:
        return None

    job_id = _submit_job(video_id, style_id, width, height, prompt)
    if not _wait_for_job(job_id):
        return None

    item_url = _first_result_url(job_id)
    if not item_url:
        return None
    return download_file(item_url)