"""Fetch and package the latest GEFS wave control-member (c00) GRIB2 dataset."""

import json
import logging
import re
import tarfile
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional

import requests

NOMADS_URL = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod"
AZURE_URL = "https://noaagefs.blob.core.windows.net/gefs"
# Source priority: try the Azure mirror first, then fall back to NOMADS.
BASE_URLS = [AZURE_URL, NOMADS_URL]
# Forecast hours 0-384 in 3-hour steps.
FORECAST_HOURS: List[int] = list(range(0, 387, 3))
STATE_FILENAME = "state.json"
TARBALL_FILENAME = "gefswave-wave0-latest.tar.gz"
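
# For reference, a fully resolved download URL has this shape (the date shown
# is illustrative):
#   <base_url>/gefs.20240101/00/wave/gridded/gefswave.t00z.c00.global.0p25.f003.grib2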

logger = logging.getLogger("gefs_wave")
if not logger.handlers:
    # Attach a handler only once so repeated imports do not duplicate log lines.
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False


@dataclass
class Cycle:
    """A single GEFS run, identified by date (YYYYMMDD) and cycle hour."""

    date: str
    cycle: str

    @property
    def label(self) -> str:
        return f"{self.date} {self.cycle}Z"

    @property
    def directory(self) -> str:
        return f"gefs.{self.date}/{self.cycle}"
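

# Illustrative (hypothetical date): Cycle("20240101", "06").directory evaluates
# to "gefs.20240101/06", which is the layout both mirrors use.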


class WaveDownloadError(Exception):
    """Raised when a download or discovery step fails."""


class WaveDownloader:
    """Handles discovery and download of the GEFS wave control-member dataset."""

    def __init__(self, data_root: Path, session: Optional[requests.Session] = None) -> None:
        self.data_root = data_root
        self.data_root.mkdir(parents=True, exist_ok=True)
        self.session = session or requests.Session()

    def ensure_latest_cycle(self) -> Dict:
        """Ensure the latest available control-member dataset is present locally."""
        logger.info("Ensuring latest GEFS wave control-member cycle for %s", self.data_root)
        cycle = self._find_latest_cycle()
        if not cycle:
            logger.error("No GEFS wave cycle found in the recent window.")
            raise WaveDownloadError("Could not locate a recent GEFS wave cycle.")

        # Reuse the cache when the recorded cycle matches and every file is on disk.
        state = self._read_state()
        if state and state.get("cycle") == cycle.label and self._cycle_complete(cycle):
            logger.info("Cycle %s already cached; using existing files.", cycle.label)
            return state

        logger.info("Downloading cycle %s", cycle.label)
        files = self._download_cycle(cycle)
        tarball_path = self._build_tarball(cycle, files)
        logger.info("Packaged cycle %s into tarball %s", cycle.label, tarball_path.name)
        state = {
            "cycle": cycle.label,
            "files": [str(path.relative_to(self.data_root)) for path in files],
            "tarball": str(tarball_path.relative_to(self.data_root)),
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }
        self._write_state(state)
        return state

    def current_state(self) -> Optional[Dict]:
        return self._read_state()

    def _find_latest_cycle(self, max_days_back: int = 9) -> Optional[Cycle]:
        # Probe newest first: today backwards, and 18Z down to 00Z within each day.
        now = datetime.now(timezone.utc)
        candidate_days = [(now - timedelta(days=days)).strftime("%Y%m%d") for days in range(max_days_back + 1)]
        cycles = ["18", "12", "06", "00"]

        for day in candidate_days:
            for cycle in cycles:
                logger.info("Probing cycle %s %sz", day, cycle)
                if self._cycle_available(day, cycle):
                    logger.info("Cycle %s %sz is available.", day, cycle)
                    return Cycle(date=day, cycle=cycle)
        logger.warning("No GEFS wave cycle found in the last %d days.", max_days_back)
        return None

    def _cycle_available(self, day: str, cycle: str) -> bool:
        # A directory listing that mentions the f000 control file implies the
        # cycle has started publishing; a mirror that returns an error or no
        # matching index entry is skipped and the next base URL is tried.
        pattern = re.compile(rf"gefswave\.t{cycle}z\.c00\.global\.0p25\.f000\.grib2")
        for base_url in BASE_URLS:
            url = f"{base_url}/gefs.{day}/{cycle}/wave/gridded/"
            try:
                response = self.session.get(url, timeout=20)
                if not response.ok:
                    logger.debug(
                        "Cycle %s %sz check failed on %s with HTTP %s.",
                        day,
                        cycle,
                        base_url,
                        response.status_code,
                    )
                    continue
                if pattern.search(response.text):
                    logger.info("Cycle %s %sz present on %s", day, cycle, base_url)
                    return True
            except requests.RequestException as exc:
                logger.debug("Error probing cycle %s %sz on %s: %s", day, cycle, base_url, exc)
        return False

    def _download_cycle(self, cycle: Cycle) -> List[Path]:
        files: List[Path] = []
        target_dir = self.data_root / cycle.date / cycle.cycle
        target_dir.mkdir(parents=True, exist_ok=True)

        for hour in FORECAST_HOURS:
            filename = f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2"
            destination = target_dir / filename
            if destination.exists() and destination.stat().st_size > 0:
                logger.debug("File %s already exists; skipping download.", destination)
                files.append(destination)
                continue
            logger.info("Downloading %s", filename)
            self._download_with_fallback(cycle, filename, destination)
            files.append(destination)
        return files

    def _build_tarball(self, cycle: Cycle, files: Iterable[Path]) -> Path:
        tarball_path = self.data_root / TARBALL_FILENAME
        with tarfile.open(tarball_path, "w:gz") as tar:
            for file_path in files:
                tar.add(file_path, arcname=file_path.relative_to(self.data_root))
        return tarball_path

    def _download_with_fallback(self, cycle: Cycle, filename: str, destination: Path) -> None:
        errors: List[str] = []
        for base_url in BASE_URLS:
            url = f"{base_url}/{cycle.directory}/wave/gridded/{filename}"
            try:
                self._stream_to_file(url, destination)
                return
            except WaveDownloadError as exc:
                errors.append(str(exc))
                logger.warning("Failed to download %s from %s (%s).", filename, base_url, exc)
        raise WaveDownloadError(f"All download attempts failed for {filename}: {'; '.join(errors)}")

    def _stream_to_file(self, url: str, destination: Path) -> None:
        # Stream into a ".part" file and rename only on success, so a partial
        # download is never mistaken for a complete one.
        tmp_path = destination.with_suffix(destination.suffix + ".part")
        try:
            with self.session.get(url, stream=True, timeout=120) as response:
                response.raise_for_status()
                with tmp_path.open("wb") as handle:
                    for chunk in response.iter_content(chunk_size=1 << 20):
                        if not chunk:
                            continue
                        handle.write(chunk)
            tmp_path.rename(destination)
        except requests.RequestException as exc:
            if tmp_path.exists():
                tmp_path.unlink()
            logger.error("Failed to download %s: %s", url, exc)
            raise WaveDownloadError(f"Failed to download {url}: {exc}") from exc

    def _cycle_complete(self, cycle: Cycle) -> bool:
        target_dir = self.data_root / cycle.date / cycle.cycle
        if not target_dir.exists():
            return False
        expected = {f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2" for hour in FORECAST_HOURS}
        existing = {path.name for path in target_dir.glob("*.grib2") if path.stat().st_size > 0}
        return expected.issubset(existing)

    def _read_state(self) -> Optional[Dict]:
        path = self.data_root / STATE_FILENAME
        if not path.exists():
            return None
        try:
            return json.loads(path.read_text())
        except json.JSONDecodeError:
            return None

    def _write_state(self, state: Dict) -> None:
        path = self.data_root / STATE_FILENAME
        path.write_text(json.dumps(state, indent=2))
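

# For reference, the state.json written above has this shape (values are
# illustrative):
# {
#   "cycle": "20240101 00Z",
#   "files": ["20240101/00/gefswave.t00z.c00.global.0p25.f000.grib2", ...],
#   "tarball": "gefswave-wave0-latest.tar.gz",
#   "updated_at": "2024-01-01T06:00:00+00:00"
# }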


class WaveDownloadManager:
    """Simple threaded wrapper to keep download work off the request thread."""

    def __init__(self, data_root: Path) -> None:
        self.downloader = WaveDownloader(data_root)
        self._lock = threading.Lock()
        self._worker: Optional[threading.Thread] = None
        self._status: Dict = {"status": "idle"}

    def trigger_refresh(self) -> None:
        with self._lock:
            if self._worker and self._worker.is_alive():
                logger.info("Refresh already in progress; ignoring new trigger.")
                return
            logger.info("Starting GEFS wave refresh thread.")
            self._status = {"status": "running"}
            self._worker = threading.Thread(target=self._run_refresh, daemon=True)
            self._worker.start()

    def status(self) -> Dict:
        with self._lock:
            status = dict(self._status)
            state = self.downloader.current_state()
            if state:
                status["latest_state"] = state
            return status

    def _run_refresh(self) -> None:
        try:
            result = self.downloader.ensure_latest_cycle()
            with self._lock:
                self._status = {"status": "ready", "latest_state": result}
            logger.info("GEFS wave refresh completed successfully.")
        except Exception as exc:  # deliberately broad: report any failure via status
            with self._lock:
                self._status = {"status": "error", "message": str(exc)}
            logger.exception("GEFS wave refresh failed: %s", exc)
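

# A minimal usage sketch, not part of the original module; the data root is a
# hypothetical path. The synchronous WaveDownloader path is used here because
# trigger_refresh() runs in a daemon thread that would die when a short script
# exits.
if __name__ == "__main__":
    downloader = WaveDownloader(Path("./gefs_wave_data"))
    latest = downloader.ensure_latest_cycle()
    print(json.dumps(latest, indent=2))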