nakas commited on
Commit
d093138
·
1 Parent(s): 3566df5

Add mirror fallback and log viewer

Browse files
Files changed (2) hide show
  1. app.py +40 -12
  2. gefs_wave.py +37 -15
app.py CHANGED
@@ -1,15 +1,42 @@
 
1
  import os
 
2
  from pathlib import Path
3
  from threading import Event
4
  from typing import Dict, Optional, Tuple
5
 
6
  import gradio as gr
7
 
8
- from gefs_wave import WaveDownloadManager
9
 
10
  DATA_ROOT = Path(os.environ.get("GEFS_WAVE_DATA_DIR", "data/gefswave"))
11
  manager = WaveDownloadManager(DATA_ROOT)
12
  _refresh_started = Event()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
 
15
  def ensure_refresh() -> None:
@@ -45,22 +72,22 @@ def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str
45
  return status, "\n".join(lines)
46
 
47
 
48
- def get_status() -> Tuple[Dict, str, Optional[str]]:
49
  ensure_refresh()
50
  status = manager.status()
51
  status_dict, status_text = format_status(status)
52
- return status_dict, status_text, None
53
 
54
 
55
- def trigger_refresh() -> Tuple[Dict, str, Optional[str]]:
56
  ensure_refresh()
57
  manager.trigger_refresh()
58
  status = manager.status()
59
  status_dict, status_text = format_status(status)
60
- return status_dict, status_text, None
61
 
62
 
63
- def download_tarball() -> Tuple[Dict, str, Optional[str]]:
64
  ensure_refresh()
65
  status = manager.status()
66
  state = status.get("latest_state", {})
@@ -70,17 +97,17 @@ def download_tarball() -> Tuple[Dict, str, Optional[str]]:
70
  status_dict, status_text = format_status(
71
  status, "Wave dataset not available yet. Trigger a refresh and try again."
72
  )
73
- return status_dict, status_text, None
74
 
75
  tarball_path = DATA_ROOT / tarball_rel
76
  if not tarball_path.exists():
77
  status_dict, status_text = format_status(
78
  status, "Tarball missing on disk. Trigger a refresh and try again."
79
  )
80
- return status_dict, status_text, None
81
 
82
  status_dict, status_text = format_status(status)
83
- return status_dict, status_text, str(tarball_path)
84
 
85
 
86
  with gr.Blocks(title="GFES Wave 0 Downloader") as demo:
@@ -96,14 +123,15 @@ with gr.Blocks(title="GFES Wave 0 Downloader") as demo:
96
  )
97
  status_json = gr.JSON(label="Downloader Status")
98
  status_md = gr.Markdown()
 
99
 
100
  with gr.Row():
101
  refresh_button = gr.Button("Trigger Refresh", variant="secondary")
102
  download_button = gr.DownloadButton("Download Latest Tarball", variant="primary")
103
 
104
- demo.load(get_status, outputs=[status_json, status_md, download_button])
105
- refresh_button.click(trigger_refresh, outputs=[status_json, status_md, download_button])
106
- download_button.click(download_tarball, outputs=[status_json, status_md, download_button])
107
 
108
  demo.queue()
109
 
 
1
+ import logging
2
  import os
3
+ from collections import deque
4
  from pathlib import Path
5
  from threading import Event
6
  from typing import Dict, Optional, Tuple
7
 
8
  import gradio as gr
9
 
10
+ from gefs_wave import WaveDownloadManager, logger as wave_logger
11
 
12
  DATA_ROOT = Path(os.environ.get("GEFS_WAVE_DATA_DIR", "data/gefswave"))
13
  manager = WaveDownloadManager(DATA_ROOT)
14
  _refresh_started = Event()
15
+ _log_buffer: deque[str] = deque(maxlen=200)
16
+
17
+
18
+ class UILogHandler(logging.Handler):
19
+ def emit(self, record) -> None: # type: ignore[override]
20
+ try:
21
+ message = self.format(record)
22
+ except Exception:
23
+ message = record.getMessage()
24
+ _log_buffer.append(message)
25
+
26
+
27
+ log_formatter = logging.Formatter("%Y-%m-%d %H:%M:%S %(levelname)s %(message)s")
28
+ ui_log_handler = UILogHandler()
29
+ ui_log_handler.setFormatter(log_formatter)
30
+
31
+ if not any(isinstance(handler, UILogHandler) for handler in wave_logger.handlers):
32
+ wave_logger.addHandler(ui_log_handler)
33
+ wave_logger.setLevel(logging.INFO)
34
+
35
+
36
+ def _log_text() -> str:
37
+ if not _log_buffer:
38
+ return "No log messages yet."
39
+ return "\n".join(_log_buffer)
40
 
