File size: 11,273 Bytes
8f34dd6
 
7a972a6
2491bc4
87a569f
2491bc4
d14d0e5
 
 
 
 
 
7a972a6
75a2854
bbe94c9
d14d0e5
2491bc4
d14d0e5
 
2491bc4
 
d14d0e5
2491bc4
a02894c
2491bc4
 
8b50617
b7d0f8c
85fc2df
8b50617
0f35580
8b50617
d14d0e5
2f22a69
2491bc4
8f34dd6
2491bc4
d14d0e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52e6f04
d14d0e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
390fb00
2491bc4
d14d0e5
 
2f22a69
2491bc4
d14d0e5
2491bc4
 
d14d0e5
2491bc4
d14d0e5
 
2f22a69
2491bc4
d14d0e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a02894c
 
 
 
d14d0e5
 
 
 
 
 
 
 
 
 
a02894c
 
d14d0e5
 
a02894c
 
 
 
d14d0e5
a02894c
d14d0e5
 
2491bc4
d14d0e5
 
 
 
 
2491bc4
 
d14d0e5
 
2491bc4
d14d0e5
 
2491bc4
d14d0e5
 
2491bc4
d14d0e5
a02894c
d14d0e5
bbe94c9
d14d0e5
2491bc4
 
 
d14d0e5
 
 
2491bc4
 
d14d0e5
2491bc4
 
10c1c0d
d14d0e5
75a2854
bbe94c9
8b50617
 
0f35580
ca2f1b6
0f35580
2491bc4
0a2a989
ca2f1b6
 
 
2491bc4
ca2f1b6
2491bc4
ca2f1b6
 
 
 
 
2491bc4
ca2f1b6
 
0a2a989
2491bc4
 
0f35580
0a2a989
 
 
a02894c
2491bc4
0f35580
 
0a2a989
0f35580
ca2f1b6
 
 
0f35580
 
2491bc4
0a2a989
2491bc4
0a2a989
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import os
import io
import time
from PIL import Image
from io import BytesIO
from google import genai
try:
    from google.genai import types
    print("Imported 'types' from 'google.genai'.")
except ImportError:
    print("Warning: 'types' not found under 'google.genai'. Config might not work.")
    types = None
from google.cloud import storage
import traceback
from typing import Union
import base64 

# --- Create the module-level genai_client shared by the code below ---
# The name is bound to None first so that it always exists; it only becomes a
# real client when construction succeeds, which lets callers test it for
# truthiness before issuing any API call.
genai_client = None
try:
    genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
except AttributeError:
    # Raised when the imported `genai` module exposes no `Client` attribute.
    print("Warning: genai.Client not found. Trying API calls might fail.")
except Exception as client_err:
    # Any other failure, including a missing GOOGLE_API_KEY (KeyError).
    print(f"Error initializing global genai.Client: {client_err}")
else:
    print("Successfully initialized global genai.Client (High-Level Facade).")

