GFES_WAVE / app.py
nakas's picture
Add mirror fallback and log viewer
d093138
raw
history blame
4.91 kB
import logging
import os
from collections import deque
from pathlib import Path
from threading import Event
from typing import Dict, Optional, Tuple
import gradio as gr
from gefs_wave import WaveDownloadManager, logger as wave_logger
# Directory holding downloaded wave data; GEFS_WAVE_DATA_DIR overrides the default.
DATA_ROOT = Path(os.getenv("GEFS_WAVE_DATA_DIR", "data/gefswave"))

# Single download manager for the process, constructed with the data root.
manager = WaveDownloadManager(DATA_ROOT)

# Set by ensure_refresh() once it has requested the first refresh.
_refresh_started = Event()

# Bounded buffer of log lines shown in the UI; only the newest 200 are kept.
_log_buffer: deque[str] = deque(maxlen=200)
class UILogHandler(logging.Handler):
    """Logging handler that appends each record to the module-level ``_log_buffer``.

    The handler's formatter is tried first; if formatting fails, the record's
    plain message is stored instead so the entry is not lost.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """Store one log line for *record* in ``_log_buffer``.

        Never raises: any failure is reported through
        ``logging.Handler.handleError`` so that a bad log call cannot crash
        the code that issued it.
        """
        try:
            try:
                message = self.format(record)
            except Exception:
                # Formatter problem: fall back to the unformatted message.
                message = record.getMessage()
            _log_buffer.append(message)
        except Exception:
            # getMessage() fails too when the record's args do not match its
            # message template; follow the stdlib handler convention instead
            # of letting the exception escape into the logging caller.
            self.handleError(record)
# The timestamp must come from %(asctime)s rendered with ``datefmt``.
# strftime directives such as %Y are not valid inside the %-style format
# string itself; placed there they make every format() call raise ValueError.
log_formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
ui_log_handler = UILogHandler()
ui_log_handler.setFormatter(log_formatter)

# Attach the handler only if the wave logger does not already have a
# UILogHandler, so log lines are not duplicated in the buffer.
if not any(isinstance(handler, UILogHandler) for handler in wave_logger.handlers):
    wave_logger.addHandler(ui_log_handler)
wave_logger.setLevel(logging.INFO)
def _log_text() -> str:
    """Return the buffered log lines joined by newlines.

    When the buffer is empty, a fixed placeholder sentence is returned instead.
    """
    return "\n".join(_log_buffer) if _log_buffer else "No log messages yet."
def ensure_refresh() -> None:
    """Ask the manager for a refresh the first time this is called.

    Later calls return immediately because ``_refresh_started`` is already set.
    """
    if _refresh_started.is_set():
        return
    _refresh_started.set()
    manager.trigger_refresh()
def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str]:
    """Build the markdown summary that accompanies a status dictionary.

    Args:
        status: Status mapping; the keys read here are ``status``,
            ``message`` and ``latest_state``.
        notice: Optional warning appended as the last line of the summary.

    Returns:
        The ``status`` object unchanged, and the markdown text.
    """
    state_label = status.get("status", "unknown")
    summary = [f"**Downloader status:** `{state_label}`"]

    if "message" in status:
        summary.append(f"- Message: {status['message']}")

    cycle_info = status.get("latest_state", {})
    if not cycle_info:
        summary.append("- No cycle downloaded yet.")
    else:
        cycle = cycle_info.get("cycle", "N/A")
        updated_at = cycle_info.get("updated_at", "N/A")
        file_count = len(cycle_info.get("files", []))
        summary.append(f"- Latest cycle: `{cycle}`")
        summary.append(f"- Updated at: `{updated_at}`")
        summary.append(f"- Files cached: {file_count}")
        tarball = cycle_info.get("tarball")
        if tarball:
            summary.append(f"- Tarball path: `{tarball}`")

    # The leading newline leaves a blank line before the hint in the markdown.
    summary.append("\nUse **Trigger Refresh** to fetch a newer cycle when available.")
    if notice:
        summary.append(f"\n⚠️ {notice}")

    return status, "\n".join(summary)
def get_status() -> Tuple[Dict, str, Optional[str], str]:
    """Return the current status for the UI without offering a download.

    Returns:
        ``(status dict, markdown summary, None, log text)``; the third slot
        is the download-button value and is always ``None`` here.
    """
    ensure_refresh()
    payload, summary = format_status(manager.status())
    return payload, summary, None, _log_text()
def trigger_refresh() -> Tuple[Dict, str, Optional[str], str]:
    """Request a refresh from the manager and return the resulting status.

    Returns:
        ``(status dict, markdown summary, None, log text)``.
    """
    # NOTE: if this is the first call in the process, ensure_refresh() also
    # calls manager.trigger_refresh(), so the manager is asked twice.
    ensure_refresh()
    manager.trigger_refresh()
    payload, summary = format_status(manager.status())
    return payload, summary, None, _log_text()
def download_tarball() -> Tuple[Dict, str, Optional[str], str]:
    """Resolve the latest tarball for the download button.

    Returns:
        ``(status dict, markdown summary, tarball path or None, log text)``.
        The path is ``None`` and the summary carries a warning notice when
        no tarball is recorded in the status or the recorded file is not on
        disk.
    """
    ensure_refresh()
    status = manager.status()
    # ``or {}`` also covers an explicit ``"latest_state": None``, which
    # format_status() already treats the same as "nothing downloaded";
    # a plain ``.get(key, {})`` would return None and fail on ``.get`` below.
    state = status.get("latest_state") or {}
    tarball_rel = state.get("tarball")

    notice: Optional[str] = None
    download_path: Optional[str] = None
    if not tarball_rel:
        notice = "Wave dataset not available yet. Trigger a refresh and try again."
    else:
        tarball_path = DATA_ROOT / tarball_rel
        # is_file() rather than exists(): a directory at that path is not
        # something the download button can serve.
        if tarball_path.is_file():
            download_path = str(tarball_path)
        else:
            notice = "Tarball missing on disk. Trigger a refresh and try again."

    status_dict, status_text = format_status(status, notice)
    return status_dict, status_text, download_path, _log_text()
# Build the Gradio page: intro text, status/log outputs, and two action buttons.
with gr.Blocks(title="GFES Wave 0 Downloader") as demo:
    # Static instructions shown at the top of the page.
    gr.Markdown(
        """
