File size: 9,657 Bytes
e037628 ab3578e e037628 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
import base64
import functools
import io
import os
import time
import warnings
from datetime import datetime

from google.generativeai import GenerativeModel
from openai import OpenAI
from PIL import Image

import config
# Gemini responses can finish with IMAGE_SAFETY, which the SDK's FinishReason
# enum does not recognize; suppress the resulting noisy warning.
warnings.filterwarnings("ignore", message="IMAGE_SAFETY is not a valid FinishReason")
# Module-level shared state: the most recently generated image as a PNG data
# URL, the prompt that produced it, and its description. Written by the
# generate_image_* functions below; presumably read elsewhere in the app.
global_image_data_url = None
global_image_prompt = None
global_image_description = None
def log_execution(func):
    """Decorator that times each call to *func*.

    The timing values are currently captured but not emitted anywhere; the
    scaffold is kept so a logger/print can be plugged in without touching
    call sites. ``functools.wraps`` preserves the wrapped function's
    ``__name__``/``__doc__`` so decorated functions stay introspectable.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapping callable; it forwards all arguments and returns
        *func*'s result unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        # Duration is computed for future logging; intentionally not printed.
        duration = time.time() - start_time
        return result
    return wrapper
@log_execution
def generate_image_fn_deprecated(selected_prompt, model="gpt-image-1", output_path="models/benchmark"):
    """
    Generate an image from the prompt via the OpenAI API using gpt-image-1.

    The base64 payload returned by the API is decoded, optionally written to
    disk, and published as a PNG data URL in the module-level
    ``global_image_data_url``. ``global_image_prompt`` records the (possibly
    truncated) prompt actually sent.

    Args:
        selected_prompt (str): The prompt to generate the image from.
        model (str): Should be "gpt-image-1". Parameter kept for
            compatibility; the value is overridden below.
        output_path (str, optional): If truthy, saves the raw image bytes to
            this path. Defaults to "models/benchmark".

    Returns:
        PIL.Image.Image or None: The generated image as a PIL Image object,
        or None on error (all failures are caught and printed).
    """
    global global_image_data_url, global_image_prompt
    MAX_PROMPT_LENGTH = 32000
    if len(selected_prompt) > MAX_PROMPT_LENGTH:
        selected_prompt = smart_truncate_prompt(selected_prompt, MAX_PROMPT_LENGTH)
        print(f"Warning: Prompt was smartly truncated to {len(selected_prompt)} characters while preserving critical details")
    global_image_prompt = selected_prompt
    model = "gpt-image-1"  # force the supported model regardless of caller input
    try:
        # Environment variable takes precedence over the config fallback.
        client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", config.OPENAI_API_KEY))
        api_params = {
            "model": model,
            "prompt": selected_prompt,
            "size": "1024x1536",
            "quality": "high",
            "moderation": "low",
        }
        result = client.images.generate(**api_params)
        # BUG FIX: image_base64 was referenced without ever being assigned,
        # so this function always fell into the except branch. gpt-image-1
        # returns the image as base64 in data[0].b64_json.
        image_base64 = result.data[0].b64_json
        image_bytes = base64.b64decode(image_base64)
        image = Image.open(io.BytesIO(image_bytes))
        if output_path:
            try:
                # Only create directories when the path actually has a parent;
                # os.makedirs("", exist_ok=True) raises FileNotFoundError for
                # bare filenames.
                parent_dir = os.path.dirname(output_path)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                with open(output_path, "wb") as f:
                    f.write(image_bytes)
                print(f"Successfully saved image to {output_path}")
            except Exception as e:
                # Saving is best-effort; generation still succeeds.
                print(f"Error saving image to {output_path}: {str(e)}")
        # Publish the image as a PNG data URL for downstream consumers.
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        img_bytes = buffered.getvalue()
        img_b64 = base64.b64encode(img_bytes).decode("utf-8")
        global_image_data_url = f"data:image/png;base64,{img_b64}"
        print(f"Successfully generated image with prompt: {selected_prompt[:50]}...")
        return image
    except Exception as e:
        print(f"Error generating image: {str(e)}")
        return None
@log_execution
def generate_image_fn(selected_prompt, model="gemini-2.5-flash-image-preview", output_path="models/benchmark"):
    """
    Generate an image from the prompt via the Google Gemini API.

    The first valid inline image part in the response is decoded, optionally
    saved to disk, and published as a PNG data URL in the module-level
    ``global_image_data_url``. Text parts in the response are ignored.

    Args:
        selected_prompt (str): The prompt to generate the image from.
        model (str): The Gemini model to use. Defaults to
            "gemini-2.5-flash-image-preview".
        output_path (str, optional): If truthy, saves the image to this path
            (a ".png" suffix is appended when no image extension is present).
            Defaults to "models/benchmark".

    Returns:
        PIL.Image.Image or None: The generated image as a PIL Image object,
        or None on error or when the response contains no usable image.
    """
    global global_image_data_url, global_image_prompt
    MAX_PROMPT_LENGTH = 32000
    if len(selected_prompt) > MAX_PROMPT_LENGTH:
        selected_prompt = smart_truncate_prompt(selected_prompt, MAX_PROMPT_LENGTH)
        print(f"Warning: Prompt was smartly truncated to {len(selected_prompt)} characters while preserving critical details")
    global_image_prompt = selected_prompt
    try:
        # GenerativeModel, PIL.Image and the stdlib modules are imported at
        # file top; the former redundant function-local imports were removed.
        gemini_model = GenerativeModel(model)
        # Generate content with the prompt.
        response = gemini_model.generate_content([selected_prompt])
        # Scan the response parts for the first valid inline image.
        image = None
        image_bytes = None
        has_text_response = False
        for part in response.candidates[0].content.parts:
            # Text parts are ignored (but remembered for the error message).
            if hasattr(part, 'text') and part.text:
                has_text_response = True
                print(f"Ignoring text response from API: {part.text[:100]}...")
                continue
            # Look for image data.
            if hasattr(part, 'inline_data') and part.inline_data is not None:
                image_bytes = part.inline_data.data
                # Skip empty payloads.
                if not image_bytes or len(image_bytes) == 0:
                    print("Warning: inline_data.data is empty, skipping...")
                    continue
                # Try to parse the image; keep scanning on invalid data.
                try:
                    img_io = io.BytesIO(image_bytes)
                    image = Image.open(img_io)
                    image.load()  # Force load to verify it's valid
                    print(f"Successfully loaded image: {len(image_bytes)} bytes")
                    break
                except Exception as img_error:
                    print(f"Invalid image data received, skipping: {img_error}")
                    continue
        # If we only got text and no image, return None.
        if image is None:
            if has_text_response:
                print("API returned text instead of image - skipping this response")
            else:
                print("No image data found in response")
            return None
        # Save image to file if output_path is provided (best-effort).
        if output_path:
            try:
                # BUG FIX: only create directories when the path has a
                # parent; os.makedirs("", exist_ok=True) raises
                # FileNotFoundError for bare filenames like "image.png".
                parent_dir = os.path.dirname(output_path)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                # Ensure output_path has an image extension.
                if not output_path.lower().endswith(('.png', '.jpg', '.jpeg')):
                    output_path = f"{output_path}.png"
                image.save(output_path)
                print(f"Successfully saved image to {output_path}")
            except Exception as e:
                print(f"Error saving image to {output_path}: {str(e)}")
        # Publish the image as a PNG data URL for downstream consumers.
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        img_bytes = buffered.getvalue()
        img_b64 = base64.b64encode(img_bytes).decode("utf-8")
        global_image_data_url = f"data:image/png;base64,{img_b64}"
        print(f"Successfully generated image with prompt: {selected_prompt[:50]}...")
        return image
    except Exception as e:
        print(f"Error generating image: {str(e)}")
        import traceback
        traceback.print_exc()
        return None
@log_execution
def smart_truncate_prompt(prompt, max_length):
    """
    Smart truncation that preserves critical details and visual consistency information.
    Prioritizes character descriptions, layout specifications, and technical requirements.
    """
    # Fast path: nothing to do when the prompt already fits.
    if len(prompt) <= max_length:
        return prompt
    # Section markers in priority order. NOTE(review): the non-ASCII prefixes
    # look like mojibake of emoji markers; they must match the prompt text as
    # assembled by the caller, so do not "repair" them without checking there.
    critical_sections = [
        "CRITICAL LAYOUT:",
        "π CRITICAL CHARACTER CONSISTENCY PROTOCOL:",
        "CHARACTER 1",
        "CHARACTER 2",
        "CHARACTER 3",
        "STORY CONTENT:",
        "ποΈ ENVIRONMENTAL CONSISTENCY PROTOCOL:",
        "π¨ COMIC BOOK STYLE MASTERY:",
        "π¨ AUTHENTIC MANGA STYLE:",
        "π¨ PHOTOREALISTIC EXCELLENCE:",
        "π¨ CINEMATIC VISUAL MASTERY:",
        "π¨ HIGH-QUALITY ILLUSTRATION:",
        "π PANEL COMPOSITION MASTERY:",
        "π DETAIL PRESERVATION PROTOCOL:",
        "β‘ ADVANCED QUALITY REQUIREMENTS:"
    ]
    # Prompts are assembled as " || "-separated sections.
    sections = prompt.split(" || ")
    preserved_sections = []
    preserved_length = 0  # running total including 4-char " || " separators
    for section in sections:
        section_trimmed = section.strip()
        if not section_trimmed:
            continue
        # Only the first 8 markers (layout/character/story/environment) count
        # as must-keep; the style/quality markers after them do not.
        is_critical = any(critical_marker in section_trimmed for critical_marker in critical_sections[:8])
        # Keep critical sections unconditionally; keep others while there is
        # room (200 chars are reserved for the final-mandate suffix).
        # NOTE(review): because critical sections are always kept, the result
        # can still exceed max_length when they alone are too long — confirm
        # the caller tolerates that.
        if is_critical or (preserved_length + len(section_trimmed) + 4 < max_length - 200):
            preserved_sections.append(section_trimmed)
            preserved_length += len(section_trimmed) + 4
        elif preserved_length < max_length * 0.7:
            # Under ~70% of budget: squeeze in a truncated copy of this
            # non-critical section (if meaningful space remains), then stop.
            available_space = max_length - preserved_length - 200
            if available_space > 100:
                truncated_section = section_trimmed[:available_space-20] + "..."
                preserved_sections.append(truncated_section)
            break
        # else: over ~70% full — drop this non-critical section but keep
        # scanning so later critical sections are still preserved.
    preserved_prompt = " || ".join(preserved_sections)
    final_mandate = " || FINAL MANDATE: Create a masterpiece with perfect character consistency and narrative clarity"
    # Append the closing mandate only if it fits within the budget.
    if len(preserved_prompt) + len(final_mandate) <= max_length:
        preserved_prompt += final_mandate
    return preserved_prompt
|