bci_mcp.core.device

Device abstraction shared by every EEG backend.

Chunk dataclass

Source code in src/bci_mcp/core/device.py
21
22
23
24
@dataclass
class Chunk:
    """A batch of samples returned by one ``Device.read()`` call.

    ``data`` and ``timestamps`` describe the same samples: the second axis of
    ``data`` and the only axis of ``timestamps`` both have length ``n_samples``.
    """

    data: np.ndarray  # shape (channel_count, n_samples), float32, microvolts
    timestamps: np.ndarray  # shape (n_samples,), seconds

Device

Bases: ABC

A streaming EEG source. Subclasses run their own acquisition internally.

Source code in src/bci_mcp/core/device.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Device(ABC):
    """A streaming EEG source. Subclasses run their own acquisition internally.

    The intended call order is ``connect()`` -> ``start()`` -> repeated
    ``read()`` -> ``stop()`` -> ``disconnect()``. Used as a context manager,
    the device is connected and started on entry, and stopped and
    disconnected on exit.
    """

    # Static description of the device (name, sample rate, channels, ...).
    info: DeviceInfo

    @abstractmethod
    def connect(self) -> None:
        """Establish the connection to the backend (implementation-defined)."""

    @abstractmethod
    def start(self) -> None:
        """Begin streaming samples (implementation-defined)."""

    @abstractmethod
    def read(self) -> Chunk | None:
        """Non-blocking pull of newly available samples, or None if none/not streaming."""

    @abstractmethod
    def stop(self) -> None:
        """Stop streaming samples (implementation-defined)."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release the connection to the backend (implementation-defined)."""

    def __enter__(self) -> Device:
        """Connect and start the device, returning ``self``.

        Raises:
            Whatever ``connect()`` or ``start()`` raises. If ``start()``
            fails, the device is disconnected before the error propagates.
        """
        self.connect()
        try:
            self.start()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the connection
            # opened above would otherwise never be released.
            self.disconnect()
            raise
        return self

    def __exit__(self, *exc: object) -> None:
        """Stop and disconnect the device; exceptions from the body propagate."""
        try:
            self.stop()
        finally:
            # Always release the connection, even if stop() itself fails.
            self.disconnect()

read abstractmethod

read()

Non-blocking pull of newly available samples, or None if none/not streaming.

Source code in src/bci_mcp/core/device.py
38
39
40
@abstractmethod
def read(self) -> Chunk | None:
    """Non-blocking pull of newly available samples, or None if none/not streaming.

    Returns:
        A ``Chunk`` with the newly available samples, or ``None`` when there
        is nothing new to return or the device is not streaming.
    """

DeviceInfo dataclass

Source code in src/bci_mcp/core/device.py
10
11
12
13
14
15
16
17
18
@dataclass
class DeviceInfo:
    """Static description of a device and of the stream it produces."""

    name: str  # device name
    uri: str  # device URI
    sample_rate: float  # samples per second
    channel_count: int  # number of channels
    channel_names: list[str]  # NOTE(review): presumably one label per channel — confirm
    units: str = "uV"  # "uV" | "counts"
    extra: dict = field(default_factory=dict)  # additional metadata; fresh dict per instance

bci_mcp.core.registry

URI-based device registry: create_device('synthetic://?focus=0.8').

DeviceFactory module-attribute

DeviceFactory = Callable[[ParseResult, dict[str, str]], Device]

_REGISTRY module-attribute

_REGISTRY = {}

Device

Bases: ABC

A streaming EEG source. Subclasses run their own acquisition internally.

Source code in src/bci_mcp/core/device.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Device(ABC):
    """A streaming EEG source. Subclasses run their own acquisition internally.

    The intended call order is ``connect()`` -> ``start()`` -> repeated
    ``read()`` -> ``stop()`` -> ``disconnect()``. Used as a context manager,
    the device is connected and started on entry, and stopped and
    disconnected on exit.
    """

    # Static description of the device (name, sample rate, channels, ...).
    info: DeviceInfo

    @abstractmethod
    def connect(self) -> None:
        """Establish the connection to the backend (implementation-defined)."""

    @abstractmethod
    def start(self) -> None:
        """Begin streaming samples (implementation-defined)."""

    @abstractmethod
    def read(self) -> Chunk | None:
        """Non-blocking pull of newly available samples, or None if none/not streaming."""

    @abstractmethod
    def stop(self) -> None:
        """Stop streaming samples (implementation-defined)."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release the connection to the backend (implementation-defined)."""

    def __enter__(self) -> Device:
        """Connect and start the device, returning ``self``.

        Raises:
            Whatever ``connect()`` or ``start()`` raises. If ``start()``
            fails, the device is disconnected before the error propagates.
        """
        self.connect()
        try:
            self.start()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the connection
            # opened above would otherwise never be released.
            self.disconnect()
            raise
        return self

    def __exit__(self, *exc: object) -> None:
        """Stop and disconnect the device; exceptions from the body propagate."""
        try:
            self.stop()
        finally:
            # Always release the connection, even if stop() itself fails.
            self.disconnect()

