File size: 5,008 Bytes
d093138 4283b24 d093138 4283b24 558e0b2 396b409 4283b24 88e6644 4283b24 d093138 4283b24 558e0b2 d093138 4283b24 558e0b2 88e6644 558e0b2 4283b24 396b409 88e6644 a56debb 88e6644 396b409 88e6644 d093138 558e0b2 88e6644 396b409 d093138 4283b24 d093138 558e0b2 4283b24 88e6644 396b409 d093138 4283b24 d093138 558e0b2 396b409 d093138 4283b24 396b409 4283b24 396b409 d093138 396b409 d093138 88e6644 4283b24 88e6644 d093138 4283b24 88e6644 1ad0a92 4283b24 d093138 4283b24 050706c 4283b24 558e0b2 88e6644 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import logging
import os
from collections import deque
from pathlib import Path
from threading import Event
from typing import Dict, Optional, Tuple
import gradio as gr
from gefs_wave import WaveDownloadManager, logger as wave_logger
DATA_ROOT = Path(os.environ.get("GEFS_WAVE_DATA_DIR", "data/gefswave"))
manager = WaveDownloadManager(DATA_ROOT)
_refresh_started = Event()
_log_buffer: deque[str] = deque(maxlen=200)
class UILogHandler(logging.Handler):
def emit(self, record) -> None: # type: ignore[override]
try:
message = self.format(record)
except Exception:
message = record.getMessage()
_log_buffer.append(message)
log_formatter = logging.Formatter("%Y-%m-%d %H:%M:%S %(levelname)s %(message)s")
ui_log_handler = UILogHandler()
ui_log_handler.setFormatter(log_formatter)
if not any(isinstance(handler, UILogHandler) for handler in wave_logger.handlers):
wave_logger.addHandler(ui_log_handler)
wave_logger.setLevel(logging.INFO)
def _log_text() -> str:
if not _log_buffer:
return "No log messages yet."
return "\n".join(_log_buffer)
def ensure_refresh() -> None:
"""Kick off the downloader thread once per process."""
if not _refresh_started.is_set():
_refresh_started.set()
manager.trigger_refresh()
def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str]:
"""Return status JSON and a markdown summary."""
latest = status.get("latest_state", {})
lines = [
f"**Downloader status:** `{status.get('status', 'unknown')}`",
]
if "message" in status:
lines.append(f"- Message: {status['message']}")
if latest:
if dataset := latest.get("product_label"):
lines.append(f"- Dataset: `{dataset}`")
lines.extend(
[
f"- Latest cycle: `{latest.get('cycle', 'N/A')}`",
f"- Updated at: `{latest.get('updated_at', 'N/A')}`",
f"- Files cached: {len(latest.get('files', []))}",
]
)
if tarball := latest.get("tarball"):
lines.append(f"- Tarball path: `{tarball}`")
else:
lines.append("- No cycle downloaded yet.")
lines.append("\nUse **Trigger Refresh** to fetch a newer cycle when available.")
if notice:
lines.append(f"\n⚠️ {notice}")
return status, "\n".join(lines)
def get_status() -> Tuple[Dict, str, Optional[str], str]:
ensure_refresh()
status = manager.status()
status_dict, status_text = format_status(status)
return status_dict, status_text, None, _log_text()
def trigger_refresh() -> Tuple[Dict, str, Optional[str], str]:
ensure_refresh()
manager.trigger_refresh()
status = manager.status()
status_dict, status_text = format_status(status)
return status_dict, status_text, None, _log_text()
def download_tarball() -> Tuple[Dict, str, Optional[str], str]:
ensure_refresh()
status = manager.status()
state = status.get("latest_state", {})
tarball_rel = state.get("tarball")
if not tarball_rel:
status_dict, status_text = format_status(
status, "Wave dataset not available yet. Trigger a refresh and try again."
)
return status_dict, status_text, None, _log_text()
tarball_path = DATA_ROOT / tarball_rel
if not tarball_path.exists():
status_dict, status_text = format_status(
status, "Tarball missing on disk. Trigger a refresh and try again."
)
return status_dict, status_text, None, _log_text()
status_dict, status_text = format_status(status)
return status_dict, status_text, str(tarball_path), _log_text()
with gr.Blocks(title="GFES Wave 0 Downloader") as demo:
gr.Markdown(
"""
## GFES Wave 0 Downloader
Fetch the latest NOAA GEFS Wave control member (`c00`) global 0.25° dataset.
1. Check the status panel to confirm the current cycle.
2. Press **Trigger Refresh** to request a newer cycle if one is available.
3. Use **Download Latest Tarball** to grab a `.tar.gz` of every GRIB2 file for that cycle.
"""
)
status_json = gr.JSON(label="Downloader Status")
status_md = gr.Markdown()
logs_box = gr.Textbox(label="Recent Logs", lines=12, max_lines=12, interactive=False)
with gr.Row():
refresh_button = gr.Button("Trigger Refresh", variant="secondary")
download_button = gr.DownloadButton("Download Latest Tarball", variant="primary")
demo.load(get_status, outputs=[status_json, status_md, download_button, logs_box])
refresh_button.click(trigger_refresh, outputs=[status_json, status_md, download_button, logs_box])
download_button.click(download_tarball, outputs=[status_json, status_md, download_button, logs_box])
demo.queue()
if __name__ == "__main__":
ensure_refresh()
port = int(os.environ.get("PORT", 7860))
demo.launch(server_name="0.0.0.0", server_port=port, show_api=False)
|