bci_mcp.mcp.service

Testable brain-service core. server.py adapts this to MCP.

BrainService

Source code in src/bci_mcp/mcp/service.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class BrainService:
    def __init__(self) -> None:
        self._pipeline: Pipeline | None = None
        self._events: list[dict] = []
        self._nf = None

    def list_devices(self) -> dict:
        from ..core.registry import discover, list_schemes

        return {"devices": discover(), "schemes": list_schemes()}

    def connect(self, device_uri: str = "synthetic://") -> dict:
        validate_mcp_uri(device_uri)
        if self._pipeline is not None:
            self._pipeline.stop()
        self._pipeline = Pipeline(device_uri)
        self._pipeline.start()
        return {"connected": True, "device": self._pipeline.device.info.name,
                "uri": device_uri}

    def disconnect(self) -> dict:
        if self._pipeline is not None:
            self._pipeline.stop()
            self._pipeline = None
        return {"connected": False}

    def get_brain_state(self) -> dict:
        if self._pipeline is None:
            return {"error": "not connected — call connect() first"}
        state = self._pipeline.current_state()
        return state.to_dict() if state is not None else {"status": "warming_up"}

    def get_band_powers(self) -> dict:
        state = self.get_brain_state()
        if "metrics" not in state:  # error / warming_up sentinel
            return state
        return {"band_powers": state["band_powers"],
                "relative_band_powers": state["relative_band_powers"],
                "confidence": state["confidence"]}

    def get_signal_quality(self) -> dict:
        state = self.get_brain_state()
        if "metrics" not in state:  # error / warming_up sentinel
            return state
        return {"signal_quality": state["signal_quality"],
                "quality_score": state["quality_score"],
                "artifacts": state["artifacts"],
                "confidence": state["confidence"],
                "status": state["status"]}

    def get_metric_definitions(self) -> dict:
        from ..dsp.metrics import METRIC_INFO

        return {
            "metrics": METRIC_INFO,
            "disclaimer": (
                "These are heuristic EEG band-power ratios, not validated "
                "clinical measurements. Weight them by the `confidence` and "
                "`metric_confidence` fields, treat `status` == 'unreliable' as "
                "untrustworthy, and calibrate for personalized 0-1 scaling."
            ),
        }

    def calibrate(self, seconds: int = 20, condition: str = "relax") -> dict:
        if self._pipeline is None:
            return {"error": "not connected"}
        cal = self._pipeline.calibrate(seconds=seconds)
        return {"calibrated": cal.calibrated, "condition": condition,
                "metrics": list(cal.baseline)}

    def mark_event(self, label: str) -> dict:
        self._events.append({"label": label, "timestamp": time.time()})
        return {"marked": label, "total_events": len(self._events)}

    def stream_summary(self, seconds: int = 30) -> dict:
        # Plan 1: report the instantaneous state; rolling stats arrive with recording.
        state = self.get_brain_state()
        if "metrics" not in state:
            return state
        return {"window_seconds": seconds, "current": state["metrics"],
                "metric_confidence": state["metric_confidence"],
                "confidence": state["confidence"],
                "signal_quality": state["signal_quality"],
                "status": state["status"]}

    def record(self, seconds: float = 10.0, path: str = "session.npz",
               fmt: str | None = None) -> dict:
        if self._pipeline is None:
            return {"error": "not connected"}
        try:
            safe_path = safe_record_path(path)
        except ValueError as exc:
            return {"error": str(exc)}
        out = self._pipeline.record(seconds=seconds, path=safe_path, fmt=fmt)
        return {"recorded": True, "path": out, "seconds": seconds}

    def start_neurofeedback(self, metric: str = "focus", target: float = 0.7) -> dict:
        if self._pipeline is None:
            return {"error": "not connected"}
        from ..neurofeedback.trainer import NeurofeedbackSession

        self._nf = NeurofeedbackSession(self._pipeline, metric=metric, target=target)
        self._nf.start()
        return {"started": True, "metric": metric, "target": target}

    def get_neurofeedback_score(self) -> dict:
        nf = self._nf
        if nf is None:
            return {"error": "no neurofeedback session — call start_neurofeedback first"}
        nf.sample()
        return nf.score()

Pipeline

