HuggingFace Spaces page capture — Space status: Sleeping
#!/usr/bin/env python3
"""
Web Insight Chatbot - HuggingFace Spaces Version

This is a modified version of the chatbot for deployment on HuggingFace Spaces.
It removes the MCP functionality while keeping the web search and insight generation.
"""
import os
import random
import time

import gradio as gr
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from together import Together

# Set API key - this will be set as a secret in HuggingFace Spaces.
# You'll need to add this in your Space's Settings > Repository secrets.
TOGETHER_API_KEY = os.environ.get("TOGETHER_API_KEY", "your_default_key")

# Initialize LLM client (used by synthesize_insights).
client = Together(api_key=TOGETHER_API_KEY)

# Module-level caches of the most recent search results and page content.
search_results = []
extracted_content = []
def search_duckduckgo(query, max_results=3, max_retries=5, base_delay=10):
    """
    Search DuckDuckGo with rate-limit-aware retry logic.

    Alternates between text search (even attempts) and news search (odd
    attempts) as a workaround for intermittent rate limiting.

    Args:
        query (str): Search terms.
        max_results (int): Maximum number of results to request.
        max_retries (int): Number of attempts before giving up.
        base_delay (float): Base number of seconds to wait between attempts.

    Returns:
        list[dict]: Result dicts from duckduckgo_search, or [] if all
        attempts fail.
    """
    print(f"Searching DuckDuckGo for: '{query}'")
    for attempt in range(max_retries):
        try:
            # Random jitter on top of base_delay to avoid rate limiting.
            delay = base_delay + random.uniform(5, 15)
            print(f"Attempt {attempt + 1}/{max_retries}: Waiting {delay:.1f} seconds before request...")
            time.sleep(delay)
            with DDGS() as ddgs:
                if attempt % 2 == 0:
                    print("Trying text search...")
                    results = list(ddgs.text(query, max_results=max_results))
                else:
                    print("Trying news search...")
                    results = list(ddgs.news(query, max_results=max_results))
            if results:
                return results
            print("No results found, trying alternative method...")
        except Exception as e:
            error_msg = str(e)
            print(f"Attempt {attempt + 1}/{max_retries}: Error searching DuckDuckGo: {error_msg}")
            # BUGFIX: never sleep after the final attempt — there is no retry
            # left to wait for, so the old code wasted up to minutes of time
            # before returning.
            if attempt >= max_retries - 1:
                continue
            if "202" in error_msg or "ratelimit" in error_msg.lower():
                # More aggressive backoff for rate-limit errors.
                wait_time = base_delay * (attempt + 1) * 3
                print(f"Rate limit detected. Waiting {wait_time} seconds before retry...")
                time.sleep(wait_time)
            else:
                # For other errors, wait the normal delay before retrying.
                wait_time = base_delay + random.uniform(5, 10)
                print(f"Other error detected. Waiting {wait_time:.1f} seconds before retry...")
                time.sleep(wait_time)
    print("All retry attempts failed. Please try again later.")
    return []
def search_with_fallback(query, max_results=3):
    """
    Alternative search with multiple fallback strategies.

    Strategy 1 retries the regular search with much longer delays;
    Strategy 2 falls back to DuckDuckGo instant answers.

    Args:
        query (str): Search terms.
        max_results (int): Maximum number of results to return.

    Returns:
        list[dict]: Result dicts, or [] if every strategy fails.
    """
    print("Trying alternative search strategies...")

    # Strategy 1: Try with very long delays.
    print("\nStrategy 1: Extended delays...")
    results = search_duckduckgo(query, max_results=max_results, max_retries=3, base_delay=30)
    if results:
        return results

    # Strategy 2: Try a different search endpoint.
    print("\nStrategy 2: Different search approach...")
    try:
        with DDGS() as ddgs:
            print("Trying instant answers...")
            # BUGFIX: DDGS.answers() does not accept a max_results keyword;
            # passing it raised TypeError so this fallback never produced
            # results. Request all answers, then slice.
            results = list(ddgs.answers(query))[:max_results]
        if results:
            return results
    except Exception as e:
        print(f"Instant answers failed: {e}")
    return []
def _truncate_content(content, limit=8000):
    """Cap *content* at *limit* characters, appending a truncation marker."""
    if len(content) > limit:
        return content[:limit] + "... [content truncated]"
    return content


def extract_content_from_url(url):
    """
    Retrieve and extract readable text from a URL using BeautifulSoup.

    Args:
        url (str): Page to fetch.

    Returns:
        str: Extracted text (capped at 8000 chars), or an error message
        string if the request or parsing fails.
    """
    try:
        # Mimic a browser request; many sites block default UA strings.
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, "html.parser")
        # Drop boilerplate elements that rarely carry article text.
        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()

        # Prefer a dedicated main-content container when one exists.
        main_content = (
            soup.find("main")
            or soup.find("article")
            or soup.find("div", {"id": "content"})
            or soup.find("div", {"class": "content"})
        )
        scope = main_content if main_content else soup
        paragraphs = scope.find_all(["p", "h1", "h2", "h3", "h4", "h5", "h6"])

        # Keep only substantial paragraphs (> 30 chars).
        extracted_text = [
            text
            for text in (element.get_text(strip=True) for element in paragraphs)
            if text and len(text) > 30
        ]
        content = _truncate_content("\n\n".join(extracted_text))

        if not content:
            # Fallback: flatten the whole document and keep substantial chunks.
            raw = soup.get_text(separator="\n\n", strip=True)
            content = _truncate_content(
                "\n\n".join(p for p in raw.split("\n\n") if len(p) > 30)
            )
        return content
    except requests.RequestException as e:
        return f"Error retrieving content: {e}"
    except Exception as e:
        return f"Error parsing content: {e}"
