Skip to content

Electrophysiology API

pykoppu.electrophysiology.base.ElectrophysiologyDriver

Bases: ABC

Abstract Base Class for Electrophysiology Drivers.

Source code in pykoppu/electrophysiology/base.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ElectrophysiologyDriver(ABC):
    """
    Interface that every electrophysiology driver has to implement.

    Subclasses must provide ``connect``, ``execute`` and ``disconnect``;
    the class cannot be instantiated until all three are overridden.
    """

    @abstractmethod
    def connect(self):
        """Open the connection to the device."""

    @abstractmethod
    def execute(self, instructions: List[Instruction]) -> Any:
        """
        Run a sequence of BioASM instructions on the device.

        Args:
            instructions (List[Instruction]): The instructions to run.

        Returns:
            Any: Driver-defined result of the run (e.g., final state).
        """

    @abstractmethod
    def disconnect(self):
        """Shut down the connection to the device."""

connect() abstractmethod

Establish connection to the device.

Source code in pykoppu/electrophysiology/base.py
16
17
18
19
@abstractmethod
def connect(self):
    """Open the connection to the device (to be provided by subclasses)."""

disconnect() abstractmethod

Close connection to the device.

Source code in pykoppu/electrophysiology/base.py
34
35
36
37
@abstractmethod
def disconnect(self):
    """Shut down the connection to the device (to be provided by subclasses)."""

execute(instructions) abstractmethod

Execute a sequence of BioASM instructions.

Parameters:

Name Type Description Default
instructions List[Instruction]

The instructions to execute.

required

Returns:

Name Type Description
Any Any

The result of the execution (e.g., final state).

Source code in pykoppu/electrophysiology/base.py
21
22
23
24
25
26
27
28
29
30
31
32
@abstractmethod
def execute(self, instructions: List[Instruction]) -> Any:
    """
    Run a sequence of BioASM instructions on the device.

    Args:
        instructions (List[Instruction]): The instructions to run.

    Returns:
        Any: Driver-defined result of the run (e.g., final state).
    """

pykoppu.electrophysiology.cpu.CPUDriver

Bases: ElectrophysiologyDriver

Driver for the CPU-based Digital Twin (using Brian2).

