#!/usr/bin/env python
# coding: utf-8

import re
import sqlite3
from datetime import datetime, timedelta

import gradio as gr
import joblib
from better_profanity import profanity
from transformers import pipeline

# ---------------------- Profanity Filter ----------------------
# Censor-word list is loaded at startup; note it is not yet consulted in
# predict_review() below.
profanity.load_censor_words()

# ---------------------- SQLite DB Connection ----------------------
# check_same_thread=False lets Gradio worker threads reuse this connection.
db = sqlite3.connect("anomaly1.db", check_same_thread=False)
cursor = db.cursor()
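
# The queries below assume `users` and `reviews` tables already exist in
# anomaly1.db. A minimal schema sketch inferred from those queries (an
# assumption; the shipped database may define more columns or constraints):
cursor.execute("""
    CREATE TABLE IF NOT EXISTS users (
        user_id INTEGER PRIMARY KEY AUTOINCREMENT,
        suspended_until TEXT
    )
""")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS reviews (
        review_id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        vendor_id INTEGER,
        review_text TEXT,
        timestamp TEXT,
        is_anomaly INTEGER,
        prediction TEXT,
        review TEXT
    )
""")
db.commit()
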
# ---------------------- Load Models ----------------------
try:
    spam_model = joblib.load("spam_classifier.pkl")
except Exception as e:
    raise RuntimeError(f"Failed to load spam model: {e}")

try:
    toxicity_model = pipeline("text-classification", model="unitary/toxic-bert")
except Exception as e:
    raise RuntimeError(f"Failed to load toxicity model: {e}")
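
# Assumption: spam_classifier.pkl bundles its own text vectorizer (e.g. a
# scikit-learn Pipeline), since predict() is later called on raw strings; a
# bare classifier expecting pre-vectorized input would fail there.
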
# ---------------------- Anomaly Heuristic Score ----------------------
def basic_anomaly_score(text):
    """Combine the rule-based signals below into a heuristic score in [0, 1]."""
    score = 0.0
    if is_low_quality(text):
        score += 0.3
    if contains_suspicious_content(text):
        score += 0.3
    if is_nonsensical_structure(text):
        score += 0.2
    if len(text.split()) < 3:
        score += 0.2
    return score
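
# Note on thresholds: the four weights sum to 1.0 and predict_review() flags
# "Anomalous" at score >= 0.5, so at least two checks must trip and one of
# them must carry weight 0.3 (0.2 + 0.2 alone stays at 0.4).
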
# ---------------------- Rule-Based Checks ----------------------
def is_toxic(text):
    """True when toxic-bert's top label is 'toxic' with confidence above 0.7."""
    try:
        result = toxicity_model(text)[0]  # pipeline returns the top label only
        return result["label"].lower() == "toxic" and result["score"] > 0.7
    except Exception:
        return False

def is_low_quality(text):
    """Very short text, all-caps shouting, or a character repeated 4+ times."""
    return bool(
        len(text.strip()) < 10
        or text.strip().isupper()
        or re.search(r"(.)\1{3,}", text)
    )

def contains_suspicious_content(text):
    """Phone numbers or contact-sharing phrases, a common spam signal."""
    patterns = [r"\b\d{10}\b", r"\bcall me\b", r"\bwhatsapp\b",
                r"\bnumber\b", r"\bcontact\b", r"\bemail\b"]
    return any(re.search(p, text.lower()) for p in patterns)

def is_nonsensical_structure(text):
    """Template-like openings that rarely appear in genuine product reviews."""
    patterns = [r"\bi am a\b", r"\bi will be a\b", r"\bthis is my\b"]
    return any(re.search(p, text.lower()) for p in patterns)
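
# Worked example of how the checks combine in basic_anomaly_score():
#   basic_anomaly_score("CALL ME 9999999999")
#     is_low_quality              -> True  (+0.3: all caps, repeated character)
#     contains_suspicious_content -> True  (+0.3: "call me", 10-digit number)
#     is_nonsensical_structure    -> False
#     word count is 3             -> no short-text penalty
#   => 0.6, above the 0.5 cutoff used in predict_review()
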
# ---------------------- Main Prediction Function ----------------------
def predict_review(text):
    text = text.strip()
    if not text:
        return "⚠️ Please enter a review."

    flags = []

    # Spam
    try:
        if spam_model.predict([text])[0]:
            flags.append("Spam")
    except Exception:
        flags.append("Spam Detection Failed")

    # Rule-based
    if is_toxic(text):
        flags.append("Toxic")
    if is_low_quality(text):
        flags.append("Low Quality")
    if contains_suspicious_content(text):
        flags.append("Suspicious")
    if is_nonsensical_structure(text):
        flags.append("Nonsensical")
    if len(text.split()) < 3:
        flags.append("Too Short")

    score = basic_anomaly_score(text)
    if score >= 0.5:
        flags.append("Anomalous")

    prediction = ", ".join(flags) if flags else "Normal"
    now = datetime.now()
    is_anomaly = 1 if "Anomalous" in flags else 0

    # ---------------------- Save in DB ----------------------
    try:
        # Latest registered user stands in for the reviewer until auth exists.
        cursor.execute("SELECT user_id FROM users ORDER BY user_id DESC LIMIT 1")
        result = cursor.fetchone()
        user_id = result[0] if result else 1
        vendor_id = 1  # Static for now
        # Store timestamps as ISO strings; sqlite3's implicit datetime adapter
        # is deprecated as of Python 3.12.
        cursor.execute("""
            INSERT INTO reviews (user_id, vendor_id, review_text, timestamp, is_anomaly, prediction, review)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (user_id, vendor_id, text, now.isoformat(sep=" "), is_anomaly, prediction, text))
        db.commit()
        if is_anomaly:
            suspend_until = now + timedelta(hours=24)
            cursor.execute(
                "UPDATE users SET suspended_until = ? WHERE user_id = ?",
                (suspend_until.isoformat(sep=" "), user_id),
            )
            db.commit()
            return (f"❌ {prediction}\nUser temporarily suspended until "
                    f"{suspend_until.strftime('%Y-%m-%d %H:%M:%S')}.")
        return f"✅ Prediction: {prediction}"
    except Exception as e:
        return f"⚠️ Database Error: {e}"
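
# Quick sanity checks (illustrative; the spam model's verdicts are assumptions):
#   predict_review("Great phone, battery easily lasts two days.")
#     -> "✅ Prediction: Normal" (assuming the spam model passes it)
#   predict_review("CALL ME 9999999999")
#     -> "❌ Low Quality, Suspicious, Anomalous" plus a 24-hour suspension
#        notice ("Spam" may also appear, depending on the model)
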
interface = gr.Interface(
    fn=predict_review,
    inputs=gr.Textbox(lines=4, placeholder="Type a product review here...", label="Review Text"),
    outputs=gr.Textbox(label="Prediction"),
    title="🛍️ Byte Bazar Review Anomaly Detector",
    description="Enter a vendor or product review to check if it's anomalous or normal. This system uses spam detection, a toxicity check, and custom rules to identify suspicious content.",
)

# ---------------------- Launch App ----------------------
interface.launch()