"""Gradio front-end for the GEFS Wave control-member downloader.

Displays the status reported by ``WaveDownloadManager``, lets the user request
a refresh, serves the latest cycle's tarball through a download button, and
mirrors recent ``gefs_wave`` log records into the page.
"""

import logging
import os
from collections import deque
from pathlib import Path
from threading import Event, Lock
from typing import Dict, Optional, Tuple

import gradio as gr

from gefs_wave import WaveDownloadManager, logger as wave_logger

# Root directory handed to the manager; tarball paths reported in the status
# are resolved relative to it. Overridable via GEFS_WAVE_DATA_DIR.
DATA_ROOT = Path(os.environ.get("GEFS_WAVE_DATA_DIR", "data/gefswave"))
manager = WaveDownloadManager(DATA_ROOT)

# Set once the initial refresh has been requested in this process.
_refresh_started = Event()
# Makes the check-and-set in ensure_refresh() atomic: Gradio runs sync
# handlers on worker threads, so concurrent first requests could both trigger.
_refresh_lock = Lock()
# Most recent formatted log lines shown in the UI (oldest dropped past 200).
_log_buffer: deque[str] = deque(maxlen=200)

# Values returned by every event handler, in output order: status JSON,
# status markdown, download-button file path (or None), and log text.
_Outputs = Tuple[Dict, str, Optional[str], str]


class UILogHandler(logging.Handler):
    """Logging handler that appends each formatted record to ``_log_buffer``."""

    def emit(self, record) -> None:  # type: ignore[override]
        try:
            message = self.format(record)
        except Exception:
            # Best effort: keep the raw message rather than dropping the record.
            message = record.getMessage()
        _log_buffer.append(message)


# The timestamp pattern must go in ``datefmt``; written directly into the
# record format, ``%Y``/``%m``/... are not valid %-style fields, so formatting
# fails and emit() would fall back to the bare message on every record.
log_formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
ui_log_handler = UILogHandler()
ui_log_handler.setFormatter(log_formatter)
# Only attach a UI handler if the logger does not already carry one.
if not any(isinstance(handler, UILogHandler) for handler in wave_logger.handlers):
    wave_logger.addHandler(ui_log_handler)
wave_logger.setLevel(logging.INFO)


def _log_text() -> str:
    """Return the buffered log lines joined by newlines, or a placeholder."""
    if not _log_buffer:
        return "No log messages yet."
    return "\n".join(_log_buffer)


def ensure_refresh() -> None:
    """Kick off the downloader thread once per process."""
    with _refresh_lock:
        if _refresh_started.is_set():
            return
        _refresh_started.set()
    manager.trigger_refresh()


def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str]:
    """Return status JSON and a markdown summary.

    Args:
        status: Mapping returned by ``manager.status()``.
        notice: Optional warning appended to the end of the summary.

    Returns:
        The ``status`` mapping unchanged, and the rendered markdown text.
    """
    # ``or {}`` also tolerates an explicit None for latest_state.
    latest = status.get("latest_state") or {}
    lines = [f"**Downloader status:** `{status.get('status', 'unknown')}`"]
    if "message" in status:
        lines.append(f"- Message: {status['message']}")
    if latest:
        if dataset := latest.get("product_label"):
            lines.append(f"- Dataset: `{dataset}`")
        lines.extend(
            [
                f"- Latest cycle: `{latest.get('cycle', 'N/A')}`",
                f"- Updated at: `{latest.get('updated_at', 'N/A')}`",
                f"- Files cached: {len(latest.get('files') or [])}",
            ]
        )
        if tarball := latest.get("tarball"):
            lines.append(f"- Tarball path: `{tarball}`")
    else:
        lines.append("- No cycle downloaded yet.")
    lines.append("\nUse **Trigger Refresh** to fetch a newer cycle when available.")
    if notice:
        lines.append(f"\n⚠️ {notice}")
    return status, "\n".join(lines)


def _resolve_tarball(status: Dict) -> Tuple[Optional[Path], Optional[str]]:
    """Locate the latest tarball on disk.

    Returns:
        ``(path, None)`` when the tarball named in the status exists under
        DATA_ROOT, otherwise ``(None, notice)`` explaining why it is unavailable.
    """
    state = status.get("latest_state") or {}
    tarball_rel = state.get("tarball")
    if not tarball_rel:
        return None, "Wave dataset not available yet. Trigger a refresh and try again."
    tarball_path = DATA_ROOT / tarball_rel
    if not tarball_path.exists():
        return None, "Tarball missing on disk. Trigger a refresh and try again."
    return tarball_path, None


def _build_outputs(status: Dict, *, report_missing: bool = False) -> _Outputs:
    """Assemble the handler outputs from a manager status snapshot.

    The download button is always primed with the current tarball path when it
    exists: gr.DownloadButton downloads its *current* value on click, so leaving
    it empty until the click handler ran forced users to click twice.

    Args:
        status: Mapping returned by ``manager.status()``.
        report_missing: If True, explain an unavailable tarball with a notice
            in the markdown summary.
    """
    tarball_path, notice = _resolve_tarball(status)
    status_dict, status_text = format_status(status, notice if report_missing else None)
    file_value = str(tarball_path) if tarball_path is not None else None
    return status_dict, status_text, file_value, _log_text()


def get_status() -> _Outputs:
    """Return the current status outputs (used on page load)."""
    ensure_refresh()
    return _build_outputs(manager.status())


def trigger_refresh() -> _Outputs:
    """Ask the manager for a refresh and return the updated outputs."""
    # An explicit refresh also counts as the once-per-process kick-off. Setting
    # the flag directly, instead of calling ensure_refresh() first, avoids
    # triggering the manager twice on the first request of the process.
    _refresh_started.set()
    manager.trigger_refresh()
    return _build_outputs(manager.status())


def download_tarball() -> _Outputs:
    """Return outputs with the latest tarball path, or a notice if unavailable."""
    ensure_refresh()
    return _build_outputs(manager.status(), report_missing=True)


with gr.Blocks(title="GEFS Wave 0 Downloader") as demo:
    gr.Markdown(
        """
        ## GEFS Wave 0 Downloader
        Fetch the latest NOAA GEFS Wave control member (`c00`) global 0.25° dataset.

        1. Check the status panel to confirm the current cycle.
        2. Press **Trigger Refresh** to request a newer cycle if one is available.
        3. Use **Download Latest Tarball** to grab a `.tar.gz` of every GRIB2 file for that cycle.
        """
    )
    status_json = gr.JSON(label="Downloader Status")
    status_md = gr.Markdown()
    logs_box = gr.Textbox(label="Recent Logs", lines=12, max_lines=12, interactive=False)
    with gr.Row():
        refresh_button = gr.Button("Trigger Refresh", variant="secondary")
        download_button = gr.DownloadButton("Download Latest Tarball", variant="primary")

    # Every handler returns values for these components, in this order.
    event_outputs = [status_json, status_md, download_button, logs_box]
    demo.load(get_status, outputs=event_outputs)
    refresh_button.click(trigger_refresh, outputs=event_outputs)
    download_button.click(download_tarball, outputs=event_outputs)

demo.queue()


if __name__ == "__main__":
    ensure_refresh()
    port = int(os.environ.get("PORT", 7860))
    demo.launch(
        server_name="0.0.0.0",
        server_port=port,
        show_api=False,
        # Let Gradio serve the tarball even when DATA_ROOT is configured
        # outside the working directory.
        allowed_paths=[str(DATA_ROOT.resolve())],
    )