File size: 9,657 Bytes
e037628
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab3578e
e037628
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import base64
import io
import os
import time
import warnings
from datetime import datetime
from functools import wraps

from google.generativeai import GenerativeModel
from openai import OpenAI
from PIL import Image

import config
warnings.filterwarnings("ignore", message="IMAGE_SAFETY is not a valid FinishReason")

# Module-level cache of the most recent generation, written by the
# generate_image_fn* functions below and read elsewhere in the project.
global_image_data_url = None  # "data:image/png;base64,..." URL of the last generated image
global_image_prompt = None  # prompt text used for the last generation (possibly truncated)
global_image_description = None  # never written in this chunk — presumably set elsewhere; TODO confirm

def log_execution(func):
    """Decorator that times each call to *func* and logs the duration.

    The wrapped function's return value is passed through unchanged.

    Args:
        func (callable): The function to wrap.

    Returns:
        callable: A wrapper with the same signature as *func*.
    """
    # BUG FIX: the original computed start/end timestamps and the duration but
    # never emitted them (a "log_execution" decorator that logged nothing), and
    # it lacked functools.wraps, so decorated functions lost their metadata.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        start_str = datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')

        result = func(*args, **kwargs)

        end_time = time.time()
        end_str = datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S')
        duration = end_time - start_time

        print(f"[{func.__name__}] {start_str} -> {end_str} ({duration:.2f}s)")

        return result
    return wrapper

@log_execution
def generate_image_fn_deprecated(selected_prompt, model="gpt-image-1", output_path="models/benchmark"):
    """
    Generate an image from the prompt via the OpenAI API using gpt-image-1.
    Convert the image to a data URL and optionally save it to a file.

    Args:
        selected_prompt (str): The prompt to generate the image from.
        model (str): Should be "gpt-image-1". Parameter kept for compatibility.
        output_path (str, optional): If provided, saves the image to this path.
            Defaults to "models/benchmark".

    Returns:
        PIL.Image.Image or None: The generated image as a PIL Image object, or None on error.

    Side effects:
        Sets global_image_prompt to the (possibly truncated) prompt and, on
        success, global_image_data_url to a data:image/png;base64 URL.
    """
    global global_image_data_url, global_image_prompt

    # gpt-image-1 rejects extremely long prompts; truncate while keeping critical sections.
    MAX_PROMPT_LENGTH = 32000
    if len(selected_prompt) > MAX_PROMPT_LENGTH:
        selected_prompt = smart_truncate_prompt(selected_prompt, MAX_PROMPT_LENGTH)
        print(f"Warning: Prompt was smartly truncated to {len(selected_prompt)} characters while preserving critical details")

    global_image_prompt = selected_prompt

    # Only gpt-image-1 is supported; override whatever the caller passed.
    model = "gpt-image-1"

    try:
        client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", config.OPENAI_API_KEY))

        api_params = {
            "model": model,
            "prompt": selected_prompt,
            "size": "1024x1536",
            "quality": "high",
            "moderation": "low",
        }

        result = client.images.generate(**api_params)

        # BUG FIX: image_base64 was never assigned in the original, so this
        # path always raised NameError, which the broad except below turned
        # into a silent "Error generating image" + None return. gpt-image-1
        # returns the image as base64 in data[0].b64_json.
        image_base64 = result.data[0].b64_json
        image_bytes = base64.b64decode(image_base64)

        image = Image.open(io.BytesIO(image_bytes))

        if output_path:
            try:
                # BUG FIX: os.path.dirname() returns "" for a bare filename and
                # os.makedirs("") raises — only create the directory when there is one.
                # (Also fixed the default path: "models\benchmark" contained the
                # backspace escape \b; the sibling function uses "models/benchmark".)
                out_dir = os.path.dirname(output_path)
                if out_dir:
                    os.makedirs(out_dir, exist_ok=True)
                with open(output_path, "wb") as f:
                    f.write(image_bytes)
                print(f"Successfully saved image to {output_path}")
            except Exception as e:
                print(f"Error saving image to {output_path}: {str(e)}")

        # Re-encode as PNG and expose a data URL for downstream consumers.
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        img_bytes = buffered.getvalue()
        img_b64 = base64.b64encode(img_bytes).decode("utf-8")
        global_image_data_url = f"data:image/png;base64,{img_b64}"

        print(f"Successfully generated image with prompt: {selected_prompt[:50]}...")
        return image
    except Exception as e:
        print(f"Error generating image: {str(e)}")
        return None