Source code in src/bci_mcp/pipeline.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class Pipeline:
    def __init__(self, device: Device | str, window_seconds: float = 2.0,
                 notch_freq: float = 60.0) -> None:
        self.device = create_device(device) if isinstance(device, str) else device
        self.stream = Stream(self.device)
        # ~2 s gives Welch enough segments to resolve the low-frequency
        # (delta/theta) bands that several metrics depend on, instead of a single
        # noisy 1 s periodogram.
        self.window = int(self.device.info.sample_rate * window_seconds)
        self.notch_freq = notch_freq
        self.calibration = Calibration(scaling=metrics_mod.DEFAULT_SCALING)

    def start(self) -> None:
        self.stream.start()

    def stop(self) -> None:
        self.stream.stop()

    def _raw_metrics_now(self):
        fs = self.device.info.sample_rate
        data = self.stream.latest(self.window)
        if data.shape[1] < max(int(fs * 0.5), 64):
            return None, None, data, fs
        # Notch first (remove line noise before it can fold into the passband),
        # then bandpass to the 1-45 Hz analysis range.
        filtered = filters.notch(data, fs, self.notch_freq)
        filtered = filters.bandpass(filtered, fs)
        bp = bands.band_powers(filtered, fs)
        return metrics_mod.raw_metrics(bp), bp, data, fs

    def _confidence(self, quality_score: float, samples: int) -> float:
        """How much to trust a reading: signal quality × calibration × window fill."""
        fill = min(1.0, samples / float(self.window)) if self.window else 1.0
        cal_factor = 1.0 if self.calibration.calibrated else 0.6
        return float(max(0.0, min(1.0, quality_score * cal_factor * fill)))

    def current_state(self) -> BrainState | None:
        raw, bp, data, fs = self._raw_metrics_now()
        if raw is None:
            return None
        scaled = self.calibration.apply(raw)
        q_score, q_label, artifacts = quality_mod.assess_quality(data, fs)
        confidence = self._confidence(q_score, int(data.shape[1]))
        # Artifact handling, not just detection: a hard artifact or poor signal
        # flags the reading unreliable and collapses its confidence, so the
        # metrics are never narrated as a clean reading.
        hard = any(a in quality_mod.HARD_ARTIFACTS for a in artifacts)
        status = "unreliable" if (hard or q_label == "poor") else "ok"
        if status == "unreliable":
            confidence = min(confidence, 0.1)
        confidence = round(confidence, 4)
        return BrainState(
            timestamp=time.time(),
            metrics=scaled,
            band_powers=bp,
            relative_band_powers=bands.relative_band_powers(bp),
            signal_quality=q_label,
            quality_score=q_score,
            artifacts=artifacts,
            channels=self.device.info.channel_count,
            sample_rate=fs,
            calibrated=self.calibration.calibrated,
            confidence=confidence,
            metric_confidence={k: confidence for k in scaled},
            status=status,
        )

    def calibrate(self, seconds: float = 20.0) -> Calibration:
        samples = []
        end = time.time() + seconds
        while time.time() < end:
            raw, _, _, _ = self._raw_metrics_now()
            if raw is not None:
                samples.append(raw)
            time.sleep(0.25)
        if samples:
            self.calibration = Calibration.from_samples(
                samples, scaling=metrics_mod.DEFAULT_SCALING)
        return self.calibration

    def record(self, seconds: float, path: str, fmt: str | None = None) -> str:
        from .recording.recorder import Recorder
        from .recording.writer import save_recording

        recorder = Recorder()
        self.stream.add_consumer(recorder)
        recorder.start()
        try:
            time.sleep(seconds)
        finally:
            recorder.stop()
            self.stream.remove_consumer(recorder)
        data = recorder.data()
        return save_recording(
            data, self.device.info.sample_rate, self.device.info.channel_names, path, fmt,
            metadata={"device": self.device.info.name, "uri": self.device.info.uri},
        )

safe_record_path

safe_record_path(path)

Resolve path to an absolute path inside BCI_RECORD_DIR.

Raises ValueError on traversal attempts. When path is already absolute and inside the allowed directory it is returned as-is; otherwise the basename is appended to the allowed directory.

Source code in src/bci_mcp/recording/paths.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def safe_record_path(path: str) -> str:
    """Resolve *path* to an absolute path inside BCI_RECORD_DIR.

    Raises ValueError on traversal attempts.  When *path* is already absolute
    and inside the allowed directory it is returned as-is; otherwise the
    basename is appended to the allowed directory.
    """
    allowed = _allowed_dir()
    p = Path(path)
    if p.is_absolute():
        resolved = p.resolve()
    else:
        # Strip any directory components from relative paths — use name only.
        resolved = (allowed / p.name).resolve()
    if not str(resolved).startswith(str(allowed) + os.sep) and resolved != allowed:
        raise ValueError(
            f"Recording path '{path}' is outside the allowed directory '{allowed}'. "
            f"Set BCI_RECORD_DIR to change the allowed directory."
        )
    resolved.parent.mkdir(parents=True, exist_ok=True)
    return str(resolved)

