from __future__ import annotations from fastapi import FastAPI, UploadFile, File,Query, Form, BackgroundTasks, HTTPException from fastapi import Body from fastapi.responses import JSONResponse, FileResponse from fastapi.middleware.cors import CORSMiddleware from pathlib import Path import shutil import uvicorn import json import uuid from datetime import datetime from typing import Dict from enum import Enum import os import yaml import io from video_processing import process_video_pipeline from audio_tools import process_audio_for_video, extract_audio_ffmpeg, embed_voice_segments from casting_loader import ensure_chroma, build_faces_index, build_voices_index from narration_system import NarrationSystem from llm_router import load_yaml, LLMRouter from character_detection import detect_characters_from_video from pipelines.audiodescription import generate as ad_generate from storage.files.file_manager import FileManager from storage.media_routers import router as media_router app = FastAPI(title="Veureu Engine API", version="0.2.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) ROOT = Path("/tmp/veureu") ROOT.mkdir(parents=True, exist_ok=True) TEMP_ROOT = Path("/tmp/temp") TEMP_ROOT.mkdir(parents=True, exist_ok=True) VIDEOS_ROOT = Path("/tmp/data/videos") VIDEOS_ROOT.mkdir(parents=True, exist_ok=True) IDENTITIES_ROOT = Path("/tmp/characters") IDENTITIES_ROOT.mkdir(parents=True, exist_ok=True) # Sistema de jobs asíncronos class JobStatus(str, Enum): QUEUED = "queued" PROCESSING = "processing" DONE = "done" FAILED = "failed" jobs: Dict[str, dict] = {} app.include_router(media_router) def describe_image_with_svision(image_path: str, is_face: bool = True) -> tuple[str, str]: """ Llama al space svision para describir una imagen (usado en generación de AD). Args: image_path: Ruta absoluta a la imagen is_face: True si es una cara, False si es una escena Returns: tuple (descripción_completa, nombre_abreviado) """ try: from pathlib import Path as _P import yaml from llm_router import LLMRouter # Cargar configuración config_path = _P(__file__).parent / "config.yaml" if not config_path.exists(): print(f"[svision] Config no encontrado: {config_path}") return ("", "") with open(config_path, 'r', encoding='utf-8') as f: cfg = yaml.safe_load(f) or {} router = LLMRouter(cfg) # Contexto diferente para caras vs escenas if is_face: context = { "task": "describe_person", "instructions": "Descriu la persona en la imatge. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.", "max_tokens": 256 } else: context = { "task": "describe_scene", "instructions": "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.", "max_tokens": 128 } # Llamar a svision descriptions = router.vision_describe([str(image_path)], context=context, model="salamandra-vision") full_description = descriptions[0] if descriptions else "" if not full_description: return ("", "") print(f"[svision] Descripció generada: {full_description[:100]}...") return (full_description, "") except Exception as e: print(f"[svision] Error al descriure imatge: {e}") import traceback traceback.print_exc() return ("", "") def normalize_face_lighting(image): """ Normaliza el brillo de una imagen de cara usando técnicas combinadas: 1. CLAHE para ecualización adaptativa 2. Normalización de rango para homogeneizar brillo general Esto reduce el impacto de diferentes condiciones de iluminación en los embeddings y en la visualización de las imágenes. Args: image: Imagen BGR (OpenCV format) Returns: Imagen normalizada en el mismo formato """ import cv2 import numpy as np # Paso 1: Convertir a LAB color space (más robusto para iluminación) lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) # Paso 2: Aplicar CLAHE (Contrast Limited Adaptive Histogram Equalization) al canal L # Usar clipLimit más alto para normalización más agresiva clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) l_clahe = clahe.apply(l) # Paso 3: Normalizar el rango del canal L para asegurar distribución uniforme # Esto garantiza que todas las imágenes tengan un rango de brillo similar l_min, l_max = l_clahe.min(), l_clahe.max() if l_max > l_min: # Estirar el histograma al rango completo [0, 255] l_normalized = ((l_clahe - l_min) * 255.0 / (l_max - l_min)).astype(np.uint8) else: l_normalized = l_clahe # Paso 4: Aplicar suavizado suave para reducir ruido introducido por la normalización l_normalized = cv2.GaussianBlur(l_normalized, (3, 3), 0) # Recombinar canales lab_normalized = cv2.merge([l_normalized, a, b]) # Convertir de vuelta a BGR normalized = cv2.cvtColor(lab_normalized, cv2.COLOR_LAB2BGR) return normalized def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int, sensitivity: float = 0.5) -> np.ndarray: """ Clustering jerárquico con silhouette score para encontrar automáticamente el mejor número de clusters. Selecciona automáticamente el mejor número de clusters (hasta max_groups) usando silhouette score. Filtra clusters con menos de min_cluster_size muestras (marcados como -1/ruido). Args: X: Array de embeddings (N, D) max_groups: Número máximo de clusters a formar min_cluster_size: Tamaño mínimo de cluster válido sensitivity: Sensibilidad del clustering (0.0-1.0) - 0.0 = muy agresivo (menos clusters) - 0.5 = balanceado (recomendado) - 1.0 = muy permisivo (más clusters) Returns: Array de labels (N,) donde -1 indica ruido """ import numpy as np from scipy.cluster.hierarchy import linkage, fcluster from sklearn.metrics import silhouette_score from collections import Counter if len(X) == 0: return np.array([]) if len(X) < min_cluster_size: # Si hay menos muestras que el mínimo, todo es ruido return np.full(len(X), -1, dtype=int) # Linkage usando average linkage (más flexible que ward, menos sensible a outliers) # Esto ayuda a agrupar mejor la misma persona con diferentes ángulos/expresiones Z = linkage(X, method='average', metric='cosine') # Cosine similarity para embeddings # Encontrar el número óptimo de clusters usando silhouette score best_n_clusters = 2 best_score = -1 # Probar diferentes números de clusters (de 2 a max_groups) max_to_try = min(max_groups, len(X) - 1) # No puede haber más clusters que muestras if max_to_try >= 2: for n_clusters in range(2, max_to_try + 1): trial_labels = fcluster(Z, t=n_clusters, criterion='maxclust') - 1 # Calcular cuántos clusters válidos tendríamos después del filtrado trial_counts = Counter(trial_labels) valid_clusters = sum(1 for count in trial_counts.values() if count >= min_cluster_size) # Solo evaluar si hay al menos 2 clusters válidos if valid_clusters >= 2: try: score = silhouette_score(X, trial_labels, metric='cosine') # Penalización dinámica basada en sensibilidad: # - sensitivity=0.0 → penalty=0.14 (muy agresivo, menos clusters) # - sensitivity=0.5 → penalty=0.07 (balanceado, recomendado) # - sensitivity=1.0 → penalty=0.01 (permisivo, más clusters) penalty = 0.14 - (sensitivity * 0.13) adjusted_score = score - (n_clusters * penalty) if adjusted_score > best_score: best_score = adjusted_score best_n_clusters = n_clusters except: pass # Si falla el cálculo, ignorar esta configuración # Usar el número óptimo de clusters encontrado penalty = 0.14 - (sensitivity * 0.13) print(f"Clustering óptimo: {best_n_clusters} clusters (de máximo {max_groups}), sensitivity={sensitivity:.2f}, penalty={penalty:.3f}, silhouette={best_score:.3f}") labels = fcluster(Z, t=best_n_clusters, criterion='maxclust') # fcluster devuelve labels 1-indexed, convertir a 0-indexed labels = labels - 1 # Filtrar clusters pequeños label_counts = Counter(labels) filtered_labels = [] for lbl in labels: if label_counts[lbl] >= min_cluster_size: filtered_labels.append(lbl) else: filtered_labels.append(-1) # Ruido return np.array(filtered_labels, dtype=int) @app.get("/") def root(): return {"ok": True, "service": "veureu-engine"} @app.post("/process_video") async def process_video( video_file: UploadFile = File(...), config_path: str = Form("config.yaml"), out_root: str = Form("results"), db_dir: str = Form("chroma_db"), ): tmp_video = ROOT / video_file.filename with tmp_video.open("wb") as f: shutil.copyfileobj(video_file.file, f) result = process_video_pipeline(str(tmp_video), config_path=config_path, out_root=out_root, db_dir=db_dir) return JSONResponse(result) @app.post("/create_initial_casting") async def create_initial_casting( background_tasks: BackgroundTasks, video: UploadFile = File(...), max_groups: int = Form(default=3), min_cluster_size: int = Form(default=3), face_sensitivity: float = Form(default=0.5), voice_max_groups: int = Form(default=3), voice_min_cluster_size: int = Form(default=3), voice_sensitivity: float = Form(default=0.5), max_frames: int = Form(default=100), ): """ Crea un job para procesar el vídeo de forma asíncrona usando clustering jerárquico. Devuelve un job_id inmediatamente. """ # Guardar vídeo en carpeta de datos video_name = Path(video.filename).stem dst_video = VIDEOS_ROOT / f"{video_name}.mp4" with dst_video.open("wb") as f: shutil.copyfileobj(video.file, f) # Crear job_id único job_id = str(uuid.uuid4()) # Inicializar el job jobs[job_id] = { "id": job_id, "status": JobStatus.QUEUED, "video_path": str(dst_video), "video_name": video_name, "max_groups": int(max_groups), "min_cluster_size": int(min_cluster_size), "face_sensitivity": float(face_sensitivity), "voice_max_groups": int(voice_max_groups), "voice_min_cluster_size": int(voice_min_cluster_size), "voice_sensitivity": float(voice_sensitivity), "max_frames": int(max_frames), "created_at": datetime.now().isoformat(), "results": None, "error": None } print(f"[{job_id}] Job creado para vídeo: {video_name}") # Iniciar procesamiento en background background_tasks.add_task(process_video_job, job_id) # Devolver job_id inmediatamente return {"job_id": job_id} @app.get("/jobs/{job_id}/status") def get_job_status(job_id: str): """ Devuelve el estado actual de un job. El UI hace polling de este endpoint cada 5 segundos. """ if job_id not in jobs: raise HTTPException(status_code=404, detail="Job not found") job = jobs[job_id] # Normalizar el estado a string status_value = job["status"].value if isinstance(job["status"], JobStatus) else str(job["status"]) response = {"status": status_value} # Incluir resultados si existen (evita condiciones de carrera) if job.get("results") is not None: response["results"] = job["results"] # Incluir error si existe if job.get("error"): response["error"] = job["error"] return response @app.get("/files/{video_name}/{char_id}/{filename}") def serve_character_file(video_name: str, char_id: str, filename: str): """ Sirve archivos estáticos de personajes (imágenes). Ejemplo: /files/dif_catala_1/char1/representative.jpg """ # Las caras se guardan en /tmp/temp/