soja-api / main.py
Ju-Am's picture
Add endpoint de debug pra visualizar img segmentada.
5ec0dd3
import os
import shutil
import joblib
import json
import numpy as np
from fastapi import FastAPI, File, UploadFile, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from datetime import datetime
import unicodedata
import io
from PIL import Image
from starlette.responses import StreamingResponse
from feature_extractor_single import process_single_image, segment_image
def normalize_string(s: str) -> str:
"""
Remove inconsistências de Unicode (NFC vs NFD) e espaços extras.
Transforma 'Vírus' (2 chars) em 'Vírus' (1 char) e remove espaços nas pontas.
"""
return unicodedata.normalize('NFC', s).strip()
#1. CONFIGURAÇÃO DA APLICAÇÃO E AUTENTICAÇÃO
app = FastAPI(title="SojaClassifierAPI")
security = HTTPBearer()
API_SECRET_TOKEN = os.environ.get("API_SECRET_TOKEN")
if API_SECRET_TOKEN is None:
print("AVISO: Variável de ambiente API_SECRET_TOKEN não definida.")
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verifica se o token enviado pelo cliente é o correto."""
if not API_SECRET_TOKEN:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Token de segurança não configurado no servidor",
)
if credentials.scheme != "Bearer" or credentials.credentials != API_SECRET_TOKEN:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido ou ausente",
headers={"WWW-Authenticate": "Bearer"},
)
return credentials.credentials
#2. CARREGAMENTO DOS MODELOS E DADOS
#Carrega os 4 arquivos .pkl UMA VEZ quando a API inicia.
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | CARREGANDO MODELOS de classificação (.pkl)...")
try:
SCALER = joblib.load('scaler.pkl')
UMAP = joblib.load('umap_reducer.pkl')
SVM = joblib.load('svm_model.pkl')
ENCODER = joblib.load('encoder.pkl')
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | Modelos de classificação carregados com SUCESSO.")
except FileNotFoundError:
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | ERRO: Arquivos .pkl do modelo não encontrados.\nCertifique-se de que 'scaler.pkl', 'umap_reducer.pkl', 'svm_model.pkl', e 'encoder.pkl' estão no repositório.")
#Em um cenário real, poderíamos impedir a API de iniciar aqui
print("Carregando banco de dados de informações (doencas.json)...")
try:
with open('doencas.json', 'r', encoding='utf-8') as f:
raw_db = json.load(f)
#Cria um novo dicionário com as chaves limpas
DB_INFO = {normalize_string(k): v for k, v in raw_db.items()}
print(f"Banco de dados carregado. {len(DB_INFO)} doenças indexadas.")
#DEBUG: Mostra como as chaves ficaram na memória do servidor
print(f"Chaves normalizadas: {list(DB_INFO.keys())}")
except FileNotFoundError:
print("ERRO: 'doencas.json' não encontrado.")
DB_INFO = {}
#3. ENDPOINTS DA API
@app.get("/") #HEALTH CHECK
async def root():
return {"message": "API Dr. Plant está ONLINE! Use o endpoint /classify/ para enviar imagens."}
@app.post("/classify/")
async def classify_image(file: UploadFile = File(...), token: str = Depends(verify_token)):
"""
Endpoint principal: Recebe uma imagem, extrai features e classifica com % de confiança.
"""
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | Recebida imagem: {file.filename}.")
temp_path = f"/tmp/temp_{file.filename}"
#Salva a imagem temporariamente
try:
with open(temp_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao salvar arquivo: {e}")
try:
#Extrai Features
features_array = process_single_image(temp_path)
#Prepara o vetor
nova_feature = features_array.reshape(1, -1)
#Normaliza e Reduz
nova_feature_scaled = SCALER.transform(nova_feature)
nova_feature_umap = UMAP.transform(nova_feature_scaled)
#MUDANÇA: LÓGICA DE PROBABILIDADE
#Obtém as probabilidades de TODAS as classes
#Retorna algo como [[0.05, 0.90, 0.05...]]
probs = SVM.predict_proba(nova_feature_umap)[0]
#Descobre o índice da maior probabilidade (o vencedor)
max_idx = np.argmax(probs)
#Pega o valor dessa probabilidade (a confiança)
confianca_valor = probs[max_idx] # Ex: 0.9542
#Converte o índice vencedor para o nome técnico
classe_predita_raw = ENCODER.inverse_transform([max_idx])[0]
#BLINDAGEM: normaliza a string
classe_predita = normalize_string(str(classe_predita_raw))
#Formata a confiança para string (Ex: "95.42%")
confianca_str = f"{confianca_valor * 100:.2f}%"
#DEBUG: Verificando chaves
#print(f"DEBUG CHAVES: Modelo='{repr(classe_predita)}'")
#BUSCA AS INFORMAÇÕES
info_adicional = DB_INFO.get(classe_predita, {
"nome": classe_predita,
"descricao": "Informações não disponíveis.",
"sintomas": [],
"tratamento": ""
})
#PEGA O NOME "BONITO" DO JSON
nome_exibicao = info_adicional.get("nome", classe_predita)
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | Diagnóstico: {classe_predita} | Confiança: {confianca_str}")
return {
"diagnostico": nome_exibicao,
"confianca": confianca_str, # <--- NOVO CAMPO
"id_tecnico": classe_predita,
"info": info_adicional
}
except Exception as e:
#Pega qualquer erro que acontecer durante a extração ou classificação
raise HTTPException(status_code=500, detail=f"Erro no processamento: {e}")
finally:
#Remove a imagem temporária, aconteça o que acontecer
if os.path.exists(temp_path):
os.remove(temp_path)
@app.post("/extract_features/")
async def extract_features(file: UploadFile = File(...), token: str = Depends(verify_token)):
"""
Endpoint de debug: Apenas extrai as features sem classificar.
"""
temp_path = f"/tmp/temp_{file.filename}"
with open(temp_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
features_array = process_single_image(temp_path)
os.remove(temp_path)
features_list = features_array.tolist()
return {"features": features_list}
@app.post("/debug/view_segmentation/")
async def view_segmentation(file: UploadFile = File(...), token: str = Depends(verify_token)):
"""
Endpoint de debug.
Retorna a imagem processada (fundo preto) para verificação visual.
Útil para saber o que o modelo está "enxergando".
"""
try:
#1. Lê a imagem da memória (sem salvar no disco pra ser rápido)
contents = await file.read()
pil_image = Image.open(io.BytesIO(contents)).convert("RGB")
#2. Aplica a mesma lógica de segmentação do modelo
processed_image = segment_image(pil_image)
#3. Salva a imagem processada em um buffer de memória (bytes)
img_byte_arr = io.BytesIO()
processed_image.save(img_byte_arr, format='JPEG', quality=95)
img_byte_arr.seek(0)
#4. Retorna como uma stream de imagem (O navegador/Swagger exibe isso!)
return StreamingResponse(img_byte_arr, media_type="image/jpeg")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao processar imagem: {e}")