validate_mcp_uri

validate_mcp_uri(device_uri)

Raise ValueError if device_uri uses a scheme not allowed over MCP.

Source code in src/bci_mcp/recording/paths.py
42
43
44
45
46
47
48
49
50
51
52
def validate_mcp_uri(device_uri: str) -> str:
    """Raise ValueError if *device_uri* uses a scheme not allowed over MCP."""
    from urllib.parse import urlparse

    scheme = urlparse(device_uri).scheme.lower()
    if scheme not in MCP_ALLOWED_SCHEMES:
        raise ValueError(
            f"Device URI scheme '{scheme}' is not permitted via MCP. "
            f"Allowed schemes: {', '.join(sorted(MCP_ALLOWED_SCHEMES))}."
        )
    return device_uri

bci_mcp.devices.synthetic

Synthetic EEG device — realistic brainwaves with no hardware.

Chunk dataclass

Source code in src/bci_mcp/core/device.py
21
22
23
24
@dataclass
class Chunk:
    data: np.ndarray  # shape (channel_count, n_samples), float32, microvolts
    timestamps: np.ndarray  # shape (n_samples,), seconds

Device

Bases: ABC

A streaming EEG source. Subclasses run their own acquisition internally.

Source code in src/bci_mcp/core/device.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Device(ABC):
    """A streaming EEG source. Subclasses run their own acquisition internally."""

    info: DeviceInfo

    @abstractmethod
    def connect(self) -> None: ...

    @abstractmethod
    def start(self) -> None: ...

    @abstractmethod
    def read(self) -> Chunk | None:
        """Non-blocking pull of newly available samples, or None if none/not streaming."""

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def disconnect(self) -> None: ...

    def __enter__(self) -> Device:
        self.connect()
        self.start()
        return self

    def __exit__(self, *exc: object) -> None:
        self.stop()
        self.disconnect()

read abstractmethod

read()

Non-blocking pull of newly available samples, or None if none/not streaming.

Source code in src/bci_mcp/core/device.py
38
39
40
@abstractmethod
def read(self) -> Chunk | None:
    """Non-blocking pull of newly available samples, or None if none/not streaming."""

DeviceInfo dataclass

Source code in src/bci_mcp/core/device.py
10
11
12
13
14
15
16
17
18
@dataclass
class DeviceInfo:
    name: str
    uri: str
    sample_rate: float
    channel_count: int
    channel_names: list[str]
    units: str = "uV"  # "uV" | "counts"
    extra: dict = field(default_factory=dict)

SyntheticDevice

Bases: Device

Generates band-mixed EEG. focus (0..1) trades alpha for beta.

Source code in src/bci_mcp/devices/synthetic.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class SyntheticDevice(Device):
    """Generates band-mixed EEG. `focus` (0..1) trades alpha for beta."""

    def __init__(
        self,
        channels: int = 4,
        sample_rate: float = 256.0,
        chunk_samples: int = 32,
        focus: float = 0.5,
        seed: int | None = None,
        uri: str = "synthetic://",
    ) -> None:
        self.info = DeviceInfo(
            name="Synthetic EEG",
            uri=uri,
            sample_rate=sample_rate,
            channel_count=channels,
            channel_names=[f"ch{i + 1}" for i in range(channels)],
            units="uV",
        )
        self.chunk_samples = chunk_samples
        self.focus = float(np.clip(focus, 0.0, 1.0))
        self._rng = np.random.default_rng(seed)
        self._t = 0  # sample counter
        self._streaming = False

    def connect(self) -> None:
        pass

    def start(self) -> None:
        self._streaming = True

    def stop(self) -> None:
        self._streaming = False

    def disconnect(self) -> None:
        pass

    def read(self) -> Chunk | None:
        if not self._streaming:
            return None
        fs = self.info.sample_rate
        n = self.chunk_samples
        idx = self._t + np.arange(n)
        t = idx / fs
        alpha_amp = 20.0 * (1.0 - self.focus) + 5.0  # 10 Hz
        beta_amp = 18.0 * self.focus + 3.0  # 20 Hz
        base = (
            alpha_amp * np.sin(2 * np.pi * 10 * t)
            + beta_amp * np.sin(2 * np.pi * 20 * t)
            + 8.0 * np.sin(2 * np.pi * 6 * t)  # theta
        )
        data = np.empty((self.info.channel_count, n), dtype=np.float32)
        for c in range(self.info.channel_count):
            noise = self._rng.normal(0.0, 5.0, n)
            data[c] = base + noise
        self._t += n
        return Chunk(data=data, timestamps=t.astype(np.float64))

