--- license: cc-by-nc-sa-4.0 language: - en - tr tags: - eeg - brain - deeplearning - artificialintelligence - ai - model - emotions - neuroscience - neura - neuro - bci - health pipeline_tag: time-series-forecasting library_name: keras --- # bai-6 Emotion (TR) ## Tanım bai-6 Emotion modeli, EEG ve iEEG tarafından toplanan veriler ile eğitilen bir detaylı duygu sınıflandırma modelidir. Model, 6 kanallı bir EEG cihazıyla çalışabilir durumdadır. ## Hedef Kitle bai modelleri, herkes için tasarlanmıştır. Açık kaynak versiyonları herkes tarafından kullanılabilir. ## Sınıflar - Sakin - Üzgün - Kızgın - Mutlu ## Neuramax Neuramax-6 Gen1 ile tam uyumlu çalışmaktadır. ------------------------------------------------------------------------- # bai-6 Emotion (EN) ## Definition The bai-6 Emotion model is a detailed emotion classification model trained with data collected by EEG and iEEG. The model can work with a 6-channel EEG device. ## Target Audience bai models are designed for everyone. Open source versions are available for everyone to use. ## Classes - Calm - Sad - Angry - Happy ## Neuramax Fully compatible with Neuramax-6 Gen1. -------------
bai-6 Emotion v1 # bai-6 Emotion v1 Yapısı / Structure ```bash "model_summary": "Model: Total params: 5,046 (19.71 KB) Trainable params: 5,044 (19.70 KB) Non-trainable params: 0 (0.00 B) Optimizer params: 2 (12.00 B)", "layers": [ { "name": "dense", "trainable": true, "count_params": 2368 }, { "name": "dropout", "trainable": true, "count_params": 0 }, { "name": "dense_1", "trainable": true, "count_params": 2080 }, { "name": "dropout_1", "trainable": true, "count_params": 0 }, { "name": "dense_2", "trainable": true, "count_params": 528 }, { "name": "dense_3", "trainable": true, "count_params": 68 } ] ``` # Kullanım / Usage ## 1. Sentetik Veri ile / With Synthetic Data ```python import numpy as np import matplotlib.pyplot as plt import mne from matplotlib.animation import FuncAnimation from tensorflow.keras.models import load_model import joblib class EEGMonitor: def __init__(self, model_path, scaler_path): self.model = load_model(model_path) self.scaler = joblib.load(scaler_path) self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz'] self.fs = 1000 # Örnekleme frekansı / Sampling frequency self.buffer_size = 1000 # 1 saniyelik buffer / 1 second buffer self.raw_buffer = np.zeros((6, self.buffer_size)) self.feature_contributions = {ch: [] for ch in self.ch_names} # Elektrot pozisyonları (10-20 sistemi) / Electrode positions (10-20 system) self.montage = mne.channels.make_standard_montage('standard_1020') self.fig = plt.figure(figsize=(15, 10)) self.setup_plots() def setup_plots(self): self.ax1 = self.fig.add_subplot(223) self.ax1.set_title("Canlı EEG Sinyalleri / Live EEG Signals") self.ax1.set_xlabel("Zaman (ms) / Time (ms)") self.ax1.set_ylabel("Amplitüd (µV) / Amplitude (µV)") self.ax2 = self.fig.add_subplot(221) self.ax2.set_title("Elektrot Konumları / Electrode Locations") self.ax3 = self.fig.add_subplot(224) self.ax3.set_title("Elektrot Katkı Oranları / Electrode Contribution Ratios") self.ax3.set_ylim(0, 1) self.ax4 = self.fig.add_subplot(222) self.ax4.set_title("Duygu Tahmin Olasılıkları / Emotion Prediction Probabilities") self.ax4.set_ylim(0, 1) plt.tight_layout() def generate_synthetic_data(self): """Sentetik EEG verisi üretir (6 kanal x 1000 örnek) / Generates synthetic EEG data (6 channels x 1000 samples)""" noise = np.random.normal(0, 5e-6, (6, self.buffer_size)) t = np.linspace(0, 1, self.buffer_size) noise[1] += 2e-6 * np.sin(2 * np.pi * 10 * t) return noise def update_buffer(self, new_data): """Buffer'ı kaydırmalı olarak günceller / Updates the buffer with new data by rolling""" self.raw_buffer = np.roll(self.raw_buffer, -new_data.shape[1], axis=1) self.raw_buffer[:, -new_data.shape[1]:] = new_data def calculate_channel_contributions(self, features): """Her elektrotun tahmindeki katkısını hesaplar / Calculates the contribution of each electrode to the prediction""" contributions = np.zeros(6) for i in range(6): channel_weights = self.model.layers[0].get_weights()[0][i * 6:(i + 1) * 6] contributions[i] = np.mean(np.abs(channel_weights)) return contributions / np.sum(contributions) def update_plot(self, frame): new_data = self.generate_synthetic_data() self.update_buffer(new_data) features = self.extract_features(self.raw_buffer) scaled_features = self.scaler.transform([features]) probs = self.model.predict(scaled_features, verbose=0)[0] contributions = self.calculate_channel_contributions(features) self.update_eeg_plot() self.update_topomap() self.update_contributions(contributions) self.update_probabilities(probs) def update_eeg_plot(self): self.ax1.clear() for i in range(6): offset = i * 20e-6 self.ax1.plot(self.raw_buffer[i] + offset, label=self.ch_names[i]) self.ax1.legend(loc='upper right') def update_topomap(self): self.ax2.clear() info = mne.create_info(self.ch_names, self.fs, 'eeg') evoked = mne.EvokedArray(self.raw_buffer.mean(axis=1, keepdims=True), info) evoked.set_montage(self.montage) mne.viz.plot_topomap(evoked.data[:, 0], evoked.info, axes=self.ax2, show=False) def update_contributions(self, contributions): self.ax3.clear() self.ax3.barh(self.ch_names, contributions, color='skyblue') for i, v in enumerate(contributions): self.ax3.text(v, i, f"{v * 100:.1f}%", color='black') def update_probabilities(self, probs): emotions = ['Mutlu / Happy', 'Kızgın / Angry', 'Üzgün / Sad', 'Sakin / Calm'] self.ax4.clear() bars = self.ax4.barh(emotions, probs, color=['green', 'red', 'blue', 'purple']) for bar in bars: width = bar.get_width() self.ax4.text(width, bar.get_y() + 0.2, f"{width * 100:.1f}%", ha='left') def extract_features(self, data): """6 kanal için özellik çıkarımı / Feature extraction for 6 channels""" features = [] for channel in data: features.extend([ np.mean(channel), np.std(channel), np.ptp(channel), np.sum(np.abs(np.diff(channel))), np.median(channel), np.percentile(np.abs(channel), 95) ]) return np.array(features) def start_monitoring(self): anim = FuncAnimation(self.fig, self.update_plot, interval=100) plt.show() if __name__ == "__main__": monitor = EEGMonitor( model_path='model/path/bai-6 Emotion.h5', scaler_path='scaler/path/bai-6_scaler.save' ) monitor.start_monitoring() ``` ## 2. Veri Seti ile / With Dataset ```python import numpy as np import matplotlib.pyplot as plt import mne from matplotlib.animation import FuncAnimation from tensorflow.keras.models import load_model import joblib import os class EEGMonitor: def __init__(self, model_path, scaler_path, data_path): self.model = load_model(model_path) self.scaler = joblib.load(scaler_path) self.data_path = data_path self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz'] self.fs = 1000 # Örnekleme frekansı / Sampling frequency self.buffer_size = 1000 # 1 saniyelik buffer / 1 second buffer self.raw_buffer = np.zeros((6, self.buffer_size)) self.feature_contributions = {ch: [] for ch in self.ch_names} # Elektrot pozisyonları / Electrode positions (10-20 system) self.montage = mne.channels.make_standard_montage('standard_1020') self.fig = plt.figure(figsize=(15, 10)) self.setup_plots() self.dataset = self.load_dataset(self.data_path) self.current_index = 0 def setup_plots(self): self.ax1 = self.fig.add_subplot(223) self.ax1.set_title("Canlı EEG Sinyalleri / Live EEG Signals") self.ax1.set_xlabel("Zaman (ms) / Time (ms)") self.ax1.set_ylabel("Amplitüd (µV) / Amplitude (µV)") self.ax2 = self.fig.add_subplot(221) self.ax2.set_title("Elektrot Konumları / Electrode Locations") self.ax3 = self.fig.add_subplot(224) self.ax3.set_title("Elektrot Katkı Oranları / Electrode Contribution Ratios") self.ax3.set_ylim(0, 1) self.ax4 = self.fig.add_subplot(222) self.ax4.set_title("Duygu Tahmin Olasılıkları / Emotion Prediction Probabilities") self.ax4.set_ylim(0, 1) plt.tight_layout() def load_dataset(self, path): """Desteklenen veri formatları: .npy (numpy), .csv / Supported data formats: .npy (numpy), .csv""" if not os.path.exists(path): raise FileNotFoundError(f"Veri seti bulunamadı / Not found dataset: {path}") if path.endswith(".npy"): data = np.load(path) elif path.endswith(".csv"): data = np.loadtxt(path, delimiter=',') else: raise ValueError("Desteklenmeyen dosya formatı. Yalnızca .npy veya .csv kullanılabilir. / Unsupported file format. Only .npy or .csv can be used.") # Transpose gerekebilir: (n_channels, n_samples) / Transpose may be needed: (n_channels, n_samples) if data.shape[0] != 6: data = data.T return data def get_next_chunk(self): """Veri setinden buffer_size uzunluğunda bir parça alır / Gets a chunk of length buffer_size from the dataset""" if self.current_index + self.buffer_size >= self.dataset.shape[1]: self.current_index = 0 chunk = self.dataset[:, self.current_index:self.current_index + self.buffer_size] self.current_index += self.buffer_size return chunk def update_buffer(self, new_data): self.raw_buffer = np.roll(self.raw_buffer, -new_data.shape[1], axis=1) self.raw_buffer[:, -new_data.shape[1]:] = new_data def calculate_channel_contributions(self, features): contributions = np.zeros(6) for i in range(6): channel_weights = self.model.layers[0].get_weights()[0][i * 6:(i + 1) * 6] contributions[i] = np.mean(np.abs(channel_weights)) return contributions / np.sum(contributions) def update_plot(self, frame): new_data = self.get_next_chunk() self.update_buffer(new_data) features = self.extract_features(self.raw_buffer) scaled_features = self.scaler.transform([features]) probs = self.model.predict(scaled_features, verbose=0)[0] contributions = self.calculate_channel_contributions(features) self.update_eeg_plot() self.update_topomap() self.update_contributions(contributions) self.update_probabilities(probs) def update_eeg_plot(self): self.ax1.clear() for i in range(6): offset = i * 20e-6 self.ax1.plot(self.raw_buffer[i] + offset, label=self.ch_names[i]) self.ax1.legend(loc='upper right') def update_topomap(self): self.ax2.clear() info = mne.create_info(self.ch_names, self.fs, 'eeg') evoked = mne.EvokedArray(self.raw_buffer.mean(axis=1, keepdims=True), info) evoked.set_montage(self.montage) mne.viz.plot_topomap(evoked.data[:, 0], evoked.info, axes=self.ax2, show=False) def update_contributions(self, contributions): self.ax3.clear() self.ax3.barh(self.ch_names, contributions, color='skyblue') for i, v in enumerate(contributions): self.ax3.text(v, i, f"{v * 100:.1f}%", color='black') def update_probabilities(self, probs): emotions = ['Mutlu / Happy', 'Kızgın / Angry', 'Üzgün / Sad', 'Sakin / Calm'] self.ax4.clear() bars = self.ax4.barh(emotions, probs, color=['green', 'red', 'blue', 'purple']) for bar in bars: width = bar.get_width() self.ax4.text(width, bar.get_y() + 0.2, f"{width * 100:.1f}%", ha='left') def extract_features(self, data): features = [] for channel in data: features.extend([ np.mean(channel), np.std(channel), np.ptp(channel), np.sum(np.abs(np.diff(channel))), np.median(channel), np.percentile(np.abs(channel), 95) ]) return np.array(features) def start_monitoring(self): anim = FuncAnimation(self.fig, self.update_plot, interval=1000) plt.show() if __name__ == "__main__": monitor = EEGMonitor( model_path="model/path/bai-6 Emotion.h5", scaler_path="scaler/path/bai-6_scaler.save", data_path="data/path/npy/or/csv" ) monitor.start_monitoring() ```
bai-6 Emotion v2 # bai-6 Emotion v2 Yapısı / Structure |Layer (type) | Output Shape | Param # | | --- | --- | --- | | dense_4 (Dense) | (None, 128) | 4,736 | | batch_normalization_2 | (None, 128) | 512 | | dropout_3 (Dropout) | (None, 128) | 0 | | dense_5 (Dense) | (None, 64) | 8,256 | | batch_normalization_3 | (None, 64) | 256 | | dropout_4 (Dropout) | (None, 64) | 0 | | dense_6 (Dense) | (None, 32) | 2,080 | | dropout_5 (Dropout) | (None, 32) | 0 | | dense_7 (Dense) | (None, 4) | 132 | ### Total params: 15,972 (62.39 KB) **Trainable params: 15,588 (60.89 KB)** **Non-trainable params: 384 (1.50 KB)** # Kullanım / Usage ## Sentetik Veri ile / With Synthetic Data ```python import numpy as np import joblib import time import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation from tensorflow.keras.models import load_model from datetime import datetime import mne import warnings warnings.filterwarnings('ignore') class EEGEmotionMonitorOptimized: def __init__(self, model_path, scaler_path, selector_path=None, pca_path=None): self.emotion_labels = { 0: "Mutlu (Happy)", 1: "Kızgın (Angry)", 2: "Üzgün (Sad)", 3: "Sakin (Calm)" } self.emotion_colors = ['#FFD700', '#FF4444', '#4169E1', '#32CD32'] # Kanal isimleri ve parametreler / Channel names and parameters self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz'] self.fs = 128 # Örnekleme hızı / Sampling rate self.buffer_size = 640 self.update_interval = 200 # Buffer'ları başlat / Initialize buffers self.raw_buffer = np.zeros((6, self.buffer_size)) self.prediction_history = [] self.confidence_history = [] self.time_history = [] self.max_history = 30 # Performance metrics tracking self.performance_metrics = { 'total_predictions': 0, 'high_confidence_predictions': 0, # >0.8 confidence 'low_confidence_predictions': 0, # <0.5 confidence 'prediction_times': [], # Processing time per prediction 'emotion_transitions': 0, # Count of emotion changes 'stability_score': 0.0, # How stable predictions are 'average_confidence': 0.0, 'confidence_trend': [], # Last 10 confidence values for trend analysis 'processing_fps': 0.0, # Processing speed 'last_prediction': None } self.metrics_text = "" self.start_time = None try: self.model = load_model(model_path) self.scaler = joblib.load(scaler_path) self.selector = None self.pca = None if selector_path: try: self.selector = joblib.load(selector_path) print("Feature selector loaded") except: print("Feature selector not found, using raw features") if pca_path: try: self.pca = joblib.load(pca_path) print("PCA reducer loaded") except: print("PCA reducer not found, skipping dimensionality reduction") print("Model and preprocessors successfully loaded!") print(f"Model input shape: {self.model.input_shape}") print(f"Output classes: {len(self.emotion_labels)}") # Elektrot pozisyonları (10-20 sistemi) / Electrode positions (10-20 system) self.montage = mne.channels.make_standard_montage('standard_1020') except Exception as e: print(f"Model/preprocessor loading error: {e}") raise self.fig = plt.figure(figsize=(14, 8)) self.fig.suptitle('EEG Duygu Tanıma Sistemi / EEG Emotion Analysis System', fontsize=16, fontweight='bold') self.setup_plots() self.animation = None self.is_running = False def setup_plots(self): """4 panelli görselleştirme arayüzünü hazırla (with performance metrics) / Setup 4-panel visualization interface (with performance metrics)""" self.ax1 = self.fig.add_subplot(221) self.ax1.set_title("Live EEG Signals", fontsize=10) self.ax1.set_xlabel("Time (samples)", fontsize=9) self.ax1.set_ylabel("Amplitude (µV)", fontsize=9) self.ax1.grid(True, alpha=0.3) self.ax2 = self.fig.add_subplot(222) self.ax2.set_title("Emotion Probabilities", fontsize=10) self.ax2.set_xlim(0, 1) self.ax3 = self.fig.add_subplot(223) self.ax3.set_title("Performance Metrics & Confidence Trend", fontsize=10) self.ax4 = self.fig.add_subplot(224) self.ax4.set_title("Electrode Contributions", fontsize=10) self.ax4.set_xlim(0, 1) plt.tight_layout(pad=1.0) def generate_realistic_eeg_signal(self, emotion_bias=None): noise = np.random.normal(0, 3e-6, (6, self.buffer_size)) t = np.linspace(0, self.buffer_size/self.fs, self.buffer_size) alpha_freq = np.random.uniform(8, 12) # Alpha dominant beta_freq = np.random.uniform(15, 25) # Beta for ch in range(6): if emotion_bias == 0: # Happy - higher beta beta_amp = np.random.uniform(4e-6, 6e-6) alpha_amp = np.random.uniform(2e-6, 3e-6) elif emotion_bias == 1: # Angry - very high beta beta_amp = np.random.uniform(5e-6, 7e-6) alpha_amp = np.random.uniform(1e-6, 2e-6) elif emotion_bias == 2: # Sad - lower activity beta_amp = np.random.uniform(1e-6, 2e-6) alpha_amp = np.random.uniform(3e-6, 5e-6) elif emotion_bias == 3: # Calm - high alpha beta_amp = np.random.uniform(1e-6, 3e-6) alpha_amp = np.random.uniform(4e-6, 6e-6) else: # Random beta_amp = np.random.uniform(2e-6, 4e-6) alpha_amp = np.random.uniform(2e-6, 4e-6) noise[ch] += alpha_amp * np.sin(2 * np.pi * alpha_freq * t + np.random.random() * 2 * np.pi) noise[ch] += beta_amp * np.sin(2 * np.pi * beta_freq * t + np.random.random() * 2 * np.pi) return noise.astype(np.float32) def update_buffer(self, new_data): samples_to_add = min(new_data.shape[1], self.buffer_size // 4) self.raw_buffer = np.roll(self.raw_buffer, -samples_to_add, axis=1) self.raw_buffer[:, -samples_to_add:] = new_data[:, :samples_to_add] def extract_lightweight_features(self, signal_data): features = [] for channel_data in signal_data: time_features = [ np.mean(channel_data), np.std(channel_data), np.ptp(channel_data), np.median(channel_data), np.mean(np.abs(channel_data)), np.sqrt(np.mean(channel_data**2)) ] try: fft_vals = np.abs(np.fft.rfft(channel_data[::4])) freqs = np.fft.rfftfreq(len(channel_data)//4, 4/self.fs) delta_power = np.sum(fft_vals[(freqs >= 0.5) & (freqs <= 4)]) theta_power = np.sum(fft_vals[(freqs >= 4) & (freqs <= 8)]) alpha_power = np.sum(fft_vals[(freqs >= 8) & (freqs <= 13)]) beta_power = np.sum(fft_vals[(freqs >= 13) & (freqs <= 30)]) total_power = np.sum(fft_vals) + 1e-10 freq_features = [ delta_power / total_power, theta_power / total_power, alpha_power / total_power, beta_power / total_power ] except: freq_features = [0.25, 0.25, 0.25, 0.25] nonlinear_features = [ np.std(np.diff(channel_data)) / (np.std(channel_data) + 1e-10), np.mean(np.abs(np.diff(channel_data))) ] channel_features = time_features + freq_features + nonlinear_features features.extend(channel_features) return np.array(features, dtype=np.float32) def calculate_channel_contributions(self, signal_data): contributions = np.zeros(6) for i in range(6): contributions[i] = np.sqrt(np.mean(signal_data[i]**2)) total = np.sum(contributions) + 1e-10 return contributions / total def update_performance_metrics(self, predicted_class, confidence, processing_time): metrics = self.performance_metrics metrics['total_predictions'] += 1 if confidence > 0.8: metrics['high_confidence_predictions'] += 1 elif confidence < 0.5: metrics['low_confidence_predictions'] += 1 metrics['prediction_times'].append(processing_time) if len(metrics['prediction_times']) > 50: metrics['prediction_times'].pop(0) if metrics['prediction_times']: avg_time = np.mean(metrics['prediction_times']) metrics['processing_fps'] = 1.0 / max(avg_time, 0.001) if metrics['last_prediction'] is not None and metrics['last_prediction'] != predicted_class: metrics['emotion_transitions'] += 1 metrics['last_prediction'] = predicted_class metrics['confidence_trend'].append(float(confidence)) if len(metrics['confidence_trend']) > 10: metrics['confidence_trend'].pop(0) if self.confidence_history: metrics['average_confidence'] = np.mean(self.confidence_history) if metrics['total_predictions'] > 1: transition_rate = metrics['emotion_transitions'] / metrics['total_predictions'] metrics['stability_score'] = max(0, 1.0 - transition_rate) def update_plot(self, frame): if not self.is_running: return start_time = time.time() if frame % 2 == 0: if np.random.random() < 0.2: emotion_bias = np.random.randint(0, 4) else: emotion_bias = None new_samples = self.buffer_size // 8 new_data = self.generate_realistic_eeg_signal(emotion_bias)[:, :new_samples] self.update_buffer(new_data) prediction_start = time.time() features = self.extract_lightweight_features(self.raw_buffer) try: scaled_features = self.scaler.transform([features]) if self.selector is not None: scaled_features = self.selector.transform(scaled_features) if self.pca is not None: scaled_features = self.pca.transform(scaled_features) probs = self.model.predict(scaled_features, verbose=0)[0] predicted_class = np.argmax(probs) confidence = np.max(probs) except Exception as e: print(f"Prediction error: {e}") probs = np.array([0.25, 0.25, 0.25, 0.25]) predicted_class = 0 confidence = 0.25 prediction_time = time.time() - prediction_start self.update_performance_metrics(predicted_class, confidence, prediction_time) self.prediction_history.append(predicted_class) self.confidence_history.append(confidence) self.time_history.append(datetime.now()) if len(self.prediction_history) > self.max_history: self.prediction_history.pop(0) self.confidence_history.pop(0) self.time_history.pop(0) contributions = self.calculate_channel_contributions(self.raw_buffer) self.update_eeg_plot() self.update_probabilities(probs) self.update_performance_plot() self.update_contributions(contributions) emotion_name = self.emotion_labels[predicted_class] metrics = self.performance_metrics elapsed_time = time.time() - self.start_time if self.start_time else 0 print(f"\r{datetime.now().strftime('%H:%M:%S')} | " f"Emotion: {emotion_name} | " f"Conf: {confidence:.3f} | " f"FPS: {metrics['processing_fps']:.1f} | " f"Stab: {metrics['stability_score']:.2f} | " f"Total: {metrics['total_predictions']} | " f"Time: {elapsed_time:.0f}s", end='') def update_eeg_plot(self): self.ax1.clear() colors = plt.cm.tab10(np.linspace(0, 1, 6)) display_samples = min(300, self.buffer_size) # Show fewer samples for performance for i in range(6): offset = i * 20e-6 signal = self.raw_buffer[i, -display_samples:] + offset self.ax1.plot(signal, label=self.ch_names[i], color=colors[i], linewidth=1.0, alpha=0.8) self.ax1.set_title("Live EEG Signals", fontsize=12) self.ax1.set_xlabel("Time (samples)") self.ax1.set_ylabel("Amplitude (µV)") self.ax1.legend(loc='upper right', fontsize=8) self.ax1.grid(True, alpha=0.3) def update_performance_plot(self): self.ax3.clear() metrics = self.performance_metrics if metrics['total_predictions'] > 0: high_conf_pct = (metrics['high_confidence_predictions'] / metrics['total_predictions']) * 100 low_conf_pct = (metrics['low_confidence_predictions'] / metrics['total_predictions']) * 100 metrics_text = f"""PERFORMANCE METRICS Total Predictions: {metrics['total_predictions']} Average Confidence: {metrics['average_confidence']:.3f} High Confidence (>0.8): {high_conf_pct:.1f}% Low Confidence (<0.5): {low_conf_pct:.1f}% Processing Speed: {metrics['processing_fps']:.1f} FPS Stability Score: {metrics['stability_score']:.3f} Emotion Transitions: {metrics['emotion_transitions']} Model Accuracy: {high_conf_pct:.1f}% Response Time: {np.mean(metrics['prediction_times'])*1000:.1f}ms""" self.ax3.text(0.02, 0.98, metrics_text, transform=self.ax3.transAxes, fontsize=8, verticalalignment='top', fontfamily='monospace', bbox=dict(boxstyle="round,pad=0.3", facecolor="lightblue", alpha=0.7)) if len(metrics['confidence_trend']) > 1: trend_x = np.arange(len(metrics['confidence_trend'])) trend_data = np.array(metrics['confidence_trend'], dtype=np.float64) self.ax3.plot(trend_x + 0.6, trend_data * 0.4 + 0.1, 'g-o', markersize=3, linewidth=2, label='Confidence Trend') # Add trend analysis if len(metrics['confidence_trend']) > 3: try: x_data = np.array(range(len(trend_data[-5:])), dtype=np.float64) y_data = np.array(trend_data[-5:], dtype=np.float64) recent_trend = np.polyfit(x_data, y_data, 1)[0] trend_direction = "↗" if recent_trend > 0.01 else "↘" if recent_trend < -0.01 else "→" self.ax3.text(0.7, 0.9, f"Trend: {trend_direction}", transform=self.ax3.transAxes, fontsize=10, fontweight='bold') except: # Fallback if polyfit fails self.ax3.text(0.7, 0.9, f"Trend: →", transform=self.ax3.transAxes, fontsize=10, fontweight='bold') self.ax3.set_xlim(0, 1) self.ax3.set_ylim(0, 1) self.ax3.set_title("Performance Metrics & Confidence Trend", fontsize=10) if metrics['average_confidence'] > 0.8: title_color = 'green' elif metrics['average_confidence'] > 0.6: title_color = 'orange' else: title_color = 'red' self.ax3.title.set_color(title_color) def update_contributions(self, contributions): self.ax4.clear() colors = plt.cm.viridis(contributions) bars = self.ax4.barh(self.ch_names, contributions, color=colors) for i, (bar, v) in enumerate(zip(bars, contributions)): if v > 0.05: self.ax4.text(v + 0.02, i, f"{v*100:.1f}%", va='center', fontsize=9) self.ax4.set_title("Electrode Contributions", fontsize=10) self.ax4.set_xlabel("Contribution Rate", fontsize=9) self.ax4.set_xlim(0, 0.6) self.ax4.grid(True, alpha=0.3, axis='x') def update_probabilities(self, probs): self.ax2.clear() emotions = [self.emotion_labels[i] for i in range(4)] bars = self.ax2.barh(emotions, probs, color=self.emotion_colors) max_idx = np.argmax(probs) bars[max_idx].set_edgecolor('black') bars[max_idx].set_linewidth(2) for bar, prob in zip(bars, probs): width = bar.get_width() if width > 0.05: self.ax2.text(width + 0.02, bar.get_y() + bar.get_height()/2, f"{width*100:.1f}%", ha='left', va='center', fontsize=9, fontweight='bold') self.ax2.set_title("Emotion Probabilities", fontsize=12) self.ax2.set_xlabel("Probability") self.ax2.set_xlim(0, 1) self.ax2.grid(True, alpha=0.3, axis='x') current_emotion = emotions[max_idx] confidence = probs[max_idx] self.ax2.text(0.5, 1.05, f"Current: {current_emotion} ({confidence*100:.1f}%)", transform=self.ax2.transAxes, ha='center', fontsize=10, fontweight='bold', color=self.emotion_colors[max_idx]) def start_monitoring(self): print("\n" + "="*60) print(" OPTIMIZED EEG EMOTION RECOGNITION MONITOR") print("="*60) print("\nPress 'X' to close the window...") print("Real-time performance metrics will be displayed") print("-"*60) self.is_running = True self.start_time = time.time() self.animation = FuncAnimation( self.fig, self.update_plot, interval=self.update_interval, blit=False, cache_frame_data=False ) plt.show() self.is_running = False print("\n\nMonitoring stopped.") if self.prediction_history: self.print_summary_statistics() def print_summary_statistics(self): print("\n" + "="*80) print(" DETAILED PERFORMANCE & STATISTICS SUMMARY") print("="*80) if not self.prediction_history: print("No data collected.") return metrics = self.performance_metrics total_time = time.time() - self.start_time if self.start_time else 0 print("\n📊 PERFORMANCE METRICS:") print(f" Total Predictions: {metrics['total_predictions']}") print(f" Total Runtime: {total_time:.1f} seconds") print(f" Average Processing Speed: {metrics['processing_fps']:.1f} FPS") print(f" Average Response Time: {np.mean(metrics['prediction_times'])*1000:.1f}ms") print(f" Model Stability Score: {metrics['stability_score']:.3f} (0-1, higher=better)") print(f" Emotion Transitions: {metrics['emotion_transitions']}") print(f"\n🎯 CONFIDENCE ANALYSIS:") total = len(self.prediction_history) high_conf_count = metrics['high_confidence_predictions'] low_conf_count = metrics['low_confidence_predictions'] medium_conf_count = max(0, total - high_conf_count - low_conf_count) print(f" Average Confidence: {metrics['average_confidence']:.3f}") print(f" Confidence Std Dev: {np.std(self.confidence_history):.3f}") print(f" High Confidence (>0.8): {high_conf_count} ({high_conf_count/total*100:.1f}%)") print(f" Medium Confidence (0.5-0.8): {medium_conf_count} ({medium_conf_count/total*100:.1f}%)") print(f" Low Confidence (<0.5): {low_conf_count} ({low_conf_count/total*100:.1f}%)") accuracy_score = high_conf_count / total * 100 if total > 0 else 0 print(f"\n🏆 MODEL QUALITY ASSESSMENT:") print(f" Estimated Accuracy: {accuracy_score:.1f}% (based on high confidence predictions)") if accuracy_score >= 80: quality = "EXCELLENT 🌟" elif accuracy_score >= 70: quality = "GOOD ✅" elif accuracy_score >= 60: quality = "FAIR ⚠️" else: quality = "POOR ❌" print(f" Model Quality Rating: {quality}") emotion_counts = {i: 0 for i in range(4)} for pred in self.prediction_history: emotion_counts[pred] += 1 print(f"\n😊 EMOTION DISTRIBUTION:") for emotion_id, count in emotion_counts.items(): percentage = (count / total) * 100 bar = "█" * int(percentage / 5) print(f" {self.emotion_labels[emotion_id]:<15}: {count:>3} ({percentage:>5.1f}%) {bar}") dominant_emotion = max(emotion_counts, key=emotion_counts.get) dominant_percentage = emotion_counts[dominant_emotion] / total * 100 print(f"\n Dominant Emotion: {self.emotion_labels[dominant_emotion]} ({dominant_percentage:.1f}%)") if len(metrics['confidence_trend']) > 3: try: x_data = np.array(range(len(metrics['confidence_trend'])), dtype=np.float64) y_data = np.array(metrics['confidence_trend'], dtype=np.float64) trend_slope = np.polyfit(x_data, y_data, 1)[0] print(f"\n📈 TREND ANALYSIS:") if trend_slope > 0.01: trend_desc = "IMPROVING ↗" elif trend_slope < -0.01: trend_desc = "DECLINING ↘" else: trend_desc = "STABLE →" print(f" Recent Confidence Trend: {trend_desc} (slope: {trend_slope:.4f})") except Exception as e: print(f"\n📈 TREND ANALYSIS:") print(f" Recent Confidence Trend: STABLE → (analysis unavailable)") print(f"\n💡 RECOMMENDATIONS:") if accuracy_score < 70: print(" • Consider retraining the model with more data") print(" • Check data quality and preprocessing steps") if metrics['stability_score'] < 0.7: print(" • Model predictions are unstable - review signal quality") if metrics['processing_fps'] < 5: print(" • Processing speed is slow - consider model optimization") if accuracy_score >= 80 and metrics['stability_score'] >= 0.8: print(" • Model performance is excellent! ✨") print("\n" + "="*80) def main(): model_path = 'path/to/bai-6 EmotionOptimized.h5' scaler_path = 'path/to/bai-6 ScalerOptimized.pkl' selector_path = 'path/to/bai-6_feature_selector_opt.pkl' pca_path = 'path/to/bai-6_pca_reducer_opt.pkl' try: monitor = EEGEmotionMonitorOptimized( model_path, scaler_path, selector_path, pca_path ) monitor.start_monitoring() except FileNotFoundError as e: print(f"Model or preprocessor file not found: {e}") print("Please ensure the model has been trained and saved.") print("Available fallback: Using basic model without feature selection/PCA") try: basic_model_path = 'path/to/bai-6 EmotionOptimized.h5' basic_scaler_path = 'path/to/bai-6 ScalerOptimized.pkl' monitor = EEGEmotionMonitorOptimized(basic_model_path, basic_scaler_path) monitor.start_monitoring() except Exception as e2: print(f"Fallback also failed: {e2}") except Exception as e: print(f"Error: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main() ```
------------- ## Lisans/License CC-BY-NC-SA-4.0