nakas commited on
Commit
3566df5
·
1 Parent(s): 396b409

Add detailed logging for wave downloader

Browse files
Files changed (1) hide show
  1. gefs_wave.py +27 -1
gefs_wave.py CHANGED
@@ -1,4 +1,5 @@
1
  import json
 
2
  import re
3
  import tarfile
4
  import threading
@@ -15,6 +16,14 @@ FORECAST_HOURS: List[int] = list(range(0, 387, 3))
15
  STATE_FILENAME = "state.json"
16
  TARBALL_FILENAME = "gefswave-wave0-latest.tar.gz"
17
 
 
 
 
 
 
 
 
 
18
 
19
  @dataclass
20
  class Cycle:
@@ -44,16 +53,21 @@ class WaveDownloader:
44
 
45
  def ensure_latest_cycle(self) -> Dict:
46
  """Ensure the latest available control-member dataset is present locally."""
 
47
  cycle = self._find_latest_cycle()
48
  if not cycle:
 
49
  raise WaveDownloadError("Could not locate a recent GEFS wave cycle.")
50
 
51
  state = self._read_state()
52
  if state and state.get("cycle") == cycle.label and self._cycle_complete(cycle):
 
53
  return state
54
 
 
55
  files = self._download_cycle(cycle)
56
  tarball_path = self._build_tarball(cycle, files)
 