read abstractmethod

read()

Non-blocking pull of newly available samples, or None if none/not streaming.

Source code in src/bci_mcp/core/device.py
38
39
40
@abstractmethod
def read(self) -> Chunk | None:
    """Non-blocking pull of newly available samples, or None if none/not streaming.

    Returns:
        A ``Chunk`` with the newly available samples, or ``None`` when there
        is nothing new to return or the device is not streaming.
    """

create_device

create_device(uri)
Source code in src/bci_mcp/core/registry.py
17
18
19
20
21
22
23
24
25
def create_device(uri: str) -> Device:
    """Build the device registered for the scheme of ``uri``.

    A URI without a scheme falls back to ``"synthetic"``. Query parameters are
    flattened to one string per key (the first occurrence wins) and handed to
    the registered factory together with the parsed URI.

    Raises:
        ValueError: if no factory is registered for the scheme.
    """
    parts = urlparse(uri)
    scheme = parts.scheme if parts.scheme else "synthetic"
    if scheme not in _REGISTRY:
        known = sorted(_REGISTRY)
        raise ValueError(f"Unknown device scheme '{scheme}'. Known: {known}")

    # parse_qs yields a list per key; only the first value is kept.
    params = {}
    for key, values in parse_qs(parts.query).items():
        params[key] = values[0]

    factory = _REGISTRY[scheme]
    return factory(parts, params)

discover

discover()

Best-effort device discovery: always-available schemes + scanned serial ports.

Source code in src/bci_mcp/core/registry.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def discover() -> list[dict]:
    """Best-effort device discovery: always-available schemes + scanned serial ports.

    Returns:
        A list of ``{"uri", "name", "needs_hardware"}`` dicts. The synthetic
        device is always first; one entry per serial port follows when
        pyserial is installed and enumeration succeeds.

    Discovery never raises: a missing pyserial is expected and silent, while
    any other failure is logged at debug level and the entries gathered so far
    are returned.
    """
    import logging  # local: only used on the failure path below

    entries: list[dict] = [
        {"uri": "synthetic://", "name": "Synthetic EEG (no hardware)",
         "needs_hardware": False},
    ]
    try:  # enumerate serial ports if pyserial is available
        from serial.tools import list_ports

        for port in list_ports.comports():
            entries.append({
                "uri": f"serial://{port.device}", "name": port.description or port.device,
                "needs_hardware": True,
            })
    except ImportError:
        # pyserial is optional; without it only the built-in entries are reported.
        pass
    except Exception:  # pragma: no cover
        # Stay best-effort, but leave a trace instead of hiding the failure.
        logging.getLogger(__name__).debug(
            "Serial port enumeration failed", exc_info=True)
    return entries

list_schemes

list_schemes()
Source code in src/bci_mcp/core/registry.py
28
29
def list_schemes() -> list[str]:
    """Return the registered URI schemes in ascending order."""
    schemes = list(_REGISTRY)
    schemes.sort()
    return schemes

register

register(scheme, factory)
Source code in src/bci_mcp/core/registry.py
13
14
def register(scheme: str, factory: DeviceFactory) -> None:
    """Register ``factory`` as the device constructor for ``scheme``.

    The scheme is stored lower-cased: lookups use the scheme reported by
    ``urlparse``, which is always lower-case, so a key containing upper-case
    letters could never be matched. Registering the same scheme again
    replaces the previous factory.
    """
    _REGISTRY[scheme.lower()] = factory

bci_mcp.pipeline

Pipeline: ties a Device/Stream to the DSP chain and emits BrainState.

BrainState dataclass

Source code in src/bci_mcp/dsp/state.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@dataclass
class BrainState:
    """One snapshot of derived metrics, band powers and signal-quality data."""

    timestamp: float
    metrics: dict[str, float]
    band_powers: dict[str, float]
    relative_band_powers: dict[str, float]
    signal_quality: str
    quality_score: float
    artifacts: list[str]
    channels: int
    sample_rate: float
    calibrated: bool
    # Trust in this reading on a 0..1 scale; combines signal quality, whether
    # a baseline has been calibrated, and how full the analysis window was.
    confidence: float = 1.0
    # Confidence per metric, keyed like ``metrics``, so a consumer can discount
    # single values instead of the whole snapshot.
    metric_confidence: dict[str, float] = field(default_factory=dict)
    # "ok" | "warming_up" | "unreliable". "unreliable" signals that a hard
    # artifact or poor signal contaminated the window: metrics are present but
    # must not be read as a clean measurement.
    status: str = "ok"

    def to_dict(self) -> dict:
        """Return the snapshot as a plain (recursively converted) dict."""
        snapshot = asdict(self)
        return snapshot

    def summary(self) -> str:
        """Render a one-line, human-readable description of the snapshot."""
        metric_text = ", ".join(
            f"{name}={value:.2f}" for name, value in self.metrics.items()
        )
        artifact_text = "none" if not self.artifacts else ", ".join(self.artifacts)
        cal_text = "uncalibrated" if not self.calibrated else "calibrated"
        # The status tag is shown only for non-"ok" readings.
        if self.status == "ok":
            status_tag = ""
        else:
            status_tag = f" [{self.status}]"
        sections = (
            f"[{cal_text}]{status_tag} {metric_text}",
            f"signal: {self.signal_quality} "
            f"({self.quality_score:.2f}, conf {self.confidence:.2f})",
            f"artifacts: {artifact_text}",
        )
        return " | ".join(sections)

Calibration dataclass

Source code in src/bci_mcp/dsp/calibration.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@dataclass
class Calibration:
    """Maps raw metric values into 0..1 via a clamped z-score and a logistic.

    Metrics with a recorded baseline are standardised against its mean/std;
    all others use a fixed ``(center, scale)`` pair from ``scaling``.
    """

    baseline: dict[str, dict[str, float]] = field(default_factory=dict)
    # Per-metric (center, scale), consulted only for metrics without a
    # baseline. A metric missing here uses (1.0, 1.0), so a raw ratio of 1.0
    # maps to 0.5. The pipeline passes metrics.DEFAULT_SCALING so that bounded
    # metrics (calm, meditation) are centered at 0.5 rather than capped low.
    scaling: dict[str, tuple[float, float]] = field(default_factory=dict)

    @property
    def calibrated(self) -> bool:
        """True once at least one metric has a baseline."""
        return bool(self.baseline)

    def apply(self, raw: dict[str, float]) -> dict[str, float]:
        """Return ``raw`` mapped into 0..1, key by key."""
        scaled: dict[str, float] = {}
        for name, value in raw.items():
            if name in self.baseline:
                stats = self.baseline[name]
                offset = stats["mean"]
                spread = stats["std"] or 1.0  # zero std → divide by 1 instead
            else:
                offset, spread = self.scaling.get(name, (1.0, 1.0))
                spread = spread or 1.0
            z = (value - offset) / spread
            # Clamp before the logistic so exp() stays in range.
            capped = min(_Z_CLAMP, z)
            z = max(-_Z_CLAMP, capped)
            scaled[name] = 1.0 / (1.0 + math.exp(-z))
        return scaled

    def to_json(self) -> str:
        """Serialise the baseline (only) as a JSON object string."""
        payload = {"baseline": self.baseline}
        return json.dumps(payload)

    @classmethod
    def from_json(cls, text: str) -> Calibration:
        """Rebuild a calibration from the output of ``to_json``."""
        payload = json.loads(text)
        return cls(baseline=payload["baseline"])

    @classmethod
    def from_samples(
        cls,
        raw_list: list[dict[str, float]],
        scaling: dict[str, tuple[float, float]] | None = None,
    ) -> Calibration:
        """Build a baseline (mean/std per metric) from raw metric samples.

        The metric names are taken from the first sample.

        Raises:
            ValueError: if ``raw_list`` is empty.
        """
        if not raw_list:
            raise ValueError("from_samples requires at least one sample")

        baseline = {}
        for name in raw_list[0].keys():
            baseline[name] = {
                "mean": float(np.mean([sample[name] for sample in raw_list])),
                "std": float(np.std([sample[name] for sample in raw_list])),
            }

        # A constant metric cannot be standardised; tell the caller.
        for name in baseline:
            if baseline[name]["std"] == 0.0:
                warnings.warn(
                    f"Calibration std=0 for '{name}'; baseline may be degenerate",
                    stacklevel=2,
                )
        return cls(baseline=baseline, scaling=scaling or {})

Device

Bases: ABC

A streaming EEG source. Subclasses run their own acquisition internally.

Source code in src/bci_mcp/core/device.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Device(ABC):
    """A streaming EEG source. Subclasses run their own acquisition internally.

    The intended call order is ``connect()`` -> ``start()`` -> repeated
    ``read()`` -> ``stop()`` -> ``disconnect()``. Used as a context manager,
    the device is connected and started on entry, and stopped and
    disconnected on exit.
    """

    # Static description of the device (name, sample rate, channels, ...).
    info: DeviceInfo

    @abstractmethod
    def connect(self) -> None:
        """Establish the connection to the backend (implementation-defined)."""

    @abstractmethod
    def start(self) -> None:
        """Begin streaming samples (implementation-defined)."""

    @abstractmethod
    def read(self) -> Chunk | None:
        """Non-blocking pull of newly available samples, or None if none/not streaming."""

    @abstractmethod
    def stop(self) -> None:
        """Stop streaming samples (implementation-defined)."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release the connection to the backend (implementation-defined)."""

    def __enter__(self) -> Device:
        """Connect and start the device, returning ``self``.

        Raises:
            Whatever ``connect()`` or ``start()`` raises. If ``start()``
            fails, the device is disconnected before the error propagates.
        """
        self.connect()
        try:
            self.start()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the connection
            # opened above would otherwise never be released.
            self.disconnect()
            raise
        return self

    def __exit__(self, *exc: object) -> None:
        """Stop and disconnect the device; exceptions from the body propagate."""
        try:
            self.stop()
        finally:
            # Always release the connection, even if stop() itself fails.
            self.disconnect()

read abstractmethod

read()

Non-blocking pull of newly available samples, or None if none/not streaming.

Source code in src/bci_mcp/core/device.py
38
39
40
@abstractmethod
def read(self) -> Chunk | None:
    """Non-blocking pull of newly available samples, or None if none/not streaming.

    Returns:
        A ``Chunk`` with the newly available samples, or ``None`` when there
        is nothing new to return or the device is not streaming.
    """

Pipeline

Source code in src/bci_mcp/pipeline.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class Pipeline:
    """Ties a device stream to the DSP chain and produces ``BrainState`` snapshots.

    The pipeline owns a ``Stream`` over the device, analyses the most recent
    ``window`` samples on demand, and scales the resulting metrics through the
    current ``Calibration``.
    """

    def __init__(self, device: Device | str, window_seconds: float = 2.0,
                 notch_freq: float = 60.0) -> None:
        """Create a pipeline over ``device``.

        Args:
            device: a ``Device`` instance, or a URI string passed to
                ``create_device``.
            window_seconds: length of the analysis window in seconds.
            notch_freq: frequency handed to the notch filter.
        """
        self.device = create_device(device) if isinstance(device, str) else device
        self.stream = Stream(self.device)
        # ~2 s gives Welch enough segments to resolve the low-frequency
        # (delta/theta) bands that several metrics depend on, instead of a single
        # noisy 1 s periodogram.
        self.window = int(self.device.info.sample_rate * window_seconds)
        self.notch_freq = notch_freq
        self.calibration = Calibration(scaling=metrics_mod.DEFAULT_SCALING)

    def start(self) -> None:
        """Start the underlying stream."""
        self.stream.start()

    def stop(self) -> None:
        """Stop the underlying stream."""
        self.stream.stop()

    def _raw_metrics_now(self):
        """Compute unscaled metrics for the latest analysis window.

        Returns:
            ``(raw_metrics, band_powers, data, fs)``. The first two are
            ``None`` when fewer than ``max(int(fs * 0.5), 64)`` samples are
            available; ``data`` is the unfiltered window either way.
        """
        fs = self.device.info.sample_rate
        data = self.stream.latest(self.window)
        if data.shape[1] < max(int(fs * 0.5), 64):
            return None, None, data, fs
        # Notch first (remove line noise before it can fold into the passband),
        # then bandpass to the 1-45 Hz analysis range.
        filtered = filters.notch(data, fs, self.notch_freq)
        filtered = filters.bandpass(filtered, fs)
        bp = bands.band_powers(filtered, fs)
        return metrics_mod.raw_metrics(bp), bp, data, fs

    def _confidence(self, quality_score: float, samples: int) -> float:
        """How much to trust a reading: signal quality × calibration × window fill."""
        fill = min(1.0, samples / float(self.window)) if self.window else 1.0
        # An uncalibrated pipeline is discounted to 60 %.
        cal_factor = 1.0 if self.calibration.calibrated else 0.6
        return float(max(0.0, min(1.0, quality_score * cal_factor * fill)))

    def current_state(self) -> BrainState | None:
        """Return a snapshot for the latest window, or None if too little data."""
        raw, bp, data, fs = self._raw_metrics_now()
        if raw is None:
            return None
        scaled = self.calibration.apply(raw)
        q_score, q_label, artifacts = quality_mod.assess_quality(data, fs)
        confidence = self._confidence(q_score, int(data.shape[1]))
        # Artifact handling, not just detection: a hard artifact or poor signal
        # flags the reading unreliable and collapses its confidence, so the
        # metrics are never narrated as a clean reading.
        hard = any(a in quality_mod.HARD_ARTIFACTS for a in artifacts)
        status = "unreliable" if (hard or q_label == "poor") else "ok"
        if status == "unreliable":
            confidence = min(confidence, 0.1)
        confidence = round(confidence, 4)
        return BrainState(
            timestamp=time.time(),
            metrics=scaled,
            band_powers=bp,
            relative_band_powers=bands.relative_band_powers(bp),
            signal_quality=q_label,
            quality_score=q_score,
            artifacts=artifacts,
            channels=self.device.info.channel_count,
            sample_rate=fs,
            calibrated=self.calibration.calibrated,
            confidence=confidence,
            metric_confidence={k: confidence for k in scaled},
            status=status,
        )

    def calibrate(self, seconds: float = 20.0) -> Calibration:
        """Sample raw metrics for ``seconds`` and derive a new baseline.

        Raw metrics are polled every 0.25 s. If at least one sample was
        collected the calibration is replaced; otherwise the existing one is
        kept. The (possibly unchanged) calibration is returned.
        """
        samples = []
        # Monotonic clock: the calibration length must not change when the
        # wall clock is adjusted (NTP step, DST, manual change) mid-run.
        deadline = time.monotonic() + seconds
        while time.monotonic() < deadline:
            raw, _, _, _ = self._raw_metrics_now()
            if raw is not None:
                samples.append(raw)
            time.sleep(0.25)
        if samples:
            self.calibration = Calibration.from_samples(
                samples, scaling=metrics_mod.DEFAULT_SCALING)
        return self.calibration

    def record(self, seconds: float, path: str, fmt: str | None = None) -> str:
        """Record the stream for ``seconds`` and save it to ``path``.

        Returns:
            Whatever ``save_recording`` returns.
        """
        from .recording.recorder import Recorder
        from .recording.writer import save_recording

        recorder = Recorder()
        self.stream.add_consumer(recorder)
        recorder.start()
        try:
            time.sleep(seconds)
        finally:
            # Detach the recorder even if the wait is interrupted.
            recorder.stop()
            self.stream.remove_consumer(recorder)
        data = recorder.data()
        return save_recording(
            data, self.device.info.sample_rate, self.device.info.channel_names, path, fmt,
            metadata={"device": self.device.info.name, "uri": self.device.info.uri},
        )

Stream

Source code in src/bci_mcp/core/stream.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class Stream:
    """Background reader that pumps a ``Device`` into a ring buffer.

    A daemon thread polls ``device.read()``, appends each non-empty chunk to
    ``buffer`` and forwards it to every registered consumer callback.
    """

    def __init__(self, device: Device, buffer_seconds: float = 10.0) -> None:
        """Create a stream over ``device`` holding ``buffer_seconds`` of data."""
        self.device = device
        capacity = int(device.info.sample_rate * buffer_seconds)
        self.buffer = RingBuffer(device.info.channel_count, capacity)
        self._consumers: list[Callable[[Chunk], None]] = []
        self._thread: threading.Thread | None = None
        self._running = False
        self._lock = threading.Lock()  # guards access to ``buffer``

    def add_consumer(self, callback: Callable[[Chunk], None]) -> None:
        """Register ``callback`` to receive every subsequent chunk."""
        self._consumers.append(callback)

    def remove_consumer(self, callback: Callable[[Chunk], None]) -> None:
        """Unregister ``callback``; a no-op if it is not registered."""
        if callback in self._consumers:
            self._consumers.remove(callback)

    def start(self) -> None:
        """Connect and start the device, then launch the reader thread.

        Calling ``start`` on a running stream does nothing.
        """
        if self._running:
            return
        self.device.connect()
        self.device.start()
        self._running = True
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        """Reader loop: poll the device until ``_running`` is cleared."""
        logger = logging.getLogger(__name__)
        # Poll once per device chunk when the device advertises a chunk size,
        # otherwise every 10 ms.
        chunk_samples = getattr(self.device, "chunk_samples", None)
        period = (chunk_samples / self.device.info.sample_rate) if chunk_samples else 0.01
        while self._running:
            chunk = self.device.read()
            if chunk is not None and chunk.data.shape[1] > 0:
                with self._lock:
                    self.buffer.write(chunk.data)
                # Iterate a snapshot: consumers are added and removed from
                # other threads (or from within a callback), and mutating the
                # list during iteration would silently skip a callback.
                for cb in tuple(self._consumers):
                    try:
                        cb(chunk)
                    except Exception:
                        # One failing consumer must not stop acquisition.
                        logger.exception("Consumer callback raised")
            time.sleep(period)

    def latest(self, n: int) -> np.ndarray:
        """Return the buffer's ``latest(n)`` result, read under the lock."""
        with self._lock:
            return self.buffer.latest(n)

    def stop(self) -> None:
        """Stop the reader thread, then stop and disconnect the device.

        The thread is given up to one second to finish. The device is
        disconnected even if ``device.stop()`` raises.
        """
        self._running = False
        if self._thread is not None:
            self._thread.join(timeout=1.0)
        try:
            self.device.stop()
        finally:
            self.device.disconnect()

create_device

create_device(uri)
Source code in src/bci_mcp/core/registry.py
17
18
19
20
21
22
23
24
25
def create_device(uri: str) -> Device:
    """Build the device registered for the scheme of ``uri``.

    A URI without a scheme falls back to ``"synthetic"``. Query parameters are
    flattened to one string per key (the first occurrence wins) and handed to
    the registered factory together with the parsed URI.

    Raises:
        ValueError: if no factory is registered for the scheme.
    """
    parts = urlparse(uri)
    scheme = parts.scheme if parts.scheme else "synthetic"
    if scheme not in _REGISTRY:
        known = sorted(_REGISTRY)
        raise ValueError(f"Unknown device scheme '{scheme}'. Known: {known}")

    # parse_qs yields a list per key; only the first value is kept.
    params = {}
    for key, values in parse_qs(parts.query).items():
        params[key] = values[0]

    factory = _REGISTRY[scheme]
    return factory(parts, params)

bci_mcp.dsp.bands

Band power via Welch PSD.

Band power is the integral of the Welch power spectral density over each band (scipy welch + trapezoid). This is the standard EEG band-power estimator; by Parseval it equals the variance (RMS²) of the band-filtered signal up to scaling, so the choice of "PSD vs RMS" is a unit convention, not a correctness question.

Two practical details handled here:

  • Welch averaging. nperseg targets ~1 s segments with 50% overlap, so a longer analysis window is averaged over several segments (lower-variance PSD) instead of returning a single noisy periodogram.
  • Narrow-band guard. trapezoid over a single frequency bin returns 0. At coarse resolution a narrow band (e.g. gamma at a low sample rate) can capture just one bin, which would silently zero that band's power; we fall back to rectangular integration in that case.

BANDS module-attribute

BANDS = {'delta': (1.0, 4.0), 'theta': (4.0, 8.0), 'alpha': (8.0, 13.0), 'beta': (13.0, 30.0), 'gamma': (30.0, 45.0)}

band_powers

band_powers(data, fs, nperseg=None)

Mean absolute power per band, averaged over channels (µV²).

Source code in src/bci_mcp/dsp/bands.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def band_powers(data: np.ndarray, fs: float, nperseg: int | None = None) -> dict[str, float]:
    """Mean absolute power per band, averaged over channels (µV²).

    Args:
        data: samples with time on the last axis; 1-D input is treated as a
            single channel.
        fs: sampling rate in Hz.
        nperseg: Welch segment length; defaults to about one second of
            samples and is always capped at the number of samples available.

    Returns:
        One value per entry of ``BANDS``. A band containing no frequency bin
        is reported as 0.0.
    """
    samples = np.atleast_2d(data)
    n_samples = samples.shape[-1]
    if nperseg is None:
        # About one second per segment, so longer windows are averaged over
        # several segments; short windows fall back to a single segment.
        nperseg = int(min(n_samples, max(fs, 1.0)))
    seg_len = max(1, min(int(nperseg), n_samples))

    freqs, psd = welch(samples, fs=fs, nperseg=seg_len, noverlap=seg_len // 2)
    psd = np.atleast_2d(psd)
    bin_width = 1.0 if len(freqs) <= 1 else float(freqs[1] - freqs[0])

    def _integrate(in_band: np.ndarray) -> float:
        """Integrate the PSD over the selected bins and average over channels."""
        count = int(np.count_nonzero(in_band))
        if count == 0:
            return 0.0
        if count == 1:
            # A trapezoid over a single point has zero area; use one
            # rectangle of bin width instead.
            return float(np.mean(psd[:, in_band].sum(axis=-1)) * bin_width)
        return float(np.mean(trapezoid(psd[:, in_band], freqs[in_band], axis=-1)))

    return {
        name: _integrate((freqs >= lo) & (freqs < hi))
        for name, (lo, hi) in BANDS.items()
    }

relative_band_powers

relative_band_powers(bp)
Source code in src/bci_mcp/dsp/bands.py
63
64
65
def relative_band_powers(bp: dict[str, float]) -> dict[str, float]:
    """Return each band's share of the summed band power.

    When the powers sum to zero the divisor is 1.0, so the values are
    returned unchanged instead of dividing by zero.
    """
    denominator = sum(bp.values())
    if not denominator:
        denominator = 1.0
    shares = {}
    for band, power in bp.items():
        shares[band] = power / denominator
    return shares

bci_mcp.dsp.metrics

Cognitive metrics derived from band powers.

These are heuristic band-power ratios, not validated clinical measurements. Each one is a recognizable simplification of an index from the literature; the METRIC_INFO table below records the exact formula, the work it draws on, and an honest caveat for every metric. Downstream consumers (CLI, dashboard, MCP/LLM) should treat the values as proxies, weight them by the per-reading confidence the pipeline attaches, and never present them as ground-truth cognitive states.

Design notes

  • focus uses the genuine Pope, Bogart & Bartolome (1995) engagement index β / (α + θ) — the best-validated of these ratios.
  • engagement is a distinct arousal ratio β / α (not the Pope index), so the two are not duplicates.
  • Delta and gamma are intentionally excluded from every metric: on consumer hardware over short windows, 30-45 Hz "gamma" is dominated by EMG and <1-4 Hz delta by drift/ocular artifact, so building user-facing metrics on them adds noise, not signal. They are still reported as raw band powers.
  • The metrics are chosen so no two are collinear on the same band in the same direction — in particular meditation falls with theta while fatigue rises with it, so a drowsy reading cannot score high on both (a relative-theta "meditation" metric would have confounded the two).

References

  • Pope, Bogart & Bartolome (1995), Biological Psychology — engagement index β / (α + θ).
  • Lubar (1991); Monastra et al. (1999), Neuropsychology — theta/beta ratio (TBR) for attention/ADHD; diagnostic validity later contested (Arns et al. 2013).
  • Eoh, Chung & Kim (2005), Int. J. Industrial Ergonomics — (θ + α) / β as a driver mental-fatigue index.
  • Berger (1929); NeuroSky eSense — alpha as the canonical relaxed-wakefulness rhythm underlying consumer calm/meditation meters.

BOUNDED_METRICS module-attribute

BOUNDED_METRICS = frozenset({'calm', 'meditation'})

DEFAULT_SCALING module-attribute

DEFAULT_SCALING = {'focus': (1.0, 1.0), 'calm': (0.5, 0.2), 'attention': (1.0, 1.0), 'engagement': (1.0, 1.0), 'fatigue': (1.0, 1.0), 'meditation': (0.5, 0.2)}

METRIC_INFO module-attribute

METRIC_INFO = {'focus': {'formula': 'beta / (alpha + theta)', 'basis': 'Pope et al. (1995) EEG engagement/concentration index — the best-validated of these ratios.', 'caveat': "A proxy for task engagement/concentration, not a direct measurement of 'focus'."}, 'calm': {'formula': 'alpha / (alpha + beta)', 'basis': 'Alpha-up / beta-down is a long-standing relaxation correlate.', 'caveat': "Resting alpha also tracks drowsiness and eyes-closed state, so high 'calm' is not necessarily relaxed attention."}, 'attention': {'formula': 'beta / theta (inverse of the theta/beta ratio, TBR)', 'basis': 'TBR (Lubar 1991; Monastra 1999) is the classic inattention marker.', 'caveat': "TBR's diagnostic validity is contested (Arns et al. 2013) and it is a between-subjects trait marker, not a moment-to-moment readout."}, 'engagement': {'formula': 'beta / alpha', 'basis': 'Beta/alpha arousal–activation ratio.', 'caveat': "An arousal proxy, distinct from the Pope engagement index (that formula is used by 'focus')."}, 'fatigue': {'formula': '(theta + alpha) / beta', 'basis': 'Eoh et al. (2005) driver mental-fatigue / drowsiness index.', 'caveat': 'Slow-wave dominance is ambiguous; corroborate with low engagement before concluding fatigue.'}, 'meditation': {'formula': 'alpha / (alpha + beta + theta)', 'basis': 'Relative alpha as a relaxed-wakefulness proxy (Berger rhythm; NeuroSky eSense convention).', 'caveat': 'NOT a validated meditation classifier. Deliberately falls with theta so it does not collapse into the drowsiness signal the way a relative-theta metric would.'}}

METRIC_NAMES module-attribute

METRIC_NAMES = ('focus', 'calm', 'attention', 'engagement', 'fatigue', 'meditation')

_EPS module-attribute

_EPS = 1e-09

raw_metrics

raw_metrics(bp)

Unscaled metric ratios. Calibration maps these into 0..1 later.

See the module docstring and METRIC_INFO for the basis and caveats of each metric. Only theta/alpha/beta are used; delta and gamma are excluded on purpose (drift- and EMG-dominated on consumer hardware).

Source code in src/bci_mcp/dsp/metrics.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def raw_metrics(bp: dict[str, float]) -> dict[str, float]:
    """Compute the unscaled band-power ratios behind each metric.

    The values are raw ratios; calibration maps them into 0..1 afterwards.
    Only the theta, alpha and beta powers are read — delta and gamma are left
    out on purpose (drift- and EMG-dominated on consumer hardware). Each
    denominator has ``_EPS`` added to avoid division by zero.
    """
    theta = bp["theta"]
    alpha = bp["alpha"]
    beta = bp["beta"]

    metrics: dict[str, float] = {}
    # Pope (1995) engagement index
    metrics["focus"] = beta / (alpha + theta + _EPS)
    # alpha vs beta relaxation
    metrics["calm"] = alpha / (alpha + beta + _EPS)
    # inverse theta/beta ratio (TBR)
    metrics["attention"] = beta / (theta + _EPS)
    # beta/alpha arousal ratio
    metrics["engagement"] = beta / (alpha + _EPS)
    # Eoh (2005) fatigue index
    metrics["fatigue"] = (theta + alpha) / (beta + _EPS)
    # relative alpha (calm wakefulness)
    metrics["meditation"] = alpha / (alpha + beta + theta + _EPS)
    return metrics

bci_mcp.dsp.state

The unified brain-state snapshot shared by CLI, dashboard, and MCP.

BrainState dataclass

Source code in src/bci_mcp/dsp/state.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@dataclass
class BrainState:
    """One snapshot of derived metrics, band powers and signal-quality data."""

    timestamp: float
    metrics: dict[str, float]
    band_powers: dict[str, float]
    relative_band_powers: dict[str, float]
    signal_quality: str
    quality_score: float
    artifacts: list[str]
    channels: int
    sample_rate: float
    calibrated: bool
    # Trust in this reading on a 0..1 scale; combines signal quality, whether
    # a baseline has been calibrated, and how full the analysis window was.
    confidence: float = 1.0
    # Confidence per metric, keyed like ``metrics``, so a consumer can discount
    # single values instead of the whole snapshot.
    metric_confidence: dict[str, float] = field(default_factory=dict)
    # "ok" | "warming_up" | "unreliable". "unreliable" signals that a hard
    # artifact or poor signal contaminated the window: metrics are present but
    # must not be read as a clean measurement.
    status: str = "ok"

    def to_dict(self) -> dict:
        """Return the snapshot as a plain (recursively converted) dict."""
        snapshot = asdict(self)
        return snapshot

    def summary(self) -> str:
        """Render a one-line, human-readable description of the snapshot."""
        metric_text = ", ".join(
            f"{name}={value:.2f}" for name, value in self.metrics.items()
        )
        artifact_text = "none" if not self.artifacts else ", ".join(self.artifacts)
        cal_text = "uncalibrated" if not self.calibrated else "calibrated"
        # The status tag is shown only for non-"ok" readings.
        if self.status == "ok":
            status_tag = ""
        else:
            status_tag = f" [{self.status}]"
        sections = (
            f"[{cal_text}]{status_tag} {metric_text}",
            f"signal: {self.signal_quality} "
            f"({self.quality_score:.2f}, conf {self.confidence:.2f})",
            f"artifacts: {artifact_text}",
        )
        return " | ".join(sections)