# GFES_WAVE / gefs_wave.py
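"""Download and package the latest GEFS wave control-member (c00) GRIB2 cycle.

Discovers the most recent available model cycle, fetches every forecast hour
with a mirror fallback, bundles the files into a tarball, and records progress
in a small JSON state file alongside the data.
"""
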
import json
import logging
import re
import tarfile
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional
import requests
NOMADS_URL = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod"
AZURE_URL = "https://noaagefs.blob.core.windows.net/gefs"
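# Mirrors are tried in order: the Azure blob mirror first, then NOMADS as fallback.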
BASE_URLS = [AZURE_URL, NOMADS_URL]
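# Control member (c00) on the 0.25° global wave grid: hours f000–f384 in
# 3-hour steps, i.e. 129 GRIB2 files per cycle.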
FORECAST_HOURS: List[int] = list(range(0, 387, 3))
STATE_FILENAME = "state.json"
TARBALL_FILENAME = "gefswave-wave0-latest.tar.gz"
logger = logging.getLogger("gefs_wave")
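# Attach a handler only once so repeated imports (e.g. under a dev reloader)
# don't produce duplicate log lines.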
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False


@dataclass
class Cycle:
    date: str  # YYYYMMDD
    cycle: str  # HH

    @property
    def label(self) -> str:
        return f"{self.date} {self.cycle}Z"

    @property
    def directory(self) -> str:
        return f"gefs.{self.date}/{self.cycle}"


class WaveDownloadError(Exception):
    """Raised when a download or discovery step fails."""


class WaveDownloader:
    """Handles discovery and download of the GEFS wave control-member dataset."""

    def __init__(self, data_root: Path, session: Optional[requests.Session] = None) -> None:
        self.data_root = data_root
        self.data_root.mkdir(parents=True, exist_ok=True)
        self.session = session or requests.Session()

    def ensure_latest_cycle(self) -> Dict:
        """Ensure the latest available control-member dataset is present locally."""
        logger.info("Ensuring latest GEFS wave control-member cycle for %s", self.data_root)
        cycle = self._find_latest_cycle()
        if not cycle:
            logger.error("No GEFS wave cycle found in the recent window.")
            raise WaveDownloadError("Could not locate a recent GEFS wave cycle.")
        state = self._read_state()
        if state and state.get("cycle") == cycle.label and self._cycle_complete(cycle):
            logger.info("Cycle %s already cached; using existing files.", cycle.label)
            return state
        logger.info("Downloading cycle %s", cycle.label)
        files = self._download_cycle(cycle)
        tarball_path = self._build_tarball(cycle, files)
        logger.info("Packaged cycle %s into tarball %s", cycle.label, tarball_path.name)
        state = {
            "cycle": cycle.label,
            "files": [str(path.relative_to(self.data_root)) for path in files],
            "tarball": str(tarball_path.relative_to(self.data_root)),
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }
        self._write_state(state)
        return state

    def current_state(self) -> Optional[Dict]:
        return self._read_state()

    def _find_latest_cycle(self, max_days_back: int = 9) -> Optional[Cycle]:
        now = datetime.now(timezone.utc)
        candidate_days = [(now - timedelta(days=days)).strftime("%Y%m%d") for days in range(max_days_back + 1)]
        # Probe newest-first: today before earlier days, 18z before 00z.
        cycles = ["18", "12", "06", "00"]
        for day in candidate_days:
            for cycle in cycles:
                logger.info("Probing cycle %s %sz", day, cycle)
                if self._cycle_available(day, cycle):
                    logger.info("Cycle %s %sz is available.", day, cycle)
                    return Cycle(date=day, cycle=cycle)
        logger.warning("No GEFS wave cycle found in the last %d days.", max_days_back)
        return None

    def _cycle_available(self, day: str, cycle: str) -> bool:
        # A cycle counts as available once the f000 control-member GRIB2 file
        # appears in the directory listing on any mirror.
        pattern = re.compile(rf"gefswave\.t{cycle}z\.c00\.global\.0p25\.f000\.grib2")
        for base_url in BASE_URLS:
            url = f"{base_url}/gefs.{day}/{cycle}/wave/gridded/"
            try:
                response = self.session.get(url, timeout=20)
                if not response.ok:
                    logger.debug(
                        "Cycle %s %sz check failed on %s with HTTP %s.",
                        day,
                        cycle,
                        base_url,
                        response.status_code,
                    )
                    continue
                if pattern.search(response.text):
                    logger.info("Cycle %s %sz present on %s", day, cycle, base_url)
                    return True
            except requests.RequestException as exc:
                logger.debug("Error probing cycle %s %sz on %s: %s", day, cycle, base_url, exc)
        return False

    def _download_cycle(self, cycle: Cycle) -> List[Path]:
        files: List[Path] = []
        target_dir = self.data_root / cycle.date / cycle.cycle
        target_dir.mkdir(parents=True, exist_ok=True)
        for hour in FORECAST_HOURS:
            filename = f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2"
            destination = target_dir / filename
            if destination.exists() and destination.stat().st_size > 0:
                logger.debug("File %s already exists; skipping download.", destination)
                files.append(destination)
                continue
            logger.info("Downloading %s", filename)
            self._download_with_fallback(cycle, filename, destination)
            files.append(destination)
        return files

    def _build_tarball(self, cycle: Cycle, files: Iterable[Path]) -> Path:
        tarball_path = self.data_root / TARBALL_FILENAME
        with tarfile.open(tarball_path, "w:gz") as tar:
            for file_path in files:
                tar.add(file_path, arcname=file_path.relative_to(self.data_root))
        return tarball_path

    def _download_with_fallback(self, cycle: Cycle, filename: str, destination: Path) -> None:
        errors = []
        for base_url in BASE_URLS:
            url = f"{base_url}/{cycle.directory}/wave/gridded/{filename}"
            try:
                self._stream_to_file(url, destination)
                return
            except WaveDownloadError as exc:
                errors.append(str(exc))
                logger.warning("Failed to download %s from %s (%s).", filename, base_url, exc)
        raise WaveDownloadError(f"All download attempts failed for {filename}: {'; '.join(errors)}")

    def _stream_to_file(self, url: str, destination: Path) -> None:
        # Stream into a ".part" temp file, then rename, so readers never see
        # a half-written GRIB2 file.
        tmp_path = destination.with_suffix(destination.suffix + ".part")
        try:
            with self.session.get(url, stream=True, timeout=120) as response:
                response.raise_for_status()
                with tmp_path.open("wb") as handle:
                    for chunk in response.iter_content(chunk_size=1 << 20):
                        if not chunk:
                            continue
                        handle.write(chunk)
            tmp_path.rename(destination)
        except requests.RequestException as exc:
            if tmp_path.exists():
                tmp_path.unlink()
            logger.error("Failed to download %s: %s", url, exc)
            raise WaveDownloadError(f"Failed to download {url}: {exc}") from exc

    def _cycle_complete(self, cycle: Cycle) -> bool:
        target_dir = self.data_root / cycle.date / cycle.cycle
        if not target_dir.exists():
            return False
        expected = {f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2" for hour in FORECAST_HOURS}
        existing = {path.name for path in target_dir.glob("*.grib2") if path.stat().st_size > 0}
        return expected.issubset(existing)

    def _read_state(self) -> Optional[Dict]:
        path = self.data_root / STATE_FILENAME
        if not path.exists():
            return None
        try:
            return json.loads(path.read_text())
        except json.JSONDecodeError:
            return None

    def _write_state(self, state: Dict) -> None:
        path = self.data_root / STATE_FILENAME
        path.write_text(json.dumps(state, indent=2))


class WaveDownloadManager:
    """Simple threaded wrapper to keep download work off the request thread."""

    def __init__(self, data_root: Path) -> None:
        self.downloader = WaveDownloader(data_root)
        self._lock = threading.Lock()
        self._worker: Optional[threading.Thread] = None
        self._status: Dict = {"status": "idle"}

    def trigger_refresh(self) -> None:
        with self._lock:
            if self._worker and self._worker.is_alive():
                logger.info("Refresh already in progress; ignoring new trigger.")
                return
            logger.info("Starting GEFS wave refresh thread.")
            self._status = {"status": "running"}
            self._worker = threading.Thread(target=self._run_refresh, daemon=True)
            self._worker.start()

    def status(self) -> Dict:
        with self._lock:
            status = dict(self._status)
            state = self.downloader.current_state()
            if state:
                status["latest_state"] = state
            return status

    def _run_refresh(self) -> None:
        try:
            result = self.downloader.ensure_latest_cycle()
            with self._lock:
                self._status = {"status": "ready", "latest_state": result}
            logger.info("GEFS wave refresh completed successfully.")
        except Exception as exc:
            with self._lock:
                self._status = {"status": "error", "message": str(exc)}
            logger.exception("GEFS wave refresh failed: %s", exc)
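

if __name__ == "__main__":
    # Minimal usage sketch (an illustrative addition, not part of the original
    # module): synchronously fetch the latest cycle into a local "wave-data"
    # directory and dump the resulting state. The directory name is an
    # assumption; point it wherever you keep data.
    downloader = WaveDownloader(Path("wave-data"))
    state = downloader.ensure_latest_cycle()
    print(json.dumps(state, indent=2))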