Spaces:
Sleeping
Sleeping
File size: 11,273 Bytes
8f34dd6 7a972a6 2491bc4 87a569f 2491bc4 d14d0e5 7a972a6 75a2854 bbe94c9 d14d0e5 2491bc4 d14d0e5 2491bc4 d14d0e5 2491bc4 a02894c 2491bc4 8b50617 b7d0f8c 85fc2df 8b50617 0f35580 8b50617 d14d0e5 2f22a69 2491bc4 8f34dd6 2491bc4 d14d0e5 52e6f04 d14d0e5 390fb00 2491bc4 d14d0e5 2f22a69 2491bc4 d14d0e5 2491bc4 d14d0e5 2491bc4 d14d0e5 2f22a69 2491bc4 d14d0e5 a02894c d14d0e5 a02894c d14d0e5 a02894c d14d0e5 a02894c d14d0e5 2491bc4 d14d0e5 2491bc4 d14d0e5 2491bc4 d14d0e5 2491bc4 d14d0e5 2491bc4 d14d0e5 a02894c d14d0e5 bbe94c9 d14d0e5 2491bc4 d14d0e5 2491bc4 d14d0e5 2491bc4 10c1c0d d14d0e5 75a2854 bbe94c9 8b50617 0f35580 ca2f1b6 0f35580 2491bc4 0a2a989 ca2f1b6 2491bc4 ca2f1b6 2491bc4 ca2f1b6 2491bc4 ca2f1b6 0a2a989 2491bc4 0f35580 0a2a989 a02894c 2491bc4 0f35580 0a2a989 0f35580 ca2f1b6 0f35580 2491bc4 0a2a989 2491bc4 0a2a989 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
import os
import io
import time
from PIL import Image
from io import BytesIO
from google import genai
try:
from google.genai import types
print("Imported 'types' from 'google.genai'.")
except ImportError:
print("Warning: 'types' not found under 'google.genai'. Config might not work.")
types = None
from google.cloud import storage
import traceback
from typing import Union
import base64
# --- 創建全域 genai_client ---
genai_client = None
try:
genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
print("Successfully initialized global genai.Client (High-Level Facade).")
except AttributeError:
print("Warning: genai.Client not found. Trying API calls might fail.")
except Exception as client_err:
print(f"Error initializing global genai.Client: {client_err}")
class ImageGenerator:
def __init__(self):
"""
初始化 ImageGenerator。
"""
pass # 使用全域 client
def generate_image_with_gemini(self, prompt) -> Union[bytes, None]:
"""
使用 Gemini 模型生成圖片。
"""
if not genai_client:
print("Error: Global genai_client is not available.")
return None
try:
print(f"Requesting image data for prompt: '{prompt}' using global genai_client.")
model_name = "models/gemini-2.0-flash-exp-image-generation"
if not hasattr(genai_client, 'models') or not hasattr(genai_client.models, 'generate_content'):
print("Error: Global genai_client does not have 'models.generate_content' method.")
return None
gen_config = None
if types and hasattr(types, 'GenerateContentConfig'):
try: gen_config = types.GenerateContentConfig(response_modalities=['Text', 'Image'])
except: gen_config = None
else: print("types.GenerateContentConfig not available. Proceeding without config.")
print(">>> Calling genai_client.models.generate_content...")
start_time = time.time()
if gen_config:
response = genai_client.models.generate_content(model=model_name, contents=prompt, config=gen_config)
else:
response = genai_client.models.generate_content(model=model_name, contents=prompt)
end_time = time.time()
print(f"<<< Call to generate_content completed in {end_time - start_time:.2f} seconds.")
# print("--- Full API Response (generate) ---"); try: print(response); except: pass; print("---") # 可取消註解除錯
image_bytes = None
if response.candidates:
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'inline_data') and part.inline_data is not None:
print("Found inline image data.")
image_bytes = part.inline_data.data
try: _ = Image.open(BytesIO(image_bytes)); print(f"Image data verified, size: {len(image_bytes)} bytes.")
except Exception as img_verify_err: print(f"Warning: Received data might not be a valid image: {img_verify_err}"); return None
break
else:
print("Response has no candidates."); # ... (檢查 prompt_feedback) ...
if not image_bytes: print("No valid inline image data found in response.")
return image_bytes
except AttributeError as ae: print(f"圖像數據生成過程中發生屬性錯誤: {ae}"); traceback.print_exc(); return None
except Exception as e: print(f"圖像數據生成過程中發生未預期錯誤: {e}"); traceback.print_exc(); return None
# --- 圖片編輯方法 ---
def edit_image_with_gemini(self, image_bytes: bytes, edit_prompt: str) -> Union[bytes, None]:
"""
使用 Gemini 模型編輯圖片 (依照官方文件範例)。
Args:
image_bytes (bytes): 要編輯的原始圖片的二進位數據。
edit_prompt (str): 描述如何編輯圖片的文字提示。
Returns:
Union[bytes, None]: 編輯後的圖片二進位數據或 None。
"""
if not genai_client:
print("Error: Global genai_client is not available.")
return None
if not image_bytes: print("Error: No image bytes provided for editing."); return None
if not edit_prompt: print("Error: No edit prompt provided."); return None
try:
print(f"Requesting image edit with prompt: '{edit_prompt}' using global genai_client.")
model_name = "models/gemini-2.0-flash-exp-image-generation" # 同一個模型
# --- 準備 contents 列表 (文字 + PIL Image 物件) ---
try:
img = Image.open(BytesIO(image_bytes))
# 確保是 RGB 模式,如果需要的話
# if img.mode == 'RGBA': img = img.convert('RGB')
except Exception as img_open_err:
print(f"Error opening provided image bytes for editing: {img_open_err}")
return None
contents_for_edit = [edit_prompt, img] # 列表包含文字和 PIL Image 物件
# ---
# 檢查 API 調用方法是否存在
if not hasattr(genai_client, 'models') or not hasattr(genai_client.models, 'generate_content'):
print("Error: Global genai_client does not have 'models.generate_content' method.")
return None
# 準備 config (同圖片生成)
gen_config = None
if types and hasattr(types, 'GenerateContentConfig'):
try: gen_config = types.GenerateContentConfig(response_modalities=['Text', 'Image'])
except: gen_config = None
print(">>> Calling genai_client.models.generate_content for image edit...")
start_time = time.time()
# 根據是否有 config 調用 API
if gen_config:
response = genai_client.models.generate_content(
model=model_name,
contents=contents_for_edit, # <--- 傳遞包含圖片和文字的列表
config=gen_config
)
else:
response = genai_client.models.generate_content(
model=model_name,
contents=contents_for_edit # <--- 傳遞包含圖片和文字的列表
)
end_time = time.time()
print(f"<<< Call to generate_content (edit) completed in {end_time - start_time:.2f} seconds.")
# print("--- Full API Response (edit) ---"); try: print(response); except: pass; print("---") # 可取消註解除錯
# --- 處理回應 (尋找編輯後的圖片 inline_data) ---
edited_image_bytes = None
if response.candidates:
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
print("Iterating through edit response parts...")
for i, part in enumerate(candidate.content.parts):
if hasattr(part, 'inline_data') and part.inline_data is not None:
print(f" Part {i}: Found inline image data (edited).")
edited_image_bytes = part.inline_data.data
try:
_ = Image.open(BytesIO(edited_image_bytes))
print(f" Edited image data received and verified, size: {len(edited_image_bytes)} bytes.")
except Exception as img_verify_err:
print(f" Warning: Received edited data might not be a valid image: {img_verify_err}")
return None
break
else:
print("Edit response has no candidates.")
if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
print(f"Block reason: {response.prompt_feedback.block_reason}")
if not edited_image_bytes:
print("No valid inline image data found in edit response.")
return edited_image_bytes # 返回編輯後的 bytes 或 None
except AttributeError as ae:
print(f"圖片編輯過程中發生屬性錯誤: {ae}")
traceback.print_exc()
return None
except Exception as e:
print(f"圖片編輯過程中發生未預期錯誤: {e}")
traceback.print_exc()
return None
def upload_image_to_gcs(self, image_bytes: bytes, file_name_prefix: str = "gemini-image") -> Union[str, None]:
"""
將圖片二進位數據 (bytes) 轉換為 JPG 並上傳到 Google Cloud Storage。
"""
if not image_bytes: return None
try:
print("Converting received image bytes to JPG format...")
try:
image = Image.open(BytesIO(image_bytes))
if image.mode == 'RGBA': image = image.convert('RGB')
jpg_buffer = io.BytesIO()
image.save(jpg_buffer, format="JPEG", quality=85)
jpg_bytes = jpg_buffer.getvalue()
print(f"Image converted to JPG successfully, size: {len(jpg_bytes)} bytes.")
except Exception as convert_err:
print(f"Error converting image to JPG: {convert_err}")
traceback.print_exc()
return None
bucket_name = "stt_bucket_for_allen"
timestamp = int(time.time() * 1000)
file_extension = "jpg"
content_type = "image/jpeg"
gcs_file_path = f"EJ/{file_name_prefix}_{timestamp}.{file_extension}"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(gcs_file_path)
print(f"Uploading JPG image bytes to gs://{bucket_name}/{gcs_file_path}")
blob.upload_from_string(data=jpg_bytes, content_type=content_type)
print("Image uploaded to GCS.")
public_url = blob.public_url
print(f"Generated GCS public URL: {public_url}")
if not public_url or not public_url.startswith("https://"):
print(f"Error or Warning: Invalid public URL generated: {public_url}")
if public_url and public_url.startswith("http://"):
public_url = "https://" + public_url[len("http://"):]
print(f"Corrected URL to HTTPS: {public_url}")
else: return None
print(f"Image uploaded successfully to GCS: {public_url}")
return public_url
except Exception as e:
print(f"上傳圖片到 GCP 時出錯: {e}")
traceback.print_exc()
return None |