---
license: cc-by-nc-sa-4.0
language:
- en
- tr
tags:
- eeg
- brain
- deeplearning
- artificialintelligence
- ai
- model
- emotions
- neuroscience
- neura
- neuro
- bci
- health
pipeline_tag: time-series-forecasting
library_name: keras
---
# bai-6 Emotion (TR)
## Tanım
bai-6 Emotion modeli, EEG ve iEEG tarafından toplanan veriler ile eğitilen bir detaylı duygu sınıflandırma modelidir. Model, 6 kanallı bir EEG cihazıyla çalışabilir durumdadır.
## Hedef Kitle
bai modelleri, herkes için tasarlanmıştır. Açık kaynak versiyonları herkes tarafından kullanılabilir.
## Sınıflar
- Sakin
- Üzgün
- Kızgın
- Mutlu
## Neuramax
Neuramax-6 Gen1 ile tam uyumlu çalışmaktadır.
-------------------------------------------------------------------------
# bai-6 Emotion (EN)
## Definition
The bai-6 Emotion model is a detailed emotion classification model trained with data collected by EEG and iEEG. The model can work with a 6-channel EEG device.
## Target Audience
bai models are designed for everyone. Open source versions are available for everyone to use.
## Classes
- Calm
- Sad
- Angry
- Happy
## Neuramax
Fully compatible with Neuramax-6 Gen1.
-------------
bai-6 Emotion v1
# bai-6 Emotion v1 Yapısı / Structure
```bash
"model_summary":
"Model: Total params: 5,046 (19.71 KB)
Trainable params: 5,044 (19.70 KB)
Non-trainable params: 0 (0.00 B)
Optimizer params: 2 (12.00 B)",
"layers": [
{
"name": "dense",
"trainable": true,
"count_params": 2368
},
{
"name": "dropout",
"trainable": true,
"count_params": 0
},
{
"name": "dense_1",
"trainable": true,
"count_params": 2080
},
{
"name": "dropout_1",
"trainable": true,
"count_params": 0
},
{
"name": "dense_2",
"trainable": true,
"count_params": 528
},
{
"name": "dense_3",
"trainable": true,
"count_params": 68
}
]
```
# Kullanım / Usage
## 1. Sentetik Veri ile / With Synthetic Data
```python
import numpy as np
import matplotlib.pyplot as plt
import mne
from matplotlib.animation import FuncAnimation
from tensorflow.keras.models import load_model
import joblib
class EEGMonitor:
def __init__(self, model_path, scaler_path):
self.model = load_model(model_path)
self.scaler = joblib.load(scaler_path)
self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz']
self.fs = 1000 # Örnekleme frekansı / Sampling frequency
self.buffer_size = 1000 # 1 saniyelik buffer / 1 second buffer
self.raw_buffer = np.zeros((6, self.buffer_size))
self.feature_contributions = {ch: [] for ch in self.ch_names}
# Elektrot pozisyonları (10-20 sistemi) / Electrode positions (10-20 system)
self.montage = mne.channels.make_standard_montage('standard_1020')
self.fig = plt.figure(figsize=(15, 10))
self.setup_plots()
def setup_plots(self):
self.ax1 = self.fig.add_subplot(223)
self.ax1.set_title("Canlı EEG Sinyalleri / Live EEG Signals")
self.ax1.set_xlabel("Zaman (ms) / Time (ms)")
self.ax1.set_ylabel("Amplitüd (µV) / Amplitude (µV)")
self.ax2 = self.fig.add_subplot(221)
self.ax2.set_title("Elektrot Konumları / Electrode Locations")
self.ax3 = self.fig.add_subplot(224)
self.ax3.set_title("Elektrot Katkı Oranları / Electrode Contribution Ratios")
self.ax3.set_ylim(0, 1)
self.ax4 = self.fig.add_subplot(222)
self.ax4.set_title("Duygu Tahmin Olasılıkları / Emotion Prediction Probabilities")
self.ax4.set_ylim(0, 1)
plt.tight_layout()
def generate_synthetic_data(self):
"""Sentetik EEG verisi üretir (6 kanal x 1000 örnek) / Generates synthetic EEG data (6 channels x 1000 samples)"""
noise = np.random.normal(0, 5e-6, (6, self.buffer_size))
t = np.linspace(0, 1, self.buffer_size)
noise[1] += 2e-6 * np.sin(2 * np.pi * 10 * t)
return noise
def update_buffer(self, new_data):
"""Buffer'ı kaydırmalı olarak günceller / Updates the buffer with new data by rolling"""
self.raw_buffer = np.roll(self.raw_buffer, -new_data.shape[1], axis=1)
self.raw_buffer[:, -new_data.shape[1]:] = new_data
def calculate_channel_contributions(self, features):
"""Her elektrotun tahmindeki katkısını hesaplar / Calculates the contribution of each electrode to the prediction"""
contributions = np.zeros(6)
for i in range(6):
channel_weights = self.model.layers[0].get_weights()[0][i * 6:(i + 1) * 6]
contributions[i] = np.mean(np.abs(channel_weights))
return contributions / np.sum(contributions)
def update_plot(self, frame):
new_data = self.generate_synthetic_data()
self.update_buffer(new_data)
features = self.extract_features(self.raw_buffer)
scaled_features = self.scaler.transform([features])
probs = self.model.predict(scaled_features, verbose=0)[0]
contributions = self.calculate_channel_contributions(features)
self.update_eeg_plot()
self.update_topomap()
self.update_contributions(contributions)
self.update_probabilities(probs)
def update_eeg_plot(self):
self.ax1.clear()
for i in range(6):
offset = i * 20e-6
self.ax1.plot(self.raw_buffer[i] + offset, label=self.ch_names[i])
self.ax1.legend(loc='upper right')
def update_topomap(self):
self.ax2.clear()
info = mne.create_info(self.ch_names, self.fs, 'eeg')
evoked = mne.EvokedArray(self.raw_buffer.mean(axis=1, keepdims=True), info)
evoked.set_montage(self.montage)
mne.viz.plot_topomap(evoked.data[:, 0], evoked.info, axes=self.ax2, show=False)
def update_contributions(self, contributions):
self.ax3.clear()
self.ax3.barh(self.ch_names, contributions, color='skyblue')
for i, v in enumerate(contributions):
self.ax3.text(v, i, f"{v * 100:.1f}%", color='black')
def update_probabilities(self, probs):
emotions = ['Mutlu / Happy', 'Kızgın / Angry', 'Üzgün / Sad', 'Sakin / Calm']
self.ax4.clear()
bars = self.ax4.barh(emotions, probs, color=['green', 'red', 'blue', 'purple'])
for bar in bars:
width = bar.get_width()
self.ax4.text(width, bar.get_y() + 0.2, f"{width * 100:.1f}%", ha='left')
def extract_features(self, data):
"""6 kanal için özellik çıkarımı / Feature extraction for 6 channels"""
features = []
for channel in data:
features.extend([
np.mean(channel),
np.std(channel),
np.ptp(channel),
np.sum(np.abs(np.diff(channel))),
np.median(channel),
np.percentile(np.abs(channel), 95)
])
return np.array(features)
def start_monitoring(self):
anim = FuncAnimation(self.fig, self.update_plot, interval=100)
plt.show()
if __name__ == "__main__":
monitor = EEGMonitor(
model_path='model/path/bai-6 Emotion.h5',
scaler_path='scaler/path/bai-6_scaler.save'
)
monitor.start_monitoring()
```
## 2. Veri Seti ile / With Dataset
```python
import numpy as np
import matplotlib.pyplot as plt
import mne
from matplotlib.animation import FuncAnimation
from tensorflow.keras.models import load_model
import joblib
import os
class EEGMonitor:
def __init__(self, model_path, scaler_path, data_path):
self.model = load_model(model_path)
self.scaler = joblib.load(scaler_path)
self.data_path = data_path
self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz']
self.fs = 1000 # Örnekleme frekansı / Sampling frequency
self.buffer_size = 1000 # 1 saniyelik buffer / 1 second buffer
self.raw_buffer = np.zeros((6, self.buffer_size))
self.feature_contributions = {ch: [] for ch in self.ch_names}
# Elektrot pozisyonları / Electrode positions (10-20 system)
self.montage = mne.channels.make_standard_montage('standard_1020')
self.fig = plt.figure(figsize=(15, 10))
self.setup_plots()
self.dataset = self.load_dataset(self.data_path)
self.current_index = 0
def setup_plots(self):
self.ax1 = self.fig.add_subplot(223)
self.ax1.set_title("Canlı EEG Sinyalleri / Live EEG Signals")
self.ax1.set_xlabel("Zaman (ms) / Time (ms)")
self.ax1.set_ylabel("Amplitüd (µV) / Amplitude (µV)")
self.ax2 = self.fig.add_subplot(221)
self.ax2.set_title("Elektrot Konumları / Electrode Locations")
self.ax3 = self.fig.add_subplot(224)
self.ax3.set_title("Elektrot Katkı Oranları / Electrode Contribution Ratios")
self.ax3.set_ylim(0, 1)
self.ax4 = self.fig.add_subplot(222)
self.ax4.set_title("Duygu Tahmin Olasılıkları / Emotion Prediction Probabilities")
self.ax4.set_ylim(0, 1)
plt.tight_layout()
def load_dataset(self, path):
"""Desteklenen veri formatları: .npy (numpy), .csv / Supported data formats: .npy (numpy), .csv"""
if not os.path.exists(path):
raise FileNotFoundError(f"Veri seti bulunamadı / Not found dataset: {path}")
if path.endswith(".npy"):
data = np.load(path)
elif path.endswith(".csv"):
data = np.loadtxt(path, delimiter=',')
else:
raise ValueError("Desteklenmeyen dosya formatı. Yalnızca .npy veya .csv kullanılabilir. / Unsupported file format. Only .npy or .csv can be used.")
# Transpose gerekebilir: (n_channels, n_samples) / Transpose may be needed: (n_channels, n_samples)
if data.shape[0] != 6:
data = data.T
return data
def get_next_chunk(self):
"""Veri setinden buffer_size uzunluğunda bir parça alır / Gets a chunk of length buffer_size from the dataset"""
if self.current_index + self.buffer_size >= self.dataset.shape[1]:
self.current_index = 0
chunk = self.dataset[:, self.current_index:self.current_index + self.buffer_size]
self.current_index += self.buffer_size
return chunk
def update_buffer(self, new_data):
self.raw_buffer = np.roll(self.raw_buffer, -new_data.shape[1], axis=1)
self.raw_buffer[:, -new_data.shape[1]:] = new_data
def calculate_channel_contributions(self, features):
contributions = np.zeros(6)
for i in range(6):
channel_weights = self.model.layers[0].get_weights()[0][i * 6:(i + 1) * 6]
contributions[i] = np.mean(np.abs(channel_weights))
return contributions / np.sum(contributions)
def update_plot(self, frame):
new_data = self.get_next_chunk()
self.update_buffer(new_data)
features = self.extract_features(self.raw_buffer)
scaled_features = self.scaler.transform([features])
probs = self.model.predict(scaled_features, verbose=0)[0]
contributions = self.calculate_channel_contributions(features)
self.update_eeg_plot()
self.update_topomap()
self.update_contributions(contributions)
self.update_probabilities(probs)
def update_eeg_plot(self):
self.ax1.clear()
for i in range(6):
offset = i * 20e-6
self.ax1.plot(self.raw_buffer[i] + offset, label=self.ch_names[i])
self.ax1.legend(loc='upper right')
def update_topomap(self):
self.ax2.clear()
info = mne.create_info(self.ch_names, self.fs, 'eeg')
evoked = mne.EvokedArray(self.raw_buffer.mean(axis=1, keepdims=True), info)
evoked.set_montage(self.montage)
mne.viz.plot_topomap(evoked.data[:, 0], evoked.info, axes=self.ax2, show=False)
def update_contributions(self, contributions):
self.ax3.clear()
self.ax3.barh(self.ch_names, contributions, color='skyblue')
for i, v in enumerate(contributions):
self.ax3.text(v, i, f"{v * 100:.1f}%", color='black')
def update_probabilities(self, probs):
emotions = ['Mutlu / Happy', 'Kızgın / Angry', 'Üzgün / Sad', 'Sakin / Calm']
self.ax4.clear()
bars = self.ax4.barh(emotions, probs, color=['green', 'red', 'blue', 'purple'])
for bar in bars:
width = bar.get_width()
self.ax4.text(width, bar.get_y() + 0.2, f"{width * 100:.1f}%", ha='left')
def extract_features(self, data):
features = []
for channel in data:
features.extend([
np.mean(channel),
np.std(channel),
np.ptp(channel),
np.sum(np.abs(np.diff(channel))),
np.median(channel),
np.percentile(np.abs(channel), 95)
])
return np.array(features)
def start_monitoring(self):
anim = FuncAnimation(self.fig, self.update_plot, interval=1000)
plt.show()
if __name__ == "__main__":
monitor = EEGMonitor(
model_path="model/path/bai-6 Emotion.h5",
scaler_path="scaler/path/bai-6_scaler.save",
data_path="data/path/npy/or/csv"
)
monitor.start_monitoring()
```
bai-6 Emotion v2
# bai-6 Emotion v2 Yapısı / Structure
|Layer (type) | Output Shape | Param # |
| --- | --- | --- |
| dense_4 (Dense) | (None, 128) | 4,736 |
| batch_normalization_2 | (None, 128) | 512 |
| dropout_3 (Dropout) | (None, 128) | 0 |
| dense_5 (Dense) | (None, 64) | 8,256 |
| batch_normalization_3 | (None, 64) | 256 |
| dropout_4 (Dropout) | (None, 64) | 0 |
| dense_6 (Dense) | (None, 32) | 2,080 |
| dropout_5 (Dropout) | (None, 32) | 0 |
| dense_7 (Dense) | (None, 4) | 132 |
### Total params: 15,972 (62.39 KB)
**Trainable params: 15,588 (60.89 KB)**
**Non-trainable params: 384 (1.50 KB)**
# Kullanım / Usage
## Sentetik Veri ile / With Synthetic Data
```python
import numpy as np
import joblib
import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from tensorflow.keras.models import load_model
from datetime import datetime
import mne
import warnings
warnings.filterwarnings('ignore')
class EEGEmotionMonitorOptimized:
def __init__(self, model_path, scaler_path, selector_path=None, pca_path=None):
self.emotion_labels = {
0: "Mutlu (Happy)",
1: "Kızgın (Angry)",
2: "Üzgün (Sad)",
3: "Sakin (Calm)"
}
self.emotion_colors = ['#FFD700', '#FF4444', '#4169E1', '#32CD32']
# Kanal isimleri ve parametreler / Channel names and parameters
self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz']
self.fs = 128 # Örnekleme hızı / Sampling rate
self.buffer_size = 640
self.update_interval = 200
# Buffer'ları başlat / Initialize buffers
self.raw_buffer = np.zeros((6, self.buffer_size))
self.prediction_history = []
self.confidence_history = []
self.time_history = []
self.max_history = 30
# Performance metrics tracking
self.performance_metrics = {
'total_predictions': 0,
'high_confidence_predictions': 0, # >0.8 confidence
'low_confidence_predictions': 0, # <0.5 confidence
'prediction_times': [], # Processing time per prediction
'emotion_transitions': 0, # Count of emotion changes
'stability_score': 0.0, # How stable predictions are
'average_confidence': 0.0,
'confidence_trend': [], # Last 10 confidence values for trend analysis
'processing_fps': 0.0, # Processing speed
'last_prediction': None
}
self.metrics_text = ""
self.start_time = None
try:
self.model = load_model(model_path)
self.scaler = joblib.load(scaler_path)
self.selector = None
self.pca = None
if selector_path:
try:
self.selector = joblib.load(selector_path)
print("Feature selector loaded")
except:
print("Feature selector not found, using raw features")
if pca_path:
try:
self.pca = joblib.load(pca_path)
print("PCA reducer loaded")
except:
print("PCA reducer not found, skipping dimensionality reduction")
print("Model and preprocessors successfully loaded!")
print(f"Model input shape: {self.model.input_shape}")
print(f"Output classes: {len(self.emotion_labels)}")
# Elektrot pozisyonları (10-20 sistemi) / Electrode positions (10-20 system)
self.montage = mne.channels.make_standard_montage('standard_1020')
except Exception as e:
print(f"Model/preprocessor loading error: {e}")
raise
self.fig = plt.figure(figsize=(14, 8))
self.fig.suptitle('EEG Duygu Tanıma Sistemi / EEG Emotion Analysis System', fontsize=16, fontweight='bold')
self.setup_plots()
self.animation = None
self.is_running = False
def setup_plots(self):
"""4 panelli görselleştirme arayüzünü hazırla (with performance metrics) / Setup 4-panel visualization interface (with performance metrics)"""
self.ax1 = self.fig.add_subplot(221)
self.ax1.set_title("Live EEG Signals", fontsize=10)
self.ax1.set_xlabel("Time (samples)", fontsize=9)
self.ax1.set_ylabel("Amplitude (µV)", fontsize=9)
self.ax1.grid(True, alpha=0.3)
self.ax2 = self.fig.add_subplot(222)
self.ax2.set_title("Emotion Probabilities", fontsize=10)
self.ax2.set_xlim(0, 1)
self.ax3 = self.fig.add_subplot(223)
self.ax3.set_title("Performance Metrics & Confidence Trend", fontsize=10)
self.ax4 = self.fig.add_subplot(224)
self.ax4.set_title("Electrode Contributions", fontsize=10)
self.ax4.set_xlim(0, 1)
plt.tight_layout(pad=1.0)
def generate_realistic_eeg_signal(self, emotion_bias=None):
noise = np.random.normal(0, 3e-6, (6, self.buffer_size))
t = np.linspace(0, self.buffer_size/self.fs, self.buffer_size)
alpha_freq = np.random.uniform(8, 12) # Alpha dominant
beta_freq = np.random.uniform(15, 25) # Beta
for ch in range(6):
if emotion_bias == 0: # Happy - higher beta
beta_amp = np.random.uniform(4e-6, 6e-6)
alpha_amp = np.random.uniform(2e-6, 3e-6)
elif emotion_bias == 1: # Angry - very high beta
beta_amp = np.random.uniform(5e-6, 7e-6)
alpha_amp = np.random.uniform(1e-6, 2e-6)
elif emotion_bias == 2: # Sad - lower activity
beta_amp = np.random.uniform(1e-6, 2e-6)
alpha_amp = np.random.uniform(3e-6, 5e-6)
elif emotion_bias == 3: # Calm - high alpha
beta_amp = np.random.uniform(1e-6, 3e-6)
alpha_amp = np.random.uniform(4e-6, 6e-6)
else: # Random
beta_amp = np.random.uniform(2e-6, 4e-6)
alpha_amp = np.random.uniform(2e-6, 4e-6)
noise[ch] += alpha_amp * np.sin(2 * np.pi * alpha_freq * t + np.random.random() * 2 * np.pi)
noise[ch] += beta_amp * np.sin(2 * np.pi * beta_freq * t + np.random.random() * 2 * np.pi)
return noise.astype(np.float32)
def update_buffer(self, new_data):
samples_to_add = min(new_data.shape[1], self.buffer_size // 4)
self.raw_buffer = np.roll(self.raw_buffer, -samples_to_add, axis=1)
self.raw_buffer[:, -samples_to_add:] = new_data[:, :samples_to_add]
def extract_lightweight_features(self, signal_data):
features = []
for channel_data in signal_data:
time_features = [
np.mean(channel_data),
np.std(channel_data),
np.ptp(channel_data),
np.median(channel_data),
np.mean(np.abs(channel_data)),
np.sqrt(np.mean(channel_data**2))
]
try:
fft_vals = np.abs(np.fft.rfft(channel_data[::4]))
freqs = np.fft.rfftfreq(len(channel_data)//4, 4/self.fs)
delta_power = np.sum(fft_vals[(freqs >= 0.5) & (freqs <= 4)])
theta_power = np.sum(fft_vals[(freqs >= 4) & (freqs <= 8)])
alpha_power = np.sum(fft_vals[(freqs >= 8) & (freqs <= 13)])
beta_power = np.sum(fft_vals[(freqs >= 13) & (freqs <= 30)])
total_power = np.sum(fft_vals) + 1e-10
freq_features = [
delta_power / total_power,
theta_power / total_power,
alpha_power / total_power,
beta_power / total_power
]
except:
freq_features = [0.25, 0.25, 0.25, 0.25]
nonlinear_features = [
np.std(np.diff(channel_data)) / (np.std(channel_data) + 1e-10),
np.mean(np.abs(np.diff(channel_data)))
]
channel_features = time_features + freq_features + nonlinear_features
features.extend(channel_features)
return np.array(features, dtype=np.float32)
def calculate_channel_contributions(self, signal_data):
contributions = np.zeros(6)
for i in range(6):
contributions[i] = np.sqrt(np.mean(signal_data[i]**2))
total = np.sum(contributions) + 1e-10
return contributions / total
def update_performance_metrics(self, predicted_class, confidence, processing_time):
metrics = self.performance_metrics
metrics['total_predictions'] += 1
if confidence > 0.8:
metrics['high_confidence_predictions'] += 1
elif confidence < 0.5:
metrics['low_confidence_predictions'] += 1
metrics['prediction_times'].append(processing_time)
if len(metrics['prediction_times']) > 50:
metrics['prediction_times'].pop(0)
if metrics['prediction_times']:
avg_time = np.mean(metrics['prediction_times'])
metrics['processing_fps'] = 1.0 / max(avg_time, 0.001)
if metrics['last_prediction'] is not None and metrics['last_prediction'] != predicted_class:
metrics['emotion_transitions'] += 1
metrics['last_prediction'] = predicted_class
metrics['confidence_trend'].append(float(confidence))
if len(metrics['confidence_trend']) > 10:
metrics['confidence_trend'].pop(0)
if self.confidence_history:
metrics['average_confidence'] = np.mean(self.confidence_history)
if metrics['total_predictions'] > 1:
transition_rate = metrics['emotion_transitions'] / metrics['total_predictions']
metrics['stability_score'] = max(0, 1.0 - transition_rate)
def update_plot(self, frame):
if not self.is_running:
return
start_time = time.time()
if frame % 2 == 0:
if np.random.random() < 0.2:
emotion_bias = np.random.randint(0, 4)
else:
emotion_bias = None
new_samples = self.buffer_size // 8
new_data = self.generate_realistic_eeg_signal(emotion_bias)[:, :new_samples]
self.update_buffer(new_data)
prediction_start = time.time()
features = self.extract_lightweight_features(self.raw_buffer)
try:
scaled_features = self.scaler.transform([features])
if self.selector is not None:
scaled_features = self.selector.transform(scaled_features)
if self.pca is not None:
scaled_features = self.pca.transform(scaled_features)
probs = self.model.predict(scaled_features, verbose=0)[0]
predicted_class = np.argmax(probs)
confidence = np.max(probs)
except Exception as e:
print(f"Prediction error: {e}")
probs = np.array([0.25, 0.25, 0.25, 0.25])
predicted_class = 0
confidence = 0.25
prediction_time = time.time() - prediction_start
self.update_performance_metrics(predicted_class, confidence, prediction_time)
self.prediction_history.append(predicted_class)
self.confidence_history.append(confidence)
self.time_history.append(datetime.now())
if len(self.prediction_history) > self.max_history:
self.prediction_history.pop(0)
self.confidence_history.pop(0)
self.time_history.pop(0)
contributions = self.calculate_channel_contributions(self.raw_buffer)
self.update_eeg_plot()
self.update_probabilities(probs)
self.update_performance_plot()
self.update_contributions(contributions)
emotion_name = self.emotion_labels[predicted_class]
metrics = self.performance_metrics
elapsed_time = time.time() - self.start_time if self.start_time else 0
print(f"\r{datetime.now().strftime('%H:%M:%S')} | "
f"Emotion: {emotion_name} | "
f"Conf: {confidence:.3f} | "
f"FPS: {metrics['processing_fps']:.1f} | "
f"Stab: {metrics['stability_score']:.2f} | "
f"Total: {metrics['total_predictions']} | "
f"Time: {elapsed_time:.0f}s", end='')
def update_eeg_plot(self):
self.ax1.clear()
colors = plt.cm.tab10(np.linspace(0, 1, 6))
display_samples = min(300, self.buffer_size) # Show fewer samples for performance
for i in range(6):
offset = i * 20e-6
signal = self.raw_buffer[i, -display_samples:] + offset
self.ax1.plot(signal, label=self.ch_names[i],
color=colors[i], linewidth=1.0, alpha=0.8)
self.ax1.set_title("Live EEG Signals", fontsize=12)
self.ax1.set_xlabel("Time (samples)")
self.ax1.set_ylabel("Amplitude (µV)")
self.ax1.legend(loc='upper right', fontsize=8)
self.ax1.grid(True, alpha=0.3)
def update_performance_plot(self):
self.ax3.clear()
metrics = self.performance_metrics
if metrics['total_predictions'] > 0:
high_conf_pct = (metrics['high_confidence_predictions'] / metrics['total_predictions']) * 100
low_conf_pct = (metrics['low_confidence_predictions'] / metrics['total_predictions']) * 100
metrics_text = f"""PERFORMANCE METRICS
Total Predictions: {metrics['total_predictions']}
Average Confidence: {metrics['average_confidence']:.3f}
High Confidence (>0.8): {high_conf_pct:.1f}%
Low Confidence (<0.5): {low_conf_pct:.1f}%
Processing Speed: {metrics['processing_fps']:.1f} FPS
Stability Score: {metrics['stability_score']:.3f}
Emotion Transitions: {metrics['emotion_transitions']}
Model Accuracy: {high_conf_pct:.1f}%
Response Time: {np.mean(metrics['prediction_times'])*1000:.1f}ms"""
self.ax3.text(0.02, 0.98, metrics_text,
transform=self.ax3.transAxes,
fontsize=8, verticalalignment='top',
fontfamily='monospace',
bbox=dict(boxstyle="round,pad=0.3", facecolor="lightblue", alpha=0.7))
if len(metrics['confidence_trend']) > 1:
trend_x = np.arange(len(metrics['confidence_trend']))
trend_data = np.array(metrics['confidence_trend'], dtype=np.float64)
self.ax3.plot(trend_x + 0.6, trend_data * 0.4 + 0.1,
'g-o', markersize=3, linewidth=2, label='Confidence Trend')
# Add trend analysis
if len(metrics['confidence_trend']) > 3:
try:
x_data = np.array(range(len(trend_data[-5:])), dtype=np.float64)
y_data = np.array(trend_data[-5:], dtype=np.float64)
recent_trend = np.polyfit(x_data, y_data, 1)[0]
trend_direction = "↗" if recent_trend > 0.01 else "↘" if recent_trend < -0.01 else "→"
self.ax3.text(0.7, 0.9, f"Trend: {trend_direction}",
transform=self.ax3.transAxes, fontsize=10, fontweight='bold')
except:
# Fallback if polyfit fails
self.ax3.text(0.7, 0.9, f"Trend: →",
transform=self.ax3.transAxes, fontsize=10, fontweight='bold')
self.ax3.set_xlim(0, 1)
self.ax3.set_ylim(0, 1)
self.ax3.set_title("Performance Metrics & Confidence Trend", fontsize=10)
if metrics['average_confidence'] > 0.8:
title_color = 'green'
elif metrics['average_confidence'] > 0.6:
title_color = 'orange'
else:
title_color = 'red'
self.ax3.title.set_color(title_color)
def update_contributions(self, contributions):
self.ax4.clear()
colors = plt.cm.viridis(contributions)
bars = self.ax4.barh(self.ch_names, contributions, color=colors)
for i, (bar, v) in enumerate(zip(bars, contributions)):
if v > 0.05:
self.ax4.text(v + 0.02, i, f"{v*100:.1f}%",
va='center', fontsize=9)
self.ax4.set_title("Electrode Contributions", fontsize=10)
self.ax4.set_xlabel("Contribution Rate", fontsize=9)
self.ax4.set_xlim(0, 0.6)
self.ax4.grid(True, alpha=0.3, axis='x')
def update_probabilities(self, probs):
self.ax2.clear()
emotions = [self.emotion_labels[i] for i in range(4)]
bars = self.ax2.barh(emotions, probs, color=self.emotion_colors)
max_idx = np.argmax(probs)
bars[max_idx].set_edgecolor('black')
bars[max_idx].set_linewidth(2)
for bar, prob in zip(bars, probs):
width = bar.get_width()
if width > 0.05:
self.ax2.text(width + 0.02, bar.get_y() + bar.get_height()/2,
f"{width*100:.1f}%", ha='left', va='center',
fontsize=9, fontweight='bold')
self.ax2.set_title("Emotion Probabilities", fontsize=12)
self.ax2.set_xlabel("Probability")
self.ax2.set_xlim(0, 1)
self.ax2.grid(True, alpha=0.3, axis='x')
current_emotion = emotions[max_idx]
confidence = probs[max_idx]
self.ax2.text(0.5, 1.05, f"Current: {current_emotion} ({confidence*100:.1f}%)",
transform=self.ax2.transAxes, ha='center',
fontsize=10, fontweight='bold', color=self.emotion_colors[max_idx])
def start_monitoring(self):
print("\n" + "="*60)
print(" OPTIMIZED EEG EMOTION RECOGNITION MONITOR")
print("="*60)
print("\nPress 'X' to close the window...")
print("Real-time performance metrics will be displayed")
print("-"*60)
self.is_running = True
self.start_time = time.time()
self.animation = FuncAnimation(
self.fig,
self.update_plot,
interval=self.update_interval,
blit=False,
cache_frame_data=False
)
plt.show()
self.is_running = False
print("\n\nMonitoring stopped.")
if self.prediction_history:
self.print_summary_statistics()
def print_summary_statistics(self):
print("\n" + "="*80)
print(" DETAILED PERFORMANCE & STATISTICS SUMMARY")
print("="*80)
if not self.prediction_history:
print("No data collected.")
return
metrics = self.performance_metrics
total_time = time.time() - self.start_time if self.start_time else 0
print("\n📊 PERFORMANCE METRICS:")
print(f" Total Predictions: {metrics['total_predictions']}")
print(f" Total Runtime: {total_time:.1f} seconds")
print(f" Average Processing Speed: {metrics['processing_fps']:.1f} FPS")
print(f" Average Response Time: {np.mean(metrics['prediction_times'])*1000:.1f}ms")
print(f" Model Stability Score: {metrics['stability_score']:.3f} (0-1, higher=better)")
print(f" Emotion Transitions: {metrics['emotion_transitions']}")
print(f"\n🎯 CONFIDENCE ANALYSIS:")
total = len(self.prediction_history)
high_conf_count = metrics['high_confidence_predictions']
low_conf_count = metrics['low_confidence_predictions']
medium_conf_count = max(0, total - high_conf_count - low_conf_count)
print(f" Average Confidence: {metrics['average_confidence']:.3f}")
print(f" Confidence Std Dev: {np.std(self.confidence_history):.3f}")
print(f" High Confidence (>0.8): {high_conf_count} ({high_conf_count/total*100:.1f}%)")
print(f" Medium Confidence (0.5-0.8): {medium_conf_count} ({medium_conf_count/total*100:.1f}%)")
print(f" Low Confidence (<0.5): {low_conf_count} ({low_conf_count/total*100:.1f}%)")
accuracy_score = high_conf_count / total * 100 if total > 0 else 0
print(f"\n🏆 MODEL QUALITY ASSESSMENT:")
print(f" Estimated Accuracy: {accuracy_score:.1f}% (based on high confidence predictions)")
if accuracy_score >= 80:
quality = "EXCELLENT 🌟"
elif accuracy_score >= 70:
quality = "GOOD ✅"
elif accuracy_score >= 60:
quality = "FAIR ⚠️"
else:
quality = "POOR ❌"
print(f" Model Quality Rating: {quality}")
emotion_counts = {i: 0 for i in range(4)}
for pred in self.prediction_history:
emotion_counts[pred] += 1
print(f"\n😊 EMOTION DISTRIBUTION:")
for emotion_id, count in emotion_counts.items():
percentage = (count / total) * 100
bar = "█" * int(percentage / 5)
print(f" {self.emotion_labels[emotion_id]:<15}: {count:>3} ({percentage:>5.1f}%) {bar}")
dominant_emotion = max(emotion_counts, key=emotion_counts.get)
dominant_percentage = emotion_counts[dominant_emotion] / total * 100
print(f"\n Dominant Emotion: {self.emotion_labels[dominant_emotion]} ({dominant_percentage:.1f}%)")
if len(metrics['confidence_trend']) > 3:
try:
x_data = np.array(range(len(metrics['confidence_trend'])), dtype=np.float64)
y_data = np.array(metrics['confidence_trend'], dtype=np.float64)
trend_slope = np.polyfit(x_data, y_data, 1)[0]
print(f"\n📈 TREND ANALYSIS:")
if trend_slope > 0.01:
trend_desc = "IMPROVING ↗"
elif trend_slope < -0.01:
trend_desc = "DECLINING ↘"
else:
trend_desc = "STABLE →"
print(f" Recent Confidence Trend: {trend_desc} (slope: {trend_slope:.4f})")
except Exception as e:
print(f"\n📈 TREND ANALYSIS:")
print(f" Recent Confidence Trend: STABLE → (analysis unavailable)")
print(f"\n💡 RECOMMENDATIONS:")
if accuracy_score < 70:
print(" • Consider retraining the model with more data")
print(" • Check data quality and preprocessing steps")
if metrics['stability_score'] < 0.7:
print(" • Model predictions are unstable - review signal quality")
if metrics['processing_fps'] < 5:
print(" • Processing speed is slow - consider model optimization")
if accuracy_score >= 80 and metrics['stability_score'] >= 0.8:
print(" • Model performance is excellent! ✨")
print("\n" + "="*80)
def main():
model_path = 'path/to/bai-6 EmotionOptimized.h5'
scaler_path = 'path/to/bai-6 ScalerOptimized.pkl'
selector_path = 'path/to/bai-6_feature_selector_opt.pkl'
pca_path = 'path/to/bai-6_pca_reducer_opt.pkl'
try:
monitor = EEGEmotionMonitorOptimized(
model_path, scaler_path, selector_path, pca_path
)
monitor.start_monitoring()
except FileNotFoundError as e:
print(f"Model or preprocessor file not found: {e}")
print("Please ensure the model has been trained and saved.")
print("Available fallback: Using basic model without feature selection/PCA")
try:
basic_model_path = 'path/to/bai-6 EmotionOptimized.h5'
basic_scaler_path = 'path/to/bai-6 ScalerOptimized.pkl'
monitor = EEGEmotionMonitorOptimized(basic_model_path, basic_scaler_path)
monitor.start_monitoring()
except Exception as e2:
print(f"Fallback also failed: {e2}")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
```
-------------
## Lisans/License
CC-BY-NC-SA-4.0