class ImageGenerator:
    """Generate, edit, and upload images through the module-level Gemini client.

    Instances hold no state of their own: every Gemini request goes through the
    global ``genai_client``. All public methods report failures by printing a
    message and returning ``None`` rather than raising.
    """

    # The same model is used for both image generation and image editing.
    _MODEL_NAME = "models/gemini-2.0-flash-exp-image-generation"

    def __init__(self):
        """Initialize the ImageGenerator; the shared global client is used."""

    @staticmethod
    def _build_generation_config():
        """Return a config requesting text and image output, or None if unavailable."""
        if not types or not hasattr(types, 'GenerateContentConfig'):
            print("types.GenerateContentConfig not available. Proceeding without config.")
            return None
        try:
            return types.GenerateContentConfig(response_modalities=['Text', 'Image'])
        except Exception as config_err:
            # Deliberately best-effort: fall back to a request without config,
            # but say why instead of silently swallowing every exception.
            print(f"Warning: could not build GenerateContentConfig: {config_err}")
            return None

    @staticmethod
    def _call_generate_content(contents, gen_config):
        """Call ``generate_content``, passing ``config`` only when one was built."""
        request = {"model": ImageGenerator._MODEL_NAME, "contents": contents}
        if gen_config:
            request["config"] = gen_config
        return genai_client.models.generate_content(**request)

    @staticmethod
    def _first_inline_part(candidate):
        """Return ``(index, data)`` of the first part carrying inline data, else None."""
        content = candidate.content
        if not content or not content.parts:
            return None
        for index, part in enumerate(content.parts):
            inline_data = getattr(part, 'inline_data', None)
            if inline_data is not None:
                return index, inline_data.data
        return None

    @staticmethod
    def _block_reason(response):
        """Return the response's block reason, tolerating a missing or None feedback."""
        feedback = getattr(response, 'prompt_feedback', None)
        return getattr(feedback, 'block_reason', None)

    @staticmethod
    def _encode_jpeg(image_bytes):
        """Decode ``image_bytes`` with Pillow and return it re-encoded as JPEG bytes."""
        image = Image.open(BytesIO(image_bytes))
        if image.mode == 'RGBA':
            image = image.convert('RGB')
        jpg_buffer = io.BytesIO()
        try:
            image.save(jpg_buffer, format="JPEG", quality=85)
        except OSError:
            # Pillow refuses to write modes JPEG cannot store (for example
            # palette 'P' or 'LA'); retry once after converting to RGB.
            if image.mode == 'RGB':
                raise
            jpg_buffer = io.BytesIO()  # discard anything partially written
            image.convert('RGB').save(jpg_buffer, format="JPEG", quality=85)
        return jpg_buffer.getvalue()

    def generate_image_with_gemini(self, prompt) -> Union[bytes, None]:
        """Generate an image from a text prompt using the Gemini model.

        Args:
            prompt: Text prompt passed as the request ``contents``.

        Returns:
            The raw bytes of the first inline image found in the first
            candidate, or ``None`` when the client is unavailable, the response
            carries no image, the data cannot be opened by Pillow, or any
            exception occurs.
        """
        if not genai_client:
            print("Error: Global genai_client is not available.")
            return None
        try:
            print(f"Requesting image data for prompt: '{prompt}' using global genai_client.")
            if not hasattr(genai_client, 'models') or not hasattr(genai_client.models, 'generate_content'):
                print("Error: Global genai_client does not have 'models.generate_content' method.")
                return None

            gen_config = self._build_generation_config()

            print(">>> Calling genai_client.models.generate_content...")
            start_time = time.perf_counter()
            response = self._call_generate_content(prompt, gen_config)
            elapsed = time.perf_counter() - start_time
            print(f"<<< Call to generate_content completed in {elapsed:.2f} seconds.")

            image_bytes = None
            if response.candidates:
                found = self._first_inline_part(response.candidates[0])
                if found is not None:
                    print("Found inline image data.")
                    image_bytes = found[1]
                    try:
                        # Opening the data is the validity check; the image
                        # object itself is not needed.
                        Image.open(BytesIO(image_bytes))
                        print(f"Image data verified, size: {len(image_bytes)} bytes.")
                    except Exception as img_verify_err:
                        print(f"Warning: Received data might not be a valid image: {img_verify_err}")
                        return None
            else:
                print("Response has no candidates.")
                block_reason = self._block_reason(response)
                if block_reason:
                    print(f"Block reason: {block_reason}")

            if not image_bytes:
                print("No valid inline image data found in response.")
            return image_bytes

        except AttributeError as ae:
            print(f"圖像數據生成過程中發生屬性錯誤: {ae}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"圖像數據生成過程中發生未預期錯誤: {e}")
            traceback.print_exc()
            return None

    # --- Image editing ---
    def edit_image_with_gemini(self, image_bytes: bytes, edit_prompt: str) -> Union[bytes, None]:
        """Edit an existing image with the Gemini model.

        The request ``contents`` is a list holding the text prompt followed by
        the source image as a PIL ``Image`` object.

        Args:
            image_bytes (bytes): Binary data of the original image to edit.
            edit_prompt (str): Text describing how the image should be edited.

        Returns:
            Union[bytes, None]: Binary data of the edited image, or ``None``
            when an input is missing, the source image cannot be opened, the
            response carries no valid image, or any exception occurs.
        """
        if not genai_client:
            print("Error: Global genai_client is not available.")
            return None
        if not image_bytes:
            print("Error: No image bytes provided for editing.")
            return None
        if not edit_prompt:
            print("Error: No edit prompt provided.")
            return None

        try:
            print(f"Requesting image edit with prompt: '{edit_prompt}' using global genai_client.")

            # Build the contents list: text prompt plus PIL Image object.
            try:
                img = Image.open(BytesIO(image_bytes))
            except Exception as img_open_err:
                print(f"Error opening provided image bytes for editing: {img_open_err}")
                return None
            contents_for_edit = [edit_prompt, img]

            # Make sure the API entry point exists before calling it.
            if not hasattr(genai_client, 'models') or not hasattr(genai_client.models, 'generate_content'):
                print("Error: Global genai_client does not have 'models.generate_content' method.")
                return None

            # Same config as for image generation.
            gen_config = self._build_generation_config()

            print(">>> Calling genai_client.models.generate_content for image edit...")
            start_time = time.perf_counter()
            response = self._call_generate_content(contents_for_edit, gen_config)
            elapsed = time.perf_counter() - start_time
            print(f"<<< Call to generate_content (edit) completed in {elapsed:.2f} seconds.")

            # Look for the edited image among the inline_data parts.
            edited_image_bytes = None
            if response.candidates:
                candidate = response.candidates[0]
                if candidate.content and candidate.content.parts:
                    print("Iterating through edit response parts...")
                found = self._first_inline_part(candidate)
                if found is not None:
                    part_index, edited_image_bytes = found
                    print(f"  Part {part_index}: Found inline image data (edited).")
                    try:
                        Image.open(BytesIO(edited_image_bytes))
                        print(f"  Edited image data received and verified, size: {len(edited_image_bytes)} bytes.")
                    except Exception as img_verify_err:
                        print(f"  Warning: Received edited data might not be a valid image: {img_verify_err}")
                        return None
            else:
                print("Edit response has no candidates.")
                # None-safe: prompt_feedback may be present but set to None.
                block_reason = self._block_reason(response)
                if block_reason:
                    print(f"Block reason: {block_reason}")

            if not edited_image_bytes:
                print("No valid inline image data found in edit response.")
            return edited_image_bytes  # edited bytes, or None

        except AttributeError as ae:
            print(f"圖片編輯過程中發生屬性錯誤: {ae}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"圖片編輯過程中發生未預期錯誤: {e}")
            traceback.print_exc()
            return None

    def upload_image_to_gcs(self, image_bytes: bytes, file_name_prefix: str = "gemini-image") -> Union[str, None]:
        """Convert image bytes to JPEG and upload them to Google Cloud Storage.

        The object is written to the ``stt_bucket_for_allen`` bucket under
        ``EJ/<file_name_prefix>_<millisecond timestamp>.jpg``.

        Args:
            image_bytes (bytes): Binary image data in any format Pillow can open.
            file_name_prefix (str): Prefix of the generated object name.

        Returns:
            Union[str, None]: The blob's ``public_url`` (forced to HTTPS), or
            ``None`` when no data was given, conversion fails, the URL is
            unusable, or the upload raises.
            NOTE(review): whether that URL is actually readable depends on the
            bucket's access settings, which this code does not change.
        """
        if not image_bytes:
            return None
        try:
            print("Converting received image bytes to JPG format...")
            try:
                jpg_bytes = self._encode_jpeg(image_bytes)
                print(f"Image converted to JPG successfully, size: {len(jpg_bytes)} bytes.")
            except Exception as convert_err:
                print(f"Error converting image to JPG: {convert_err}")
                traceback.print_exc()
                return None

            bucket_name = "stt_bucket_for_allen"
            timestamp = int(time.time() * 1000)
            file_extension = "jpg"
            content_type = "image/jpeg"
            gcs_file_path = f"EJ/{file_name_prefix}_{timestamp}.{file_extension}"
            storage_client = storage.Client()
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(gcs_file_path)
            print(f"Uploading JPG image bytes to gs://{bucket_name}/{gcs_file_path}")
            blob.upload_from_string(data=jpg_bytes, content_type=content_type)
            print("Image uploaded to GCS.")

            public_url = blob.public_url
            print(f"Generated GCS public URL: {public_url}")
            if not public_url or not public_url.startswith("https://"):
                print(f"Error or Warning: Invalid public URL generated: {public_url}")
                if public_url and public_url.startswith("http://"):
                    # Upgrade a plain-HTTP URL instead of rejecting it.
                    public_url = "https://" + public_url[len("http://"):]
                    print(f"Corrected URL to HTTPS: {public_url}")
                else:
                    return None
            print(f"Image uploaded successfully to GCS: {public_url}")
            return public_url
        except Exception as e:
            print(f"上傳圖片到 GCP 時出錯: {e}")
            traceback.print_exc()
            return None