import gradio as gr import torch import os import logging import time import tempfile import shutil import subprocess from datetime import datetime from pathlib import Path from huggingface_hub import HfApi from transformers import AutoConfig, AutoModel, AutoTokenizer from optimum.onnxruntime import ORTQuantizer from optimum.onnxruntime.configuration import AutoQuantizationConfig import torch.nn.utils.prune as prune logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') HF_TOKEN = os.getenv("HF_TOKEN") if not HF_TOKEN: logging.warning("HF_TOKEN environment variable not set. Packaging and uploading will fail.") api = HfApi() OUTPUT_DIR = "/tmp/optimized_models" os.makedirs(OUTPUT_DIR, exist_ok=True) # Use an absolute path to the pre-built location in /opt LLAMA_CPP_DIR = Path("/opt/llama.cpp") # Binaries are in the 'build/bin' subdirectory from our out-of-source build LLAMA_CPP_QUANTIZE_SCRIPT = LLAMA_CPP_DIR / "build" / "bin" / "quantize" LLAMA_CPP_CONVERT_SCRIPT = LLAMA_CPP_DIR / "convert.py" if not LLAMA_CPP_QUANTIZE_SCRIPT.exists(): error_msg = "FATAL ERROR: llama.cpp binaries not found. The Docker build may have failed." logging.error(error_msg) raise RuntimeError(error_msg) def stage_1_analyze_model(model_id: str): log_stream = "[STAGE 1] Analyzing model...\n" try: config = AutoConfig.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN) model_type = config.model_type analysis_report = f"""### Model Analysis Report\n- **Model ID:** `{model_id}`\n- **Architecture:** `{model_type}`""" recommendation = "" if 'llama' in model_type or 'gpt' in model_type or 'mistral' in model_type or 'gemma' in model_type: recommendation = "**Recommendation:** This is a Large Language Model (LLM). For the best CPU performance, the **GGUF Pipeline** (using llama.cpp) is highly recommended." else: recommendation = "**Recommendation:** This is likely an encoder model. The **ONNX Pipeline** is recommended." log_stream += f"Analysis complete. Architecture: {model_type}.\n" return log_stream, analysis_report + "\n" + recommendation, gr.Accordion(open=True) except Exception as e: error_msg = f"Failed to analyze model '{model_id}'. Error: {e}" logging.error(error_msg) return log_stream + error_msg, "Could not analyze model.", gr.Accordion(open=False) def stage_2_prune_model(model, prune_percentage: float): if prune_percentage == 0: return model, "Skipped pruning as percentage was 0." log_stream = "[STAGE 2] Pruning model...\n" for name, module in model.named_modules(): if isinstance(module, torch.nn.Linear): prune.l1_unstructured(module, name='weight', amount=prune_percentage / 100.0) prune.remove(module, 'weight') log_stream += f"Pruning complete with {prune_percentage}% target.\n" return model, log_stream def stage_3_4_onnx_quantize(model_path_or_id: str, onnx_quant_type: str, calibration_data_path: str): log_stream = "[STAGE 3 & 4] Converting to ONNX and Quantizing...\n" run_id = datetime.now().strftime("%Y%m%d-%H%M%S") model_name = model_path_or_id.split('/')[-1] onnx_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-onnx") try: export_command = ["optimum-cli", "export", "onnx", "--model", model_path_or_id, "--trust-remote-code", onnx_path] process = subprocess.run(export_command, check=True, capture_output=True, text=True) log_stream += f"Executing `optimum-cli export onnx` for '{model_path_or_id}'...\n{process.stdout}\n" if process.stderr: log_stream += f"[STDERR]\n{process.stderr}\n" except subprocess.CalledProcessError as e: raise RuntimeError(f"Failed during `optimum-cli export onnx`. Error:\n{e.stderr}") try: quantizer = ORTQuantizer.from_pretrained(onnx_path) log_stream += "Performing DYNAMIC quantization...\n" dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False) quantized_path = os.path.join(onnx_path, "quantized-dynamic") quantizer.quantize(save_dir=quantized_path, quantization_config=dqconfig) log_stream += f"Successfully quantized model to: {quantized_path}\n" if not os.path.exists(os.path.join(quantized_path, 'tokenizer_config.json')): AutoTokenizer.from_pretrained(model_path_or_id, trust_remote_code=True).save_pretrained(quantized_path) log_stream += "Saved new tokenizer files.\n" return quantized_path, log_stream except Exception as e: raise RuntimeError(f"Failed during ONNX quantization step. Error: {e}") def stage_3_4_gguf_quantize(model_path_or_id: str, original_model_id: str, quantization_strategy: str): log_stream = "[STAGE 3 & 4] Converting to GGUF using llama.cpp...\n" run_id = datetime.now().strftime("%Y%m%d-%H%M%S") model_name = original_model_id.replace('/', '_') gguf_path = os.path.abspath(os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-gguf")) os.makedirs(gguf_path, exist_ok=True) f16_gguf_path = os.path.join(gguf_path, "model-f16.gguf") quantized_gguf_path = os.path.join(gguf_path, "model.gguf") absolute_model_path = os.path.abspath(model_path_or_id) if os.path.exists(model_path_or_id) else model_path_or_id try: # The python script can be called directly using its absolute path. convert_command = ["python3", str(LLAMA_CPP_CONVERT_SCRIPT), absolute_model_path, "--outfile", f16_gguf_path, "--outtype", "f16"] process = subprocess.run(convert_command, check=True, capture_output=True, text=True) log_stream += f"Executing llama.cpp conversion script...\n{process.stdout}\n" if process.stderr: log_stream += f"[STDERR]\n{process.stderr}\n" quantize_map = {"q4_k_m": "Q4_K_M", "q5_k_m": "Q5_K_M", "q8_0": "Q8_0", "f16": "F16"} target_quant_name = quantize_map.get(quantization_strategy.lower(), "Q4_K_M") if target_quant_name == "F16": log_stream += "Target is F16, renaming file...\n" os.rename(f16_gguf_path, quantized_gguf_path) else: log_stream += f"Quantizing FP16 GGUF to {target_quant_name}...\n" quantize_command = [str(LLAMA_CPP_QUANTIZE_SCRIPT), f16_gguf_path, quantized_gguf_path, target_quant_name] process = subprocess.run(quantize_command, check=True, capture_output=True, text=True) log_stream += f"{process.stdout}\n" if process.stderr: log_stream += f"[STDERR]\n{process.stderr}\n" os.remove(f16_gguf_path) return gguf_path, log_stream except subprocess.CalledProcessError as e: raise RuntimeError(f"Failed during llama.cpp execution. Error:\n{e.stderr}") except Exception as e: raise RuntimeError(f"An unexpected error occurred during GGUF conversion. Error: {e}") def stage_5_package_and_upload(model_id: str, optimized_model_path: str, pipeline_log: str, options: dict): log_stream = "[STAGE 5] Packaging and Uploading...\n" if not HF_TOKEN: return "Skipping upload: HF_TOKEN not found.", log_stream + "Skipping upload: HF_TOKEN not found." try: repo_name = f"{model_id.split('/')[-1]}-amop-cpu-{options['pipeline_type'].lower()}" repo_url = api.create_repo(repo_id=repo_name, exist_ok=True, token=HF_TOKEN) template_file = "model_card_template_gguf.md" if options['pipeline_type'] == "GGUF" else "model_card_template.md" with open(template_file, "r", encoding="utf-8") as f: template_content = f.read() model_card_content = template_content.format(repo_name=repo_name, model_id=model_id, optimization_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), pruning_status="Enabled" if options.get('prune', False) else "Disabled", pruning_percent=options.get('prune_percent', 0), quant_type=options.get('quant_type', 'N/A'), repo_id=repo_url.repo_id, pipeline_log=pipeline_log) with open(os.path.join(optimized_model_path, "README.md"), "w", encoding="utf-8") as f: f.write(model_card_content) api.upload_folder(folder_path=optimized_model_path, repo_id=repo_url.repo_id, repo_type="model", token=HF_TOKEN) log_stream += "Upload complete.\n" return f"Success! Your optimized model is available at: huggingface.co/{repo_url.repo_id}", log_stream except Exception as e: raise RuntimeError(f"Failed to upload to the Hub. Error: {e}") def run_amop_pipeline(model_id: str, pipeline_type: str, do_prune: bool, prune_percent: float, onnx_quant_type: str, calibration_file, gguf_quant_type: str): if not model_id: yield {log_output: "Please enter a Model ID.", final_output: "Idle"} return initial_log = f"[START] AMOP {pipeline_type} Pipeline Initiated for '{model_id}'.\n" yield {run_button: gr.Button(interactive=False, value="🚀 Running..."), analyze_button: gr.Button(interactive=False), final_output: f"RUNNING ({pipeline_type})", log_output: initial_log} full_log = initial_log temp_model_dir = None model_path_or_id = model_id try: whoami = api.whoami(token=HF_TOKEN) if not whoami: raise RuntimeError("Could not authenticate with Hugging Face Hub. Check your HF_TOKEN.") repo_id_for_link = f"{whoami['name']}/{model_id.split('/')[-1]}-amop-cpu-{pipeline_type.lower()}" if do_prune and prune_percent > 0: full_log += f"\n[WARNING] Pruning is memory-intensive and may fail for large models.\n" yield {final_output: "Loading model (1/5)", log_output: full_log} model = AutoModel.from_pretrained(model_id, trust_remote_code=True) tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True) full_log += f"Successfully loaded '{model_id}'.\n" yield {final_output: "Pruning model (2/5)", log_output: full_log} model, log = stage_2_prune_model(model, prune_percent) full_log += log temp_model_dir = tempfile.mkdtemp() model.save_pretrained(temp_model_dir) tokenizer.save_pretrained(temp_model_dir) model_path_or_id = temp_model_dir full_log += f"Saved intermediate pruned model to {temp_model_dir}\n" else: full_log += "Pruning skipped.\n" if pipeline_type == "ONNX": yield {final_output: "Converting to ONNX (3/5)", log_output: full_log} optimized_path, log = stage_3_4_onnx_quantize(model_path_or_id, onnx_quant_type, calibration_file.name if onnx_quant_type == "Static" and calibration_file else None) options = {'pipeline_type': 'ONNX', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': onnx_quant_type} elif pipeline_type == "GGUF": yield {final_output: "Converting to GGUF (3/5)", log_output: full_log} optimized_path, log = stage_3_4_gguf_quantize(model_path_or_id, model_id, gguf_quant_type) options = {'pipeline_type': 'GGUF', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': gguf_quant_type} else: raise ValueError("Invalid pipeline type selected.") full_log += log yield {final_output: "Packaging & Uploading (4/5)", log_output: full_log} final_message, log = stage_5_package_and_upload(model_id, optimized_model_path, full_log, options) full_log += log yield {final_output: gr.update(value="SUCCESS", label="Status"), log_output: full_log, success_box: gr.Markdown(f"✅ **Success!** Model available: [{repo_id_for_link}](https://huggingface.co/{repo_id_for_link})", visible=True), run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"), analyze_button: gr.Button(interactive=True, value="Analyze Model")} except Exception as e: logging.error(f"AMOP Pipeline failed. Error: {e}", exc_info=True) full_log += f"\n[ERROR] Pipeline failed: {e}" yield {final_output: gr.update(value="ERROR", label="Status"), log_output: full_log, success_box: gr.Markdown(f"❌ **An error occurred.** Check logs for details.", visible=True), run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"), analyze_button: gr.Button(interactive=True, value="Analyze Model")} finally: if temp_model_dir and os.path.exists(temp_model_dir): shutil.rmtree(temp_model_dir) with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 🚀 AMOP: Adaptive Model Optimization Pipeline") if not HF_TOKEN: gr.Warning("HF_TOKEN not set! The final 'upload' step will be skipped.") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 1. Select a Model") model_id_input = gr.Textbox(label="Hugging Face Model ID", placeholder="e.g., gpt2, google/gemma-2b") analyze_button = gr.Button("🔍 Analyze Model", variant="secondary") with gr.Accordion("⚙️ 2. Configure Optimization", open=False) as optimization_accordion: analysis_report_output = gr.Markdown() pipeline_type_radio = gr.Radio(["ONNX", "GGUF"], label="Select Optimization Pipeline") gr.Warning("Pruning requires high RAM and may fail for models >2B parameters on free Spaces.") prune_checkbox = gr.Checkbox(label="Enable Pruning (Optional)", value=False, info="Removes redundant weights before quantization.") prune_slider = gr.Slider(minimum=0, maximum=90, value=20, step=5, label="Pruning Percentage (%)", visible=True) with gr.Group(visible=False) as onnx_options: gr.Markdown("#### ONNX Options") onnx_quant_radio = gr.Radio(["Dynamic"], label="Quantization Type", value="Dynamic", info="Static quantization via UI is not supported.") calibration_file_upload = gr.File(visible=False) with gr.Group(visible=False) as gguf_options: gr.Markdown("#### GGUF Options") gguf_quant_dropdown = gr.Dropdown(["q4_k_m", "q5_k_m", "q8_0", "f16"], label="Quantization Strategy", value="q4_k_m") run_button = gr.Button("🚀 Run Optimization Pipeline", variant="primary") with gr.Column(scale=2): gr.Markdown("### Pipeline Status & Logs") final_output = gr.Label(value="Idle", label="Status") success_box = gr.Markdown(visible=False) log_output = gr.Textbox(label="Live Logs", lines=20, interactive=False) def update_ui_for_pipeline(pipeline_type): return {onnx_options: gr.Group(visible=pipeline_type=="ONNX"), gguf_options: gr.Group(visible=pipeline_type=="GGUF")} pipeline_type_radio.change(fn=update_ui_for_pipeline, inputs=pipeline_type_radio, outputs=[onnx_options, gguf_options]) analyze_button.click(fn=stage_1_analyze_model, inputs=[model_id_input], outputs=[log_output, analysis_report_output, optimization_accordion]) run_button.click(fn=run_amop_pipeline, inputs=[model_id_input, pipeline_type_radio, prune_checkbox, prune_slider, onnx_quant_radio, calibration_file_upload, gguf_quant_dropdown], outputs=[run_button, analyze_button, final_output, log_output, success_box]) if __name__ == "__main__": demo.queue().launch(debug=True)