57
  state = {
58
  "cycle": cycle.label,
59
  "files": [str(path.relative_to(self.data_root)) for path in files],
@@ -73,8 +87,11 @@ class WaveDownloader:
73
 
74
  for day in candidate_days:
75
  for cycle in cycles:
 
76
  if self._cycle_available(day, cycle):
 
77
  return Cycle(date=day, cycle=cycle)
 
78
  return None
79
 
80
  def _cycle_available(self, day: str, cycle: str) -> bool:
@@ -82,10 +99,12 @@ class WaveDownloader:
82
  try:
83
  response = self.session.get(url, timeout=20)
84
  if not response.ok:
 
85
  return False
86
  pattern = re.compile(rf"gefswave\.t{cycle}z\.c00\.global\.0p25\.f000\.grib2")
87
  return bool(pattern.search(response.text))
88
- except requests.RequestException:
 
89
  return False
90
 
91
  def _download_cycle(self, cycle: Cycle) -> List[Path]:
@@ -98,8 +117,10 @@ class WaveDownloader:
98
  url = f"{BASE_URL}/{cycle.directory}/wave/gridded/{filename}"
99
  destination = target_dir / filename
100
  if destination.exists() and destination.stat().st_size > 0:
 
101
  files.append(destination)
102
  continue
 
103
  self._stream_to_file(url, destination)
104
  files.append(destination)
105
  return files
@@ -125,6 +146,7 @@ class WaveDownloader:
125
  except requests.RequestException as exc:
126
  if tmp_path.exists():
127
  tmp_path.unlink()
 
128
  raise WaveDownloadError(f"Failed to download {url}: {exc}") from exc
129
 
130
  def _cycle_complete(self, cycle: Cycle) -> bool:
@@ -161,7 +183,9 @@ class WaveDownloadManager:
161
  def trigger_refresh(self) -> None:
162
  with self._lock:
163
  if self._worker and self._worker.is_alive():
 
164
  return
 
165
  self._status = {"status": "running"}
166
  self._worker = threading.Thread(target=self._run_refresh, daemon=True)
167
  self._worker.start()
@@ -179,6 +203,8 @@ class WaveDownloadManager:
179
  result = self.downloader.ensure_latest_cycle()
180
  with self._lock:
181
  self._status = {"status": "ready", "latest_state": result}
 
182
  except Exception as exc:
183
  with self._lock:
184
  self._status = {"status": "error", "message": str(exc)}
 
 
1
  import json
2
+ import logging
3
  import re
4
  import tarfile
5
  import threading
 
16
  STATE_FILENAME = "state.json"
17
  TARBALL_FILENAME = "gefswave-wave0-latest.tar.gz"
18
 
19
+ logger = logging.getLogger("gefs_wave")
20
+ if not logger.handlers:
21
+ handler = logging.StreamHandler()
22
+ handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
23
+ logger.addHandler(handler)
24
+ logger.setLevel(logging.INFO)
25
+ logger.propagate = False
26
+
27
 
28
  @dataclass
29
  class Cycle:
 
53
 
54
  def ensure_latest_cycle(self) -> Dict:
55
  """Ensure the latest available control-member dataset is present locally."""
56
+ logger.info("Ensuring latest GEFS wave control-member cycle for %s", self.data_root)
57
  cycle = self._find_latest_cycle()
58
  if not cycle:
59
+ logger.error("No GEFS wave cycle found in the recent window.")
60
  raise WaveDownloadError("Could not locate a recent GEFS wave cycle.")
61
 
62
  state = self._read_state()
63
  if state and state.get("cycle") == cycle.label and self._cycle_complete(cycle):
64
+ logger.info("Cycle %s already cached; using existing files.", cycle.label)
65
  return state
66
 
67
+ logger.info("Downloading cycle %s", cycle.label)
68
  files = self._download_cycle(cycle)
69
  tarball_path = self._build_tarball(cycle, files)
70
+ logger.info("Packaged cycle %s into tarball %s", cycle.label, tarball_path.name)
71
  state = {
72
  "cycle": cycle.label,
73
  "files": [str(path.relative_to(self.data_root)) for path in files],
 
87
 
88
  for day in candidate_days:
89
  for cycle in cycles:
90
+ logger.info("Probing cycle %s %sz", day, cycle)
91
  if self._cycle_available(day, cycle):
92
+ logger.info("Cycle %s %sz is available.", day, cycle)
93
  return Cycle(date=day, cycle=cycle)
94
+ logger.warning("No GEFS wave cycle found in the last %d days.", max_days_back)
95
  return None
96
 
97
  def _cycle_available(self, day: str, cycle: str) -> bool:
 
99
  try:
100
  response = self.session.get(url, timeout=20)
101
  if not response.ok:
102
+ logger.debug("Cycle %s %sz check failed with HTTP %s.", day, cycle, response.status_code)
103
  return False
104
  pattern = re.compile(rf"gefswave\.t{cycle}z\.c00\.global\.0p25\.f000\.grib2")
105
  return bool(pattern.search(response.text))
106
+ except requests.RequestException as exc:
107
+ logger.debug("Error probing cycle %s %sz: %s", day, cycle, exc)
108
  return False
109
 
110
  def _download_cycle(self, cycle: Cycle) -> List[Path]:
 
117
  url = f"{BASE_URL}/{cycle.directory}/wave/gridded/{filename}"
118
  destination = target_dir / filename
119
  if destination.exists() and destination.stat().st_size > 0:
120
+ logger.debug("File %s already exists; skipping download.", destination)
121
  files.append(destination)
122
  continue
123
+ logger.info("Downloading %s", filename)
124
  self._stream_to_file(url, destination)
125
  files.append(destination)
126
  return files
 
146
  except requests.RequestException as exc:
147
  if tmp_path.exists():
148
  tmp_path.unlink()
149
+ logger.error("Failed to download %s: %s", url, exc)
150
  raise WaveDownloadError(f"Failed to download {url}: {exc}") from exc
151
 
152
  def _cycle_complete(self, cycle: Cycle) -> bool:
 
183
  def trigger_refresh(self) -> None:
184
  with self._lock:
185
  if self._worker and self._worker.is_alive():
186
+ logger.info("Refresh already in progress; ignoring new trigger.")
187
  return
188
+ logger.info("Starting GEFS wave refresh thread.")
189
  self._status = {"status": "running"}
190
  self._worker = threading.Thread(target=self._run_refresh, daemon=True)
191
  self._worker.start()
 
203
  result = self.downloader.ensure_latest_cycle()
204
  with self._lock:
205
  self._status = {"status": "ready", "latest_state": result}
206
+ logger.info("GEFS wave refresh completed successfully.")
207
  except Exception as exc:
208
  with self._lock:
209
  self._status = {"status": "error", "message": str(exc)}
210
+ logger.exception("GEFS wave refresh failed: %s", exc)