from flask import Flask, request, jsonify, render_template, send_file
from flask_cors import CORS
import requests
import json
import time
import timeit
import pandas as pd
from typing import Dict, List, Optional
import pickle
import os
import sys
import threading
import tempfile
import shutil
from datetime import datetime
from tqdm import tqdm

# Define 'toc' function once (prints seconds elapsed since the given start time)
def toc(start_time):
    elapsed = timeit.default_timer() - start_time
    print(elapsed)

# Record start time
start_time = timeit.default_timer()
| # Helper function to get all pages | |
| def get_all_pages(url, headers, upper_limit=None): | |
| all_results = [] | |
| unique_ids = set() # Track unique paper IDs | |
| page = 1 | |
| processing_times = [] # Track time taken per paper | |
| # Get first page to get total count | |
| first_response = requests.get(f"{url}&page={page}", headers=headers) | |
| if first_response.status_code != 200: | |
| return [] | |
| data = first_response.json() | |
| total_count = data.get('meta', {}).get('count', 0) | |
| start_time = time.time() | |
| # Add only unique papers from first page | |
| for result in data.get('results', []): | |
| if result.get('id') not in unique_ids: | |
| unique_ids.add(result.get('id')) | |
| all_results.append(result) | |
| if upper_limit and len(all_results) >= upper_limit: | |
| return all_results | |
| papers_processed = len(all_results) | |
| time_taken = time.time() - start_time | |
| if papers_processed > 0: | |
| processing_times.append(time_taken / papers_processed) | |
| # Continue getting remaining pages until we have all papers | |
| target_count = min(total_count, upper_limit) if upper_limit else total_count | |
| pbar = tqdm(total=target_count, desc="Retrieving papers", | |
| initial=len(all_results), unit="papers") | |
| while len(all_results) < total_count: | |
| page += 1 | |
| page_start_time = time.time() | |
| paged_url = f"{url}&page={page}" | |
| response = requests.get(paged_url, headers=headers) | |
| if response.status_code != 200: | |
| print(f"Error retrieving page {page}: {response.status_code}") | |
| break | |
| data = response.json() | |
| results = data.get('results', []) | |
| if not results: | |
| break | |
| # Add only unique papers from this page | |
| new_papers = 0 | |
| for result in results: | |
| if result.get('id') not in unique_ids: | |
| unique_ids.add(result.get('id')) | |
| all_results.append(result) | |
| new_papers += 1 | |
| if upper_limit and len(all_results) >= upper_limit: | |
| pbar.update(new_papers) | |
| pbar.close() | |
| return all_results | |
| # Update processing times and estimated time remaining | |
| if new_papers > 0: | |
| time_taken = time.time() - page_start_time | |
| processing_times.append(time_taken / new_papers) | |
| avg_time_per_paper = sum(processing_times) / len(processing_times) | |
| papers_remaining = target_count - len(all_results) | |
| est_time_remaining = papers_remaining * avg_time_per_paper | |
| pbar.set_postfix({'Est. Time Remaining': f'{est_time_remaining:.1f}s'}) | |
| pbar.update(new_papers) | |
| # Add a small delay to respect rate limits | |
| time.sleep(1) | |
| pbar.close() | |
| return all_results | |
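
# Minimal usage sketch (illustrative, never called by the app): page through an
# OpenAlex filter query. The seed work ID is hypothetical; note that the caller
# passes a URL that already contains query parameters, because get_all_pages
# appends "&page=N" to it.
def _example_get_all_pages():
    headers = {'User-Agent': 'LowAI ([email protected])'}
    url = "https://api.openalex.org/works?filter=cites:W2741809807&per-page=200"
    papers = get_all_pages(url, headers, upper_limit=500)
    print(f"Retrieved {len(papers)} papers")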
| def get_related_papers(work_id, upper_limit=None, progress_callback=None): | |
| # Define base URL for OpenAlex API | |
| base_url = "https://api.openalex.org/works" | |
| work_query = f"/{work_id}" # OpenAlex work IDs can be used directly in path | |
| work_url = base_url + work_query | |
| # Add email to be a polite API user | |
| headers = {'User-Agent': 'LowAI ([email protected])'} | |
| response = requests.get(work_url, headers=headers) | |
| print(response) | |
| if response.status_code == 200: | |
| paper = response.json() # For direct work queries, the response is the paper object | |
| paper_id = paper['id'] | |
| # Use referenced_works field on the seed work directly for cited papers | |
| referenced_ids = paper.get('referenced_works', []) or [] | |
| print("\nTotal counts:") | |
| print(f"Cited (referenced_works) count: {len(referenced_ids)}") | |
| def fetch_works_by_ids(ids, chunk_size=50): | |
| results = [] | |
| seen = set() | |
| total_chunks = (len(ids) + chunk_size - 1) // chunk_size | |
| for i in range(0, len(ids), chunk_size): | |
| chunk = ids[i:i+chunk_size] | |
| # Build ids filter: ids.openalex:ID1|ID2|ID3 | |
| ids_filter = '|'.join(chunk) | |
| url = f"{base_url}?filter=ids.openalex:{ids_filter}&per-page=200" | |
| resp = requests.get(url, headers=headers) | |
| if resp.status_code != 200: | |
| print(f"Error fetching IDs chunk {i//chunk_size+1}: {resp.status_code}") | |
| continue | |
| data = resp.json() | |
| for r in data.get('results', []): | |
| rid = r.get('id') | |
| if rid and rid not in seen: | |
| seen.add(rid) | |
| results.append(r) | |
| # Update progress for cited papers (0-30%) | |
| if progress_callback: | |
| progress = int(30 * (i // chunk_size + 1) / total_chunks) | |
| progress_callback(progress, f"Fetching cited papers... {len(results)} found") | |
| time.sleep(1) # be polite to API | |
| if upper_limit and len(results) >= upper_limit: | |
| return results[:upper_limit] | |
| return results | |
| print("\nRetrieving cited papers via referenced_works IDs...") | |
| cited_papers = fetch_works_by_ids(referenced_ids) | |
| print(f"Found {len(cited_papers)} unique cited papers") | |
| # Count citing papers (works that cite the seed), then paginate to collect all | |
| citing_count_url = f"{base_url}?filter=cites:{work_id}&per-page=1" | |
| citing_count = requests.get(citing_count_url, headers=headers).json().get('meta', {}).get('count', 0) | |
| print(f"Citing papers: {citing_count}") | |
| # Get all citing papers with pagination | |
| print("\nRetrieving citing papers (paginated)...") | |
| page = 1 | |
| citing_papers = [] | |
| unique_ids = set() | |
| target = citing_count if not upper_limit else min(upper_limit, citing_count) | |
| from tqdm import tqdm | |
| pbar = tqdm(total=target, desc="Retrieving citing papers", unit="papers") | |
| while len(citing_papers) < target: | |
| paged_url = f"{base_url}?filter=cites:{work_id}&per-page=200&sort=publication_date:desc&page={page}" | |
| resp = requests.get(paged_url, headers=headers) | |
| if resp.status_code != 200: | |
| print(f"Error retrieving citing page {page}: {resp.status_code}") | |
| break | |
| data = resp.json() | |
| results = data.get('results', []) | |
| if not results: | |
| break | |
| new = 0 | |
| for r in results: | |
| rid = r.get('id') | |
| if rid and rid not in unique_ids: | |
| unique_ids.add(rid) | |
| citing_papers.append(r) | |
| new += 1 | |
| if len(citing_papers) >= target: | |
| break | |
| # Update progress for citing papers (30-70%) | |
| if progress_callback: | |
| progress = 30 + int(40 * len(citing_papers) / target) | |
| progress_callback(progress, f"Fetching citing papers... {len(citing_papers)} found") | |
| pbar.update(new) | |
| page += 1 | |
| time.sleep(1) | |
| pbar.close() | |
| print(f"Found {len(citing_papers)} unique citing papers") | |
| # Get all related papers | |
| print("\nRetrieving related papers...") | |
| related_url = f"{base_url}?filter=related_to:{work_id}&per-page=200&sort=publication_date:desc" | |
| related_papers = get_all_pages(related_url, headers, upper_limit) | |
| print(f"Found {len(related_papers)} unique related papers") | |
| # Update progress for related papers (70-90%) | |
| if progress_callback: | |
| progress_callback(70, f"Fetching related papers... {len(related_papers)} found") | |
| # Create sets of IDs for quick lookup | |
| cited_ids = {paper['id'] for paper in cited_papers} | |
| citing_ids = {paper['id'] for paper in citing_papers} | |
| # Print some debug information | |
| print(f"\nDebug Information:") | |
| print(f"Seed paper ID: {paper_id}") | |
| print(f"Number of unique cited papers: {len(cited_ids)}") | |
| print(f"Number of unique citing papers: {len(citing_ids)}") | |
| print(f"Number of papers in both sets: {len(cited_ids.intersection(citing_ids))}") | |
| # Update progress for processing (90-95%) | |
| if progress_callback: | |
| progress_callback(90, "Processing and deduplicating papers...") | |
| # Combine all papers and remove duplicates while tracking relationship | |
| all_papers = cited_papers + citing_papers + related_papers | |
| seen_titles = set() | |
| unique_papers = [] | |
| for paper in all_papers: | |
| title = paper.get('title', '') | |
| if title not in seen_titles: | |
| seen_titles.add(title) | |
| # Add relationship type | |
| if paper['id'] in cited_ids: | |
| paper['relationship'] = 'cited' | |
| elif paper['id'] in citing_ids: | |
| paper['relationship'] = 'citing' | |
| else: | |
| paper['relationship'] = 'related' | |
| unique_papers.append(paper) | |
| # Final progress update | |
| if progress_callback: | |
| progress_callback(100, f"Collection completed! Found {len(unique_papers)} unique papers") | |
| return unique_papers | |
| else: | |
| print(f"Error retrieving seed paper: {response.status_code}") | |
| return [] | |
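
# Minimal usage sketch (illustrative, never called by the app): collect papers
# around a hypothetical seed work and tally them by relationship type.
def _example_get_related_papers():
    papers = get_related_papers("W2741809807", upper_limit=200)
    for kind in ("cited", "citing", "related"):
        count = sum(1 for p in papers if p.get("relationship") == kind)
        print(f"{kind}: {count}")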
# Additional imports for GPT-based relevance analysis
from openai import OpenAI
import concurrent.futures
| def analyze_paper_relevance(content: Dict[str, str], research_question: str, api_key: str) -> Optional[Dict]: | |
| """Analyze if a paper is relevant to the research question using GPT-5 mini.""" | |
| client = OpenAI(api_key=api_key) | |
| title = content.get('title', '') | |
| abstract = content.get('abstract', '') | |
| has_abstract = bool(abstract and abstract.strip()) | |
| if has_abstract: | |
| prompt = f""" | |
| Research Question: {research_question} | |
| Paper Title: {title} | |
| Paper Abstract: {abstract} | |
| Analyze this paper and determine: | |
| 1. Is this paper highly relevant to answering the research question? | |
| 2. What are the main aims/objectives of this paper? | |
| 3. What are the key takeaways or findings? | |
| Return ONLY a valid JSON object in this exact format: | |
| {{ | |
| "relevant": true/false, | |
| "relevance_reason": "brief explanation of why it is/isn't relevant", | |
| "aims_of_paper": "main objectives of the paper", | |
| "key_takeaways": "key findings or takeaways" | |
| }} | |
| """ | |
| else: | |
| prompt = f""" | |
| Research Question: {research_question} | |
| Paper Title: {title} | |
| Note: No abstract is available for this paper. | |
| Analyze this paper based on the title only and determine: | |
| 1. Is this paper likely to be relevant to answering the research question based on the title? | |
| Return ONLY a valid JSON object in this exact format: | |
| {{ | |
| "relevant": true/false, | |
| "relevance_reason": "brief explanation of why it is/isn't relevant based on title" | |
| }} | |
| """ | |
| try: | |
| # Try GPT-5 mini first, fallback to gpt-4o-mini if it fails | |
| try: | |
| response = client.responses.create( | |
| model="gpt-5-mini", | |
| input=prompt, | |
| reasoning={"effort": "minimal"}, | |
| text={"verbosity": "low"} | |
| ) | |
| except Exception as e: | |
| print(f"GPT-5 mini failed, trying gpt-4o-mini: {e}") | |
| response = client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[{ | |
| "role": "user", | |
| "content": prompt | |
| }], | |
| max_completion_tokens=1000 | |
| ) | |
| # Handle different response formats | |
| if hasattr(response, 'choices') and response.choices: | |
| # Old format (chat completions) | |
| result = response.choices[0].message.content | |
| elif hasattr(response, 'output'): | |
| # New format (responses) - extract text from output | |
| result = "" | |
| for item in response.output: | |
| if hasattr(item, "content") and item.content: | |
| for content in item.content: | |
| if hasattr(content, "text") and content.text: | |
| result += content.text | |
| else: | |
| print("Unexpected response format") | |
| return None | |
| if not result: | |
| print("Empty response from GPT") | |
| return None | |
| # Clean and parse the JSON response | |
| result = result.strip() | |
| if result.startswith("```json"): | |
| result = result[7:] | |
| if result.endswith("```"): | |
| result = result[:-3] | |
| # Try to parse JSON | |
| try: | |
| return json.loads(result.strip()) | |
| except json.JSONDecodeError as e: | |
| print(f"Failed to parse JSON response: {e}") | |
| print(f"Raw response: {result[:200]}...") | |
| return None | |
| except Exception as e: | |
| print(f"Error in GPT analysis: {str(e)}") | |
| return None | |
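
# Minimal usage sketch (illustrative, never called by the app): analyse one
# title/abstract pair. The content and research question are hypothetical, and
# an OpenAI API key must be available in the environment.
def _example_analyze_paper_relevance():
    content = {
        'title': 'A hypothetical paper title',
        'abstract': 'A hypothetical abstract describing the methods and findings.',
    }
    analysis = analyze_paper_relevance(
        content,
        research_question="How does X affect Y?",
        api_key=os.getenv("OPENAI_API_KEY", ""),
    )
    print(analysis)  # dict with 'relevant', 'relevance_reason', ... or None on failure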
| def extract_abstract_from_inverted_index(inverted_index: Dict) -> str: | |
| """Extract abstract text from inverted index format.""" | |
| if not inverted_index: | |
| return "" | |
| words = [] | |
| for word, positions in inverted_index.items(): | |
| for pos in positions: | |
| while len(words) <= pos: | |
| words.append('') | |
| words[pos] = word | |
| return ' '.join(words).strip() | |
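
# Example of the OpenAlex inverted-index format this expects (values illustrative):
#   >>> extract_abstract_from_inverted_index({'Deep': [0], 'learning': [1], 'works': [2], 'well': [3]})
#   'Deep learning works well'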
| def analyze_single_paper(paper: Dict, research_question: str, api_key: str) -> Optional[Dict]: | |
| """Analyze a single paper with its own client.""" | |
| try: | |
| client = OpenAI(api_key=api_key) | |
| # Extract title and abstract | |
| title = paper.get('title', '') | |
| abstract = extract_abstract_from_inverted_index(paper.get('abstract_inverted_index', {})) | |
| if not title and not abstract: | |
| return None | |
| # Create content for analysis | |
| content = { | |
| 'title': title, | |
| 'abstract': abstract | |
| } | |
| # Analyze with GPT | |
| analysis = analyze_paper_relevance_with_client(content, research_question, client) | |
| if analysis: | |
| paper['gpt_analysis'] = analysis | |
| paper['relevance_reason'] = analysis.get('relevance_reason', 'Analysis completed') | |
| paper['relevance_score'] = analysis.get('relevant', False) | |
| return paper | |
| return None | |
| except Exception as e: | |
| print(f"Error analyzing paper: {e}") | |
| return None | |
| def analyze_paper_batch(papers_batch: List[Dict], research_question: str, api_key: str, batch_id: int) -> List[Dict]: | |
| """Analyze a batch of papers in parallel using ThreadPoolExecutor.""" | |
| results = [] | |
| # Use ThreadPoolExecutor to process papers in parallel within the batch | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=len(papers_batch)) as executor: | |
| # Submit all papers for parallel processing | |
| future_to_paper = { | |
| executor.submit(analyze_single_paper, paper, research_question, api_key): paper | |
| for paper in papers_batch | |
| } | |
| # Collect results as they complete | |
| for future in concurrent.futures.as_completed(future_to_paper): | |
| try: | |
| result = future.result() | |
| if result: | |
| results.append(result) | |
| except Exception as e: | |
| print(f"Error in parallel analysis: {e}") | |
| continue | |
| return results | |
| def analyze_paper_relevance_with_client(content: Dict[str, str], research_question: str, client: OpenAI) -> Optional[Dict]: | |
| """Analyze if a paper is relevant to the research question using provided client.""" | |
| title = content.get('title', '') | |
| abstract = content.get('abstract', '') | |
| prompt = f""" | |
| Research Question: {research_question} | |
| Paper Title: {title} | |
| Paper Abstract: {abstract or 'No abstract available'} | |
| Analyze this paper and determine: | |
| 1. Is this paper highly relevant to answering the research question? | |
| 2. What are the main aims/objectives of this paper? | |
| 3. What are the key takeaways or findings? | |
| Return ONLY a valid JSON object in this exact format: | |
| {{ | |
| "relevant": true/false, | |
| "relevance_reason": "brief explanation of why it is/isn't relevant", | |
| "aims_of_paper": "main objectives of the paper", | |
| "key_takeaways": "key findings or takeaways" | |
| }} | |
| """ | |
| try: | |
| # Try GPT-5 nano first, fallback to gpt-4o-mini if it fails | |
| try: | |
| response = client.responses.create( | |
| model="gpt-5-nano", | |
| input=prompt, | |
| reasoning={"effort": "minimal"}, | |
| text={"verbosity": "low"} | |
| ) | |
| except Exception as e: | |
| response = client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[{ | |
| "role": "user", | |
| "content": prompt | |
| }], | |
| max_completion_tokens=1000 | |
| ) | |
| # Handle different response formats | |
| if hasattr(response, 'choices') and response.choices: | |
| # Old format (chat completions) | |
| result = response.choices[0].message.content | |
| elif hasattr(response, 'output'): | |
| # New format (responses) - extract text from output | |
| result = "" | |
| for item in response.output: | |
| if hasattr(item, "content") and item.content: | |
| for content in item.content: | |
| if hasattr(content, "text") and content.text: | |
| result += content.text | |
| else: | |
| return None | |
| if not result: | |
| return None | |
| # Clean and parse the JSON response | |
| result = result.strip() | |
| if result.startswith("```json"): | |
| result = result[7:] | |
| if result.endswith("```"): | |
| result = result[:-3] | |
| # Try to parse JSON | |
| try: | |
| return json.loads(result.strip()) | |
| except json.JSONDecodeError: | |
| return None | |
| except Exception as e: | |
| return None | |
| def filter_papers_for_research_question(papers: List[Dict], research_question: str, api_key: str, limit: int = 10) -> List[Dict]: | |
| """Analyze exactly 'limit' number of papers for relevance using parallel processing.""" | |
| if not papers or not research_question: | |
| return [] | |
| # Sort papers by publication date (most recent first) | |
| sorted_papers = sorted(papers, key=lambda x: x.get('publication_date', ''), reverse=True) | |
| # Take only the first 'limit' papers for analysis | |
| papers_to_analyze = sorted_papers[:limit] | |
| print(f"Analyzing {len(papers_to_analyze)} papers for relevance to: {research_question}") | |
| # Process all papers in parallel (no batching needed for small numbers) | |
| all_results = [] | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=min(limit, 20)) as executor: | |
| # Submit all papers for parallel processing | |
| future_to_paper = { | |
| executor.submit(analyze_single_paper, paper, research_question, api_key): paper | |
| for paper in papers_to_analyze | |
| } | |
| # Collect results as they complete | |
| completed = 0 | |
| for future in concurrent.futures.as_completed(future_to_paper): | |
| try: | |
| result = future.result() | |
| completed += 1 | |
| if result: | |
| all_results.append(result) | |
| print(f"Completed {completed}/{len(papers_to_analyze)} papers") | |
| except Exception as e: | |
| print(f"Error in parallel analysis: {e}") | |
| completed += 1 | |
| # Sort by publication date again (most recent first) | |
| all_results.sort(key=lambda x: x.get('publication_date', ''), reverse=True) | |
| print(f"Analysis complete. Processed {len(all_results)} papers.") | |
| return all_results | |
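
# Minimal end-to-end sketch (illustrative, never called by the app): collect
# papers for a hypothetical seed work, then analyse the most recent ones against
# a hypothetical research question. Requires OPENAI_API_KEY in the environment.
def _example_filter_papers_for_research_question():
    papers = get_related_papers("W2741809807", upper_limit=100)
    relevant = filter_papers_for_research_question(
        papers,
        research_question="How does X affect Y?",
        api_key=os.getenv("OPENAI_API_KEY", ""),
        limit=10,
    )
    for p in relevant:
        print(p.get('title'), '-', p.get('relevance_reason'))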
# Additional imports for abstract extraction from publisher pages
import re
import html
| # Try to import BeautifulSoup, fallback to simple parsing if not available | |
| try: | |
| from bs4 import BeautifulSoup | |
| HAS_BS4 = True | |
| except ImportError: | |
| HAS_BS4 = False | |
| print("BeautifulSoup not available, using simple HTML parsing") | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Configuration: read from environment (set in HF Space Secrets) | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip() | |
| if not OPENAI_API_KEY: | |
| print("[WARN] OPENAI_API_KEY is not set. Set it in Space Settings → Secrets.") | |
| # Global progress tracking | |
| progress_data = {} | |
| # Determine script directory and robust project root | |
| SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| ROOT_DIR = os.path.dirname(SCRIPT_DIR) if os.path.basename(SCRIPT_DIR) == "code" else SCRIPT_DIR | |
| # Ensure we can import helper modules (prefer repo root; fallback to ./code) | |
| CODE_DIR_CANDIDATE = os.path.join(ROOT_DIR, "code") | |
| CODE_DIR = CODE_DIR_CANDIDATE if os.path.isdir(CODE_DIR_CANDIDATE) else ROOT_DIR | |
| if CODE_DIR not in sys.path: | |
| sys.path.insert(0, CODE_DIR) | |
| # Database directories: prefer repo-root `database/` when present; fallback to CODE_DIR/database | |
| DATABASE_DIR_ROOT = os.path.join(ROOT_DIR, "database") | |
| DATABASE_DIR = DATABASE_DIR_ROOT if os.path.isdir(DATABASE_DIR_ROOT) else os.path.join(CODE_DIR, "database") | |
| COLLECTION_DB_DIR = os.path.join(DATABASE_DIR, "collections") | |
| FILTER_DB_DIR = os.path.join(DATABASE_DIR, "filters") | |
| # Ensure database directories exist | |
| os.makedirs(COLLECTION_DB_DIR, exist_ok=True) | |
| os.makedirs(FILTER_DB_DIR, exist_ok=True) | |
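
# Resulting on-disk layout (file names illustrative):
#   <DATABASE_DIR>/collections/W2741809807.pkl
#   <DATABASE_DIR>/filters/W2741809807__filter__How_does_X_affect_Y__20240101_120000.pkl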
| def ensure_db_dirs() -> None: | |
| """Ensure database directories exist (safe to call anytime).""" | |
| try: | |
| os.makedirs(COLLECTION_DB_DIR, exist_ok=True) | |
| os.makedirs(FILTER_DB_DIR, exist_ok=True) | |
| except Exception: | |
| pass | |
| # Robust HTTP headers for publisher sites | |
| DEFAULT_HTTP_HEADERS = { | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0 Safari/537.36', | |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', | |
| 'Accept-Language': 'en-US,en;q=0.9', | |
| 'Cache-Control': 'no-cache', | |
| } | |
| def _http_get(url: str, timeout: int = 15) -> Optional[requests.Response]: | |
| try: | |
| resp = requests.get(url, headers=DEFAULT_HTTP_HEADERS, timeout=timeout, allow_redirects=True) | |
| return resp | |
| except Exception as e: | |
| print(f"HTTP GET failed for {url}: {e}") | |
| return None | |
| def fetch_abstract_from_doi(doi: str) -> Optional[str]: | |
| """Fetch abstract/highlights from a DOI URL with a robust, layered strategy.""" | |
| if not doi: | |
| return None | |
| # Normalize DOI | |
| doi_clean = doi.replace('https://doi.org/', '').strip() | |
| # 1) Crossref (fast, sometimes JATS) | |
| try: | |
| text = fetch_from_crossref(doi_clean) | |
| if text and len(text) > 50: | |
| return text | |
| except Exception as e: | |
| print(f"Crossref fetch failed: {e}") | |
| # 2) Fetch target HTML via doi.org redirect | |
| try: | |
| start_url = f"https://doi.org/{doi_clean}" | |
| resp = _http_get(start_url, timeout=15) | |
| if not resp or resp.status_code >= 400: | |
| return None | |
| html_text = resp.text or '' | |
| final_url = getattr(resp, 'url', start_url) | |
| print(f"Resolved DOI to: {final_url}") | |
| # Parse with robust pipeline | |
| parsed = robust_extract_abstract(html_text) | |
| if parsed and len(parsed) > 50: | |
| return parsed | |
| except Exception as e: | |
| print(f"DOI HTML fetch failed: {e}") | |
| # 3) PubMed placeholder (extendable) | |
| try: | |
| text = fetch_from_pubmed(doi_clean) | |
| if text and len(text) > 50: | |
| return text | |
| except Exception: | |
| pass | |
| return None | |
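
# Minimal usage sketch (illustrative, never called by the app): resolve a DOI and
# print whatever abstract/highlights text the layered strategy recovers. The DOI
# below is a placeholder, not a real article.
def _example_fetch_abstract_from_doi():
    text = fetch_abstract_from_doi("10.1000/xyz123")
    print(text[:200] if text else "No abstract recovered")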
| def fetch_from_crossref(doi: str) -> Optional[str]: | |
| """Fetch abstract from Crossref API.""" | |
| try: | |
| url = f"https://api.crossref.org/works/{doi}" | |
| response = _http_get(url, timeout=12) | |
        if response and response.status_code == 200:
| data = response.json() | |
| if 'message' in data: | |
| message = data['message'] | |
| # Check for abstract or highlights (case insensitive) | |
| for key in message: | |
| if key.lower() in ['abstract', 'highlights'] and message[key]: | |
| raw = str(message[key]) | |
| # Crossref sometimes returns JATS/XML; strip tags and unescape entities | |
| text = re.sub(r'<[^>]+>', ' ', raw) | |
| text = html.unescape(re.sub(r'\s+', ' ', text)).strip() | |
| return text | |
| except Exception: | |
| pass | |
| return None | |
| def fetch_from_doi_org(doi: str) -> Optional[str]: | |
| """Legacy wrapper kept for API compatibility; now uses robust pipeline.""" | |
| try: | |
| url = f"https://doi.org/{doi}" | |
| resp = _http_get(url, timeout=15) | |
| if not resp or resp.status_code >= 400: | |
| return None | |
| return robust_extract_abstract(resp.text or '') | |
| except Exception: | |
| return None | |
| def extract_from_preloaded_state_bruteforce(content: str) -> Optional[str]: | |
| """Extract abstract from window.__PRELOADED_STATE__ using brace matching and fallbacks.""" | |
| try: | |
| start_idx = content.find('window.__PRELOADED_STATE__') | |
| if start_idx == -1: | |
| return None | |
| # Find the first '{' after the equals sign | |
| eq_idx = content.find('=', start_idx) | |
| if eq_idx == -1: | |
| return None | |
| brace_idx = content.find('{', eq_idx) | |
| if brace_idx == -1: | |
| return None | |
| # Brace matching to find the matching closing '}' | |
| depth = 0 | |
| end_idx = -1 | |
| for i in range(brace_idx, min(len(content), brace_idx + 5_000_000)): | |
| ch = content[i] | |
| if ch == '{': depth += 1 | |
| elif ch == '}': | |
| depth -= 1 | |
| if depth == 0: | |
| end_idx = i | |
| break | |
| if end_idx == -1: | |
| return None | |
| json_str = content[brace_idx:end_idx+1] | |
| try: | |
| data = json.loads(json_str) | |
| except Exception as e: | |
| # Try to relax by removing trailing commas and control chars | |
| cleaned = re.sub(r',\s*([}\]])', r'\1', json_str) | |
| cleaned = re.sub(r'\u0000', '', cleaned) | |
| try: | |
| data = json.loads(cleaned) | |
| except Exception as e2: | |
| print(f"Failed to parse preloaded JSON: {e2}") | |
| return None | |
| # Same traversal as before | |
| if isinstance(data, dict) and 'abstracts' in data and isinstance(data['abstracts'], dict) and 'content' in data['abstracts']: | |
| abstracts = data['abstracts']['content'] | |
| if isinstance(abstracts, list): | |
| for abstract_item in abstracts: | |
| if isinstance(abstract_item, dict) and '$$' in abstract_item and abstract_item.get('#name') == 'abstract': | |
| class_name = abstract_item.get('$', {}).get('class', '') | |
| for section in abstract_item.get('$$', []): | |
| if isinstance(section, dict) and section.get('#name') == 'abstract-sec': | |
| section_text = extract_text_from_abstract_section(section) | |
| section_highlights = extract_highlights_from_section(section) | |
| if section_text and len(section_text.strip()) > 50: | |
| return clean_text(section_text) | |
| if section_highlights and len(section_highlights.strip()) > 50: | |
| return clean_text(section_highlights) | |
| if 'highlight' in class_name.lower(): | |
| highlights_text = extract_highlights_from_abstract_item(abstract_item) | |
| if highlights_text and len(highlights_text.strip()) > 50: | |
| return clean_text(highlights_text) | |
| return None | |
| except Exception as e: | |
| print(f"Error extracting from preloaded state (bruteforce): {e}") | |
| return None | |
| def extract_from_json_ld(content: str) -> Optional[str]: | |
| """Parse JSON-LD script tags and extract abstract/description if present.""" | |
| if not HAS_BS4: | |
| return None | |
| try: | |
| soup = BeautifulSoup(content, 'html.parser') | |
| for script in soup.find_all('script', type='application/ld+json'): | |
| try: | |
| data = json.loads(script.string or '{}') | |
| except Exception: | |
| continue | |
| candidates = [] | |
| if isinstance(data, dict): | |
| candidates.append(data) | |
| elif isinstance(data, list): | |
| candidates.extend([d for d in data if isinstance(d, dict)]) | |
| for obj in candidates: | |
| for key in ['abstract', 'description']: | |
| if key in obj and obj[key]: | |
| text = clean_text(str(obj[key])) | |
| if len(text) > 50: | |
| return text | |
| return None | |
| except Exception as e: | |
| print(f"Error extracting from JSON-LD: {e}") | |
| return None | |
| def clean_text(s: str) -> str: | |
| s = html.unescape(s) | |
| s = re.sub(r'\s+', ' ', s) | |
| return s.strip() | |
| def extract_from_meta_tags(soup) -> Optional[str]: | |
| try: | |
| # Common meta carriers of abstract-like summaries | |
| candidates = [] | |
| # OpenGraph description | |
| og = soup.find('meta', attrs={'property': 'og:description'}) | |
| if og and og.get('content'): | |
| candidates.append(og['content']) | |
| # Twitter description | |
| tw = soup.find('meta', attrs={'name': 'twitter:description'}) | |
| if tw and tw.get('content'): | |
| candidates.append(tw['content']) | |
| # Dublin Core description | |
| dc = soup.find('meta', attrs={'name': 'dc.description'}) | |
| if dc and dc.get('content'): | |
| candidates.append(dc['content']) | |
| # citation_abstract | |
| cit_abs = soup.find('meta', attrs={'name': 'citation_abstract'}) | |
| if cit_abs and cit_abs.get('content'): | |
| candidates.append(cit_abs['content']) | |
| # Fallback: any meta description | |
| desc = soup.find('meta', attrs={'name': 'description'}) | |
| if desc and desc.get('content'): | |
| candidates.append(desc['content']) | |
| # Clean and return the longest meaningful candidate | |
| candidates = [clean_text(c) for c in candidates if isinstance(c, str)] | |
| candidates.sort(key=lambda x: len(x), reverse=True) | |
| for text in candidates: | |
| if len(text) > 50: | |
| return text | |
| return None | |
| except Exception: | |
| return None | |
| def robust_extract_abstract(html_text: str) -> Optional[str]: | |
| """Layered extraction over raw HTML: preloaded-state, JSON-LD, meta tags, DOM, regex.""" | |
| if not html_text: | |
| return None | |
| # 1) ScienceDirect/Elsevier preloaded state (brace-matched) | |
| try: | |
| txt = extract_from_preloaded_state_bruteforce(html_text) | |
| if txt and len(txt) > 50: | |
| return clean_text(txt) | |
| except Exception: | |
| pass | |
| # 2) JSON-LD | |
| try: | |
| txt = extract_from_json_ld(html_text) | |
| if txt and len(txt) > 50: | |
| return clean_text(txt) | |
| except Exception: | |
| pass | |
| # 3) BeautifulSoup-based DOM extraction (meta + selectors + heading-sibling) | |
| if HAS_BS4: | |
| try: | |
| soup = BeautifulSoup(html_text, 'html.parser') | |
| # meta first | |
| meta_txt = extract_from_meta_tags(soup) | |
| if meta_txt and len(meta_txt) > 50: | |
| return clean_text(meta_txt) | |
| # selector scan | |
| selectors = [ | |
| 'div.abstract', 'div.Abstract', 'div.ABSTRACT', | |
| 'div[class*="abstract" i]', 'div[class*="Abstract" i]', | |
| 'section.abstract', 'section.Abstract', 'section.ABSTRACT', | |
| 'div[data-testid="abstract" i]', 'div[data-testid="Abstract" i]', | |
| 'div.article-abstract', 'div.article-Abstract', | |
| 'div.abstract-content', 'div.Abstract-content', | |
| 'div.highlights', 'div.Highlights', 'div.HIGHLIGHTS', | |
| 'div[class*="highlights" i]', 'div[class*="Highlights" i]', | |
| 'section.highlights', 'section.Highlights', 'section.HIGHLIGHTS', | |
| 'div[data-testid="highlights" i]', 'div[data-testid="Highlights" i]' | |
| ] | |
| for css in selectors: | |
| node = soup.select_one(css) | |
| if node: | |
| t = clean_text(node.get_text(' ', strip=True)) | |
| if len(t) > 50: | |
| return t | |
| # headings near Abstract/Highlights | |
| for tag in soup.find_all(['h1','h2','h3','h4','h5','h6','strong','b']): | |
| try: | |
| title = (tag.get_text() or '').strip().lower() | |
| if 'abstract' in title or 'highlights' in title: | |
| blocks = [] | |
| sib = tag | |
| steps = 0 | |
| while sib and steps < 20: | |
| sib = sib.find_next_sibling() | |
| steps += 1 | |
| if not sib: break | |
| if sib.name in ['p','div','section','article','ul','ol']: | |
| blocks.append(sib.get_text(' ', strip=True)) | |
| joined = clean_text(' '.join(blocks)) | |
| if len(joined) > 50: | |
| return joined | |
| except Exception: | |
| continue | |
| except Exception: | |
| pass | |
| # 4) Regex fallback | |
| try: | |
| patterns = [ | |
| r'<div[^>]*class="[^\"]*(?:abstract|Abstract|ABSTRACT|highlights|Highlights|HIGHLIGHTS)[^\"]*"[^>]*>(.*?)</div>', | |
| r'<section[^>]*class="[^\"]*(?:abstract|Abstract|ABSTRACT|highlights|Highlights|HIGHLIGHTS)[^\"]*"[^>]*>(.*?)</section>', | |
| r'<div[^>]*data-testid="(?:abstract|Abstract|highlights|Highlights)"[^>]*>(.*?)</div>' | |
| ] | |
| for pat in patterns: | |
| for m in re.findall(pat, html_text, re.DOTALL | re.IGNORECASE): | |
| t = clean_text(re.sub(r'<[^>]+>', ' ', m)) | |
| if len(t) > 50: | |
| return t | |
| except Exception: | |
| pass | |
| return None | |
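
# Minimal sketch of the layered extraction on a toy HTML snippet (illustrative;
# this exercises the meta-tag path, so it assumes BeautifulSoup is installed).
def _example_robust_extract_abstract():
    html_snippet = (
        '<html><head><meta property="og:description" content="This illustrative '
        'abstract text is deliberately longer than fifty characters so the '
        'length check passes."></head><body></body></html>'
    )
    print(robust_extract_abstract(html_snippet))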
| def extract_text_from_abstract_section(section: dict) -> str: | |
| """Extract text content from abstract section structure.""" | |
| try: | |
| text_parts = [] | |
| if '$$' in section: | |
| for item in section['$$']: | |
| if isinstance(item, dict): | |
| # Direct text content from simple-para | |
| if item.get('#name') == 'simple-para' and '_' in item: | |
| text_parts.append(item['_']) | |
| # Also check for para elements | |
| elif item.get('#name') == 'para' and '_' in item: | |
| text_parts.append(item['_']) | |
| # Recursively extract from nested structure | |
| elif '$$' in item: | |
| nested_text = extract_text_from_abstract_section(item) | |
| if nested_text: | |
| text_parts.append(nested_text) | |
| return ' '.join(text_parts) | |
| except Exception as e: | |
| print(f"Error extracting text from abstract section: {e}") | |
| return "" | |
| def extract_highlights_from_section(section: dict) -> str: | |
| """Extract highlights content from section structure.""" | |
| try: | |
| text_parts = [] | |
| if '$$' in section: | |
| for item in section['$$']: | |
| if isinstance(item, dict): | |
| # Look for section-title with "Highlights" | |
| if (item.get('#name') == 'section-title' and | |
| item.get('_') and 'highlight' in item['_'].lower()): | |
| # Found highlights section, extract list items | |
| highlights_text = extract_highlights_list(item, section) | |
| if highlights_text: | |
| text_parts.append(highlights_text) | |
| # Also look for direct list structures | |
| elif item.get('#name') == 'list': | |
| # Found list, extract list items directly | |
| highlights_text = extract_highlights_list(item, section) | |
| if highlights_text: | |
| text_parts.append(highlights_text) | |
| elif '$$' in item: | |
| # Recursively search for highlights | |
| nested_text = extract_highlights_from_section(item) | |
| if nested_text: | |
| text_parts.append(nested_text) | |
| return ' '.join(text_parts) | |
| except Exception as e: | |
| print(f"Error extracting highlights from section: {e}") | |
| return "" | |
| def extract_highlights_list(title_item: dict, parent_section: dict) -> str: | |
| """Extract highlights list items from the section structure.""" | |
| try: | |
| highlights = [] | |
| # Look for the list structure after the highlights title | |
| if '$$' in parent_section: | |
| for item in parent_section['$$']: | |
| if isinstance(item, dict) and item.get('#name') == 'list': | |
| # Found list, extract list items | |
| if '$$' in item: | |
| for list_item in item['$$']: | |
| if isinstance(list_item, dict) and list_item.get('#name') == 'list-item': | |
| # Extract text from list item | |
| item_text = extract_text_from_abstract_section(list_item) | |
| if item_text: | |
| highlights.append(f"• {item_text}") | |
| # Also check if the title_item itself contains a list (for direct list structures) | |
| if '$$' in title_item: | |
| for item in title_item['$$']: | |
| if isinstance(item, dict) and item.get('#name') == 'list': | |
| if '$$' in item: | |
| for list_item in item['$$']: | |
| if isinstance(list_item, dict) and list_item.get('#name') == 'list-item': | |
| item_text = extract_text_from_abstract_section(list_item) | |
| if item_text: | |
| highlights.append(f"• {item_text}") | |
| return ' '.join(highlights) | |
| except Exception as e: | |
| print(f"Error extracting highlights list: {e}") | |
| return "" | |
| def extract_highlights_from_abstract_item(abstract_item: dict) -> str: | |
| """Extract highlights from an abstract item that contains highlights.""" | |
| try: | |
| highlights = [] | |
| if '$$' in abstract_item: | |
| for section in abstract_item['$$']: | |
| if isinstance(section, dict) and section.get('#name') == 'abstract-sec': | |
| # Look for highlights within this section | |
| highlights_text = extract_highlights_from_section(section) | |
| if highlights_text: | |
| highlights.append(highlights_text) | |
| return ' '.join(highlights) | |
| except Exception as e: | |
| print(f"Error extracting highlights from abstract item: {e}") | |
| return "" | |
def fetch_from_pubmed(doi: str) -> Optional[str]:
    """Fetch abstract from PubMed if available (placeholder, not yet implemented)."""
    try:
        # Simplified placeholder: a full implementation would look the DOI up via
        # the PubMed/NCBI E-utilities API and return the abstract, highlights, or
        # other summary fields from the matching record.
        pass
    except Exception:
        pass
    return None
| def convert_abstract_to_inverted_index(abstract: str) -> Dict: | |
| """Convert abstract text to inverted index format.""" | |
| if not abstract: | |
| return {} | |
| # Simple word tokenization and position mapping | |
| words = re.findall(r'\b\w+\b', abstract.lower()) | |
| inverted_index = {} | |
| for i, word in enumerate(words): | |
| if word not in inverted_index: | |
| inverted_index[word] = [] | |
| inverted_index[word].append(i) | |
| return inverted_index | |
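
# Round-trip example (illustrative). Note the conversion lowercases words and
# drops punctuation, so reconstructing the text from the index is lossy:
#   >>> convert_abstract_to_inverted_index('Deep learning for deep networks')
#   {'deep': [0, 3], 'learning': [1], 'for': [2], 'networks': [4]}
#   >>> extract_abstract_from_inverted_index({'deep': [0, 3], 'learning': [1], 'for': [2], 'networks': [4]})
#   'deep learning for deep networks'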
| def extract_work_id_from_url(url: str) -> Optional[str]: | |
| """Extract OpenAlex work ID from various URL formats.""" | |
| if not url: | |
| return None | |
    # Handle different URL formats
    if 'openalex.org' in url:
        if '/works/' in url:
            # Extract ID from an API URL like https://api.openalex.org/works/W2741809807
            return url.split('/works/')[-1]
        # Extract ID from a short URL like https://openalex.org/W2741809807
        tail = url.rstrip('/').split('/')[-1]
        if tail.startswith('W') and len(tail) > 5:
            return tail
    # If it's already just an ID
    if url.startswith('W') and len(url) > 5:
        return url
    return None
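
# Accepted inputs (illustrative):
#   >>> extract_work_id_from_url('https://api.openalex.org/works/W2741809807')
#   'W2741809807'
#   >>> extract_work_id_from_url('https://openalex.org/W2741809807')
#   'W2741809807'
#   >>> extract_work_id_from_url('W2741809807')
#   'W2741809807'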
| def save_to_database(session_id: str, data_type: str, data: Dict) -> str: | |
| """Legacy-compatible save helper that routes to the new split DB layout.""" | |
| if data_type == 'collection': | |
| work_id = data.get('work_id', '') | |
| title = data.get('title', '') | |
| return save_collection_to_database(work_id, title, data) | |
| if data_type == 'filter': | |
| source_collection = data.get('source_collection', '') | |
| research_question = data.get('research_question', '') | |
| return save_filter_to_database(source_collection, research_question, data) | |
| # Fallback legacy path (single folder) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"{session_id}_{data_type}_{timestamp}.pkl" | |
| filepath = os.path.join(DATABASE_DIR, filename) | |
| with open(filepath, 'wb') as f: pickle.dump(data, f) | |
| return filename | |
| def _clean_work_id(work_id_or_url: str) -> str: | |
| clean = extract_work_id_from_url(work_id_or_url) or work_id_or_url | |
| clean = clean.replace('https://api.openalex.org/works/', '').replace('https://openalex.org/', '') | |
| return clean | |
| def save_collection_to_database(work_id_or_url: str, title: str, data: Dict) -> str: | |
| """Save a collection once per work. Filename is the clean work id only (dedup).""" | |
| ensure_db_dirs() | |
| clean_id = _clean_work_id(work_id_or_url) | |
| filename = f"{clean_id}.pkl" | |
| filepath = os.path.join(COLLECTION_DB_DIR, filename) | |
| # Deduplicate: if exists, do NOT overwrite | |
| if os.path.exists(filepath): | |
| return filename | |
| # Ensure helpful metadata for frontend display | |
| data = dict(data) | |
| data['work_id'] = work_id_or_url | |
| data['title'] = title | |
| data['work_identifier'] = clean_id | |
| data['created'] = datetime.now().isoformat() | |
| with open(filepath, 'wb') as f: pickle.dump(data, f) | |
| return filename | |
| def save_filter_to_database(source_collection_clean_id: str, research_question: str, data: Dict) -> str: | |
| """Save a filter result linked to a source collection. Multiple filters allowed.""" | |
| ensure_db_dirs() | |
| # Slug for RQ to keep filenames short | |
| rq_slug = ''.join(c for c in research_question[:40] if c.isalnum() or c in (' ', '-', '_')).strip().replace(' ', '_') or 'rq' | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| filename = f"{source_collection_clean_id}__filter__{rq_slug}__{timestamp}.pkl" | |
| filepath = os.path.join(FILTER_DB_DIR, filename) | |
| data = dict(data) | |
| data['filter_identifier'] = filename.replace('.pkl','') | |
| data['source_collection'] = source_collection_clean_id | |
| data['research_question'] = research_question | |
| data['created'] = datetime.now().isoformat() | |
| with open(filepath, 'wb') as f: pickle.dump(data, f) | |
| return filename | |
| def get_collection_files() -> List[Dict]: | |
| files: List[Dict] = [] | |
| if not os.path.exists(COLLECTION_DB_DIR): return files | |
| for filename in os.listdir(COLLECTION_DB_DIR): | |
| if not filename.endswith('.pkl'): continue | |
| filepath = os.path.join(COLLECTION_DB_DIR, filename) | |
| try: | |
| stat = os.stat(filepath) | |
| with open(filepath, 'rb') as f: data = pickle.load(f) | |
| files.append({ | |
| 'filename': filename, | |
| 'type': 'collection', | |
| 'work_identifier': data.get('work_identifier') or filename.replace('.pkl',''), | |
| 'title': data.get('title',''), | |
| 'work_id': data.get('work_id',''), | |
| 'total_papers': data.get('total_papers',0), | |
| 'created': data.get('created', datetime.fromtimestamp(stat.st_ctime).isoformat()), | |
| 'size': stat.st_size | |
| }) | |
| except Exception: | |
| continue | |
| files.sort(key=lambda x: x['created'], reverse=True) | |
| return files | |
| def get_filter_files() -> List[Dict]: | |
| files: List[Dict] = [] | |
| if not os.path.exists(FILTER_DB_DIR): return files | |
| for filename in os.listdir(FILTER_DB_DIR): | |
| if not filename.endswith('.pkl'): continue | |
| filepath = os.path.join(FILTER_DB_DIR, filename) | |
| try: | |
| stat = os.stat(filepath) | |
| with open(filepath, 'rb') as f: data = pickle.load(f) | |
| files.append({ | |
| 'filename': filename, | |
| 'type': 'filter', | |
| 'filter_identifier': data.get('filter_identifier') or filename.replace('.pkl',''), | |
| 'source_collection': data.get('source_collection',''), | |
| 'research_question': data.get('research_question',''), | |
| 'relevant_papers': data.get('relevant_papers',0), | |
| 'total_papers': data.get('total_papers',0), | |
| 'tested_papers': data.get('tested_papers',0), | |
| 'created': data.get('created', datetime.fromtimestamp(stat.st_ctime).isoformat()), | |
| 'size': stat.st_size | |
| }) | |
| except Exception: | |
| continue | |
| files.sort(key=lambda x: x['created'], reverse=True) | |
| return files | |
| def get_database_files() -> List[Dict]: | |
| """Combined listing for frontend history panel.""" | |
| return get_collection_files() + get_filter_files() | |
| def find_existing_collection(work_id_or_url: str) -> Optional[str]: | |
| """Return existing collection filename for a work id if present (dedup).""" | |
| clean_id = _clean_work_id(work_id_or_url) | |
| filename = f"{clean_id}.pkl" | |
| filepath = os.path.join(COLLECTION_DB_DIR, filename) | |
| return filename if os.path.exists(filepath) else None | |
| def filter_papers_for_rq(papers: List[Dict], research_question: str) -> List[Dict]: | |
| """Filter papers based on research question using GPT-5 mini.""" | |
| if not papers or not research_question: | |
| return [] | |
| relevant_papers = [] | |
| for i, paper in enumerate(papers): | |
| print(f"Analyzing paper {i+1}/{len(papers)}: {paper.get('title', 'No title')[:50]}...") | |
| # Extract title and abstract | |
| title = paper.get('title', '') | |
| abstract = '' | |
| # Try to get abstract from inverted index | |
| inverted_abstract = paper.get('abstract_inverted_index') | |
| if inverted_abstract: | |
| words = [] | |
| for word, positions in inverted_abstract.items(): | |
| for pos in positions: | |
| while len(words) <= pos: | |
| words.append('') | |
| words[pos] = word | |
| abstract = ' '.join(words).strip() | |
| if not title and not abstract: | |
| continue | |
| # Create content for GPT analysis | |
| content = { | |
| 'title': title, | |
| 'abstract': abstract | |
| } | |
| # Analyze with GPT-5 mini | |
| try: | |
| analysis = analyze_with_gpt4(content, OPENAI_API_KEY) | |
| if analysis and analysis.get('aims_of_paper'): | |
| # Check if paper is relevant to research question | |
| relevance_prompt = f""" | |
| Research Question: {research_question} | |
| Paper Title: {title} | |
| Paper Abstract: {abstract or 'No abstract available'} | |
| Is this paper highly relevant to answering the research question? | |
| Consider the paper's aims, methods, and findings. | |
| Return ONLY a JSON object: {{"relevant": true/false, "reason": "brief explanation"}} | |
| """ | |
| relevance_response = analyze_with_gpt4({ | |
| 'title': 'Relevance Check', | |
| 'abstract': relevance_prompt | |
| }, OPENAI_API_KEY) | |
| if relevance_response and relevance_response.get('aims_of_paper'): | |
| # Parse the relevance response | |
| try: | |
| relevance_data = json.loads(relevance_response['aims_of_paper']) | |
| if relevance_data.get('relevant', False): | |
| paper['relevance_reason'] = relevance_data.get('reason', 'Relevant to research question') | |
| paper['gpt_analysis'] = analysis | |
| relevant_papers.append(paper) | |
| except: | |
| # If parsing fails, include paper anyway if it has analysis | |
| paper['gpt_analysis'] = analysis | |
| relevant_papers.append(paper) | |
| except Exception as e: | |
| print(f"Error analyzing paper {i+1}: {e}") | |
| continue | |
| return relevant_papers | |
| def index(): | |
| """Serve the main HTML page.""" | |
| return render_template('index.html') | |
| def health(): | |
| return jsonify({'status': 'ok', 'app': 'paper_analysis_backend', 'port': 5000}) | |
| def get_progress(task_id): | |
| """Get progress for a specific task.""" | |
| return jsonify(progress_data.get(task_id, {'status': 'not_found', 'progress': 0, 'message': 'Task not found'})) | |
| def collect_papers_async(work_id, limit, task_id): | |
| """Async function to collect papers with progress tracking.""" | |
| try: | |
| def progress_callback(progress, message): | |
| progress_data[task_id] = { | |
| 'status': 'running', | |
| 'progress': progress, | |
| 'message': message | |
| } | |
| progress_data[task_id] = {'status': 'running', 'progress': 0, 'message': 'Starting paper collection...'} | |
| # Get related papers with detailed counts and progress callback | |
| papers = get_related_papers(work_id, upper_limit=limit, progress_callback=progress_callback) | |
| if not papers: | |
| progress_data[task_id] = {'status': 'error', 'progress': 0, 'message': 'No related papers found'} | |
| return | |
| # Count papers by relationship type | |
| cited_count = sum(1 for p in papers if p.get('relationship') == 'cited') | |
| citing_count = sum(1 for p in papers if p.get('relationship') == 'citing') | |
| related_count = sum(1 for p in papers if p.get('relationship') == 'related') | |
| # Save papers to temporary file | |
| with open('temp_papers.pkl', 'wb') as f: | |
| pickle.dump(papers, f) | |
| # Fetch seed title for identifier; tolerate failures | |
| title = '' | |
| try: | |
| seed_resp = requests.get(f'https://api.openalex.org/works/{_clean_work_id(work_id)}', timeout=10) | |
| if seed_resp.ok: | |
| title = (seed_resp.json() or {}).get('title','') | |
| except Exception: | |
| title = '' | |
| # Save to collection database (dedup by work id) | |
| collection_data = { | |
| 'work_id': work_id, | |
| 'total_papers': len(papers), | |
| 'cited_papers': cited_count, | |
| 'citing_papers': citing_count, | |
| 'related_papers': related_count, | |
| 'limit': limit, | |
| 'papers': papers, | |
| } | |
| db_filename = save_collection_to_database(work_id, title, collection_data) | |
| progress_data[task_id] = { | |
| 'status': 'completed', | |
| 'progress': 100, | |
| 'message': 'Collection completed', | |
| 'result': { | |
| 'work_id': work_id, | |
| 'total_papers': len(papers), | |
| 'cited_papers': cited_count, | |
| 'citing_papers': citing_count, | |
| 'related_papers': related_count, | |
| 'limit': limit, | |
| 'papers': papers[:10], # Return first 10 for preview | |
| 'db_filename': db_filename | |
| } | |
| } | |
| except Exception as e: | |
| print(f"Error collecting papers: {e}") | |
| progress_data[task_id] = {'status': 'error', 'progress': 0, 'message': str(e)} | |
| def search_papers_by_title(title: str) -> List[Dict]: | |
| """Search OpenAlex for papers by title and return ranked matches.""" | |
| try: | |
| # Clean and prepare the title for search | |
| clean_title = title.strip() | |
| if not clean_title: | |
| return [] | |
| # Search OpenAlex API | |
| import urllib.parse | |
| params = { | |
| 'search': clean_title, | |
            'per-page': 10,  # Get top 10 results (hyphenated parameter name, as used elsewhere in this file)
| 'sort': 'relevance_score:desc' # Sort by relevance | |
| } | |
| # Build URL with query parameters | |
| query_string = urllib.parse.urlencode(params) | |
| search_url = f"https://api.openalex.org/works?{query_string}" | |
| print(f"EXACT URL BEING SEARCHED: {search_url}") | |
| response = _http_get(search_url, timeout=10) | |
| if not response or response.status_code != 200: | |
| print(f"OpenAlex search failed: {response.status_code if response else 'No response'}") | |
| return [] | |
| data = response.json() | |
| results = data.get('results', []) | |
| if not results: | |
| print(f"No results found for title: {clean_title}") | |
| return [] | |
| # Return top results (OpenAlex already ranks by relevance) | |
| scored_results = [] | |
| for work in results[:5]: # Take top 5 from OpenAlex | |
| work_title = work.get('title', '') | |
| if not work_title: | |
| continue | |
| work_id = work.get('id', '').replace('https://openalex.org/', '') | |
| scored_results.append({ | |
| 'work_id': work_id, | |
| 'title': work_title, | |
| 'authors': ', '.join([author.get('author', {}).get('display_name', '') for author in work.get('authorships', [])[:3]]), | |
| 'year': work.get('publication_date', '')[:4] if work.get('publication_date') else 'Unknown', | |
| 'venue': work.get('primary_location', {}).get('source', {}).get('display_name', 'Unknown'), | |
| 'relevance_score': work.get('relevance_score', 0) | |
| }) | |
| return scored_results | |
| except Exception as e: | |
| print(f"Error searching for papers by title: {e}") | |
| return [] | |
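
# Minimal usage sketch (illustrative, never called by the app): title search
# against OpenAlex; the query title is just an example.
def _example_search_papers_by_title():
    for match in search_papers_by_title("Attention is all you need"):
        print(match['work_id'], match['title'], match['year'])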
| def search_papers(): | |
| """Search for papers by title and return matches for user selection.""" | |
| try: | |
| data = request.get_json() | |
| paper_title = data.get('paper_title', '').strip() | |
| if not paper_title: | |
| return jsonify({'error': 'Paper title is required'}), 400 | |
| matches = search_papers_by_title(paper_title) | |
| if not matches: | |
| return jsonify({'error': f'No papers found matching title: {paper_title}'}), 404 | |
| return jsonify({ | |
| 'success': True, | |
| 'matches': matches, | |
| 'query': paper_title | |
| }) | |
| except Exception as e: | |
| print(f"Error searching papers: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def collect_papers(): | |
| """Collect related papers from a seed paper URL or title search.""" | |
| try: | |
| data = request.get_json() | |
| seed_url = data.get('seed_url', '').strip() | |
| paper_title = data.get('paper_title', '').strip() | |
| method = data.get('method', 'url') | |
| user_api_key = data.get('user_api_key') # User's own API key for large collections | |
| if method == 'title' and not paper_title: | |
| return jsonify({'error': 'Paper title is required for title search'}), 400 | |
| elif method == 'url' and not seed_url: | |
| return jsonify({'error': 'Seed URL is required for URL method'}), 400 | |
| # Handle title search or URL method | |
| if method == 'title': | |
| # For title search, work_id should be provided (selected by user) | |
| work_id = data.get('selected_work_id', '').strip() | |
| if not work_id: | |
| return jsonify({'error': 'Selected work ID is required for title search'}), 400 | |
| else: | |
| # Extract work ID from URL | |
| work_id = extract_work_id_from_url(seed_url) | |
| if not work_id: | |
| return jsonify({'error': 'Invalid OpenAlex URL format'}), 400 | |
| print(f"Collecting papers for work ID: {work_id}") | |
| # Check if collection already exists (dedup) | |
| existing_file = find_existing_collection(work_id) | |
| if existing_file: | |
| print(f"Using existing collection: {existing_file}") | |
| # Load existing collection data | |
| filepath = os.path.join(COLLECTION_DB_DIR, existing_file) | |
| with open(filepath, 'rb') as f: | |
| existing_data = pickle.load(f) | |
| # Generate task ID for consistency | |
| task_id = f"collect_{int(time.time())}" | |
| # Set progress to completed immediately | |
| progress_data[task_id] = { | |
| 'status': 'completed', | |
| 'progress': 100, | |
| 'message': f'Using existing collection from {existing_data.get("created", "unknown time")}', | |
| 'result': { | |
| 'papers': existing_data.get('papers', []), | |
| 'total_papers': existing_data.get('total_papers', 0), | |
| 'cited_papers': existing_data.get('cited_papers', 0), | |
| 'citing_papers': existing_data.get('citing_papers', 0), | |
| 'related_papers': existing_data.get('related_papers', 0), | |
| 'db_filename': existing_file | |
| } | |
| } | |
| return jsonify({'success': True, 'task_id': task_id, 'used_existing': True, 'message': 'Using existing collection'}) | |
| # Optional limit from request (None means collect all) | |
| limit = data.get('limit') | |
| try: | |
| limit = int(limit) if limit is not None else None | |
| except Exception: | |
| limit = None | |
| # Generate task ID | |
| task_id = f"collect_{int(time.time())}" | |
| # Start async collection | |
| thread = threading.Thread(target=collect_papers_async, args=(work_id, limit, task_id)) | |
| thread.daemon = True | |
| thread.start() | |
| return jsonify({ | |
| 'success': True, | |
| 'task_id': task_id, | |
| 'message': 'Paper collection started' | |
| }) | |
| except Exception as e: | |
| print(f"Error collecting papers: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def filter_papers(): | |
| """Filter papers based on research question.""" | |
| try: | |
| data = request.get_json() | |
| research_question = data.get('research_question', '').strip() | |
| limit = data.get('limit', 10) # Default to 10 most recent relevant papers | |
| provided_source_collection = (data.get('source_collection') or '').strip() | |
| papers_data = data.get('papers') # Papers passed directly from frontend | |
| user_api_key = data.get('user_api_key') # User's own API key for large analyses | |
| if not research_question: | |
| return jsonify({'error': 'Research question is required'}), 400 | |
| # Load papers from either passed data or temporary file | |
| papers = [] | |
| if papers_data: | |
| papers = papers_data | |
| elif os.path.exists('temp_papers.pkl'): | |
| with open('temp_papers.pkl', 'rb') as f: | |
| papers = pickle.load(f) | |
| else: | |
| return jsonify({'error': 'No papers found. Please collect papers first.'}), 400 | |
| print(f"Filtering {len(papers)} papers for research question: {research_question}") | |
| # Use user's API key if provided, otherwise use default | |
| api_key_to_use = user_api_key if user_api_key else OPENAI_API_KEY | |
| # Filter papers using custom analyzer (returns top N most recent relevant papers) | |
| relevant_papers = filter_papers_for_research_question(papers, research_question, api_key_to_use, limit) | |
| # Determine source collection id for linkage | |
| source_collection_id = None | |
| if provided_source_collection: | |
| source_collection_id = provided_source_collection | |
| else: | |
| try: | |
| collections = get_collection_files() | |
| if collections: | |
| source_collection_id = collections[0].get('work_identifier') | |
| except Exception: | |
| source_collection_id = None | |
| # Count actual relevant papers from analysis results | |
| actual_relevant = 0 | |
| for paper in relevant_papers: | |
| if paper.get('relevance_score') == True or paper.get('relevance_score') == 'true': | |
| actual_relevant += 1 | |
| # Calculate open access statistics | |
| total_oa = 0 | |
| for paper in papers: | |
| oa_info = paper.get('open_access') or {} | |
| if oa_info.get('is_oa', False): | |
| total_oa += 1 | |
| oa_percentage = round((total_oa / len(papers)) * 100) if papers else 0 | |
| # Calculate abstract statistics | |
| total_with_abstract = 0 | |
| for paper in papers: | |
| if paper.get('abstract_inverted_index') and len(paper.get('abstract_inverted_index', {})) > 0: | |
| total_with_abstract += 1 | |
| abstract_percentage = round((total_with_abstract / len(papers)) * 100) if papers else 0 | |
| # Save filtered results to filter database (linked to collection) | |
| tested_papers = int(limit) if str(limit).isdigit() else 0  # Coerce; tolerates string limits from the frontend | |
| filter_data = { | |
| 'research_question': research_question, | |
| 'total_papers': len(papers), # Total papers in collection | |
| 'tested_papers': tested_papers, # Number of papers tested for relevance | |
| 'relevant_papers': actual_relevant, # Actual count of YES responses | |
| 'oa_percentage': oa_percentage, # Open access percentage | |
| 'abstract_percentage': abstract_percentage, # Percentage with abstracts | |
| 'limit': limit, | |
| 'papers': relevant_papers, | |
| 'source_collection': source_collection_id | |
| } | |
| if source_collection_id: | |
| db_filename = save_filter_to_database(source_collection_id, research_question, filter_data) | |
| else: | |
| # Fallback | |
| db_filename = save_to_database(f"filter_{int(time.time())}", 'filter', filter_data) | |
| return jsonify({ | |
| 'success': True, | |
| 'research_question': research_question, | |
| 'total_papers': len(papers), # Total papers in collection | |
| 'tested_papers': tested_papers, # Number of papers tested for relevance | |
| 'relevant_papers': actual_relevant, # Actual count of YES responses | |
| 'oa_percentage': oa_percentage, # Open access percentage | |
| 'abstract_percentage': abstract_percentage, # Percentage with abstracts | |
| 'limit': limit, | |
| 'papers': relevant_papers, | |
| 'db_filename': db_filename | |
| }) | |
| except Exception as e: | |
| print(f"Error filtering papers: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
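| # Hedged sketch (not wired in): the open-access and abstract statistics computed | |
| # inline in filter_papers() could be factored into a small helper like this one. | |
| # The name _collection_stats is hypothetical; the fields mirror the handler code. | |
| def _collection_stats(papers): | |
|     """Return (oa_percentage, abstract_percentage) for a list of OpenAlex works.""" | |
|     if not papers: | |
|         return 0, 0 | |
|     total_oa = sum(1 for p in papers if (p.get('open_access') or {}).get('is_oa', False)) | |
|     total_abstract = sum(1 for p in papers if p.get('abstract_inverted_index')) | |
|     return round(total_oa / len(papers) * 100), round(total_abstract / len(papers) * 100) | |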
| def get_database_files_endpoint(): | |
| """Get list of all database files (collections + filters).""" | |
| try: | |
| files = get_database_files() | |
| return jsonify({'success': True, 'files': files}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def load_database_file(filename): | |
| """Load a specific database file.""" | |
| try: | |
| # Try collections then filters then legacy | |
| filepath = os.path.join(COLLECTION_DB_DIR, filename) | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(FILTER_DB_DIR, filename) | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(DATABASE_DIR, filename) | |
| if not os.path.exists(filepath): | |
| return jsonify({'error': 'File not found'}), 404 | |
| with open(filepath, 'rb') as f: | |
| data = pickle.load(f) | |
| return jsonify({'success': True, 'data': data}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def delete_database_file(filename): | |
| """Delete a specific database file.""" | |
| try: | |
| # Try collections then filters then legacy | |
| filepath = os.path.join(COLLECTION_DB_DIR, filename) | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(FILTER_DB_DIR, filename) | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(DATABASE_DIR, filename) | |
| if not os.path.exists(filepath): | |
| return jsonify({'error': 'File not found'}), 404 | |
| # Delete the file | |
| os.remove(filepath) | |
| return jsonify({'success': True, 'message': f'File {filename} deleted successfully'}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
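| # Hedged sketch (not wired in): load_database_file, delete_database_file, | |
| # download_database_file and export_excel_from_file all repeat the same | |
| # collections -> filters -> legacy lookup; a shared helper such as this one | |
| # (the name _resolve_database_path is hypothetical) would remove that duplication. | |
| def _resolve_database_path(filename): | |
|     """Return the first existing path for filename across the known DB dirs, else None.""" | |
|     for directory in (COLLECTION_DB_DIR, FILTER_DB_DIR, DATABASE_DIR): | |
|         candidate = os.path.join(directory, filename) | |
|         if os.path.exists(candidate): | |
|             return candidate | |
|     return None | |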
| def generate_bibtex_entry(paper): | |
| """Generate a BibTeX entry for a single paper.""" | |
| try: | |
| # Handle None or invalid paper objects | |
| if not paper or not isinstance(paper, dict): | |
| print(f"Invalid paper object: {paper}") | |
| return f"@article{{error_{hash(str(paper)) % 10000},\n title={{Invalid paper data}},\n author={{Unknown}},\n year={{Unknown}}\n}}" | |
| # Extract basic info with safe defaults (guard against keys that exist but are None) | |
| title = paper.get('title') or 'Unknown Title' | |
| year = paper.get('publication_year') or 'Unknown Year' | |
| doi = paper.get('doi') or '' | |
| # Generate a unique key (using OpenAlex ID or DOI) | |
| work_id = paper.get('id', '') | |
| if work_id and isinstance(work_id, str): | |
| work_id = work_id.replace('https://openalex.org/', '') | |
| if not work_id and doi: | |
| work_id = doi.replace('https://doi.org/', '').replace('/', '_') | |
| if not work_id: | |
| work_id = f"paper_{hash(title) % 10000}" | |
| # Extract authors safely | |
| authorships = paper.get('authorships', []) | |
| author_list = [] | |
| if isinstance(authorships, list): | |
| for authorship in authorships: | |
| if isinstance(authorship, dict): | |
| author = authorship.get('author', {}) | |
| if isinstance(author, dict): | |
| display_name = author.get('display_name', '') | |
| if display_name: | |
| # Split name and format as "Last, First" | |
| name_parts = display_name.split() | |
| if len(name_parts) >= 2: | |
| last_name = name_parts[-1] | |
| first_name = ' '.join(name_parts[:-1]) | |
| author_list.append(f"{last_name}, {first_name}") | |
| else: | |
| author_list.append(display_name) | |
| authors = " and ".join(author_list) if author_list else "Unknown Author" | |
| # Extract journal info safely | |
| primary_location = paper.get('primary_location', {}) | |
| journal = 'Unknown Journal' | |
| if isinstance(primary_location, dict): | |
| source = primary_location.get('source', {}) | |
| if isinstance(source, dict): | |
| journal = source.get('display_name', 'Unknown Journal') | |
| # Extract volume, issue, pages safely | |
| biblio = paper.get('biblio', {}) | |
| volume = '' | |
| issue = '' | |
| first_page = '' | |
| last_page = '' | |
| if isinstance(biblio, dict): | |
| volume = biblio.get('volume', '') | |
| issue = biblio.get('issue', '') | |
| first_page = biblio.get('first_page', '') | |
| last_page = biblio.get('last_page', '') | |
| # Format pages | |
| if first_page and last_page and first_page != last_page: | |
| pages = f"{first_page}--{last_page}" | |
| elif first_page: | |
| pages = first_page | |
| else: | |
| pages = "" | |
| # Format volume and issue | |
| volume_info = "" | |
| if volume: | |
| volume_info = f"volume={{{volume}}}" | |
| if issue: | |
| volume_info += f", number={{{issue}}}" | |
| elif issue: | |
| volume_info = f"number={{{issue}}}" | |
| # Get URL (prefer DOI, fallback to landing page) | |
| url = doi if doi else '' | |
| if isinstance(primary_location, dict): | |
| landing_url = primary_location.get('landing_page_url', '') | |
| if landing_url and not url: | |
| url = landing_url | |
| # Build BibTeX entry | |
| bibtex_entry = f"""@article{{{work_id}, | |
| title={{{title}}}, | |
| author={{{authors}}}, | |
| journal={{{journal}}}, | |
| year={{{year}}}""" | |
| if volume_info: | |
| bibtex_entry += f",\n {volume_info}" | |
| if pages: | |
| bibtex_entry += f",\n pages={{{pages}}}" | |
| if doi: | |
| bibtex_entry += f",\n doi={{{doi.replace('https://doi.org/', '')}}}" | |
| if url: | |
| bibtex_entry += f",\n url={{{url}}}" | |
| bibtex_entry += "\n}" | |
| return bibtex_entry | |
| except Exception as e: | |
| print(f"Error generating BibTeX for paper: {e}") | |
| print(f"Paper data: {paper}") | |
| return f"@article{{error_{hash(str(paper)) % 10000},\n title={{Error generating entry}},\n author={{Unknown}},\n year={{Unknown}}\n}}" | |
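| # Illustrative output of generate_bibtex_entry (placeholder data), matching the | |
| # fields assembled above: | |
| #   @article{W2741809807, | |
| #     title={An example title}, | |
| #     author={Doe, Jane and Smith, John}, | |
| #     journal={Example Journal}, | |
| #     year={2020}, | |
| #     volume={12}, number={3}, | |
| #     pages={45--67}, | |
| #     doi={10.1234/example}, | |
| #     url={https://doi.org/10.1234/example} | |
| #   } | |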
| def generate_bibtex(filename): | |
| """Generate BibTeX file for a collection.""" | |
| try: | |
| # Load the collection | |
| collection_path = os.path.join(COLLECTION_DB_DIR, filename) | |
| if not os.path.exists(collection_path): | |
| return jsonify({'success': False, 'message': 'Collection not found'}), 404 | |
| with open(collection_path, 'rb') as f: | |
| collection_data = pickle.load(f) | |
| papers = collection_data.get('papers', []) | |
| if not papers: | |
| return jsonify({'success': False, 'message': 'No papers in collection'}), 400 | |
| print(f"Found {len(papers)} papers in collection") | |
| print(f"First paper structure: {type(papers[0]) if papers else 'No papers'}") | |
| if papers: | |
| print(f"First paper keys: {list(papers[0].keys()) if isinstance(papers[0], dict) else 'Not a dict'}") | |
| # Generate BibTeX entries | |
| bibtex_entries = [] | |
| for i, paper in enumerate(papers): | |
| print(f"Processing paper {i+1}/{len(papers)}: {type(paper)}") | |
| entry = generate_bibtex_entry(paper) | |
| bibtex_entries.append(entry) | |
| # Combine all entries | |
| bibtex_content = "\n\n".join(bibtex_entries) | |
| # Save BibTeX file | |
| bibtex_filename = filename.replace('.pkl', '.bib') | |
| bibtex_path = os.path.join(COLLECTION_DB_DIR, bibtex_filename) | |
| with open(bibtex_path, 'w', encoding='utf-8') as f: | |
| f.write(bibtex_content) | |
| print(f"BibTeX file saved to: {bibtex_path}") | |
| print(f"File exists: {os.path.exists(bibtex_path)}") | |
| print(f"File size: {os.path.getsize(bibtex_path) if os.path.exists(bibtex_path) else 'N/A'}") | |
| return jsonify({ | |
| 'success': True, | |
| 'message': f'BibTeX file generated with {len(papers)} entries', | |
| 'filename': bibtex_filename, | |
| 'entries_count': len(papers) | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'message': f'Error generating BibTeX: {str(e)}'}), 500 | |
| def download_database_file(filename): | |
| """Download a database file (collection, filter, or BibTeX).""" | |
| try: | |
| print(f"Attempting to download file: {filename}") | |
| # Try collections first, then filters, then legacy | |
| filepath = os.path.join(COLLECTION_DB_DIR, filename) | |
| print(f"Checking collections path: {filepath}") | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(FILTER_DB_DIR, filename) | |
| print(f"Checking filters path: {filepath}") | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(DATABASE_DIR, filename) | |
| print(f"Checking legacy path: {filepath}") | |
| if not os.path.exists(filepath): | |
| print(f"File not found in any directory: {filename}") | |
| return jsonify({'error': 'File not found'}), 404 | |
| print(f"Found file at: {filepath}") | |
| print(f"File size: {os.path.getsize(filepath)}") | |
| return send_file(filepath, as_attachment=True, download_name=filename) | |
| except Exception as e: | |
| print(f"Error in download_database_file: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
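| # Optional hardening sketch (an assumption, not applied by the code above): the | |
| # file endpoints build paths directly from a user-supplied filename, so | |
| # sanitising it first would guard against path traversal. secure_filename comes | |
| # from Werkzeug, which Flask already depends on; _safe_db_filename is hypothetical. | |
| from werkzeug.utils import secure_filename | |
| def _safe_db_filename(filename): | |
|     """Return a sanitised filename, or None if nothing safe remains.""" | |
|     cleaned = secure_filename(filename or '') | |
|     return cleaned or None | |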
| def merge_collections(): | |
| """Merge multiple collections into a new collection with overlap analysis.""" | |
| try: | |
| data = request.get_json() | |
| collection_filenames = data.get('collections', []) | |
| if len(collection_filenames) < 2: | |
| return jsonify({'success': False, 'message': 'At least 2 collections required for merging'}), 400 | |
| # Load all collections and track their work IDs | |
| collections_data = [] | |
| all_work_ids = set() | |
| collection_work_ids = [] # List of sets, one per collection | |
| for filename in collection_filenames: | |
| collection_path = os.path.join(COLLECTION_DB_DIR, filename) | |
| if not os.path.exists(collection_path): | |
| return jsonify({'success': False, 'message': f'Collection {filename} not found'}), 404 | |
| with open(collection_path, 'rb') as f: | |
| collection_data = pickle.load(f) | |
| papers = collection_data.get('papers', []) | |
| collection_work_ids_set = set() | |
| # Extract work IDs for this collection | |
| for paper in papers: | |
| if isinstance(paper, dict): | |
| work_id = paper.get('id', '') | |
| if work_id: | |
| collection_work_ids_set.add(work_id) | |
| all_work_ids.add(work_id) | |
| collections_data.append({ | |
| 'filename': filename, | |
| 'title': collection_data.get('title', filename.replace('.pkl', '')), | |
| 'papers': papers, | |
| 'work_ids': collection_work_ids_set, | |
| 'total_papers': len(papers) | |
| }) | |
| collection_work_ids.append(collection_work_ids_set) | |
| # Calculate overlap statistics | |
| overlap_stats = [] | |
| total_unique_papers = len(all_work_ids) | |
| for i, collection in enumerate(collections_data): | |
| collection_work_ids_i = collection_work_ids[i] | |
| overlaps = [] | |
| # Calculate overlap with each other collection | |
| for j, other_collection in enumerate(collections_data): | |
| if i != j: | |
| other_work_ids = collection_work_ids[j] | |
| intersection = collection_work_ids_i.intersection(other_work_ids) | |
| overlap_count = len(intersection) | |
| overlap_percentage = (overlap_count / len(collection_work_ids_i)) * 100 if collection_work_ids_i else 0 | |
| overlaps.append({ | |
| 'collection': other_collection['title'], | |
| 'overlap_count': overlap_count, | |
| 'overlap_percentage': round(overlap_percentage, 1) | |
| }) | |
| overlap_stats.append({ | |
| 'collection': collection['title'], | |
| 'total_papers': collection['total_papers'], | |
| 'overlaps': overlaps | |
| }) | |
| # Create merged collection with unique papers only | |
| merged_papers = [] | |
| merged_work_ids = set() | |
| for collection in collections_data: | |
| for paper in collection['papers']: | |
| if isinstance(paper, dict): | |
| work_id = paper.get('id', '') | |
| if work_id and work_id not in merged_work_ids: | |
| merged_papers.append(paper) | |
| merged_work_ids.add(work_id) | |
| if not merged_papers: | |
| return jsonify({'success': False, 'message': 'No papers found in collections to merge'}), 400 | |
| # Calculate total papers across all collections (before deduplication) | |
| total_papers_before_merge = sum(collection['total_papers'] for collection in collections_data) | |
| duplicates_removed = total_papers_before_merge - len(merged_papers) | |
| deduplication_percentage = (duplicates_removed / total_papers_before_merge) * 100 if total_papers_before_merge > 0 else 0 | |
| # Create merged collection data | |
| collection_titles = [collection['title'] for collection in collections_data] | |
| merged_title = f"MERGED: {' + '.join(collection_titles[:3])}" | |
| if len(collection_titles) > 3: | |
| merged_title += f" + {len(collection_titles) - 3} more" | |
| merged_data = { | |
| 'work_identifier': f"merged_{int(time.time())}", | |
| 'title': merged_title, | |
| 'work_id': '', | |
| 'papers': merged_papers, | |
| 'total_papers': len(merged_papers), | |
| 'created': datetime.now().isoformat(), | |
| 'source_collections': collection_filenames, | |
| 'merge_stats': { | |
| 'total_papers_before_merge': total_papers_before_merge, | |
| 'duplicates_removed': duplicates_removed, | |
| 'deduplication_percentage': round(deduplication_percentage, 1), | |
| 'overlap_analysis': overlap_stats | |
| } | |
| } | |
| # Save merged collection | |
| merged_filename = f"merged_{int(time.time())}.pkl" | |
| merged_path = os.path.join(COLLECTION_DB_DIR, merged_filename) | |
| with open(merged_path, 'wb') as f: | |
| pickle.dump(merged_data, f) | |
| return jsonify({ | |
| 'success': True, | |
| 'message': f'Merged collection created with {len(merged_papers)} unique papers (removed {duplicates_removed} duplicates)', | |
| 'filename': merged_filename, | |
| 'total_papers': len(merged_papers), | |
| 'merge_stats': { | |
| 'total_papers_before_merge': total_papers_before_merge, | |
| 'duplicates_removed': duplicates_removed, | |
| 'deduplication_percentage': round(deduplication_percentage, 1), | |
| 'overlap_analysis': overlap_stats | |
| } | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'message': f'Error merging collections: {str(e)}'}), 500 | |
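| # Illustrative shape (placeholder numbers) of the merge_stats block returned above | |
| # for two collections A (180 papers) and B (120 papers) sharing 40 papers: | |
| #   {"total_papers_before_merge": 300, | |
| #    "duplicates_removed": 40, | |
| #    "deduplication_percentage": 13.3, | |
| #    "overlap_analysis": [ | |
| #      {"collection": "A", "total_papers": 180, | |
| #       "overlaps": [{"collection": "B", "overlap_count": 40, "overlap_percentage": 22.2}]}, | |
| #      {"collection": "B", "total_papers": 120, | |
| #       "overlaps": [{"collection": "A", "overlap_count": 40, "overlap_percentage": 33.3}]}]} | |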
| def fetch_abstracts(): | |
| """Fetch missing abstracts for papers using their DOI URLs.""" | |
| try: | |
| data = request.get_json() | |
| papers = data.get('papers', []) | |
| if not papers: | |
| return jsonify({'error': 'No papers provided'}), 400 | |
| updated_papers = [] | |
| fetched_count = 0 | |
| total_processed = 0 | |
| for paper in papers: | |
| total_processed += 1 | |
| updated_paper = paper.copy() | |
| # Check if paper already has abstract (check both abstract_inverted_index and abstract fields) | |
| has_abstract = ( | |
| (paper.get('abstract_inverted_index') and | |
| len(paper.get('abstract_inverted_index', {})) > 0) or | |
| (paper.get('abstract') and | |
| len(str(paper.get('abstract', '')).strip()) > 50) | |
| ) | |
| if not has_abstract and paper.get('doi'): | |
| print(f"Fetching abstract for DOI: {paper.get('doi')}") | |
| abstract = fetch_abstract_from_doi(paper.get('doi')) | |
| if abstract: | |
| # Convert to inverted index format | |
| inverted_index = convert_abstract_to_inverted_index(abstract) | |
| updated_paper['abstract_inverted_index'] = inverted_index | |
| fetched_count += 1 | |
| print(f"Successfully fetched abstract for: {paper.get('title', 'Unknown')[:50]}...") | |
| else: | |
| print(f"Could not fetch abstract for: {paper.get('title', 'Unknown')[:50]}...") | |
| updated_papers.append(updated_paper) | |
| return jsonify({ | |
| 'success': True, | |
| 'fetched_count': fetched_count, | |
| 'total_processed': total_processed, | |
| 'updated_papers': updated_papers | |
| }) | |
| except Exception as e: | |
| print(f"Error fetching abstracts: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
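| # Note on the abstract format (illustrative example): convert_abstract_to_inverted_index, | |
| # defined elsewhere in this file, is expected to map each word to its list of positions, | |
| # the same structure the export routines below unpack, e.g. | |
| #   "deep learning for deep networks" -> | |
| #   {"deep": [0, 3], "learning": [1], "for": [2], "networks": [4]} | |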
| def export_excel_from_file(filename): | |
| """Export Excel from a specific database file.""" | |
| try: | |
| # Try collections then filters then legacy | |
| filepath = os.path.join(COLLECTION_DB_DIR, filename) | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(FILTER_DB_DIR, filename) | |
| if not os.path.exists(filepath): | |
| filepath = os.path.join(DATABASE_DIR, filename) | |
| if not os.path.exists(filepath): | |
| return jsonify({'error': 'File not found'}), 404 | |
| with open(filepath, 'rb') as f: | |
| data = pickle.load(f) | |
| papers = data.get('papers', []) | |
| if not papers: | |
| return jsonify({'error': 'No papers found in file'}), 400 | |
| # Prepare data for Excel export | |
| excel_data = [] | |
| for paper in papers: | |
| # Extract abstract from inverted index | |
| abstract = "" | |
| if paper.get('abstract_inverted_index'): | |
| words = [] | |
| for word, positions in paper['abstract_inverted_index'].items(): | |
| for pos in positions: | |
| while len(words) <= pos: | |
| words.append('') | |
| words[pos] = word | |
| abstract = ' '.join(words).strip() | |
| # Extract open access info with null checks | |
| oa_info = paper.get('open_access') or {} | |
| is_oa = oa_info.get('is_oa', False) if oa_info else False | |
| oa_status = oa_info.get('oa_status', '') if oa_info else '' | |
| # Extract DOI with null check | |
| doi = "" | |
| if paper.get('doi'): | |
| doi = paper['doi'].replace('https://doi.org/', '') | |
| # Extract authors with null checks | |
| authors = paper.get('authorships') or [] | |
| author_names = [] | |
| for author in authors[:5]: # Limit to first 5 authors | |
| if author and isinstance(author, dict): | |
| author_obj = author.get('author') or {} | |
| if author_obj and isinstance(author_obj, dict): | |
| author_names.append(author_obj.get('display_name', '')) | |
| # Extract journal with null checks | |
| journal = "" | |
| primary_location = paper.get('primary_location') | |
| if primary_location and isinstance(primary_location, dict): | |
| source = primary_location.get('source') | |
| if source and isinstance(source, dict): | |
| journal = source.get('display_name', '') | |
| # Extract GPT analysis with null checks | |
| gpt_analysis = paper.get('gpt_analysis') or {} | |
| gpt_aims = gpt_analysis.get('aims_of_paper', '') if gpt_analysis else '' | |
| gpt_takeaways = gpt_analysis.get('key_takeaways', '') if gpt_analysis else '' | |
| excel_data.append({ | |
| 'Title': paper.get('title', ''), | |
| 'Publication Date': paper.get('publication_date', ''), | |
| 'DOI': doi, | |
| 'Is Open Access': is_oa, | |
| 'OA Status': oa_status, | |
| 'Abstract': abstract, | |
| 'Relationship': paper.get('relationship', ''), | |
| 'Authors': ', '.join(author_names), | |
| 'Journal': journal, | |
| 'OpenAlex ID': paper.get('id', ''), | |
| 'Relevance Reason': paper.get('relevance_reason', ''), | |
| 'GPT Aims': gpt_aims, | |
| 'GPT Takeaways': gpt_takeaways | |
| }) | |
| # Create DataFrame and export to Excel | |
| df = pd.DataFrame(excel_data) | |
| excel_filename = f'{filename.replace(".pkl", "")}_{int(time.time())}.xlsx' | |
| # Create Excel file in a temporary location | |
| temp_dir = tempfile.gettempdir() | |
| excel_path = os.path.join(temp_dir, excel_filename) | |
| try: | |
| df.to_excel(excel_path, index=False) | |
| return send_file(excel_path, as_attachment=True, download_name=excel_filename) | |
| except Exception as e: | |
| print(f"Error creating Excel file: {e}") | |
| # Fallback: try current directory | |
| try: | |
| df.to_excel(excel_filename, index=False) | |
| return send_file(excel_filename, as_attachment=True, download_name=excel_filename) | |
| except Exception as e2: | |
| print(f"Error creating Excel file in current directory: {e2}") | |
| return jsonify({'error': f'Failed to create Excel file: {str(e2)}'}), 500 | |
| except Exception as e: | |
| print(f"Error exporting Excel: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def export_excel(): | |
| """Export collected papers to Excel format.""" | |
| try: | |
| # Load papers from temporary file | |
| if not os.path.exists('temp_papers.pkl'): | |
| return jsonify({'error': 'No papers found. Please collect papers first.'}), 400 | |
| with open('temp_papers.pkl', 'rb') as f: | |
| papers = pickle.load(f) | |
| # Prepare data for Excel export | |
| excel_data = [] | |
| for paper in papers: | |
| # Extract abstract from inverted index | |
| abstract = "" | |
| if paper.get('abstract_inverted_index'): | |
| words = [] | |
| for word, positions in paper['abstract_inverted_index'].items(): | |
| for pos in positions: | |
| while len(words) <= pos: | |
| words.append('') | |
| words[pos] = word | |
| abstract = ' '.join(words).strip() | |
| # Extract open access info with null checks | |
| oa_info = paper.get('open_access') or {} | |
| is_oa = oa_info.get('is_oa', False) if oa_info else False | |
| oa_status = oa_info.get('oa_status', '') if oa_info else '' | |
| # Extract DOI with null check | |
| doi = "" | |
| if paper.get('doi'): | |
| doi = paper['doi'].replace('https://doi.org/', '') | |
| # Extract authors with null checks | |
| authors = paper.get('authorships') or [] | |
| author_names = [] | |
| for author in authors[:5]: # Limit to first 5 authors | |
| if author and isinstance(author, dict): | |
| author_obj = author.get('author') or {} | |
| if author_obj and isinstance(author_obj, dict): | |
| author_names.append(author_obj.get('display_name', '')) | |
| # Extract journal with null checks | |
| journal = "" | |
| primary_location = paper.get('primary_location') | |
| if primary_location and isinstance(primary_location, dict): | |
| source = primary_location.get('source') | |
| if source and isinstance(source, dict): | |
| journal = source.get('display_name', '') | |
| # Extract GPT analysis with null checks | |
| gpt_analysis = paper.get('gpt_analysis') or {} | |
| gpt_aims = gpt_analysis.get('aims_of_paper', '') if gpt_analysis else '' | |
| gpt_takeaways = gpt_analysis.get('key_takeaways', '') if gpt_analysis else '' | |
| excel_data.append({ | |
| 'Title': paper.get('title', ''), | |
| 'Publication Date': paper.get('publication_date', ''), | |
| 'DOI': doi, | |
| 'Is Open Access': is_oa, | |
| 'OA Status': oa_status, | |
| 'Abstract': abstract, | |
| 'Relationship': paper.get('relationship', ''), | |
| 'Authors': ', '.join(author_names), | |
| 'Journal': journal, | |
| 'OpenAlex ID': paper.get('id', ''), | |
| 'Relevance Reason': paper.get('relevance_reason', ''), | |
| 'GPT Aims': gpt_aims, | |
| 'GPT Takeaways': gpt_takeaways | |
| }) | |
| # Create DataFrame and export to Excel | |
| df = pd.DataFrame(excel_data) | |
| excel_filename = f'research_papers_{int(time.time())}.xlsx' | |
| # Create Excel file in a temporary location | |
| temp_dir = tempfile.gettempdir() | |
| excel_path = os.path.join(temp_dir, excel_filename) | |
| try: | |
| df.to_excel(excel_path, index=False) | |
| return send_file(excel_path, as_attachment=True, download_name=excel_filename) | |
| except Exception as e: | |
| print(f"Error creating Excel file: {e}") | |
| # Fallback: try current directory | |
| try: | |
| df.to_excel(excel_filename, index=False) | |
| return send_file(excel_filename, as_attachment=True, download_name=excel_filename) | |
| except Exception as e2: | |
| print(f"Error creating Excel file in current directory: {e2}") | |
| return jsonify({'error': f'Failed to create Excel file: {str(e2)}'}), 500 | |
| except Exception as e: | |
| print(f"Error exporting Excel: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
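| # Hedged sketch (not wired in): export_excel_from_file and export_excel build | |
| # identical row dictionaries; a shared helper such as this one (the name | |
| # _paper_to_excel_row is hypothetical) would let both endpoints reuse that logic. | |
| def _paper_to_excel_row(paper): | |
|     """Flatten one OpenAlex work dict into the Excel row format used above.""" | |
|     # Reconstruct the abstract from the inverted index, if present | |
|     abstract = "" | |
|     if paper.get('abstract_inverted_index'): | |
|         words = [] | |
|         for word, positions in paper['abstract_inverted_index'].items(): | |
|             for pos in positions: | |
|                 while len(words) <= pos: | |
|                     words.append('') | |
|                 words[pos] = word | |
|         abstract = ' '.join(words).strip() | |
|     oa_info = paper.get('open_access') or {} | |
|     source = (paper.get('primary_location') or {}).get('source') or {} | |
|     gpt_analysis = paper.get('gpt_analysis') or {} | |
|     author_names = [] | |
|     for authorship in (paper.get('authorships') or [])[:5]:  # first 5 authors, as above | |
|         if isinstance(authorship, dict): | |
|             name = (authorship.get('author') or {}).get('display_name', '') | |
|             if name: | |
|                 author_names.append(name) | |
|     return { | |
|         'Title': paper.get('title', ''), | |
|         'Publication Date': paper.get('publication_date', ''), | |
|         'DOI': (paper.get('doi') or '').replace('https://doi.org/', ''), | |
|         'Is Open Access': oa_info.get('is_oa', False), | |
|         'OA Status': oa_info.get('oa_status', ''), | |
|         'Abstract': abstract, | |
|         'Relationship': paper.get('relationship', ''), | |
|         'Authors': ', '.join(author_names), | |
|         'Journal': source.get('display_name', ''), | |
|         'OpenAlex ID': paper.get('id', ''), | |
|         'Relevance Reason': paper.get('relevance_reason', ''), | |
|         'GPT Aims': gpt_analysis.get('aims_of_paper', ''), | |
|         'GPT Takeaways': gpt_analysis.get('key_takeaways', ''), | |
|     } | |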
| def paper_details(work_id): | |
| """Get detailed analysis for a specific paper.""" | |
| try: | |
| # Load papers from temporary file | |
| if not os.path.exists('temp_papers.pkl'): | |
| return jsonify({'error': 'No papers found'}), 400 | |
| with open('temp_papers.pkl', 'rb') as f: | |
| papers = pickle.load(f) | |
| # Find the specific paper | |
| paper = next((p for p in papers if p.get('id') == work_id), None) | |
| if not paper: | |
| return jsonify({'error': 'Paper not found'}), 404 | |
| return jsonify({ | |
| 'success': True, | |
| 'paper': paper | |
| }) | |
| except Exception as e: | |
| print(f"Error getting paper details: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| if __name__ == '__main__': | |
| # Create templates directory if it doesn't exist | |
| os.makedirs('templates', exist_ok=True) | |
| port = int(os.getenv('PORT', '5000')) | |
| app.run(debug=False, host='0.0.0.0', port=port) | |