"""Flask service that converts PDF invoices into structured JSON / Excel.

Pipeline: an uploaded PDF is written to a temp file, its text is extracted
with PyPDF2, the cleaned text is sent to Claude with an extraction prompt,
and the JSON returned by the model is repaired/parsed and sent back to the
client. Two further endpoints re-serialise that data as a JSON document or
as a base64-encoded ``.xlsx`` workbook.
"""

import base64
import io
import json
import os
import re
import tempfile

import anthropic
import PyPDF2
from flask import Flask, send_from_directory, request, jsonify, send_file
from openpyxl import Workbook
from waitress import serve
from werkzeug.utils import secure_filename

app = Flask(__name__, static_folder='static')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
UPLOAD_FOLDER = tempfile.mkdtemp()
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# The key is read from the environment so it never has to be committed to
# source control. The default stays an empty string, as before.
CLAUDE_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
claude_client = anthropic.Anthropic(api_key=CLAUDE_API_KEY)

CLAUDE_MODEL = "claude-sonnet-4-20250514"
MAX_OUTPUT_TOKENS = 4000
MAX_PROMPT_TEXT_CHARS = 8000

_ADDRESS_FIELDS = (
    "name", "email", "phone", "address", "city",
    "zip", "state", "country", "extension_number",
)

# Shortened prompt to minimize tokens; the invoice text is appended to it.
_EXTRACTION_PROMPT = """Extract invoice data to valid JSON format. Return ONLY JSON, no explanations.
For box quantities like "8 BOX (16 Units x 8)", create 8 separate box entries with 16 units each and number_of_box: 1.

Required JSON structure:
{
  "shipping_method": "Xindus-Express",
  "terms": null,
  "export_reference": null,
  "invoice_number": null,
  "invoice_date": null,
  "purpose": null,
  "shipping_currency": null,
  "tax_type": null,
  "service": null,
  "iec": null,
  "shipment_references": null,
  "generate_invoice": null,
  "consignor_email": null,
  "market_place": null,
  "shipment_boxes": [
    {
      "weight": null, "uom": null, "width": null, "length": null, "height": null, "box_id": null,
      "shipment_box_items": [
        {
          "category": null, "description": null, "ehsn": null, "ihsn": null, "quantity": null,
          "weight": null, "unit_price": null, "igst": null, "igst_amount": null,
          "number_of_box": 1, "per_box_unit": null
        }
      ]
    }
  ],
  "shipper_kyc": {
    "bank_account_number": null, "nfei": null, "is_gov": null, "lut_verified": null,
    "shipper_docs": [{"doc_name": null, "doc_number": null, "doc_verified": null}]
  },
  "shipper_address": {
    "name": null, "email": null, "phone": null, "address": null, "city": null,
    "zip": null, "state": null, "country": null, "extension_number": null
  },
  "receiver_address": {
    "name": null, "email": null, "phone": null, "address": null, "city": null,
    "zip": null, "state": null, "country": null, "extension_number": null
  },
  "billing_address": {
    "name": null, "email": null, "phone": null, "address": null, "city": null,
    "zip": null, "state": null, "country": null, "extension_number": null
  },
  "ior_address": {
    "name": null, "email": null, "phone": null, "address": null, "city": null,
    "zip": null, "state": null, "country": null, "extension_number": null
  }
}

Text:
"""