@log_execution
def generate_image_fn(selected_prompt, model="gemini-2.5-flash-image-preview", output_path="models/benchmark"):
    """
    Generate an image from the prompt via the Google Gemini API.
    Convert the image to a data URL and optionally save it to a file.

    Args:
        selected_prompt (str): The prompt to generate the image from.
        model (str): The Gemini model to use. Defaults to "gemini-2.5-flash-image-preview".
        output_path (str, optional): If provided, saves the image to this path. Defaults to "models/benchmark".

    Returns:
        PIL.Image.Image or None: The generated image as a PIL Image object, or None on error.

    Side effects:
        Sets global_image_prompt to the (possibly truncated) prompt and, on
        success, global_image_data_url to a data:image/png;base64 URL.
    """
    global global_image_data_url, global_image_prompt

    # Gemini rejects extremely long prompts; truncate while keeping critical sections.
    MAX_PROMPT_LENGTH = 32000
    if len(selected_prompt) > MAX_PROMPT_LENGTH:
        selected_prompt = smart_truncate_prompt(selected_prompt, MAX_PROMPT_LENGTH)
        print(f"Warning: Prompt was smartly truncated to {len(selected_prompt)} characters while preserving critical details")

    global_image_prompt = selected_prompt

    try:
        # Initialize the Gemini model and request generation.
        # (Removed the function-local re-imports of GenerativeModel/PIL/io/base64/os —
        # all are already imported at module level.)
        gemini_model = GenerativeModel(model)
        response = gemini_model.generate_content([selected_prompt])

        # Walk the response parts: skip text parts, take the first valid inline image.
        image = None
        has_text_response = False

        for part in response.candidates[0].content.parts:
            # Check for text responses (ignore these)
            if hasattr(part, 'text') and part.text:
                has_text_response = True
                print(f"Ignoring text response from API: {part.text[:100]}...")
                continue

            # Look for image data
            if hasattr(part, 'inline_data') and part.inline_data is not None:
                image_bytes = part.inline_data.data

                # Verify we have valid data
                if not image_bytes or len(image_bytes) == 0:
                    print("Warning: inline_data.data is empty, skipping...")
                    continue

                # Try to parse the image
                try:
                    img_io = io.BytesIO(image_bytes)
                    image = Image.open(img_io)
                    image.load()  # Force load to verify it's valid
                    print(f"Successfully loaded image: {len(image_bytes)} bytes")
                    break
                except Exception as img_error:
                    print(f"Invalid image data received, skipping: {img_error}")
                    continue

        # If we only got text and no image, return None
        if image is None:
            if has_text_response:
                print("API returned text instead of image - skipping this response")
            else:
                print("No image data found in response")
            return None

        # Save image to file if output_path is provided
        if output_path:
            try:
                # BUG FIX: os.path.dirname() returns "" for a bare filename and
                # os.makedirs("") raises FileNotFoundError, which silently skipped
                # the save — only create the directory when there is one.
                out_dir = os.path.dirname(output_path)
                if out_dir:
                    os.makedirs(out_dir, exist_ok=True)
                # Ensure output_path has an image extension
                if not output_path.lower().endswith(('.png', '.jpg', '.jpeg')):
                    output_path = f"{output_path}.png"

                image.save(output_path)
                print(f"Successfully saved image to {output_path}")
            except Exception as e:
                print(f"Error saving image to {output_path}: {str(e)}")

        # Create data URL for the image
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        img_bytes = buffered.getvalue()
        img_b64 = base64.b64encode(img_bytes).decode("utf-8")
        global_image_data_url = f"data:image/png;base64,{img_b64}"

        print(f"Successfully generated image with prompt: {selected_prompt[:50]}...")
        return image

    except Exception as e:
        print(f"Error generating image: {str(e)}")
        import traceback
        traceback.print_exc()
        return None
    