Source code in pykoppu/electrophysiology/cpu.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class CPUDriver(ElectrophysiologyDriver):
    """
    Driver for the CPU-based Digital Twin (using Brian2).

    BioASM instructions are interpreted one by one: ``ALC`` builds the
    neuron group and network, ``LDJ``/``LDH`` store the coupling matrix and
    bias vector, ``SIG`` sets the noise amplitude, ``RUN`` advances the
    simulation and ``RD`` reads out the result.
    """

    def __init__(self, opu: OPU):
        """
        Store the OPU handle and reset all simulation state.

        Args:
            opu (OPU): The OPU instance this driver is attached to.
        """
        self.opu = opu
        self.network = None
        self.neurons = None
        self.J = None
        self.h = None
        self.sigma = 0.0
        # Telemetry objects are (re)created by _allocate(); they are defined
        # here so the attributes exist even before an ALC instruction ran.
        self.spike_monitor = None
        self.energy_trace = []

    def connect(self):
        """Initialize the Brian2 environment."""
        # Reset Brian2 scope
        b2.start_scope()
        # Set default clock
        b2.defaultclock.dt = 0.1 * b2.ms

    def disconnect(self):
        """Clean up resources by dropping the simulation objects."""
        self.network = None
        self.neurons = None
        # The monitor belongs to the dropped network; release it as well.
        self.spike_monitor = None

    def _allocate(self, num_neurons: int):
        """
        Create the neuron group with Critical Regime parameters.

        Builds the ``NeuronGroup``, wraps it in a ``Network`` together with a
        spike monitor, resets the energy trace and registers the feedback
        operation that drives ``I_input`` from ``J`` and ``h``.

        Args:
            num_neurons (int): Number of neurons to create.
        """
        # Hardcoded Critical Regime Parameters as requested
        R = 50 * b2.Mohm
        tau = 20 * b2.ms
        El = -70 * b2.mV
        Vt = -50 * b2.mV
        Vr = -70 * b2.mV
        I_offset = 0.36 * b2.nA
        # Initial sigma (will be updated by SIG instruction)
        # NOTE(review): a SIG executed *before* ALC is stored in self.sigma
        # but not applied here; the group always starts with this default.
        sigma_init = 2.0 * b2.mV

        # Brian2 equations
        # dv/dt = (-(v - El) + R * (I_offset + I_input)) / tau + sigma_noise * sqrt(2/tau) * xi : volt
        eqs = '''
        dv/dt = (-(v - El) + R * (I_offset + I_input)) / tau + sigma_noise * sqrt(2/tau) * xi : volt
        I_input : amp
        sigma_noise : volt
        R : ohm
        tau : second
        El : volt
        Vt : volt
        Vr : volt
        I_offset : amp
        '''

        self.neurons = b2.NeuronGroup(
            num_neurons,
            eqs,
            threshold='v > Vt',
            reset='v = Vr',
            method='euler'
        )

        # Initialize variables
        self.neurons.R = R
        self.neurons.tau = tau
        self.neurons.El = El
        self.neurons.Vt = Vt
        self.neurons.Vr = Vr
        self.neurons.I_offset = I_offset

        self.neurons.v = El  # Start at rest
        self.neurons.sigma_noise = sigma_init
        self.neurons.I_input = 0 * b2.amp

        # Create Network
        self.network = b2.Network(self.neurons)

        # Telemetry
        self.spike_monitor = b2.SpikeMonitor(self.neurons)
        self.network.add(self.spike_monitor)
        self.energy_trace = []  # Store energy at each feedback step

        # Add feedback loop
        @b2.network_operation(dt=1*b2.ms)
        def feedback_loop():
            # Feedback is only active once both J and h have been loaded.
            if self.J is not None and self.h is not None:
                # 1. Get State 's'
                # We use a normalized potential approximation for the state
                # s \in [0, 1]
                v_raw = self.neurons.v / b2.volt
                el_raw = El / b2.volt
                vt_raw = Vt / b2.volt

                # Linear mapping of v to [0, 1]
                s = np.clip((v_raw - el_raw) / (vt_raw - el_raw), 0, 1)

                # 2. Compute Feedback Current
                # I_fb = J @ s + h
                raw_current = (self.J @ s + self.h)

                # 3. Normalize Feedback
                # Target range: +/- 1.5 nA
                target_range = 1.5e-9

                # Only ever scale down: currents already inside the target
                # range are passed through unchanged.
                max_abs_current = np.max(np.abs(raw_current))
                if max_abs_current > target_range:
                    scale = target_range / max_abs_current
                    raw_current = raw_current * scale

                self.neurons.I_input = raw_current * b2.amp

                # 4. Telemetry: Calculate Energy
                # E = -0.5 * s^T J s - h^T s
                # Note: This is an approximation using the continuous state 's'
                energy = -0.5 * s.T @ self.J @ s - self.h.T @ s
                self.energy_trace.append(energy)

        self.network.add(feedback_loop)

    def _run_simulation(self, duration: float):
        """
        Run the simulation.

        Does nothing when no network has been allocated yet.

        Args:
            duration (float): Run length; multiplied by ``b2.second``.
        """
        # Explicit None check: "no network allocated" is the only case that
        # should skip the run, independent of how the network evaluates as a
        # boolean.
        if self.network is not None:
            self.network.run(duration * b2.second)

    def execute(self, instructions: List[Instruction]) -> Any:
        """
        Execute BioASM instructions using Brian2.

        Instructions are processed in order; opcodes other than ALC, LDJ,
        LDH, SIG, RUN and RD are ignored.

        Args:
            instructions (List[Instruction]): The instructions to execute.

        Returns:
            Any: ``(final_state, energy_history, spike_data)`` from the last
            RD instruction executed while neurons were allocated, where
            ``final_state`` is the membrane potential normalized to [0, 1],
            ``energy_history`` is a copy of the energy trace and
            ``spike_data`` is ``(spike_times_ms, neuron_indices)``. If no
            such RD instruction ran, an empty dict is returned.
        """
        results = {}

        for instr in instructions:
            if instr.opcode == OpCode.ALC:
                self._allocate(instr.operands[0])
            elif instr.opcode == OpCode.LDJ:
                self.J = np.array(instr.operands[0])
            elif instr.opcode == OpCode.LDH:
                self.h = np.array(instr.operands[0])
            elif instr.opcode == OpCode.SIG:
                self.sigma = float(instr.operands[0])
                # Update noise in the neuron model dynamically
                if self.neurons is not None:
                    # Brian2 allows setting group variables directly.
                    # NOTE(review): the operand is applied in volts while the
                    # allocation default is given in mV - confirm the unit.
                    self.neurons.sigma_noise = self.sigma * b2.volt
            elif instr.opcode == OpCode.RUN:
                duration = float(instr.operands[0])
                self._run_simulation(duration)
            elif instr.opcode == OpCode.RD:
                # Read state (final membrane potentials)
                if self.neurons is not None:
                    # Return tuple: (final_state, energy_trace, spike_data)
                    # Normalize final state to [0, 1]
                    v_raw = np.array(self.neurons.v[:])
                    el_raw = self.neurons.El[0] / b2.volt
                    vt_raw = self.neurons.Vt[0] / b2.volt

                    # s = (v - El) / (Vt - El) clipped to [0, 1]
                    s = np.clip((v_raw - el_raw) / (vt_raw - el_raw), 0, 1)

                    final_state = s
                    # Copy so later feedback steps do not mutate the result.
                    energy_history = list(self.energy_trace)
                    spike_data = (np.array(self.spike_monitor.t/b2.ms), np.array(self.spike_monitor.i))

                    results = (final_state, energy_history, spike_data)

        return results

connect()

Initialize the Brian2 environment.

Source code in pykoppu/electrophysiology/cpu.py
27
28
29
30
31
32
def connect(self):
    """Prepare the Brian2 environment for this driver."""
    # Reset the Brian2 scope first.
    b2.start_scope()
    # Then set the default clock step to 0.1 ms.
    step = 0.1 * b2.ms
    b2.defaultclock.dt = step

disconnect()

Clean up resources.

Source code in pykoppu/electrophysiology/cpu.py
34
35
36
37
def disconnect(self):
    """Release the simulation objects held by the driver."""
    # Drop both references; the network is cleared first, then the neurons.
    self.network = self.neurons = None

execute(instructions)

Execute BioASM instructions using Brian2.

Source code in pykoppu/electrophysiology/cpu.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def execute(self, instructions: List[Instruction]) -> Any:
    """
    Execute BioASM instructions using Brian2.

    Instructions are processed in order; opcodes other than ALC, LDJ, LDH,
    SIG, RUN and RD are ignored.

    Args:
        instructions (List[Instruction]): The instructions to execute.

    Returns:
        Any: ``(final_state, energy_history, spike_data)`` from the last RD
        instruction executed while neurons were allocated, where
        ``final_state`` is the membrane potential normalized to [0, 1],
        ``energy_history`` is a copy of the energy trace and ``spike_data``
        is ``(spike_times_ms, neuron_indices)``. If no such RD instruction
        ran, an empty dict is returned.
    """
    results = {}

    for instr in instructions:
        if instr.opcode == OpCode.ALC:
            self._allocate(instr.operands[0])
        elif instr.opcode == OpCode.LDJ:
            self.J = np.array(instr.operands[0])
        elif instr.opcode == OpCode.LDH:
            self.h = np.array(instr.operands[0])
        elif instr.opcode == OpCode.SIG:
            self.sigma = float(instr.operands[0])
            # Update noise in the neuron model dynamically.
            # Explicit None check: "not allocated yet" is the only case that
            # should skip the update, independent of the group's truthiness.
            if self.neurons is not None:
                # Brian2 allows setting group variables directly
                self.neurons.sigma_noise = self.sigma * b2.volt
        elif instr.opcode == OpCode.RUN:
            duration = float(instr.operands[0])
            self._run_simulation(duration)
        elif instr.opcode == OpCode.RD:
            # Read state (final membrane potentials)
            if self.neurons is not None:
                # Return tuple: (final_state, energy_trace, spike_data)
                # Normalize final state to [0, 1]
                v_raw = np.array(self.neurons.v[:])
                el_raw = self.neurons.El[0] / b2.volt
                vt_raw = self.neurons.Vt[0] / b2.volt

                # s = (v - El) / (Vt - El) clipped to [0, 1]
                s = np.clip((v_raw - el_raw) / (vt_raw - el_raw), 0, 1)

                final_state = s
                # Copy so later feedback steps do not mutate the result.
                energy_history = list(self.energy_trace)
                spike_data = (np.array(self.spike_monitor.t/b2.ms), np.array(self.spike_monitor.i))

                results = (final_state, energy_history, spike_data)

    return results

pykoppu.electrophysiology.connect(driver_name='cpu', opu=None, **kwargs)

Factory function to connect to a driver.

Parameters:

Name Type Description Default
driver_name str

Name of the driver ("cpu", "gpu", "intan", "cloud").

'cpu'
opu OPU

The OPU instance.

None
**kwargs Any

Additional keyword arguments. They are accepted for call-site compatibility but are currently not forwarded to the driver constructor.

{}

Returns:

Name Type Description
ElectrophysiologyDriver ElectrophysiologyDriver

The connected driver.

Source code in pykoppu/electrophysiology/__init__.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def connect(driver_name: str = "cpu", opu: Optional[Any] = None, **kwargs: Any) -> ElectrophysiologyDriver:
    """
    Factory function to create a driver and connect it.

    Args:
        driver_name (str): Name of the driver ("cpu", "gpu", "intan", "cloud").
        opu (OPU): The OPU instance. When omitted, a default ``OPU()`` is
            created.
        **kwargs: Accepted for call-site compatibility; currently not
            forwarded to the driver constructor.

    Returns:
        ElectrophysiologyDriver: The connected driver.

    Raises:
        ValueError: If ``driver_name`` is not one of the known drivers.
    """
    if opu is None:
        # Kept as a function-scope import like the original, but only
        # performed when a default device actually has to be built.
        from ..opu.device import OPU

        # ``opu`` is a named parameter, so it can never arrive through
        # **kwargs; the default device is simply constructed here.
        opu = OPU()

    if driver_name == "cpu":
        driver = CPUDriver(opu=opu)
    elif driver_name == "gpu":
        driver = GPUDriver(opu=opu)
    elif driver_name == "intan":
        driver = INTANDriver(opu=opu)
    elif driver_name == "cloud":
        driver = CLOUDDriver(opu=opu)
    else:
        raise ValueError(f"Unknown driver: {driver_name}")

    # The factory hands back a driver that is already connected.
    driver.connect()
    return driver