41
 
42
  def ensure_refresh() -> None:
 
72
  return status, "\n".join(lines)
73
 
74
 
75
+ def get_status() -> Tuple[Dict, str, Optional[str], str]:
76
  ensure_refresh()
77
  status = manager.status()
78
  status_dict, status_text = format_status(status)
79
+ return status_dict, status_text, None, _log_text()
80
 
81
 
82
+ def trigger_refresh() -> Tuple[Dict, str, Optional[str], str]:
83
  ensure_refresh()
84
  manager.trigger_refresh()
85
  status = manager.status()
86
  status_dict, status_text = format_status(status)
87
+ return status_dict, status_text, None, _log_text()
88
 
89
 
90
+ def download_tarball() -> Tuple[Dict, str, Optional[str], str]:
91
  ensure_refresh()
92
  status = manager.status()
93
  state = status.get("latest_state", {})
 
97
  status_dict, status_text = format_status(
98
  status, "Wave dataset not available yet. Trigger a refresh and try again."
99
  )
100
+ return status_dict, status_text, None, _log_text()
101
 
102
  tarball_path = DATA_ROOT / tarball_rel
103
  if not tarball_path.exists():
104
  status_dict, status_text = format_status(
105
  status, "Tarball missing on disk. Trigger a refresh and try again."
106
  )
107
+ return status_dict, status_text, None, _log_text()
108
 
109
  status_dict, status_text = format_status(status)
110
+ return status_dict, status_text, str(tarball_path), _log_text()
111
 
112
 
113
  with gr.Blocks(title="GFES Wave 0 Downloader") as demo:
 
123
  )
124
  status_json = gr.JSON(label="Downloader Status")
125
  status_md = gr.Markdown()
126
+ logs_box = gr.Textbox(label="Recent Logs", lines=12, max_lines=12, interactive=False)
127
 
128
  with gr.Row():
129
  refresh_button = gr.Button("Trigger Refresh", variant="secondary")
130
  download_button = gr.DownloadButton("Download Latest Tarball", variant="primary")
131
 
132
+ demo.load(get_status, outputs=[status_json, status_md, download_button, logs_box])
133
+ refresh_button.click(trigger_refresh, outputs=[status_json, status_md, download_button, logs_box])
134
+ download_button.click(download_tarball, outputs=[status_json, status_md, download_button, logs_box])
135
 
136
  demo.queue()
137
 
gefs_wave.py CHANGED
@@ -11,7 +11,9 @@ from typing import Dict, Iterable, List, Optional
11
  import requests
12
 
13
 
14
- BASE_URL = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod"
 
 
15
  FORECAST_HOURS: List[int] = list(range(0, 387, 3))
16
  STATE_FILENAME = "state.json"
17
  TARBALL_FILENAME = "gefswave-wave0-latest.tar.gz"
@@ -80,7 +82,7 @@ class WaveDownloader:
80
  def current_state(self) -> Optional[Dict]:
81
  return self._read_state()
82
 
83
- def _find_latest_cycle(self, max_days_back: int = 5) -> Optional[Cycle]:
84
  now = datetime.now(timezone.utc)
85
  candidate_days = [(now - timedelta(days=days)).strftime("%Y%m%d") for days in range(max_days_back + 1)]
86
  cycles = ["18", "12", "06", "00"]
@@ -95,17 +97,26 @@ class WaveDownloader:
95
  return None
96
 
97
  def _cycle_available(self, day: str, cycle: str) -> bool:
98
- url = f"{BASE_URL}/gefs.{day}/{cycle}/wave/gridded/"
99
- try:
100
- response = self.session.get(url, timeout=20)
101
- if not response.ok:
102
- logger.debug("Cycle %s %sz check failed with HTTP %s.", day, cycle, response.status_code)
103
- return False
104
- pattern = re.compile(rf"gefswave\.t{cycle}z\.c00\.global\.0p25\.f000\.grib2")
105
- return bool(pattern.search(response.text))
106
- except requests.RequestException as exc:
107
- logger.debug("Error probing cycle %s %sz: %s", day, cycle, exc)
108
- return False
 
 
 
 
 
 
 
 
 
109
 
110
  def _download_cycle(self, cycle: Cycle) -> List[Path]:
111
  files: List[Path] = []