def synthesize_insights(query, content_list):
    """
    Ask the LLM to distill the collected web content into insights.

    Args:
        query (str): The user's original question.
        content_list (list[tuple]): (title, url, content) triples.

    Returns:
        str: The LLM's analysis, or an error message string on failure.
    """
    # Stitch every source into one annotated document.
    sections = [
        f"Source {idx}: {title}\nURL: {url}\n\n{body}\n\n---\n\n"
        for idx, (title, url, body) in enumerate(content_list, 1)
    ]
    combined_content = "".join(sections)

    # Keep the prompt within a safe size for the model.
    if len(combined_content) > 15000:
        combined_content = combined_content[:15000] + "... [content truncated]"

    prompt = f"""
You are a research assistant with expertise in analyzing information and providing valuable insights.
USER QUERY: {query}
WEB CONTENT:
{combined_content}
Based on the above web content, provide a concise and informative analysis that:
1. Synthesizes the most relevant information related to the query
2. Identifies 3-5 key insights from the sources
3. Notes any conflicting information or perspectives if present
4. Provides a brief conclusion or recommendation if applicable
Format your response in a clear, structured way with headings for each section.
Focus on providing accurate, valuable insights rather than general information.
Keep your response focused, informative, and under 400 words.
"""
    try:
        completion = client.chat.completions.create(
            model="meta-llama/Llama-3.3-70B-Instruct-Turbo",  # Llama 3 Turbo
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1200,
            temperature=0.7,
        )
        return completion.choices[0].message.content
    except Exception as e:
        print(f"Error calling LLM API: {e}")
        return f"Error generating insights: {e}"
def process_query(query):
    """
    Process a user query: search the web, extract content, synthesize insights.

    Args:
        query (str): The user query.

    Returns:
        tuple: (insights, search_results) — insights is a string; the second
        element is the raw result list, or None when nothing was found.
    """
    global search_results, extracted_content

    if not query.strip():
        return "Please enter a question to search for.", None

    # Step 1: Search the web, falling back to alternative strategies.
    results = search_duckduckgo(query, max_results=3)
    if not results:
        results = search_with_fallback(query, max_results=3)
    search_results = results
    if not results:
        return "I couldn't find any relevant information on the web. Please try a different query.", None

    # Step 2: Extract content from each search result.
    content_list = []
    for result in results:
        title = result.get('title', 'No title')
        # BUGFIX: duckduckgo_search text results carry the URL under 'href'
        # (news results use 'url'); the old 'link' key never matched, so
        # every extraction got 'No URL'. Try the real keys first.
        url = result.get('href') or result.get('url') or result.get('link', 'No URL')
        print(f"Extracting content from: {url}")
        content = extract_content_from_url(url)
        content_list.append((title, url, content))
    extracted_content = content_list

    # Step 3: Synthesize insights using the LLM.
    insights = synthesize_insights(query, content_list)
    return insights, search_results
def create_interface():
    """Build and return the Gradio Blocks UI for the chatbot."""

    # Callbacks are plain functions; defined up front, wired below.
    def on_submit(question):
        if not question.strip():
            return "Please enter a question.", None
        analysis, found = process_query(question)
        return analysis, found

    def on_clear():
        return "", "Insights will appear here", None

    with gr.Blocks(title="Web Insight Chatbot") as app:
        gr.Markdown("""
<div style='text-align: center; margin-bottom: 10px'>
<h1>🌐 Web Insight Chatbot 🤖</h1>
<p>Ask me anything, and I'll search the web and synthesize insights for you!</p>
</div>
""")
        with gr.Row():
            with gr.Column(scale=3):
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="What would you like to know about?",
                    lines=2,
                )
                with gr.Row():
                    submit_btn = gr.Button("Get Insights", variant="primary")
                    clear_btn = gr.Button("Clear", variant="secondary")
            with gr.Column(scale=4):
                insights_output = gr.Markdown(
                    label="Insights",
                    value="Insights will appear here",
                )
                with gr.Accordion("Search Results", open=False):
                    search_results_output = gr.JSON(label="Search Results")

        # Wire up the buttons.
        submit_btn.click(
            on_submit,
            inputs=[query_input],
            outputs=[insights_output, search_results_output],
        )
        clear_btn.click(
            on_clear,
            outputs=[query_input, insights_output, search_results_output],
        )

        # Clickable example queries.
        gr.Examples(
            examples=[
                "What are the latest advancements in renewable energy?",
                "How does artificial intelligence impact healthcare?",
                "What are the best practices for cybersecurity in 2025?",
                "What are the current trends in remote work?",
                "How is climate change affecting global agriculture?",
            ],
            inputs=query_input,
        )
    return app
# Create the interface at import time so HuggingFace Spaces can serve it.
demo = create_interface()

# Launch the app.
if __name__ == "__main__":
    # BUGFIX: share=True is unsupported inside HuggingFace Spaces (Gradio
    # warns and ignores it there); a plain launch() is correct for Spaces.
    demo.launch()