## GFES Wave 0 Downloader
Fetch the latest NOAA GEFS Wave control member (`c00`) global 0.25° dataset.
1. Check the status panel to confirm the current cycle.
2. Press **Trigger Refresh** to request a newer cycle if one is available.
3. Use **Download Latest Tarball** to grab a `.tar.gz` of every GRIB2 file for that cycle.
"""
    )
    # Output components; every callback below writes to all of them.
    status_json = gr.JSON(label="Downloader Status")
    status_md = gr.Markdown()
    logs_box = gr.Textbox(label="Recent Logs", lines=12, max_lines=12, interactive=False)
    with gr.Row():
        refresh_button = gr.Button("Trigger Refresh", variant="secondary")
        download_button = gr.DownloadButton("Download Latest Tarball", variant="primary")
    # Each handler returns a 4-tuple mapped onto the outputs in this order:
    # status JSON, markdown summary, download-button value, log text.
    demo.load(get_status, outputs=[status_json, status_md, download_button, logs_box])
    refresh_button.click(trigger_refresh, outputs=[status_json, status_md, download_button, logs_box])
    download_button.click(download_tarball, outputs=[status_json, status_md, download_button, logs_box])
# Turn on Gradio's event queue for this app.
demo.queue()
if __name__ == "__main__":
    # Request the first refresh before the server starts accepting requests.
    ensure_refresh()
    # Listen on all interfaces; the PORT environment variable overrides 7860.
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", 7860)),
        show_api=False,
    )