|
|
import logging |
|
|
import os |
|
|
from collections import deque |
|
|
from pathlib import Path |
|
|
from threading import Event |
|
|
from typing import Dict, Optional, Tuple |
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
from gefs_wave import WaveDownloadManager, logger as wave_logger |
|
|
|
|
|
DATA_ROOT = Path(os.environ.get("GEFS_WAVE_DATA_DIR", "data/gefswave")) |
|
|
manager = WaveDownloadManager(DATA_ROOT) |
|
|
_refresh_started = Event() |
|
|
_log_buffer: deque[str] = deque(maxlen=200) |
|
|
|
|
|
|
|
|
class UILogHandler(logging.Handler): |
|
|
def emit(self, record) -> None: |
|
|
try: |
|
|
message = self.format(record) |
|
|
except Exception: |
|
|
message = record.getMessage() |
|
|
_log_buffer.append(message) |
|
|
|
|
|
|
|
|
log_formatter = logging.Formatter("%Y-%m-%d %H:%M:%S %(levelname)s %(message)s") |
|
|
ui_log_handler = UILogHandler() |
|
|
ui_log_handler.setFormatter(log_formatter) |
|
|
|
|
|
if not any(isinstance(handler, UILogHandler) for handler in wave_logger.handlers): |
|
|
wave_logger.addHandler(ui_log_handler) |
|
|
wave_logger.setLevel(logging.INFO) |
|
|
|
|
|
|
|
|
def _log_text() -> str: |
|
|
if not _log_buffer: |
|
|
return "No log messages yet." |
|
|
return "\n".join(_log_buffer) |
|
|
|
|
|
|
|
|
def ensure_refresh() -> None: |
|
|
"""Kick off the downloader thread once per process.""" |
|
|
if not _refresh_started.is_set(): |
|
|
_refresh_started.set() |
|
|
manager.trigger_refresh() |
|
|
|
|
|
|
|
|
def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str]: |
|
|
"""Return status JSON and a markdown summary.""" |
|
|
latest = status.get("latest_state", {}) |
|
|
lines = [ |
|
|
f"**Downloader status:** `{status.get('status', 'unknown')}`", |
|
|
] |
|
|
if "message" in status: |
|
|
lines.append(f"- Message: {status['message']}") |
|
|
if latest: |
|
|
lines.extend( |
|
|
[ |
|
|
f"- Latest cycle: `{latest.get('cycle', 'N/A')}`", |
|
|
f"- Updated at: `{latest.get('updated_at', 'N/A')}`", |
|
|
f"- Files cached: {len(latest.get('files', []))}", |
|
|
] |
|
|
) |
|
|
if tarball := latest.get("tarball"): |
|
|
lines.append(f"- Tarball path: `{tarball}`") |
|
|
else: |
|
|
lines.append("- No cycle downloaded yet.") |
|
|
lines.append("\nUse **Trigger Refresh** to fetch a newer cycle when available.") |
|
|
if notice: |
|
|
lines.append(f"\n⚠️ {notice}") |
|
|
return status, "\n".join(lines) |
|
|
|
|
|
|
|
|
def get_status() -> Tuple[Dict, str, Optional[str], str]: |
|
|
ensure_refresh() |
|
|
status = manager.status() |
|
|
status_dict, status_text = format_status(status) |
|
|
return status_dict, status_text, None, _log_text() |
|
|
|
|
|
|
|
|
def trigger_refresh() -> Tuple[Dict, str, Optional[str], str]: |
|
|
ensure_refresh() |
|
|
manager.trigger_refresh() |
|
|
status = manager.status() |
|
|
status_dict, status_text = format_status(status) |
|
|
return status_dict, status_text, None, _log_text() |
|
|
|
|
|
|
|
|
def download_tarball() -> Tuple[Dict, str, Optional[str], str]: |
|
|
ensure_refresh() |
|
|
status = manager.status() |
|
|
state = status.get("latest_state", {}) |
|
|
|
|
|
tarball_rel = state.get("tarball") |
|
|
if not tarball_rel: |
|
|
status_dict, status_text = format_status( |
|
|
status, "Wave dataset not available yet. Trigger a refresh and try again." |
|
|
) |
|
|
return status_dict, status_text, None, _log_text() |
|
|
|
|
|
tarball_path = DATA_ROOT / tarball_rel |
|
|
if not tarball_path.exists(): |
|
|
status_dict, status_text = format_status( |
|
|
status, "Tarball missing on disk. Trigger a refresh and try again." |
|
|
) |
|
|
return status_dict, status_text, None, _log_text() |
|
|
|
|
|
status_dict, status_text = format_status(status) |
|
|
return status_dict, status_text, str(tarball_path), _log_text() |
|
|
|
|
|
|
|
|
with gr.Blocks(title="GFES Wave 0 Downloader") as demo: |
|
|
gr.Markdown( |
|
|
""" |
|
|
## GFES Wave 0 Downloader |
|
|
Fetch the latest NOAA GEFS Wave control member (`c00`) global 0.25° dataset. |
|
|
|
|
|
1. Check the status panel to confirm the current cycle. |
|
|
2. Press **Trigger Refresh** to request a newer cycle if one is available. |
|
|
3. Use **Download Latest Tarball** to grab a `.tar.gz` of every GRIB2 file for that cycle. |
|
|
""" |
|
|
) |
|
|
status_json = gr.JSON(label="Downloader Status") |
|
|
status_md = gr.Markdown() |
|
|
logs_box = gr.Textbox(label="Recent Logs", lines=12, max_lines=12, interactive=False) |
|
|
|
|
|
with gr.Row(): |
|
|
refresh_button = gr.Button("Trigger Refresh", variant="secondary") |
|
|
download_button = gr.DownloadButton("Download Latest Tarball", variant="primary") |
|
|
|
|
|
demo.load(get_status, outputs=[status_json, status_md, download_button, logs_box]) |
|
|
refresh_button.click(trigger_refresh, outputs=[status_json, status_md, download_button, logs_box]) |
|
|
download_button.click(download_tarball, outputs=[status_json, status_md, download_button, logs_box]) |
|
|
|
|
|
demo.queue() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
ensure_refresh() |
|
|
port = int(os.environ.get("PORT", 7860)) |
|
|
demo.launch(server_name="0.0.0.0", server_port=port, show_api=False) |
|
|
|