import os
import cv2
import json
import numpy as np
import torch
import argparse
from facenet_pytorch import MTCNN
from hsemotion.facial_emotions import HSEmotionRecognizer
from PIL import Image


def detect_largest_face(frame, mtcnn):
    """Detect faces in a frame using MTCNN and return only the largest face"""
    bounding_boxes, probs = mtcnn.detect(frame, landmarks=False)
    if bounding_boxes is not None and probs is not None:
        # Filter boxes by probability
        valid_indices = probs > 0.9
        bounding_boxes = bounding_boxes[valid_indices]

        if len(bounding_boxes) > 0:
            # Calculate areas of all detected faces
            areas = []
            for bbox in bounding_boxes:
                x1, y1, x2, y2 = bbox[0:4]
                area = (x2 - x1) * (y2 - y1)
                areas.append(area)

            # Find the index of the largest face
            largest_index = np.argmax(areas)

            # Return only the largest face
            return bounding_boxes[largest_index:largest_index + 1]
    return None


def softmax(x):
    """Compute softmax values for x"""
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=0)


def process_image_for_emotions(image_path, fer, mtcnn):
    """Process a single image file and extract emotions"""
    try:
        # Load image using OpenCV for consistency with video processing
        frame_bgr = cv2.imread(image_path)
        if frame_bgr is None:
            print(f"Error: Could not load image {image_path}")
            return None

        # Convert BGR to RGB
        frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

        # Detect the largest face
        bounding_boxes = detect_largest_face(frame, mtcnn)

        if bounding_boxes is not None and len(bounding_boxes) > 0:
            # Use the largest detected face
            bbox = bounding_boxes[0]
            box = bbox.astype(int)
            x1, y1, x2, y2 = box[0:4]

            # Ensure coordinates are within image bounds
            x1 = max(0, x1)
            y1 = max(0, y1)
            x2 = min(frame.shape[1], x2)
            y2 = min(frame.shape[0], y2)

            # Extract face region
            if x2 > x1 and y2 > y1:
                face_img = frame[y1:y2, x1:x2, :]

                # Predict emotions with logits=False to get probabilities
                emotion, scores = fer.predict_emotions(face_img, logits=False)

                # For MTL models, we need to use only the emotion scores (not VA scores)
                if fer.is_mtl:
                    scores = scores[:-2]  # Remove last 2 elements (VA scores)

                # Apply softmax to ensure probabilities sum to 1
                probabilities = softmax(scores)

                # Convert to list for processing
                prob_list = probabilities.tolist() if isinstance(probabilities, np.ndarray) else list(probabilities)

                # Create emotion probabilities dictionary using idx_to_class from fer
                emotion_probabilities = {}
                for i in range(len(fer.idx_to_class)):
                    label = fer.idx_to_class[i]
                    emotion_probabilities[label] = round(prob_list[i], 4) if i < len(prob_list) else 0.0

                # Get emotion index
                dominant_emotion_index = -1
                for idx, label in fer.idx_to_class.items():
                    if label == emotion:
                        dominant_emotion_index = idx
                        break

                return {
                    "emotion_probabilities": emotion_probabilities,
                    "dominant_emotion": emotion,
                    "dominant_emotion_index": dominant_emotion_index
                }

        # If no face detected
        print(f"No faces detected in image {image_path}")
        return {
            "emotion_probabilities": None,
            "dominant_emotion": None,
            "dominant_emotion_index": -1
        }
    except Exception as e:
        print(f"Error processing image {image_path}: {e}")
        return None


def process_video_for_emotions(video_path, fer, mtcnn, skip_frames=5):
    """Process a single video file and extract average emotions"""
    # Open video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return None

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Store emotion data (without fps and frame_count)
    emotion_data = {
        "emotion_probabilities": None,
        "dominant_emotion": None,
        "dominant_emotion_index": None
    }

    # For averaging emotions across frames
    all_scores = []
    emotion_counts = {}

    frame_idx = 0
    while True:
        ret, frame_bgr = cap.read()
        if not ret:
            break

        # Skip frames if needed
        if frame_idx % skip_frames != 0:
            frame_idx += 1
            continue

        # Convert BGR to RGB
        frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

        # Detect the largest face
        bounding_boxes = detect_largest_face(frame, mtcnn)

        if bounding_boxes is not None:
            # Use the largest detected face
            bbox = bounding_boxes[0]
            box = bbox.astype(int)
            x1, y1, x2, y2 = box[0:4]

            # Ensure coordinates are within frame bounds
            x1 = max(0, x1)
            y1 = max(0, y1)
            x2 = min(frame.shape[1], x2)
            y2 = min(frame.shape[0], y2)

            # Extract face region
            if x2 > x1 and y2 > y1:
                face_img = frame[y1:y2, x1:x2, :]

                # Predict emotions with logits=False to get probabilities
                try:
                    emotion, scores = fer.predict_emotions(face_img, logits=False)

                    # For MTL models, we need to use only the emotion scores (not VA scores)
                    if fer.is_mtl:
                        scores = scores[:-2]  # Remove last 2 elements (VA scores)

                    # Convert scores to list for JSON serialization
                    scores_list = scores.tolist() if isinstance(scores, np.ndarray) else list(scores)

                    # Store scores for averaging
                    all_scores.append(scores_list)

                    # Count emotions
                    if emotion in emotion_counts:
                        emotion_counts[emotion] += 1
                    else:
                        emotion_counts[emotion] = 1
                except Exception as e:
                    print(f"Error predicting emotions for frame {frame_idx}: {e}")

        frame_idx += 1

        # Print progress
        if frame_idx % 30 == 0:
            print(f"Processed {frame_idx}/{frame_count} frames")

    cap.release()

    # Calculate average emotions if we have data
    if all_scores:
        # Calculate average scores
        avg_scores = np.mean(np.array(all_scores), axis=0)

        # Apply softmax to ensure probabilities sum to 1
        probabilities = softmax(avg_scores)

        # Convert to list for JSON serialization
        prob_list = probabilities.tolist() if isinstance(probabilities, np.ndarray) else list(probabilities)

        # Find dominant emotion
        dominant_emotion = max(emotion_counts, key=emotion_counts.get)

        # Create emotion probabilities dictionary using idx_to_class from fer
        emotion_probabilities = {}
        for i in range(len(fer.idx_to_class)):
            label = fer.idx_to_class[i]
            emotion_probabilities[label] = round(prob_list[i], 4) if i < len(prob_list) else 0.0

        # Get emotion index
        dominant_emotion_index = -1
        for idx, label in fer.idx_to_class.items():
            if label == dominant_emotion:
                dominant_emotion_index = idx
                break

        # Store in emotion_data
        emotion_data["emotion_probabilities"] = emotion_probabilities
        emotion_data["dominant_emotion"] = dominant_emotion
        emotion_data["dominant_emotion_index"] = dominant_emotion_index
        emotion_data["emotion_distribution"] = emotion_counts

        return emotion_data
    else:
        print("No faces detected in the video")
        return None


def process_metadata_file(metadata_path, output_path, model_name='enet_b0_8_best_afew', root_path=None, skip_frames=5):
    """Process files listed in metadata.json and add emotion information"""
    # Check if CUDA is available
    use_cuda = torch.cuda.is_available()
    device = 'cuda' if use_cuda else 'cpu'
    print(f"Using device: {device}")

    # Initialize MTCNN for face detection
    mtcnn = MTCNN(keep_all=False, post_process=False, min_face_size=40, device=device)

    # Initialize HSEmotionRecognizer
    fer = HSEmotionRecognizer(model_name=model_name, device=device)

    # Read metadata file
    with open(metadata_path, 'r') as f:
        metadata_list = json.load(f)

    # Process each file in metadata and add emotion information
    for metadata in metadata_list:
        file_path = metadata.get("file_path")
        file_type = metadata.get("type", "unknown")

        # If root_path is provided, join it with the file_path
        if root_path and file_path:
            file_path = os.path.join(root_path, file_path)

        if not file_path or not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            # Add empty emotion data
            metadata["emotion"] = {
                "emotion_probabilities": None,
                "dominant_emotion": None,
                "dominant_emotion_index": -1
            }
            continue

        print(f"Processing {file_type} file: {file_path}")

        try:
            if file_type == "video":
                result = process_video_for_emotions(file_path, fer, mtcnn, skip_frames)
            elif file_type == "image":
                result = process_image_for_emotions(file_path, fer, mtcnn)
            else:
                print(f"Unknown file type: {file_type}")
                # Add empty emotion data
                metadata["emotion"] = {
                    "emotion_probabilities": None,
                    "dominant_emotion": None,
                    "dominant_emotion_index": -1
                }
                continue

            # Add emotion data to metadata
            if result:
                metadata["emotion"] = result
            else:
                # Add empty emotion data if processing failed
                metadata["emotion"] = {
                    "emotion_probabilities": None,
                    "dominant_emotion": None,
                    "dominant_emotion_index": -1
                }
        except Exception as e:
            print(f"Error processing file {file_path}: {e}")
            # Add empty emotion data if processing failed
            metadata["emotion"] = {
                "emotion_probabilities": None,
                "dominant_emotion": None,
                "dominant_emotion_index": -1
            }

    # Save updated metadata to output file
    with open(output_path, 'w') as f:
        json.dump(metadata_list, f, indent=2)

    print(f"Updated metadata with emotion data saved to {output_path}")
    print(f"Processed {len(metadata_list)} files")

    return output_path


def main():
    # Create argument parser
    parser = argparse.ArgumentParser(description='Extract emotions from files listed in metadata.json')
    parser.add_argument('--input', '-i', type=str, default='metadata.json',
                        help='Input metadata JSON file (default: metadata.json)')
    parser.add_argument('--output', '-o', type=str, default='metadata_with_emotions.json',
                        help='Output metadata JSON file with emotion data (default: metadata_with_emotions.json)')
    parser.add_argument('--model', '-m', type=str, default='enet_b0_8_best_afew',
                        choices=['enet_b0_8_best_afew', 'enet_b0_8_best_vgaf', 'enet_b0_8_va_mtl', 'enet_b2_8', 'enet_b2_7'],
                        help='Model to use for emotion recognition')
    parser.add_argument('--root', '-r', type=str, default=None,
                        help='Root path to prepend to file paths (default: None)')
    parser.add_argument('--skip-frames', '-s', type=int, default=5,
                        help='Number of frames to skip between emotion detections (default: 5)')

    # Parse arguments
    args = parser.parse_args()

    # Process files
    output_path = process_metadata_file(args.input, args.output, args.model, args.root, args.skip_frames)


if __name__ == "__main__":
    main()
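# Example usage (a minimal sketch; the script name extract_emotions.py and the paths
# below are hypothetical, but the flags match the argparse options defined above):
#
#   python extract_emotions.py --input metadata.json --output metadata_with_emotions.json \
#       --model enet_b0_8_best_afew --root /data/media --skip-frames 5
#
# The input metadata JSON is assumed to be a list of entries, each carrying a
# "file_path" and a "type" of "video" or "image", which is what process_metadata_file
# reads. Illustrative input:
#
#   [
#     {"file_path": "clips/interview.mp4", "type": "video"},
#     {"file_path": "photos/portrait.jpg", "type": "image"}
#   ]
#
# Each entry in the output file gains an "emotion" object with
# "emotion_probabilities", "dominant_emotion", and "dominant_emotion_index"
# (plus "emotion_distribution" for videos), as assembled in the functions above.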