def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page of the PDF at *pdf_path*.

    Pages for which the extractor yields nothing contribute an empty string
    instead of breaking the concatenation.
    """
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)


def clean_text_for_processing(text, max_chars=MAX_PROMPT_TEXT_CHARS):
    """Clean and compress text to reduce token usage.

    Args:
        text: Raw text extracted from the PDF.
        max_chars: Maximum number of characters kept; longer text is cut and
            suffixed with ``"..."``.

    Returns:
        The cleaned text with line structure preserved: characters outside
        the allowed set are dropped, blank lines are removed, and runs of
        spaces/tabs are collapsed to a single space.
    """
    # Remove special characters that don't add value.
    text = re.sub(r'[^\w\s\n\-\(\)@.,:/]', '', text)
    # Collapse every whitespace run that contains a newline into one newline.
    # This removes empty lines and must happen before horizontal whitespace
    # is collapsed, otherwise the newlines are already gone.
    text = re.sub(r'\s*\n\s*', '\n', text)
    # Collapse remaining horizontal whitespace.
    text = re.sub(r'[^\S\n]+', ' ', text)
    if len(text) > max_chars:
        text = text[:max_chars] + "..."
    return text.strip()


def repair_json_string(json_str):
    """Repair common JSON formatting issues in model output.

    Removes trailing commas before ``}`` / ``]`` and trims the string to the
    span from the first ``{`` to the last ``}`` (which also strips any prose
    or code fences around the object). If no such span exists, the string is
    returned with only the trailing-comma repair applied.
    """
    # Remove any trailing commas before closing brackets/braces.
    json_str = re.sub(r',\s*([}\]])', r'\1', json_str)

    # NOTE(review): an earlier revision also tried to "fix unescaped quotes"
    # with a regex here; that pattern is not recoverable from the file and is
    # deliberately not guessed at, since a regex cannot tell structural
    # quotes from quotes embedded in a value.

    # Keep only the outermost JSON object.
    start_idx = json_str.find('{')
    end_idx = json_str.rfind('}') + 1
    if start_idx != -1 and end_idx > start_idx:
        json_str = json_str[start_idx:end_idx]
    return json_str


def _empty_address():
    """Return an address record with every field set to ``None``."""
    return {field: None for field in _ADDRESS_FIELDS}


def _empty_invoice():
    """Return the fallback invoice structure used when parsing fails."""
    return {
        "shipping_method": "Xindus-Express",
        "terms": None,
        "export_reference": None,
        "invoice_number": None,
        "invoice_date": None,
        "purpose": None,
        "shipping_currency": None,
        "tax_type": None,
        "service": None,
        "iec": None,
        "shipment_references": None,
        "generate_invoice": None,
        "consignor_email": None,
        "market_place": None,
        "shipment_boxes": [],
        "shipper_kyc": {
            "bank_account_number": None,
            "nfei": None,
            "is_gov": None,
            "lut_verified": None,
            "shipper_docs": [],
        },
        "shipper_address": _empty_address(),
        "receiver_address": _empty_address(),
        "billing_address": _empty_address(),
        "ior_address": _empty_address(),
    }


def _parse_model_json(json_str):
    """Parse *json_str*, retrying once after whitespace normalisation.

    Returns the parsed value, or the empty fallback invoice structure when
    both attempts fail.
    """
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        print(f"JSON parse error at position {e.pos}: {str(e)}")
        print(f"Problematic JSON section: {json_str[max(0, e.pos-50):e.pos+50]}")

    # Try to fix common issues and retry: escaped \n / \t sequences and raw
    # control whitespace inside strings are replaced by single spaces.
    normalised = json_str.replace('\\n', ' ').replace('\\t', ' ')
    normalised = re.sub(r'\s+', ' ', normalised)
    try:
        return json.loads(normalised)
    except json.JSONDecodeError:
        print("Using fallback JSON structure")
        return _empty_invoice()


def process_with_gemini(text):
    """Extract invoice data from *text* using Claude and return it as JSON.

    The function name is historical; the request is sent through
    ``claude_client``.

    Args:
        text: Raw invoice text (it is cleaned and truncated before sending).

    Returns:
        A JSON string. When the model output cannot be parsed, the string
        encodes the empty fallback invoice structure.

    Raises:
        RuntimeError: If the API call fails or the response is empty.
    """
    full_prompt = _EXTRACTION_PROMPT + clean_text_for_processing(text)

    try:
        response = claude_client.messages.create(
            model=CLAUDE_MODEL,
            max_tokens=MAX_OUTPUT_TOKENS,
            temperature=0,
            messages=[{"role": "user", "content": full_prompt}],
        )

        # Log token usage.
        usage = getattr(response, 'usage', None)
        input_tokens = usage.input_tokens if usage else 0
        output_tokens = usage.output_tokens if usage else 0
        print(f"Token usage - Input: {input_tokens}, Output: {output_tokens}, Total: {input_tokens + output_tokens}")

        raw_output = response.content[0].text if response.content else ""
        print(f"Raw output length: {len(raw_output)} characters")

        # Extract and repair JSON.
        json_str = repair_json_string(raw_output)
        if not json_str:
            raise ValueError("No valid JSON found in Claude response")

        parsed_data = _parse_model_json(json_str)
        fix_number_of_box_field(parsed_data)
        return json.dumps(parsed_data)
    except Exception as e:
        print(f"Claude API error: {str(e)}")
        raise RuntimeError("Claude API error: " + str(e)) from e


def fix_number_of_box_field(data):
    """Ensure number_of_box is always 1 for each individual box entry.

    Mutates *data* in place. Anything that does not have the expected shape
    (non-dict root, non-list containers, non-dict boxes or items) is skipped
    rather than raising, because the structure comes from model output.
    """
    if not isinstance(data, dict):
        return
    boxes = data.get('shipment_boxes')
    if not isinstance(boxes, list):
        return
    for box in boxes:
        if not isinstance(box, dict):
            continue
        box_items = box.get('shipment_box_items')
        if not isinstance(box_items, list):
            continue
        for item in box_items:
            if isinstance(item, dict):
                # Force number_of_box to be 1 for each individual box.
                item['number_of_box'] = 1


def flatten_dict(d, parent_key='', sep='_'):
    """Flatten a nested dict into a single-level dict with joined keys.

    Nested dict keys are joined with *sep*; list elements get their index
    appended (also joined with *sep*). Dicts inside lists are flattened
    recursively; other list elements are stored as-is. Empty dicts and empty
    lists produce no entries.
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        elif isinstance(v, list):
            for i, element in enumerate(v):
                indexed_key = new_key + sep + str(i)
                if isinstance(element, dict):
                    items.extend(flatten_dict(element, indexed_key, sep=sep).items())
                else:
                    items.append((indexed_key, element))
        else:
            items.append((new_key, v))
    return dict(items)


