Spaces:
Sleeping
Sleeping
| from flask import Flask, send_from_directory, request, jsonify, send_file | |
| import anthropic | |
| import PyPDF2 | |
| import json | |
| import io | |
| import os | |
| from werkzeug.utils import secure_filename | |
| import tempfile | |
| import base64 | |
| from openpyxl import Workbook | |
| from waitress import serve | |
| import re | |
app = Flask(__name__, static_folder='static')
# Cap request bodies at 16 MiB; Flask rejects larger uploads with HTTP 413.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024

# Per-process scratch directory for uploaded PDFs; individual files are
# removed after each request in upload_file().
UPLOAD_FOLDER = tempfile.mkdtemp()
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Read the API key from the environment instead of hard-coding a secret in
# source control. Falls back to the empty string (the previous literal) so
# the module still imports when the variable is unset.
CLAUDE_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
claude_client = anthropic.Anthropic(api_key=CLAUDE_API_KEY)
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page of the PDF at *pdf_path*.

    ``PageObject.extract_text`` can yield ``None`` for pages with no
    extractable text in some PyPDF2 versions (e.g. scanned images); coalesce
    those to the empty string instead of raising ``TypeError`` on
    concatenation.
    """
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        return "".join((page.extract_text() or "") for page in pdf_reader.pages)
def clean_text_for_processing(text):
    """Clean and compress text to reduce LLM token usage.

    Collapses every whitespace run (including newlines) to a single space,
    strips characters outside a small allow-list, truncates to 8000
    characters, and trims surrounding whitespace.
    """
    # Collapse all whitespace runs (spaces, tabs, newlines) to one space.
    # Because this removes every newline, no separate blank-line pass is
    # needed afterwards (the original r'\n\s*\n' substitution was dead code).
    text = re.sub(r'\s+', ' ', text)
    # Drop characters that add no value for invoice extraction, keeping
    # word characters, whitespace, and - ( ) @ . , : /
    text = re.sub(r'[^\w\s\n\-\(\)@.,:/]', '', text)
    # Keep only the first 8000 characters; the ellipsis flags truncation.
    if len(text) > 8000:
        text = text[:8000] + "..."
    return text.strip()
def repair_json_string(json_str):
    """Repair common JSON formatting issues in an LLM response.

    Returns the best-effort JSON object substring (from the first ``{`` to
    the last ``}``), or ``''`` when no object delimiters are present, so
    callers can detect failure with a simple truthiness check.
    """
    # Remove trailing commas before closing braces/brackets (invalid JSON
    # the model sometimes emits).
    json_str = re.sub(r',\s*}', '}', json_str)
    json_str = re.sub(r',\s*]', ']', json_str)
    # Best-effort escaping of unescaped quotes inside string values.
    json_str = re.sub(r'(?<!\\)"(?![,}\]:])(?![^"]*"[,}\]:])', '\\"', json_str)
    # Trim any prose before the first '{' or after the last '}'.
    start_idx = json_str.find('{')
    end_idx = json_str.rfind('}') + 1
    if start_idx != -1 and end_idx > start_idx:
        return json_str[start_idx:end_idx]
    # No JSON object found: return '' so the `if not json_str` check at the
    # call site fires (previously the unrepairable input was returned
    # unchanged, and that check could never trigger).
    return ""
def process_with_gemini(text):
    """Extract structured invoice data from raw PDF text via the Claude API.

    NOTE(review): despite the legacy name, this calls Anthropic's Claude
    (see ``claude_client.messages.create`` below), not Gemini — the name is
    kept because upload_file() calls it.

    Returns a JSON string that is always parseable: if the model output
    cannot be repaired into valid JSON, a null-filled fallback skeleton is
    substituted. Raises RuntimeError on any API or extraction failure
    (including the internal ValueError for an empty repaired response,
    which the outer except re-wraps).
    """
    # Clean text to reduce tokens
    cleaned_text = clean_text_for_processing(text)
    # Shortened prompt to minimize tokens; the embedded skeleton defines the
    # exact output schema the model must fill in.
    base_prompt = """Extract invoice data to valid JSON format. Return ONLY JSON, no explanations.