_factory

_factory(parsed, params)
Source code in src/bci_mcp/devices/synthetic.py
69
70
71
72
73
74
75
76
def _factory(parsed, params):  # noqa: ANN001
    return SyntheticDevice(
        channels=int(params.get("channels", 4)),
        sample_rate=float(params.get("sample_rate", 256.0)),
        focus=float(params.get("focus", 0.5)),
        seed=int(params["seed"]) if "seed" in params else None,
        uri=f"synthetic://?{parsed.query}" if parsed.query else "synthetic://",
    )

register

register(scheme, factory)
Source code in src/bci_mcp/core/registry.py
13
14
def register(scheme: str, factory: DeviceFactory) -> None:
    _REGISTRY[scheme] = factory

bci_mcp.devices.neurofocus

NeuroFocus v4 device — USB-serial and BLE transports.

Chunk dataclass

Source code in src/bci_mcp/core/device.py
21
22
23
24
@dataclass
class Chunk:
    data: np.ndarray  # shape (channel_count, n_samples), float32, microvolts
    timestamps: np.ndarray  # shape (n_samples,), seconds

Device

Bases: ABC

A streaming EEG source. Subclasses run their own acquisition internally.

Source code in src/bci_mcp/core/device.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Device(ABC):
    """A streaming EEG source. Subclasses run their own acquisition internally."""

    info: DeviceInfo

    @abstractmethod
    def connect(self) -> None: ...

    @abstractmethod
    def start(self) -> None: ...

    @abstractmethod
    def read(self) -> Chunk | None:
        """Non-blocking pull of newly available samples, or None if none/not streaming."""

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def disconnect(self) -> None: ...

    def __enter__(self) -> Device:
        self.connect()
        self.start()
        return self

    def __exit__(self, *exc: object) -> None:
        self.stop()
        self.disconnect()

read abstractmethod

read()

Non-blocking pull of newly available samples, or None if none/not streaming.

Source code in src/bci_mcp/core/device.py
38
39
40
@abstractmethod
def read(self) -> Chunk | None:
    """Non-blocking pull of newly available samples, or None if none/not streaming."""

DeviceInfo dataclass

Source code in src/bci_mcp/core/device.py
10
11
12
13
14
15
16
17
18
@dataclass
class DeviceInfo:
    name: str
    uri: str
    sample_rate: float
    channel_count: int
    channel_names: list[str]
    units: str = "uV"  # "uV" | "counts"
    extra: dict = field(default_factory=dict)

NeuroFocusDevice

Bases: Device