def _read_invoice_data():
    """Validate the current JSON request and pull out ``invoice_data``.

    Returns:
        ``(invoice_data, None)`` on success, or ``(None, error_response)``
        where *error_response* is a ready-to-return ``(body, status)`` pair.
    """
    if not request.is_json:
        return None, (jsonify({'error': 'Request must be JSON'}), 400)
    data = request.get_json()
    if not isinstance(data, dict) or 'invoice_data' not in data:
        return None, (jsonify({'error': 'Missing invoice_data in request'}), 400)
    invoice_data = data['invoice_data']
    if not isinstance(invoice_data, dict):
        return None, (jsonify({'error': 'invoice_data must be a JSON object'}), 400)
    return invoice_data, None


def _export_filename(invoice_data, extension):
    """Build a safe download filename such as ``invoice_<number><extension>``.

    A missing, null or empty invoice number falls back to ``data``. The
    result is passed through ``secure_filename`` so that separators in the
    invoice number (e.g. ``INV/2024/001``) cannot end up in the filename.
    """
    invoice_number = invoice_data.get('invoice_number')
    if invoice_number is None or invoice_number == '':
        invoice_number = 'data'
    return secure_filename('invoice_' + str(invoice_number) + extension)


@app.route('/')
def index():
    """Serve the single-page front end."""
    return send_from_directory('static', 'index.html')


@app.route('/health')
def health_check():
    """Liveness probe."""
    return jsonify({'status': 'healthy', 'message': 'PDF Converter API is running.'}), 200


