"""HTTP routes for listing, uploading and retrieving per-video embeddings.

Embeddings are stored on disk as
``/data/embeddings/<video_hash>/<embedding_type>/embeddings.json`` where
``<embedding_type>`` is either ``faces`` or ``voices``.
"""

import os
import io
import json
import shutil
import sqlite3
from pathlib import Path

from fastapi import APIRouter, UploadFile, File, Query, HTTPException
from fastapi.responses import FileResponse, JSONResponse

from storage.files.file_manager import FileManager
from storage.common import validate_token

router = APIRouter(prefix="/embeddings", tags=["Embeddings Manager"])

EMBEDDINGS_ROOT = Path("/data/embeddings")
file_manager = FileManager(EMBEDDINGS_ROOT)
HF_TOKEN = os.getenv("HF_TOKEN")

# The only sub-folders (embedding kinds) the endpoints accept.
_EMBEDDING_TYPES = ("faces", "voices")
_EMBEDDINGS_FILENAME = "embeddings.json"


def _resolve_embeddings_dir(video_hash: str, embedding_type: str) -> Path:
    """Return ``EMBEDDINGS_ROOT/<video_hash>/<embedding_type>`` after validation.

    Both values arrive from the query string, so they are checked before
    being used to build a filesystem path.

    Raises:
        HTTPException: 400 if ``embedding_type`` is not ``faces``/``voices``,
            or if ``video_hash`` is empty or is not a single path component
            (contains a separator, a NUL byte, or is ``.``/``..``).
    """
    if embedding_type not in _EMBEDDING_TYPES:
        raise HTTPException(
            status_code=400,
            detail="embedding_type must be 'faces' or 'voices'",
        )

    # Reject anything that could make the joined path leave EMBEDDINGS_ROOT:
    # an absolute path replaces the root when joined, and ".." climbs out of it.
    is_single_component = (
        bool(video_hash)
        and video_hash not in (".", "..")
        and not any(ch in video_hash for ch in ("/", "\\", "\x00"))
    )
    if not is_single_component:
        raise HTTPException(
            status_code=400,
            detail="video_hash must be a non-empty name without path separators",
        )

    return EMBEDDINGS_ROOT / video_hash / embedding_type


@router.get("/list_embeddings", tags=["Embeddings Manager"])
def list_all_embeddings(
    token: str = Query(..., description="Token required for authorization")
):
    """
    List all embeddings stored under /data/embeddings.

    For each video hash folder, returns:
    - video: folder name (hash)
    - faces: true/false depending on whether faces/embeddings.json exists
    - voices: true/false depending on whether voices/embeddings.json exists

    Notes:
    - A video folder may contain only faces, only voices, or neither.
    - Missing folders are treated as false.
    - Entries are returned sorted by folder name.
    """
    validate_token(token)

    # A missing root (or a root that is not a directory) means nothing is
    # stored yet; iterdir() would raise on it, so return an empty list instead.
    if not EMBEDDINGS_ROOT.is_dir():
        return []

    results = []
    # Sorted so the response order is deterministic across calls.
    for video_dir in sorted(EMBEDDINGS_ROOT.iterdir(), key=lambda p: p.name):
        if not video_dir.is_dir():
            continue  # Skip anything that is not a folder

        results.append({
            "video": video_dir.name,
            "faces": (video_dir / "faces" / _EMBEDDINGS_FILENAME).exists(),
            "voices": (video_dir / "voices" / _EMBEDDINGS_FILENAME).exists(),
        })

    return results


@router.post("/upload_embeddings", tags=["Embeddings Manager"])
async def upload_embeddings(
    file: UploadFile = File(...),
    embedding_type: str = Query(..., description="faces or voices"),
    video_hash: str = Query(..., description="Hash of the video"),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Upload embeddings JSON for a given video and type (faces or voices).

    Behavior:
    - Validate the token.
    - Validate embedding_type and video_hash.
    - Read the uploaded payload.
    - Ensure directory structure:
      /data/embeddings/<video_hash>/<embedding_type>/
    - Delete any existing .json file inside that folder.
    - Save the uploaded embeddings as embeddings.json.

    Raises:
        HTTPException: 400 for an invalid embedding_type or video_hash,
            500 if the upload cannot be read, old files cannot be deleted,
            or the new file cannot be written.
    """
    validate_token(token)

    type_path = _resolve_embeddings_dir(video_hash, embedding_type)
    final_path = type_path / _EMBEDDINGS_FILENAME

    # Read the upload BEFORE touching the disk, so a failed or aborted upload
    # does not wipe the embeddings that are already stored.
    try:
        file_bytes = await file.read()
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to save embeddings: {exc}",
        ) from exc

    # Create the folders if they do not exist
    type_path.mkdir(parents=True, exist_ok=True)

    # Remove previous JSON files
    for existing in type_path.glob("*.json"):
        try:
            existing.unlink()
        except OSError as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to delete old embeddings: {exc}",
            ) from exc

    # Save as embeddings.json
    try:
        final_path.write_bytes(file_bytes)
    except OSError as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to save embeddings: {exc}",
        ) from exc

    return JSONResponse(
        status_code=200,
        content={
            "status": "ok",
            "saved_to": str(final_path)
        }
    )


def get_embeddings_json(video_hash: str, embedding_type: str):
    """
    Returns the parsed embeddings.json for a given video and type.

    Behavior:
    - Validate embedding_type and video_hash.
    - Build the file path:
      /data/embeddings/<video_hash>/<embedding_type>/embeddings.json
    - Raise HTTPException if missing.
    - Load and return parsed JSON.

    Raises:
        HTTPException: 400 for an invalid embedding_type or video_hash,
            404 if the file does not exist, 500 if it cannot be read or
            is not valid JSON.
    """
    target_file = (
        _resolve_embeddings_dir(video_hash, embedding_type) / _EMBEDDINGS_FILENAME
    )

    if not target_file.exists():
        raise HTTPException(
            status_code=404,
            detail=f"embeddings.json not found for video={video_hash}, type={embedding_type}"
        )

    try:
        with open(target_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers both malformed JSON and undecodable bytes.
        raise HTTPException(
            status_code=500,
            detail=f"Failed to read embeddings: {exc}",
        ) from exc

    return data


@router.get("/get_embedding", tags=["Embeddings Manager"])
def get_embeddings(
    video_hash: str = Query(..., description="Hash of the video"),
    embedding_type: str = Query(..., description="faces or voices"),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Endpoint to retrieve embeddings.json for a given video hash and type.

    Validates the token, then returns the parsed JSON produced by
    get_embeddings_json (which raises HTTPException on invalid input,
    a missing file, or an unreadable file).
    """
    validate_token(token)
    return get_embeddings_json(video_hash, embedding_type)