broadfield-dev committed
Commit 1662274 · verified · 1 Parent(s): d3c98a4

Update rss_processor.py

Files changed (1)
  1. rss_processor.py +35 -49
rss_processor.py CHANGED
@@ -10,7 +10,7 @@ import dateutil.parser
 import hashlib
 import json
 import re
-import requests
+import requests  # Ensure requests is imported
 
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
@@ -21,7 +21,7 @@ COLLECTION_NAME = "news_articles"
 HF_API_TOKEN = os.getenv("HF_TOKEN")
 REPO_ID = "broadfield-dev/news-rag-db"
 MAX_ARTICLES_PER_FEED = 1000
-RAW_FEEDS_DIR = "raw_rss_feeds"
+RAW_FEEDS_DIR = "raw_rss_feeds"  # Directory for raw RSS files
 
 def initialize_hf_api():
     if not HF_API_TOKEN:
@@ -43,7 +43,7 @@ def get_embedding_model():
 def clean_text(text):
     if not text or not isinstance(text, str):
         return ""
-    text = re.sub(r'<.*?>', '', text)
+    text = re.sub(r'<|.*?>', '', text)
     text = ' '.join(text.split())
     return text.strip()
 
@@ -83,11 +83,10 @@ def fetch_rss_feeds():
 
         try:
             logger.info(f"Fetching {feed_url}")
-            # Fetch raw content first to save it
             response = requests.get(feed_url, headers={'User-Agent': 'Mozilla/5.0'})
             response.raise_for_status()
             raw_content = response.text
-            save_raw_rss_to_file(feed_url, raw_content)
+            save_raw_rss_to_file(feed_url, raw_content)  # Save the raw feed
 
             feed = feedparser.parse(raw_content)
             if feed.bozo:
@@ -100,7 +99,6 @@ def fetch_rss_feeds():
                     continue
 
                 seen_links.add(link)
-
                 title = entry.get("title", "No Title")
                 description_raw = entry.get("summary", entry.get("description", ""))
                 description = clean_text(description_raw)
@@ -135,12 +133,8 @@ def fetch_rss_feeds():
                     continue
 
                 articles.append({
-                    "title": title,
-                    "link": link,
-                    "description": description,
-                    "published": published_str,
-                    "category": category,
-                    "image": image,
+                    "title": title, "link": link, "description": description,
+                    "published": published_str, "category": category, "image": image,
                 })
         except requests.exceptions.RequestException as e:
             logger.error(f"Error fetching {feed_url}: {e}")
@@ -164,27 +158,16 @@ def process_and_store_articles(articles):
         logger.info("No existing DB found or it is empty. Starting fresh.")
         existing_ids = set()
 
-    contents_to_add = []
-    metadatas_to_add = []
-    ids_to_add = []
-
+    contents_to_add, metadatas_to_add, ids_to_add = [], [], []
     for article in articles:
-        if not article.get('link'):
-            continue
-
+        if not article.get('link'): continue
        doc_id = hashlib.sha256(article['link'].encode('utf-8')).hexdigest()
+        if doc_id in existing_ids: continue
 
-        if doc_id in existing_ids:
-            continue
-
        metadata = {
-            "title": article["title"],
-            "link": article["link"],
-            "published": article["published"],
-            "category": article["category"],
-            "image": article["image"],
+            "title": article["title"], "link": article["link"], "published": article["published"],
+            "category": article["category"], "image": article["image"],
        }
-
        contents_to_add.append(article["description"])
        metadatas_to_add.append(metadata)
        ids_to_add.append(doc_id)
@@ -194,13 +177,7 @@ def process_and_store_articles(articles):
     try:
         embedding_model = get_embedding_model()
         embeddings_to_add = embedding_model.embed_documents(contents_to_add)
-
-        collection.add(
-            embeddings=embeddings_to_add,
-            documents=contents_to_add,
-            metadatas=metadatas_to_add,
-            ids=ids_to_add
-        )
+        collection.add(embeddings=embeddings_to_add, documents=contents_to_add, metadatas=metadatas_to_add, ids=ids_to_add)
         logger.info(f"Successfully added {len(ids_to_add)} new articles to DB. Total in DB: {collection.count()}")
     except Exception as e:
         logger.error(f"Error storing articles in ChromaDB: {e}", exc_info=True)
@@ -212,12 +189,8 @@ def download_from_hf_hub():
     try:
         logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
         snapshot_download(
-            repo_id=REPO_ID,
-            repo_type="dataset",
-            local_dir=".",
-            local_dir_use_symlinks=False,
-            allow_patterns=[f"{LOCAL_DB_DIR}/**"],
-            token=HF_API_TOKEN
+            repo_id=REPO_ID, repo_type="dataset", local_dir=".",
+            local_dir_use_symlinks=False, allow_patterns=[f"{LOCAL_DB_DIR}/**"], token=HF_API_TOKEN
         )
         logger.info("Finished downloading DB.")
     except Exception as e:
@@ -226,20 +199,33 @@
         logger.info(f"Local Chroma DB found at '{LOCAL_DB_DIR}', skipping download.")
 
 def upload_to_hf_hub():
+    """Uploads both the ChromaDB and the raw RSS feeds to the Hugging Face Hub."""
+    commit_message = f"Update RSS news database and raw feeds {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+
+    # Upload ChromaDB if it exists
     if os.path.exists(LOCAL_DB_DIR):
         try:
             logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
             hf_api.upload_folder(
-                folder_path=LOCAL_DB_DIR,
-                path_in_repo=LOCAL_DB_DIR,
-                repo_id=REPO_ID,
-                repo_type="dataset",
-                commit_message=f"Update RSS news database {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
-                ignore_patterns=["*.bak", "*.tmp"]
+                folder_path=LOCAL_DB_DIR, path_in_repo=LOCAL_DB_DIR, repo_id=REPO_ID,
+                repo_type="dataset", commit_message=commit_message, ignore_patterns=["*.bak", "*.tmp"]
             )
             logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
         except Exception as e:
-            logger.error(f"Error uploading to Hugging Face Hub: {e}", exc_info=True)
+            logger.error(f"Error uploading Chroma DB to Hugging Face Hub: {e}", exc_info=True)
+
+    # Upload Raw RSS Feeds directory if it exists
+    if os.path.exists(RAW_FEEDS_DIR):
+        try:
+            logger.info(f"Uploading raw RSS feeds from '{RAW_FEEDS_DIR}' to {REPO_ID}...")
+            hf_api.upload_folder(
+                folder_path=RAW_FEEDS_DIR, path_in_repo=RAW_FEEDS_DIR, repo_id=REPO_ID,
+                repo_type="dataset", commit_message=commit_message
+            )
+            logger.info(f"Raw feeds folder '{RAW_FEEDS_DIR}' uploaded to: {REPO_ID}")
+        except Exception as e:
+            logger.error(f"Error uploading raw feeds to Hugging Face Hub: {e}", exc_info=True)
+
 
 def main():
     try:
@@ -247,7 +233,7 @@ def main():
         articles_to_process = fetch_rss_feeds()
         if articles_to_process:
             process_and_store_articles(articles_to_process)
-            upload_to_hf_hub()
+            upload_to_hf_hub()  # This now uploads both directories
         else:
             logger.info("No articles fetched, skipping database processing and upload.")
     except Exception as e:
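A note on the clean_text change: the new pattern r'<|.*?>' is not equivalent to the old r'<.*?>'. The alternation matches either a lone '<' or a lazy run of characters ending in '>', so it also deletes article text that precedes each '>'. A quick check with the standard re module:

import re

sample = "Breaking: <b>markets</b> rallied <i>sharply</i> today"

# Old pattern: lazily matches one whole tag at a time, so only markup is removed.
re.sub(r'<.*?>', '', sample)   # -> 'Breaking: markets rallied sharply today'

# New pattern: '<' OR any lazy run ending in '>', so the text in front of each
# '>' is swallowed together with the tag.
re.sub(r'<|.*?>', '', sample)  # -> ' today'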
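save_raw_rss_to_file is called in the diff but its body is not part of this commit. A minimal sketch of what such a helper could look like; the filename scheme (URL hash plus timestamp) is an assumption for illustration, not the repo's actual code:

import os
import hashlib
from datetime import datetime

RAW_FEEDS_DIR = "raw_rss_feeds"

def save_raw_rss_to_file(feed_url, raw_content):
    # Hypothetical helper: persist the raw RSS payload under RAW_FEEDS_DIR,
    # keyed by a hash of the feed URL plus a timestamp so repeated fetches
    # do not overwrite each other.
    os.makedirs(RAW_FEEDS_DIR, exist_ok=True)
    stem = hashlib.sha256(feed_url.encode("utf-8")).hexdigest()[:16]
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    path = os.path.join(RAW_FEEDS_DIR, f"{stem}_{stamp}.xml")
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(raw_content)
    return path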
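The dedup logic in process_and_store_articles keys every document on a sha256 digest of the article link and skips ids already present in the collection. How existing_ids is loaded is not shown in this hunk; a sketch under the assumption that it comes from chromadb's Collection.get, which returns ids even with an empty include list:

import hashlib

def doc_id_for(link):
    # Stable document id: hex digest of the article URL, as in the diff above.
    return hashlib.sha256(link.encode("utf-8")).hexdigest()

def filter_new_articles(collection, articles):
    # Assumed retrieval of already-stored ids; the script's actual call is not
    # part of this diff.
    existing_ids = set(collection.get(include=[])["ids"])
    return [a for a in articles
            if a.get("link") and doc_id_for(a["link"]) not in existing_ids]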
 
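The reworked upload_to_hf_hub shares one commit_message string between its two upload_folder calls, but each call still produces its own commit on the Hub. If a single atomic commit were preferred, huggingface_hub's create_commit can batch both directories; a sketch, with the folder list passed in because LOCAL_DB_DIR's value does not appear in this diff:

import os
from huggingface_hub import CommitOperationAdd

def upload_dirs_in_one_commit(api, repo_id, commit_message, folders):
    # api: an authenticated huggingface_hub.HfApi instance (the script's hf_api).
    # Collect every file under each folder as an add operation, then push them
    # all as one dataset commit.
    ops = []
    for folder in folders:
        for root, _, files in os.walk(folder):
            for name in files:
                local = os.path.join(root, name)
                ops.append(CommitOperationAdd(
                    path_in_repo=local.replace(os.sep, "/"),
                    path_or_fileobj=local,
                ))
    if ops:
        api.create_commit(repo_id=repo_id, repo_type="dataset",
                          operations=ops, commit_message=commit_message)

# Usage, reusing names from the script:
# upload_dirs_in_one_commit(hf_api, REPO_ID, commit_message, [LOCAL_DB_DIR, RAW_FEEDS_DIR])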