#!/usr/bin/env python
# coding: utf-8

import re
import sqlite3
from datetime import datetime, timedelta

import gradio as gr
import joblib
from better_profanity import profanity
from transformers import pipeline

# ---------------------- Profanity Filter ----------------------
# Preload the default censor wordlist.
profanity.load_censor_words()

# ---------------------- SQLite DB Connection ----------------------
# check_same_thread=False lets Gradio's worker threads share this connection.
db = sqlite3.connect("anomaly1.db", check_same_thread=False)
cursor = db.cursor()

# ---------------------- Load Models ----------------------
try:
    spam_model = joblib.load("spam_classifier.pkl")
except Exception as e:
    raise RuntimeError(f"Failed to load spam model: {e}")

try:
    toxicity_model = pipeline("text-classification", model="unitary/toxic-bert")
except Exception as e:
    raise RuntimeError(f"Failed to load toxicity model: {e}")

# ---------------------- Rule-Based Checks ----------------------
def is_toxic(text):
    """Flag text that toxic-bert labels 'toxic' with high confidence."""
    try:
        result = toxicity_model(text)[0]
        return result["label"].lower() == "toxic" and result["score"] > 0.7
    except Exception:
        return False

def is_low_quality(text):
    """Very short, all-caps, or letter-spammed text (e.g. 'loooool')."""
    stripped = text.strip()
    return bool(len(stripped) < 10 or stripped.isupper() or re.search(r"(.)\1{3,}", stripped))

def contains_suspicious_content(text):
    """Phone numbers or attempts to move contact off-platform."""
    patterns = [r"\b\d{10}\b", r"\bcall me\b", r"\bwhatsapp\b",
                r"\bnumber\b", r"\bcontact\b", r"\bemail\b"]
    return any(re.search(p, text.lower()) for p in patterns)

def is_nonsensical_structure(text):
    """Template-like openings that rarely appear in genuine reviews."""
    patterns = [r"\bi am a\b", r"\bi will be a\b", r"\bthis is my\b"]
    return any(re.search(p, text.lower()) for p in patterns)

# ---------------------- Anomaly Heuristic Score ----------------------
def basic_anomaly_score(text):
    """Weighted sum of the rule-based checks; a score >= 0.5 is treated as anomalous."""
    score = 0
    if is_low_quality(text):
        score += 0.3
    if contains_suspicious_content(text):
        score += 0.3
    if is_nonsensical_structure(text):
        score += 0.2
    if len(text.split()) < 3:
        score += 0.2
    return score

# ---------------------- Main Prediction Function ----------------------
def predict_review(text):
    text = text.strip()
    if not text:
        return "⚠️ Please enter a review."

    flags = []

    # Spam
    try:
        if spam_model.predict([text])[0]:
            flags.append("Spam")
    except Exception:
        flags.append("Spam Detection Failed")

    # Rule-based
    if is_toxic(text):
        flags.append("Toxic")
    if is_low_quality(text):
        flags.append("Low Quality")
    if contains_suspicious_content(text):
        flags.append("Suspicious")
    if is_nonsensical_structure(text):
        flags.append("Nonsensical")
    if len(text.split()) < 3:
        flags.append("Too Short")

    score = basic_anomaly_score(text)
    if score >= 0.5:
        flags.append("Anomalous")

    prediction = ", ".join(flags) if flags else "Normal"
    now = datetime.now()
    is_anomaly = 1 if "Anomalous" in flags else 0

    # ---------------------- Save in DB ----------------------
    try:
        # Attribute the review to the most recently created user (placeholder logic).
        cursor.execute("SELECT user_id FROM users ORDER BY user_id DESC LIMIT 1")
        result = cursor.fetchone()
        user_id = result[0] if result else 1
        vendor_id = 1  # Static for now

        cursor.execute(
            """
            INSERT INTO reviews (user_id, vendor_id, review_text, timestamp,
                                 is_anomaly, prediction, review)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (user_id, vendor_id, text, now, is_anomaly, prediction, text),
        )
        db.commit()

        if is_anomaly:
            suspend_until = now + timedelta(hours=24)
            cursor.execute(
                "UPDATE users SET suspended_until = ? WHERE user_id = ?",
                (suspend_until, user_id),
            )
            db.commit()
            return (
                f"❌ {prediction}\n"
                f"User temporarily suspended until {suspend_until.strftime('%Y-%m-%d %H:%M:%S')}."
            )

        return f"✅ Prediction: {prediction}"
    except Exception as e:
        return f"⚠️ Database Error: {e}"

# ---------------------- Gradio Interface ----------------------
interface = gr.Interface(
    fn=predict_review,
    inputs=gr.Textbox(lines=4, placeholder="Type a product review here...", label="Review Text"),
    outputs=gr.Textbox(label="Prediction"),
    title="🛍️ Byte Bazar Review Anomaly Detector",
    description=(
        "Enter a vendor or product review to check whether it is anomalous or normal. "
        "The system combines spam detection, a toxicity check, and custom rules to "
        "flag suspicious content."
    ),
)

# ---------------------- Launch App ----------------------
interface.launch()