@@ -114,14 +125,13 @@ class WaveDownloader:
114
 
115
  for hour in FORECAST_HOURS:
116
  filename = f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2"
117
- url = f"{BASE_URL}/{cycle.directory}/wave/gridded/{filename}"
118
  destination = target_dir / filename
119
  if destination.exists() and destination.stat().st_size > 0:
120
  logger.debug("File %s already exists; skipping download.", destination)
121
  files.append(destination)
122
  continue
123
  logger.info("Downloading %s", filename)
124
- self._stream_to_file(url, destination)
125
  files.append(destination)
126
  return files
127
 
@@ -132,6 +142,18 @@ class WaveDownloader:
132
  tar.add(file_path, arcname=file_path.relative_to(self.data_root))
133
  return tarball_path
134
 
 
 
 
 
 
 
 
 
 
 
 
 
135
  def _stream_to_file(self, url: str, destination: Path) -> None:
136
  tmp_path = destination.with_suffix(destination.suffix + ".part")
137
  try:
 
11
  import requests
12
 
13
 
14
+ NOMADS_URL = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod"
15
+ AZURE_URL = "https://noaagefs.blob.core.windows.net/gefs"
16
+ BASE_URLS = [AZURE_URL, NOMADS_URL]
17
  FORECAST_HOURS: List[int] = list(range(0, 387, 3))
18
  STATE_FILENAME = "state.json"
19
  TARBALL_FILENAME = "gefswave-wave0-latest.tar.gz"
 
82
  def current_state(self) -> Optional[Dict]:
83
  return self._read_state()
84
 
85
+ def _find_latest_cycle(self, max_days_back: int = 9) -> Optional[Cycle]:
86
  now = datetime.now(timezone.utc)
87
  candidate_days = [(now - timedelta(days=days)).strftime("%Y%m%d") for days in range(max_days_back + 1)]
88
  cycles = ["18", "12", "06", "00"]
 
97
  return None
98
 
99
  def _cycle_available(self, day: str, cycle: str) -> bool:
100
+ pattern = re.compile(rf"gefswave\.t{cycle}z\.c00\.global\.0p25\.f000\.grib2")
101
+ for base_url in BASE_URLS:
102
+ url = f"{base_url}/gefs.{day}/{cycle}/wave/gridded/"
103
+ try:
104
+ response = self.session.get(url, timeout=20)
105
+ if not response.ok:
106
+ logger.debug(
107
+ "Cycle %s %sz check failed on %s with HTTP %s.",
108
+ day,
109
+ cycle,
110
+ base_url,
111
+ response.status_code,
112
+ )
113
+ continue
114
+ if pattern.search(response.text):
115
+ logger.info("Cycle %s %sz present on %s", day, cycle, base_url)
116
+ return True
117
+ except requests.RequestException as exc:
118
+ logger.debug("Error probing cycle %s %sz on %s: %s", day, cycle, base_url, exc)
119
+ return False
120
 
121
  def _download_cycle(self, cycle: Cycle) -> List[Path]:
122
  files: List[Path] = []
 
125
 
126
  for hour in FORECAST_HOURS:
127
  filename = f"gefswave.t{cycle.cycle}z.c00.global.0p25.f{hour:03d}.grib2"
 
128
  destination = target_dir / filename
129
  if destination.exists() and destination.stat().st_size > 0:
130
  logger.debug("File %s already exists; skipping download.", destination)
131
  files.append(destination)
132
  continue
133
  logger.info("Downloading %s", filename)
134
+ self._download_with_fallback(cycle, filename, destination)
135
  files.append(destination)
136
  return files
137
 
 
142
  tar.add(file_path, arcname=file_path.relative_to(self.data_root))
143
  return tarball_path
144
 
145
+ def _download_with_fallback(self, cycle: Cycle, filename: str, destination: Path) -> None:
146
+ errors = []
147
+ for base_url in BASE_URLS:
148
+ url = f"{base_url}/{cycle.directory}/wave/gridded/{filename}"
149
+ try:
150
+ self._stream_to_file(url, destination)
151
+ return
152
+ except WaveDownloadError as exc:
153
+ errors.append(str(exc))
154
+ logger.warning("Failed to download %s from %s (%s).", filename, base_url, exc)
155
+ raise WaveDownloadError(f"All download attempts failed for {filename}: {'; '.join(errors)}")
156
+
157
  def _stream_to_file(self, url: str, destination: Path) -> None:
158
  tmp_path = destination.with_suffix(destination.suffix + ".part")
159
  try: