|
--- |
|
license: cc-by-nc-sa-4.0 |
|
language: |
|
- en |
|
- tr |
|
tags: |
|
- eeg |
|
- brain |
|
- deeplearning |
|
- artificialintelligence |
|
- ai |
|
- model |
|
- emotions |
|
- neuroscience |
|
- neura |
|
- neuro |
|
- bci |
|
- health |
|
pipeline_tag: time-series-forecasting |
|
library_name: keras |
|
--- |
|
|
|
# bai-6 Emotion (TR) |
|
|
|
## Tanım |
|
|
|
bai-6 Emotion modeli, EEG ve iEEG tarafından toplanan veriler ile eğitilen bir detaylı duygu sınıflandırma modelidir. Model, 6 kanallı bir EEG cihazıyla çalışabilir durumdadır. |
|
|
|
## Hedef Kitle |
|
|
|
bai modelleri, herkes için tasarlanmıştır. Açık kaynak versiyonları herkes tarafından kullanılabilir. |
|
|
|
## Sınıflar |
|
|
|
- Sakin |
|
- Üzgün |
|
- Kızgın |
|
- Mutlu |
|
|
|
## Neuramax |
|
|
|
Neuramax-6 Gen1 ile tam uyumlu çalışmaktadır. |
|
|
|
------------------------------------------------------------------------- |
|
# bai-6 Emotion (EN) |
|
|
|
## Definition |
|
|
|
The bai-6 Emotion model is a detailed emotion classification model trained with data collected by EEG and iEEG. The model can work with a 6-channel EEG device. |
|
|
|
## Target Audience |
|
|
|
bai models are designed for everyone. Open source versions are available for everyone to use. |
|
|
|
## Classes |
|
|
|
- Calm |
|
- Sad |
|
- Angry |
|
- Happy |
|
|
|
## Neuramax |
|
|
|
Fully compatible with Neuramax-6 Gen1. |
|
|
|
------------- |
|
|
|
<details> |
|
<summary><strong>bai-6 Emotion v1</strong></summary> |
|
|
|
# bai-6 Emotion v1 Yapısı / Structure |
|
|
|
```bash |
|
"model_summary": |
|
"Model: Total params: 5,046 (19.71 KB) |
|
Trainable params: 5,044 (19.70 KB) |
|
Non-trainable params: 0 (0.00 B) |
|
Optimizer params: 2 (12.00 B)", |
|
"layers": [ |
|
{ |
|
"name": "dense", |
|
"trainable": true, |
|
"count_params": 2368 |
|
}, |
|
{ |
|
"name": "dropout", |
|
"trainable": true, |
|
"count_params": 0 |
|
}, |
|
{ |
|
"name": "dense_1", |
|
"trainable": true, |
|
"count_params": 2080 |
|
}, |
|
{ |
|
"name": "dropout_1", |
|
"trainable": true, |
|
"count_params": 0 |
|
}, |
|
{ |
|
"name": "dense_2", |
|
"trainable": true, |
|
"count_params": 528 |
|
}, |
|
{ |
|
"name": "dense_3", |
|
"trainable": true, |
|
"count_params": 68 |
|
} |
|
] |
|
``` |
|
|
|
# Kullanım / Usage |
|
|
|
## 1. Sentetik Veri ile / With Synthetic Data |
|
```python |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
import mne |
|
from matplotlib.animation import FuncAnimation |
|
from tensorflow.keras.models import load_model |
|
import joblib |
|
|
|
|
|
class EEGMonitor: |
|
def __init__(self, model_path, scaler_path): |
|
self.model = load_model(model_path) |
|
self.scaler = joblib.load(scaler_path) |
|
self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz'] |
|
self.fs = 1000 # Örnekleme frekansı / Sampling frequency |
|
self.buffer_size = 1000 # 1 saniyelik buffer / 1 second buffer |
|
|
|
self.raw_buffer = np.zeros((6, self.buffer_size)) |
|
self.feature_contributions = {ch: [] for ch in self.ch_names} |
|
|
|
# Elektrot pozisyonları (10-20 sistemi) / Electrode positions (10-20 system) |
|
self.montage = mne.channels.make_standard_montage('standard_1020') |
|
|
|
self.fig = plt.figure(figsize=(15, 10)) |
|
self.setup_plots() |
|
|
|
def setup_plots(self): |
|
self.ax1 = self.fig.add_subplot(223) |
|
self.ax1.set_title("Canlı EEG Sinyalleri / Live EEG Signals") |
|
self.ax1.set_xlabel("Zaman (ms) / Time (ms)") |
|
self.ax1.set_ylabel("Amplitüd (µV) / Amplitude (µV)") |
|
|
|
self.ax2 = self.fig.add_subplot(221) |
|
self.ax2.set_title("Elektrot Konumları / Electrode Locations") |
|
|
|
self.ax3 = self.fig.add_subplot(224) |
|
self.ax3.set_title("Elektrot Katkı Oranları / Electrode Contribution Ratios") |
|
self.ax3.set_ylim(0, 1) |
|
|
|
self.ax4 = self.fig.add_subplot(222) |
|
self.ax4.set_title("Duygu Tahmin Olasılıkları / Emotion Prediction Probabilities") |
|
self.ax4.set_ylim(0, 1) |
|
|
|
plt.tight_layout() |
|
|
|
def generate_synthetic_data(self): |
|
"""Sentetik EEG verisi üretir (6 kanal x 1000 örnek) / Generates synthetic EEG data (6 channels x 1000 samples)""" |
|
noise = np.random.normal(0, 5e-6, (6, self.buffer_size)) |
|
|
|
t = np.linspace(0, 1, self.buffer_size) |
|
noise[1] += 2e-6 * np.sin(2 * np.pi * 10 * t) |
|
|
|
return noise |
|
|
|
def update_buffer(self, new_data): |
|
"""Buffer'ı kaydırmalı olarak günceller / Updates the buffer with new data by rolling""" |
|
self.raw_buffer = np.roll(self.raw_buffer, -new_data.shape[1], axis=1) |
|
self.raw_buffer[:, -new_data.shape[1]:] = new_data |
|
|
|
def calculate_channel_contributions(self, features): |
|
"""Her elektrotun tahmindeki katkısını hesaplar / Calculates the contribution of each electrode to the prediction""" |
|
contributions = np.zeros(6) |
|
for i in range(6): |
|
channel_weights = self.model.layers[0].get_weights()[0][i * 6:(i + 1) * 6] |
|
contributions[i] = np.mean(np.abs(channel_weights)) |
|
|
|
return contributions / np.sum(contributions) |
|
|
|
def update_plot(self, frame): |
|
new_data = self.generate_synthetic_data() |
|
self.update_buffer(new_data) |
|
|
|
features = self.extract_features(self.raw_buffer) |
|
scaled_features = self.scaler.transform([features]) |
|
probs = self.model.predict(scaled_features, verbose=0)[0] |
|
|
|
contributions = self.calculate_channel_contributions(features) |
|
|
|
self.update_eeg_plot() |
|
self.update_topomap() |
|
self.update_contributions(contributions) |
|
self.update_probabilities(probs) |
|
|
|
def update_eeg_plot(self): |
|
self.ax1.clear() |
|
for i in range(6): |
|
offset = i * 20e-6 |
|
self.ax1.plot(self.raw_buffer[i] + offset, label=self.ch_names[i]) |
|
self.ax1.legend(loc='upper right') |
|
|
|
def update_topomap(self): |
|
self.ax2.clear() |
|
info = mne.create_info(self.ch_names, self.fs, 'eeg') |
|
evoked = mne.EvokedArray(self.raw_buffer.mean(axis=1, keepdims=True), info) |
|
evoked.set_montage(self.montage) |
|
mne.viz.plot_topomap(evoked.data[:, 0], evoked.info, axes=self.ax2, show=False) |
|
|
|
def update_contributions(self, contributions): |
|
self.ax3.clear() |
|
self.ax3.barh(self.ch_names, contributions, color='skyblue') |
|
for i, v in enumerate(contributions): |
|
self.ax3.text(v, i, f"{v * 100:.1f}%", color='black') |
|
|
|
def update_probabilities(self, probs): |
|
emotions = ['Mutlu / Happy', 'Kızgın / Angry', 'Üzgün / Sad', 'Sakin / Calm'] |
|
self.ax4.clear() |
|
bars = self.ax4.barh(emotions, probs, color=['green', 'red', 'blue', 'purple']) |
|
for bar in bars: |
|
width = bar.get_width() |
|
self.ax4.text(width, bar.get_y() + 0.2, f"{width * 100:.1f}%", ha='left') |
|
|
|
def extract_features(self, data): |
|
"""6 kanal için özellik çıkarımı / Feature extraction for 6 channels""" |
|
features = [] |
|
for channel in data: |
|
features.extend([ |
|
np.mean(channel), |
|
np.std(channel), |
|
np.ptp(channel), |
|
np.sum(np.abs(np.diff(channel))), |
|
np.median(channel), |
|
np.percentile(np.abs(channel), 95) |
|
]) |
|
return np.array(features) |
|
|
|
def start_monitoring(self): |
|
anim = FuncAnimation(self.fig, self.update_plot, interval=100) |
|
plt.show() |
|
|
|
|
|
if __name__ == "__main__": |
|
monitor = EEGMonitor( |
|
model_path='model/path/bai-6 Emotion.h5', |
|
scaler_path='scaler/path/bai-6_scaler.save' |
|
) |
|
monitor.start_monitoring() |
|
``` |
|
|
|
## 2. Veri Seti ile / With Dataset |
|
```python |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
import mne |
|
from matplotlib.animation import FuncAnimation |
|
from tensorflow.keras.models import load_model |
|
import joblib |
|
import os |
|
|
|
|
|
class EEGMonitor: |
|
def __init__(self, model_path, scaler_path, data_path): |
|
self.model = load_model(model_path) |
|
self.scaler = joblib.load(scaler_path) |
|
self.data_path = data_path |
|
self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz'] |
|
self.fs = 1000 # Örnekleme frekansı / Sampling frequency |
|
self.buffer_size = 1000 # 1 saniyelik buffer / 1 second buffer |
|
|
|
self.raw_buffer = np.zeros((6, self.buffer_size)) |
|
self.feature_contributions = {ch: [] for ch in self.ch_names} |
|
|
|
# Elektrot pozisyonları / Electrode positions (10-20 system) |
|
self.montage = mne.channels.make_standard_montage('standard_1020') |
|
|
|
self.fig = plt.figure(figsize=(15, 10)) |
|
self.setup_plots() |
|
|
|
self.dataset = self.load_dataset(self.data_path) |
|
self.current_index = 0 |
|
|
|
def setup_plots(self): |
|
self.ax1 = self.fig.add_subplot(223) |
|
self.ax1.set_title("Canlı EEG Sinyalleri / Live EEG Signals") |
|
self.ax1.set_xlabel("Zaman (ms) / Time (ms)") |
|
self.ax1.set_ylabel("Amplitüd (µV) / Amplitude (µV)") |
|
|
|
self.ax2 = self.fig.add_subplot(221) |
|
self.ax2.set_title("Elektrot Konumları / Electrode Locations") |
|
|
|
self.ax3 = self.fig.add_subplot(224) |
|
self.ax3.set_title("Elektrot Katkı Oranları / Electrode Contribution Ratios") |
|
self.ax3.set_ylim(0, 1) |
|
|
|
self.ax4 = self.fig.add_subplot(222) |
|
self.ax4.set_title("Duygu Tahmin Olasılıkları / Emotion Prediction Probabilities") |
|
self.ax4.set_ylim(0, 1) |
|
|
|
plt.tight_layout() |
|
|
|
def load_dataset(self, path): |
|
"""Desteklenen veri formatları: .npy (numpy), .csv / Supported data formats: .npy (numpy), .csv""" |
|
if not os.path.exists(path): |
|
raise FileNotFoundError(f"Veri seti bulunamadı / Not found dataset: {path}") |
|
|
|
if path.endswith(".npy"): |
|
data = np.load(path) |
|
elif path.endswith(".csv"): |
|
data = np.loadtxt(path, delimiter=',') |
|
else: |
|
raise ValueError("Desteklenmeyen dosya formatı. Yalnızca .npy veya .csv kullanılabilir. / Unsupported file format. Only .npy or .csv can be used.") |
|
|
|
# Transpose gerekebilir: (n_channels, n_samples) / Transpose may be needed: (n_channels, n_samples) |
|
if data.shape[0] != 6: |
|
data = data.T |
|
return data |
|
|
|
def get_next_chunk(self): |
|
"""Veri setinden buffer_size uzunluğunda bir parça alır / Gets a chunk of length buffer_size from the dataset""" |
|
if self.current_index + self.buffer_size >= self.dataset.shape[1]: |
|
self.current_index = 0 |
|
chunk = self.dataset[:, self.current_index:self.current_index + self.buffer_size] |
|
self.current_index += self.buffer_size |
|
return chunk |
|
|
|
def update_buffer(self, new_data): |
|
self.raw_buffer = np.roll(self.raw_buffer, -new_data.shape[1], axis=1) |
|
self.raw_buffer[:, -new_data.shape[1]:] = new_data |
|
|
|
def calculate_channel_contributions(self, features): |
|
contributions = np.zeros(6) |
|
for i in range(6): |
|
channel_weights = self.model.layers[0].get_weights()[0][i * 6:(i + 1) * 6] |
|
contributions[i] = np.mean(np.abs(channel_weights)) |
|
return contributions / np.sum(contributions) |
|
|
|
def update_plot(self, frame): |
|
new_data = self.get_next_chunk() |
|
self.update_buffer(new_data) |
|
|
|
features = self.extract_features(self.raw_buffer) |
|
scaled_features = self.scaler.transform([features]) |
|
probs = self.model.predict(scaled_features, verbose=0)[0] |
|
|
|
contributions = self.calculate_channel_contributions(features) |
|
|
|
self.update_eeg_plot() |
|
self.update_topomap() |
|
self.update_contributions(contributions) |
|
self.update_probabilities(probs) |
|
|
|
def update_eeg_plot(self): |
|
self.ax1.clear() |
|
for i in range(6): |
|
offset = i * 20e-6 |
|
self.ax1.plot(self.raw_buffer[i] + offset, label=self.ch_names[i]) |
|
self.ax1.legend(loc='upper right') |
|
|
|
def update_topomap(self): |
|
self.ax2.clear() |
|
info = mne.create_info(self.ch_names, self.fs, 'eeg') |
|
evoked = mne.EvokedArray(self.raw_buffer.mean(axis=1, keepdims=True), info) |
|
evoked.set_montage(self.montage) |
|
mne.viz.plot_topomap(evoked.data[:, 0], evoked.info, axes=self.ax2, show=False) |
|
|
|
def update_contributions(self, contributions): |
|
self.ax3.clear() |
|
self.ax3.barh(self.ch_names, contributions, color='skyblue') |
|
for i, v in enumerate(contributions): |
|
self.ax3.text(v, i, f"{v * 100:.1f}%", color='black') |
|
|
|
def update_probabilities(self, probs): |
|
emotions = ['Mutlu / Happy', 'Kızgın / Angry', 'Üzgün / Sad', 'Sakin / Calm'] |
|
self.ax4.clear() |
|
bars = self.ax4.barh(emotions, probs, color=['green', 'red', 'blue', 'purple']) |
|
for bar in bars: |
|
width = bar.get_width() |
|
self.ax4.text(width, bar.get_y() + 0.2, f"{width * 100:.1f}%", ha='left') |
|
|
|
def extract_features(self, data): |
|
features = [] |
|
for channel in data: |
|
features.extend([ |
|
np.mean(channel), |
|
np.std(channel), |
|
np.ptp(channel), |
|
np.sum(np.abs(np.diff(channel))), |
|
np.median(channel), |
|
np.percentile(np.abs(channel), 95) |
|
]) |
|
return np.array(features) |
|
|
|
def start_monitoring(self): |
|
anim = FuncAnimation(self.fig, self.update_plot, interval=1000) |
|
plt.show() |
|
|
|
|
|
if __name__ == "__main__": |
|
monitor = EEGMonitor( |
|
model_path="model/path/bai-6 Emotion.h5", |
|
scaler_path="scaler/path/bai-6_scaler.save", |
|
data_path="data/path/npy/or/csv" |
|
) |
|
monitor.start_monitoring() |
|
``` |
|
|
|
</details> |
|
|
|
<details> |
|
<summary><strong>bai-6 Emotion v2</strong></summary> |
|
|
|
# bai-6 Emotion v2 Yapısı / Structure |
|
|
|
|Layer (type) | Output Shape | Param # | |
|
| --- | --- | --- | |
|
| dense_4 (Dense) | (None, 128) | 4,736 | |
|
| batch_normalization_2 | (None, 128) | 512 | |
|
| dropout_3 (Dropout) | (None, 128) | 0 | |
|
| dense_5 (Dense) | (None, 64) | 8,256 | |
|
| batch_normalization_3 | (None, 64) | 256 | |
|
| dropout_4 (Dropout) | (None, 64) | 0 | |
|
| dense_6 (Dense) | (None, 32) | 2,080 | |
|
| dropout_5 (Dropout) | (None, 32) | 0 | |
|
| dense_7 (Dense) | (None, 4) | 132 | |
|
|
|
### Total params: 15,972 (62.39 KB) |
|
|
|
**Trainable params: 15,588 (60.89 KB)** |
|
|
|
**Non-trainable params: 384 (1.50 KB)** |
|
|
|
# Kullanım / Usage |
|
|
|
## Sentetik Veri ile / With Synthetic Data |
|
```python |
|
import numpy as np |
|
import joblib |
|
import time |
|
import matplotlib.pyplot as plt |
|
from matplotlib.animation import FuncAnimation |
|
from tensorflow.keras.models import load_model |
|
from datetime import datetime |
|
import mne |
|
import warnings |
|
|
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
class EEGEmotionMonitorOptimized: |
|
def __init__(self, model_path, scaler_path, selector_path=None, pca_path=None): |
|
self.emotion_labels = { |
|
0: "Mutlu (Happy)", |
|
1: "Kızgın (Angry)", |
|
2: "Üzgün (Sad)", |
|
3: "Sakin (Calm)" |
|
} |
|
|
|
self.emotion_colors = ['#FFD700', '#FF4444', '#4169E1', '#32CD32'] |
|
|
|
# Kanal isimleri ve parametreler / Channel names and parameters |
|
self.ch_names = ['T7', 'C3', 'Cz', 'C4', 'T8', 'Pz'] |
|
self.fs = 128 # Örnekleme hızı / Sampling rate |
|
self.buffer_size = 640 |
|
self.update_interval = 200 |
|
|
|
# Buffer'ları başlat / Initialize buffers |
|
self.raw_buffer = np.zeros((6, self.buffer_size)) |
|
self.prediction_history = [] |
|
self.confidence_history = [] |
|
self.time_history = [] |
|
self.max_history = 30 |
|
|
|
# Performance metrics tracking |
|
self.performance_metrics = { |
|
'total_predictions': 0, |
|
'high_confidence_predictions': 0, # >0.8 confidence |
|
'low_confidence_predictions': 0, # <0.5 confidence |
|
'prediction_times': [], # Processing time per prediction |
|
'emotion_transitions': 0, # Count of emotion changes |
|
'stability_score': 0.0, # How stable predictions are |
|
'average_confidence': 0.0, |
|
'confidence_trend': [], # Last 10 confidence values for trend analysis |
|
'processing_fps': 0.0, # Processing speed |
|
'last_prediction': None |
|
} |
|
|
|
self.metrics_text = "" |
|
self.start_time = None |
|
|
|
try: |
|
self.model = load_model(model_path) |
|
self.scaler = joblib.load(scaler_path) |
|
|
|
self.selector = None |
|
self.pca = None |
|
|
|
if selector_path: |
|
try: |
|
self.selector = joblib.load(selector_path) |
|
print("Feature selector loaded") |
|
except: |
|
print("Feature selector not found, using raw features") |
|
|
|
if pca_path: |
|
try: |
|
self.pca = joblib.load(pca_path) |
|
print("PCA reducer loaded") |
|
except: |
|
print("PCA reducer not found, skipping dimensionality reduction") |
|
|
|
print("Model and preprocessors successfully loaded!") |
|
print(f"Model input shape: {self.model.input_shape}") |
|
print(f"Output classes: {len(self.emotion_labels)}") |
|
|
|
# Elektrot pozisyonları (10-20 sistemi) / Electrode positions (10-20 system) |
|
self.montage = mne.channels.make_standard_montage('standard_1020') |
|
|
|
except Exception as e: |
|
print(f"Model/preprocessor loading error: {e}") |
|
raise |
|
|
|
self.fig = plt.figure(figsize=(14, 8)) |
|
self.fig.suptitle('EEG Duygu Tanıma Sistemi / EEG Emotion Analysis System', fontsize=16, fontweight='bold') |
|
self.setup_plots() |
|
|
|
self.animation = None |
|
self.is_running = False |
|
|
|
def setup_plots(self): |
|
"""4 panelli görselleştirme arayüzünü hazırla (with performance metrics) / Setup 4-panel visualization interface (with performance metrics)""" |
|
|
|
self.ax1 = self.fig.add_subplot(221) |
|
self.ax1.set_title("Live EEG Signals", fontsize=10) |
|
self.ax1.set_xlabel("Time (samples)", fontsize=9) |
|
self.ax1.set_ylabel("Amplitude (µV)", fontsize=9) |
|
self.ax1.grid(True, alpha=0.3) |
|
|
|
self.ax2 = self.fig.add_subplot(222) |
|
self.ax2.set_title("Emotion Probabilities", fontsize=10) |
|
self.ax2.set_xlim(0, 1) |
|
|
|
self.ax3 = self.fig.add_subplot(223) |
|
self.ax3.set_title("Performance Metrics & Confidence Trend", fontsize=10) |
|
|
|
self.ax4 = self.fig.add_subplot(224) |
|
self.ax4.set_title("Electrode Contributions", fontsize=10) |
|
self.ax4.set_xlim(0, 1) |
|
|
|
plt.tight_layout(pad=1.0) |
|
|
|
def generate_realistic_eeg_signal(self, emotion_bias=None): |
|
noise = np.random.normal(0, 3e-6, (6, self.buffer_size)) |
|
|
|
t = np.linspace(0, self.buffer_size/self.fs, self.buffer_size) |
|
|
|
alpha_freq = np.random.uniform(8, 12) # Alpha dominant |
|
beta_freq = np.random.uniform(15, 25) # Beta |
|
|
|
for ch in range(6): |
|
if emotion_bias == 0: # Happy - higher beta |
|
beta_amp = np.random.uniform(4e-6, 6e-6) |
|
alpha_amp = np.random.uniform(2e-6, 3e-6) |
|
elif emotion_bias == 1: # Angry - very high beta |
|
beta_amp = np.random.uniform(5e-6, 7e-6) |
|
alpha_amp = np.random.uniform(1e-6, 2e-6) |
|
elif emotion_bias == 2: # Sad - lower activity |
|
beta_amp = np.random.uniform(1e-6, 2e-6) |
|
alpha_amp = np.random.uniform(3e-6, 5e-6) |
|
elif emotion_bias == 3: # Calm - high alpha |
|
beta_amp = np.random.uniform(1e-6, 3e-6) |
|
alpha_amp = np.random.uniform(4e-6, 6e-6) |
|
else: # Random |
|
beta_amp = np.random.uniform(2e-6, 4e-6) |
|
alpha_amp = np.random.uniform(2e-6, 4e-6) |
|
|
|
noise[ch] += alpha_amp * np.sin(2 * np.pi * alpha_freq * t + np.random.random() * 2 * np.pi) |
|
noise[ch] += beta_amp * np.sin(2 * np.pi * beta_freq * t + np.random.random() * 2 * np.pi) |
|
|
|
return noise.astype(np.float32) |
|
|
|
def update_buffer(self, new_data): |
|
samples_to_add = min(new_data.shape[1], self.buffer_size // 4) |
|
self.raw_buffer = np.roll(self.raw_buffer, -samples_to_add, axis=1) |
|
self.raw_buffer[:, -samples_to_add:] = new_data[:, :samples_to_add] |
|
|
|
def extract_lightweight_features(self, signal_data): |
|
features = [] |
|
|
|
for channel_data in signal_data: |
|
time_features = [ |
|
np.mean(channel_data), |
|
np.std(channel_data), |
|
np.ptp(channel_data), |
|
np.median(channel_data), |
|
np.mean(np.abs(channel_data)), |
|
np.sqrt(np.mean(channel_data**2)) |
|
] |
|
|
|
try: |
|
fft_vals = np.abs(np.fft.rfft(channel_data[::4])) |
|
freqs = np.fft.rfftfreq(len(channel_data)//4, 4/self.fs) |
|
|
|
delta_power = np.sum(fft_vals[(freqs >= 0.5) & (freqs <= 4)]) |
|
theta_power = np.sum(fft_vals[(freqs >= 4) & (freqs <= 8)]) |
|
alpha_power = np.sum(fft_vals[(freqs >= 8) & (freqs <= 13)]) |
|
beta_power = np.sum(fft_vals[(freqs >= 13) & (freqs <= 30)]) |
|
total_power = np.sum(fft_vals) + 1e-10 |
|
|
|
freq_features = [ |
|
delta_power / total_power, |
|
theta_power / total_power, |
|
alpha_power / total_power, |
|
beta_power / total_power |
|
] |
|
except: |
|
freq_features = [0.25, 0.25, 0.25, 0.25] |
|
|
|
nonlinear_features = [ |
|
np.std(np.diff(channel_data)) / (np.std(channel_data) + 1e-10), |
|
np.mean(np.abs(np.diff(channel_data))) |
|
] |
|
|
|
channel_features = time_features + freq_features + nonlinear_features |
|
features.extend(channel_features) |
|
|
|
return np.array(features, dtype=np.float32) |
|
|
|
def calculate_channel_contributions(self, signal_data): |
|
contributions = np.zeros(6) |
|
for i in range(6): |
|
contributions[i] = np.sqrt(np.mean(signal_data[i]**2)) |
|
|
|
total = np.sum(contributions) + 1e-10 |
|
return contributions / total |
|
|
|
def update_performance_metrics(self, predicted_class, confidence, processing_time): |
|
metrics = self.performance_metrics |
|
|
|
metrics['total_predictions'] += 1 |
|
|
|
if confidence > 0.8: |
|
metrics['high_confidence_predictions'] += 1 |
|
elif confidence < 0.5: |
|
metrics['low_confidence_predictions'] += 1 |
|
|
|
metrics['prediction_times'].append(processing_time) |
|
if len(metrics['prediction_times']) > 50: |
|
metrics['prediction_times'].pop(0) |
|
|
|
if metrics['prediction_times']: |
|
avg_time = np.mean(metrics['prediction_times']) |
|
metrics['processing_fps'] = 1.0 / max(avg_time, 0.001) |
|
|
|
if metrics['last_prediction'] is not None and metrics['last_prediction'] != predicted_class: |
|
metrics['emotion_transitions'] += 1 |
|
metrics['last_prediction'] = predicted_class |
|
|
|
metrics['confidence_trend'].append(float(confidence)) |
|
if len(metrics['confidence_trend']) > 10: |
|
metrics['confidence_trend'].pop(0) |
|
|
|
if self.confidence_history: |
|
metrics['average_confidence'] = np.mean(self.confidence_history) |
|
|
|
if metrics['total_predictions'] > 1: |
|
transition_rate = metrics['emotion_transitions'] / metrics['total_predictions'] |
|
metrics['stability_score'] = max(0, 1.0 - transition_rate) |
|
|
|
def update_plot(self, frame): |
|
if not self.is_running: |
|
return |
|
|
|
start_time = time.time() |
|
|
|
if frame % 2 == 0: |
|
if np.random.random() < 0.2: |
|
emotion_bias = np.random.randint(0, 4) |
|
else: |
|
emotion_bias = None |
|
|
|
new_samples = self.buffer_size // 8 |
|
new_data = self.generate_realistic_eeg_signal(emotion_bias)[:, :new_samples] |
|
self.update_buffer(new_data) |
|
|
|
prediction_start = time.time() |
|
features = self.extract_lightweight_features(self.raw_buffer) |
|
|
|
try: |
|
scaled_features = self.scaler.transform([features]) |
|
|
|
if self.selector is not None: |
|
scaled_features = self.selector.transform(scaled_features) |
|
|
|
if self.pca is not None: |
|
scaled_features = self.pca.transform(scaled_features) |
|
|
|
probs = self.model.predict(scaled_features, verbose=0)[0] |
|
predicted_class = np.argmax(probs) |
|
confidence = np.max(probs) |
|
|
|
except Exception as e: |
|
print(f"Prediction error: {e}") |
|
probs = np.array([0.25, 0.25, 0.25, 0.25]) |
|
predicted_class = 0 |
|
confidence = 0.25 |
|
|
|
prediction_time = time.time() - prediction_start |
|
|
|
self.update_performance_metrics(predicted_class, confidence, prediction_time) |
|
|
|
self.prediction_history.append(predicted_class) |
|
self.confidence_history.append(confidence) |
|
self.time_history.append(datetime.now()) |
|
|
|
if len(self.prediction_history) > self.max_history: |
|
self.prediction_history.pop(0) |
|
self.confidence_history.pop(0) |
|
self.time_history.pop(0) |
|
|
|
contributions = self.calculate_channel_contributions(self.raw_buffer) |
|
|
|
self.update_eeg_plot() |
|
self.update_probabilities(probs) |
|
self.update_performance_plot() |
|
self.update_contributions(contributions) |
|
|
|
emotion_name = self.emotion_labels[predicted_class] |
|
metrics = self.performance_metrics |
|
elapsed_time = time.time() - self.start_time if self.start_time else 0 |
|
|
|
print(f"\r{datetime.now().strftime('%H:%M:%S')} | " |
|
f"Emotion: {emotion_name} | " |
|
f"Conf: {confidence:.3f} | " |
|
f"FPS: {metrics['processing_fps']:.1f} | " |
|
f"Stab: {metrics['stability_score']:.2f} | " |
|
f"Total: {metrics['total_predictions']} | " |
|
f"Time: {elapsed_time:.0f}s", end='') |
|
|
|
def update_eeg_plot(self): |
|
self.ax1.clear() |
|
|
|
colors = plt.cm.tab10(np.linspace(0, 1, 6)) |
|
display_samples = min(300, self.buffer_size) # Show fewer samples for performance |
|
|
|
for i in range(6): |
|
offset = i * 20e-6 |
|
signal = self.raw_buffer[i, -display_samples:] + offset |
|
self.ax1.plot(signal, label=self.ch_names[i], |
|
color=colors[i], linewidth=1.0, alpha=0.8) |
|
|
|
self.ax1.set_title("Live EEG Signals", fontsize=12) |
|
self.ax1.set_xlabel("Time (samples)") |
|
self.ax1.set_ylabel("Amplitude (µV)") |
|
self.ax1.legend(loc='upper right', fontsize=8) |
|
self.ax1.grid(True, alpha=0.3) |
|
|
|
def update_performance_plot(self): |
|
self.ax3.clear() |
|
|
|
metrics = self.performance_metrics |
|
|
|
if metrics['total_predictions'] > 0: |
|
high_conf_pct = (metrics['high_confidence_predictions'] / metrics['total_predictions']) * 100 |
|
low_conf_pct = (metrics['low_confidence_predictions'] / metrics['total_predictions']) * 100 |
|
|
|
metrics_text = f"""PERFORMANCE METRICS |
|
|
|
Total Predictions: {metrics['total_predictions']} |
|
Average Confidence: {metrics['average_confidence']:.3f} |
|
High Confidence (>0.8): {high_conf_pct:.1f}% |
|
Low Confidence (<0.5): {low_conf_pct:.1f}% |
|
|
|
Processing Speed: {metrics['processing_fps']:.1f} FPS |
|
Stability Score: {metrics['stability_score']:.3f} |
|
Emotion Transitions: {metrics['emotion_transitions']} |
|
|
|
Model Accuracy: {high_conf_pct:.1f}% |
|
Response Time: {np.mean(metrics['prediction_times'])*1000:.1f}ms""" |
|
|
|
self.ax3.text(0.02, 0.98, metrics_text, |
|
transform=self.ax3.transAxes, |
|
fontsize=8, verticalalignment='top', |
|
fontfamily='monospace', |
|
bbox=dict(boxstyle="round,pad=0.3", facecolor="lightblue", alpha=0.7)) |
|
|
|
if len(metrics['confidence_trend']) > 1: |
|
trend_x = np.arange(len(metrics['confidence_trend'])) |
|
trend_data = np.array(metrics['confidence_trend'], dtype=np.float64) |
|
self.ax3.plot(trend_x + 0.6, trend_data * 0.4 + 0.1, |
|
'g-o', markersize=3, linewidth=2, label='Confidence Trend') |
|
|
|
# Add trend analysis |
|
if len(metrics['confidence_trend']) > 3: |
|
try: |
|
x_data = np.array(range(len(trend_data[-5:])), dtype=np.float64) |
|
y_data = np.array(trend_data[-5:], dtype=np.float64) |
|
recent_trend = np.polyfit(x_data, y_data, 1)[0] |
|
trend_direction = "↗" if recent_trend > 0.01 else "↘" if recent_trend < -0.01 else "→" |
|
self.ax3.text(0.7, 0.9, f"Trend: {trend_direction}", |
|
transform=self.ax3.transAxes, fontsize=10, fontweight='bold') |
|
except: |
|
# Fallback if polyfit fails |
|
self.ax3.text(0.7, 0.9, f"Trend: →", |
|
transform=self.ax3.transAxes, fontsize=10, fontweight='bold') |
|
|
|
self.ax3.set_xlim(0, 1) |
|
self.ax3.set_ylim(0, 1) |
|
self.ax3.set_title("Performance Metrics & Confidence Trend", fontsize=10) |
|
|
|
if metrics['average_confidence'] > 0.8: |
|
title_color = 'green' |
|
elif metrics['average_confidence'] > 0.6: |
|
title_color = 'orange' |
|
else: |
|
title_color = 'red' |
|
self.ax3.title.set_color(title_color) |
|
|
|
def update_contributions(self, contributions): |
|
self.ax4.clear() |
|
|
|
colors = plt.cm.viridis(contributions) |
|
bars = self.ax4.barh(self.ch_names, contributions, color=colors) |
|
|
|
for i, (bar, v) in enumerate(zip(bars, contributions)): |
|
if v > 0.05: |
|
self.ax4.text(v + 0.02, i, f"{v*100:.1f}%", |
|
va='center', fontsize=9) |
|
|
|
self.ax4.set_title("Electrode Contributions", fontsize=10) |
|
self.ax4.set_xlabel("Contribution Rate", fontsize=9) |
|
self.ax4.set_xlim(0, 0.6) |
|
self.ax4.grid(True, alpha=0.3, axis='x') |
|
|
|
def update_probabilities(self, probs): |
|
self.ax2.clear() |
|
|
|
emotions = [self.emotion_labels[i] for i in range(4)] |
|
bars = self.ax2.barh(emotions, probs, color=self.emotion_colors) |
|
|
|
max_idx = np.argmax(probs) |
|
bars[max_idx].set_edgecolor('black') |
|
bars[max_idx].set_linewidth(2) |
|
|
|
for bar, prob in zip(bars, probs): |
|
width = bar.get_width() |
|
if width > 0.05: |
|
self.ax2.text(width + 0.02, bar.get_y() + bar.get_height()/2, |
|
f"{width*100:.1f}%", ha='left', va='center', |
|
fontsize=9, fontweight='bold') |
|
|
|
self.ax2.set_title("Emotion Probabilities", fontsize=12) |
|
self.ax2.set_xlabel("Probability") |
|
self.ax2.set_xlim(0, 1) |
|
self.ax2.grid(True, alpha=0.3, axis='x') |
|
|
|
current_emotion = emotions[max_idx] |
|
confidence = probs[max_idx] |
|
self.ax2.text(0.5, 1.05, f"Current: {current_emotion} ({confidence*100:.1f}%)", |
|
transform=self.ax2.transAxes, ha='center', |
|
fontsize=10, fontweight='bold', color=self.emotion_colors[max_idx]) |
|
|
|
def start_monitoring(self): |
|
print("\n" + "="*60) |
|
print(" OPTIMIZED EEG EMOTION RECOGNITION MONITOR") |
|
print("="*60) |
|
print("\nPress 'X' to close the window...") |
|
print("Real-time performance metrics will be displayed") |
|
print("-"*60) |
|
|
|
self.is_running = True |
|
self.start_time = time.time() |
|
|
|
self.animation = FuncAnimation( |
|
self.fig, |
|
self.update_plot, |
|
interval=self.update_interval, |
|
blit=False, |
|
cache_frame_data=False |
|
) |
|
|
|
plt.show() |
|
|
|
self.is_running = False |
|
print("\n\nMonitoring stopped.") |
|
|
|
if self.prediction_history: |
|
self.print_summary_statistics() |
|
|
|
def print_summary_statistics(self): |
|
print("\n" + "="*80) |
|
print(" DETAILED PERFORMANCE & STATISTICS SUMMARY") |
|
print("="*80) |
|
|
|
if not self.prediction_history: |
|
print("No data collected.") |
|
return |
|
|
|
metrics = self.performance_metrics |
|
total_time = time.time() - self.start_time if self.start_time else 0 |
|
|
|
print("\n📊 PERFORMANCE METRICS:") |
|
print(f" Total Predictions: {metrics['total_predictions']}") |
|
print(f" Total Runtime: {total_time:.1f} seconds") |
|
print(f" Average Processing Speed: {metrics['processing_fps']:.1f} FPS") |
|
print(f" Average Response Time: {np.mean(metrics['prediction_times'])*1000:.1f}ms") |
|
print(f" Model Stability Score: {metrics['stability_score']:.3f} (0-1, higher=better)") |
|
print(f" Emotion Transitions: {metrics['emotion_transitions']}") |
|
|
|
print(f"\n🎯 CONFIDENCE ANALYSIS:") |
|
total = len(self.prediction_history) |
|
high_conf_count = metrics['high_confidence_predictions'] |
|
low_conf_count = metrics['low_confidence_predictions'] |
|
medium_conf_count = max(0, total - high_conf_count - low_conf_count) |
|
|
|
print(f" Average Confidence: {metrics['average_confidence']:.3f}") |
|
print(f" Confidence Std Dev: {np.std(self.confidence_history):.3f}") |
|
print(f" High Confidence (>0.8): {high_conf_count} ({high_conf_count/total*100:.1f}%)") |
|
print(f" Medium Confidence (0.5-0.8): {medium_conf_count} ({medium_conf_count/total*100:.1f}%)") |
|
print(f" Low Confidence (<0.5): {low_conf_count} ({low_conf_count/total*100:.1f}%)") |
|
|
|
accuracy_score = high_conf_count / total * 100 if total > 0 else 0 |
|
print(f"\n🏆 MODEL QUALITY ASSESSMENT:") |
|
print(f" Estimated Accuracy: {accuracy_score:.1f}% (based on high confidence predictions)") |
|
|
|
if accuracy_score >= 80: |
|
quality = "EXCELLENT 🌟" |
|
elif accuracy_score >= 70: |
|
quality = "GOOD ✅" |
|
elif accuracy_score >= 60: |
|
quality = "FAIR ⚠️" |
|
else: |
|
quality = "POOR ❌" |
|
print(f" Model Quality Rating: {quality}") |
|
|
|
emotion_counts = {i: 0 for i in range(4)} |
|
for pred in self.prediction_history: |
|
emotion_counts[pred] += 1 |
|
|
|
print(f"\n😊 EMOTION DISTRIBUTION:") |
|
for emotion_id, count in emotion_counts.items(): |
|
percentage = (count / total) * 100 |
|
bar = "█" * int(percentage / 5) |
|
print(f" {self.emotion_labels[emotion_id]:<15}: {count:>3} ({percentage:>5.1f}%) {bar}") |
|
|
|
dominant_emotion = max(emotion_counts, key=emotion_counts.get) |
|
dominant_percentage = emotion_counts[dominant_emotion] / total * 100 |
|
print(f"\n Dominant Emotion: {self.emotion_labels[dominant_emotion]} ({dominant_percentage:.1f}%)") |
|
|
|
if len(metrics['confidence_trend']) > 3: |
|
try: |
|
x_data = np.array(range(len(metrics['confidence_trend'])), dtype=np.float64) |
|
y_data = np.array(metrics['confidence_trend'], dtype=np.float64) |
|
trend_slope = np.polyfit(x_data, y_data, 1)[0] |
|
print(f"\n📈 TREND ANALYSIS:") |
|
if trend_slope > 0.01: |
|
trend_desc = "IMPROVING ↗" |
|
elif trend_slope < -0.01: |
|
trend_desc = "DECLINING ↘" |
|
else: |
|
trend_desc = "STABLE →" |
|
print(f" Recent Confidence Trend: {trend_desc} (slope: {trend_slope:.4f})") |
|
except Exception as e: |
|
print(f"\n📈 TREND ANALYSIS:") |
|
print(f" Recent Confidence Trend: STABLE → (analysis unavailable)") |
|
|
|
print(f"\n💡 RECOMMENDATIONS:") |
|
if accuracy_score < 70: |
|
print(" • Consider retraining the model with more data") |
|
print(" • Check data quality and preprocessing steps") |
|
if metrics['stability_score'] < 0.7: |
|
print(" • Model predictions are unstable - review signal quality") |
|
if metrics['processing_fps'] < 5: |
|
print(" • Processing speed is slow - consider model optimization") |
|
if accuracy_score >= 80 and metrics['stability_score'] >= 0.8: |
|
print(" • Model performance is excellent! ✨") |
|
|
|
print("\n" + "="*80) |
|
|
|
|
|
def main(): |
|
model_path = 'path/to/bai-6 EmotionOptimized.h5' |
|
scaler_path = 'path/to/bai-6 ScalerOptimized.pkl' |
|
selector_path = 'path/to/bai-6_feature_selector_opt.pkl' |
|
pca_path = 'path/to/bai-6_pca_reducer_opt.pkl' |
|
|
|
try: |
|
monitor = EEGEmotionMonitorOptimized( |
|
model_path, scaler_path, selector_path, pca_path |
|
) |
|
|
|
monitor.start_monitoring() |
|
|
|
except FileNotFoundError as e: |
|
print(f"Model or preprocessor file not found: {e}") |
|
print("Please ensure the model has been trained and saved.") |
|
print("Available fallback: Using basic model without feature selection/PCA") |
|
|
|
try: |
|
basic_model_path = 'path/to/bai-6 EmotionOptimized.h5' |
|
basic_scaler_path = 'path/to/bai-6 ScalerOptimized.pkl' |
|
|
|
monitor = EEGEmotionMonitorOptimized(basic_model_path, basic_scaler_path) |
|
monitor.start_monitoring() |
|
|
|
except Exception as e2: |
|
print(f"Fallback also failed: {e2}") |
|
|
|
except Exception as e: |
|
print(f"Error: {e}") |
|
import traceback |
|
traceback.print_exc() |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
``` |
|
|
|
</details> |
|
|
|
------------- |
|
## Lisans/License |
|
CC-BY-NC-SA-4.0 |