super_AI / test_final.py
bhavesh122's picture
Upload 19 files
c890bc7 verified
import httpx
import json
import asyncio
BACKEND_URL = "http://localhost:8000"
async def test_super_search():
print("\n--- Testing Super Search (Gemini 2.5 Flash-Lite) ---")
payload = {
"prompt": "What is the capital of France and its current population?",
"models": ["google/gemini-2.5-flash-lite"], # This will trigger the Neural engine
"user_email": "test-user@superai.com"
}
try:
async with httpx.AsyncClient(timeout=60.0) as client:
async with client.stream("POST", f"{BACKEND_URL}/chat/super-search", json=payload) as response:
if response.status_code != 200:
print(f"Error: {response.status_code} - {await response.aread()}")
return
async for line in response.aiter_lines():
if line.startswith("data: "):
try:
data = json.loads(line[6:])
if data.get("type") == "thought":
print(f"[THOUGHT] {data['content']}")
elif data.get("type") == "sources":
print(f"[SOURCES] {len(data['content'])} found")
elif data.get("type") == "content":
print(data['content'], end="", flush=True)
except: pass
print("\nSearch Test Complete.")
except Exception as e:
print(f"FAILED: {e}")
async def test_image_generate():
print("\n--- Testing Image Generate (Flux) ---")
payload = {
"prompt": "A beautiful sunset over the mountains",
"model_id": "black-forest-labs/FLUX.1-schnell",
"user_email": "test-user@superai.com"
}
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(f"{BACKEND_URL}/chat/image-generate", json=payload)
if response.status_code == 200:
data = response.json()
if "image" in data:
print(f"SUCCESS: Image generated! ({data['image'][:50]}...)")
else:
print(f"FAILED: No image in response: {data}")
else:
print(f"FAILED: {response.status_code} - {response.text}")
except Exception as e:
print(f"FAILED: {e}")
if __name__ == "__main__":
asyncio.run(test_super_search())
asyncio.run(test_image_generate())