@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept a PDF upload, extract invoice data from it and return it as JSON.

    Responds with 400 for a missing or non-PDF file and with 500 when
    saving, text extraction, the model call or JSON parsing fails. The temp
    file is removed on every path.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file selected'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    if not (file and file.filename.lower().endswith('.pdf')):
        return jsonify({'error': 'Invalid file format. Please upload a PDF file.'}), 400

    tmp_file_path = None
    try:
        try:
            with tempfile.NamedTemporaryFile(
                suffix='.pdf', dir=app.config['UPLOAD_FOLDER'], delete=False
            ) as tmp_file:
                tmp_file_path = tmp_file.name
                # Write through the already-open handle; reopening the file
                # by name while it is held open is not portable.
                file.save(tmp_file)
        except Exception as e:
            print(f"Upload error: {str(e)}")
            return jsonify({'error': 'File upload failed: ' + str(e)}), 500

        try:
            print("Starting PDF text extraction...")
            text = extract_text_from_pdf(tmp_file_path)
            print(f"Extracted text length: {len(text)} characters")

            print("Processing with Claude...")
            gemini_response = process_with_gemini(text)
            print("Claude processing completed")

            try:
                # gemini_response is already a JSON string from process_with_gemini
                invoice_data = json.loads(gemini_response)
            except ValueError as e:
                print(f"JSON parsing error: {str(e)}")
                # Defensive fallback: try to extract the JSON object manually.
                try:
                    start_idx = gemini_response.find('{')
                    end_idx = gemini_response.rfind('}') + 1
                    if start_idx == -1 or end_idx <= start_idx:
                        raise ValueError("No valid JSON structure found")
                    json_str = repair_json_string(gemini_response[start_idx:end_idx])
                    invoice_data = json.loads(json_str)
                    print("Fallback JSON parsing successful")
                except ValueError as fallback_error:
                    print(f"Fallback parsing also failed: {str(fallback_error)}")
                    if len(gemini_response) > 500:
                        raw_preview = gemini_response[:500] + "..."
                    else:
                        raw_preview = gemini_response
                    return jsonify({
                        'error': 'Failed to parse invoice data: ' + str(e),
                        'raw_response': raw_preview,
                    }), 500
            else:
                parsed_number = (
                    invoice_data.get('invoice_number', 'Unknown')
                    if isinstance(invoice_data, dict) else 'Unknown'
                )
                print(f"Successfully parsed invoice data for: {parsed_number}")

            return jsonify({
                'success': True,
                'data': invoice_data,
                'filename': secure_filename(file.filename),
            })
        except Exception as e:
            print(f"Processing error: {str(e)}")
            return jsonify({'error': 'Processing failed: ' + str(e)}), 500
    finally:
        # Best-effort cleanup: never let a cleanup failure mask the response.
        if tmp_file_path is not None:
            try:
                os.unlink(tmp_file_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                print(f"Could not remove temp file {tmp_file_path}: {cleanup_error}")


@app.route('/convert_json', methods=['POST'])
def convert_to_json():
    """Return ``invoice_data`` from the request body as a pretty-printed JSON string.

    Responds with 400 for a non-JSON request or a missing/non-object
    ``invoice_data``, and with 500 if serialisation fails.
    """
    invoice_data, error_response = _read_invoice_data()
    if error_response is not None:
        return error_response

    try:
        return jsonify({
            'success': True,
            'json_data': json.dumps(invoice_data, indent=2),
            'filename': _export_filename(invoice_data, '.json'),
        })
    except Exception as e:
        return jsonify({'error': 'JSON conversion failed: ' + str(e)}), 500


@app.route('/convert_excel', methods=['POST'])
def convert_to_excel():
    """Return ``invoice_data`` as a base64-encoded two-column ``.xlsx`` workbook.

    The invoice is flattened to ``Field`` / ``Value`` rows. Responds with
    400 for a non-JSON request or a missing/non-object ``invoice_data``, and
    with 500 if the workbook cannot be built.
    """
    invoice_data, error_response = _read_invoice_data()
    if error_response is not None:
        return error_response

    try:
        wb = Workbook()
        ws = wb.active
        ws.title = "Invoice Data"

        ws.append(['Field', 'Value'])
        for key, value in flatten_dict(invoice_data).items():
            # Nested lists survive flattening; store them as JSON text.
            if isinstance(value, (list, dict)):
                value = json.dumps(value)
            ws.append([key, value])

        with io.BytesIO() as excel_buffer:
            wb.save(excel_buffer)
            excel_content = base64.b64encode(excel_buffer.getvalue()).decode('utf-8')

        return jsonify({
            'success': True,
            'excel_data': excel_content,
            'filename': _export_filename(invoice_data, '.xlsx'),
        })
    except Exception as e:
        return jsonify({'error': 'Excel conversion failed: ' + str(e)}), 500


if __name__ == '__main__':
    serve(
        app,
        host='0.0.0.0',
        port=7860,
        threads=4,
        channel_timeout=180,
        cleanup_interval=30,
    )