nakas committed
Commit a56debb · Parent: 1f9c688

Fallback to GFS wave dataset and improve UI messaging

Files changed (3):
  1. README.md +3 -3
  2. app.py +2 -0
  3. gefs_wave.py +203 -89
README.md CHANGED
@@ -8,12 +8,12 @@ pinned: false
 license: cc-by-4.0
 ---
 
-## GEFS Wave Control Member Downloader
+## Wave Dataset Downloader
 
-This Space now provides a Gradio dashboard that keeps a local cache of the latest GEFS Wave control member (`c00`, “wave 0”) global 0.25° GRIB2 dataset from NOAA NOMADS. Use the interface to:
+This Space now provides a Gradio dashboard that keeps a local cache of the latest NOAA wave datasets. It first tries the GEFS Wave control member (`c00`, “wave 0”) via the Azure mirror and NOMADS; if no ensemble cycle is available, it automatically falls back to the operational GFS Wave global 0.25° product served from NOMADS and the NCEP Polar site. Use the interface to:
 
 - Inspect the current downloader status and metadata for the newest available cycle.
 - Trigger a refresh to fetch a more recent cycle if one has been published.
 - Download a `.tar.gz` archive containing every `global.0p25` GRIB2 file for that cycle.
 
-Data is stored under `data/gefswave` inside the Space. Override this path with the `GEFS_WAVE_DATA_DIR` environment variable if needed. The downloader searches up to five days back to locate the newest complete cycle.
+Data is stored under `data/gefswave` (configurable via `GEFS_WAVE_DATA_DIR`). You can optionally supply an Azure SAS token with `GEFS_AZURE_SAS_TOKEN`; otherwise, the downloader will request one from the Planetary Computer API when needed. Logs in the UI show which source was used and how many cycles were checked.
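
For local testing outside the Space, a minimal usage sketch (assuming `gefs_wave.py` is importable; the constructor and state keys match the gefs_wave.py diff below):

```python
# Minimal sketch: run the downloader against a custom cache directory.
# GEFS_WAVE_DATA_DIR and the default "data/gefswave" come from the README;
# the state keys are those written by _ensure_product in gefs_wave.py.
import os
from pathlib import Path

from gefs_wave import WaveDownloader

data_root = Path(os.environ.get("GEFS_WAVE_DATA_DIR", "data/gefswave"))
downloader = WaveDownloader(data_root=data_root)
state = downloader.ensure_latest_cycle()  # tries GEFS c00 first, then GFS Wave
print(state["product_label"], state["cycle"], state["tarball"])
```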
app.py CHANGED
@@ -55,6 +55,8 @@ def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str
     if "message" in status:
         lines.append(f"- Message: {status['message']}")
     if latest:
+        if dataset := latest.get("product_label"):
+            lines.append(f"- Dataset: `{dataset}`")
         lines.extend(
             [
                 f"- Latest cycle: `{latest.get('cycle', 'N/A')}`",
gefs_wave.py CHANGED
@@ -11,13 +11,11 @@ from typing import Dict, Iterable, List, Optional
 import requests
 
 
-NOMADS_URL = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod"
-AZURE_URL = "https://noaagefs.blob.core.windows.net/gefs"
-BASE_URLS = [AZURE_URL, NOMADS_URL]
 AZURE_SAS_ENDPOINT = "https://planetarycomputer.microsoft.com/api/sas/v1/token/noaagefs/gefs"
-FORECAST_HOURS: List[int] = list(range(0, 387, 3))
+FORECAST_HOURS_GEFS: List[int] = list(range(0, 387, 3))
+FORECAST_HOURS_GFS: List[int] = list(range(0, 387, 3))
 STATE_FILENAME = "state.json"
-TARBALL_FILENAME = "gefswave-wave0-latest.tar.gz"
+
 
 logger = logging.getLogger("gefs_wave")
 if not logger.handlers:
@@ -28,6 +26,53 @@ logger.setLevel(logging.INFO)
 logger.propagate = False
 
 
+@dataclass
+class Endpoint:
+    base_url: str
+    mode: str = "dated"  # "dated" or "latest"
+    needs_sas: bool = False
+    label: Optional[str] = None
+
+
+@dataclass
+class WaveProduct:
+    key: str
+    label: str
+    directory_template: str
+    filename_template: str
+    endpoints: List[Endpoint]
+    forecast_hours: List[int]
+    max_days_back: int
+
+
+PRODUCTS: List[WaveProduct] = [
+    WaveProduct(
+        key="gefswave_c00",
+        label="GEFS Wave Control (c00)",
+        directory_template="gefs.{date}/{cycle}/wave/gridded",
+        filename_template="gefswave.t{cycle}z.c00.global.0p25.f{hour:03d}.grib2",
+        endpoints=[
+            Endpoint("https://noaagefs.blob.core.windows.net/gefs", needs_sas=True, label="Azure GEFS mirror"),
+            Endpoint("https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod", label="NOMADS GEFS"),
+        ],
+        forecast_hours=FORECAST_HOURS_GEFS,
+        max_days_back=9,
+    ),
+    WaveProduct(
+        key="gfswave_global",
+        label="GFS Wave Global 0.25°",
+        directory_template="gfs.{date}/{cycle}/wave/gridded",
+        filename_template="gfswave.t{cycle}z.global.0p25.f{hour:03d}.grib2",
+        endpoints=[
+            Endpoint("https://nomads.ncep.noaa.gov/pub/data/nccf/com/wave/prod", label="NOMADS GFS"),
+            Endpoint("https://polar.ncep.noaa.gov/waves/latest_run", mode="latest", label="NOAA Polar latest_run"),
+        ],
+        forecast_hours=FORECAST_HOURS_GFS,
+        max_days_back=5,
+    ),
+]
+
+
 @dataclass
 class Cycle:
     date: str  # YYYYMMDD
@@ -37,17 +82,17 @@ class Cycle:
     def label(self) -> str:
         return f"{self.date} {self.cycle}Z"
 
-    @property
-    def directory(self) -> str:
-        return f"gefs.{self.date}/{self.cycle}"
-
 
 class WaveDownloadError(Exception):
     """Raised when a download or discovery step fails."""
 
 
+class RetryableAuthError(WaveDownloadError):
+    """Raised when an auth token should be refreshed and retried."""
+
+
 class WaveDownloader:
-    """Handles discovery and download of the GEFS wave control-member dataset."""
+    """Handles discovery and download of wave datasets."""
 
     def __init__(self, data_root: Path, session: Optional[requests.Session] = None) -> None:
         self.data_root = data_root
@@ -57,23 +102,39 @@
         self._azure_token_checked = False
 
     def ensure_latest_cycle(self) -> Dict:
-        """Ensure the latest available control-member dataset is present locally."""
-        logger.info("Ensuring latest GEFS wave control-member cycle for %s", self.data_root)
-        cycle = self._find_latest_cycle()
+        """Ensure the latest available dataset (GEFS first, then GFS) is cached locally."""
+        errors: List[str] = []
+        for product in PRODUCTS:
+            try:
+                return self._ensure_product(product)
+            except WaveDownloadError as exc:
+                logger.warning("Failed to refresh %s: %s", product.label, exc)
+                errors.append(f"{product.label}: {exc}")
+
+        raise WaveDownloadError("All configured products failed. " + " | ".join(errors))
+
+    def _ensure_product(self, product: WaveProduct) -> Dict:
+        logger.info("Ensuring latest %s dataset in %s", product.label, self.data_root)
+        cycle = self._find_latest_cycle(product)
         if not cycle:
-            logger.error("No GEFS wave cycle found in the recent window.")
-            raise WaveDownloadError("Could not locate a recent GEFS wave cycle.")
+            raise WaveDownloadError("Could not locate a recent cycle.")
 
         state = self._read_state()
-        if state and state.get("cycle") == cycle.label and self._cycle_complete(cycle):
-            logger.info("Cycle %s already cached; using existing files.", cycle.label)
+        if (
+            state
+            and state.get("product") == product.key
+            and state.get("cycle") == cycle.label
+            and self._cycle_complete(product, cycle)
+        ):
+            logger.info("Cycle %s already cached for %s.", cycle.label, product.label)
            return state
 
-        logger.info("Downloading cycle %s", cycle.label)
-        files = self._download_cycle(cycle)
-        tarball_path = self._build_tarball(cycle, files)
-        logger.info("Packaged cycle %s into tarball %s", cycle.label, tarball_path.name)
+        logger.info("Downloading %s cycle %s", product.label, cycle.label)
+        files = self._download_cycle(product, cycle)
+        tarball_path = self._build_tarball(product, cycle, files)
        state = {
+            "product": product.key,
+            "product_label": product.label,
             "cycle": cycle.label,
             "files": [str(path.relative_to(self.data_root)) for path in files],
             "tarball": str(tarball_path.relative_to(self.data_root)),
 
@@ -85,104 +146,131 @@
     def current_state(self) -> Optional[Dict]:
         return self._read_state()
 
-    def _find_latest_cycle(self, max_days_back: int = 9) -> Optional[Cycle]:
+    def _find_latest_cycle(self, product: WaveProduct) -> Optional[Cycle]:
         now = datetime.now(timezone.utc)
-        candidate_days = [(now - timedelta(days=days)).strftime("%Y%m%d") for days in range(max_days_back + 1)]
+        candidate_days = [(now - timedelta(days=days)).strftime("%Y%m%d") for days in range(product.max_days_back + 1)]
         cycles = ["18", "12", "06", "00"]
 
         for day in candidate_days:
             for cycle in cycles:
-                logger.info("Probing cycle %s %sz", day, cycle)
-                if self._cycle_available(day, cycle):
-                    logger.info("Cycle %s %sz is available.", day, cycle)
+                logger.info("Probing %s cycle %s %sz", product.label, day, cycle)
+                if self._cycle_available(product, day, cycle):
+                    logger.info("Found %s cycle %s %sz.", product.label, day, cycle)
                     return Cycle(date=day, cycle=cycle)
-        logger.warning("No GEFS wave cycle found in the last %d days.", max_days_back)
+        logger.warning("No %s cycle found in the last %d days.", product.label, product.max_days_back)
         return None
 
-    def _cycle_available(self, day: str, cycle: str) -> bool:
-        filename = f"gefswave.t{cycle}z.c00.global.0p25.f000.grib2"
-        for base_url in BASE_URLS:
-            path = f"gefs.{day}/{cycle}/wave/gridded/{filename}"
-            url = self._build_url(base_url, path)
-            if not url:
-                logger.debug("Skipping %s for cycle %s %sz due to missing credentials.", base_url, day, cycle)
-                continue
-            try:
+    def _cycle_available(self, product: WaveProduct, day: str, cycle_hour: str) -> bool:
+        for endpoint in product.endpoints:
+            if self._probe_endpoint(product, endpoint, day, cycle_hour):
+                return True
+        return False
+
+    def _probe_endpoint(self, product: WaveProduct, endpoint: Endpoint, day: str, cycle_hour: str) -> bool:
+        url = self._build_url(product, endpoint, day, cycle_hour, hour=0)
+        if not url:
+            return False
+
+        try:
+            response = self.session.head(url, timeout=20)
+            if response.ok:
+                logger.info("Cycle %s %sz available via %s", day, cycle_hour, endpoint.label or endpoint.base_url)
+                return True
+
+            if response.status_code in (401, 403) and endpoint.needs_sas:
+                logger.warning(
+                    "Auth error probing %s on %s (HTTP %s). Refreshing credentials.",
+                    product.label,
+                    endpoint.label or endpoint.base_url,
+                    response.status_code,
+                )
+                self._invalidate_azure_token()
+                url = self._build_url(product, endpoint, day, cycle_hour, hour=0)
+                if not url:
+                    return False
                 response = self.session.head(url, timeout=20)
                 if response.ok:
-                    logger.info("Cycle %s %sz file found on %s", day, cycle, base_url)
                     return True
-                if response.status_code == 404:
-                    logger.debug("Cycle %s %sz missing on %s (404).", day, cycle, base_url)
-                elif response.status_code in (401, 403):
-                    logger.warning(
-                        "Cycle %s %sz request to %s returned %s; will retry after refreshing credentials.",
-                        day,
-                        cycle,
-                        base_url,
-                        response.status_code,
-                    )
-                    if base_url == AZURE_URL:
-                        self._invalidate_azure_token()
-                else:
-                    logger.debug(
-                        "Cycle %s %sz head request on %s returned HTTP %s.",
-                        day,
-                        cycle,
-                        base_url,
-                        response.status_code,
-                    )
-            except requests.RequestException as exc:
-                logger.debug("Error probing cycle %s %sz on %s: %s", day, cycle, base_url, exc)
+
+            if response.status_code not in (404, 400):
+                range_headers = {"Range": "bytes=0-0"}
+                with self.session.get(url, headers=range_headers, timeout=20, stream=True) as stream_resp:
+                    if stream_resp.status_code in (200, 206):
+                        logger.info(
+                            "Cycle %s %sz confirmed via range request on %s",
+                            day,
+                            cycle_hour,
+                            endpoint.label or endpoint.base_url,
+                        )
+                        return True
+        except requests.RequestException as exc:
+            logger.debug(
+                "Error probing %s cycle %s %sz on %s: %s",
+                product.label,
+                day,
+                cycle_hour,
+                endpoint.label or endpoint.base_url,
+                exc,
+            )
+
         return False
 
-    def _download_cycle(self, cycle: Cycle) -> List[Path]:
+    def _download_cycle(self, product: WaveProduct, cycle: Cycle) -> List[Path]:
         files: List[Path] = []
-        target_dir = self.data_root / cycle.date / cycle.cycle
+        target_dir = self.data_root / product.key / cycle.date / cycle.cycle
         target_dir.mkdir(parents=True, exist_ok=True)
 
-        for hour in FORECAST_HOURS:
-            filename = f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2"
+        for hour in product.forecast_hours:
+            filename = product.filename_template.format(cycle=cycle.cycle, hour=hour)
             destination = target_dir / filename
             if destination.exists() and destination.stat().st_size > 0:
-                logger.debug("File %s already exists; skipping download.", destination)
+                logger.debug("File %s already exists; skipping.", destination)
                 files.append(destination)
                 continue
             logger.info("Downloading %s", filename)
-            self._download_with_fallback(cycle, filename, destination)
+            self._download_with_fallback(product, cycle, filename, destination)
            files.append(destination)
        return files
 
-    def _build_tarball(self, cycle: Cycle, files: Iterable[Path]) -> Path:
-        tarball_path = self.data_root / TARBALL_FILENAME
+    def _build_tarball(self, product: WaveProduct, cycle: Cycle, files: Iterable[Path]) -> Path:
+        tarball_path = self.data_root / f"{product.key}-latest.tar.gz"
         with tarfile.open(tarball_path, "w:gz") as tar:
             for file_path in files:
                 tar.add(file_path, arcname=file_path.relative_to(self.data_root))
         return tarball_path
 
-    def _download_with_fallback(self, cycle: Cycle, filename: str, destination: Path) -> None:
+    def _download_with_fallback(self, product: WaveProduct, cycle: Cycle, filename: str, destination: Path) -> None:
         errors = []
-        for base_url in BASE_URLS:
-            path = f"{cycle.directory}/wave/gridded/{filename}"
-            url = self._build_url(base_url, path)
-            if not url:
-                logger.debug("Skipping download from %s for %s due to missing URL.", base_url, filename)
-                continue
+        for endpoint in product.endpoints:
             try:
-                self._stream_to_file(url, destination, base_url)
+                self._stream_to_file(product, endpoint, cycle, filename, destination)
+                return
+            except RetryableAuthError:
+                logger.info("Retrying %s download on %s after refreshing credentials.", filename, endpoint.base_url)
+                self._stream_to_file(product, endpoint, cycle, filename, destination)
                 return
             except WaveDownloadError as exc:
                 errors.append(str(exc))
-                logger.warning("Failed to download %s from %s (%s).", filename, base_url, exc)
+                logger.warning("Failed to download %s from %s (%s).", filename, endpoint.base_url, exc)
         raise WaveDownloadError(f"All download attempts failed for {filename}: {'; '.join(errors)}")
 
-    def _stream_to_file(self, url: str, destination: Path, base_url: str) -> None:
+    def _stream_to_file(
+        self,
+        product: WaveProduct,
+        endpoint: Endpoint,
+        cycle: Cycle,
+        filename: str,
+        destination: Path,
+    ) -> None:
+        url = self._build_url(product, endpoint, cycle.date, cycle.cycle, filename=filename)
+        if not url:
+            raise WaveDownloadError(f"No accessible URL for endpoint {endpoint.base_url}")
+
         tmp_path = destination.with_suffix(destination.suffix + ".part")
         try:
             with self.session.get(url, stream=True, timeout=120) as response:
-                if response.status_code in (401, 403) and base_url == AZURE_URL:
+                if response.status_code in (401, 403) and endpoint.needs_sas:
                     self._invalidate_azure_token()
-                    response.raise_for_status()
+                    raise RetryableAuthError(f"Auth failure downloading {filename} from {endpoint.base_url}")
                 response.raise_for_status()
                 with tmp_path.open("wb") as handle:
                     for chunk in response.iter_content(chunk_size=1 << 20):
@@ -190,20 +278,44 @@
                             continue
                         handle.write(chunk)
             tmp_path.rename(destination)
+        except RetryableAuthError:
+            if tmp_path.exists():
+                tmp_path.unlink()
+            raise
         except requests.RequestException as exc:
             if tmp_path.exists():
                 tmp_path.unlink()
             logger.error("Failed to download %s: %s", url, exc)
             raise WaveDownloadError(f"Failed to download {url}: {exc}") from exc
 
-    def _build_url(self, base_url: str, path: str) -> Optional[str]:
-        if base_url == AZURE_URL:
+    def _build_url(
+        self,
+        product: WaveProduct,
+        endpoint: Endpoint,
+        day: str,
+        cycle_hour: str,
+        hour: Optional[int] = None,
+        filename: Optional[str] = None,
+    ) -> Optional[str]:
+        if filename is None:
+            if hour is None:
+                raise ValueError("Either hour or filename must be provided.")
+            filename = product.filename_template.format(cycle=cycle_hour, hour=hour)
+
+        base = endpoint.base_url.rstrip("/")
+        if endpoint.mode == "latest":
+            url = f"{base}/{filename}"
+        else:
+            directory = product.directory_template.format(date=day, cycle=cycle_hour)
+            url = f"{base}/{directory}/{filename}"
+
+        if endpoint.needs_sas:
             token = self._get_azure_sas_token()
             if not token:
                 return None
             token = token if token.startswith("?") else f"?{token}"
-            return f"{base_url}/{path}{token}"
-        return f"{base_url}/{path}"
+            return f"{url}{token}"
+        return url
 
     def _get_azure_sas_token(self) -> Optional[str]:
         if self._azure_sas_token:
@@ -238,11 +350,13 @@
         self._azure_sas_token = None
         self._azure_token_checked = False
 
-    def _cycle_complete(self, cycle: Cycle) -> bool:
-        target_dir = self.data_root / cycle.date / cycle.cycle
+    def _cycle_complete(self, product: WaveProduct, cycle: Cycle) -> bool:
+        target_dir = self.data_root / product.key / cycle.date / cycle.cycle
         if not target_dir.exists():
             return False
-        expected = {f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2" for hour in FORECAST_HOURS}
+        expected = {
+            product.filename_template.format(cycle=cycle.cycle, hour=hour) for hour in product.forecast_hours
+        }
         existing = {path.name for path in target_dir.glob("*.grib2") if path.stat().st_size > 0}
         return expected.issubset(existing)
 
@@ -274,7 +388,7 @@ class WaveDownloadManager:
         if self._worker and self._worker.is_alive():
             logger.info("Refresh already in progress; ignoring new trigger.")
             return
-        logger.info("Starting GEFS wave refresh thread.")
+        logger.info("Starting wave refresh thread.")
         self._status = {"status": "running"}
         self._worker = threading.Thread(target=self._run_refresh, daemon=True)
         self._worker.start()
@@ -292,8 +406,8 @@
             result = self.downloader.ensure_latest_cycle()
             with self._lock:
                 self._status = {"status": "ready", "latest_state": result}
-            logger.info("GEFS wave refresh completed successfully.")
+            logger.info("Wave refresh completed successfully.")
         except Exception as exc:
             with self._lock:
                 self._status = {"status": "error", "message": str(exc)}
-            logger.exception("GEFS wave refresh failed: %s", exc)
+            logger.exception("Wave refresh failed: %s", exc)