engine / storage /db_routers.py
VeuReu's picture
Upload 7 files
46cf14b verified
import os
import io
import sqlite3
import zipfile
from pathlib import Path
from fastapi import APIRouter, UploadFile, File, Query, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from storage.common import validate_token
router = APIRouter(prefix="/db", tags=["Database Manager"])
HF_TOKEN = os.getenv("HF_TOKEN")
DB_PATH = Path("/data/db")
@router.get("/download_all_db_files", tags=["Database Manager"])
async def download_all_db_files(
token: str = Query(..., description="Token required for authorization")
):
"""
Download all .db files from /data/db as a ZIP archive.
Steps:
- Validate the token.
- Verify that /data/db exists and contains .db files.
- Create an in-memory ZIP containing all .db files.
- Return the ZIP as a downloadable file.
"""
validate_token(token)
if not DB_PATH.exists():
raise HTTPException(status_code=404, detail="DB folder does not exist")
db_files = list(DB_PATH.glob("*.db"))
if not db_files:
raise HTTPException(status_code=404, detail="No .db files found")
# Create in-memory ZIP
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
for file_path in db_files:
zipf.write(file_path, arcname=file_path.name)
zip_buffer.seek(0)
return StreamingResponse(
zip_buffer,
media_type="application/zip",
headers={
"Content-Disposition": 'attachment; filename="db_files.zip"'
}
)
@router.post("/upload_db_file", tags=["Database Manager"])
async def upload_db_file(
file: UploadFile = File(...),
token: str = Query(..., description="Token required for authorization")
):
"""
Upload a file to the /data/db folder.
Steps:
- Validate the token.
- Ensure /data/db exists (create it if missing).
- Save the uploaded file inside /data/db using its original filename.
- Return a JSON response confirming the upload.
"""
validate_token(token)
# Crear carpeta /data/db si no existe
DB_PATH.mkdir(parents=True, exist_ok=True)
final_path = DB_PATH / file.filename
try:
# Leer contenido en memoria y guardar
file_bytes = await file.read()
with open(final_path, "wb") as f:
f.write(file_bytes)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Failed to save file: {exc}")
return JSONResponse(
status_code=200,
content={
"status": "ok",
"saved_to": str(final_path)
}
)
@router.post("/execute_query", tags=["Database Manager"])
async def execute_query(
db_filename: str = Query(..., description="SQLite .db file name"),
query: str = Query(..., description="SQL query to execute"),
token: str = Query(..., description="Token required for authorization")
):
"""
Execute a SQL query against a specified SQLite database file in /data/db.
Steps:
- Validate the token.
- Ensure the requested .db file exists inside /data/db.
- Connect to the database using sqlite3.
- Execute the provided query.
- Return the results as a list of dictionaries (column: value).
- Capture and return errors if query execution fails.
"""
validate_token(token)
db_file_path = DB_PATH / db_filename
if not db_file_path.exists() or not db_file_path.is_file():
raise HTTPException(status_code=404, detail=f"Database file {db_filename} not found")
try:
conn = sqlite3.connect(db_file_path)
conn.row_factory = sqlite3.Row # para devolver columnas por nombre
cur = conn.cursor()
cur.execute(query)
rows = cur.fetchall()
# Convert rows to list of dicts
results = [dict(row) for row in rows]
conn.commit()
conn.close()
except sqlite3.Error as e:
raise HTTPException(status_code=400, detail=f"SQL error: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
return JSONResponse(content={"results": results})