from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from pathlib import Path
from time import sleep

import fitz  # PyMuPDF
import pandas as pd
import requests

from system.html2lines import url2lines, line_correction, html2metadata

MAX_RETRIES = 3  # fetch attempts per URL
TIMEOUT = 5  # seconds per HTTP request
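# Fetch a URL with a bounded retry loop. PDF responses are written to a
# temp file and parsed with PyMuPDF; everything else goes through
# url2lines. Returns a list of corrected text lines, or [] on failure.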
def scrape_text_from_url(url, temp_name):
    response = None
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.get(url, timeout=TIMEOUT)
            break
        except requests.RequestException:
            if attempt < MAX_RETRIES - 1:
                sleep(3)  # brief pause before retrying

    if response is None or response.status_code == 503:
        return []

    if url.endswith(".pdf"):
        pdf_dir = Path("/tmp/pdf_dir")
        pdf_dir.mkdir(parents=True, exist_ok=True)
        pdf_path = pdf_dir / f"{temp_name}.pdf"
        with open(pdf_path, "wb") as f:
            f.write(response.content)

        extracted_text = ""
        # The context manager closes the document handle after reading,
        # avoiding a file-descriptor leak in long scraping runs.
        with fitz.open(str(pdf_path)) as doc:
            for page in doc:
                extracted_text += page.get_text() or ""

        return line_correction(extracted_text.split("\n"))

    return line_correction(url2lines(url))
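# Build one JSONL record from a TSV row (row[1] = type, row[2] = url,
# row[3] = query). Returns None when scraping or metadata extraction fails.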
def process_row(row, claim_id, row_idx):
    try:
        url = row[2]
        json_data = {
            "claim_id": claim_id,
            "type": row[1],
            "query": row[3],
            "url": url,
            # A per-row temp name keeps concurrent PDF downloads from
            # overwriting one another's files.
            "url2text": scrape_text_from_url(url, f"{claim_id}_{row_idx}"),
            "metadata": {},
        }
        meta = html2metadata(url)
        json_data["metadata"] = {
            "title": meta.get("title"),
            "date": meta.get("date"),
        }
        return json_data
    except Exception as e:
        print(f"[WARN] Failed to scrape {row[2]}: {e}")
        return None
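# Read one claim's TSV of search results, scrape every row in parallel,
# and write the records to a JSONL file. Skips work if the output exists.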
def run_scraper(tsv_file_path: str, output_jsonl_path: str, max_workers: int = 10):
    claim_id = Path(tsv_file_path).stem
    output_jsonl_path = Path(output_jsonl_path)
    output_jsonl_path.parent.mkdir(parents=True, exist_ok=True)

    if output_jsonl_path.exists():
        print(f"[INFO] Skipping processing as output file already exists: {output_jsonl_path}")
        return str(output_jsonl_path)

    try:
        df = pd.read_csv(tsv_file_path, sep="\t", header=None)
        print("[INFO] Data loaded successfully with Pandas.")
    except Exception as e:
        raise RuntimeError(f"[ERROR] Failed to load TSV: {e}") from e

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_row, row, claim_id, idx)
            for idx, row in df.iterrows()
        ]
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)

    with open(output_jsonl_path, "w", encoding="utf-8") as json_file:
        for item in results:
            json_file.write(json.dumps(item, ensure_ascii=False) + "\n")

    print(f"[SYSTEM] Output saved to {output_jsonl_path}")
    return str(output_jsonl_path)
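# Minimal usage sketch; this __main__ guard is an assumed entry point and
# the paths below are hypothetical, not part of the original module.
if __name__ == "__main__":
    run_scraper(
        "data/claims/0.tsv",  # hypothetical input: one claim's search results
        "data/scraped/0.jsonl",  # hypothetical output path
        max_workers=10,
    )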