|
|
import os
|
|
|
import shutil
|
|
|
import joblib
|
|
|
import json
|
|
|
import numpy as np
|
|
|
from fastapi import FastAPI, File, UploadFile, Depends, HTTPException, status
|
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
|
from datetime import datetime
|
|
|
import unicodedata
|
|
|
import io
|
|
|
from PIL import Image
|
|
|
from starlette.responses import StreamingResponse
|
|
|
from feature_extractor_single import process_single_image, segment_image
|
|
|
|
|
|
def normalize_string(s: str) -> str:
|
|
|
"""
|
|
|
Remove inconsistências de Unicode (NFC vs NFD) e espaços extras.
|
|
|
Transforma 'Vírus' (2 chars) em 'Vírus' (1 char) e remove espaços nas pontas.
|
|
|
"""
|
|
|
return unicodedata.normalize('NFC', s).strip()
|
|
|
|
|
|
|
|
|
app = FastAPI(title="SojaClassifierAPI")
|
|
|
security = HTTPBearer()
|
|
|
API_SECRET_TOKEN = os.environ.get("API_SECRET_TOKEN")
|
|
|
|
|
|
if API_SECRET_TOKEN is None:
|
|
|
print("AVISO: Variável de ambiente API_SECRET_TOKEN não definida.")
|
|
|
|
|
|
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
|
|
|
"""Verifica se o token enviado pelo cliente é o correto."""
|
|
|
if not API_SECRET_TOKEN:
|
|
|
raise HTTPException(
|
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
|
detail="Token de segurança não configurado no servidor",
|
|
|
)
|
|
|
if credentials.scheme != "Bearer" or credentials.credentials != API_SECRET_TOKEN:
|
|
|
raise HTTPException(
|
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
detail="Token inválido ou ausente",
|
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
|
)
|
|
|
return credentials.credentials
|
|
|
|
|
|
|
|
|
|
|
|
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | CARREGANDO MODELOS de classificação (.pkl)...")
|
|
|
try:
|
|
|
SCALER = joblib.load('scaler.pkl')
|
|
|
UMAP = joblib.load('umap_reducer.pkl')
|
|
|
SVM = joblib.load('svm_model.pkl')
|
|
|
ENCODER = joblib.load('encoder.pkl')
|
|
|
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | Modelos de classificação carregados com SUCESSO.")
|
|
|
except FileNotFoundError:
|
|
|
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | ERRO: Arquivos .pkl do modelo não encontrados.\nCertifique-se de que 'scaler.pkl', 'umap_reducer.pkl', 'svm_model.pkl', e 'encoder.pkl' estão no repositório.")
|
|
|
|
|
|
|
|
|
print("Carregando banco de dados de informações (doencas.json)...")
|
|
|
try:
|
|
|
with open('doencas.json', 'r', encoding='utf-8') as f:
|
|
|
raw_db = json.load(f)
|
|
|
|
|
|
DB_INFO = {normalize_string(k): v for k, v in raw_db.items()}
|
|
|
print(f"Banco de dados carregado. {len(DB_INFO)} doenças indexadas.")
|
|
|
|
|
|
print(f"Chaves normalizadas: {list(DB_INFO.keys())}")
|
|
|
except FileNotFoundError:
|
|
|
print("ERRO: 'doencas.json' não encontrado.")
|
|
|
DB_INFO = {}
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
|
|
|
async def root():
|
|
|
return {"message": "API Dr. Plant está ONLINE! Use o endpoint /classify/ para enviar imagens."}
|
|
|
|
|
|
@app.post("/classify/")
|
|
|
async def classify_image(file: UploadFile = File(...), token: str = Depends(verify_token)):
|
|
|
"""
|
|
|
Endpoint principal: Recebe uma imagem, extrai features e classifica com % de confiança.
|
|
|
"""
|
|
|
|
|
|
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | Recebida imagem: {file.filename}.")
|
|
|
temp_path = f"/tmp/temp_{file.filename}"
|
|
|
|
|
|
|
|
|
try:
|
|
|
with open(temp_path, "wb") as buffer:
|
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
except Exception as e:
|
|
|
raise HTTPException(status_code=500, detail=f"Erro ao salvar arquivo: {e}")
|
|
|
|
|
|
try:
|
|
|
|
|
|
features_array = process_single_image(temp_path)
|
|
|
|
|
|
|
|
|
nova_feature = features_array.reshape(1, -1)
|
|
|
|
|
|
|
|
|
nova_feature_scaled = SCALER.transform(nova_feature)
|
|
|
nova_feature_umap = UMAP.transform(nova_feature_scaled)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
probs = SVM.predict_proba(nova_feature_umap)[0]
|
|
|
|
|
|
|
|
|
max_idx = np.argmax(probs)
|
|
|
|
|
|
|
|
|
confianca_valor = probs[max_idx]
|
|
|
|
|
|
|
|
|
classe_predita_raw = ENCODER.inverse_transform([max_idx])[0]
|
|
|
|
|
|
|
|
|
classe_predita = normalize_string(str(classe_predita_raw))
|
|
|
|
|
|
|
|
|
confianca_str = f"{confianca_valor * 100:.2f}%"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
info_adicional = DB_INFO.get(classe_predita, {
|
|
|
"nome": classe_predita,
|
|
|
"descricao": "Informações não disponíveis.",
|
|
|
"sintomas": [],
|
|
|
"tratamento": ""
|
|
|
})
|
|
|
|
|
|
|
|
|
nome_exibicao = info_adicional.get("nome", classe_predita)
|
|
|
|
|
|
print(f">> [{datetime.now().strftime('%d/%m %H:%M:%S')}] | Diagnóstico: {classe_predita} | Confiança: {confianca_str}")
|
|
|
|
|
|
return {
|
|
|
"diagnostico": nome_exibicao,
|
|
|
"confianca": confianca_str,
|
|
|
"id_tecnico": classe_predita,
|
|
|
"info": info_adicional
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
raise HTTPException(status_code=500, detail=f"Erro no processamento: {e}")
|
|
|
|
|
|
finally:
|
|
|
|
|
|
if os.path.exists(temp_path):
|
|
|
os.remove(temp_path)
|
|
|
|
|
|
@app.post("/extract_features/")
|
|
|
async def extract_features(file: UploadFile = File(...), token: str = Depends(verify_token)):
|
|
|
"""
|
|
|
Endpoint de debug: Apenas extrai as features sem classificar.
|
|
|
"""
|
|
|
temp_path = f"/tmp/temp_{file.filename}"
|
|
|
with open(temp_path, "wb") as buffer:
|
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
|
|
features_array = process_single_image(temp_path)
|
|
|
os.remove(temp_path)
|
|
|
|
|
|
features_list = features_array.tolist()
|
|
|
return {"features": features_list}
|
|
|
|
|
|
@app.post("/debug/view_segmentation/")
|
|
|
async def view_segmentation(file: UploadFile = File(...), token: str = Depends(verify_token)):
|
|
|
"""
|
|
|
Endpoint de debug.
|
|
|
Retorna a imagem processada (fundo preto) para verificação visual.
|
|
|
Útil para saber o que o modelo está "enxergando".
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
contents = await file.read()
|
|
|
pil_image = Image.open(io.BytesIO(contents)).convert("RGB")
|
|
|
|
|
|
|
|
|
processed_image = segment_image(pil_image)
|
|
|
|
|
|
|
|
|
img_byte_arr = io.BytesIO()
|
|
|
processed_image.save(img_byte_arr, format='JPEG', quality=95)
|
|
|
img_byte_arr.seek(0)
|
|
|
|
|
|
|
|
|
return StreamingResponse(img_byte_arr, media_type="image/jpeg")
|
|
|
|
|
|
except Exception as e:
|
|
|
raise HTTPException(status_code=500, detail=f"Erro ao processar imagem: {e}") |