#!/usr/bin/env python
# coding: utf-8
import re
import numpy as np
import joblib
import sqlite3
from datetime import datetime, timedelta
import gradio as gr
from transformers import pipeline
from better_profanity import profanity
# ---------------------- Profanity Filter ----------------------
profanity.load_censor_words()
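# Note: the censor-word list is loaded here, but none of the rule-based checks
# below consult it. A minimal sketch of wiring it in, using better_profanity's
# real contains_profanity() API (the helper name `has_profanity` is ours and is
# not yet called anywhere):
def has_profanity(text):
    # True if any loaded censor word appears in the text.
    return profanity.contains_profanity(text)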
# ---------------------- SQLite DB Connection ----------------------
db = sqlite3.connect("anomaly1.db", check_same_thread=False)
cursor = db.cursor()
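# The statements in predict_review() assume `users` and `reviews` tables
# already exist. A minimal bootstrap sketch, with columns inferred from the
# INSERT/UPDATE statements later in this file (the real schema may differ;
# `review_id` is our guess at a primary key):
cursor.execute("""
    CREATE TABLE IF NOT EXISTS users (
        user_id INTEGER PRIMARY KEY AUTOINCREMENT,
        suspended_until TIMESTAMP
    )
""")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS reviews (
        review_id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        vendor_id INTEGER,
        review_text TEXT,
        timestamp TIMESTAMP,
        is_anomaly INTEGER,
        prediction TEXT,
        review TEXT
    )
""")
db.commit()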
# ---------------------- Load Models ----------------------
try:
    spam_model = joblib.load("spam_classifier.pkl")
except Exception as e:
    raise RuntimeError(f"Failed to load spam model: {e}")
try:
    toxicity_model = pipeline("text-classification", model="unitary/toxic-bert")
except Exception as e:
    raise RuntimeError(f"Failed to load toxicity model: {e}")
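# spam_model.predict([text]) is called on raw strings below, so
# "spam_classifier.pkl" was presumably saved as a scikit-learn Pipeline that
# bundles its own vectorizer. A hypothetical training sketch (names and model
# choice are assumptions, not the actual training code):
#
#     from sklearn.pipeline import make_pipeline
#     from sklearn.feature_extraction.text import TfidfVectorizer
#     from sklearn.linear_model import LogisticRegression
#
#     model = make_pipeline(TfidfVectorizer(), LogisticRegression())
#     model.fit(train_texts, train_labels)  # your labeled review corpus
#     joblib.dump(model, "spam_classifier.pkl")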
# ---------------------- Anomaly Heuristic Score ----------------------
def basic_anomaly_score(text):
    """Combine the rule-based signals into a heuristic score in [0, 1]."""
    score = 0.0
    if is_low_quality(text):
        score += 0.3
    if contains_suspicious_content(text):
        score += 0.3
    if is_nonsensical_structure(text):
        score += 0.2
    if len(text.split()) < 3:
        score += 0.2
    return score
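# Worked example: basic_anomaly_score("BAD!!!!") == 0.5
#   +0.3 low quality (under 10 chars, and "!!!!" matches the repeat pattern)
#   +0.2 too short (fewer than 3 words)
# 0.5 crosses the threshold used in predict_review(), so the review is
# flagged "Anomalous".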
# ---------------------- Rule-Based Checks ----------------------
def is_toxic(text):
    """Flag text that unitary/toxic-bert labels 'toxic' with confidence > 0.7."""
    try:
        result = toxicity_model(text)[0]
        return result['label'].lower() == "toxic" and result['score'] > 0.7
    except Exception:
        return False

def is_low_quality(text):
    # Very short, shouting (all caps), or character-spam (4+ repeats) text.
    return bool(len(text.strip()) < 10 or text.strip().isupper() or re.search(r"(.)\1{3,}", text))

def contains_suspicious_content(text):
    # Contact-sharing patterns: 10-digit phone numbers, "call me", WhatsApp, etc.
    patterns = [r"\b\d{10}\b", r"\bcall me\b", r"\bwhatsapp\b", r"\bnumber\b", r"\bcontact\b", r"\bemail\b"]
    return any(re.search(p, text.lower()) for p in patterns)

def is_nonsensical_structure(text):
    # Template-like openings that often indicate copy-paste or generated reviews.
    patterns = [r"\bi am a\b", r"\bi will be a\b", r"\bthis is my\b"]
    return any(re.search(p, text.lower()) for p in patterns)
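# Quick sanity checks for the regex rules (hypothetical inputs):
#   contains_suspicious_content("WhatsApp me at 9876543210")  -> True
#       (matches both \bwhatsapp\b and the 10-digit \b\d{10}\b pattern)
#   is_nonsensical_structure("this is my honest review")      -> True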
# ---------------------- Main Prediction Function ----------------------
def predict_review(text):
    text = text.strip()
    if not text:
        return "⚠️ Please enter a review."
    flags = []
    # Spam model
    try:
        if spam_model.predict([text])[0]:
            flags.append("Spam")
    except Exception:
        flags.append("Spam Detection Failed")
    # Rule-based checks
    if is_toxic(text):
        flags.append("Toxic")
    if is_low_quality(text):
        flags.append("Low Quality")
    if contains_suspicious_content(text):
        flags.append("Suspicious")
    if is_nonsensical_structure(text):
        flags.append("Nonsensical")
    if len(text.split()) < 3:
        flags.append("Too Short")
    score = basic_anomaly_score(text)
    if score >= 0.5:
        flags.append("Anomalous")
    prediction = ", ".join(flags) if flags else "Normal"
    now = datetime.now()
    is_anomaly = 1 if "Anomalous" in flags else 0
    # ---------------------- Save in DB ----------------------
    try:
        # Placeholder identity: attribute the review to the most recently
        # created user.
        cursor.execute("SELECT user_id FROM users ORDER BY user_id DESC LIMIT 1")
        result = cursor.fetchone()
        user_id = result[0] if result else 1
        vendor_id = 1  # Static for now
        cursor.execute("""
            INSERT INTO reviews (user_id, vendor_id, review_text, timestamp, is_anomaly, prediction, review)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (user_id, vendor_id, text, now, is_anomaly, prediction, text))
        db.commit()
        if is_anomaly:
            # Anomalous reviews trigger a 24-hour suspension for the posting user.
            suspend_until = now + timedelta(hours=24)
            cursor.execute("UPDATE users SET suspended_until = ? WHERE user_id = ?", (suspend_until, user_id))
            db.commit()
            return f"❌ {prediction}\nUser temporarily suspended until {suspend_until.strftime('%Y-%m-%d %H:%M:%S')}."
        return f"✅ Prediction: {prediction}"
    except Exception as e:
        return f"⚠️ Database Error: {e}"
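# Example interaction (assuming the spam model does not flag the text and the
# bootstrap tables above exist):
#   predict_review("Great product, works as described")
#   -> "✅ Prediction: Normal"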
interface = gr.Interface(
    fn=predict_review,
    inputs=gr.Textbox(lines=4, placeholder="Type a product review here...", label="Review Text"),
    outputs=gr.Textbox(label="Prediction"),
    title="🛍️ Byte Bazar Review Anomaly Detector",
    description="Enter a vendor or product review to check if it's anomalous or normal. This system uses spam detection, a toxicity check, and custom rules to identify suspicious content.",
)
# ---------------------- Launch App ----------------------
if __name__ == "__main__":
    interface.launch()