For box quantities like "8 BOX (16 Units x 8)", create 8 separate box entries with 16 units each and number_of_box: 1.
Required JSON structure:
{
"shipping_method": "Xindus-Express",
"terms": null,
"export_reference": null,
"invoice_number": null,
"invoice_date": null,
"purpose": null,
"shipping_currency": null,
"tax_type": null,
"service": null,
"iec": null,
"shipment_references": null,
"generate_invoice": null,
"consignor_email": null,
"market_place": null,
"shipment_boxes": [
{
"weight": null,
"uom": null,
"width": null,
"length": null,
"height": null,
"box_id": null,
"shipment_box_items": [
{
"category": null,
"description": null,
"ehsn": null,
"ihsn": null,
"quantity": null,
"weight": null,
"unit_price": null,
"igst": null,
"igst_amount": null,
"number_of_box": 1,
"per_box_unit": null
}
]
}
],
"shipper_kyc": {
"bank_account_number": null,
"nfei": null,
"is_gov": null,
"lut_verified": null,
"shipper_docs": [{"doc_name": null, "doc_number": null, "doc_verified": null}]
},
"shipper_address": {
"name": null, "email": null, "phone": null, "address": null,
"city": null, "zip": null, "state": null, "country": null, "extension_number": null
},
"receiver_address": {
"name": null, "email": null, "phone": null, "address": null,
"city": null, "zip": null, "state": null, "country": null, "extension_number": null
},
"billing_address": {
"name": null, "email": null, "phone": null, "address": null,
"city": null, "zip": null, "state": null, "country": null, "extension_number": null
},
"ior_address": {
"name": null, "email": null, "phone": null, "address": null,
"city": null, "zip": null, "state": null, "country": null, "extension_number": null
}
}
Text: """
    full_prompt = base_prompt + cleaned_text
    try:
        # Use Claude Sonnet 4 for faster processing and lower cost
        response = claude_client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4000,  # Increased slightly to avoid truncation
            temperature=0,  # deterministic extraction output
            messages=[
                {
                    "role": "user",
                    "content": full_prompt
                }
            ]
        )
        # Log token usage (hasattr-guarded in case the SDK response omits it)
        input_tokens = response.usage.input_tokens if hasattr(response, 'usage') else 0
        output_tokens = response.usage.output_tokens if hasattr(response, 'usage') else 0
        print(f"Token usage - Input: {input_tokens}, Output: {output_tokens}, Total: {input_tokens + output_tokens}")
        raw_output = response.content[0].text if response.content else ""
        print(f"Raw output length: {len(raw_output)} characters")
        # Extract and repair JSON
        json_str = repair_json_string(raw_output)
        if not json_str:
            # Caught by the outer except and re-raised as RuntimeError.
            raise ValueError("No valid JSON found in Claude response")
        # Try to parse JSON with better error handling
        try:
            parsed_data = json.loads(json_str)
        except json.JSONDecodeError as e:
            print(f"JSON parse error at position {e.pos}: {str(e)}")
            print(f"Problematic JSON section: {json_str[max(0, e.pos-50):e.pos+50]}")
            # Try to fix common issues and retry: strip literal escape
            # sequences and collapse whitespace, then parse again.
            json_str = json_str.replace('\\n', ' ').replace('\\t', ' ')
            json_str = re.sub(r'\s+', ' ', json_str)
            try:
                parsed_data = json.loads(json_str)
            except json.JSONDecodeError:
                # If still failing, return a basic structure so the caller
                # always receives the expected schema (all fields null).
                print("Using fallback JSON structure")
                parsed_data = {
                    "shipping_method": "Xindus-Express",
                    "terms": None,
                    "export_reference": None,
                    "invoice_number": None,
                    "invoice_date": None,
                    "purpose": None,
                    "shipping_currency": None,
                    "tax_type": None,
                    "service": None,
                    "iec": None,
                    "shipment_references": None,
                    "generate_invoice": None,
                    "consignor_email": None,
                    "market_place": None,
                    "shipment_boxes": [],
                    "shipper_kyc": {
                        "bank_account_number": None,
                        "nfei": None,
                        "is_gov": None,
                        "lut_verified": None,
                        "shipper_docs": []
                    },
                    "shipper_address": {
                        "name": None, "email": None, "phone": None, "address": None,
                        "city": None, "zip": None, "state": None, "country": None, "extension_number": None
                    },
                    "receiver_address": {
                        "name": None, "email": None, "phone": None, "address": None,
                        "city": None, "zip": None, "state": None, "country": None, "extension_number": None
                    },
                    "billing_address": {
                        "name": None, "email": None, "phone": None, "address": None,
                        "city": None, "zip": None, "state": None, "country": None, "extension_number": None
                    },
                    "ior_address": {
                        "name": None, "email": None, "phone": None, "address": None,
                        "city": None, "zip": None, "state": None, "country": None, "extension_number": None
                    }
                }
        # Normalize number_of_box to 1 on every item (mutates in place).
        fix_number_of_box_field(parsed_data)
        return json.dumps(parsed_data)
    except Exception as e:
        print(f"Claude API error: {str(e)}")
        raise RuntimeError("Claude API error: " + str(e))
def fix_number_of_box_field(data):
    """Force ``number_of_box`` to 1 on every item of every shipment box.

    Mutates *data* in place; silently does nothing when the expected
    list-of-boxes structure is absent or malformed.
    """
    boxes = data.get('shipment_boxes') if 'shipment_boxes' in data else None
    if not isinstance(boxes, list):
        return
    for box in boxes:
        if 'shipment_box_items' not in box:
            continue
        items = box['shipment_box_items']
        if not isinstance(items, list):
            continue
        # Each entry represents exactly one physical box by convention.
        for item in items:
            item['number_of_box'] = 1
def flatten_dict(d, parent_key='', sep='_'):
    """Flatten a nested dict into a single level of compound keys.

    Nested dict keys are joined with *sep*; list elements are suffixed with
    a literal ``_<index>`` (independent of *sep*, matching the original
    convention). Returns a new flat dict.
    """
    flat = {}
    for key, value in d.items():
        compound = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten_dict(value, compound, sep=sep))
        elif isinstance(value, list):
            for idx, element in enumerate(value):
                indexed = f"{compound}_{idx}"  # list indices always use '_'
                if isinstance(element, dict):
                    flat.update(flatten_dict(element, indexed, sep=sep))
                else:
                    flat[indexed] = element
        else:
            flat[compound] = value
    return flat
def index():
    """Serve the single-page frontend from the static folder.

    NOTE(review): no ``@app.route`` decorator is visible in this file —
    confirm this handler is actually registered (presumably as ``'/'``).
    """
    return send_from_directory('static', 'index.html')
def health_check():
    """Liveness probe: report that the API process is up.

    NOTE(review): no ``@app.route`` decorator is visible in this file —
    confirm this handler is registered (e.g. ``'/health'``).
    """
    return jsonify({'status': 'healthy', 'message': 'PDF Converter API is running.'}), 200
def upload_file():
    """Handle a PDF upload: extract its text, run LLM extraction, return JSON.

    Expects a multipart form with a ``file`` field containing a ``.pdf``.
    Responds 400 on validation failure, 500 on processing failure, or
    ``{'success': True, 'data': ..., 'filename': ...}`` on success.

    NOTE(review): no ``@app.route`` decorator is visible in this file —
    confirm this handler is registered (e.g. ``'/upload'``, POST).
    """
    # --- request validation -------------------------------------------------
    if 'file' not in request.files:
        return jsonify({'error': 'No file selected'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    if not (file and file.filename.lower().endswith('.pdf')):
        return jsonify({'error': 'Invalid file format. Please upload a PDF file.'}), 400
    try:
        # Persist the upload to a scratch file so PyPDF2 can read/seek it.
        with tempfile.NamedTemporaryFile(suffix='.pdf', dir=app.config['UPLOAD_FOLDER'], delete=False) as tmp_file:
            file.save(tmp_file.name)
            tmp_file_path = tmp_file.name
        try:
            print("Starting PDF text extraction...")
            text = extract_text_from_pdf(tmp_file_path)
            print(f"Extracted text length: {len(text)} characters")
            print("Processing with Claude...")
            gemini_response = process_with_gemini(text)  # legacy name; calls Claude
            print("Claude processing completed")
            try:
                # gemini_response is already a JSON string from process_with_gemini
                invoice_data = json.loads(gemini_response)
                print(f"Successfully parsed invoice data for: {invoice_data.get('invoice_number', 'Unknown')}")
            except Exception as e:
                print(f"JSON parsing error: {str(e)}")
                # Fallback: extract the {...} span manually and repair it.
                try:
                    start_idx = gemini_response.find('{')
                    end_idx = gemini_response.rfind('}') + 1
                    if start_idx != -1 and end_idx > start_idx:
                        json_str = repair_json_string(gemini_response[start_idx:end_idx])
                        invoice_data = json.loads(json_str)
                        print("Fallback JSON parsing successful")
                    else:
                        raise ValueError("No valid JSON structure found")
                except Exception as fallback_error:
                    print(f"Fallback parsing also failed: {str(fallback_error)}")
                    # Include a truncated raw response to aid client-side debugging.
                    return jsonify({
                        'error': 'Failed to parse invoice data: ' + str(e),
                        'raw_response': gemini_response[:500] + "..." if len(gemini_response) > 500 else gemini_response
                    }), 500
            return jsonify({
                'success': True,
                'data': invoice_data,
                'filename': secure_filename(file.filename)
            })
        except Exception as e:
            print(f"Processing error: {str(e)}")
            return jsonify({'error': 'Processing failed: ' + str(e)}), 500
        finally:
            # Best-effort cleanup of the scratch PDF; never mask the response.
            try:
                if os.path.exists(tmp_file_path):
                    os.unlink(tmp_file_path)
            except OSError:  # was a bare `except:` — narrowed to filesystem errors
                pass
    except Exception as e:
        print(f"Upload error: {str(e)}")
        return jsonify({'error': 'File upload failed: ' + str(e)}), 500
def convert_to_json():
    """Serialize the posted ``invoice_data`` to a pretty-printed JSON string.

    Expects a JSON body ``{'invoice_data': {...}}``; responds with the
    serialized text and a download filename derived from the invoice number.

    NOTE(review): no ``@app.route`` decorator is visible in this file —
    confirm this handler is registered with a route.
    """
    if not request.is_json:
        return jsonify({'error': 'Request must be JSON'}), 400
    data = request.get_json()
    if not data or 'invoice_data' not in data:
        return jsonify({'error': 'Missing invoice_data in request'}), 400
    invoice_data = data['invoice_data']
    try:
        # json.dumps replaces the original StringIO round-trip — same output.
        json_content = json.dumps(invoice_data, indent=2)
        # `or 'data'` also covers an explicit null invoice_number, which
        # .get(..., 'data') alone would turn into the filename 'invoice_None'.
        invoice_number = invoice_data.get('invoice_number') or 'data'
        filename = 'invoice_' + str(invoice_number) + '.json'
        return jsonify({
            'success': True,
            'json_data': json_content,
            'filename': filename
        })
    except Exception as e:
        return jsonify({'error': 'JSON conversion failed: ' + str(e)}), 500
def convert_to_excel():
    """Render the posted ``invoice_data`` as a two-column XLSX workbook.

    Flattens the nested invoice structure into (field, value) rows and
    returns the workbook base64-encoded so it can travel in a JSON body.

    NOTE(review): no ``@app.route`` decorator is visible in this file —
    confirm this handler is registered with a route.
    """
    if not request.is_json:
        return jsonify({'error': 'Request must be JSON'}), 400
    data = request.get_json()
    if not data or 'invoice_data' not in data:
        return jsonify({'error': 'Missing invoice_data in request'}), 400
    invoice_data = data['invoice_data']
    try:
        wb = Workbook()
        ws = wb.active
        ws.title = "Invoice Data"
        # One row per flattened field.
        flat_data = flatten_dict(invoice_data)
        ws.append(['Field', 'Value'])
        for key, value in flat_data.items():
            if isinstance(value, (list, dict)):
                value = json.dumps(value)  # cells must hold scalar values
            ws.append([key, value])
        excel_buffer = io.BytesIO()
        wb.save(excel_buffer)
        excel_buffer.seek(0)
        # Base64-encode so the binary workbook fits in a JSON response.
        excel_content = base64.b64encode(excel_buffer.getvalue()).decode('utf-8')
        excel_buffer.close()
        # `or 'data'` also covers an explicit null invoice_number, matching
        # the convert_to_json handler.
        invoice_number = invoice_data.get('invoice_number') or 'data'
        filename = 'invoice_' + str(invoice_number) + '.xlsx'
        return jsonify({
            'success': True,
            'excel_data': excel_content,
            'filename': filename
        })
    except Exception as e:
        return jsonify({'error': 'Excel conversion failed: ' + str(e)}), 500
if __name__ == '__main__':
    # Serve with waitress (a production WSGI server) rather than Flask's
    # debug server. Port 7860 suggests a Hugging Face Spaces deployment
    # (see the scrape header at the top of this file) — confirm.
    serve(
        app,
        host='0.0.0.0',        # listen on all interfaces (container usage)
        port=7860,
        threads=4,             # worker threads for concurrent requests
        channel_timeout=180,   # generous timeout so slow LLM calls can finish
        cleanup_interval=30
    )