broadfield-dev committed on
Commit 7869f03 · verified · 1 Parent(s): d072be3

Create rss_processor.py

Files changed (1)
  1. rss_processor.py +285 -0
rss_processor.py ADDED
@@ -0,0 +1,285 @@
+ import os
+ import feedparser
+ from chromadb import PersistentClient
+ from langchain_community.embeddings import HuggingFaceEmbeddings
+ from langchain_core.documents import Document
+ import logging
+ from huggingface_hub import HfApi, login, snapshot_download
+ from datetime import datetime
+ import dateutil.parser
+ import hashlib
+ import json
+ import re
+ import requests
+
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+ logger = logging.getLogger(__name__)
+
+ LOCAL_DB_DIR = "chroma_db"
+ FEEDS_FILE = "rss_feeds.json"
+ COLLECTION_NAME = "news_articles"
+ HF_API_TOKEN = os.getenv("HF_TOKEN")
+ REPO_ID = "broadfield-dev/news-rag-db"
+ MAX_ARTICLES_PER_FEED = 1000
+ RAW_FEEDS_DIR = "raw_rss_feeds"
+
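+ # fetch_rss_feeds() below expects FEEDS_FILE to map category names to lists of
+ # feed entries, each carrying at least a "url" key. A minimal illustrative
+ # sketch (these URLs are placeholders, not the feeds this repo actually uses):
+ #
+ # {
+ #     "technology": [
+ #         {"url": "https://example.com/tech/rss.xml"}
+ #     ],
+ #     "science": [
+ #         {"url": "https://example.org/science/feed.xml"}
+ #     ]
+ # }
+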
+ def initialize_hf_api():
+     """Log in to the Hugging Face Hub with HF_TOKEN and return an HfApi client."""
+     if not HF_API_TOKEN:
+         logger.error("Hugging Face API token (HF_TOKEN) not set.")
+         raise ValueError("HF_TOKEN environment variable is not set.")
+     try:
+         login(token=HF_API_TOKEN)
+         return HfApi()
+     except Exception as e:
+         logger.error(f"Failed to login to Hugging Face Hub: {e}")
+         raise
+
+ hf_api = initialize_hf_api()
+
+ def get_embedding_model():
+     """Lazily create and cache a single all-MiniLM-L6-v2 embedding model."""
+     if not hasattr(get_embedding_model, "model"):
+         get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
+     return get_embedding_model.model
+
+ def clean_text(html_text):
+     """
+     Cleans HTML text by prioritizing content within <p> tags,
+     then falling back to stripping all HTML tags.
+     """
+     if not html_text or not isinstance(html_text, str):
+         return ""
+     # If <p> tags are present, extract their content
+     if '<p>' in html_text.lower():
+         p_contents = re.findall(r'<p>(.*?)</p>', html_text, re.DOTALL | re.IGNORECASE)
+         if p_contents:
+             # Join the content of all p tags and then strip any remaining inner HTML tags
+             text = ' '.join(p_contents)
+             text = re.sub(r'<.*?>', '', text)  # Cleans tags like <i>, <a>
+             return ' '.join(text.split()).strip()
+
+     # Fallback for descriptions without <p> tags or if regex fails
+     text = re.sub(r'<.*?>', '', html_text)
+     return ' '.join(text.split()).strip()
+
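+ # Worked example of the helper above:
+ #   clean_text('<p>Hello <i>world</i></p><p>Again</p>')  ->  'Hello world Again'
+ #   clean_text('Plain <b>text</b>')  ->  'Plain text'   (no <p> tags, fallback path)
+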
+ def save_raw_rss_to_file(feed_url, content):
+     """Persist the raw feed XML to RAW_FEEDS_DIR, using a sanitized URL as the filename."""
+     if not os.path.exists(RAW_FEEDS_DIR):
+         os.makedirs(RAW_FEEDS_DIR)
+
+     filename = re.sub(r'[^a-zA-Z0-9]', '_', feed_url) + ".xml"
+     filepath = os.path.join(RAW_FEEDS_DIR, filename)
+
+     try:
+         with open(filepath, 'w', encoding='utf-8') as f:
+             f.write(content)
+         logger.info(f"Saved raw RSS from {feed_url} to {filepath}")
+     except Exception as e:
+         logger.error(f"Could not save raw RSS from {feed_url}: {e}")
+
+ def fetch_rss_feeds():
+     """Fetch every feed listed in FEEDS_FILE and return a list of de-duplicated article dicts."""
+     articles = []
+     seen_links = set()
+
+     try:
+         with open(FEEDS_FILE, 'r') as f:
+             feed_categories = json.load(f)
+     except FileNotFoundError:
+         logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
+         return []
+
+     for category, feeds in feed_categories.items():
+         for feed_info in feeds:
+             feed_url = feed_info.get("url")
+             if not feed_url:
+                 logger.warning(f"Skipping feed with no URL in category '{category}'")
+                 continue
+
+             try:
+                 logger.info(f"Fetching {feed_url}")
+                 response = requests.get(feed_url, headers={'User-Agent': 'Mozilla/5.0'})
+                 response.raise_for_status()
+                 raw_content = response.text
+                 save_raw_rss_to_file(feed_url, raw_content)
+
+                 feed = feedparser.parse(raw_content)
+                 if feed.bozo:
+                     logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
+                     continue
+
+                 for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
+                     link = entry.get("link", "")
+                     if not link or link in seen_links:
+                         continue
+
+                     seen_links.add(link)
+
+                     title = entry.get("title", "No Title")
+
+                     # Prioritize content:encoded, then summary, then description
+                     description_raw = ""
+                     if 'content' in entry and entry.content:
+                         description_raw = entry.content[0].get('value', '')
+                     if not description_raw:
+                         description_raw = entry.get("summary", entry.get("description", ""))
+
+                     description = clean_text(description_raw)
+
+                     if not description:
+                         continue
+
+                     # Expanded date fields to check
+                     published_str = "Unknown Date"
+                     for date_field in ["published", "updated", "created", "pubDate", "dc:date"]:
+                         if date_field in entry:
+                             try:
+                                 parsed_date = dateutil.parser.parse(entry[date_field])
+                                 published_str = parsed_date.isoformat()
+                                 break
+                             except (ValueError, TypeError, AttributeError):
+                                 continue
+
+                     # Prioritized and expanded image sources
+                     image = "svg"  # Default fallback image
+                     image_sources = [
+                         lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else None,
+                         lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else None,
+                         lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") and e.get("enclosure", {}).get('type', '').startswith('image') else None,
+                         lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), None),
+                     ]
+                     for source_func in image_sources:
+                         try:
+                             img_url = source_func(entry)
+                             if img_url and isinstance(img_url, str) and img_url.strip():
+                                 image = img_url
+                                 break
+                         except (IndexError, AttributeError, TypeError):
+                             continue
+
+                     articles.append({
+                         "title": title,
+                         "link": link,
+                         "description": description,
+                         "published": published_str,
+                         "category": category,
+                         "image": image,
+                     })
+             except requests.exceptions.RequestException as e:
+                 logger.error(f"Error fetching {feed_url}: {e}")
+             except Exception as e:
+                 logger.error(f"Error processing {feed_url}: {e}")
+
+     logger.info(f"Total unique articles fetched: {len(articles)}")
+     return articles
+
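+ # Each dict returned above is flat and string-valued:
+ # {"title": ..., "link": ..., "description": ..., "published": ISO-8601 or "Unknown Date",
+ #  "category": ..., "image": image URL or the "svg" placeholder}
+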
+ def process_and_store_articles(articles):
+     """Embed new articles and add them to the local Chroma collection, skipping known links."""
+     if not os.path.exists(LOCAL_DB_DIR):
+         os.makedirs(LOCAL_DB_DIR)
+
+     client = PersistentClient(path=LOCAL_DB_DIR)
+     collection = client.get_or_create_collection(name=COLLECTION_NAME)
+
+     try:
+         existing_ids = set(collection.get(include=[])["ids"])
+         logger.info(f"Loaded {len(existing_ids)} existing document IDs from {LOCAL_DB_DIR}.")
+     except Exception:
+         logger.info("No existing DB found or it is empty. Starting fresh.")
+         existing_ids = set()
+
+     contents_to_add = []
+     metadatas_to_add = []
+     ids_to_add = []
+
+     for article in articles:
+         if not article.get('link'):
+             continue
+
+         # The document ID is a stable hash of the article link, so re-runs skip duplicates
+         doc_id = hashlib.sha256(article['link'].encode('utf-8')).hexdigest()
+
+         if doc_id in existing_ids:
+             continue
+
+         metadata = {
+             "title": article["title"],
+             "link": article["link"],
+             "published": article["published"],
+             "category": article["category"],
+             "image": article["image"],
+         }
+
+         contents_to_add.append(article["description"])
+         metadatas_to_add.append(metadata)
+         ids_to_add.append(doc_id)
+
+     if ids_to_add:
+         logger.info(f"Found {len(ids_to_add)} new articles to add to the database.")
+         try:
+             embedding_model = get_embedding_model()
+             embeddings_to_add = embedding_model.embed_documents(contents_to_add)
+
+             collection.add(
+                 embeddings=embeddings_to_add,
+                 documents=contents_to_add,
+                 metadatas=metadatas_to_add,
+                 ids=ids_to_add
+             )
+             logger.info(f"Successfully added {len(ids_to_add)} new articles to DB. Total in DB: {collection.count()}")
+         except Exception as e:
+             logger.error(f"Error storing articles in ChromaDB: {e}", exc_info=True)
+     else:
+         logger.info("No new articles to add to the database.")
+
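+ # A downstream consumer could query the stored collection along these lines
+ # (illustrative sketch only, not part of this script's pipeline):
+ #     client = PersistentClient(path=LOCAL_DB_DIR)
+ #     collection = client.get_or_create_collection(name=COLLECTION_NAME)
+ #     query_vec = get_embedding_model().embed_query("latest AI research news")
+ #     results = collection.query(query_embeddings=[query_vec], n_results=5)
+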
+ def download_from_hf_hub():
+     """Pull an existing Chroma DB from the dataset repo unless one is already present locally."""
+     if not os.path.exists(os.path.join(LOCAL_DB_DIR, "chroma.sqlite3")):
+         try:
+             logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
+             snapshot_download(
+                 repo_id=REPO_ID,
+                 repo_type="dataset",
+                 local_dir=".",
+                 local_dir_use_symlinks=False,
+                 allow_patterns=[f"{LOCAL_DB_DIR}/**"],
+                 token=HF_API_TOKEN
+             )
+             logger.info("Finished downloading DB.")
+         except Exception as e:
+             logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {e}")
+     else:
+         logger.info(f"Local Chroma DB found at '{LOCAL_DB_DIR}', skipping download.")
+
+ def upload_to_hf_hub():
+     """Push the updated Chroma DB and the raw feed snapshots to the dataset repo."""
+     commit_message = f"Update RSS news database and raw feeds {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+
+     if os.path.exists(LOCAL_DB_DIR):
+         try:
+             logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
+             hf_api.upload_folder(
+                 folder_path=LOCAL_DB_DIR, path_in_repo=LOCAL_DB_DIR, repo_id=REPO_ID,
+                 repo_type="dataset", commit_message=commit_message, ignore_patterns=["*.bak", "*.tmp"]
+             )
+             logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
+         except Exception as e:
+             logger.error(f"Error uploading Chroma DB to Hugging Face Hub: {e}", exc_info=True)
+
+     if os.path.exists(RAW_FEEDS_DIR):
+         try:
+             logger.info(f"Uploading raw RSS feeds from '{RAW_FEEDS_DIR}' to {REPO_ID}...")
+             hf_api.upload_folder(
+                 folder_path=RAW_FEEDS_DIR, path_in_repo=RAW_FEEDS_DIR, repo_id=REPO_ID,
+                 repo_type="dataset", commit_message=commit_message
+             )
+             logger.info(f"Raw feeds folder '{RAW_FEEDS_DIR}' uploaded to: {REPO_ID}")
+         except Exception as e:
+             logger.error(f"Error uploading raw feeds to Hugging Face Hub: {e}", exc_info=True)
+
+ def main():
+     """Sync the DB from the Hub, fetch and store new articles, then push the results back."""
+     try:
+         download_from_hf_hub()
+         articles_to_process = fetch_rss_feeds()
+         if articles_to_process:
+             process_and_store_articles(articles_to_process)
+             upload_to_hf_hub()
+         else:
+             logger.info("No articles fetched, skipping database processing and upload.")
+     except Exception as e:
+         logger.critical(f"An unhandled error occurred in main execution: {e}", exc_info=True)
+
+ if __name__ == "__main__":
+     main()
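
To run this locally, something along these lines should work, assuming rss_feeds.json sits next to the script, the imported packages (feedparser, chromadb, langchain-community, sentence-transformers, huggingface_hub, python-dateutil, requests) are installed, and HF_TOKEN holds a Hugging Face token with write access to broadfield-dev/news-rag-db:

    HF_TOKEN=hf_xxxxxxxx python rss_processor.py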