#!/usr/bin/env python
"""
AI Tutor App - Course Addition Workflow

This script guides you through the complete process of adding a new course to the AI Tutor App:

1. Process course markdown files to create JSONL data
2. MANDATORY MANUAL STEP: Add URLs to course content in the generated JSONL
3. Merge course JSONL into all_sources_data.jsonl
4. Add contextual information to document nodes
5. Create vector stores
6. Upload databases to HuggingFace
7. Update UI configuration

Usage:
    python add_course_workflow.py --course [COURSE_NAME]

Additional flags to run specific steps (if you want to restart from a specific point):
    --skip-process-md      Skip the markdown processing step
    --skip-merge           Skip merging into all_sources_data.jsonl
    --process-all-context  Process all content when adding context (by default only new content is processed)
    --skip-context         Skip the context addition step entirely
    --skip-vectors         Skip vector store creation
    --skip-upload          Skip uploading to HuggingFace
    --skip-ui-update       Skip updating the UI configuration
    --skip-data-upload     Skip uploading data files to the private HuggingFace repo
"""
import argparse
import json
import logging
import os
import pickle
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Set

from dotenv import load_dotenv
from huggingface_hub import HfApi, hf_hub_download

# Load environment variables from .env file
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def ensure_required_files_exist():
    """Download required data files from HuggingFace if they don't exist locally."""
    # List of files to check and download
    required_files = {
        # Critical files
        "data/all_sources_data.jsonl": "all_sources_data.jsonl",
        "data/all_sources_contextual_nodes.pkl": "all_sources_contextual_nodes.pkl",
        # Documentation source files
        "data/transformers_data.jsonl": "transformers_data.jsonl",
        "data/peft_data.jsonl": "peft_data.jsonl",
        "data/trl_data.jsonl": "trl_data.jsonl",
        "data/llama_index_data.jsonl": "llama_index_data.jsonl",
        "data/langchain_data.jsonl": "langchain_data.jsonl",
        "data/openai_cookbooks_data.jsonl": "openai_cookbooks_data.jsonl",
        # Course files
        "data/tai_blog_data.jsonl": "tai_blog_data.jsonl",
        "data/8-hour_primer_data.jsonl": "8-hour_primer_data.jsonl",
        "data/llm_developer_data.jsonl": "llm_developer_data.jsonl",
        "data/python_primer_data.jsonl": "python_primer_data.jsonl",
    }

    # Critical files that must be downloaded
    critical_files = [
        "data/all_sources_data.jsonl",
        "data/all_sources_contextual_nodes.pkl",
    ]

    # Check and download each file
    for local_path, remote_filename in required_files.items():
        if not os.path.exists(local_path):
            logger.info(
                f"{remote_filename} not found. Attempting to download from HuggingFace..."
            )
            try:
                hf_hub_download(
                    token=os.getenv("HF_TOKEN"),
                    repo_id="towardsai-tutors/ai-tutor-data",
                    filename=remote_filename,
                    repo_type="dataset",
                    local_dir="data",
                )
                logger.info(f"Successfully downloaded {remote_filename} from HuggingFace")
            except Exception as e:
                logger.warning(f"Could not download {remote_filename}: {e}")

                # Only create empty file for all_sources_data.jsonl if it's missing
                if local_path == "data/all_sources_data.jsonl":
                    logger.warning(
                        "Creating a new all_sources_data.jsonl file. This will not include previously existing data."
                    )
                    with open(local_path, "w") as f:
                        pass

                # If critical file is missing, print a more serious warning
                if local_path in critical_files:
                    logger.warning(
                        f"Critical file {remote_filename} is missing. The workflow may not function correctly."
                    )
                    if local_path == "data/all_sources_contextual_nodes.pkl":
                        logger.warning(
                            "The context addition step will process all documents since no existing contexts were found."
                        )
def load_jsonl(file_path: str) -> List[Dict]:
    """Load data from a JSONL file."""
    data = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            data.append(json.loads(line))
    return data


def save_jsonl(data: List[Dict], file_path: str) -> None:
    """Save data to a JSONL file."""
    with open(file_path, "w", encoding="utf-8") as f:
        for item in data:
            json.dump(item, f, ensure_ascii=False)
            f.write("\n")
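
# Note: downstream steps assume each JSONL record carries at least "doc_id", "source",
# and "url" fields (doc_id is used for de-duplication, source for grouping nodes, and
# url is the field filled in during the manual step); other fields pass through unchanged.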
def process_markdown_files(course_name: str) -> str:
    """Process markdown files for a specific course. Returns path to output JSONL."""
    logger.info(f"Processing markdown files for course: {course_name}")
    cmd = ["python", "data/scraping_scripts/process_md_files.py", course_name]
    result = subprocess.run(cmd)
    if result.returncode != 0:
        logger.error("Error processing markdown files - check output above")
        sys.exit(1)
    logger.info(f"Successfully processed markdown files for {course_name}")

    # Determine the output file path from process_md_files.py
    from data.scraping_scripts.process_md_files import SOURCE_CONFIGS

    if course_name not in SOURCE_CONFIGS:
        logger.error(f"Course {course_name} not found in SOURCE_CONFIGS")
        sys.exit(1)
    output_file = SOURCE_CONFIGS[course_name]["output_file"]
    return output_file
def manual_url_addition(jsonl_path: str) -> None:
    """Guide the user through manually adding URLs to the course JSONL."""
    logger.info("=== MANDATORY MANUAL STEP: URL ADDITION ===")
    logger.info(f"Please add the URLs to the course content in: {jsonl_path}")
    logger.info("For each document in the JSONL file:")
    logger.info("1. Open the file in a text editor")
    logger.info("2. Find the empty 'url' field for each document")
    logger.info("3. Add the appropriate URL from the live course platform")
    logger.info(
        " Example URL format: https://academy.towardsai.net/courses/take/python-for-genai/multimedia/62515980-course-structure"
    )
    logger.info("4. Save the file when done")

    # Check if URLs are present
    data = load_jsonl(jsonl_path)
    missing_urls = sum(1 for item in data if not item.get("url"))
    if missing_urls > 0:
        logger.warning(f"Found {missing_urls} documents without URLs in {jsonl_path}")
        answer = input(
            f"\n{missing_urls} documents are missing URLs. Have you added all the URLs? (yes/no): "
        )
        if answer.lower() not in ["yes", "y"]:
            logger.info("Please add the URLs and run the script again.")
            sys.exit(0)
    else:
        logger.info("All documents have URLs. Continuing with the workflow.")
def merge_into_all_sources(course_jsonl_path: str) -> None:
    """Merge the course JSONL into all_sources_data.jsonl."""
    all_sources_path = "data/all_sources_data.jsonl"
    logger.info(f"Merging {course_jsonl_path} into {all_sources_path}")

    # Load course data
    course_data = load_jsonl(course_jsonl_path)

    # Load existing all_sources data if it exists
    all_data = []
    if os.path.exists(all_sources_path):
        all_data = load_jsonl(all_sources_path)

    # Get doc_ids from existing data
    existing_ids = {item["doc_id"] for item in all_data}

    # Add new course data (avoiding duplicates)
    new_items = 0
    for item in course_data:
        if item["doc_id"] not in existing_ids:
            all_data.append(item)
            existing_ids.add(item["doc_id"])
            new_items += 1

    # Save the combined data
    save_jsonl(all_data, all_sources_path)
    logger.info(f"Added {new_items} new documents to {all_sources_path}")
def get_processed_doc_ids() -> Set[str]:
    """Get set of doc_ids that have already been processed with context."""
    if not os.path.exists("data/all_sources_contextual_nodes.pkl"):
        return set()
    try:
        with open("data/all_sources_contextual_nodes.pkl", "rb") as f:
            nodes = pickle.load(f)
            return {node.source_node.node_id for node in nodes}
    except Exception as e:
        logger.error(f"Error loading processed doc_ids: {e}")
        return set()
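
# Assumption: all_sources_contextual_nodes.pkl holds context-enhanced nodes
# (LlamaIndex-style objects) whose source_node.node_id matches the doc_id of the
# originating JSONL document; that mapping is what get_processed_doc_ids and the
# new-only path below rely on.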
def add_context_to_nodes(new_only: bool = False) -> None:
    """Add context to document nodes, optionally processing only new content."""
    logger.info("Adding context to document nodes")

    if new_only:
        # Load all documents
        all_docs = load_jsonl("data/all_sources_data.jsonl")
        processed_ids = get_processed_doc_ids()

        # Filter for unprocessed documents
        new_docs = [doc for doc in all_docs if doc["doc_id"] not in processed_ids]

        if not new_docs:
            logger.info("No new documents to process")
            return

        # Save temporary JSONL with only new documents
        temp_file = "data/new_docs_temp.jsonl"
        save_jsonl(new_docs, temp_file)

        # Run an inline script that reuses helpers from add_context_to_nodes.py
        # against the temp file, then splices the new nodes into the existing pickle
        cmd = [
            "python",
            "-c",
            f"""
import asyncio
import os
import pickle
import json
from data.scraping_scripts.add_context_to_nodes import create_docs, process

async def main():
    # First, get the list of sources being updated from the temp file
    updated_sources = set()
    with open("{temp_file}", "r") as f:
        for line in f:
            data = json.loads(line)
            updated_sources.add(data["source"])

    print(f"Updating nodes for sources: {{updated_sources}}")

    # Process new documents
    documents = create_docs("{temp_file}")
    enhanced_nodes = await process(documents)
    print(f"Generated context for {{len(enhanced_nodes)}} new nodes")

    # Load existing nodes if they exist
    existing_nodes = []
    if os.path.exists("data/all_sources_contextual_nodes.pkl"):
        with open("data/all_sources_contextual_nodes.pkl", "rb") as f:
            existing_nodes = pickle.load(f)

    # Filter out existing nodes for sources we're updating
    filtered_nodes = []
    removed_count = 0
    for node in existing_nodes:
        # Try to extract source from node metadata
        try:
            source = None
            if hasattr(node, 'source_node') and hasattr(node.source_node, 'metadata'):
                source = node.source_node.metadata.get("source")
            elif hasattr(node, 'metadata'):
                source = node.metadata.get("source")

            if source not in updated_sources:
                filtered_nodes.append(node)
            else:
                removed_count += 1
        except Exception:
            # Keep nodes where we can't determine the source
            filtered_nodes.append(node)

    print(f"Removed {{removed_count}} existing nodes for updated sources")
    existing_nodes = filtered_nodes

    # Combine filtered existing nodes with new nodes
    all_nodes = existing_nodes + enhanced_nodes

    # Save all nodes
    with open("data/all_sources_contextual_nodes.pkl", "wb") as f:
        pickle.dump(all_nodes, f)

    print(f"Total nodes in updated file: {{len(all_nodes)}}")

asyncio.run(main())
""",
        ]
    else:
        # Process all documents
        cmd = ["python", "data/scraping_scripts/add_context_to_nodes.py"]

    result = subprocess.run(cmd)
    if result.returncode != 0:
        logger.error("Error adding context to nodes - check output above")
        sys.exit(1)

    logger.info("Successfully added context to nodes")

    # Clean up temp file if it exists
    if new_only and os.path.exists("data/new_docs_temp.jsonl"):
        os.remove("data/new_docs_temp.jsonl")
def create_vector_stores() -> None:
    """Create vector stores from processed documents."""
    logger.info("Creating vector stores")
    cmd = ["python", "data/scraping_scripts/create_vector_stores.py", "all_sources"]
    result = subprocess.run(cmd)
    if result.returncode != 0:
        logger.error("Error creating vector stores - check output above")
        sys.exit(1)
    logger.info("Successfully created vector stores")
def upload_to_huggingface(upload_jsonl: bool = False) -> None:
    """Upload databases to HuggingFace."""
    logger.info("Uploading databases to HuggingFace")
    cmd = ["python", "data/scraping_scripts/upload_dbs_to_hf.py"]
    result = subprocess.run(cmd)
    if result.returncode != 0:
        logger.error("Error uploading databases - check output above")
        sys.exit(1)
    logger.info("Successfully uploaded databases to HuggingFace")

    if upload_jsonl:
        logger.info("Uploading data files to HuggingFace")
        try:
            # Note: This uses a separate private repository
            cmd = ["python", "data/scraping_scripts/upload_data_to_hf.py"]
            result = subprocess.run(cmd)
            if result.returncode != 0:
                logger.error("Error uploading data files - check output above")
                sys.exit(1)
            logger.info("Successfully uploaded data files to HuggingFace")
        except Exception as e:
            logger.error(f"Error uploading JSONL file: {e}")
            sys.exit(1)
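
# The vector databases and the raw data files are pushed by two different helper
# scripts: upload_dbs_to_hf.py uploads the databases, while upload_data_to_hf.py
# uploads the JSONL/PKL data files to a separate, private HuggingFace repository.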
def update_ui_files(course_name: str) -> None:
    """Update main.py and setup.py with the new source."""
    logger.info(f"Updating UI files with new course: {course_name}")

    # Get the source configuration for display name
    from data.scraping_scripts.process_md_files import SOURCE_CONFIGS

    if course_name not in SOURCE_CONFIGS:
        logger.error(f"Course {course_name} not found in SOURCE_CONFIGS")
        return

    # Get a readable display name for the UI
    display_name = course_name.replace("_", " ").title()

    # Update setup.py - add to AVAILABLE_SOURCES and AVAILABLE_SOURCES_UI
    setup_path = Path("scripts/setup.py")
    if setup_path.exists():
        setup_content = setup_path.read_text()

        # Check if already added
        if f'"{course_name}"' in setup_content:
            logger.info(f"Course {course_name} already in setup.py")
        else:
            # Add to AVAILABLE_SOURCES_UI
            ui_list_start = setup_content.find("AVAILABLE_SOURCES_UI = [")
            ui_list_end = setup_content.find("]", ui_list_start)
            new_ui_content = (
                setup_content[:ui_list_end]
                + f' "{display_name}",\n'
                + setup_content[ui_list_end:]
            )

            # Add to AVAILABLE_SOURCES
            sources_list_start = new_ui_content.find("AVAILABLE_SOURCES = [")
            sources_list_end = new_ui_content.find("]", sources_list_start)
            new_content = (
                new_ui_content[:sources_list_end]
                + f' "{course_name}",\n'
                + new_ui_content[sources_list_end:]
            )

            # Write updated content
            setup_path.write_text(new_content)
            logger.info(f"Updated setup.py with {course_name}")
    else:
        logger.warning(f"setup.py not found at {setup_path}")

    # Update main.py - add to source_mapping
    main_path = Path("scripts/main.py")
    if main_path.exists():
        main_content = main_path.read_text()

        # Check if already added
        if f'"{display_name}": "{course_name}"' in main_content:
            logger.info(f"Course {course_name} already in main.py")
        else:
            # Add to source_mapping
            mapping_start = main_content.find("source_mapping = {")
            mapping_end = main_content.find("}", mapping_start)
            new_main_content = (
                main_content[:mapping_end]
                + f' "{display_name}": "{course_name}",\n'
                + main_content[mapping_end:]
            )

            # Add to default selected sources if not there
            value_start = new_main_content.find("value=[")
            value_end = new_main_content.find("]", value_start)
            if f'"{display_name}"' not in new_main_content[value_start:value_end]:
                new_main_content = (
                    new_main_content[: value_start + 7]
                    + f' "{display_name}",\n'
                    + new_main_content[value_start + 7 :]
                )

            # Write updated content
            main_path.write_text(new_main_content)
            logger.info(f"Updated main.py with {course_name}")
    else:
        logger.warning(f"main.py not found at {main_path}")
def main():
    parser = argparse.ArgumentParser(
        description="AI Tutor App Course Addition Workflow"
    )
    parser.add_argument(
        "--course",
        required=True,
        help="Name of the course to process (must match SOURCE_CONFIGS)",
    )
    parser.add_argument(
        "--skip-process-md",
        action="store_true",
        help="Skip the markdown processing step",
    )
    parser.add_argument(
        "--skip-merge",
        action="store_true",
        help="Skip merging into all_sources_data.jsonl",
    )
    parser.add_argument(
        "--process-all-context",
        action="store_true",
        help="Process all content when adding context (default: only process new content)",
    )
    parser.add_argument(
        "--skip-context",
        action="store_true",
        help="Skip the context addition step entirely",
    )
    parser.add_argument(
        "--skip-vectors", action="store_true", help="Skip vector store creation"
    )
    parser.add_argument(
        "--skip-upload", action="store_true", help="Skip uploading to HuggingFace"
    )
    parser.add_argument(
        "--skip-ui-update",
        action="store_true",
        help="Skip updating the UI configuration",
    )
    parser.add_argument(
        "--skip-data-upload",
        action="store_true",
        help="Skip uploading data files to private HuggingFace repo (they are uploaded by default)",
    )
    args = parser.parse_args()

    course_name = args.course

    # Ensure required data files exist before proceeding
    ensure_required_files_exist()

    # Get the output file path
    from data.scraping_scripts.process_md_files import SOURCE_CONFIGS

    if course_name not in SOURCE_CONFIGS:
        logger.error(f"Course {course_name} not found in SOURCE_CONFIGS")
        sys.exit(1)
    course_jsonl_path = SOURCE_CONFIGS[course_name]["output_file"]

    # Execute the workflow steps
    if not args.skip_process_md:
        course_jsonl_path = process_markdown_files(course_name)

    # Always do the manual URL addition step for courses
    manual_url_addition(course_jsonl_path)

    if not args.skip_merge:
        merge_into_all_sources(course_jsonl_path)

    if not args.skip_context:
        add_context_to_nodes(not args.process_all_context)

    if not args.skip_vectors:
        create_vector_stores()

    if not args.skip_upload:
        # By default, also upload the data files (JSONL and PKL) unless explicitly skipped
        upload_to_huggingface(not args.skip_data_upload)

    if not args.skip_ui_update:
        update_ui_files(course_name)

    logger.info("Course addition workflow completed successfully")


if __name__ == "__main__":
    main()