from fastapi import FastAPI
from pydantic import BaseModel
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from bs4 import BeautifulSoup
import logging
import re

app = FastAPI()
logging.basicConfig(level=logging.INFO)

class RedirectRequest(BaseModel):
    url: str


# Route path assumed; the original decorator was not preserved in the source.
@app.post("/resolve")
async def resolve_redirect(data: RedirectRequest):
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()

            # Step 1: Start navigation to the RSS link
            await page.goto(data.url, wait_until="domcontentloaded", timeout=15000)

            # Step 2: Wait for navigation to a non-Google domain
            try:
                await page.wait_for_url(re.compile(r"^(?!.*news\.google\.com).*"), timeout=10000)
            except PlaywrightTimeoutError:
                pass  # fallback if no hard redirect happened

            final_url = page.url
            await browser.close()
            return {"final_url": final_url}
    except Exception as e:
        logging.error("Redirect resolution failed", exc_info=True)
        return {"error": str(e)}

class ScrapeRequest(BaseModel):
    url: str


# Route path assumed; the original decorator was not preserved in the source.
@app.post("/scrape")
async def scrape_page(data: ScrapeRequest):
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()
            await page.goto(data.url, wait_until="domcontentloaded", timeout=40000)

            # Extract visible text using a JS TreeWalker for generalized coverage
            text = await page.evaluate("""
                () => {
                    const walker = document.createTreeWalker(document.body, NodeFilter.SHOW_TEXT, {
                        acceptNode: node => {
                            const parent = node.parentElement;
                            if (!parent) return NodeFilter.FILTER_REJECT;
                            const style = window.getComputedStyle(parent);
                            return style.display !== 'none' && style.visibility !== 'hidden'
                                ? NodeFilter.FILTER_ACCEPT
                                : NodeFilter.FILTER_REJECT;
                        }
                    });
                    let text = '';
                    while (walker.nextNode()) {
                        text += walker.currentNode.textContent + '\\n';
                    }
                    return text.trim();
                }
            """)

            # Collect all anchor links with their visible text
            links = await page.eval_on_selector_all(
                "a[href]",
                """els => els.map(el => ({
                    text: el.innerText.trim(),
                    href: el.href
                }))"""
            )

            # Capture the final URL before the browser is closed
            final_url = page.url
            await browser.close()
            return {
                "final_url": final_url,
                "text": text if text else "No visible content found.",
                "links": links
            }
    except Exception as e:
        logging.error("Scraping failed", exc_info=True)
        return {"error": str(e)}