@log_execution
def smart_truncate_prompt(prompt, max_length):
    """
    Smart truncation that preserves critical details and visual consistency information.
    Prioritizes character descriptions, layout specifications, and technical requirements.

    The prompt is assumed to be composed of sections joined by " || ". Sections
    whose text contains one of the first eight critical markers are always kept;
    remaining sections are kept while they fit, and at most one extra section is
    hard-truncated as filler before the loop stops.

    Args:
        prompt (str): The full prompt, with sections separated by " || ".
        max_length (int): Target maximum length in characters.

    Returns:
        str: The truncated prompt. NOTE(review): because critical sections are
        kept unconditionally, the result can still exceed max_length when the
        critical sections alone are longer than the budget — confirm callers
        tolerate that.
    """
    # Fast path: nothing to do.
    if len(prompt) <= max_length:
        return prompt
    
    # Markers identifying must-keep sections. Only the first eight entries are
    # actually consulted (see the [:8] slice below); the rest are dead data.
    # NOTE(review): several markers contain mojibake (e.g. "πŸ—οΈ", "πŸ“") —
    # they are matched verbatim against the prompt text, so both sides must use
    # the same (mis)encoding; fix together if re-encoding — TODO confirm.
    critical_sections = [
        "CRITICAL LAYOUT:",
        "🎭 CRITICAL CHARACTER CONSISTENCY PROTOCOL:",
        "CHARACTER 1",
        "CHARACTER 2", 
        "CHARACTER 3",
        "STORY CONTENT:",
        "πŸ—οΈ ENVIRONMENTAL CONSISTENCY PROTOCOL:",
        "🎨 COMIC BOOK STYLE MASTERY:",
        "🎨 AUTHENTIC MANGA STYLE:",
        "🎨 PHOTOREALISTIC EXCELLENCE:",
        "🎨 CINEMATIC VISUAL MASTERY:",
        "🎨 HIGH-QUALITY ILLUSTRATION:",
        "πŸ“ PANEL COMPOSITION MASTERY:",
        "πŸ” DETAIL PRESERVATION PROTOCOL:",
        "⚑ ADVANCED QUALITY REQUIREMENTS:"
    ]
    
    # Split into sections on the " || " delimiter used when the prompt was built.
    sections = prompt.split(" || ")
    
    preserved_sections = []
    preserved_length = 0  # running total including the 4-char " || " separators
    
    for section in sections:
        section_trimmed = section.strip()
        if not section_trimmed:
            continue
            
        # Only the first eight markers count as critical.
        is_critical = any(critical_marker in section_trimmed for critical_marker in critical_sections[:8])  
        
        # Keep critical sections unconditionally; keep others while they fit
        # within the budget minus a ~200-char reserve (for the final mandate).
        # The "+ 4" accounts for the " || " separator added on rejoin.
        if is_critical or (preserved_length + len(section_trimmed) + 4 < max_length - 200):  
            preserved_sections.append(section_trimmed)
            preserved_length += len(section_trimmed) + 4  
        elif preserved_length < max_length * 0.7:  
            # Under 70% of budget used: hard-truncate one more section as
            # filler, then stop scanning entirely.
            available_space = max_length - preserved_length - 200
            if available_space > 100:  
                truncated_section = section_trimmed[:available_space-20] + "..."
                preserved_sections.append(truncated_section)
                break
    
    preserved_prompt = " || ".join(preserved_sections)
    
    # Append the closing mandate only if it still fits within max_length.
    final_mandate = " || FINAL MANDATE: Create a masterpiece with perfect character consistency and narrative clarity"
    if len(preserved_prompt) + len(final_mandate) <= max_length:
        preserved_prompt += final_mandate
    
    return preserved_prompt