Source code in src/bci_mcp/devices/neurofocus.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class NeuroFocusDevice(Device):
    def __init__(self, transport: str = "serial", port: str = "",
                 ble_name: str = "NEUROFOCUS_V4_01",
                 sample_rate: float = proto.DEFAULT_SAMPLE_RATE,
                 baud: int = 115200, uri: str | None = None,
                 serial_factory=None) -> None:
        self.transport = transport
        self.port = port
        self.ble_name = ble_name
        self.baud = baud
        self._serial_factory = serial_factory
        self.info = DeviceInfo(
            name=f"NeuroFocus v4 ({transport})",
            uri=uri or (f"neurofocus://serial/{port}" if transport == "serial"
                        else f"neurofocus://ble/{ble_name}"),
            sample_rate=sample_rate, channel_count=1, channel_names=["ch1"],
            units="uV", extra={"transport": transport},
        )
        self._buf: list[float] = []
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._running = False
        self._serial = None
        self._ble_client = None
        self._loop = None
        self._ble_ready = None
        self._ble_error = None

    # --- shared ---
    def _emit_counts(self, counts) -> None:
        if not counts:
            return
        uv = proto.counts_to_uv(np.asarray(counts, dtype=np.int64))
        with self._lock:
            self._buf.extend(float(x) for x in np.atleast_1d(uv))

    def read(self) -> Chunk | None:
        with self._lock:
            if not self._buf:
                return None
            values = self._buf
            self._buf = []
        data = np.array(values, dtype=np.float32).reshape(1, -1)
        ts = np.arange(data.shape[1], dtype=np.float64) / self.info.sample_rate
        return Chunk(data=data, timestamps=ts)

    # --- lifecycle dispatch ---
    def connect(self) -> None:
        if self.transport == "serial":
            self._connect_serial()
        else:
            self._connect_ble()

    def start(self) -> None:
        if self._running:
            return
        if self.transport == "serial":
            self._serial.write(proto.CMD_START)
            self._running = True
            self._thread = threading.Thread(target=self._run_serial, daemon=True)
            self._thread.start()
        else:
            # BLE auto-streams on connect; notifications already flowing.
            self._running = True

    def stop(self) -> None:
        self._running = False
        if self.transport == "serial":
            if self._serial is not None:
                try:
                    self._serial.write(proto.CMD_STOP)
                except (OSError, AttributeError):
                    pass
            if self._thread is not None:
                self._thread.join(timeout=1.0)
        else:
            if self._loop is not None and self._ble_client is not None:
                import asyncio

                try:
                    fut = asyncio.run_coroutine_threadsafe(
                        self._ble_client.write_gatt_char(
                            proto.CMD_CHAR_UUID, proto.CMD_STOP
                        ),
                        self._loop,
                    )
                    fut.result(timeout=2.0)
                except Exception:  # noqa: BLE001
                    pass

    def disconnect(self) -> None:
        if self.transport == "serial":
            if self._serial is not None and getattr(self._serial, "is_open", False):
                self._serial.close()
        else:
            self._disconnect_ble()

    # --- serial transport ---
    def _connect_serial(self) -> None:
        if self._serial_factory is not None:
            self._serial = self._serial_factory(self.port, self.baud)
        else:
            import serial

            self._serial = serial.Serial(self.port, self.baud, timeout=1)

    def _run_serial(self) -> None:
        while self._running and getattr(self._serial, "is_open", False):
            try:
                if self._serial.in_waiting:
                    line = self._serial.readline()
                    self._emit_counts(proto.parse_frame(line))
                else:
                    time.sleep(0.001)
            except (OSError, AttributeError):
                break

    # --- BLE transport (bleak 3.x API verified: find_device_by_name, start_notify(uuid, cb),
    #     callback signature (sender, data: bytearray)) ---
    def _connect_ble(self) -> None:
        import asyncio

        import bleak

        self._loop = asyncio.new_event_loop()
        self._ble_ready = threading.Event()
        self._ble_error = None

        def _notify(_sender, data) -> None:
            self._emit_counts(proto.parse_frame(bytes(data)))

        async def _setup() -> None:
            try:
                device = await bleak.BleakScanner.find_device_by_name(self.ble_name)
                if device is None:
                    raise RuntimeError(
                        f"NeuroFocus BLE device '{self.ble_name}' not found"
                    )
                self._ble_client = bleak.BleakClient(device)
                await self._ble_client.connect()
                await self._ble_client.start_notify(proto.DATA_CHAR_UUID, _notify)
                await self._ble_client.write_gatt_char(
                    proto.CMD_CHAR_UUID, proto.CMD_START
                )
            except Exception as e:  # noqa: BLE001
                self._ble_error = e
            finally:
                self._ble_ready.set()

        def _run_loop() -> None:
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(_setup())
            if self._ble_error is None:
                self._loop.run_forever()

        self._thread = threading.Thread(target=_run_loop, daemon=True)
        self._thread.start()
        if not self._ble_ready.wait(timeout=15.0):
            raise RuntimeError(
                f"NeuroFocus BLE connect timed out for '{self.ble_name}'"
            )
        if self._ble_error is not None:
            raise self._ble_error

    def _disconnect_ble(self) -> None:
        self._running = False
        if self._loop is not None and self._ble_client is not None:
            import asyncio

            async def _teardown() -> None:
                try:
                    await self._ble_client.stop_notify(proto.DATA_CHAR_UUID)
                except Exception:  # noqa: BLE001
                    pass
                await self._ble_client.disconnect()

            try:
                fut = asyncio.run_coroutine_threadsafe(_teardown(), self._loop)
                fut.result(timeout=2.0)
            except Exception:  # noqa: BLE001
                pass
            self._loop.call_soon_threadsafe(self._loop.stop)
            if self._thread is not None:
                self._thread.join(timeout=2.0)

_factory

_factory(parsed, params)
Source code in src/bci_mcp/devices/neurofocus.py
200
201
202
203
204
205
206
207
208
209
def _factory(parsed, params):  # noqa: ANN001
    # neurofocus://serial/<port>  or  neurofocus://ble/<name>
    transport = parsed.netloc or "serial"
    rest = parsed.path.lstrip("/")
    if transport == "serial":
        port = "/" + rest if rest else ""
        return NeuroFocusDevice(transport="serial", port=port,
                                uri=parsed.geturl())
    return NeuroFocusDevice(transport="ble", ble_name=rest or "NEUROFOCUS_V4_01",
                            uri=parsed.geturl())

register

register(scheme, factory)
Source code in src/bci_mcp/core/registry.py
13
14
def register(scheme: str, factory: DeviceFactory) -> None:
    _REGISTRY[scheme] = factory