
Python API

mneme

Core modules

mneme.recorded_execution

Recorded execution database and memory snapshot bindings.

This module defines the Python-side representation of Mneme’s record/replay artifacts:

  • MemStateRef: a lightweight wrapper over the native memory-snapshot object (prologue/epilogue) used for replay verification.
  • RecordedExecution: a JSON-serializable description of a recorded kernel, including all observed dynamic instances and the LLVM IR modules required for replay.

The native snapshot API is accessed via ctypes/FFI (ffi.lib.MnemePy_*). Instances of MemStateRef behave as context managers: they load the snapshot on entry and dispose the native handle on exit.

Notes
  • This file is core to replay correctness: prologue/epilogue snapshots are used to verify that the replayed kernel produced the expected state.
  • The JSON schema here is treated as a stable interchange format between record and replay tools.

MemStateRef

Handle to a recorded memory snapshot (prologue/epilogue).

A MemStateRef wraps Mneme’s native memory snapshot representation, which encodes the recorded kernel argument pointers and the captured device memory state. During replay, these snapshots serve two purposes:

  1. Inputs: The prologue snapshot provides the argument pointer list and initial memory contents required to execute the kernel deterministically.
  2. Verification: The epilogue snapshot represents the expected post-kernel state. Replay compares a reproduced epilogue against this snapshot to validate correctness.

Instances are context managers. Entering the context loads the snapshot into the native handle; leaving the context disposes it.
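
For illustration, a minimal usage sketch; the snapshot path and kernel name are placeholders:

from mneme.recorded_execution import MemStateRef, SnapshotType

# Hypothetical snapshot file produced by a prior recording run.
with MemStateRef("my_kernel.prologue.bin", "my_kernel", SnapshotType.PROLOGUE) as pro:
    print(pro.num_args)   # number of recorded kernel arguments
    ptrs = pro.args       # ctypes array of argument pointers
# exiting the with-block disposes the native handle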

Parameters:

    fn : str (required)
        Path to the snapshot file on disk.
    kernel_name : str (required)
        Kernel name associated with this snapshot (used by native layer).
    snap_type : SnapshotType (required)
        Whether this snapshot is a prologue or epilogue capture.

Raises:

    RuntimeError
        If the snapshot file does not exist.

Source code in python/mneme/recorded_execution.py
class MemStateRef:
    """
    Handle to a recorded memory snapshot (prologue/epilogue).

    A :class:`MemStateRef` wraps Mneme’s native memory snapshot representation,
    which encodes the recorded kernel argument pointers and the captured device
    memory state. During replay, these snapshots serve two purposes:

      1. **Inputs**: The prologue snapshot provides the argument pointer list and
         initial memory contents required to execute the kernel deterministically.
      2. **Verification**: The epilogue snapshot represents the expected post-kernel
         state. Replay compares a reproduced epilogue against this snapshot to
         validate correctness.

    Instances are context managers. Entering the context loads the snapshot
    into the native handle; leaving the context disposes it.

    Parameters
    ----------
    fn : str
        Path to the snapshot file on disk.
    kernel_name : str
        Kernel name associated with this snapshot (used by native layer).
    snap_type : SnapshotType
        Whether this snapshot is a prologue or epilogue capture.

    Raises
    ------
    RuntimeError
        If the snapshot file does not exist.
    """

    def __init__(self, fn: str, kernel_name: str, snap_type: SnapshotType):
        if not Path(fn).exists():
            raise RuntimeError(f"Expected prologue file: {fn} to exist")
        self.fn = fn
        self.kernel_name = kernel_name
        self.s_type = snap_type
        self._state = None
        self._load = False
        self._num_args = None
        self._args = None

    def _dispose(self):
        if self._state is not None:
            ffi.lib.MnemePy_DisposeMemState(self._state)

    def open(self):
        """
        Initialize and load the snapshot into the native handle.

        Returns
        -------
        MemStateRef
            Returns self for convenient chaining / context-manager usage.
        """
        if self._state is None:
            self._state = ffi.lib.MnemePy_initializeMemState(
                c_char_p(self.kernel_name.encode("utf-8")),
                c_char_p(self.fn.encode("utf-8")),
                c_bool(self.s_type == SnapshotType.PROLOGUE),
            )

        ffi.lib.MnemePy_LoadMemState(self._state)
        self._load = True
        return self

    @property
    def args(self):
        """
        Return the kernel argument pointer array stored in the snapshot.

        Returns
        -------
        ctypes.POINTER(ctypes.c_void_p)
            Pointer to an array of argument pointers as returned by the native API.

        Raises
        ------
        RuntimeError
            If the snapshot has not been loaded via :meth:`open`.
        """
        if not self._load:
            raise RuntimeError("Cannot access arguments without loading memory state")

        if self._args is None:
            self._args = ffi.lib.MnemePy_getArgs(self._state)

        return self._args

    @property
    def num_args(self):
        """
        Return the number of kernel arguments recorded in the snapshot.

        Returns
        -------
        int
            Number of arguments.

        Raises
        ------
        RuntimeError
            If the snapshot has not been loaded via :meth:`open`.
        """
        if not self._load:
            raise RuntimeError("Cannot access num_args without loading memory state")

        if self._num_args is None:
            self._num_args = ffi.lib.MnemePy_getNumArgs(self._state)

        return self._num_args

    def close(self):
        """Dispose the native snapshot handle (counterpart to :meth:`open`)."""
        self._dispose()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def reset(self):
        """
        Reset the snapshot state in the native layer.

        This is typically used to restore the device memory state to the recorded
        baseline (e.g., before re-running a replay) without reinitializing the
        snapshot handle.

        Raises
        ------
        RuntimeError
            If the snapshot has not been loaded via :meth:`open`.
        """
        if not self._load:
            raise RuntimeError("Cannot reset memory state before it is loaded via open().")

        ffi.lib.MnemePy_ResetMemState(self._state)

    def __eq__(self, other):
        """
        Compare two snapshots using the native comparison routine.

        Returns
        -------
        bool
            True if the native layer considers the states equivalent.
        """
        return bool(ffi.lib.MnemePy_CompareMemState(self._state, other._state))

    def __ne__(self, other):
        """
        Compare two snapshots using the native comparison routine.

        Returns
        -------
        bool
            True if the native layer considers the states different.
        """
        return not bool(ffi.lib.MnemePy_CompareMemState(self._state, other._state))

    def __del__(self):
        try:
            state = getattr(self, "_state", None)

            # If no state, or constructor failed, or tests used fake values → skip cleanup
            if not state or isinstance(state, str):
                return

            # Call disposer only if available and callable
            dispose = getattr(ffi.lib, "MnemePy_DisposeMemState", None)
            if callable(dispose):
                try:
                    dispose(state)
                except Exception:
                    pass

        except Exception:
            # Absolutely nothing should escape __del__
            pass

args property

Return the kernel argument pointer array stored in the snapshot.

Returns:

    ctypes.POINTER(ctypes.c_void_p)
        Pointer to an array of argument pointers as returned by the native API.

Raises:

    RuntimeError
        If the snapshot has not been loaded via open().

num_args property

Return the number of kernel arguments recorded in the snapshot.

Returns:

    int
        Number of arguments.

Raises:

    RuntimeError
        If the snapshot has not been loaded via open().

__eq__(other)

Compare two snapshots using the native comparison routine.

Returns:

    bool
        True if the native layer considers the states equivalent.

Source code in python/mneme/recorded_execution.py
def __eq__(self, other):
    """
    Compare two snapshots using the native comparison routine.

    Returns
    -------
    bool
        True if the native layer considers the states equivalent.
    """
    return bool(ffi.lib.MnemePy_CompareMemState(self._state, other._state))

__ne__(other)

Compare two snapshots using the native comparison routine.

Returns:

    bool
        True if the native layer considers the states different.

Source code in python/mneme/recorded_execution.py
def __ne__(self, other):
    """
    Compare two snapshots using the native comparison routine.

    Returns
    -------
    bool
        True if the native layer considers the states different.
    """
    return not bool(ffi.lib.MnemePy_CompareMemState(self._state, other._state))

open()

Initialize and load the snapshot into the native handle.

Returns:

    MemStateRef
        Returns self for convenient chaining / context-manager usage.

Source code in python/mneme/recorded_execution.py
def open(self):
    """
    Initialize and load the snapshot into the native handle.

    Returns
    -------
    MemStateRef
        Returns self for convenient chaining / context-manager usage.
    """
    if self._state is None:
        self._state = ffi.lib.MnemePy_initializeMemState(
            c_char_p(self.kernel_name.encode("utf-8")),
            c_char_p(self.fn.encode("utf-8")),
            c_bool(self.s_type == SnapshotType.PROLOGUE),
        )

    ffi.lib.MnemePy_LoadMemState(self._state)
    self._load = True
    return self

reset()

Reset the snapshot state in the native layer.

This is typically used to restore the device memory state to the recorded baseline (e.g., before re-running a replay) without reinitializing the snapshot handle.

Raises:

    RuntimeError
        If the snapshot has not been loaded via open().

Source code in python/mneme/recorded_execution.py
def reset(self):
    """
    Reset the snapshot state in the native layer.

    This is typically used to restore the device memory state to the recorded
    baseline (e.g., before re-running a replay) without reinitializing the
    snapshot handle.

    Raises
    ------
    RuntimeError
        If the snapshot has not been loaded via :meth:`open`.
    """
    if not self._load:
        raise RuntimeError("Cannot reset memory state before it is loaded via open().")

    ffi.lib.MnemePy_ResetMemState(self._state)

RecordedExecution

Description of a recorded kernel execution and its dynamic instances.

A RecordedExecution captures everything needed to replay and tune a kernel that was observed during application execution:

  • Kernel identity (static hash, name, demangled name)
  • Argument names and specialization availability
  • Virtual address space reservation information (VA base + size)
  • LLVM IR module file paths required for linking
  • A mapping of dynamic hash → KernelInstance, representing each observed launch instance (grid/block/shared-mem and snapshot paths)

The class behaves like a mapping over kernel instances and supports JSON serialization via to_json() / from_json().
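
A short sketch of the mapping protocol and JSON round-trip (record.json is a placeholder path):

from mneme.recorded_execution import RecordedExecution

rec = RecordedExecution.from_json("record.json")   # hypothetical record database
print(rec)                                         # kernel name, arg count, VA info

for dynamic_hash, inst in rec.items():             # iterate dynamic instances
    print(dynamic_hash, inst)                      # grid/block/shared-mem summary

rec.to_json("record.copy.json")                    # serialize back to disk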

Parameters:

    static_hash : str (required)
        Stable identifier for the kernel’s static code shape.
    kernel_name : str (required)
        Mangled or runtime kernel symbol name.
    demangled_name : str (required)
        Human-readable kernel name (if available).
    llvm_files : list[str] (required)
        Paths to LLVM IR modules captured during recording.
    arg_names : list[str] (required)
        Recorded kernel argument names (for display/debugging).
    specializations : list[bool] (required)
        Per-argument specialization availability flags.
    va_addr : str (required)
        Base virtual address (hex string) used by Mneme’s memory manager.
    va_size : int (required)
        Virtual address space size in bytes (or recording-specific unit).
    kernel_instances : dict[str, KernelInstance] (required)
        Mapping from dynamic hash to recorded launch instance descriptor.
Source code in python/mneme/recorded_execution.py
class RecordedExecution:
    """
    Description of a recorded kernel execution and its dynamic instances.

    A :class:`RecordedExecution` captures everything needed to replay and tune
    a kernel that was observed during application execution:

      - Kernel identity (static hash, name, demangled name)
      - Argument names and specialization availability
      - Virtual address space reservation information (VA base + size)
      - LLVM IR module file paths required for linking
      - A mapping of **dynamic hash → KernelInstance**, representing each observed
        launch instance (grid/block/shared-mem and snapshot paths)

    The class behaves like a mapping over kernel instances and supports JSON
    serialization via :meth:`to_json` / :meth:`from_json`.

    Parameters
    ----------
    static_hash : str
        Stable identifier for the kernel’s static code shape.
    kernel_name : str
        Mangled or runtime kernel symbol name.
    demangled_name : str
        Human-readable kernel name (if available).
    llvm_files : list[str]
        Paths to LLVM IR modules captured during recording.
    arg_names : list[str]
        Recorded kernel argument names (for display/debugging).
    specializations : list[bool]
        Per-argument specialization availability flags.
    va_addr : str
        Base virtual address (hex string) used by Mneme’s memory manager.
    va_size : int
        Virtual address space size in bytes (or recording-specific unit).
    kernel_instances : dict[str, KernelInstance]
        Mapping from dynamic hash to recorded launch instance descriptor.
    """

    class KernelInstance:
        """
        Description of one dynamic kernel launch instance.

        A kernel may be launched multiple times with different dynamic properties
        (e.g., different grid/block sizes, argument values, or observed runtime hashes).
        Each :class:`KernelInstance` stores:

          - Launch parameters (grid, block, shared memory)
          - Dynamic hash (identifies the runtime instance)
          - Available specialization indices (derived from specialization flags)
          - Snapshot file paths for prologue and epilogue

        The prologue/epilogue snapshots are exposed via :class:`MemStateRef` objects,
        which are opened on demand by the replay executor.
        """

        def __init__(
            self,
            static_hash: str,
            dynamic_hash: str,
            kernel_name: str,
            args: List,
            shared_mem: int,
            block_dim: dim3,
            grid_dim: dim3,
            specializations: List[bool],
            occ: int,
            prologue_fn: str,
            epilogue_fn: str,
        ):
            self.static_hash = static_hash
            self.dynamic_hash = dynamic_hash
            self.kernel_name = kernel_name
            self.args = args
            self.shared_mem = shared_mem
            self.block_dim = block_dim
            self.grid_dim = grid_dim
            self.available_specializations = []
            self.specializations = specializations
            for i, v in enumerate(specializations):
                if v:
                    self.available_specializations.append(i)
            self.occ = occ
            self.prologue = MemStateRef(prologue_fn, kernel_name, SnapshotType.PROLOGUE)
            self.epilogue = MemStateRef(epilogue_fn, kernel_name, SnapshotType.EPILOGUE)

        def __hash__(self):
            return hash(self.dynamic_hash + self.static_hash)

        def __str__(self):
            return f"Grid:{self.grid_dim}, BlockDim: {self.block_dim}, Shared Memory {self.shared_mem}"

        def to_dict(self):
            """
            Convert this instance into a JSON-friendly dictionary.

            Returns
            -------
            dict
                Serializable representation containing dims, shared memory,
                occurrence count, and snapshot file paths.
            """
            res = {}
            res["Args"] = self.specializations
            res["BlockDims"] = self.block_dim.to_dict()
            res["GridDims"] = self.grid_dim.to_dict()
            res["Occurrences"] = self.occ
            res["SharedMem"] = self.shared_mem
            res["Epilogue"] = self.epilogue.fn
            res["Prologue"] = self.prologue.fn

            return res

    def __init__(
        self,
        static_hash: str,
        kernel_name: str,
        demangled_name: str,
        llvm_files: List[str],
        arg_names: List[str],
        specializations: List[bool],
        va_addr: str,
        va_size: int,
        kernel_instances: Dict[str, KernelInstance],
    ):
        self.static_hash = static_hash
        self.kernel_name = kernel_name
        self.demangled_name = demangled_name
        self.llvm_files = llvm_files
        self.arg_names = arg_names
        self.specializations = specializations
        self.va_addr = va_addr
        self.va_size = va_size
        self.kernel_instances = kernel_instances
        self._link_mod = None

    def __str__(self):
        return f"KernelName: {self.kernel_name} NumArgs: {len(self.arg_names)}, VASize: {self.va_size}, VAddr: {self.va_addr}"

    def __getitem__(self, key):
        return self.kernel_instances[key]

    def __setitem__(self, key, value):
        self.kernel_instances[key] = value

    def __delitem__(self, key):
        del self.kernel_instances[key]

    def __iter__(self):
        return iter(self.kernel_instances)

    def __len__(self):
        return len(self.kernel_instances)

    def __contains__(self, key):
        return key in self.kernel_instances

    def items(self):
        return self.kernel_instances.items()

    def keys(self):
        return self.kernel_instances.keys()

    def values(self):
        return self.kernel_instances.values()

    def link_llvm_modules(self, prune=True, internalize=True):
        """
        Link recorded LLVM IR modules into a single module suitable for replay.

        This is a convenience wrapper over the Proteus JIT linking layer.
        Results are cached on the first call and returned on subsequent calls.

        Parameters
        ----------
        prune : bool
            Whether to prune unused symbols/IR during linking.
        internalize : bool
            Whether to internalize symbols during linking.

        Returns
        -------
        ModuleRef
            Linked IR module produced by the JIT layer.
        """
        if self._link_mod is not None:
            return self._link_mod

        self._link_mod = jit.link_llvm_modules(
            self.llvm_files, self.kernel_name, prune, internalize
        )

        return self._link_mod

    def to_dict(self):
        """Convert the record database into a JSON-friendly dictionary."""
        res = {}
        res["ArgNames"] = self.arg_names
        res["BinaryBlobs"] = []
        res["DemangledName"] = self.demangled_name
        res["KernelName"] = self.kernel_name
        res["Modules"] = self.llvm_files
        res["Specializations"] = self.specializations
        res["StaticHash"] = self.static_hash
        res["VASize"] = self.va_size
        res["VAddr"] = self.va_addr
        res["instances"] = {}
        for k, v in self.items():
            res["instances"][k] = v.to_dict()
        return res

    def to_json(self, fn: str):
        """
        Serialize this record database to a JSON file.

        Parameters
        ----------
        fn : str
            Output JSON path.
        """
        payload = self.to_dict()
        find_non_jsonables(payload)
        with open(fn, "w") as fd:
            json.dump(payload, fd, indent=2)

    @classmethod
    def from_json(cls, fn: str):
        """
        Load a :class:`RecordedExecution` database from JSON.

        This reconstructs all :class:`KernelInstance` entries and validates that
        referenced LLVM module paths exist.

        Parameters
        ----------
        fn : str
            Path to the recorded execution JSON file.

        Returns
        -------
        RecordedExecution
            Loaded record database.

        Raises
        ------
        RuntimeError
            If the JSON file does not exist or referenced IR modules are missing.
        """
        if not Path(fn).exists():
            raise RuntimeError("JSON file does not exist")

        with open(fn, "r") as fd:
            record_db = json.load(fd)

        instances = {}
        for dhash, inst in record_db["instances"].items():
            block_dim = dim3(
                inst["BlockDims"]["x"], inst["BlockDims"]["y"], inst["BlockDims"]["z"]
            )
            grid_dim = dim3(
                inst["GridDims"]["x"], inst["GridDims"]["y"], inst["GridDims"]["z"]
            )
            instances[dhash] = cls.KernelInstance(
                record_db["StaticHash"],
                dhash,
                record_db["KernelName"],
                inst["Args"],
                inst["SharedMem"],
                block_dim,
                grid_dim,
                record_db["Specializations"],
                inst["Occurrences"],
                inst["Prologue"],
                inst["Epilogue"],
            )

        for llvm_fn in record_db["Modules"]:
            if not Path(llvm_fn).exists():
                raise RuntimeError(f"File {llvm_fn} does not exist")

        return cls(
            record_db["StaticHash"],
            record_db["KernelName"],
            record_db["DemangledName"],
            record_db["Modules"],
            record_db["ArgNames"],
            record_db["Specializations"],
            record_db["VAddr"],
            record_db["VASize"],
            instances,
        )

KernelInstance

Description of one dynamic kernel launch instance.

A kernel may be launched multiple times with different dynamic properties (e.g., different grid/block sizes, argument values, or observed runtime hashes). Each KernelInstance stores:

  • Launch parameters (grid, block, shared memory)
  • Dynamic hash (identifies the runtime instance)
  • Available specialization indices (derived from specialization flags)
  • Snapshot file paths for prologue and epilogue

The prologue/epilogue snapshots are exposed via MemStateRef objects, which are opened on demand by the replay executor.
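
A brief inspection sketch, assuming rec is a loaded RecordedExecution and the dynamic hash below is a placeholder:

inst = rec["<dynamic-hash>"]
print(inst.grid_dim, inst.block_dim, inst.shared_mem, inst.occ)
print(inst.available_specializations)   # argument indices with specialization data

with inst.prologue as pro:   # snapshots load lazily as context managers
    pro.reset()              # restore the recorded baseline state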

Source code in python/mneme/recorded_execution.py
class KernelInstance:
    """
    Description of one dynamic kernel launch instance.

    A kernel may be launched multiple times with different dynamic properties
    (e.g., different grid/block sizes, argument values, or observed runtime hashes).
    Each :class:`KernelInstance` stores:

      - Launch parameters (grid, block, shared memory)
      - Dynamic hash (identifies the runtime instance)
      - Available specialization indices (derived from specialization flags)
      - Snapshot file paths for prologue and epilogue

    The prologue/epilogue snapshots are exposed via :class:`MemStateRef` objects,
    which are opened on demand by the replay executor.
    """

    def __init__(
        self,
        static_hash: str,
        dynamic_hash: str,
        kernel_name: str,
        args: List,
        shared_mem: int,
        block_dim: dim3,
        grid_dim: dim3,
        specializations: List[bool],
        occ: int,
        prologue_fn: str,
        epilogue_fn: str,
    ):
        self.static_hash = static_hash
        self.dynamic_hash = dynamic_hash
        self.kernel_name = kernel_name
        self.args = args
        self.shared_mem = shared_mem
        self.block_dim = block_dim
        self.grid_dim = grid_dim
        self.available_specializations = []
        self.specializations = specializations
        for i, v in enumerate(specializations):
            if v:
                self.available_specializations.append(i)
        self.occ = occ
        self.prologue = MemStateRef(prologue_fn, kernel_name, SnapshotType.PROLOGUE)
        self.epilogue = MemStateRef(epilogue_fn, kernel_name, SnapshotType.EPILOGUE)

    def __hash__(self):
        return hash(self.dynamic_hash + self.static_hash)

    def __str__(self):
        return f"Grid:{self.grid_dim}, BlockDim: {self.block_dim}, Shared Memory {self.shared_mem}"

    def to_dict(self):
        """
        Convert this instance into a JSON-friendly dictionary.

        Returns
        -------
        dict
            Serializable representation containing dims, shared memory,
            occurrence count, and snapshot file paths.
        """
        res = {}
        res["Args"] = self.specializations
        res["BlockDims"] = self.block_dim.to_dict()
        res["GridDims"] = self.grid_dim.to_dict()
        res["Occurrences"] = self.occ
        res["SharedMem"] = self.shared_mem
        res["Epilogue"] = self.epilogue.fn
        res["Prologue"] = self.prologue.fn

        return res

to_dict()

Convert this instance into a JSON-friendly dictionary.

Returns:

    dict
        Serializable representation containing dims, shared memory, occurrence count, and snapshot file paths.

Source code in python/mneme/recorded_execution.py
def to_dict(self):
    """
    Convert this instance into a JSON-friendly dictionary.

    Returns
    -------
    dict
        Serializable representation containing dims, shared memory,
        occurrence count, and snapshot file paths.
    """
    res = {}
    res["Args"] = self.specializations
    res["BlockDims"] = self.block_dim.to_dict()
    res["GridDims"] = self.grid_dim.to_dict()
    res["Occurrences"] = self.occ
    res["SharedMem"] = self.shared_mem
    res["Epilogue"] = self.epilogue.fn
    res["Prologue"] = self.prologue.fn

    return res

from_json(fn) classmethod

Load a RecordedExecution database from JSON.

This reconstructs all KernelInstance entries and validates that referenced LLVM module paths exist.

Parameters:

    fn : str (required)
        Path to the recorded execution JSON file.

Returns:

    RecordedExecution
        Loaded record database.

Raises:

    RuntimeError
        If the JSON file does not exist or referenced IR modules are missing.

Source code in python/mneme/recorded_execution.py
@classmethod
def from_json(cls, fn: str):
    """
    Load a :class:`RecordedExecution` database from JSON.

    This reconstructs all :class:`KernelInstance` entries and validates that
    referenced LLVM module paths exist.

    Parameters
    ----------
    fn : str
        Path to the recorded execution JSON file.

    Returns
    -------
    RecordedExecution
        Loaded record database.

    Raises
    ------
    RuntimeError
        If the JSON file does not exist or referenced IR modules are missing.
    """
    if not Path(fn).exists():
        raise RuntimeError("JSON file does not exist")

    with open(fn, "r") as fd:
        record_db = json.load(fd)

    instances = {}
    for dhash, inst in record_db["instances"].items():
        block_dim = dim3(
            inst["BlockDims"]["x"], inst["BlockDims"]["y"], inst["BlockDims"]["z"]
        )
        grid_dim = dim3(
            inst["GridDims"]["x"], inst["GridDims"]["y"], inst["GridDims"]["z"]
        )
        instances[dhash] = cls.KernelInstance(
            record_db["StaticHash"],
            dhash,
            record_db["KernelName"],
            inst["Args"],
            inst["SharedMem"],
            block_dim,
            grid_dim,
            record_db["Specializations"],
            inst["Occurrences"],
            inst["Prologue"],
            inst["Epilogue"],
        )

    for llvm_fn in record_db["Modules"]:
        if not Path(llvm_fn).exists():
            raise RuntimeError(f"File {llvm_fn} does not exist")

    return cls(
        record_db["StaticHash"],
        record_db["KernelName"],
        record_db["DemangledName"],
        record_db["Modules"],
        record_db["ArgNames"],
        record_db["Specializations"],
        record_db["VAddr"],
        record_db["VASize"],
        instances,
    )

link_llvm_modules(prune=True, internalize=True)

Link recorded LLVM IR modules into a single module suitable for replay.

This is a convenience wrapper over the Proteus JIT linking layer. Results are cached on the first call and returned on subsequent calls.

Parameters:

    prune : bool (default: True)
        Whether to prune unused symbols/IR during linking.
    internalize : bool (default: True)
        Whether to internalize symbols during linking.

Returns:

    ModuleRef
        Linked IR module produced by the JIT layer.
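
A quick sketch of the caching behavior, assuming rec is a loaded RecordedExecution:

linked = rec.link_llvm_modules(prune=True, internalize=True)
assert rec.link_llvm_modules() is linked   # second call returns the cached module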

Source code in python/mneme/recorded_execution.py
def link_llvm_modules(self, prune=True, internalize=True):
    """
    Link recorded LLVM IR modules into a single module suitable for replay.

    This is a convenience wrapper over the Proteus JIT linking layer.
    Results are cached on the first call and returned on subsequent calls.

    Parameters
    ----------
    prune : bool
        Whether to prune unused symbols/IR during linking.
    internalize : bool
        Whether to internalize symbols during linking.

    Returns
    -------
    ModuleRef
        Linked IR module produced by the JIT layer.
    """
    if self._link_mod is not None:
        return self._link_mod

    self._link_mod = jit.link_llvm_modules(
        self.llvm_files, self.kernel_name, prune, internalize
    )

    return self._link_mod

to_json(fn)

Serialize this record database to a JSON file.

Parameters:

    fn : str (required)
        Output JSON path.
Source code in python/mneme/recorded_execution.py
def to_json(self, fn: str):
    """
    Serialize this record database to a JSON file.

    Parameters
    ----------
    fn : str
        Output JSON path.
    """
    payload = self.to_dict()
    find_non_jsonables(payload)
    with open(fn, "w") as fd:
        json.dump(payload, fd, indent=2)

SnapshotType

Bases: Enum

Enumeration of memory snapshot roles within a recorded execution.

PROLOGUE
    Snapshot captured immediately before kernel execution.
EPILOGUE
    Snapshot captured immediately after kernel execution.

The snapshot type is used by the native snapshot loader to interpret the record format and to decide which parts of state are treated as inputs vs outputs during verification.

Source code in python/mneme/recorded_execution.py
class SnapshotType(Enum):
    """
    Enumeration of memory snapshot roles within a recorded execution.

    PROLOGUE
        Snapshot captured immediately before kernel execution.
    EPILOGUE
        Snapshot captured immediately after kernel execution.

    The snapshot type is used by the native snapshot loader to interpret the
    record format and to decide which parts of state are treated as inputs vs
    outputs during verification.
    """

    PROLOGUE = 1
    EPILOGUE = 2

find_non_jsonables(obj, where='$')

Debug helper: print paths to fields that are not JSON-serializable.

This is used as a sanity check before writing the record database to JSON. It is intentionally permissive and prints to stdout rather than raising.

Parameters:

    obj : Any (required)
        Object graph to inspect.
    where : str (default: "$")
        JSONPath-like location used when printing offending fields.
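
For example, a pathlib.Path that leaked into the database is reported with its JSONPath-like location:

from pathlib import Path

# Prints something like:
#   Non-JSON type: $.snapshots[0] -> /tmp/a.pro <class 'pathlib.PosixPath'>
find_non_jsonables({"snapshots": [Path("/tmp/a.pro")]})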
Source code in python/mneme/recorded_execution.py
def find_non_jsonables(obj, where="$"):
    """
    Debug helper: print paths to fields that are not JSON-serializable.

    This is used as a sanity check before writing the record database to JSON.
    It is intentionally permissive and prints to stdout rather than raising.

    Parameters
    ----------
    obj : Any
        Object graph to inspect.
    where : str
        JSONPath-like location used when printing offending fields.
    """
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return
    if isinstance(obj, Path):
        print("Non-JSON type:", where, "->", obj, type(obj))
        return
    if isinstance(obj, dict):
        for k, v in obj.items():
            find_non_jsonables(v, f"{where}.{k}")
    elif isinstance(obj, (list, tuple, set)):
        for i, v in enumerate(obj):
            find_non_jsonables(v, f"{where}[{i}]")
    else:
        # add other allowed conversions here if you plan to support them
        print("Non-JSON type:", where, "->", type(obj))

mneme.device

Device runtime bindings for Mneme (Python ↔ native runtime).

This module provides thin ctypes/FFI wrappers around the Mneme native runtime for loading device code objects and launching / profiling kernels.

Main abstractions
  • DeviceModule loads a device object from a MemBufferRef and provides access to kernel entry points.
  • DeviceFunction represents a kernel function handle and exposes launch and profile operations, along with basic resource-usage queries (registers, local/const memory).
Lifetime notes

The native runtime owns device-side resources. This module mirrors those resources via ffi.ObjectRef and uses weak references to enforce correct cleanup order:

  • A DeviceFunction keeps only a weakref to its parent module.
  • When a DeviceModule is disposed/unloaded, it invalidates all functions obtained from it to prevent use-after-free (see the sketch below).
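
A minimal lifetime sketch; obj_buffer (a MemBufferRef) and the grid/block dim3 values are placeholders:

from mneme.device import DeviceModule

module = DeviceModule.from_MemBuffer(obj_buffer)   # load the device object
kernel = module.get_function("my_kernel")          # tracked via a weak reference

kernel.launch(grid, block)                         # valid while `module` is alive

# Once `module` is disposed/unloaded, `kernel` is invalidated: a later
# kernel.launch(...) raises RuntimeError instead of touching freed memory.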

DeviceFunction

Bases: ObjectRef

Handle to a device kernel function.

A DeviceFunction is obtained from a DeviceModule via DeviceModule.get_function(). It provides:

  • launch() for a direct kernel launch (grid/block only)
  • profile() for record/replay execution with prologue/epilogue state
  • Resource-usage queries via reg_usage, local_mem, and const_mem

Notes

Instances are tied to the lifetime of the parent DeviceModule. If the module is unloaded, the function becomes invalid and further usage raises an error.

Source code in python/mneme/device.py
class DeviceFunction(ffi.ObjectRef):
    """
    Handle to a device kernel function.

    A DeviceFunction is obtained from a :class:`DeviceModule` via
    :meth:`DeviceModule.get_function`. It provides:

      - :meth:`launch` for a direct kernel launch (grid/block only)
      - :meth:`profile` for record/replay execution with prologue/epilogue state
      - Resource-usage queries via :attr:`reg_usage`, :attr:`local_mem`,
        and :attr:`const_mem`

    Notes
    -----
    Instances are tied to the lifetime of the parent :class:`DeviceModule`.
    If the module is unloaded, the function becomes invalid and further usage
    raises an error.
    """

    def __init__(self, func_ptr, module_ref, kernel_name):
        """
        Parameters
        ----------
        func_ptr
            Native function handle returned by the Mneme runtime.
        module_ref : DeviceModule
            Parent module that owns the function. A weak reference is stored.
        kernel_name : str
            Kernel symbol name (used for debugging/logging and attribution).
        """
        super(DeviceFunction, self).__init__(func_ptr)
        self._module_ref = weakref.ref(module_ref)
        self._kernel_name = kernel_name
        self.valid = True
        self._local_mem = None
        self._reg_usage = None
        self._const_mem = None

    @property
    def local_mem(self):
        """
        Local memory usage for this kernel (bytes), as reported by the runtime.

        The value is cached after the first query.

        Returns
        -------
        int
            Local memory usage in bytes.
        """
        if self._local_mem is None:
            self._local_mem = ffi.lib.MnemePy_getLocalMemUsage(self)
        return self._local_mem

    @property
    def const_mem(self):
        """
        Constant memory usage for this kernel (bytes), as reported by the runtime.

        The value is cached after the first query.

        Returns
        -------
        int
            Constant memory usage in bytes.
        """
        if self._const_mem is None:
            self._const_mem = ffi.lib.MnemePy_getConstMemUsage(self)
        return self._const_mem

    @property
    def reg_usage(self):
        """
        Register usage for this kernel (registers per thread), as reported by the runtime.

        The value is cached after the first query.

        Returns
        -------
        int
            Register usage per thread.
        """
        if self._reg_usage is None:
            self._reg_usage = ffi.lib.MnemePy_getRegisterUsage(self)
        return self._reg_usage

    def invalidate(self):
        """
        Mark this function handle as invalid.

        This is called when the owning :class:`DeviceModule` is unloaded/disposed,
        preventing use-after-free of the underlying native function handle.
        """
        self.valid = False

    def _dispose(self):
        """
        Dispose hook for ffi.ObjectRef.

        This implementation marks the function as invalid. The parent module owns
        native resources, so function disposal is primarily a logical invalidation.
        """
        self.valid = False

    def __del__(self):
        """
        Best-effort cleanup on garbage collection.

        Notes
        -----
        Python finalizers are not guaranteed to run promptly (or at all at process
        shutdown). Resource lifetime should be controlled through the parent
        :class:`DeviceModule` context manager whenever possible.
        """
        self.invalidate()

    def launch(self, grid_dim: dim3, block_dim: dim3):
        """
        Launch the kernel with the given grid/block configuration.

        Parameters
        ----------
        grid_dim : dim3
            Grid dimensions for the launch.
        block_dim : dim3
            Block dimensions for the launch.

        Raises
        ------
        RuntimeError
            If the function pointer is NULL, the parent module was unloaded, or the
            function has been invalidated.
        """

        if self._ptr is None or self._ptr.value is None:
            raise RuntimeError(
                "hipFunction_t is NULL. The module may have been unloaded."
            )

        mod = self._module_ref()
        if mod is None:
            raise RuntimeError("hipModule is NULL. The module may have been unloaded.")

        if not self.valid:
            raise RuntimeError("Cannot launch function: module was unloaded.")
        ffi.lib.MnemePy_launchKernelFunction(self, grid_dim, block_dim)

    def profile(
        self,
        grid_dim: dim3,
        block_dim: dim3,
        prologue_state: MemBufferRef,
        epilogue_state: MemBufferRef,
        shared_mem_size: int,
        iterations=5,
    ):
        """
        Execute the kernel under Mneme record/replay profiling.

        This method executes the kernel with the provided recorded prologue/epilogue
        state buffers and measures execution time across multiple iterations. The
        native runtime is responsible for timing, validation hooks, and any device
        synchronization required for consistent measurements.

        Parameters
        ----------
        grid_dim : dim3
            Grid dimensions for the launch.
        block_dim : dim3
            Block dimensions for the launch.
        prologue_state : MemBufferRef
            Recorded state buffer to initialize device memory / arguments prior to
            kernel execution.
        epilogue_state : MemBufferRef
            Recorded state buffer used to validate and/or capture post-state after
            kernel execution.
        shared_mem_size : int
            Dynamic shared memory size (bytes) for the launch.
        iterations : int, optional
            Number of kernel executions to perform for profiling.

        Raises
        ------
        RuntimeError
            If the parent module has been garbage collected.
        """
        # Set argument types
        DevMod = self._module_ref()
        if DevMod is None:
            raise RuntimeError("Device Module has been garbage collected")

        ffi.lib.MnemePy_profile(
            DevMod,
            self,
            grid_dim,
            block_dim,
            prologue_state,
            epilogue_state,
            shared_mem_size,
            iterations,
        )
        return

const_mem property

Constant memory usage for this kernel (bytes), as reported by the runtime.

The value is cached after the first query.

Returns:

    int
        Constant memory usage in bytes.

local_mem property

Local memory usage for this kernel (bytes), as reported by the runtime.

The value is cached after the first query.

Returns:

    int
        Local memory usage in bytes.

reg_usage property

Register usage for this kernel (registers per thread), as reported by the runtime.

The value is cached after the first query.

Returns:

    int
        Register usage per thread.
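
A short sketch of querying the cached resource counters, assuming kernel is a resolved DeviceFunction:

regs = kernel.reg_usage    # registers per thread (queried once, then cached)
lmem = kernel.local_mem    # local memory usage in bytes
cmem = kernel.const_mem    # constant memory usage in bytes
print(f"regs/thread={regs} local={lmem}B const={cmem}B")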

__del__()

Best-effort cleanup on garbage collection.

Notes

Python finalizers are not guaranteed to run promptly (or at all at process shutdown). Resource lifetime should be controlled through the parent DeviceModule context manager whenever possible.

Source code in python/mneme/device.py
def __del__(self):
    """
    Best-effort cleanup on garbage collection.

    Notes
    -----
    Python finalizers are not guaranteed to run promptly (or at all at process
    shutdown). Resource lifetime should be controlled through the parent
    :class:`DeviceModule` context manager whenever possible.
    """
    self.invalidate()

__init__(func_ptr, module_ref, kernel_name)

Parameters:

    func_ptr (required)
        Native function handle returned by the Mneme runtime.
    module_ref : DeviceModule (required)
        Parent module that owns the function. A weak reference is stored.
    kernel_name : str (required)
        Kernel symbol name (used for debugging/logging and attribution).
Source code in python/mneme/device.py
def __init__(self, func_ptr, module_ref, kernel_name):
    """
    Parameters
    ----------
    func_ptr
        Native function handle returned by the Mneme runtime.
    module_ref : DeviceModule
        Parent module that owns the function. A weak reference is stored.
    kernel_name : str
        Kernel symbol name (used for debugging/logging and attribution).
    """
    super(DeviceFunction, self).__init__(func_ptr)
    self._module_ref = weakref.ref(module_ref)
    self._kernel_name = kernel_name
    self.valid = True
    self._local_mem = None
    self._reg_usage = None
    self._const_mem = None

invalidate()

Mark this function handle as invalid.

This is called when the owning DeviceModule is unloaded/disposed, preventing use-after-free of the underlying native function handle.

Source code in python/mneme/device.py
def invalidate(self):
    """
    Mark this function handle as invalid.

    This is called when the owning :class:`DeviceModule` is unloaded/disposed,
    preventing use-after-free of the underlying native function handle.
    """
    self.valid = False

launch(grid_dim, block_dim)

Launch the kernel with the given grid/block configuration.

Parameters:

    grid_dim : dim3 (required)
        Grid dimensions for the launch.
    block_dim : dim3 (required)
        Block dimensions for the launch.

Raises:

    RuntimeError
        If the function pointer is NULL, the parent module was unloaded, or the function has been invalidated.

Source code in python/mneme/device.py
def launch(self, grid_dim: dim3, block_dim: dim3):
    """
    Launch the kernel with the given grid/block configuration.

    Parameters
    ----------
    grid_dim : dim3
        Grid dimensions for the launch.
    block_dim : dim3
        Block dimensions for the launch.

    Raises
    ------
    RuntimeError
        If the function pointer is NULL, the parent module was unloaded, or the
        function has been invalidated.
    """

    if self._ptr is None or self._ptr.value is None:
        raise RuntimeError(
            "hipFunction_t is NULL. The module may have been unloaded."
        )

    mod = self._module_ref()
    if mod is None:
        raise RuntimeError("hipModule is NULL. The module may have been unloaded.")

    if not self.valid:
        raise RuntimeError("Cannot launch function: module was unloaded.")
    ffi.lib.MnemePy_launchKernelFunction(self, grid_dim, block_dim)

profile(grid_dim, block_dim, prologue_state, epilogue_state, shared_mem_size, iterations=5)

Execute the kernel under Mneme record/replay profiling.

This method executes the kernel with the provided recorded prologue/epilogue state buffers and measures execution time across multiple iterations. The native runtime is responsible for timing, validation hooks, and any device synchronization required for consistent measurements.

Parameters:

    grid_dim : dim3 (required)
        Grid dimensions for the launch.
    block_dim : dim3 (required)
        Block dimensions for the launch.
    prologue_state : MemBufferRef (required)
        Recorded state buffer to initialize device memory / arguments prior to kernel execution.
    epilogue_state : MemBufferRef (required)
        Recorded state buffer used to validate and/or capture post-state after kernel execution.
    shared_mem_size : int (required)
        Dynamic shared memory size (bytes) for the launch.
    iterations : int (default: 5)
        Number of kernel executions to perform for profiling.

Raises:

    RuntimeError
        If the parent module has been garbage collected.
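
A hedged sketch of a profiling call; prologue_buf/epilogue_buf stand for the recorded MemBufferRef state buffers, and grid/block for dim3 values:

kernel.profile(
    grid,
    block,
    prologue_buf,        # initializes device memory/arguments before each run
    epilogue_buf,        # expected post-kernel state used for validation
    shared_mem_size=0,   # dynamic shared memory (bytes) for the launch
    iterations=10,       # number of measured executions
)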

Source code in python/mneme/device.py
def profile(
    self,
    grid_dim: dim3,
    block_dim: dim3,
    prologue_state: MemBufferRef,
    epilogue_state: MemBufferRef,
    shared_mem_size: int,
    iterations=5,
):
    """
    Execute the kernel under Mneme record/replay profiling.

    This method executes the kernel with the provided recorded prologue/epilogue
    state buffers and measures execution time across multiple iterations. The
    native runtime is responsible for timing, validation hooks, and any device
    synchronization required for consistent measurements.

    Parameters
    ----------
    grid_dim : dim3
        Grid dimensions for the launch.
    block_dim : dim3
        Block dimensions for the launch.
    prologue_state : MemBufferRef
        Recorded state buffer to initialize device memory / arguments prior to
        kernel execution.
    epilogue_state : MemBufferRef
        Recorded state buffer used to validate and/or capture post-state after
        kernel execution.
    shared_mem_size : int
        Dynamic shared memory size (bytes) for the launch.
    iterations : int, optional
        Number of kernel executions to perform for profiling.

    Raises
    ------
    RuntimeError
        If the parent module has been garbage collected.
    """
    # Set argument types
    DevMod = self._module_ref()
    if DevMod is None:
        raise RuntimeError("Device Module has been garbage collected")

    ffi.lib.MnemePy_profile(
        DevMod,
        self,
        grid_dim,
        block_dim,
        prologue_state,
        epilogue_state,
        shared_mem_size,
        iterations,
    )
    return

DeviceModule

Bases: ObjectRef

Loaded device module/object.

A DeviceModule is constructed from a compiled object buffer (a MemBufferRef) using from_MemBuffer(). It owns the native device object handle and is responsible for releasing it.

Functions obtained via get_function() are tracked and invalidated when the module is disposed to prevent use-after-free.

Source code in python/mneme/device.py
class DeviceModule(ffi.ObjectRef):
    """
    Loaded device module/object.

    A DeviceModule is constructed from a compiled object buffer (a :class:`MemBufferRef`)
    using :meth:`from_MemBuffer`. It owns the native device object handle and is
    responsible for releasing it.

    Functions obtained via :meth:`get_function` are tracked and invalidated when
    the module is disposed to prevent use-after-free.
    """

    def __init__(self, module_ptr):
        """
        Parameters
        ----------
        module_ptr
            Native module handle returned by the Mneme runtime.
        """
        super(DeviceModule, self).__init__(module_ptr)
        self._functions = weakref.WeakSet()

    def _dispose(self):
        """
        Dispose the underlying native module and invalidate dependent functions.

        This method is invoked by the ``ffi.ObjectRef`` disposal machinery and is
        responsible for:
          1) releasing the native device object handle
          2) invalidating all :class:`DeviceFunction` instances created from this module
        """
        self._capi.MnemePY_DisposeDeviceObject(self)
        for func in self._functions:
            func.invalidate()
        self._functions.clear()

    @classmethod
    def from_MemBuffer(cls, buffer: MemBufferRef):
        """
        Load a device module from an in-memory object buffer.

        Parameters
        ----------
        buffer : MemBufferRef
            Buffer containing a device object suitable for loading by the Mneme runtime.

        Returns
        -------
        DeviceModule
            A module that owns the loaded native device object.

        Raises
        ------
        TypeError
            If ``buffer`` is not a :class:`MemBufferRef`.
        """
        if not isinstance(buffer, MemBufferRef):
            raise TypeError(
                f"Expecting type of MemBufferRef instead got {type(buffer)}"
            )
        return cls(ffi.lib.MnemePY_getDeviceObject(buffer))

    def get_function(self, kernel_name: str):
        """
        Resolve a kernel function from the loaded module.

        Parameters
        ----------
        kernel_name : str
            Kernel symbol name to resolve.

        Returns
        -------
        DeviceFunction
            Function handle bound to this module.

        Notes
        -----
        The returned function is tracked by the module and will be invalidated when
        the module is disposed.
        """
        func = ffi.lib.MnemePY_getKernelFunctionFromImage(
            self,
            _encode_string(kernel_name),
        )

        dev_func = DeviceFunction(func, self, kernel_name)
        self._functions.add(dev_func)
        return dev_func

__init__(module_ptr)

Parameters:

    module_ptr (required)
        Native module handle returned by the Mneme runtime.
Source code in python/mneme/device.py
def __init__(self, module_ptr):
    """
    Parameters
    ----------
    module_ptr
        Native module handle returned by the Mneme runtime.
    """
    super(DeviceModule, self).__init__(module_ptr)
    self._functions = weakref.WeakSet()

from_MemBuffer(buffer) classmethod

Load a device module from an in-memory object buffer.

Parameters:

    buffer : MemBufferRef (required)
        Buffer containing a device object suitable for loading by the Mneme runtime.

Returns:

    DeviceModule
        A module that owns the loaded native device object.

Raises:

    TypeError
        If buffer is not a MemBufferRef.

Source code in python/mneme/device.py
@classmethod
def from_MemBuffer(cls, buffer: MemBufferRef):
    """
    Load a device module from an in-memory object buffer.

    Parameters
    ----------
    buffer : MemBufferRef
        Buffer containing a device object suitable for loading by the Mneme runtime.

    Returns
    -------
    DeviceModule
        A module that owns the loaded native device object.

    Raises
    ------
    TypeError
        If ``buffer`` is not a :class:`MemBufferRef`.
    """
    if not isinstance(buffer, MemBufferRef):
        raise TypeError(
            f"Expecting type of MemBufferRef instead got {type(buffer)}"
        )
    return cls(ffi.lib.MnemePY_getDeviceObject(buffer))

get_function(kernel_name)

Resolve a kernel function from the loaded module.

Parameters:

Name Type Description Default
kernel_name str

Kernel symbol name to resolve.

required

Returns:

Type Description
DeviceFunction

Function handle bound to this module.

Notes

The returned function is tracked by the module and will be invalidated when the module is disposed.

Source code in python/mneme/device.py
def get_function(self, kernel_name: str):
    """
    Resolve a kernel function from the loaded module.

    Parameters
    ----------
    kernel_name : str
        Kernel symbol name to resolve.

    Returns
    -------
    DeviceFunction
        Function handle bound to this module.

    Notes
    -----
    The returned function is tracked by the module and will be invalidated when
    the module is disposed.
    """
    func = ffi.lib.MnemePY_getKernelFunctionFromImage(
        self,
        _encode_string(kernel_name),
    )

    dev_func = DeviceFunction(func, self, kernel_name)
    self._functions.add(dev_func)
    return dev_func

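The two methods above compose naturally with the context-manager protocol. A minimal sketch, assuming mem_buffer is a MemBufferRef produced elsewhere (for example by jit.codegen_object) and "my_kernel" is a placeholder symbol name:

# Minimal sketch: load a compiled object and resolve a kernel from it.
# `mem_buffer` (a MemBufferRef) and the kernel name are assumptions.
with DeviceModule.from_MemBuffer(mem_buffer) as module:
    kernel = module.get_function("my_kernel")
    # ... launch or profile via the returned DeviceFunction ...
# On exit the module is disposed and `kernel` is invalidated.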
get_device_arch()

Return the current device architecture identifier.

Returns:

Type Description
str

Architecture string reported by the Mneme runtime (backend-defined).

Source code in python/mneme/device.py
def get_device_arch():
    """
    Return the current device architecture identifier.

    Returns
    -------
    str
        Architecture string reported by the Mneme runtime (backend-defined).
    """
    return str(ffi.lib.MnemePy_getDeviceArch().decode())

get_device_count()

Return the number of visible devices.

Returns:

Type Description
int

Device count as reported by the Mneme runtime.

Source code in python/mneme/device.py
def get_device_count():
    """
    Return the number of visible devices.

    Returns
    -------
    int
        Device count as reported by the Mneme runtime.
    """
    return int(ffi.lib.MnemePy_getDeviceCount())

set_device(dev_id)

Set the active device for subsequent device operations.

Parameters:

Name Type Description Default
dev_id int

Device index to select.

required
Source code in python/mneme/device.py
def set_device(dev_id: int):
    """
    Set the active device for subsequent device operations.

    Parameters
    ----------
    dev_id : int
        Device index to select.
    """
    ffi.lib.MnemePy_setDevice(dev_id)

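Together, these module-level helpers form a short device-selection preamble. A hedged sketch (the example architecture strings are illustrative; actual values are backend-defined):

# Sketch: query the runtime and pin the process to a device.
num_devices = get_device_count()
arch = get_device_arch()   # backend-defined, e.g. "gfx90a" or "sm_80"
set_device(0)              # subsequent device operations target device 0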
mneme.replay_executor

Core record–replay execution pipeline for Mneme.

This module provides the execution backbone used by both:

  • the synchronous CLI execution path (via BaseExecutor subclasses), and
  • the asynchronous tuning engine (via TuneWorker).

At a high level, an "experiment" in Mneme is:

  1) Load a recorded kernel execution (RecordedExecution + KernelInstance).
  2) Reconstruct the recorded GPU memory state (prologue/epilogue snapshots) into a managed virtual address space (PageManagerRef).
  3) Link recorded LLVM IR modules into a single IR module suitable for replay.
  4) Apply optional IR specializations (arguments, launch dims, launch bounds).
  5) Run an optimization pipeline and generate a device object.
  6) Load the object onto the GPU, run the kernel, and optionally profile.
  7) Verify correctness by comparing epilogue vs prologue expectations.

The pipeline is intentionally organized so that:

  • verification can be done with minimal instrumentation,
  • tracked runs collect timing/resource metrics, and
  • worker processes can amortize initialization costs by reusing a single executor.

Public API

BaseExecutor: Base class that owns GPU affinity, recorded state, and the build/run pipeline.

TuneWorker: Worker-process implementation used by the async tuning infrastructure.

BaseExecutor

Base class for executing Mneme record–replay experiments.

A BaseExecutor instance is bound to: * one recorded database file (record_db), * one kernel instance inside that database (record_id), * one GPU device (device_id), * and an iteration count for measured runs.

Responsibilities
  • Load the recorded execution metadata (RecordedExecution) and select the target KernelInstance (kernel_descr).
  • Pin the current OS process to a specific GPU device (set_device()).
  • Manage the replay address space and recorded snapshots:
    • PageManagerRef selects/initializes the virtual address space.
    • prologue/epilogue snapshots are opened and later compared.
  • Provide a structured pipeline that takes IR -> object -> execution:
    • _preprocess_ir(): apply specialization transforms and compute a variant hash
    • _optimize(): run pass pipeline / O-level selection
    • _codegen(): lower to a device object (MemBufferRef)
    • _run(): load object, resolve kernel, execute and optionally profile
    • _execute(): orchestrate verification + cleanup + tracked run
Lifecycle

BaseExecutor is designed to be used as a context manager:

executor = MyExecutor(record_db=..., record_id=..., device_id=...)
root_ir = executor.link_ir()
with executor:
    res = executor.execute(...)

The context manager ensures GPU memory state (snapshots + page manager) is opened exactly once and released even when execution raises.

Notes / invariants
  • A BaseExecutor instance is intended to be used within a single process. (Workers should construct one executor per worker process.)
  • open() must be called before any execution; _execute() assumes prologue and epilogue states are loaded.
  • link_ir() returns a linked IR module representing the recorded kernel; callers should clone before mutation if reusing across experiments.
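A hedged sketch of the cloning discipline described by the last invariant (configs is a placeholder iterable of experiment configurations):

root_ir = executor.link_ir()   # link once, reuse across experiments
with executor:                 # opens snapshots + page manager
    for config in configs:
        ir = root_ir.clone()   # clone before any mutation
        # ... build and run against `ir` for this configuration ...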
Source code in python/mneme/replay_executor.py
class BaseExecutor:
    """
    Base class for executing Mneme record–replay experiments.

    A BaseExecutor instance is bound to:
      * one recorded database file (record_db),
      * one kernel instance inside that database (record_id),
      * one GPU device (device_id),
      * and an iteration count for measured runs.

    Responsibilities
    ----------------
    * Load the recorded execution metadata (RecordedExecution) and select the
      target KernelInstance (kernel_descr).
    * Pin the current OS process to a specific GPU device (set_device()).
    * Manage the replay address space and recorded snapshots:
        - PageManagerRef selects/initializes the virtual address space.
        - prologue/epilogue snapshots are opened and later compared.
    * Provide a structured pipeline that takes IR -> object -> execution:
        - _preprocess_ir(): apply specialization transforms and compute a variant hash
        - _optimize(): run pass pipeline / O-level selection
        - _codegen(): lower to a device object (MemBufferRef)
        - _run(): load object, resolve kernel, execute and optionally profile
        - _execute(): orchestrate verification + cleanup + tracked run

    Lifecycle
    ---------
    BaseExecutor is designed to be used as a context manager:

        executor = MyExecutor(record_db=..., record_id=..., device_id=...)
        root_ir = executor.link_ir()
        with executor:
            res = executor.execute(...)

    The context manager ensures GPU memory state (snapshots + page manager) is
    opened exactly once and released even when execution raises.

    Notes / invariants
    ------------------
    * A BaseExecutor instance is intended to be used within a single process.
      (Workers should construct one executor per worker process.)
    * open() must be called before any execution; _execute() assumes prologue and
      epilogue states are loaded.
    * link_ir() returns a linked IR module representing the recorded kernel; callers
      should clone before mutation if reusing across experiments.
    """

    def __init__(
        self,
        record_db: str = "",
        record_id: str = "",
        iterations: int = 3,
        device_id: int = 0,
    ):
        self.record_db = record_db
        self.record_id = record_id
        self.device_id = device_id
        logger.debug(
            f"BaseExecutor Got {self.record_db} and {self.record_id} and will run on device:{self.device_id}"
        )
        self.records = RecordedExecution.from_json(self.record_db)
        self.kernel_descr = self.records[self.record_id]
        self.device_arch = get_device_arch()
        self._epilogue = None
        self._prologue = None
        self._page_manager = None
        self._iterations = iterations
        self.num_devices = get_device_count()
        set_device(device_id)
        logger.debug(
            f"GPU Affinity of process was set to device:{self.device_id} out of {self.num_devices}"
        )

    def open(self):
        # NOTE: The executor allocates all resources and selects the address space.
        self._page_manager = PageManagerRef(
            self.device_id, self.records.va_addr, self.records.va_size
        )
        self._prologue = self.kernel_descr.prologue.open()
        self._epilogue = self.kernel_descr.epilogue.open()
        return self

    @property
    def prologue(self):
        return self._prologue

    @property
    def epilogue(self):
        return self._epilogue

    def close(self):
        if self._epilogue is not None:
            self._epilogue.close()
            self._epilogue = None
        if self._prologue is not None:
            self._prologue.close()
            self._prologue = None
        if self._page_manager is not None:
            self._page_manager.close()
            self._page_manager = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def link_ir(self):
        return self.records.link_llvm_modules(prune=True, internalize=True)

    @cond_time("preprocess_ir_time")
    def _preprocess_ir(
        self,
        result: ExperimentResult,
        config: ExperimentConfiguration,
        llvm_ir: ModuleRef,
    ) -> Tuple[str, ModuleRef]:
        """
        Apply IR-level preprocessing and specialization transformations prior to
        kernel code generation.

        This method computes a deterministic code hash reflecting all applied
        specializations and transformations. The input IR module may be modified
        in-place depending on the selected configuration options. The resulting
        hash is used to uniquely identify the transformed kernel and ensure
        reproducibility across record/replay runs.

        The preprocessing pipeline consists of the following conditional steps:

        1. **Argument specialization** (``config.specialize``)
           Specializes the kernel based on recorded argument values from the
           prologue. This may produce more optimized IR for kernels whose behavior
           depends on constant parameters.

        2. **Launch-dimension specialization** (``config.specialize_dims``)
           Specializes the kernel based on the provided grid and block dimensions,
           enabling IR simplification or elimination of dimension-dependent logic.

        3. **Launch-bounds insertion** (``config.set_launch_bounds``)
           Applies explicit CUDA/HIP launch bounds using the maximum threads per block
           and minimum blocks per SM provided in the experiment configuration.

        Each transformation updates the evolving code hash to reflect the applied
        change, ensuring that semantically distinct IR variants map to unique
        identifiers.

        Parameters
        ----------
        result : ExperimentResult
            The experiment result object that may be updated during preprocessing.
            (Currently unused directly, but updated by the ``cond_time`` decorator.)
        config : ExperimentConfiguration
            Configuration controlling which IR specializations are applied.
        llvm_ir : ModuleRef
            Intermediate representation (LLVM-like) to be specialized. The module
            may be modified during preprocessing.

        Returns
        -------
        (str, ModuleRef)
            A tuple containing:

            * **str** – The updated code hash after all applicable transformations.
            * **ModuleRef** – The (potentially modified) IR module.

        Notes
        -----
        * IR-specialization routines are delegated to the ``proteus`` subsystem.

        """

        code_hash = self.kernel_descr.static_hash

        if config.specialize:
            code_hash = jit.specialize_args(
                llvm_ir,
                code_hash,
                self.kernel_descr.kernel_name,
                self.prologue.args,
                self.prologue.num_args,
                self.kernel_descr.available_specializations,
            )

        if config.specialize_dims:
            code_hash = jit.specialize_dims(
                llvm_ir,
                code_hash,
                self.kernel_descr.kernel_name,
                config.grid,
                config.block,
            )
        if config.set_launch_bounds:
            code_hash = jit.set_launch_bounds(
                llvm_ir,
                code_hash,
                self.kernel_descr.kernel_name,
                config.max_threads,
                config.min_blocks_per_sm,
            )
        return code_hash, llvm_ir

    @cond_time("opt_time")
    def _optimize(
        self,
        result: ExperimentResult,
        config: ExperimentConfiguration,
        ir_module: ModuleRef,
    ):
        """
        Apply optimization passes to the IR module prior to code generation.

        This method invokes the JIT optimization pipeline configured for the current
        device architecture. The pipeline typically includes both generic compiler
        optimizations (e.g., ``O1–O3``) and Mneme-specific IR transformations
        specified in the experiment configuration. Optimization operates in-place on
        the provided IR module.

        Parameters
        ----------
        result : ExperimentResult
            The experiment result object. Although not modified directly in this
            method, it is modified by the decorator (``cond_time``).
        config : ExperimentConfiguration
            Configuration controlling the optimization pipeline. Relevant fields
            include:
            * ``passes`` – Name or specification of the optimization pass pipeline.
            * ``codegen_opt`` – Code generation optimization level.
        ir_module : ModuleRef
            The intermediate representation to be optimized. The module is mutated
            in-place by the underlying JIT subsystem.

        Notes
        -----
        * Optimization routines are delegated to ``jit.optimize``.
        * The optimization phase typically precedes code generation and may
          significantly affect both performance and final code size.
        * This method does not return a value; the IR module is modified directly.

        """
        jit.optimize(ir_module, self.device_arch, config.passes, config.codegen_opt)

    @cond_time("codegen_time")
    def _codegen(
        self,
        result: ExperimentResult,
        config: ExperimentConfiguration,
        ir_module: ModuleRef,
    ) -> MemBufferRef:
        """
        Generate a device-executable object from the optimized IR module.

        This method invokes the JIT backend to lower the intermediate representation
        into a binary object suitable for loading and execution on the target device.
        The resulting artifact is returned as a :class:`MemBufferRef`. Code generation
        behavior—including backend choice and optimization level—is controlled by the
        experiment configuration.

        Parameters
        ----------
        result : ExperimentResult
            Experiment result object. Not modified directly by this method, but
            updated by the ``cond_time`` decorator.
        config : ExperimentConfiguration
            Experiment configuration specifying code-generation parameters:
            * ``codegen_method`` – Backend or strategy used for code generation
              (e.g., ``"serial"`` or alternative compilation modes).
            * ``codegen_opt`` – Optimization level for the code-generation backend.
        ir_module : ModuleRef
            Optimized IR module to be lowered into an executable object. Must be the
            output of prior preprocessing and optimization stages.

        Returns
        -------
        MemBufferRef
            A memory buffer containing the generated object code. This buffer can be
            loaded into a device runtime via ``DeviceModule.from_MemBuffer`` for
            execution.

        Notes
        -----
        * The code generation step is performed by the ``jit.codegen_object`` backend.
        * Code generation typically represents the final stage of the build pipeline
          before the kernel is executed on the device.
        * Returned memory buffers may include architecture-specific metadata depending
          on the JIT backend used.

        """
        return jit.codegen_object(
            ir_module, self.device_arch, config.codegen_method, config.codegen_opt
        )

    @cond_gpu_time("exec_time")
    def _run_kernel(
        self,
        result: ExperimentResult,
        config: ExperimentConfiguration,
        kernel_name: str,
        device_func: DeviceFunction,
        iterations: int,
    ) -> None:
        """
        Execute the kernel on the device using the provided launch configuration.

        This method invokes the device-level profiling interface to run the kernel
        for the specified number of iterations. Launch parameters (grid, block, and
        shared memory), as well as the recorded prologue and epilogue states, are
        passed directly to the device runtime. Profiling results—such as execution
        times—are captured internally by the device function object and propagated
        into the associated :class:`ExperimentResult`.

        Parameters
        ----------
        result : ExperimentResult
            The result object that accumulates execution metrics. Although this
            method does not write to it directly, profiling performed by the device
            backend updates fields that will later be reflected in the result through
            the ``cond_gpu_time`` decorator.
        config : ExperimentConfiguration
            Experiment configuration specifying launch parameters, shared-memory
            requirements, and specialization settings.
        kernel_name : str
            The name of the kernel to be executed. Not used directly by this
            method, but consumed by the ``cond_gpu_time`` decorator.
        device_func : DeviceFunction
            The device-side kernel entry point obtained from the compiled module.
            Must support the ``profile`` interface for execution and timing.
        iterations : int
            Number of times the kernel should be executed. Typically more than one
            iteration is used for performance characterization and variance analysis.

        Notes
        -----
        * Actual execution and profiling of the kernel is handled by
          ``device_func.profile``.
        * Both prologue and epilogue states are forwarded to the device runtime so
          that Mneme’s record–replay mechanism can validate kernel behavior and
          collect replay-specific metrics.
        * Errors raised by the device runtime will propagate upward to the caller.

        """
        device_func.profile(
            config.grid,
            config.block,
            self._prologue._state,
            self._epilogue._state,
            config.shared_mem,
            iterations,
        )

    def _build(
        self,
        result: ExperimentResult,
        config: ExperimentConfiguration,
        ir_module: ModuleRef,
        track: bool,
    ) -> MemBufferRef:
        """
        Build the executable device kernel from the given IR module.

        This method runs the full compilation pipeline on the provided IR module:
        preprocessing, optimization, and final code generation. The resulting
        device-ready binary is returned as a :class:`MemBufferRef`. When tracking
        is enabled, additional metadata such as object size is recorded into the
        provided :class:`ExperimentResult`.

        The build process consists of the following stages:

        1. **IR preprocessing**
           Applies specialization, dimension-dependent transformations, and optional
           launch-bound insertion. This step updates the internal code hash and
           prepares the IR for optimization.

        2. **Optimization**
           Runs the configured optimization passes (e.g., ``O3`` or user-defined
           pipelines). If profiling is enabled, optimization timing is recorded in
           the experiment result.

        3. **Code generation**
           Lowers the optimized IR into a device-executable artifact. The resulting
           binary is wrapped in a :class:`MemBufferRef`. When ``track=True``,
           the size of the generated object code is stored in ``result.obj_size``.

        Parameters
        ----------
        result : ExperimentResult
            Result object to be populated with build metrics such as optimization
            time and object size.
        config : ExperimentConfiguration
            Configuration controlling specialization, optimization pipeline, and
            code-generation strategy.
        ir_module : ModuleRef
            Intermediate representation on which the build pipeline operates.
            The module may be transformed during preprocessing and optimization.
        track : bool
            Whether to collect profiling information and resource-usage statistics
            during the build process.

        Returns
        -------
        MemBufferRef
            A memory buffer containing the compiled device module produced by the
            code-generation stage.

        Notes
        -----
        * This method does not execute the kernel; execution occurs in :meth:`_run`.
        * Tracking is optional but recommended when performance analysis is needed.
        """
        self._preprocess_ir(result, config, ir_module, profile=track)
        self._optimize(result, config, ir_module, profile=track)
        mem_buffer = self._codegen(result, config, ir_module, profile=track)
        if track:
            result.obj_size = mem_buffer.get_size()
        return mem_buffer

    def _run(
        self,
        result: ExperimentResult,
        config: ExperimentConfiguration,
        mem_buffer: MemBufferRef,
        track: bool,
        iterations: int,
    ):
        """
        Execute a compiled kernel on the device and optionally collect resource-usage
        metadata.

        This method loads the device module from the provided memory buffer, extracts
        the kernel function, and executes it for the requested number of iterations.
        When ``track`` is enabled, the kernel launch is profiled and register usage,
        local memory usage, and constant memory usage are recorded into the provided
        :class:`ExperimentResult` object.

        Parameters
        ----------
        result : ExperimentResult
            Result object that will be populated with execution metrics and resource
            usage information.
        config : ExperimentConfiguration
            The configuration controlling launch parameters such as grid, block,
            specialization settings, and shared-memory use.
        mem_buffer : MemBufferRef
            Memory buffer containing the device-side compiled module from which the
            kernel function is loaded.
        track : bool
            If ``True``, profiling and resource-usage tracking are enabled for the
            kernel execution. This populates register usage, constant memory usage,
            and local memory usage in the experiment result.
        iterations : int
            Number of times the kernel should be executed. Typically more than one
            run is used when statistical accuracy is required.

        Notes
        -----
        * The device module is managed via a context manager to ensure allocation and
          cleanup follow the device runtime’s requirements.
        * Resource usage fields are only updated when ``track=True``.
        * Actual execution is delegated to :meth:`_run_kernel`.

        """
        with DeviceModule.from_MemBuffer(mem_buffer) as DeviceObj:
            device_func = DeviceObj.get_function(self.kernel_descr.kernel_name)
            self._run_kernel(
                result,
                config,
                self.kernel_descr.kernel_name,
                device_func,
                iterations,
                profile=track,
            )
            if track:
                result.reg_usage = device_func.reg_usage
                result.const_mem_usage = device_func.const_mem
                result.local_mem_usage = device_func.local_mem

    def _execute(
        self,
        result: ExperimentResult,
        config: ExperimentConfiguration,
        ir_module: ModuleRef,
    ) -> ModuleRef:
        """
        Execute a single Mneme experiment using the given configuration and IR module.

        This method performs the full record/replay experiment pipeline, including
        verification, IR cleanup, code generation, and timed execution. It returns
        both the populated experiment result and the transformed IR module.

        The execution consists of three stages:

        1. **Verification pass**
           A clone of the input IR module is built and executed once without
           instrumentation. This ensures the recorded prologue and epilogue states
           match, allowing the system to validate kernel determinism and correctness.

        2. **IR sanitization**
           A custom transformation is applied to remove automatically inserted Clang
           initialization code. Only IR regions explicitly marked by Clang are
           removed to avoid disturbing user code.

        3. **Instrumented execution**
           The cleaned up version of the kernel is built with tracking enabled.
           The kernel is executed ``iterations + 2`` times to allow downstream
           statistical metrics to be computed reliably. Execution time, resource
           usage, and other experiment metrics are accumulated into the resulting
           :class:`ExperimentResult`.

        Parameters
        ----------
        result : ExperimentResult
            The container to store all the collected/counted values to.
        config : ExperimentConfiguration
            The experiment configuration controlling launch parameters, specialization,
            and code generation settings.
        ir_module : ModuleRef
            The LLVM-like intermediate representation module on which the experiment
            is executed. The module is cloned and the ``ir_module`` is not modified.

        Returns
        -------
        (ExperimentResult, ModuleRef)
            A tuple containing:

            * **ExperimentResult** – Populated result object containing verification
              status, execution metrics, and profiling data.
            * **ModuleRef** – The transformed IR module after auto-initialization
              removal and other modifications performed during execution.

        Raises
        ------
        RuntimeError
            If internal prologue or epilogue state is unexpectedly ``None``.
        """
        if self._prologue._state is None or self._epilogue._state is None:
            raise RuntimeError("States should never be none when executing a kernel")

        # NOTE: 1. First we need to verify.
        ver_mod = ir_module.clone()
        mem_buffer = self._build(result, config, ver_mod, False)
        self._run(result, config, mem_buffer, False, 1)
        result.verified = self.prologue == self.epilogue

        # NOTE: 2. We apply a custom pass to delete all Clang-inserted code.
        # It is hard to identify these cases, so we delete only constructs
        # that have been attributed by Clang.
        ir_module = transform.remove_auto_initialize(ir_module.clone())
        # Done with verification. Moving to next stage

        # NOTE: 3. We build and run. We turn tracking on and always execute iterations + 2
        # times, to enable later computation of statistical metrics.
        mem_buffer = self._build(result, config, ir_module, True)
        self._run(result, config, mem_buffer, True, self._iterations + 2)
        result.executed = True

        return ir_module

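For reference, the stages above chain together roughly as follows. This is a simplified sketch mirroring _execute(); it assumes an opened executor, a linked root_ir, and a populated ExperimentConfiguration config (the profile keyword seen elsewhere is injected by the cond_time/cond_gpu_time decorators):

result = ExperimentResult()

# 1) Verification run: build an uninstrumented clone and execute it once.
ver_ir = root_ir.clone()
obj = executor._build(result, config, ver_ir, track=False)
executor._run(result, config, obj, track=False, iterations=1)
result.verified = executor.prologue == executor.epilogue

# 2) Sanitize the IR, then 3) run the tracked, timed execution.
clean_ir = transform.remove_auto_initialize(root_ir.clone())
obj = executor._build(result, config, clean_ir, track=True)
executor._run(result, config, obj, track=True, iterations=executor._iterations + 2)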
TuneWorker

Bases: BaseExecutor

Worker-side executor used by the asynchronous tuning infrastructure.

TuneWorker is a concrete :class:BaseExecutor specialization intended to run inside a dedicated worker process. It owns the GPU affinity, prologue/epilogue state, page manager, and JIT pipeline required to compile and replay a recorded kernel under a given :class:ExperimentConfiguration.

A worker process typically:

  1) Initializes profiling and selects a GPU device.
  2) Loads the recorded execution (record DB + record ID).
  3) Links the recorded LLVM IR into a single module (link_ir).
  4) Enters a message-processing loop (see :meth:run) to evaluate configurations.

Notes
  • The public entry point for the worker process is :meth:run, which is designed to be used as a multiprocessing target.
  • Per-request execution is handled by :meth:process_payload, which builds, verifies, and runs the kernel according to the provided configuration.
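A hedged construction sketch (the database path and record id are placeholders):

worker = TuneWorker(
    record_db="kernels.json",   # placeholder path to the record database
    record_id="kernel@0",       # placeholder kernel-instance id
    device_id=0,
    iterations=3,
)
root_ir = worker.link_ir()      # linked once, cloned per request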
Source code in python/mneme/replay_executor.py
class TuneWorker(BaseExecutor):
    """
    Worker-side executor used by the asynchronous tuning infrastructure.

    ``TuneWorker`` is a concrete :class:`BaseExecutor` specialization intended to run
    inside a dedicated worker process. It owns the GPU affinity, prologue/epilogue
    state, page manager, and JIT pipeline required to compile and replay a recorded
    kernel under a given :class:`ExperimentConfiguration`.

    A worker process typically:
      1) Initializes profiling and selects a GPU device.
      2) Loads the recorded execution (record DB + record ID).
      3) Links the recorded LLVM IR into a single module (``link_ir``).
      4) Enters a message-processing loop (see :meth:`run`) to evaluate configurations.

    Notes
    -----
    * The public entry point for the worker process is :meth:`run`, which is designed
      to be used as a multiprocessing target.
    * Per-request execution is handled by :meth:`process_payload`, which builds,
      verifies, and runs the kernel according to the provided configuration.
    """

    def __init__(self, *args, **kwargs):
        """
        Construct a TuneWorker and initialize worker-local profiling.

        This constructor initializes the Mneme profiler (for timing breakdowns) and
        then delegates initialization to :class:`BaseExecutor`. The base class sets
        device affinity, loads the recorded execution, and prepares prologue/epilogue
        descriptors.

        Notes
        -----
        * The worker process should typically construct a single TuneWorker instance
          and reuse it for multiple requests to amortize startup overhead.
        * ``init_profiler()`` must be executed exactly once per OS process;
          executing it multiple times results in undefined behavior.
        """
        init_profiler()
        super().__init__(*args, **kwargs)

    def process_payload(
        self, ir_module, config: ExperimentConfiguration
    ) -> Tuple[ExperimentResult, ModuleRef]:
        """
        Execute one tuning request: build, verify, and run the kernel under ``config``.

        This method is the unit of work performed by a worker in response to a tuning
        request. It executes the full Mneme record–replay pipeline using the provided
        IR module and configuration:

          1) Records the experiment start timestamp.
          2) Invokes the base executor pipeline (see :meth:`BaseExecutor._execute`),
             which performs verification, IR sanitization, compilation, and timed execution.
          3) Records the experiment end timestamp and annotates the result with the GPU id.
          4) Returns both the populated :class:`ExperimentResult` and the transformed IR.

        Parameters
        ----------
        ir_module : ModuleRef
            Root IR module (or clone) used as input for this experiment. The module
            is cloned internally and transformed as part of the execution pipeline.
        config : ExperimentConfiguration
            Configuration describing launch parameters, specialization options, and
            code-generation controls for this experiment.

        Returns
        -------
        (ExperimentResult, ModuleRef)
            A tuple containing:

            * **ExperimentResult** – result object populated with timing, verification
              status, and device resource usage.
            * **ModuleRef** – the transformed IR module after preprocessing and
              auto-initialization removal.

        Notes
        -----
        * This method is expected to be called repeatedly within the worker loop;
          callers should pass a cloned IR module to avoid cross-experiment mutation.
        * Timestamps are recorded in ISO 8601 format using UTC time.
        """
        result = ExperimentResult()
        result.start_time = datetime.now(timezone.utc).isoformat()
        generated_ir = super()._execute(result, config, ir_module)
        result.end_time = datetime.now(timezone.utc).isoformat()
        result.gpu_id = self.device_id
        return result, generated_ir

    @staticmethod
    def run(
        request_q: Queue,
        response_q: Queue,
        record_db: str,
        record_id: str,
        device_id: int,
        iterations: int,
        results_db_dir: str,
        state: Event,
    ):
        """
        Worker process entry point: initialize resources and serve requests from a queue.

        This method is designed to be used as the target function for a worker
        ``multiprocessing.Process``. It performs one-time initialization and then
        enters a blocking loop that processes messages from ``request_q``.

        Initialization performed once per worker:
          1) Redirects stdout/stderr to a per-worker log file:
             ``{results_db_dir}/Worker-{device_id}.log``. This avoids interleaved
             output across processes.
          2) Constructs a :class:`TuneWorker` with the given recording and device id.
          3) Links and caches the root IR module (``root_ir``) that will be cloned
             per experiment.
          4) Opens GPU memory/prologue/epilogue resources via the executor context
             manager (``with worker as Memory``).
          5) Signals readiness by setting ``state``.

        Message protocol:
          - ``{"payload": "terminate", ...}``:
            Stop the worker loop and exit.
          - ``{"payload": "process", "exp_id": <id>, "data": <config-dict>}``:
            Execute an experiment and respond on ``response_q`` with:
            ``{"exp_id": <id>, "payload": "result", "data": <result-dict>, "llvm_ir": ""}``.

        Parameters
        ----------
        request_q : multiprocessing.Queue
            Queue from which the worker receives control messages and experiment requests.
        response_q : multiprocessing.Queue
            Queue to which the worker publishes experiment results.
        record_db : str
            Path to the recorded execution database/file used to construct the executor.
        record_id : str
            Identifier of the recorded kernel instance inside ``record_db``.
        device_id : int
            GPU device index to which this worker process is pinned.
        iterations : int
            Number of kernel iterations to execute during the tracked run (the full
            execution may include additional runs for verification/warmup depending
            on the executor pipeline).
        results_db_dir : str
            Directory where per-worker logs and output artifacts are written.
        state : multiprocessing.Event
            Event used to signal to the parent process that initialization is complete
            and the worker is ready to accept requests.

        Notes
        -----
        * The worker loop blocks on ``request_q.get()`` until a message arrives.
        * The worker clones ``root_ir`` per request to avoid cross-request IR mutation.
        * Exceptions raised inside the loop will currently propagate and terminate the
          worker process; higher-level infrastructure should treat this as a worker crash.

        """
        # NOTE: We open a per-worker log file with appropriate permissions, then redirect
        # stdout/stderr to it so that messages from different workers do not interleave.

        fd_out = os.open(
            f"{results_db_dir}/Worker-{device_id}.log",
            os.O_WRONLY | os.O_CREAT | os.O_APPEND,
        )
        os.dup2(fd_out, 1)  # 1 = stdout
        os.dup2(fd_out, 2)  # 2 = stderr
        worker = TuneWorker(
            record_db=record_db,
            record_id=record_id,
            device_id=device_id,
            iterations=iterations,
        )
        # Open GPU memory, setup prologue epilogue and create a single
        # LLVM IR file to start working on optimizations
        root_ir = worker.link_ir()

        with worker as Memory:
            state.set()
            logger.debug(f"Worker running on {worker.device_id} starts busy loop")
            while True:
                msg = request_q.get()
                if msg["payload"] == "terminate":
                    logger.debug(
                        f"Worker {worker.device_id} received terminate request, exiting ..."
                    )
                    break
                elif msg["payload"] == "process":
                    logger.debug(
                        f"Worker {worker.device_id} received processing request {msg['exp_id']}"
                    )
                    exp, ir = worker.process_payload(
                        root_ir.clone(), ExperimentConfiguration.from_dict(msg["data"])
                    )
                    # final = resdb.save_ir(ir, exp.hash())
                    logger.debug(
                        f"Worker {worker.device_id} finalized processing request {msg['exp_id']}"
                    )

                    response_q.put(
                        {
                            "exp_id": msg["exp_id"],
                            "payload": "result",
                            "data": exp.to_dict(),
                            "llvm_ir": "",
                        }
                    )
                else:
                    logger.warning(f"Received unknown message {msg}")

        return

__init__(*args, **kwargs)

Construct a TuneWorker and initialize worker-local profiling.

This constructor initializes the Mneme profiler (for timing breakdowns) and then delegates initialization to :class:BaseExecutor. The base class sets device affinity, loads the recorded execution, and prepares prologue/epilogue descriptors.

Notes
  • The worker process should typically construct a single TuneWorker instance and reuse it for multiple requests to amortize startup overhead.
  • init_profiler() must be executed exactly once per OS process; executing it multiple times results in undefined behavior.
Source code in python/mneme/replay_executor.py
def __init__(self, *args, **kwargs):
    """
    Construct a TuneWorker and initialize worker-local profiling.

    This constructor initializes the Mneme profiler (for timing breakdowns) and
    then delegates initialization to :class:`BaseExecutor`. The base class sets
    device affinity, loads the recorded execution, and prepares prologue/epilogue
    descriptors.

    Notes
    -----
    * The worker process should typically construct a single TuneWorker instance
      and reuse it for multiple requests to amortize startup overhead.
    * ``init_profiler()`` must be executed exactly once per OS process;
      executing it multiple times results in undefined behavior.
    """
    init_profiler()
    super().__init__(*args, **kwargs)

process_payload(ir_module, config)

Execute one tuning request: build, verify, and run the kernel under config.

This method is the unit of work performed by a worker in response to a tuning request. It executes the full Mneme record–replay pipeline using the provided IR module and configuration:

  1) Records the experiment start timestamp.
  2) Invokes the base executor pipeline (see :meth:BaseExecutor._execute), which performs verification, IR sanitization, compilation, and timed execution.
  3) Records the experiment end timestamp and annotates the result with the GPU id.
  4) Returns both the populated :class:ExperimentResult and the transformed IR.

Parameters:

Name Type Description Default
ir_module ModuleRef

Root IR module (or clone) used as input for this experiment. The module is cloned internally and transformed as part of the execution pipeline.

required
config ExperimentConfiguration

Configuration describing launch parameters, specialization options, and code-generation controls for this experiment.

required

Returns:

Type Description
(ExperimentResult, ModuleRef)

A tuple containing:

  • ExperimentResult – result object populated with timing, verification status, and device resource usage.
  • ModuleRef – the transformed IR module after preprocessing and auto-initialization removal.
Notes
  • This method is expected to be called repeatedly within the worker loop; callers should pass a cloned IR module to avoid cross-experiment mutation.
  • Timestamps are recorded in ISO 8601 format using UTC time.
Source code in python/mneme/replay_executor.py
def process_payload(
    self, ir_module, config: ExperimentConfiguration
) -> Tuple[ExperimentResult, ModuleRef]:
    """
    Execute one tuning request: build, verify, and run the kernel under ``config``.

    This method is the unit of work performed by a worker in response to a tuning
    request. It executes the full Mneme record–replay pipeline using the provided
    IR module and configuration:

      1) Records the experiment start timestamp.
      2) Invokes the base executor pipeline (see :meth:`BaseExecutor._execute`),
         which performs verification, IR sanitization, compilation, and timed execution.
      3) Records the experiment end timestamp and annotates the result with the GPU id.
      4) Returns both the populated :class:`ExperimentResult` and the transformed IR.

    Parameters
    ----------
    ir_module : ModuleRef
        Root IR module (or clone) used as input for this experiment. The module
        is cloned internally and transformed as part of the execution pipeline.
    config : ExperimentConfiguration
        Configuration describing launch parameters, specialization options, and
        code-generation controls for this experiment.

    Returns
    -------
    (ExperimentResult, ModuleRef)
        A tuple containing:

        * **ExperimentResult** – result object populated with timing, verification
          status, and device resource usage.
        * **ModuleRef** – the transformed IR module after preprocessing and
          auto-initialization removal.

    Notes
    -----
    * This method is expected to be called repeatedly within the worker loop;
      callers should pass a cloned IR module to avoid cross-experiment mutation.
    * Timestamps are recorded in ISO 8601 format using UTC time.
    """
    result = ExperimentResult()
    result.start_time = datetime.now(timezone.utc).isoformat()
    generated_ir = super()._execute(result, config, ir_module)
    result.end_time = datetime.now(timezone.utc).isoformat()
    result.gpu_id = self.device_id
    return result, generated_ir

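For example, inside an opened worker (cfg_dict is a placeholder plain dict accepted by ExperimentConfiguration.from_dict):

with worker:
    result, ir = worker.process_payload(
        root_ir.clone(),   # clone to avoid cross-request mutation
        ExperimentConfiguration.from_dict(cfg_dict),
    )
print(result.gpu_id, result.start_time, result.end_time)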
run(request_q, response_q, record_db, record_id, device_id, iterations, results_db_dir, state) staticmethod

Worker process entry point: initialize resources and serve requests from a queue.

This method is designed to be used as the target function for a worker multiprocessing.Process. It performs one-time initialization and then enters a blocking loop that processes messages from request_q.

Initialization performed once per worker:

  1) Redirects stdout/stderr to a per-worker log file: {results_db_dir}/Worker-{device_id}.log. This avoids interleaved output across processes.
  2) Constructs a :class:TuneWorker with the given recording and device id.
  3) Links and caches the root IR module (root_ir) that will be cloned per experiment.
  4) Opens GPU memory/prologue/epilogue resources via the executor context manager (with worker as Memory).
  5) Signals readiness by setting state.

Message protocol:

  • {"payload": "terminate", ...}: Stop the worker loop and exit.
  • {"payload": "process", "exp_id": <id>, "data": <config-dict>}: Execute an experiment and respond on response_q with: {"exp_id": <id>, "payload": "result", "data": <result-dict>, "llvm_ir": ""}.

Parameters:

Name Type Description Default
request_q Queue

Queue from which the worker receives control messages and experiment requests.

required
response_q Queue

Queue to which the worker publishes experiment results.

required
record_db str

Path to the recorded execution database/file used to construct the executor.

required
record_id str

Identifier of the recorded kernel instance inside record_db.

required
device_id int

GPU device index to which this worker process is pinned.

required
iterations int

Number of kernel iterations to execute during the tracked run (the full execution may include additional runs for verification/warmup depending on the executor pipeline).

required
results_db_dir str

Directory where per-worker logs and output artifacts are written.

required
state Event

Event used to signal to the parent process that initialization is complete and the worker is ready to accept requests.

required
Notes
  • The worker loop blocks on request_q.get() until a message arrives.
  • The worker clones root_ir per request to avoid cross-request IR mutation.
  • Exceptions raised inside the loop will currently propagate and terminate the worker process; higher-level infrastructure should treat this as a worker crash.
Source code in python/mneme/replay_executor.py
@staticmethod
def run(
    request_q: Queue,
    response_q: Queue,
    record_db: str,
    record_id: str,
    device_id: int,
    iterations: int,
    results_db_dir: str,
    state: Event,
):
    """
    Worker process entry point: initialize resources and serve requests from a queue.

    This method is designed to be used as the target function for a worker
    ``multiprocessing.Process``. It performs one-time initialization and then
    enters a blocking loop that processes messages from ``request_q``.

    Initialization performed once per worker:
      1) Redirects stdout/stderr to a per-worker log file:
         ``{results_db_dir}/Worker-{device_id}.log``. This avoids interleaved
         output across processes.
      2) Constructs a :class:`TuneWorker` with the given recording and device id.
      3) Links and caches the root IR module (``root_ir``) that will be cloned
         per experiment.
      4) Opens GPU memory/prologue/epilogue resources via the executor context
         manager (``with worker as Memory``).
      5) Signals readiness by setting ``state``.

    Message protocol:
      - ``{"payload": "terminate", ...}``:
        Stop the worker loop and exit.
      - ``{"payload": "process", "exp_id": <id>, "data": <config-dict>}``:
        Execute an experiment and respond on ``response_q`` with:
        ``{"exp_id": <id>, "payload": "result", "data": <result-dict>, "llvm_ir": ""}``.

    Parameters
    ----------
    request_q : multiprocessing.Queue
        Queue from which the worker receives control messages and experiment requests.
    response_q : multiprocessing.Queue
        Queue to which the worker publishes experiment results.
    record_db : str
        Path to the recorded execution database/file used to construct the executor.
    record_id : str
        Identifier of the recorded kernel instance inside ``record_db``.
    device_id : int
        GPU device index to which this worker process is pinned.
    iterations : int
        Number of kernel iterations to execute during the tracked run (the full
        execution may include additional runs for verification/warmup depending
        on the executor pipeline).
    results_db_dir : str
        Directory where per-worker logs and output artifacts are written.
    state : multiprocessing.Event
        Event used to signal to the parent process that initialization is complete
        and the worker is ready to accept requests.

    Notes
    -----
    * The worker loop blocks on ``request_q.get()`` until a message arrives.
    * The worker clones ``root_ir`` per request to avoid cross-request IR mutation.
    * Exceptions raised inside the loop will currently propagate and terminate the
      worker process; higher-level infrastructure should treat this as a worker crash.

    """
    # NOTE: We open a per-worker log file with appropriate permissions, then redirect
    # stdout/stderr to it so that messages from different workers do not interleave.

    fd_out = os.open(
        f"{results_db_dir}/Worker-{device_id}.log",
        os.O_WRONLY | os.O_CREAT | os.O_APPEND,
    )
    os.dup2(fd_out, 1)  # 1 = stdout
    os.dup2(fd_out, 2)  # 2 = stderr
    worker = TuneWorker(
        record_db=record_db,
        record_id=record_id,
        device_id=device_id,
        iterations=iterations,
    )
    # Open GPU memory, setup prologue epilogue and create a single
    # LLVM IR file to start working on optimizations
    root_ir = worker.link_ir()

    with worker as Memory:
        state.set()
        logger.debug(f"Worker running on {worker.device_id} starts busy loop")
        while True:
            msg = request_q.get()
            if msg["payload"] == "terminate":
                logger.debug(
                    f"Worker {worker.device_id} received terminate request, exiting ..."
                )
                break
            elif msg["payload"] == "process":
                logger.debug(
                    f"Worker {worker.device_id} received processing request {msg['exp_id']}"
                )
                exp, ir = worker.process_payload(
                    root_ir.clone(), ExperimentConfiguration.from_dict(msg["data"])
                )
                # final = resdb.save_ir(ir, exp.hash())
                logger.debug(
                    f"Worker {worker.device_id} finalized processing request {msg['exp_id']}"
                )

                response_q.put(
                    {
                        "exp_id": msg["exp_id"],
                        "payload": "result",
                        "data": exp.to_dict(),
                        "llvm_ir": "",
                    }
                )
            else:
                logger.warning(f"Received unknown message {msg}")

    return

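A hedged driver sketch for the protocol above (the database path, record id, and cfg_dict are placeholders):

import multiprocessing as mp

req_q, resp_q = mp.Queue(), mp.Queue()
ready = mp.Event()
proc = mp.Process(
    target=TuneWorker.run,
    args=(req_q, resp_q, "kernels.json", "kernel@0", 0, 3, "./results", ready),
)
proc.start()
ready.wait()   # the worker sets `state` once initialization completes
req_q.put({"payload": "process", "exp_id": 0, "data": cfg_dict})
reply = resp_q.get()   # {"exp_id": 0, "payload": "result", "data": ..., "llvm_ir": ""}
req_q.put({"payload": "terminate"})
proc.join()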
AsyncReplayExecutor

Asynchronous record/replay executor backed by a pool of worker processes.

AsyncReplayExecutor provides a lightweight interface to evaluate :class:ExperimentConfiguration objects using one or more worker processes. Internally it manages:

  • A global thread queue of pending :class:EvalFuture jobs.
  • A set of :class:TuneWorkerHandle instances (one per worker process).
  • A monotonic job id generator for mapping submissions to results.

Users may submit jobs asynchronously via :meth:submit, or synchronously evaluate a configuration via :meth:evaluate (submit + wait).

Notes
  • Each worker handle can execute at most one in-flight job at a time.
  • The executor is intended for repeated evaluations; startup/teardown overhead may dominate for microbenchmarks.
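A hedged usage sketch (candidate_configs is a placeholder; the EvalFuture result-retrieval API is not shown on this page and is assumed):

executor = AsyncReplayExecutor(
    record_db="kernels.json",    # placeholder
    record_id="kernel@0",        # placeholder
    iterations=3,
    results_db_dir="./results",
    num_workers=2,
)
futures = [executor.submit(cfg) for cfg in candidate_configs]
# ... collect results from the futures (EvalFuture API assumed) ...
executor.shutdown()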
Source code in python/mneme/async_executor.py
class AsyncReplayExecutor:
    """
    Asynchronous record/replay executor backed by a pool of worker processes.

    ``AsyncReplayExecutor`` provides a lightweight interface to evaluate
    :class:`ExperimentConfiguration` objects using one or more worker processes.
    Internally it manages:

      - A global thread queue of pending :class:`EvalFuture` jobs.
      - A set of :class:`TuneWorkerHandle` instances (one per worker process).
      - A monotonic job id generator for mapping submissions to results.

    Users may submit jobs asynchronously via :meth:`submit`, or synchronously
    evaluate a configuration via :meth:`evaluate` (submit + wait).

    Notes
    -----
    * Each worker handle can execute at most one in-flight job at a time.
    * The executor is intended for repeated evaluations; startup/teardown overhead
      may dominate for microbenchmarks.
    """

    def __init__(
        self,
        record_db: str,
        record_id: str,
        iterations: int,
        results_db_dir: str,
        num_workers: int,
    ):
        """
        Construct an asynchronous executor with a fixed-size worker pool.

        Parameters
        ----------
        record_db : str
            Path to the recorded execution database/file.
        record_id : str
            Identifier of the recorded kernel instance inside ``record_db``.
        iterations : int
            Number of kernel iterations performed by each worker per tracked run.
        results_db_dir : str
            Directory where workers write logs and optional output artifacts.
        num_workers : int
            Number of worker processes to launch.
        """
        self.global_q = ThreadQueue()
        self._futures: Dict[int, EvalFuture] = {}
        self._next_id = 0
        self._lock = threading.Lock()
        self.iterations = iterations

        self.workers = [
            TuneWorkerHandle(
                i,
                self.global_q,
                record_db,
                record_id,
                i,
                iterations,
                results_db_dir,
            )
            for i in range(num_workers)
        ]

    # ------------------------------------------------------------------
    # Submit new job (non-blocking)
    # ------------------------------------------------------------------
    def submit(self, config: ExperimentConfiguration) -> EvalFuture:
        """
        Submit a new experiment configuration for asynchronous evaluation.

        The configuration is wrapped in an :class:`EvalFuture` and enqueued for
        execution by the first available worker handle.

        Parameters
        ----------
        config : ExperimentConfiguration
            Experiment configuration to evaluate.

        Returns
        -------
        EvalFuture
            A future that will be resolved with an :class:`ExperimentResult` once
            the worker completes the experiment (or marked as failed on crash).
        """
        with self._lock:
            job_id = self._next_id
            self._next_id += 1

        logger.debug(f"[{self.__class__.__name__}] Submitting job {job_id}")
        future = EvalFuture(job_id, config)
        self._futures[job_id] = future
        self.global_q.put(future)
        return future

    def shutdown(self):
        """
        Gracefully shutdown all workers and their monitoring threads.

        This method requests each :class:`TuneWorkerHandle` to stop, which causes:
          - the worker loop to receive a terminate message,
          - the worker process to exit,
          - the monitor thread to join.

        Notes
        -----
        * After shutdown, submitting additional jobs is undefined behavior.
        """
        logger.debug(f"[{self.__class__.__name__}] Starting shutdown process")
        for w in self.workers:
            w.join()
        logger.debug(f"[{self.__class__.__name__}] Done Shutdown")

    def evaluate(self, config: ExperimentConfiguration) -> ExperimentResult:
        """
        Synchronously evaluate one configuration through the worker pool.

        This convenience method submits a configuration and blocks until the
        corresponding :class:`EvalFuture` completes.

        Parameters
        ----------
        config : ExperimentConfiguration
            Experiment configuration to evaluate.

        Returns
        -------
        ExperimentResult
            Result object containing verification status, execution time samples,
            and optional compilation/resource metrics, depending on worker settings.
        """
        future = self.submit(config)

        return future.result()

__init__(record_db, record_id, iterations, results_db_dir, num_workers)

Construct an asynchronous executor with a fixed-size worker pool.

Parameters:

Name Type Description Default
record_db str

Path to the recorded execution database/file.

required
record_id str

Identifier of the recorded kernel instance inside record_db.

required
iterations int

Number of kernel iterations performed by each worker per tracked run.

required
results_db_dir str

Directory where workers write logs and optional output artifacts.

required
num_workers int

Number of worker processes to launch.

required
Source code in python/mneme/async_executor.py
def __init__(
    self,
    record_db: str,
    record_id: str,
    iterations: int,
    results_db_dir: str,
    num_workers: int,
):
    """
    Construct an asynchronous executor with a fixed-size worker pool.

    Parameters
    ----------
    record_db : str
        Path to the recorded execution database/file.
    record_id : str
        Identifier of the recorded kernel instance inside ``record_db``.
    iterations : int
        Number of kernel iterations performed by each worker per tracked run.
    results_db_dir : str
        Directory where workers write logs and optional output artifacts.
    num_workers : int
        Number of worker processes to launch.
    """
    self.global_q = ThreadQueue()
    self._futures: Dict[int, EvalFuture] = {}
    self._next_id = 0
    self._lock = threading.Lock()
    self.iterations = iterations

    self.workers = [
        TuneWorkerHandle(
            i,
            self.global_q,
            record_db,
            record_id,
            i,
            iterations,
            results_db_dir,
        )
        for i in range(num_workers)
    ]

evaluate(config)

Synchronously evaluate one configuration through the worker pool.

This convenience method submits a configuration and blocks until the corresponding :class:EvalFuture completes.

Parameters:

Name Type Description Default
config ExperimentConfiguration

Experiment configuration to evaluate.

required

Returns:

Type Description
ExperimentResult

Result object containing verification status, execution time samples, and optional compilation/resource metrics, depending on worker settings.

Source code in python/mneme/async_executor.py
def evaluate(self, config: ExperimentConfiguration) -> ExperimentResult:
    """
    Synchronously evaluate one configuration through the worker pool.

    This convenience method submits a configuration and blocks until the
    corresponding :class:`EvalFuture` completes.

    Parameters
    ----------
    config : ExperimentConfiguration
        Experiment configuration to evaluate.

    Returns
    -------
    ExperimentResult
        Result object containing verification status, execution time samples,
        and optional compilation/resource metrics, depending on worker settings.
    """
    future = self.submit(config)

    return future.result()

shutdown()

Gracefully shutdown all workers and their monitoring threads.

This method requests each :class:TuneWorkerHandle to stop, which causes:

  • the worker loop to receive a terminate message,
  • the worker process to exit,
  • the monitor thread to join.

Notes
  • After shutdown, submitting additional jobs is undefined behavior.
Source code in python/mneme/async_executor.py
def shutdown(self):
    """
    Gracefully shutdown all workers and their monitoring threads.

    This method requests each :class:`TuneWorkerHandle` to stop, which causes:
      - the worker loop to receive a terminate message,
      - the worker process to exit,
      - the monitor thread to join.

    Notes
    -----
    * After shutdown, submitting additional jobs is undefined behavior.
    """
    logger.debug(f"[{self.__class__.__name__}] Starting shutdown process")
    for w in self.workers:
        w.join()
    logger.debug(f"[{self.__class__.__name__}] Done Shutdown")

submit(config)

Submit a new experiment configuration for asynchronous evaluation.

The configuration is wrapped in an :class:EvalFuture and enqueued for execution by the first available worker handle.

Parameters:

Name Type Description Default
config ExperimentConfiguration

Experiment configuration to evaluate.

required

Returns:

Type Description
EvalFuture

A future that will be resolved with an :class:ExperimentResult once the worker completes the experiment (or marked as failed on crash).
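
For example, a parameter sweep can keep each future paired with its configuration so results can be attributed after all jobs complete. A sketch, assuming an executor constructed as in the :class:AsyncReplayExecutor example above and the same imports:

configs = [ExperimentConfiguration(passes=p) for p in ("default<O2>", "default<O3>")]
pending = [(cfg, executor.submit(cfg)) for cfg in configs]
for cfg, fut in pending:
    res = fut.result()  # blocks until a worker resolves this future
    print(cfg.hash()[:8], res.verified, res.exec_time)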

Source code in python/mneme/async_executor.py
def submit(self, config: ExperimentConfiguration) -> EvalFuture:
    """
    Submit a new experiment configuration for asynchronous evaluation.

    The configuration is wrapped in an :class:`EvalFuture` and enqueued for
    execution by the first available worker handle.

    Parameters
    ----------
    config : ExperimentConfiguration
        Experiment configuration to evaluate.

    Returns
    -------
    EvalFuture
        A future that will be resolved with an :class:`ExperimentResult` once
        the worker completes the experiment (or marked as failed on crash).
    """
    with self._lock:
        job_id = self._next_id
        self._next_id += 1

    logger.debug(f"[{self.__class__.__name__}] Submitting job {job_id}")
    future = EvalFuture(job_id, config)
    self._futures[job_id] = future
    self.global_q.put(future)
    return future

TuneWorkerHandle

Thread-side controller for one worker process executing tuning experiments.

TuneWorkerHandle owns:

  • A single worker :class:multiprocessing.Process running :meth:TuneWorker.run.
  • A pair of IPC queues for requests/responses.
  • A monitoring thread that drives a small state machine for submitting jobs and receiving results.
  • Crash detection and automatic worker respawn.

The handle consumes :class:EvalFuture objects from a shared thread queue (global_q), forwards their configurations to the worker process, and resolves each future when the corresponding result arrives.

Notes
  • Each handle pins its worker process to a specific device id (GPU affinity is handled inside :class:BaseExecutor / :class:TuneWorker).
  • Crash recovery is best-effort: if the worker dies while running an experiment, the active future is marked as failed and the worker is restarted.
Source code in python/mneme/async_executor.py
class TuneWorkerHandle:
    """
    Thread-side controller for one worker process executing tuning experiments.

    ``TuneWorkerHandle`` owns:
      - A single worker :class:`multiprocessing.Process` running :meth:`TuneWorker.run`.
      - A pair of IPC queues for requests/responses.
      - A monitoring thread that drives a small state machine for submitting jobs
        and receiving results.
      - Crash detection and automatic worker respawn.

    The handle consumes :class:`EvalFuture` objects from a shared thread queue
    (``global_q``), forwards their configurations to the worker process, and
    resolves each future when the corresponding result arrives.

    Notes
    -----
    * Each handle pins its worker process to a specific device id (GPU affinity is
      handled inside :class:`BaseExecutor` / :class:`TuneWorker`).
    * Crash recovery is best-effort: if the worker dies while running an experiment,
      the active future is marked as failed and the worker is restarted.
    """

    class StateMachine(IntEnum):
        """
        Internal action state for the monitor loop.

        SUBMIT
            Attempt to dequeue a new job from the global queue and send it to the worker.
        RECEIVE
            Poll for a worker response and resolve the currently active future.
        """

        SUBMIT = 1
        RECEIVE = 2

    def __init__(
        self,
        idx,
        global_q: ThreadQueue,
        record_db: str,
        record_id: str,
        device_id: int,
        iterations: int,
        results_db_dir: str,
    ):
        """
        Construct a worker handle and start the worker process + monitor thread.

        Parameters
        ----------
        idx : int
            Logical worker index (primarily used for logging/debugging).
        global_q : queue.Queue
            Shared thread queue containing :class:`EvalFuture` objects to be executed
            by this handle’s worker process.
        record_db : str
            Path to the recorded execution database/file.
        record_id : str
            Identifier of the recorded kernel instance inside ``record_db``.
        device_id : int
            Device id (GPU index) assigned to the underlying worker process.
        iterations : int
            Number of kernel iterations used by the worker for the tracked execution.
        results_db_dir : str
            Directory where the worker writes logs and optional artifacts.

        Notes
        -----
        * The worker process is spawned immediately during initialization.
        * A background thread is started to monitor the worker process and drive job
          submission/result collection.
        """
        self.idx = idx
        self.global_q = global_q

        self._ipc_write_q = None
        self._ipc_read_q = None

        self._shutdown_event = ThreadEvent()
        self._action = self.StateMachine.SUBMIT

        self.record_db = record_db
        self.record_id = record_id
        self.device_id = device_id
        self.iterations = iterations
        self.results_db_dir = results_db_dir

        self._state = None  # ProcessEvent
        self._process = None  # Process
        self.current = None  # EvalFuture
        logger.debug(f"[TuneWorkerHandle] Starting processes")
        self._spawn_process()

        logger.debug(
            f"[TuneWorkerHandle] Starting Thread and bind it to monitor process {self._process.pid}"
        )
        self._monitor_thread = threading.Thread(target=self._shadow_process_loop)
        self._monitor_thread.start()
        logger.debug(
            f"[TuneWorkerHandle] Done Launching TunerWorkerHandles Thread and Processing Infrastructure"
        )

    def _spawn_process(self):
        """
        Spawn (or respawn) the underlying worker process and IPC infrastructure.

        This method creates fresh IPC queues, a new readiness event, and launches the
        worker process using :meth:`TuneWorker.run`. The internal action state is
        reset to ``SUBMIT``.

        Notes
        -----
        * This is used both at startup and during crash recovery.
        * Any in-flight job must be handled by the caller before respawning.
        """
        self._state = ProcessEvent()
        self._ipc_write_q = ProcessQueue()
        self._ipc_read_q = ProcessQueue()

        self._process = Process(
            target=TuneWorker.run,
            args=(
                self._ipc_write_q,
                self._ipc_read_q,
                self.record_db,
                self.record_id,
                self.device_id,
                self.iterations,
                self.results_db_dir,
                self._state,
            ),
            daemon=False,
        )
        self._process.start()
        self._action = self.StateMachine.SUBMIT

    # ------------------------------------------------------------
    # Result handling
    # ------------------------------------------------------------
    def _process_result(self, msg):
        """
        Resolve the current in-flight future using a response message.

        Parameters
        ----------
        msg : dict
            Response message produced by :meth:`TuneWorker.run`. Expected fields:
            * ``exp_id`` – experiment id matching the active future
            * ``data`` – serialized :class:`ExperimentResult` dict

        Raises
        ------
        RuntimeError
            If a result is received for an unexpected experiment id.
        """
        if self.current is None:
            return

        if self.current.job_id != msg["exp_id"]:
            raise RuntimeError(
                f"Worker {self.idx} received result for unexpected job "
                f"{msg['exp_id']} vs {self.current.job_id}"
            )
        logger.debug(
            f"[{self.__class__.__name__}-{self.device_id}] finished experiment {self.current.job_id}"
        )
        self.current.set_result(ExperimentResult.from_dict(msg["data"]))
        self.current = None

    def _try_receive(self):
        """
        Poll the worker response queue and process one available result.

        If no message is available within the polling timeout, this method returns
        without modifying state. On a successful receive, the internal action state
        transitions back to ``SUBMIT``.
        """
        msg = pop(self._ipc_read_q, timeout=1)
        if msg is None:
            return

        self._process_result(msg)
        self._action = self.StateMachine.SUBMIT

    # ------------------------------------------------------------
    # Job submission
    # ------------------------------------------------------------
    def _submit(self):
        """
        Submit one job to the worker process if one is available.

        This method dequeues a single :class:`EvalFuture` from the shared global
        queue, sends its configuration to the worker process, and marks it as the
        current in-flight job. The internal action state transitions to ``RECEIVE``.

        Notes
        -----
        * At most one job is in-flight per worker handle at any time.
        """
        future: EvalFuture = pop(self.global_q, timeout=1)
        if future is None:
            return

        self.current = future
        msg = {
            "payload": "process",
            "data": future.config.to_dict(),
            "exp_id": future.job_id,
        }

        self._ipc_write_q.put(msg)
        self._action = self.StateMachine.RECEIVE

    # ------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------
    def _shadow_process_loop(self):
        """
        Background monitor loop that drives the worker state machine.

        This loop runs in a dedicated thread and performs:

          1) **Crash detection and recovery**
             If the worker process dies, the current in-flight job (if any) is
             marked as failed and the worker is respawned.

          2) **Worker readiness waiting**
             Before sending work, the loop waits for the process-side readiness
             event to be set.

          3) **Submit/Receive alternation**
             A simple state machine ensures that only one job is in-flight:
             ``SUBMIT`` sends a job, then ``RECEIVE`` polls for the result.

          4) **Graceful shutdown**
             When the shutdown event is set, the loop sends a ``terminate`` message
             to the worker, drains results while the process is alive, and joins
             the process.

        Notes
        -----
        * The loop uses polling timeouts to remain responsive to shutdown.
        * The readiness event is a simple synchronization primitive; future versions
          may use condition variables or a richer handshake protocol.
        """
        while not self._shutdown_event.is_set():
            if not self._process.is_alive():
                # Crash recovery
                if self.current is not None:
                    # TODO: At some point we need more descriptive messages on the crash; it is not always a segfault, sometimes it is LLVM related and we should know.
                    self.current.set_error(
                        f"Worker crashed (exit code: {self._process.exitcode}) running on device {self.device_id}"
                    )
                    self.current = None
                    _ = pop(self._ipc_read_q, timeout=0)

                self._spawn_process()
                continue

            # Wait for worker to initialize
            if not self._state.is_set():
                # TODO: We should use mp.conditional variables instead of the state.
                # That should simplify the logic.
                time.sleep(0.5)
                continue

            if self._action == self.StateMachine.SUBMIT:
                self._submit()
            else:
                self._try_receive()

        # Shutdown
        if self._process.is_alive():
            self._ipc_write_q.put({"payload": "terminate"})
            while self._process.is_alive():
                self._try_receive()
            self._process.join()

    def join(self):
        """
        Request shutdown of the monitor thread and wait for completion.

        This method signals the monitor loop to terminate, which triggers graceful
        worker shutdown and process join. It then joins the monitor thread.
        """
        self._shutdown_event.set()
        self._monitor_thread.join()

StateMachine

Bases: IntEnum

Internal action state for the monitor loop.

SUBMIT
    Attempt to dequeue a new job from the global queue and send it to the worker.
RECEIVE
    Poll for a worker response and resolve the currently active future.

Source code in python/mneme/async_executor.py
class StateMachine(IntEnum):
    """
    Internal action state for the monitor loop.

    SUBMIT
        Attempt to dequeue a new job from the global queue and send it to the worker.
    RECEIVE
        Poll for a worker response and resolve the currently active future.
    """

    SUBMIT = 1
    RECEIVE = 2

__init__(idx, global_q, record_db, record_id, device_id, iterations, results_db_dir)

Construct a worker handle and start the worker process + monitor thread.

Parameters:

Name Type Description Default
idx int

Logical worker index (primarily used for logging/debugging).

required
global_q Queue

Shared thread queue containing :class:EvalFuture objects to be executed by this handle’s worker process.

required
record_db str

Path to the recorded execution database/file.

required
record_id str

Identifier of the recorded kernel instance inside record_db.

required
device_id int

Device id (GPU index) assigned to the underlying worker process.

required
iterations int

Number of kernel iterations used by the worker for the tracked execution.

required
results_db_dir str

Directory where the worker writes logs and optional artifacts.

required
Notes
  • The worker process is spawned immediately during initialization.
  • A background thread is started to monitor the worker process and drive job submission/result collection.
Source code in python/mneme/async_executor.py
def __init__(
    self,
    idx,
    global_q: ThreadQueue,
    record_db: str,
    record_id: str,
    device_id: int,
    iterations: int,
    results_db_dir: str,
):
    """
    Construct a worker handle and start the worker process + monitor thread.

    Parameters
    ----------
    idx : int
        Logical worker index (primarily used for logging/debugging).
    global_q : queue.Queue
        Shared thread queue containing :class:`EvalFuture` objects to be executed
        by this handle’s worker process.
    record_db : str
        Path to the recorded execution database/file.
    record_id : str
        Identifier of the recorded kernel instance inside ``record_db``.
    device_id : int
        Device id (GPU index) assigned to the underlying worker process.
    iterations : int
        Number of kernel iterations used by the worker for the tracked execution.
    results_db_dir : str
        Directory where the worker writes logs and optional artifacts.

    Notes
    -----
    * The worker process is spawned immediately during initialization.
    * A background thread is started to monitor the worker process and drive job
      submission/result collection.
    """
    self.idx = idx
    self.global_q = global_q

    self._ipc_write_q = None
    self._ipc_read_q = None

    self._shutdown_event = ThreadEvent()
    self._action = self.StateMachine.SUBMIT

    self.record_db = record_db
    self.record_id = record_id
    self.device_id = device_id
    self.iterations = iterations
    self.results_db_dir = results_db_dir

    self._state = None  # ProcessEvent
    self._process = None  # Process
    self.current = None  # EvalFuture
    logger.debug(f"[TuneWorkerHandle] Starting processes")
    self._spawn_process()

    logger.debug(
        f"[TuneWorkerHandle] Starting Thread and bind it to monitor process {self._process.pid}"
    )
    self._monitor_thread = threading.Thread(target=self._shadow_process_loop)
    self._monitor_thread.start()
    logger.debug(
        f"[TuneWorkerHandle] Done Launching TunerWorkerHandles Thread and Processing Infrastructure"
    )

join()

Request shutdown of the monitor thread and wait for completion.

This method signals the monitor loop to terminate, which triggers graceful worker shutdown and process join. It then joins the monitor thread.

Source code in python/mneme/async_executor.py
def join(self):
    """
    Request shutdown of the monitor thread and wait for completion.

    This method signals the monitor loop to terminate, which triggers graceful
    worker shutdown and process join. It then joins the monitor thread.
    """
    self._shutdown_event.set()
    self._monitor_thread.join()

pop(q, timeout)

Pop an item from a queue with a timeout.

This helper provides a uniform interface for both thread-based queues (:class:queue.Queue) and multiprocessing queues (:class:multiprocessing.Queue). If the queue is empty at the end of the timeout, None is returned.

Parameters:

Name Type Description Default
q

Queue-like object providing a get(timeout=...) API.

required
timeout float

Timeout in seconds for the blocking get call.

required

Returns:

Type Description
Any or None

The retrieved item, or None if the queue was empty.
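
A small sketch of the documented behavior for both queue flavors:

import multiprocessing as mp
import queue

from mneme.async_executor import pop

tq = queue.Queue()
tq.put("job")
assert pop(tq, timeout=0.1) == "job"  # item available: returned
assert pop(tq, timeout=0.1) is None   # empty: None instead of queue.Empty

pq = mp.Queue()
pq.put({"payload": "process"})
print(pop(pq, timeout=1.0))           # same interface for multiprocessing queues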

Source code in python/mneme/async_executor.py
def pop(q, timeout):
    """
    Pop an item from a queue with a timeout.

    This helper provides a uniform interface for both thread-based queues
    (:class:`queue.Queue`) and multiprocessing queues
    (:class:`multiprocessing.Queue`). If the queue is empty at the end of the
    timeout, ``None`` is returned.

    Parameters
    ----------
    q
        Queue-like object providing a ``get(timeout=...)`` API.
    timeout : float
        Timeout in seconds for the blocking ``get`` call.

    Returns
    -------
    Any or None
        The retrieved item, or ``None`` if the queue was empty.
    """
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None

ExperimentConfiguration dataclass

Configuration for a single Mneme record/replay experiment.

This object captures all knobs that control kernel launch configuration, specialization strategy, and code generation behavior. It is intended to be hashable (via :meth:hash) so the same configuration can be given a stable, persistent identifier across runs.

Attributes:

Name Type Description
grid dim3

Grid dimensions (x, y, z) of the kernel launch.

block dim3

Block dimensions (x, y, z) of the kernel launch.

shared_mem int

Amount of dynamic shared memory to allocate for the launch.

specialize bool

Whether to enable specialization based on the recorded execution (e.g., specializing on input sizes or recorded parameters).

set_launch_bounds bool

Whether to explicitly set CUDA launch bounds for the generated kernel.

max_threads int

Maximum number of threads per block to assume when setting launch bounds or during specialization.

min_blocks_per_sm int

Minimum number of resident blocks per SM when computing launch bounds.

specialize_dims bool

Whether to specialize based on the recorded grid/block dimensions.

passes str

Optimization pass pipeline specification, e.g. "default<O3>".

codegen_opt int

Code generation optimization level (e.g., 0–3).

codegen_method str

Code generation strategy, e.g. "serial" or other proteus backend-specific modes. Currently only "serial" is supported by Mneme.

prune bool

Whether to enable IR pruning / dead-code elimination in the generated kernel. This is currently mandatory (always true); its impact will be explored later.

internalize bool

Whether to internalize symbols (e.g., limit symbol visibility) during code generation. This is currently mandatory (always true); its impact will be explored later.
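
A usage sketch covering construction, normalization, hashing, and the dictionary round trip used for worker IPC (field values below are illustrative; the round trip is expected to preserve the digest given the deterministic hashing described above):

from mneme.mneme_types import ExperimentConfiguration

cfg = ExperimentConfiguration(
    shared_mem=0,
    set_launch_bounds=False,
    passes="default<O3>",
    codegen_opt=3,
)

# Zero out knobs that are unused under this configuration before hashing,
# so equivalent configurations map to the same digest.
cfg.ground()
digest = cfg.hash()

# Round-trip through the serializable form used for IPC and persistence.
restored = ExperimentConfiguration.from_dict(cfg.to_dict())
assert restored.hash() == digest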

Source code in python/mneme/mneme_types.py
@dataclass
class ExperimentConfiguration:
    """
    Configuration for a single Mneme record/replay experiment.

    This object captures all knobs that control kernel launch configuration,
    specialization strategy, and code generation behavior. It is intended to be
    hashable (via :meth:`hash`) so the same configuration can be given a stable,
    persistent identifier across runs.

    Attributes
    ----------
    grid : dim3
        Grid dimensions (x, y, z) of the kernel launch.
    block : dim3
        Block dimensions (x, y, z) of the kernel launch.
    shared_mem : int
        Amount of dynamic shared memory to allocate for the launch.
    specialize : bool
        Whether to enable specialization based on the recorded execution
        (e.g., specializing on input sizes or recorded parameters).
    set_launch_bounds : bool
        Whether to explicitly set CUDA launch bounds for the generated kernel.
    max_threads : int
        Maximum number of threads per block to assume when setting launch bounds
        or during specialization.
    min_blocks_per_sm : int
        Minimum number of resident blocks per SM when computing launch bounds.
    specialize_dims : bool
        Whether to specialize based on the recorded grid/block dimensions.
    passes : str
        Optimization pass pipeline specification, e.g. ``"default<O3>"``.
    codegen_opt : int
        Code generation optimization level (e.g., 0–3).
    codegen_method : str
        Code generation strategy, e.g. ``"serial"`` or other ``proteus`` backend-specific
        modes. Currently only ``"serial"`` is supported by Mneme.
    prune : bool
        Whether to enable IR pruning / dead-code elimination in the generated
        kernel. This is currently mandatory (always true); its impact will be explored later.
    internalize : bool
        Whether to internalize symbols (e.g., limit symbol visibility) during
        code generation. This is currently mandatory (always true); its impact will be explored later.
    """

    grid: dim3 = field(default_factory=dim3)
    block: dim3 = field(default_factory=dim3)
    shared_mem: int = 0
    specialize: bool = False
    set_launch_bounds: bool = False
    max_threads: int = 1024
    min_blocks_per_sm: int = 1
    specialize_dims: bool = False
    passes: str = "default<O3>"
    codegen_opt: int = 3
    codegen_method: str = "serial"
    prune: bool = True
    internalize: bool = True

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ExperimentConfiguration":
        """
        Construct a configuration from a plain dictionary.

        The dictionary is expected to contain JSON-/YAML-serializable
        representations, with ``"grid"`` and ``"block"`` encoded as dictionaries
        compatible with :meth:`dim3.from_dict`.

        Parameters
        ----------
        data : dict
            Dictionary containing configuration fields.

        Returns
        -------
        ExperimentConfiguration
            A new configuration instance initialized from ``data``.
        """
        data = dict(data)

        grid = data.pop("grid")
        block = data.pop("block")

        if isinstance(grid, dim3):
            grid_obj = grid
        else:
            grid_obj = dim3.from_dict(grid)

        if isinstance(block, dim3):
            block_obj = block
        else:
            block_obj = dim3.from_dict(block)

        return cls(grid=grid_obj, block=block_obj, **data)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the configuration to a plain dictionary.

        Returns
        -------
        dict
            A JSON-/YAML-serializable dictionary representation of the
            configuration, suitable for persistence or hashing.
        """

        return _to_serializable(self)

    def is_valid(self):
        """
        Check whether the configuration satisfies device constraints.

        Returns
        -------
        bool
            True if this is a valid configuration, False otherwise.
        """
        if self.set_launch_bounds and (
            self.max_threads < (self.block.x * self.block.y * self.block.z)
        ):
            return False
        return True

    def ground(self):
        """
        Set fields that are 'unused' under the current configuration to their
        default values. This helps produce stable hashes for equivalent
        configurations.

        Returns
        -------
        None
            This function does not return anything; it modifies the instance in place.
        """

        if not self.set_launch_bounds:
            self.max_threads = 0
            self.min_blocks_per_sm = 0

    def hash(self) -> str:
        """
        Compute a stable SHA-256 hash of the full configuration.

        The hash is computed from a normalized, JSON-serializable view of the
        configuration so that identical configurations produce the same digest
        across processes and runs.

        Returns
        -------
        str
            Hex-encoded SHA-256 digest of the configuration.
        """
        serializable = _to_serializable(self)
        # sort_keys + compact separators => deterministic string
        payload = json.dumps(serializable, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

from_dict(data) classmethod

Construct a configuration from a plain dictionary.

The dictionary is expected to contain JSON-/YAML-serializable representations, with "grid" and "block" encoded as dictionaries compatible with :meth:dim3.from_dict.

Parameters:

Name Type Description Default
data dict

Dictionary containing configuration fields.

required

Returns:

Type Description
ExperimentConfiguration

A new configuration instance initialized from data.

Source code in python/mneme/mneme_types.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ExperimentConfiguration":
    """
    Construct a configuration from a plain dictionary.

    The dictionary is expected to contain JSON-/YAML-serializable
    representations, with ``"grid"`` and ``"block"`` encoded as dictionaries
    compatible with :meth:`dim3.from_dict`.

    Parameters
    ----------
    data : dict
        Dictionary containing configuration fields.

    Returns
    -------
    ExperimentConfiguration
        A new configuration instance initialized from ``data``.
    """
    data = dict(data)

    grid = data.pop("grid")
    block = data.pop("block")

    if isinstance(grid, dim3):
        grid_obj = grid
    else:
        grid_obj = dim3.from_dict(grid)

    if isinstance(block, dim3):
        block_obj = block
    else:
        block_obj = dim3.from_dict(block)

    return cls(grid=grid_obj, block=block_obj, **data)

ground()

Fields that are 'unused' under the current configuration are reset to their default values. This helps produce stable hashes for equivalent configurations.

Returns:

Type Description
None

This function does not return anything; it modifies the instance in place.

Source code in python/mneme/mneme_types.py
def ground(self):
    """
    Set fields that are 'unused' under the current configuration to their
    default values. This helps produce stable hashes for equivalent
    configurations.

    Returns
    -------
    None
        This function does not return anything; it modifies the instance in place.
    """

    if not self.set_launch_bounds:
        self.max_threads = 0
        self.min_blocks_per_sm = 0

hash()

Compute a stable SHA-256 hash of the full configuration.

The hash is computed from a normalized, JSON-serializable view of the configuration so that identical configurations produce the same digest across processes and runs.

Returns:

Type Description
str

Hex-encoded SHA-256 digest of the configuration.

Source code in python/mneme/mneme_types.py
def hash(self) -> str:
    """
    Compute a stable SHA-256 hash of the full configuration.

    The hash is computed from a normalized, JSON-serializable view of the
    configuration so that identical configurations produce the same digest
    across processes and runs.

    Returns
    -------
    str
        Hex-encoded SHA-256 digest of the configuration.
    """
    serializable = _to_serializable(self)
    # sort_keys + compact separators => deterministic string
    payload = json.dumps(serializable, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

is_valid()

Check whether the configuration satisfies device constraints.

Returns:

Type Description
bool

True if this is a valid configuration, False otherwise.
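
For instance, a block of 512 threads combined with launch bounds capped at 256 threads is rejected. A sketch, assuming dim3 accepts x/y/z keyword components (an assumption, not confirmed by this documentation):

from mneme.mneme_types import ExperimentConfiguration, dim3

cfg = ExperimentConfiguration(
    block=dim3(x=512, y=1, z=1),  # assumed dim3 keyword constructor
    set_launch_bounds=True,
    max_threads=256,
)
assert not cfg.is_valid()  # 512 threads/block exceeds max_threads

cfg.max_threads = 1024
assert cfg.is_valid()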

Source code in python/mneme/mneme_types.py
def is_valid(self):
    """
    Check whether the configuration satisfies device constraints.

    Returns
    -------
    bool
        True if this is a valid configuration, False otherwise.
    """
    if self.set_launch_bounds and (
        self.max_threads < (self.block.x * self.block.y * self.block.z)
    ):
        return False
    return True

to_dict()

Convert the configuration to a plain dictionary.

Returns:

Type Description
dict

A JSON-/YAML-serializable dictionary representation of the configuration, suitable for persistence or hashing.

Source code in python/mneme/mneme_types.py
def to_dict(self) -> Dict[str, Any]:
    """
    Convert the configuration to a plain dictionary.

    Returns
    -------
    dict
        A JSON-/YAML-serializable dictionary representation of the
        configuration, suitable for persistence or hashing.
    """

    return _to_serializable(self)

ExperimentResult dataclass

Result record for a single Mneme record/replay experiment.

This captures timing information, code size, resource usage, and basic execution outcome. It is designed to be easily serializable so that experiment runs can be logged and analyzed offline.

Attributes:

Name Type Description
preprocess_ir_time float

Time spent in applying proteus specific optimizations.

opt_time float

Time spent in the optimization phase of the experiment.

codegen_time float

Time spent in the code generation / compilation phase.

obj_size int

Size of the generated object or binary artifact.

exec_time list of int

Execution time measurements for the replayed kernel, one entry per run.

verified bool

Whether the experiment matched the results of the recorded execution.

executed bool

Whether the experiment was executed at least once (without a crash).

failed bool

Whether the experiment ultimately failed (e.g., compilation or runtime error).

start_time str

ISO 8601 timestamp for when the experiment started.

end_time str

ISO 8601 timestamp for when the experiment finished.

gpu_id int

Identifier of the GPU device on which the experiment ran.

const_mem_usage int

Amount of constant memory used by the generated kernel.

local_mem_usage int

Amount of local memory used by the generated kernel.

reg_usage int

Number of registers used per thread by the generated kernel.

error str

Error description, usually set by the :class:TuneWorkerHandle on crash.
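
A sketch of the dictionary round trip plus a simple summary over the timing samples (field values are illustrative; timing units follow the profiling backend):

from statistics import mean

from mneme.mneme_types import ExperimentResult

res = ExperimentResult.from_dict(
    {
        "executed": True,
        "verified": True,
        "exec_time": [1200, 1180, 1195],
        "gpu_id": 0,
    }
)
if res.executed and res.verified:
    print(f"mean exec time: {mean(res.exec_time):.1f}")

payload = res.to_dict()  # flat, serializable view for logging/analysis
assert payload["verified"] is True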

Source code in python/mneme/mneme_types.py
@dataclass
class ExperimentResult:
    """
    Result record for a single Mneme record/replay experiment.

    This captures timing information, code size, resource usage, and basic
    execution outcome. It is designed to be easily serializable so that
    experiment runs can be logged and analyzed offline.

    Attributes
    ----------
    preprocess_ir_time : float
        Time spent in applying ``proteus`` specific optimizations.
    opt_time : float
        Time spent in the optimization phase of the experiment.
    codegen_time : float
        Time spent in the code generation / compilation phase.
    obj_size : int
        Size of the generated object or binary artifact.
    exec_time : list of int
        Execution time measurements for the replayed kernel, one entry per run.
    verified : bool
        Whether the experiment matched the results of the recorded execution.
    executed : bool
        Whether the experiment was executed at least once (without a crash).
    failed : bool
        Whether the experiment ultimately failed (e.g., compilation or runtime
        error).
    start_time : str
        ISO 8601 timestamp for when the experiment started.
    end_time : str
        ISO 8601 timestamp for when the experiment finished.
    gpu_id : int
        Identifier of the GPU device on which the experiment ran.
    const_mem_usage : int
        Amount of constant memory used by the generated kernel.
    local_mem_usage : int
        Amount of local memory used by the generated kernel.
    reg_usage : int
        Number of registers used per thread by the generated kernel.
    error : str
        Error description, usually set by the :class:`TuneWorkerHandle` on crash.
    """

    preprocess_ir_time: float = 0.0
    opt_time: float = 0.0
    codegen_time: float = 0.0
    obj_size: int = 0
    exec_time: List[int] = field(default_factory=list)
    verified: bool = False
    executed: bool = False
    failed: bool = False
    start_time: str = ""
    end_time: str = ""
    gpu_id: int = 0
    const_mem_usage: int = 0
    local_mem_usage: int = 0
    reg_usage: int = 0
    error: str = ""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ExperimentResult":
        """
        Construct an experiment result from a plain dictionary.

        Parameters
        ----------
        data : dict
            Dictionary containing result fields.

        Returns
        -------
        ExperimentResult
            A new result instance initialized from ``data``.
        """
        return cls(**data)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the result record to a plain dictionary.

        Returns
        -------
        dict
            A JSON-/YAML-serializable dictionary representation of the result.
        """
        return asdict(self)

from_dict(data) classmethod

Construct an experiment result from a plain dictionary.

Parameters:

Name Type Description Default
data dict

Dictionary containing result fields.

required

Returns:

Type Description
ExperimentResult

A new result instance initialized from data.

Source code in python/mneme/mneme_types.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ExperimentResult":
    """
    Construct an experiment result from a plain dictionary.

    Parameters
    ----------
    data : dict
        Dictionary containing result fields.

    Returns
    -------
    ExperimentResult
        A new result instance initialized from ``data``.
    """
    return cls(**data)

to_dict()

Convert the result record to a plain dictionary.

Returns:

Type Description
dict

A JSON-/YAML-serializable dictionary representation of the result.

Source code in python/mneme/mneme_types.py
def to_dict(self) -> Dict[str, Any]:
    """
    Convert the result record to a plain dictionary.

    Returns
    -------
    dict
        A JSON-/YAML-serializable dictionary representation of the result.
    """
    return asdict(self)

GPU profiling helpers (lazy-loaded native profiler binding).

This module provides a small Python wrapper around Mneme's native profiling library. The profiler is loaded lazily to avoid premature initialization side-effects in GPU runtimes (notably HSA), which may perform tool initialization during shared-library load time.

Why lazy-load?

  • Mneme spawns worker processes (fork). Some GPU profiling/tooling stacks initialize at import / dlopen time, which is unsafe or undesirable pre-fork.
  • By deferring the load until :func:init_profiler is called inside each worker, the profiling runtime is initialized in the correct process context.

Public API:

  • :func:init_profiler: Initialize (load) the profiling library.
  • :func:gpu_profile_start: Start profiling for a kernel name; returns a correlation id.
  • :func:gpu_profile_stop: Stop profiling and return recorded timestamps/records.

Notes:

  • This is an internal module; callers are expected to call :func:init_profiler once per process before using start/stop.
  • The native library and its ABI are considered the source of truth.
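
A per-worker usage sketch (the kernel label is a placeholder, and the kernel launch itself is elided):

from mneme.profile import gpu_profile_start, gpu_profile_stop, init_profiler

init_profiler()  # once per worker process, post-fork, before start/stop

token = gpu_profile_start("my_kernel")  # placeholder kernel label
# ... launch the kernel under measurement here ...
records = gpu_profile_stop(token)
print(f"collected {len(records)} profiling records")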

gpu_profile_start(kernel_name)

Start GPU profiling for a kernel.

Parameters:

Name Type Description Default
kernel_name str

Kernel name used as a label by the profiling backend.

required

Returns:

Type Description
int

Correlation identifier used to match start/stop calls.

Raises:

Type Description
RuntimeError

If the profiling library has not been initialized via :func:init_profiler.

Source code in python/mneme/profile.py
def gpu_profile_start(kernel_name: str):
    """
    Start GPU profiling for a kernel.

    Parameters
    ----------
    kernel_name : str
        Kernel name used as a label by the profiling backend.

    Returns
    -------
    int
        Correlation identifier used to match start/stop calls.

    Raises
    ------
    RuntimeError
        If the profiling library has not been initialized via :func:`init_profiler`.
    """
    if profile_lib is None:
        raise RuntimeError("Profile library is not initialized")
    return int(profile_lib.MnemePy_startProfile(_encode_string(kernel_name)))

gpu_profile_stop(correlation_id)

Stop GPU profiling and return recorded profiling values.

Parameters:

Name Type Description Default
correlation_id int

Correlation identifier returned by :func:gpu_profile_start.

required

Returns:

Type Description
list[int]

List of profiling records returned by the native backend (typically GPU timestamps or counter values, depending on the profiler implementation).

Raises:

Type Description
RuntimeError

If the profiling library has not been initialized via :func:init_profiler.

Source code in python/mneme/profile.py
def gpu_profile_stop(correlation_id: int):
    """
    Stop GPU profiling and return recorded profiling values.

    Parameters
    ----------
    correlation_id : int
        Correlation identifier returned by :func:`gpu_profile_start`.

    Returns
    -------
    list[int]
        List of profiling records returned by the native backend (typically GPU
        timestamps or counter values, depending on the profiler implementation).

    Raises
    ------
    RuntimeError
        If the profiling library has not been initialized via :func:`init_profiler`.
    """
    if profile_lib is None:
        raise RuntimeError("Profile library is not initialized")

    num_records = profile_lib.MnemePy_getNumRecords(correlation_id)

    if num_records <= 0 or num_records > 10_000_000:
        raise RuntimeError(f"Bad num_records={num_records} for token={correlation_id}")

    logger.debug(f"Profiler contains {num_records} records")
    arr = (c_int64 * num_records)()

    _ = profile_lib.MnemePy_stopProfile(correlation_id, arr, num_records)
    return [int(r) for r in arr]

init_profiler()

Initialize the Mneme profiling backend for the current process.

This should be called inside each worker process (post-fork) before any calls to :func:gpu_profile_start / :func:gpu_profile_stop.

Source code in python/mneme/profile.py
def init_profiler() -> None:
    """
    Initialize the Mneme profiling backend for the current process.

    This should be called inside each worker process (post-fork) before any calls
    to :func:`gpu_profile_start` / :func:`gpu_profile_stop`.
    """
    _init_profile()

proteus

Python FFI bindings for the Proteus JIT transformation and code-generation pipeline.

This module provides a thin, Pythonic wrapper around Proteus’ C++ JIT infrastructure, exposing functionality for:

  • Linking multiple LLVM IR modules into a single executable module
  • Pruning dead IR and internalizing symbols
  • Applying architecture-aware optimization pipelines
  • Specializing kernels based on runtime arguments and launch dimensions
  • Emitting device-specific executable objects (e.g., ELF / HSACO)

All operations are performed through a C FFI layer and operate directly on LLVM modules represented by :class:~mneme.llvm.module.ModuleRef. Most functions mutate the provided module in place and return either updated metadata (such as a specialization hash) or compiled device artifacts.

This module forms the core of Mneme’s record–replay and autotuning workflow, bridging recorded execution metadata with dynamic compilation and execution on accelerator devices.
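
A sketch of the typical link, optimize, and codegen flow (the module paths and kernel name are hypothetical; "gfx942" is the example architecture used elsewhere in these docs):

from mneme.proteus.jit import codegen_object, link_llvm_modules, optimize

modules = ["module_a.ll", "module_b.ll"]  # hypothetical recorded IR modules
kernel = "my_kernel"                      # hypothetical kernel entry point

# Link, pruning dead IR and internalizing symbols during the link stage.
mod = link_llvm_modules(modules, kernel, prune=True, internalize=True)

# Middle-end passes plus the backend opt level for the target architecture.
optimize(mod, "gfx942", "O3", 3)

# Emit the device code object (e.g., an HSACO for AMD targets).
obj = codegen_object(mod, "gfx942", codegen_type="serial", codegen_opt_level=3)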

codegen_object(mod, device_arch, codegen_type='serial', codegen_opt_level=3)

Generate a compiled device code object from an LLVM module.

Invokes the Proteus backend code generator for the given architecture and returns the produced binary wrapped in a :class:~mneme.llvm.buffer.MemBufferRef.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to compile.

required
device_arch str

Target architecture string.

required
codegen_type str

Codegen mode (e.g., "serial"). Defaults to "serial".

'serial'
codegen_opt_level int

Backend optimization level in [1, 3]. Defaults to 3.

3

Returns:

Type Description
MemBufferRef

Memory buffer containing the produced code object.

Raises:

Type Description
TypeError

If mod is not a :class:~mneme.llvm.module.ModuleRef.

RuntimeError

If codegen_opt_level is not in [1, 3].

Source code in python/mneme/proteus/jit.py
def codegen_object(
    mod: ModuleRef, device_arch, codegen_type="serial", codegen_opt_level: int = 3
):
    """
    Generate a compiled device code object from an LLVM module.

    Invokes the Proteus backend code generator for the given architecture and
    returns the produced binary wrapped in a :class:`~mneme.llvm.buffer.MemBufferRef`.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to compile.
    device_arch : str
        Target architecture string.
    codegen_type : str, optional
        Codegen mode (e.g., ``"serial"``). Defaults to ``"serial"``.
    codegen_opt_level : int, optional
        Backend optimization level in ``[1, 3]``. Defaults to ``3``.

    Returns
    -------
    MemBufferRef
        Memory buffer containing the produced code object.

    Raises
    ------
    TypeError
        If ``mod`` is not a :class:`~mneme.llvm.module.ModuleRef`.
    RuntimeError
        If ``codegen_opt_level`` is not in ``[1, 3]``.
    """
    if not isinstance(mod, ModuleRef):
        raise TypeError(f"Expecting type of ModuleRef instead got {type(mod)}")

    if codegen_opt_level < 1 or codegen_opt_level > 3:
        raise RuntimeError(
            f"codegen optimization level must be in range (0,3], instead it was {codegen_opt_level}"
        )
    result = MemBufferRef(
        ffi.lib.ProteusPY_codeGenObject(
            mod,
            _encode_string(device_arch),
            _encode_string(codegen_type),
            codegen_opt_level,
        )
    )
    return result

internalize(mod, kernel_name)

Mark all symbols except the given kernel as internal.

This applies Proteus' internalization pass, restricting symbol visibility to reduce linking overhead and enable more aggressive optimization.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to update (mutated in-place).

required
kernel_name str

Name of the kernel whose symbol must remain externally visible.

required

Raises:

Type Description
TypeError

If mod is not a :class:~mneme.llvm.module.ModuleRef.

Source code in python/mneme/proteus/jit.py
def internalize(mod: ModuleRef, kernel_name: str):
    """
    Mark all symbols except the given kernel as internal.

    This applies Proteus' internalization pass, restricting symbol visibility to
    reduce linking overhead and enable more aggressive optimization.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to update (mutated in-place).
    kernel_name : str
        Name of the kernel whose symbol must remain externally visible.

    Raises
    ------
    TypeError
        If ``mod`` is not a :class:`~mneme.llvm.module.ModuleRef`.
    """
    if not isinstance(mod, ModuleRef):
        raise TypeError(f"Expecting type of ModuleRef instead got {type(mod)}")

    ffi.lib.ProteusPY_internalize(mod, _encode_string(kernel_name))

link_llvm_modules(modules, kernel_name, prune, internalize)

Link multiple LLVM IR modules into a single unified module.

This constructs a new module by invoking Proteus' linker. Optionally performs pruning and internalization during the link stage.

Parameters:

Name Type Description Default
modules list[str]

Filesystem paths to LLVM IR modules to link.

required
kernel_name str

Name of the kernel entry function to preserve.

required
prune bool

Whether to prune dead IR after linking.

required
internalize bool

Whether to internalize symbols except the kernel.

required

Returns:

Type Description
ModuleRef

Newly linked module.

Source code in python/mneme/proteus/jit.py
def link_llvm_modules(
    modules: List[str], kernel_name: str, prune: bool, internalize: bool
) -> ModuleRef:
    """
    Link multiple LLVM IR modules into a single unified module.

    This constructs a new module by invoking Proteus' linker. Optionally performs
    pruning and internalization during the link stage.

    Parameters
    ----------
    modules : list[str]
        Filesystem paths to LLVM IR modules to link.
    kernel_name : str
        Name of the kernel entry function to preserve.
    prune : bool
        Whether to prune dead IR after linking.
    internalize : bool
        Whether to internalize symbols except the kernel.

    Returns
    -------
    ModuleRef
        Newly linked module.
    """
    c_strings = [c_char_p(s.encode("utf-8")) for s in modules]
    ArrayType = c_char_p * len(c_strings)
    c_array = ArrayType(*c_strings)
    Mod = ModuleRef(
        ffi.lib.ProteusPY_linkModules(
            c_array,
            len(modules),
            get_global_context(),
            kernel_name.encode("utf-8"),
            prune,
            internalize,
        ),
        get_global_context(),
    )
    return Mod
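
A minimal sketch, assuming two recorded IR files on disk and a kernel named "saxpy" (paths and names are illustrative):

# Hypothetical sketch: link two IR files, pruning dead IR and internalizing
# every symbol except "saxpy".
mod = link_llvm_modules(["kernel.ll", "device_lib.ll"], "saxpy", prune=True, internalize=True)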

optimize(mod, device_arch, opt_level, codegen_opt_level)

Run Proteus optimization passes on an LLVM module.

Applies middle-end optimization passes customized for a target device architecture and a chosen LLVM optimization level. Also configures the code-generation optimization intensity used later by the backend.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to optimize (mutated in-place).

required
device_arch str

Target device architecture string (e.g., "gfx942").

required
opt_level str

LLVM optimization pipeline selector (e.g., "O1", "O2", "O3", "Os", "Oz"). If empty, optimization is skipped.

required
codegen_opt_level int

Backend optimization level in [0, 3].

required

Raises:

Type Description
TypeError

If mod is not a :class:~mneme.llvm.module.ModuleRef.

ValueError

If codegen_opt_level is outside [0, 3].

Source code in python/mneme/proteus/jit.py
def optimize(mod: ModuleRef, device_arch: str, opt_level: str, codegen_opt_level: int):
    """
    Run Proteus optimization passes on an LLVM module.

    Applies middle-end optimization passes customized for a target device
    architecture and a chosen LLVM optimization level. Also configures the
    code-generation optimization intensity used later by the backend.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to optimize (mutated in-place).
    device_arch : str
        Target device architecture string (e.g., ``"gfx942"``).
    opt_level : str
        LLVM optimization pipeline selector (e.g., ``"O1"``, ``"O2"``, ``"O3"``,
        ``"Os"``, ``"Oz"``). If empty, optimization is skipped.
    codegen_opt_level : int
        Backend optimization level in ``[0, 3]``.

    Raises
    ------
    TypeError
        If ``mod`` is not a :class:`~mneme.llvm.module.ModuleRef`.
    ValueError
        If ``codegen_opt_level`` is outside ``[0, 3]``.
    """
    if not isinstance(mod, ModuleRef):
        raise TypeError(f"Expected ModuleRef, instead got {type(mod)}")

    if not (0 <= codegen_opt_level <= 3):
        raise ValueError(
            f"Expected codegen_opt_level to be in [0, 3], instead got {codegen_opt_level}"
        )
    if len(opt_level) == 0:
        return

    ffi.lib.ProteusPY_optimize(
        mod,
        _encode_string(device_arch),
        _encode_string(opt_level),
        int(codegen_opt_level),
    )
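
A minimal sketch (assuming mod is a ModuleRef; the architecture string is illustrative):

# Hypothetical sketch: run the O3 middle-end pipeline for gfx942 and record
# a backend codegen level of 3 for later code generation.
optimize(mod, "gfx942", "O3", codegen_opt_level=3)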

pruneIR(mod)

Remove unused functions, globals, and dead IR from an LLVM module.

This calls Proteus' C++ pruning pass through the FFI to eliminate dead IR and reduce module size before further specialization or optimization.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to prune.

required

Raises:

Type Description
TypeError

If mod is not a :class:~mneme.llvm.module.ModuleRef.

Source code in python/mneme/proteus/jit.py
def pruneIR(mod: ModuleRef):
    """
    Remove unused functions, globals, and dead IR from an LLVM module.

    This calls Proteus' C++ pruning pass through the FFI to eliminate dead IR and
    reduce module size before further specialization or optimization.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to prune.

    Raises
    ------
    TypeError
        If ``mod`` is not a :class:`~mneme.llvm.module.ModuleRef`.
    """
    if not isinstance(mod, ModuleRef):
        raise TypeError(f"Expected ModuleRef, instead got {type(mod)}")
    ffi.lib.ProteusPY_pruneIR(mod)
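
A minimal sketch (kernel name illustrative); pruning pairs naturally with the internalization pass documented above:

# Hypothetical sketch: shrink the module, keeping only "saxpy" externally visible.
internalize(mod, "saxpy")
pruneIR(mod)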

set_launch_bounds(mod, mod_hash, kernel_name, max_threads_per_block, min_blocks_per_sm)

Apply CUDA/HIP-style launch-bounds metadata to the kernel.

Sets launch-bounds on the kernel to restrict maximum threads per block and communicate occupancy constraints, influencing register allocation and codegen decisions.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to annotate.

required
mod_hash int

Current module hash.

required
kernel_name str

Name of the kernel function.

required
max_threads_per_block int

Maximum threads-per-block bound (must be <= 1024).

required
min_blocks_per_sm int

Minimum required blocks per SM.

required

Returns:

Type Description
int

Updated module hash.

Raises:

Type Description
RuntimeError

If max_threads_per_block exceeds 1024.

Source code in python/mneme/proteus/jit.py
def set_launch_bounds(
    mod: ModuleRef,
    mod_hash: int,
    kernel_name: str,
    max_threads_per_block: int,
    min_blocks_per_sm: int,
):
    """
    Apply CUDA/HIP-style launch-bounds metadata to the kernel.

    Sets launch-bounds on the kernel to restrict maximum threads per block and
    communicate occupancy constraints, influencing register allocation and
    codegen decisions.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to annotate.
    mod_hash : int
        Current module hash.
    kernel_name : str
        Name of the kernel function.
    max_threads_per_block : int
        Maximum threads-per-block bound (must be ``<= 1024``).
    min_blocks_per_sm : int
        Minimum required blocks per SM.

    Returns
    -------
    int
        Updated module hash.

    Raises
    ------
    RuntimeError
        If ``max_threads_per_block`` exceeds 1024.
    """
    if max_threads_per_block > 1024:
        raise RuntimeError("Max threads cannot be larger than 1024")

    return int(
        ffi.lib.ProteusPY_setLaunchBounds(
            mod,
            c_uint64(mod_hash),
            _encode_string(kernel_name),
            max_threads_per_block,
            min_blocks_per_sm,
        )
    )
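
A minimal sketch (kernel name and bounds are illustrative; the returned value replaces the running module hash):

# Hypothetical sketch: bound the kernel to 256 threads per block and request
# at least 2 resident blocks per SM.
mod_hash = set_launch_bounds(mod, mod_hash, "saxpy", 256, 2)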

specialize_args(mod, mod_hash, kernel_name, kernel_args, num_args, specialize_indexes)

Specialize a subset of kernel arguments inside an LLVM module.

Performs IR rewriting / constant propagation based on provided runtime arguments, and returns an updated hash reflecting the specialization.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to modify.

required
mod_hash int

Current module hash before specialization.

required
kernel_name str

Kernel whose arguments are specialized.

required
kernel_args

Raw pointers to argument values (FFI-compatible pointer array).

required
num_args int

Total number of kernel arguments.

required
specialize_indexes

Indices of arguments to specialize.

required

Returns:

Type Description
int

Updated module hash after specialization.

Raises:

Type Description
RuntimeError

If more indices are requested than available arguments.

Source code in python/mneme/proteus/jit.py
def specialize_args(
    mod: ModuleRef,
    mod_hash: int,
    kernel_name: str,
    kernel_args,
    num_args: int,
    specialize_indexes,
) -> int:
    """
    Specialize a subset of kernel arguments inside an LLVM module.

    Performs IR rewriting / constant propagation based on provided runtime
    arguments, and returns an updated hash reflecting the specialization.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to modify.
    mod_hash : int
        Current module hash before specialization.
    kernel_name : str
        Kernel whose arguments are specialized.
    kernel_args
        Raw pointers to argument values (FFI-compatible pointer array).
    num_args : int
        Total number of kernel arguments.
    specialize_indexes
        Indices of arguments to specialize.

    Returns
    -------
    int
        Updated module hash after specialization.

    Raises
    ------
    RuntimeError
        If more indices are requested than available arguments.
    """
    if num_args < len(specialize_indexes):
        raise RuntimeError("Trying to specialize more indexes than available")

    indexes = (c_int * len(specialize_indexes))()
    for i, v in enumerate(specialize_indexes):
        indexes[i] = v

    return int(
        ffi.lib.ProteusPY_specializeArguments(
            mod,
            c_uint64(mod_hash),
            _encode_string(kernel_name),
            kernel_args,
            num_args,
            indexes,
            len(specialize_indexes),
        )
    )
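
A minimal sketch, assuming a kernel with a single scalar argument that we pin to a constant. Building the pointer array with ctypes is shown only as one plausible way to satisfy the FFI-compatible kernel_args contract:

from ctypes import byref, c_int, c_void_p, cast

# Hypothetical sketch: specialize argument index 0 (a scalar int) of "saxpy".
n = c_int(1024)
kernel_args = (c_void_p * 1)(cast(byref(n), c_void_p))
mod_hash = specialize_args(mod, mod_hash, "saxpy", kernel_args, 1, [0])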

specialize_dims(mod, mod_hash, kernel_name, grid_dim, block_dim)

Specialize launch dimensions (grid/block) inside the LLVM module.

Embeds compile-time constants for launch configuration, enabling IR simplification and more aggressive optimization.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to update.

required
mod_hash int

Previous module hash.

required
kernel_name str

Kernel to specialize.

required
grid_dim dim3

Grid dimensions.

required
block_dim dim3

Block dimensions.

required

Returns:

Type Description
int

Updated module hash.

Source code in python/mneme/proteus/jit.py
def specialize_dims(
    mod: ModuleRef, mod_hash: int, kernel_name: str, grid_dim: dim3, block_dim: dim3
):
    """
    Specialize launch dimensions (grid/block) inside the LLVM module.

    Embeds compile-time constants for launch configuration, enabling IR
    simplification and more aggressive optimization.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to update.
    mod_hash : int
        Previous module hash.
    kernel_name : str
        Kernel to specialize.
    grid_dim : dim3
        Grid dimensions.
    block_dim : dim3
        Block dimensions.

    Returns
    -------
    int
        Updated module hash.
    """
    return int(
        ffi.lib.ProteusPY_specializeDims(
            mod, c_uint64(mod_hash), _encode_string(kernel_name), grid_dim, block_dim
        )
    )

specialize_dims_assume(mod, mod_hash, kernel_name, grid_dim, block_dim)

Add launch-dimension assumptions (grid/block) inside the LLVM module.

Similar to :func:specialize_dims, but emits assumptions rather than (or in addition to) direct constant replacement, enabling downstream passes to simplify based on assumed launch invariants.

Parameters:

Name Type Description Default
mod ModuleRef

LLVM module to update.

required
mod_hash int

Previous module hash.

required
kernel_name str

Kernel to specialize.

required
grid_dim dim3

Grid dimensions.

required
block_dim dim3

Block dimensions.

required

Returns:

Type Description
int

Updated module hash.

Source code in python/mneme/proteus/jit.py
def specialize_dims_assume(
    mod: ModuleRef, mod_hash: int, kernel_name: str, grid_dim: dim3, block_dim: dim3
):
    """
    Add launch-dimension assumptions (grid/block) inside the LLVM module.

    Similar to :func:`specialize_dims`, but emits assumptions rather than (or in
    addition to) direct constant replacement, enabling downstream passes to
    simplify based on assumed launch invariants.

    Parameters
    ----------
    mod : ModuleRef
        LLVM module to update.
    mod_hash : int
        Previous module hash.
    kernel_name : str
        Kernel to specialize.
    grid_dim : dim3
        Grid dimensions.
    block_dim : dim3
        Block dimensions.

    Returns
    -------
    int
        Updated module hash.
    """
    return int(
        ffi.lib.ProteusPY_specializeDimsAssume(
            mod, c_uint64(mod_hash), _encode_string(kernel_name), grid_dim, block_dim
        )
    )
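
Putting the pieces together, a hedged end-to-end sketch of the specialize-and-compile flow these functions support (file name, kernel name, architecture, starting hash, and the dim3 constructor call are all assumptions for illustration):

# Hypothetical sketch of the full JIT flow.
mod = link_llvm_modules(["kernel.ll"], "saxpy", prune=True, internalize=True)
mod_hash = 0  # assumed seed; in practice the hash comes from the recorded execution
mod_hash = specialize_dims(mod, mod_hash, "saxpy", dim3(64, 1, 1), dim3(256, 1, 1))
mod_hash = set_launch_bounds(mod, mod_hash, "saxpy", 256, 2)
optimize(mod, "gfx942", "O3", codegen_opt_level=3)
obj = codegen_object(mod, "gfx942", codegen_opt_level=3)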

Tuning

ExhaustiveSamplingStrategy

Bases: SamplingStrategy

Exhaustive sampling strategy over the entire search space.

This strategy enumerates all valid combinations of parameters as defined by the associated :class:SearchSpace. It is intended primarily for small search spaces where full enumeration is feasible.

Notes
  • This strategy has not been tested at all and should be considered a proof of concept.
  • Exhaustive enumeration may become prohibitively expensive for large or high-dimensional search spaces.
Source code in python/mneme/tuning/sample_strategy.py
class ExhaustiveSamplingStrategy(SamplingStrategy):
    """
    Exhaustive sampling strategy over the entire search space.

    This strategy enumerates **all** valid combinations of parameters as defined
    by the associated :class:`SearchSpace`. It is intended primarily for small
    search spaces where full enumeration is feasible.

    Notes
    -----
    * This strategy has **not been tested at all** and should be considered
      a proof of concept.
    * Exhaustive enumeration may become prohibitively expensive for large or
      high-dimensional search spaces.
    """

    def __init__(self, search_space):
        """
        Construct an exhaustive sampler.

        Parameters
        ----------
        search_space : SearchSpace
            Search space providing the parameter definitions and exhaustive
            enumeration logic.
        """
        self.space = search_space

    def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
        """
        Yield all parameter combinations from the search space.

        Returns
        -------
        Iterator[dict]
            Iterator over parameter dictionaries produced by
            :meth:`SearchSpace.sample_exhaustive`.
        """
        for params in self.space.sample_exhaustive():
            yield params

__init__(search_space)

Construct an exhaustive sampler.

Parameters:

Name Type Description Default
search_space SearchSpace

Search space providing the parameter definitions and exhaustive enumeration logic.

required
Source code in python/mneme/tuning/sample_strategy.py
def __init__(self, search_space):
    """
    Construct an exhaustive sampler.

    Parameters
    ----------
    search_space : SearchSpace
        Search space providing the parameter definitions and exhaustive
        enumeration logic.
    """
    self.space = search_space

__iter__()

Yield all parameter combinations from the search space.

Returns:

Type Description
Iterator[dict]

Iterator over parameter dictionaries produced by :meth:SearchSpace.sample_exhaustive.

Source code in python/mneme/tuning/sample_strategy.py
def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
    """
    Yield all parameter combinations from the search space.

    Returns
    -------
    Iterator[dict]
        Iterator over parameter dictionaries produced by
        :meth:`SearchSpace.sample_exhaustive`.
    """
    for params in self.space.sample_exhaustive():
        yield params
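
A minimal sketch, assuming my_space is a concrete SearchSpace with only finite dimensions (see the SearchSpace example further below):

# Hypothetical sketch: enumerate every valid configuration of a finite space.
strategy = ExhaustiveSamplingStrategy(my_space)
for config in strategy:
    print(config)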

OptunaSamplingStrategy

Bases: SamplingStrategy

Optuna-driven adaptive sampling strategy.

This strategy delegates sampling decisions to an Optuna Study object. Parameter suggestions are generated by invoking the search space’s Optuna sampling logic, which typically binds Optuna Trial objects to parameter definitions.

The iterator yields samples until the requested number of trials has been reached, accounting for trials that may already exist in the study (e.g., when resuming from a persistent Optuna backend).

Source code in python/mneme/tuning/sample_strategy.py
class OptunaSamplingStrategy(SamplingStrategy):
    """
    Optuna-driven adaptive sampling strategy.

    This strategy delegates sampling decisions to an Optuna ``Study`` object.
    Parameter suggestions are generated by invoking the search space’s Optuna
    sampling logic, which typically binds Optuna ``Trial`` objects to parameter
    definitions.

    The iterator yields samples until the requested number of trials has been
    reached, accounting for trials that may already exist in the study (e.g.,
    when resuming from a persistent Optuna backend).
    """

    def __init__(self, search_space, study, n_trials):
        """
        Construct an Optuna-based sampling strategy.

        Parameters
        ----------
        search_space : SearchSpace
            Search space providing Optuna-aware sampling logic.
        study : optuna.Study
            Optuna study object managing trials and optimization state.
        n_trials : int
            Total number of trials to execute (including any existing trials
            already present in the study).
        """
        self.space = search_space
        self.study = study
        self.n_trials = n_trials
        logger.debug(
            f"{self.__class__.__name__}: {len(self.study.trials)} previously executed trials, {self.n_trials} total requested trials"
        )

    def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
        """
        Yield Optuna-suggested parameter dictionaries.

        Iteration continues until the number of trials in the associated study
        reaches ``n_trials``.

        Returns
        -------
        Iterator[dict]
            Iterator yielding parameter dictionaries produced via Optuna.
        """
        while len(self.study.trials) < self.n_trials:
            params = self.space.sample_optuna(self.study)
            yield params

__init__(search_space, study, n_trials)

Construct an Optuna-based sampling strategy.

Parameters:

Name Type Description Default
search_space SearchSpace

Search space providing Optuna-aware sampling logic.

required
study Study

Optuna study object managing trials and optimization state.

required
n_trials int

Total number of trials to execute (including any existing trials already present in the study).

required
Source code in python/mneme/tuning/sample_strategy.py
def __init__(self, search_space, study, n_trials):
    """
    Construct an Optuna-based sampling strategy.

    Parameters
    ----------
    search_space : SearchSpace
        Search space providing Optuna-aware sampling logic.
    study : optuna.Study
        Optuna study object managing trials and optimization state.
    n_trials : int
        Total number of trials to execute (including any existing trials
        already present in the study).
    """
    self.space = search_space
    self.study = study
    self.n_trials = n_trials
    logger.debug(
        f"{self.__class__.__name__}: {len(self.study.trials)} previously executed trials, {self.n_trials} total requested trials"
    )

__iter__()

Yield Optuna-suggested parameter dictionaries.

Iteration continues until the number of trials in the associated study reaches n_trials.

Returns:

Type Description
Iterator[dict]

Iterator yielding parameter dictionaries produced via Optuna.

Source code in python/mneme/tuning/sample_strategy.py
def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
    """
    Yield Optuna-suggested parameter dictionaries.

    Iteration continues until the number of trials in the associated study
    reaches ``n_trials``.

    Returns
    -------
    Iterator[dict]
        Iterator yielding parameter dictionaries produced via Optuna.
    """
    while len(self.study.trials) < self.n_trials:
        params = self.space.sample_optuna(self.study)
        yield params
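
A minimal sketch, assuming Optuna is installed, my_space is a concrete SearchSpace, and run_experiment is a user-supplied evaluation function; it mirrors the ask/tell protocol used by :meth:SearchSpace.sample_optuna:

import optuna

# Hypothetical sketch: draw up to 50 Optuna-suggested configurations.
study = optuna.create_study(direction="minimize")
strategy = OptunaSamplingStrategy(my_space, study, n_trials=50)
for derived_config, trial in strategy:
    objective = run_experiment(derived_config)  # user-supplied evaluation
    study.tell(trial, objective)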

RandomSamplingStrategy

Bases: SamplingStrategy

Random sampling strategy over the search space.

This strategy draws a fixed number of independent samples from the search space using the search space’s random sampling logic.

Notes
  • This strategy has not been tested at all and should be considered a proof of concept.
  • Sampling does not guarantee coverage or uniqueness of configurations.
Source code in python/mneme/tuning/sample_strategy.py
class RandomSamplingStrategy(SamplingStrategy):
    """
    Random sampling strategy over the search space.

    This strategy draws a fixed number of independent samples from the search
    space using the search space’s random sampling logic.

    Notes
    -----
    * This strategy has **not been tested at all** and should be considered
      a proof of concept.
    * Sampling does not guarantee coverage or uniqueness of configurations.
    """

    def __init__(self, search_space, num_samples: int):
        """
        Construct a random sampler.

        Parameters
        ----------
        search_space : SearchSpace
            Search space providing the parameter definitions and random sampling
            logic.
        num_samples : int
            Number of random samples to generate.
        """
        self.space = search_space
        self.num_samples = num_samples

    def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
        """
        Yield randomly sampled parameter dictionaries.

        Returns
        -------
        Iterator[dict]
            Iterator yielding ``num_samples`` independently sampled parameter
            dictionaries.
        """
        for _ in range(self.num_samples):
            yield self.space.sample_random()

__init__(search_space, num_samples)

Construct a random sampler.

Parameters:

Name Type Description Default
search_space SearchSpace

Search space providing the parameter definitions and random sampling logic.

required
num_samples int

Number of random samples to generate.

required
Source code in python/mneme/tuning/sample_strategy.py
def __init__(self, search_space, num_samples: int):
    """
    Construct a random sampler.

    Parameters
    ----------
    search_space : SearchSpace
        Search space providing the parameter definitions and random sampling
        logic.
    num_samples : int
        Number of random samples to generate.
    """
    self.space = search_space
    self.num_samples = num_samples

__iter__()

Yield randomly sampled parameter dictionaries.

Returns:

Type Description
Iterator[dict]

Iterator yielding num_samples independently sampled parameter dictionaries.

Source code in python/mneme/tuning/sample_strategy.py
def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
    """
    Yield randomly sampled parameter dictionaries.

    Returns
    -------
    Iterator[dict]
        Iterator yielding ``num_samples`` independently sampled parameter
        dictionaries.
    """
    for _ in range(self.num_samples):
        yield self.space.sample_random()

SamplingStrategy

Bases: ABC

Abstract base class for parameter sampling strategies.

A SamplingStrategy defines how candidate parameter dictionaries are generated from a :class:SearchSpace. Concrete implementations determine whether sampling is exhaustive, random, adaptive, or driven by an external optimization framework (e.g., Optuna).

Implementations must provide an iterator interface that yields parameter dictionaries compatible with the associated :class:SearchSpace.

Source code in python/mneme/tuning/sample_strategy.py
class SamplingStrategy(ABC):
    """
    Abstract base class for parameter sampling strategies.

    A ``SamplingStrategy`` defines how candidate parameter dictionaries are
    generated from a :class:`SearchSpace`. Concrete implementations determine
    whether sampling is exhaustive, random, adaptive, or driven by an external
    optimization framework (e.g., Optuna).

    Implementations must provide an iterator interface that yields parameter
    dictionaries compatible with the associated :class:`SearchSpace`.
    """

    @abstractmethod
    def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
        """Return an iterator that yields configs."""
        pass

__iter__() abstractmethod

Return an iterator that yields configs.

Source code in python/mneme/tuning/sample_strategy.py
@abstractmethod
def __iter__(self) -> Iterator[Tuple[ExperimentConfiguration, Any]]:
    """Return an iterator that yields configs."""
    pass

BaseParam

Bases: ABC

Abstract base class for tuning parameter definitions.

A BaseParam represents a single tunable dimension in a :class:SearchSpace. Concrete subclasses define the domain (fixed, boolean, categorical, numeric range, pass-pipeline, etc.) and provide metadata needed by sampling backends.

Attributes:

Name Type Description
name str

Logical name of the parameter. This name is typically used as the Optuna parameter name when sampling via :class:optuna.trial.Trial.

Source code in python/mneme/tuning/search_space.py
class BaseParam(ABC):
    """
    Abstract base class for tuning parameter definitions.

    A ``BaseParam`` represents a single tunable dimension in a :class:`SearchSpace`.
    Concrete subclasses define the domain (fixed, boolean, categorical, numeric range,
    pass-pipeline, etc.) and provide metadata needed by sampling backends.

    Attributes
    ----------
    name : str
        Logical name of the parameter. This name is typically used as the Optuna
        parameter name when sampling via :class:`optuna.trial.Trial`.
    """

    def __init__(self, name: str):
        self.name = name

BoolParam

Bases: BaseParam

A boolean parameter.

The domain is {True, False}.

Source code in python/mneme/tuning/search_space.py
class BoolParam(BaseParam):
    """
    A boolean parameter.

    The domain is ``{True, False}``.
    """

    def __init__(self, name: str):
        """
        Parameters
        ----------
        name : str
            Name of the parameter.
        """
        super().__init__(name)
        self.choices: List[bool] = [True, False]

__init__(name)

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
Source code in python/mneme/tuning/search_space.py
def __init__(self, name: str):
    """
    Parameters
    ----------
    name : str
        Name of the parameter.
    """
    super().__init__(name)
    self.choices: List[bool] = [True, False]

CategoricalParam

Bases: BaseParam

A parameter with an explicit finite set of choices.

The domain is the provided list of choices.

Source code in python/mneme/tuning/search_space.py
class CategoricalParam(BaseParam):
    """
    A parameter with an explicit finite set of choices.

    The domain is the provided list of choices.
    """

    def __init__(self, name: str, choices: List[Any]):
        """
        Parameters
        ----------
        name : str
            Name of the parameter.
        choices : list
            Finite set of allowed values.

        Raises
        ------
        ValueError
            If ``choices`` is empty.
        """
        super().__init__(name)
        if not choices:
            raise ValueError("CategoricalParam must have at least one choice.")
        self.choices: List[Any] = list(choices)

__init__(name, choices)

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
choices list

Finite set of allowed values.

required

Raises:

Type Description
ValueError

If choices is empty.

Source code in python/mneme/tuning/search_space.py
def __init__(self, name: str, choices: List[Any]):
    """
    Parameters
    ----------
    name : str
        Name of the parameter.
    choices : list
        Finite set of allowed values.

    Raises
    ------
    ValueError
        If ``choices`` is empty.
    """
    super().__init__(name)
    if not choices:
        raise ValueError("CategoricalParam must have at least one choice.")
    self.choices: List[Any] = list(choices)

FixedParam

Bases: BaseParam

A parameter with a single fixed value.

This is useful for keeping a dimension present in the search space interface while effectively disabling tuning for that parameter.

Source code in python/mneme/tuning/search_space.py
class FixedParam(BaseParam):
    """
    A parameter with a single fixed value.

    This is useful for keeping a dimension present in the search space interface
    while effectively disabling tuning for that parameter.
    """

    def __init__(self, name: str, value: Any):
        """
        Parameters
        ----------
        name : str
            Name of the parameter.
        value : Any
            Fixed value returned by all samplers.
        """
        super().__init__(name)
        self.value = value

__init__(name, value)

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
value Any

Fixed value returned by all samplers.

required
Source code in python/mneme/tuning/search_space.py
def __init__(self, name: str, value: Any):
    """
    Parameters
    ----------
    name : str
        Name of the parameter.
    value : Any
        Fixed value returned by all samplers.
    """
    super().__init__(name)
    self.value = value

IntRangeParam

Bases: BaseParam

Integer range parameter.

Represents an inclusive integer range [low, high] with a positive step. Sampling produces values from the discrete set: {low, low+step, ..., high} (assuming divisibility).

Notes
  • This class models a discrete domain (even though it is expressed as a range).
Source code in python/mneme/tuning/search_space.py
class IntRangeParam(BaseParam):
    """
    Integer range parameter.

    Represents an inclusive integer range ``[low, high]`` with a positive step.
    Sampling produces values from the discrete set:
    ``{low, low+step, ..., high}`` (assuming divisibility).

    Notes
    -----
    * This class models a discrete domain (even though it is expressed as a range).
    """

    def __init__(self, name: str, low: int, high: int, step: int = 1):
        """
        Parameters
        ----------
        name : str
            Name of the parameter.
        low : int
            Inclusive lower bound.
        high : int
            Inclusive upper bound.
        step : int, optional
            Step size (must be positive).

        Raises
        ------
        ValueError
            If ``low > high`` or ``step <= 0``.
        """
        super().__init__(name)
        if low > high:
            raise ValueError("IntRangeParam low must be <= high.")
        if step is None or step <= 0:
            raise ValueError("IntRangeParam step must be a positive integer.")

        self.low: int = low
        self.high: int = high
        self.step: int = step

__init__(name, low, high, step=1)

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
low int

Inclusive lower bound.

required
high int

Inclusive upper bound.

required
step int

Step size (must be positive).

1

Raises:

Type Description
ValueError

If low > high or step <= 0.

Source code in python/mneme/tuning/search_space.py
def __init__(self, name: str, low: int, high: int, step: int = 1):
    """
    Parameters
    ----------
    name : str
        Name of the parameter.
    low : int
        Inclusive lower bound.
    high : int
        Inclusive upper bound.
    step : int, optional
        Step size (must be positive).

    Raises
    ------
    ValueError
        If ``low > high`` or ``step <= 0``.
    """
    super().__init__(name)
    if low > high:
        raise ValueError("IntRangeParam low must be <= high.")
    if step is None or step <= 0:
        raise ValueError("IntRangeParam step must be a positive integer.")

    self.low: int = low
    self.high: int = high
    self.step: int = step

PipelineParam

Bases: BaseParam

Parameter representing a compiler optimization pipeline / pass sequence.

This parameter is specialized: its domain is defined by the available passes provided by :class:PipelineManager and an internal sampling scheme that can select passes, order them, and optionally select multiple occurrences.

Attributes:

Name Type Description
pass_manager PipelineManager

Helper that provides available passes and serialization to pipeline strings.

available_passes list of str

Sorted list of pass identifiers that may be selected.

num_draws int

Upper bound on how many pass-selection decisions are made when sampling. (Interpretation depends on the sampling backend.)

Notes

This parameter must be used cautiously: the pipeline sequence is a combinatorial space in its own right, composed of more than 100 dimensions. We provide it mainly for completeness; applying TPE or NSGA-II to this space is not recommended.

Source code in python/mneme/tuning/search_space.py
class PipelineParam(BaseParam):
    """
    Parameter representing a compiler optimization pipeline / pass sequence.

    This parameter is specialized: its domain is defined by the available passes
    provided by :class:`PipelineManager` and an internal sampling scheme that can
    select passes, order them, and optionally select multiple occurrences.

    Attributes
    ----------
    pass_manager : PipelineManager
        Helper that provides available passes and serialization to pipeline strings.
    available_passes : list of str
        Sorted list of pass identifiers that may be selected.
    num_draws : int
        Upper bound on how many pass-selection decisions are made when sampling.
        (Interpretation depends on the sampling backend.)
    Notes
    -----
    This parameter must be used cautiously: the pipeline sequence is a
    combinatorial space in its own right, composed of more than 100 dimensions.
    We provide it mainly for completeness; applying `TPE` or `NSGA-II` to this
    space is not recommended.

    """

    def __init__(self, name: str, num_draws: int):
        """
        Parameters
        ----------
        name : str
            Name of the parameter (used as a logical key in the search space).
        num_draws : int
            Number of sampling "draws" used when constructing a pipeline.

        Notes
        -----
        * The available pass list is obtained from :class:`PipelineManager` and is
          sorted to ensure stable iteration order.
        """
        super().__init__(name)
        self.pass_manager = PipelineManager()
        self.num_draws = num_draws
        self.available_passes = self.pass_manager.get_passes()
        self.available_passes.sort()

__init__(name, num_draws)

Parameters:

Name Type Description Default
name str

Name of the parameter (used as a logical key in the search space).

required
num_draws int

Number of sampling "draws" used when constructing a pipeline.

required
Notes
  • The available pass list is obtained from :class:PipelineManager and is sorted to ensure stable iteration order.
Source code in python/mneme/tuning/search_space.py
def __init__(self, name: str, num_draws: int):
    """
    Parameters
    ----------
    name : str
        Name of the parameter (used as a logical key in the search space).
    num_draws : int
        Number of sampling "draws" used when constructing a pipeline.

    Notes
    -----
    * The available pass list is obtained from :class:`PipelineManager` and is
      sorted to ensure stable iteration order.
    """
    super().__init__(name)
    self.pass_manager = PipelineManager()
    self.num_draws = num_draws
    self.available_passes = self.pass_manager.get_passes()
    self.available_passes.sort()

RealRangeParam

Bases: BaseParam

Real-valued range parameter.

Represents an inclusive real range [low, high]. This parameter is intended for continuous sampling backends (e.g., Optuna suggest_float).

Notes
  • This parameter is not exhaustively enumerable.
Source code in python/mneme/tuning/search_space.py
class RealRangeParam(BaseParam):
    """
    Real-valued range parameter.

    Represents an inclusive real range ``[low, high]``. This parameter is intended
    for continuous sampling backends (e.g., Optuna suggest_float).

    Notes
    -----
    * This parameter is not exhaustively enumerable.
    """

    def __init__(self, name: str, low: float, high: float):
        """
        Parameters
        ----------
        name : str
            Name of the parameter.
        low : float
            Inclusive lower bound.
        high : float
            Inclusive upper bound.

        Raises
        ------
        ValueError
            If ``low > high``.
        """
        super().__init__(name)
        if low > high:
            raise ValueError("RealRangeParam low must be <= high.")
        self.low: float = low
        self.high: float = high

__init__(name, low, high)

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
low float

Inclusive lower bound.

required
high float

Inclusive upper bound.

required

Raises:

Type Description
ValueError

If low > high.

Source code in python/mneme/tuning/search_space.py
def __init__(self, name: str, low: float, high: float):
    """
    Parameters
    ----------
    name : str
        Name of the parameter.
    low : float
        Inclusive lower bound.
    high : float
        Inclusive upper bound.

    Raises
    ------
    ValueError
        If ``low > high``.
    """
    super().__init__(name)
    if low > high:
        raise ValueError("RealRangeParam low must be <= high.")
    self.low: float = low
    self.high: float = high

SearchSpace

Bases: ABC

Declarative representation of a tuning search space.

A :class:SearchSpace describes:

1) The primary tunable dimensions (see :meth:dimensions)
2) Any derived configuration computed from sampled parameters (see :meth:derived)
3) Constraints that determine whether a sampled assignment is valid (see :meth:constraints)

The base class also provides helper sampling routines for different backends (random sampling, Optuna sampling, and exhaustive enumeration of finite domains).

Notes
  • Concrete subclasses should keep :meth:dimensions purely declarative and implement domain-specific logic inside :meth:derived and :meth:constraints.
Source code in python/mneme/tuning/search_space.py
class SearchSpace(ABC):
    """
    Declarative representation of a tuning search space.

    A :class:`SearchSpace` describes:

      1) The **primary tunable dimensions** (see :meth:`dimensions`)
      2) Any **derived configuration** computed from sampled parameters (see :meth:`derived`)
      3) **Constraints** that determine whether a sampled assignment is valid (see :meth:`constraints`)

    The base class also provides helper sampling routines for different backends
    (random sampling, Optuna sampling, and exhaustive enumeration of finite domains).

    Notes
    -----
    * Concrete subclasses should keep :meth:`dimensions` purely declarative and
      implement domain-specific logic inside :meth:`derived` and :meth:`constraints`.
    """

    @abstractmethod
    def dimensions(self) -> Dict[str, BaseParam]:
        """
        Return the top-level tunable parameters of this search space.

        Returns
        -------
        dict
            Mapping from parameter name to a :class:`BaseParam` instance describing
            that parameter’s domain.
        """
        pass

    @abstractmethod
    def derived(self, params: Dict[str, Any]) -> ExperimentConfiguration:
        """
        Compute a full experiment configuration from sampled primary parameters.

        Parameters
        ----------
        params : dict
            Dictionary mapping primary parameter names to sampled values.

        Returns
        -------
        ExperimentConfiguration
            Fully specified experiment configuration derived from the sampled values.

        Notes
        -----
        * Derived configuration may include both original parameters and additional
          fields computed from them (e.g., mapping a normalized fraction to an integer
          launch-bounds parameter).
        """
        return {}

    @abstractmethod
    def constraints(self, params: Dict[str, Any]) -> bool:
        """
        Validate that a parameter assignment or derived configuration is legal.

        Parameters
        ----------
        params : dict
            Parameter assignment to validate. Implementations may choose whether this
            expects only primary parameters or a derived configuration, depending on
            the calling context.

        Returns
        -------
        bool
            ``True`` if the assignment is valid, otherwise ``False``.
        """
        return True

    def sample_random(self) -> Dict[str, Any]:
        """
        Generate one valid random sample from this search space.

        This method repeatedly samples all primary dimensions using
        :func:`sample_random_param` and applies :meth:`constraints`. Sampling is retried
        up to ``MAX_RETRIES`` times.

        Returns
        -------
        dict
            A dictionary of the form ``{"parameters": <param-dict>}`` containing the
            sampled primary parameters.

        Raises
        ------
        RuntimeError
            If a valid configuration cannot be produced within the retry budget.
        """

        MAX_RETRIES = 1000
        dims = self.dimensions()

        for _ in range(MAX_RETRIES):
            result = {}

            # Step 1: sample primary dimensions
            for name, param in dims.items():
                result[name] = sample_random_param(param)

            # Step 2: constraints
            if self.constraints(result):
                return {"parameters": result}

        raise RuntimeError(
            f"Failed to produce a valid random sample after {MAX_RETRIES} attempts."
        )

    def sample_optuna(self, study) -> Tuple[ExperimentConfiguration, Trial]:
        """
        Generate a valid configuration using Optuna.

        This method uses an Optuna study to create trials and sample primary
        dimensions, then computes the derived experiment configuration and enforces
        constraints. Invalid samples are immediately reported back to the study with
        a sentinel objective value.

        Parameters
        ----------
        study : optuna.study.Study
            Optuna study used to create trials and manage search state.

        Returns
        -------
        (ExperimentConfiguration, Trial)
            A tuple containing:

            * **ExperimentConfiguration** – derived configuration produced by :meth:`derived`.
            * **Trial** – Optuna trial associated with the sampled configuration.

        Raises
        ------
        RuntimeError
            If a valid configuration cannot be produced within the retry budget.

        Notes
        -----
        * This routine uses ``study.ask()`` / ``study.tell()`` rather than Optuna’s
          higher-level objective API to support asynchronous evaluation.
        """

        MAX_RETRIES = 1000
        dims = self.dimensions()

        for _ in range(MAX_RETRIES):
            trial = study.ask()
            config = {}

            # Step 1: primary dimension sampling
            for name, param in dims.items():
                config[name] = sample_optuna_param(trial, param)

            # Step 2: constraints
            derived_config = self.derived(config)
            if self.constraints(derived_config):
                return derived_config, trial
            else:
                study.tell(trial, (1 << 64) - 1)

        raise RuntimeError(
            f"Failed to generate a valid Optuna sample after {MAX_RETRIES} attempts."
        )

    def sample_exhaustive(self) -> Iterable[Dict[str, Any]]:
        """
        Exhaustively enumerate all configurations for finite domains.

        This helper enumerates the cartesian product of all dimension value lists
        for parameters with finite, enumerable domains (fixed, boolean, categorical,
        and integer ranges). Real-valued parameters are not enumerable.

        Yields
        ------
        dict
            Dictionaries of the form ``{"parameters": <param-dict>}`` for each valid
            parameter assignment satisfying :meth:`constraints`.

        Raises
        ------
        ValueError
            If an attempt is made to enumerate a non-enumerable parameter type.
        """
        dims = self.dimensions()
        keys = list(dims.keys())

        # Build value lists
        value_lists = []
        for dim in dims.values():
            if isinstance(dim, FixedParam):
                value_lists.append([dim.value])  # FixedParam stores its constant in .value
            elif isinstance(dim, BoolParam):
                value_lists.append(list(dim.choices))
            elif isinstance(dim, IntRangeParam):
                lo = dim.low
                hi = dim.high
                step = dim.step
                value_lists.append(list(range(lo, hi + 1, step)))
            elif isinstance(dim, CategoricalParam):
                value_lists.append(list(dim.choices))
            elif isinstance(dim, RealRangeParam):
                raise ValueError("Cannot enumerate real space.")
            else:
                raise ValueError("Cannot enumerate custom dimension.")

        # Cartesian product
        from itertools import product

        for combo in product(*value_lists):
            params = dict(zip(keys, combo))
            derived_config = self.derived(params)
            if self.constraints(derived_config):
                yield derived_config
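
To make the contract concrete, a minimal sketch of a subclass (the parameter names and the derived/constraint logic are invented for illustration):

class ToySearchSpace(SearchSpace):
    """Hypothetical two-dimensional space for illustration."""

    def dimensions(self):
        return {
            "unroll": BoolParam("unroll"),
            "block_size": IntRangeParam("block_size", 64, 1024, step=64),
        }

    def derived(self, params):
        # Derive a launch-bounds field from the sampled block size.
        return {**params, "max_threads_per_block": params["block_size"]}

    def constraints(self, params):
        # Invented rule: disallow unrolling together with very large blocks.
        return not (params["unroll"] and params["block_size"] > 512)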

constraints(params) abstractmethod

Validate that a parameter assignment or derived configuration is legal.

Parameters:

Name Type Description Default
params dict

Parameter assignment to validate. Implementations may choose whether this expects only primary parameters or a derived configuration, depending on the calling context.

required

Returns:

Type Description
bool

True if the assignment is valid, otherwise False.

Source code in python/mneme/tuning/search_space.py
@abstractmethod
def constraints(self, params: Dict[str, Any]) -> bool:
    """
    Validate that a parameter assignment or derived configuration is legal.

    Parameters
    ----------
    params : dict
        Parameter assignment to validate. Implementations may choose whether this
        expects only primary parameters or a derived configuration, depending on
        the calling context.

    Returns
    -------
    bool
        ``True`` if the assignment is valid, otherwise ``False``.
    """
    return True

derived(params) abstractmethod

Compute a full experiment configuration from sampled primary parameters.

Parameters:

Name Type Description Default
params dict

Dictionary mapping primary parameter names to sampled values.

required

Returns:

Type Description
ExperimentConfiguration

Fully specified experiment configuration derived from the sampled values.

Notes
  • Derived configuration may include both original parameters and additional fields computed from them (e.g., mapping a normalized fraction to an integer launch-bounds parameter).
Source code in python/mneme/tuning/search_space.py
@abstractmethod
def derived(self, params: Dict[str, Any]) -> ExperimentConfiguration:
    """
    Compute a full experiment configuration from sampled primary parameters.

    Parameters
    ----------
    params : dict
        Dictionary mapping primary parameter names to sampled values.

    Returns
    -------
    ExperimentConfiguration
        Fully specified experiment configuration derived from the sampled values.

    Notes
    -----
    * Derived configuration may include both original parameters and additional
      fields computed from them (e.g., mapping a normalized fraction to an integer
      launch-bounds parameter).
    """
    return {}

dimensions() abstractmethod

Return the top-level tunable parameters of this search space.

Returns:

Type Description
dict

Mapping from parameter name to a :class:BaseParam instance describing that parameter’s domain.

Source code in python/mneme/tuning/search_space.py
@abstractmethod
def dimensions(self) -> Dict[str, BaseParam]:
    """
    Return the top-level tunable parameters of this search space.

    Returns
    -------
    dict
        Mapping from parameter name to a :class:`BaseParam` instance describing
        that parameter’s domain.
    """
    pass

sample_exhaustive()

Exhaustively enumerate all configurations for finite domains.

This helper enumerates the cartesian product of all dimension value lists for parameters with finite, enumerable domains (fixed, boolean, categorical, and integer ranges). Real-valued parameters are not enumerable.

Yields:

Type Description
dict

Dictionaries of the form {"parameters": <param-dict>} for each valid parameter assignment satisfying :meth:constraints.

Raises:

Type Description
ValueError

If an attempt is made to enumerate a non-enumerable parameter type.

Source code in python/mneme/tuning/search_space.py
def sample_exhaustive(self) -> Iterable[Dict[str, Any]]:
    """
    Exhaustively enumerate all configurations for finite domains.

    This helper enumerates the cartesian product of all dimension value lists
    for parameters with finite, enumerable domains (fixed, boolean, categorical,
    and integer ranges). Real-valued parameters are not enumerable.

    Yields
    ------
    dict
        Dictionaries of the form ``{"parameters": <param-dict>}`` for each valid
        parameter assignment satisfying :meth:`constraints`.

    Raises
    ------
    ValueError
        If an attempt is made to enumerate a non-enumerable parameter type.
    """
    dims = self.dimensions()
    keys = list(dims.keys())

    # Build value lists
    value_lists = []
    for dim in dims.values():
        if isinstance(dim, FixedParam):
            value_lists.append([dim.value])  # FixedParam stores its constant in .value
        elif isinstance(dim, BoolParam):
            value_lists.append(list(dim.choices))
        elif isinstance(dim, IntRangeParam):
            lo = dim.low
            hi = dim.high
            step = dim.step
            value_lists.append(list(range(lo, hi + 1, step)))
        elif isinstance(dim, CategoricalParam):
            value_lists.append(list(dim.choices))
        elif isinstance(dim, RealRangeParam):
            raise ValueError("Cannot enumerate real space.")
        else:
            raise ValueError("Cannot enumerate custom dimension.")

    # Cartesian product
    from itertools import product

    for combo in product(*value_lists):
        params = dict(zip(keys, combo))
        derived_config = self.derived(params)
        if self.constraints(derived_config):
            yield derived_config

sample_optuna(study)

Generate a valid configuration using Optuna.

This method uses an Optuna study to create trials and sample primary dimensions, then computes the derived experiment configuration and enforces constraints. Invalid samples are immediately reported back to the study with a sentinel objective value.

Parameters:

Name Type Description Default
study Study

Optuna study used to create trials and manage search state.

required

Returns:

Type Description
(ExperimentConfiguration, Trial)

A tuple containing:

  • ExperimentConfiguration – derived configuration produced by :meth:derived.
  • Trial – Optuna trial associated with the sampled configuration.

Raises:

Type Description
RuntimeError

If a valid configuration cannot be produced within the retry budget.

Notes
  • This routine uses study.ask() / study.tell() rather than Optuna’s higher-level objective API to support asynchronous evaluation.
Source code in python/mneme/tuning/search_space.py
def sample_optuna(self, study) -> Tuple[ExperimentConfiguration, Trial]:
    """
    Generate a valid configuration using Optuna.

    This method uses an Optuna study to create trials and sample primary
    dimensions, then computes the derived experiment configuration and enforces
    constraints. Invalid samples are immediately reported back to the study with
    a sentinel objective value.

    Parameters
    ----------
    study : optuna.study.Study
        Optuna study used to create trials and manage search state.

    Returns
    -------
    (ExperimentConfiguration, Trial)
        A tuple containing:

        * **ExperimentConfiguration** – derived configuration produced by :meth:`derived`.
        * **Trial** – Optuna trial associated with the sampled configuration.

    Raises
    ------
    RuntimeError
        If a valid configuration cannot be produced within the retry budget.

    Notes
    -----
    * This routine uses ``study.ask()`` / ``study.tell()`` rather than Optuna’s
      higher-level objective API to support asynchronous evaluation.
    """

    MAX_RETRIES = 1000
    dims = self.dimensions()

    for _ in range(MAX_RETRIES):
        trial = study.ask()
        config = {}

        # Step 1: primary dimension sampling
        for name, param in dims.items():
            config[name] = sample_optuna_param(trial, param)

        # Step 2: constraints
        derived_config = self.derived(config)
        if self.constraints(derived_config):
            return derived_config, trial
        else:
            study.tell(trial, (1 << 64) - 1)

    raise RuntimeError(
        f"Failed to generate a valid Optuna sample after {MAX_RETRIES} attempts."
    )
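
A minimal ask/tell loop, shown as a sketch: space stands in for a SearchSpace instance and run_experiment for a user-provided evaluation function (both hypothetical names):

import optuna

study = optuna.create_study(direction="minimize")
for _ in range(20):
    config, trial = space.sample_optuna(study)
    objective = run_experiment(config)  # hypothetical evaluation
    study.tell(trial, objective)        # report the result back to the study
print(study.best_params)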

sample_random()

Generate one valid random sample from this search space.

This method repeatedly samples all primary dimensions using :func:sample_random_param and applies :meth:constraints. Sampling is retried up to MAX_RETRIES times.

Returns:

Type Description
dict

A dictionary of the form {"parameters": <param-dict>} containing the sampled primary parameters.

Raises:

Type Description
RuntimeError

If a valid configuration cannot be produced within the retry budget.

Source code in python/mneme/tuning/search_space.py
def sample_random(self) -> Dict[str, Any]:
    """
    Generate one valid random sample from this search space.

    This method repeatedly samples all primary dimensions using
    :func:`sample_random_param` and applies :meth:`constraints`. Sampling is retried
    up to ``MAX_RETRIES`` times.

    Returns
    -------
    dict
        A dictionary of the form ``{"parameters": <param-dict>}`` containing the
        sampled primary parameters.

    Raises
    ------
    RuntimeError
        If a valid configuration cannot be produced within the retry budget.
    """

    MAX_RETRIES = 1000
    dims = self.dimensions()

    for _ in range(MAX_RETRIES):
        result = {}

        # Step 1: sample primary dimensions
        for name, param in dims.items():
            result[name] = sample_random_param(param)

        # Step 2: constraints
        if self.constraints(result):
            return {"parameters": result}

    raise RuntimeError(
        f"Failed to produce a valid random sample after {MAX_RETRIES} attempts."
    )
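
A quick sketch drawing a few independent samples (space is again a hypothetical SearchSpace instance):

samples = [space.sample_random() for _ in range(10)]
for s in samples:
    print(s["parameters"])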

sample_optuna_param(trial, param)

Sample a single parameter value using an Optuna trial.

This function maps :class:BaseParam subclasses to the appropriate Optuna sampling primitive (e.g., suggest_int, suggest_float, or suggest_categorical). For specialized parameter types (e.g., :class:PipelineParam), it implements a custom sampling scheme that encodes selection and ordering via multiple Optuna decision variables.

Parameters:

Name Type Description Default
trial Trial

Optuna trial used to generate parameter suggestions.

required
param BaseParam

Parameter definition describing the domain and sampling behavior.

required

Returns:

Type Description
Any

Sampled value for the parameter.

Raises:

Type Description
TypeError

If the parameter type is not supported.

Source code in python/mneme/tuning/search_space.py
def sample_optuna_param(trial: Trial, param: BaseParam) -> Any:
    """
    Sample a single parameter value using an Optuna trial.

    This function maps :class:`BaseParam` subclasses to the appropriate Optuna
    sampling primitive (e.g., ``suggest_int``, ``suggest_float``, or
    ``suggest_categorical``). For specialized parameter types (e.g.,
    :class:`PipelineParam`), it implements a custom sampling scheme that encodes
    selection and ordering via multiple Optuna decision variables.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Optuna trial used to generate parameter suggestions.
    param : BaseParam
        Parameter definition describing the domain and sampling behavior.

    Returns
    -------
    Any
        Sampled value for the parameter.

    Raises
    ------
    TypeError
        If the parameter type is not supported.
    """

    name = param.name

    if isinstance(param, FixedParam):
        return param.value

    if isinstance(param, BoolParam):
        return trial.suggest_categorical(name, param.choices)

    if isinstance(param, CategoricalParam):
        return trial.suggest_categorical(name, param.choices)

    if isinstance(param, IntRangeParam):
        return trial.suggest_int(name, param.low, param.high, step=param.step)

    if isinstance(param, RealRangeParam):
        # Continuous sampling over [low, high]; no step discretization is applied
        return trial.suggest_float(name, param.low, param.high)

    if isinstance(param, PipelineParam):
        selected_pipelines = []
        pass_id = 0
        pass_count = defaultdict(int)
        for i in range(param.num_draws):
            pass_name = param.available_passes[pass_id]
            count = pass_count[pass_name]
            use_it = trial.suggest_categorical(f"use_{pass_name}_{count}", [0, 1])
            if not use_it:
                continue

            # Priority *within* this round
            prio = trial.suggest_int(f"prio_{pass_name}_{count}", 0, param.num_draws)

            # (pass name, priority, occurrence index)
            selected_pipelines.append((pass_name, prio, pass_count[pass_name]))
            pass_count[pass_name] += 1
            pass_id = (pass_id + 1) % len(param.available_passes)

        selected_pipelines.sort(key=lambda x: (x[1], x[2]))
        concrete_passes = param.pass_manager.get_concrete_passes()
        return param.pass_manager.to_string(
            [concrete_passes[v[0]] for v in selected_pipelines]
        )

    raise TypeError(f"Unsupported parameter type in Optuna: {type(param)}")

sample_random_param(param)

Generate one random sample for a single parameter.

Parameters:

Name Type Description Default
param BaseParam

Parameter definition describing the domain.

required

Returns:

Type Description
Any

Randomly sampled value.

Raises:

Type Description
TypeError

If the parameter type is not supported.

Source code in python/mneme/tuning/search_space.py
def sample_random_param(param: BaseParam) -> Any:
    """
    Generate one random sample for a single parameter.

    Parameters
    ----------
    param : BaseParam
        Parameter definition describing the domain.

    Returns
    -------
    Any
        Randomly sampled value.

    Raises
    ------
    TypeError
        If the parameter type is not supported.
    """

    if isinstance(param, FixedParam):
        return param.value

    elif isinstance(param, BoolParam):
        return random.choice(param.choices)

    elif isinstance(param, CategoricalParam):
        return random.choice(param.choices)

    elif isinstance(param, IntRangeParam):
        low = param.low
        high = param.high
        step = param.step
        # Sample from {low, low+step, ..., high}
        n = ((high - low) // step) + 1
        idx = random.randrange(n)
        return low + idx * step
    elif isinstance(param, RealRangeParam):
        return random.uniform(param.low, param.high)
    elif isinstance(param, PipelineParam):
        if param.pipelines is not None:
            return random.choice(param.pipelines)
        return param.generator()

    else:
        raise TypeError(f"Unsupported parameter type: {type(param)}")

LLVM

ValueRef

Bases: ObjectRef

A weak reference to an LLVM value.

Source code in python/mneme/llvm/value.py
class ValueRef(ffi.ObjectRef):
    """A weak reference to a LLVM value."""

    def __init__(self, ptr, kind, parents):
        self._kind = kind
        self._parents = parents
        ffi.ObjectRef.__init__(self, ptr)

    def __str__(self):
        with ffi.OutputString() as outstr:
            ffi.lib.LLVMPY_PrintValueToString(self, outstr)
            return str(outstr)

    @property
    def module(self):
        """
        The module this function or global variable value was obtained from.
        """
        return self._parents.get("module")

    @property
    def function(self):
        """
        The function this argument or basic block value was obtained from.
        """
        return self._parents.get("function")

    @property
    def block(self):
        """
        The block this instruction value was obtained from.
        """
        return self._parents.get("block")

    @property
    def instruction(self):
        """
        The instruction this operand value was obtained from.
        """
        return self._parents.get("instruction")

    @property
    def is_global(self):
        return self._kind == "global"

    @property
    def is_function(self):
        return self._kind == "function"

    @property
    def is_block(self):
        return self._kind == "block"

    @property
    def is_argument(self):
        return self._kind == "argument"

    @property
    def is_instruction(self):
        return self._kind == "instruction"

    @property
    def alignment(self):
        """The alignment property."""
        return ffi.lib.LLVMPY_GetAlignment(self)

    @property
    def is_memory_instruction(self):
        if self._kind != "instruction":
            return False

        memory_instructions = ("alloca", "store", "load", "getelementptr")
        if self.opcode in memory_instructions:
            return True

        return False

    @property
    def is_operand(self):
        return self._kind == "operand"

    @property
    def is_constant(self):
        return bool(ffi.lib.LLVMPY_IsConstant(self))

    @property
    def value_kind(self):
        return ValueKind(ffi.lib.LLVMPY_GetValueKind(self))

    @property
    def name(self):
        return _decode_string(ffi.lib.LLVMPY_GetValueName(self))

    @name.setter
    def name(self, val):
        ffi.lib.LLVMPY_SetValueName(self, _encode_string(val))

    @property
    def linkage(self):
        if self.value_kind in (
            ValueKind.global_alias,
            ValueKind.global_ifunc,
            ValueKind.global_variable,
            ValueKind.function,
        ):
            return Linkage(ffi.lib.LLVMPY_GetLinkage(self))
        raise TypeError(
            f"expected global value, got {self}." f"ValueKind is {self.value_kind.name}"
        )

    @linkage.setter
    def linkage(self, value):
        if not isinstance(value, Linkage):
            value = Linkage[value]
        ffi.lib.LLVMPY_SetLinkage(self, value)

    @property
    def visibility(self):
        return Visibility(ffi.lib.LLVMPY_GetVisibility(self))

    @visibility.setter
    def visibility(self, value):
        if not isinstance(value, Visibility):
            value = Visibility[value]
        ffi.lib.LLVMPY_SetVisibility(self, value)

    @property
    def storage_class(self):
        return StorageClass(ffi.lib.LLVMPY_GetDLLStorageClass(self))

    @storage_class.setter
    def storage_class(self, value):
        if not isinstance(value, StorageClass):
            value = StorageClass[value]
        ffi.lib.LLVMPY_SetDLLStorageClass(self, value)

    def add_function_attribute(self, attr):
        """Only works on function value

        Parameters
        -----------
        attr : str
            attribute name
        """
        if not self.is_function:
            raise ValueError("expected function value, got %s" % (self._kind,))
        attrname = str(attr)
        attrval = ffi.lib.LLVMPY_GetEnumAttributeKindForName(
            _encode_string(attrname), len(attrname)
        )
        if attrval == 0:
            raise ValueError("no such attribute {!r}".format(attrname))
        ffi.lib.LLVMPY_AddFunctionAttr(self, attrval)

    def add_function_key_value_attribute(self, key, value):
        if not self.is_function:
            raise ValueError("expected function value, got %s" % (self._kind,))

        ffi.lib.LLVMPY_AddFunctionKeyValueAttr(
            self, _encode_string(key), len(key), _encode_string(value), len(value)
        )

    def get_function_location(self):
        if not self.is_function:
            raise ValueError("expected function value, got %s" % (self._kind,))
        return (
            _decode_string(ffi.lib.LLVMPY_GetFunctionDefinitionRoot(self)),
            _decode_string(ffi.lib.LLVMPY_GetFunctionDefinitionFileName(self)),
            int(ffi.lib.LLVMPY_GetFunctionLineLoc(self)),
        )

    @property
    def type(self):
        """
        This value's LLVM type.
        """
        return TypeRef(ffi.lib.LLVMPY_TypeOf(self), self.module)

    @property
    def memory_type(self):
        """
        The LLVM type of the memory accessed by this instruction.
        """
        if not self.is_memory_instruction:
            raise ValueError(
                "Argument is not  amemory instruciton {!r}".format(str(self))
            )

        return TypeRef(ffi.lib.LLVMPY_TypeOfMemory(self), self.module)

    @property
    def has_initializer(self):
        """
        Returns True if a global variable has an initializer.
        """
        if self.value_kind != ValueKind.global_variable:
            raise ValueError("expected global value, got %s" % (self._kind))
        return ffi.lib.LLVMPY_HasInitializer(self)

    @property
    def initializer(self):
        """
        Returns the initializer of a global variable.
        """
        if self.value_kind != ValueKind.global_variable:
            raise ValueError("expected global value, got %s" % (self._kind))
        if not self.has_initializer:
            return None
        return ValueRef(
            ffi.lib.LLVMPY_GetInitializer(self), "initializer", self._parents
        )

    @property
    def is_declaration(self):
        """
        Whether this value (presumably global) is defined in the current
        module.
        """
        if not (self.is_global or self.is_function):
            raise ValueError(
                "expected global or function value, got %s" % (self._kind,)
            )
        return ffi.lib.LLVMPY_IsDeclaration(self)

    @property
    def attributes(self):
        """
        Return an iterator over this value's attributes.
        The iterator will yield a string for each attribute.
        """
        return AttributeRef.attribute_iterator(self)

    @property
    def blocks(self):
        """
        Return an iterator over this function's blocks.
        The iterator will yield a ValueRef for each block.
        """
        if not self.is_function:
            raise ValueError("expected function value, got %s" % (self._kind,))
        it = ffi.lib.LLVMPY_FunctionBlocksIter(self)
        parents = self._parents.copy()
        parents.update(function=self)
        return _BlocksIterator(it, parents)

    @property
    def arguments(self):
        """
        Return an iterator over this function's arguments.
        The iterator will yield a ValueRef for each argument.
        """
        if not self.is_function:
            raise ValueError("expected function value, got %s" % (self._kind,))
        it = ffi.lib.LLVMPY_FunctionArgumentsIter(self)
        parents = self._parents.copy()
        parents.update(function=self)
        return _ArgumentsIterator(it, parents)

    @property
    def instructions(self):
        """
        Return an iterator over this block's instructions.
        The iterator will yield a ValueRef for each instruction.
        """
        if not self.is_block:
            raise ValueError("expected block value, got %s" % (self._kind,))
        it = ffi.lib.LLVMPY_BlockInstructionsIter(self)
        parents = self._parents.copy()
        parents.update(block=self)
        return _InstructionsIterator(it, parents)

    @property
    def operands(self):
        """
        Return an iterator over this instruction's operands.
        The iterator will yield a ValueRef for each operand.
        """
        if self.value_kind not in (
            ValueKind.constant_array,
            ValueKind.constant_vector,
            ValueKind.constant_struct,
            ValueKind.constant_data_array,
            ValueKind.constant_data_vector,
            ValueKind.global_alias,
            ValueKind.constant_expr,
            ValueKind.instruction,
            ValueKind.global_variable,
            ValueKind.constant_aggregate_zero,
            ValueKind.undef_value,
            ValueKind.constant_int,
            ValueKind.constant_fp,
            ValueKind.constant_pointer_null,
        ):
            raise ValueError(
                "expected instruction value, constant aggregate, or global."
                " Got %s %s" % (self._kind, self.value_kind.name)
            )

        if self.value_kind in (
            ValueKind.constant_data_array,
            ValueKind.constant_data_vector,
        ):
            it = ffi.lib.LLVMPY_ConstantDataIter(self)
            parents = self._parents.copy()
            parents.update(instruction=self)
            return _ConstantDataIterator(it, parents)

        it = ffi.lib.LLVMPY_OperandsIter(self)
        parents = self._parents.copy()
        parents.update(instruction=self)
        return _OperandsIterator(it, parents)

    @property
    def opcode(self):
        if not self.is_instruction:
            raise ValueError("expected instruction value, got %s" % (self._kind,))
        return ffi.ret_string(ffi.lib.LLVMPY_GetOpcodeName(self))

    @property
    def incoming_blocks(self):
        if not self.is_instruction or self.opcode != "phi":
            raise ValueError("expected phi instruction value, got %s" % (self._kind,))
        it = ffi.lib.LLVMPY_PhiIncomingBlocksIter(self)
        parents = self._parents.copy()
        parents.update(instruction=self)
        return _IncomingBlocksIterator(it, parents)

    @property
    def indices(self):
        if not self.is_instruction or self.opcode not in (
            "insertvalue",
            "extractvalue",
        ):
            raise ValueError(
                "expected insert/extractvalue value, got %s" % (self._kind,)
            )
        it = ffi.lib.LLVMPY_IndicesIter(self)
        parents = self._parents.copy()
        parents.update(instruction=self)
        return _IndicesIterator(it, parents)

    def get_constant_value(self, signed_int=False, round_fp=False):
        """
        Return the constant value, either as a literal (when supported)
        or as a string.

        Parameters
        -----------
        signed_int : bool
            if True and the constant is an integer, returns a signed version
        round_fp : bool
            if True and the constant is a floating point value, rounds the
            result upon accuracy loss (e.g., when querying an fp128 value).
            By default, raises an exception on accuracy loss
        """
        if not self.is_constant:
            raise ValueError("expected constant value, got %s" % (self._kind,))

        if self.value_kind == ValueKind.constant_int:
            # Python integers are also arbitrary-precision
            little_endian = c_bool(False)
            numbytes = self.type.type_width // 8
            ptr = ffi.lib.LLVMPY_GetConstantIntRawValue(self, byref(little_endian))
            asbytes = bytes(cast(ptr, POINTER(c_uint8 * numbytes)).contents)
            return int.from_bytes(
                asbytes,
                ("little" if little_endian.value else "big"),
                signed=signed_int,
            )
        elif self.value_kind == ValueKind.constant_fp:
            # Convert floating-point values to double-precision (Python float)
            accuracy_loss = c_bool(False)
            value = ffi.lib.LLVMPY_GetConstantFPValue(self, byref(accuracy_loss))
            if accuracy_loss.value and not round_fp:
                raise ValueError(
                    "Accuracy loss encountered in conversion of constant "
                    f"value {str(self)}"
                )

            return value
        elif self.value_kind == ValueKind.constant_expr:
            # Convert constant expressions to their corresponding operands
            return [op.get_constant_value(signed_int, round_fp) for op in self.operands]
        elif self.value_kind == ValueKind.global_variable:
            # Obtain constant value from global initializer
            return self.initializer.get_constant_value(signed_int, round_fp)
        elif self.value_kind in (
            ValueKind.constant_array,
            ValueKind.constant_vector,
            ValueKind.constant_struct,
        ):
            # Convert constant aggregates to lists
            return [op.get_constant_value(signed_int, round_fp) for op in self.operands]
        elif self.value_kind in (
            ValueKind.constant_data_array,
            ValueKind.constant_data_vector,
        ):
            # Try to get the value as a constant data (sequential)
            value = ffi.lib.LLVMPY_GetConstantDataAsString(self)
            if value:
                return ffi.ret_string(value)
            # Try to get sequence elements via a slower but safer route
            num_elements = ffi.lib.LLVMPY_GetConstantSequenceNumElements(self)
            return [
                ValueRef(
                    ffi.lib.LLVMPY_GetConstantSequenceElement(self, i),
                    self._kind,
                    self._parents,
                ).get_constant_value(signed_int, round_fp)
                for i in range(num_elements)
            ]
        elif self.value_kind in (ValueKind.function, ValueKind.basic_block):
            return self

        # Otherwise, return the IR string
        return str(self)

    def as_instruction(self):
        """
        Returns a constant expression value as an instruction.
        """
        if self.value_kind != ValueKind.constant_expr:
            raise ValueError("expected constant expr, got %s" % (self.value_kind))
        return ValueRef(
            ffi.lib.LLVMPY_ConstantExprAsInstruction(self), "instruction", self._parents
        )
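
A traversal sketch tying these properties together; it assumes parse_assembly can be imported from mneme.llvm.module (the exact import path is an assumption):

from mneme.llvm.module import parse_assembly  # assumed import path

ir = """
define i32 @add(i32 %a, i32 %b) {
entry:
  %sum = add i32 %a, %b
  ret i32 %sum
}
"""
mod = parse_assembly(ir)
fn = mod.get_function("add")
for block in fn.blocks:
    for inst in block.instructions:
        print(inst.opcode, [str(op) for op in inst.operands])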

alignment property

The alignment of this value, in bytes.

arguments property

Return an iterator over this function's arguments. The iterator will yield a ValueRef for each argument.

attributes property

Return an iterator over this value's attributes. The iterator will yield a string for each attribute.

block property

The block this instruction value was obtained from.

blocks property

Return an iterator over this function's blocks. The iterator will yield a ValueRef for each block.

function property

The function this argument or basic block value was obtained from.

has_initializer property

Returns True if a global variable has an initializer.

initializer property

Returns the initializer of a global variable.

instruction property

The instruction this operand value was obtained from.

instructions property

Return an iterator over this block's instructions. The iterator will yield a ValueRef for each instruction.

is_declaration property

Whether this value (presumably global) is defined in the current module.

memory_type property

The LLVM type of the memory accessed by this instruction.

module property

The module this function or global variable value was obtained from.

operands property

Return an iterator over this instruction's operands. The iterator will yield a ValueRef for each operand.

type property

This value's LLVM type.

add_function_attribute(attr)

Only works on function values.

Parameters:

Name Type Description Default
attr str

attribute name

required
Source code in python/mneme/llvm/value.py
def add_function_attribute(self, attr):
    """Only works on function value

    Parameters
    -----------
    attr : str
        attribute name
    """
    if not self.is_function:
        raise ValueError("expected function value, got %s" % (self._kind,))
    attrname = str(attr)
    attrval = ffi.lib.LLVMPY_GetEnumAttributeKindForName(
        _encode_string(attrname), len(attrname)
    )
    if attrval == 0:
        raise ValueError("no such attribute {!r}".format(attrname))
    ffi.lib.LLVMPY_AddFunctionAttr(self, attrval)

as_instruction()

Returns a constant expression value as an instruction.

Source code in python/mneme/llvm/value.py
def as_instruction(self):
    """
    Returns a constant expression value as an instruction.
    """
    if self.value_kind != ValueKind.constant_expr:
        raise ValueError("expected constant expr, got %s" % (self.value_kind))
    return ValueRef(
        ffi.lib.LLVMPY_ConstantExprAsInstruction(self), "instruction", self._parents
    )

get_constant_value(signed_int=False, round_fp=False)

Return the constant value, either as a literal (when supported) or as a string.

Parameters:

Name Type Description Default
signed_int bool

if True and the constant is an integer, returns a signed version

False
round_fp bool

if True and the constant is a floating point value, rounds the result upon accuracy loss (e.g., when querying an fp128 value). By default, raises an exception on accuracy loss

False
Source code in python/mneme/llvm/value.py
def get_constant_value(self, signed_int=False, round_fp=False):
    """
    Return the constant value, either as a literal (when supported)
    or as a string.

    Parameters
    -----------
    signed_int : bool
        if True and the constant is an integer, returns a signed version
    round_fp : bool
        if True and the constant is a floating point value, rounds the
        result upon accuracy loss (e.g., when querying an fp128 value).
        By default, raises an exception on accuracy loss
    """
    if not self.is_constant:
        raise ValueError("expected constant value, got %s" % (self._kind,))

    if self.value_kind == ValueKind.constant_int:
        # Python integers are also arbitrary-precision
        little_endian = c_bool(False)
        numbytes = self.type.type_width // 8
        ptr = ffi.lib.LLVMPY_GetConstantIntRawValue(self, byref(little_endian))
        asbytes = bytes(cast(ptr, POINTER(c_uint8 * numbytes)).contents)
        return int.from_bytes(
            asbytes,
            ("little" if little_endian.value else "big"),
            signed=signed_int,
        )
    elif self.value_kind == ValueKind.constant_fp:
        # Convert floating-point values to double-precision (Python float)
        accuracy_loss = c_bool(False)
        value = ffi.lib.LLVMPY_GetConstantFPValue(self, byref(accuracy_loss))
        if accuracy_loss.value and not round_fp:
            raise ValueError(
                "Accuracy loss encountered in conversion of constant "
                f"value {str(self)}"
            )

        return value
    elif self.value_kind == ValueKind.constant_expr:
        # Convert constant expressions to their corresponding operands
        return [op.get_constant_value(signed_int, round_fp) for op in self.operands]
    elif self.value_kind == ValueKind.global_variable:
        # Obtain constant value from global initializer
        return self.initializer.get_constant_value(signed_int, round_fp)
    elif self.value_kind in (
        ValueKind.constant_array,
        ValueKind.constant_vector,
        ValueKind.constant_struct,
    ):
        # Convert constant aggregates to lists
        return [op.get_constant_value(signed_int, round_fp) for op in self.operands]
    elif self.value_kind in (
        ValueKind.constant_data_array,
        ValueKind.constant_data_vector,
    ):
        # Try to get the value as a constant data (sequential)
        value = ffi.lib.LLVMPY_GetConstantDataAsString(self)
        if value:
            return ffi.ret_string(value)
        # Try to get sequence elements via a slower but safer route
        num_elements = ffi.lib.LLVMPY_GetConstantSequenceNumElements(self)
        return [
            ValueRef(
                ffi.lib.LLVMPY_GetConstantSequenceElement(self, i),
                self._kind,
                self._parents,
            ).get_constant_value(signed_int, round_fp)
            for i in range(num_elements)
        ]
    elif self.value_kind in (ValueKind.function, ValueKind.basic_block):
        return self

    # Otherwise, return the IR string
    return str(self)
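
A short sketch of reading a constant through a global initializer, under the same assumed import path as above:

from mneme.llvm.module import parse_assembly  # assumed import path

mod = parse_assembly("@answer = global i32 42")
gv = mod.get_global_variable("answer")
print(gv.initializer.get_constant_value())  # 42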

TypeRef

Bases: ObjectRef

A weak reference to an LLVM type.

Source code in python/mneme/llvm/typeref.py
class TypeRef(ffi.ObjectRef):
    """A weak reference to a LLVM type"""

    def __init__(self, obj, mod):
        super().__init__(obj)
        self._module = mod

    @property
    def name(self):
        """
        Get type name
        """
        return ffi.ret_string(ffi.lib.LLVMPY_GetTypeName(self))

    @property
    def is_struct(self):
        """
        Returns True if the type is a struct type.
        """
        return ffi.lib.LLVMPY_TypeIsStruct(self)

    @property
    def is_pointer(self):
        """
        Returns True if the type is a pointer type.
        """
        return ffi.lib.LLVMPY_TypeIsPointer(self)

    @property
    def is_array(self):
        """
        Returns True if the type is an array type.
        """
        return ffi.lib.LLVMPY_TypeIsArray(self)

    @property
    def is_vector(self):
        """
        Returns True if the type is a vector type.
        """
        return ffi.lib.LLVMPY_TypeIsVector(self)

    @property
    def elements(self):
        """
        Returns an iterator over the element types contained in this type.
        """
        return _TypeListIterator(ffi.lib.LLVMPY_ElementIter(self), self._module)

    #    @property
    #    def element_type(self):
    #        """
    #        Returns the pointed-to type. When the type is not a pointer,
    #        raises exception.
    #        """
    #        if not self.is_pointer:
    #            raise ValueError("Type {} is not a pointer".format(self))
    #        return TypeRef(ffi.lib.LLVMPY_GetElementType(self))

    @property
    def element_count(self):
        """
        Returns the number of elements in an array or a vector. For scalable
        vectors, returns the minimum number of elements. Raises a ValueError
        when the type is neither an array nor a vector.
        """
        if not self.is_array and not self.is_vector:
            raise ValueError("Type {} is not an array nor vector".format(self))
        return ffi.lib.LLVMPY_GetTypeElementCount(self)

    @property
    def type_width(self):
        """
        Return the basic size of this type if it is a primitive type. These are
        fixed by LLVM and are not target-dependent.
        This will return zero if the type does not have a size or is not a
        primitive type.

        If this is a scalable vector type, the scalable property will be set and
        the runtime size will be a positive integer multiple of the base size.

        Note that this may not reflect the size of memory allocated for an
        instance of the type or the number of bytes that are written when an
        instance of the type is stored to memory.
        """
        return ffi.lib.LLVMPY_GetTypeBitWidth(self)

    @property
    def system_type_width(self):
        """
        Return the basic size of this type if it is a primitive type. This
        size is target-dependent.
        This will return zero if the type does not have a size or is not a
        primitive type.

        If this is a scalable vector type, the scalable property will be set and
        the runtime size will be a positive integer multiple of the base size.

        Note that this may not reflect the size of memory allocated for an
        instance of the type or the number of bytes that are written when an
        instance of the type is stored to memory.
        """
        return ffi.lib.LLVMPY_GetDLTypeBitWidth(self, self._module)

    @property
    def store_type_width(self):
        """
        Returns the maximum number of bits that may be overwritten by
        storing the specified type; this is always a multiple of eight.

        If this is a scalable vector type, the scalable property will be set and
        the runtime size will be a positive integer multiple of the base size.

        For example, returns 40 for i36 and 80 for x86_fp80. The type passed must
        have a size (Type::isSized() must return true)."""
        return ffi.lib.LLVMPY_GetDLStoreTypeBitWidth(self, self._module)

    @property
    def alloc_type_width(self):
        """
        Returns the offset in bytes between successive objects of the
        specified type, including alignment padding.

        If Ty is a scalable vector type, the scalable property will be set and
        the runtime size will be a positive integer multiple of the base size.

        This is the amount that alloca reserves for this type. For example,
        returns 12 or 16 for x86_fp80, depending on alignment.
        """
        return ffi.lib.LLVMPY_GetDLAllocTypeBitWidth(self, self._module)

    @property
    def type_kind(self):
        """
        Returns the LLVMTypeKind enumeration of this type.
        """
        return TypeKind(ffi.lib.LLVMPY_GetTypeKind(self))

    def __str__(self):
        return ffi.ret_string(ffi.lib.LLVMPY_PrintType(self))
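
A sketch inspecting the width of a function argument's type (same assumed import path as above):

from mneme.llvm.module import parse_assembly  # assumed import path

mod = parse_assembly("define void @f(i64 %x) {\nentry:\n  ret void\n}")
fn = mod.get_function("f")
arg = next(iter(fn.arguments))
print(arg.type.type_kind, arg.type.type_width)  # e.g. TypeKind.integer 64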

alloc_type_width property

Returns the offset in bytes between successive objects of the specified type, including alignment padding.

If Ty is a scalable vector type, the scalable property will be set and the runtime size will be a positive integer multiple of the base size.

This is the amount that alloca reserves for this type. For example, returns 12 or 16 for x86_fp80, depending on alignment.

element_count property

Returns the number of elements in an array or a vector. For scalable vectors, returns the minimum number of elements. Raises a ValueError when the type is neither an array nor a vector.

elements property

Returns an iterator over the element types contained in this type.

is_array property

Returns True if the type is an array type.

is_pointer property

Returns True if the type is a pointer type.

is_struct property

Returns True if the type is a struct type.

is_vector property

Returns True if the type is a vector type.

name property

Get type name

store_type_width property

Returns the maximum number of bits that may be overwritten by storing the specified type; this is always a multiple of eight.

If this is a scalable vector type, the scalable property will be set and the runtime size will be a positive integer multiple of the base size.

For example, returns 40 for i36 and 80 for x86_fp80. The type passed must have a size (Type::isSized() must return true).

system_type_width property

Return the basic size of this type if it is a primitive type. This size is target-dependent. This will return zero if the type does not have a size or is not a primitive type.

If this is a scalable vector type, the scalable property will be set and the runtime size will be a positive integer multiple of the base size.

Note that this may not reflect the size of memory allocated for an instance of the type or the number of bytes that are written when an instance of the type is stored to memory.

type_kind property

Returns the LLVMTypeKind enumeration of this type.

type_width property

Return the basic size of this type if it is a primitive type. These are fixed by LLVM and are not target-dependent. This will return zero if the type does not have a size or is not a primitive type.

If this is a scalable vector type, the scalable property will be set and the runtime size will be a positive integer multiple of the base size.

Note that this may not reflect the size of memory allocated for an instance of the type or the number of bytes that are written when an instance of the type is stored to memory.

ModuleRef

Bases: ObjectRef

A reference to an LLVM module.

Source code in python/mneme/llvm/module.py
class ModuleRef(ffi.ObjectRef):
    """
    A reference to an LLVM module.
    """

    def __init__(self, module_ptr, context):
        super(ModuleRef, self).__init__(module_ptr)
        self._context = context

    def __str__(self):
        with ffi.OutputString() as outstr:
            ffi.lib.LLVMPY_PrintModuleToString(self, outstr)
            return str(outstr)

    def _dispose(self):
        self._capi.LLVMPY_DisposeModule(self)

    def get_function(self, name):
        """
        Get a ValueRef pointing to the function named *name*.
        NameError is raised if the symbol isn't found.
        """
        p = ffi.lib.LLVMPY_GetNamedFunction(self, _encode_string(name))
        if not p:
            raise NameError(name)
        return ValueRef(p, "function", dict(module=self))

    def get_global_variable(self, name):
        """
        Get a ValueRef pointing to the global variable named *name*.
        NameError is raised if the symbol isn't found.
        """
        p = ffi.lib.LLVMPY_GetNamedGlobalVariable(self, _encode_string(name))
        if not p:
            raise NameError(name)
        return ValueRef(p, "global", dict(module=self))

    def get_struct_type(self, name):
        """
        Get a TypeRef pointing to a structure type named *name*.
        NameError is raised if the struct type isn't found.
        """
        p = ffi.lib.LLVMPY_GetNamedStructType(self, _encode_string(name))
        if not p:
            raise NameError(name)
        return TypeRef(p, self)

    def verify(self):
        """
        Verify the module IR's correctness.  RuntimeError is raised on error.
        """
        with ffi.OutputString() as outmsg:
            if ffi.lib.LLVMPY_VerifyModule(self, outmsg):
                raise RuntimeError(str(outmsg))

    @property
    def name(self):
        """
        The module's identifier.
        """
        return _decode_string(ffi.lib.LLVMPY_GetModuleName(self))

    @name.setter
    def name(self, value):
        ffi.lib.LLVMPY_SetModuleName(self, _encode_string(value))

    @property
    def source_file(self):
        """
        The module's original source file name
        """
        return _decode_string(ffi.lib.LLVMPY_GetModuleSourceFileName(self))

    @property
    def data_layout(self):
        """
        This module's data layout specification, as a string.
        """
        # LLVMGetDataLayout() points inside a std::string managed by LLVM.
        with ffi.OutputString(owned=False) as outmsg:
            ffi.lib.LLVMPY_GetDataLayout(self, outmsg)
            return str(outmsg)

    @data_layout.setter
    def data_layout(self, strrep):
        ffi.lib.LLVMPY_SetDataLayout(self, create_string_buffer(strrep.encode("utf8")))

    @property
    def triple(self):
        """
        This module's target "triple" specification, as a string.
        """
        # LLVMGetTarget() points inside a std::string managed by LLVM.
        with ffi.OutputString(owned=False) as outmsg:
            ffi.lib.LLVMPY_GetTarget(self, outmsg)
            return str(outmsg)

    @triple.setter
    def triple(self, strrep):
        ffi.lib.LLVMPY_SetTarget(self, create_string_buffer(strrep.encode("utf8")))

    @property
    def global_variables(self):
        """
        Return an iterator over this module's global variables.
        The iterator will yield a ValueRef for each global variable.

        Note that global variables don't include functions
        (a function is a "global value" but not a "global variable" in
         LLVM parlance)
        """
        it = ffi.lib.LLVMPY_ModuleGlobalsIter(self)
        return _GlobalsIterator(it, dict(module=self))

    @property
    def functions(self):
        """
        Return an iterator over this module's functions.
        The iterator will yield a ValueRef for each function.
        """
        it = ffi.lib.LLVMPY_ModuleFunctionsIter(self)
        return _FunctionsIterator(it, dict(module=self))

    @property
    def struct_types(self):
        """
        Return an iterator over the struct types defined in
        the module. The iterator will yield a TypeRef.
        """
        it = ffi.lib.LLVMPY_ModuleTypesIter(self)
        return _TypesIterator(it, dict(module=self))

    @property
    def aliases(self):
        """
        Return an iterator over this module's function aliases.
        The iterator will yield a ValueRef for each alias.
        """
        it = ffi.lib.LLVMPY_ModuleAliasesIter(self)
        return _AliasesIterator(it, dict(module=self))

    @property
    def ifuncs(self):
        """
        Return an iterator over this module's ifuncs.
        The iterator will yield a ValueRef for each ifunc.
        """
        it = ffi.lib.LLVMPY_ModuleIFuncsIter(self)
        return _IFuncsIterator(it, dict(module=self))

    def clone(self):
        return ModuleRef(ffi.lib.LLVMPY_CloneModule(self), self._context)

    def to_bitcode(self, fn: str):
        ffi.lib.LLVMPY_WriteBitcode(self, _encode_string(fn))

aliases property

Return an iterator over this module's function aliases. The iterator will yield a ValueRef for each alias.

data_layout property writable

This module's data layout specification, as a string.

functions property

Return an iterator over this module's functions. The iterator will yield a ValueRef for each function.

global_variables property

Return an iterator over this module's global variables. The iterator will yield a ValueRef for each global variable.

Note that global variables don't include functions (a function is a "global value" but not a "global variable" in LLVM parlance)

ifuncs property

Return an iterator over this module's ifuncs. The iterator will yield a ValueRef for each ifunc.

name property writable

The module's identifier.

source_file property

The module's original source file name

struct_types property

Return an iterator over the struct types defined in the module. The iterator will yield a TypeRef.

triple property writable

This module's target "triple" specification, as a string.

get_function(name)

Get a ValueRef pointing to the function named name. NameError is raised if the symbol isn't found.

Source code in python/mneme/llvm/module.py
def get_function(self, name):
    """
    Get a ValueRef pointing to the function named *name*.
    NameError is raised if the symbol isn't found.
    """
    p = ffi.lib.LLVMPY_GetNamedFunction(self, _encode_string(name))
    if not p:
        raise NameError(name)
    return ValueRef(p, "function", dict(module=self))

get_global_variable(name)

Get a ValueRef pointing to the global variable named name. NameError is raised if the symbol isn't found.

Source code in python/mneme/llvm/module.py
def get_global_variable(self, name):
    """
    Get a ValueRef pointing to the global variable named *name*.
    NameError is raised if the symbol isn't found.
    """
    p = ffi.lib.LLVMPY_GetNamedGlobalVariable(self, _encode_string(name))
    if not p:
        raise NameError(name)
    return ValueRef(p, "global", dict(module=self))

get_struct_type(name)

Get a TypeRef pointing to a structure type named name. NameError is raised if the struct type isn't found.

Source code in python/mneme/llvm/module.py
def get_struct_type(self, name):
    """
    Get a TypeRef pointing to a structure type named *name*.
    NameError is raised if the struct type isn't found.
    """
    p = ffi.lib.LLVMPY_GetNamedStructType(self, _encode_string(name))
    if not p:
        raise NameError(name)
    return TypeRef(p, self)

verify()

Verify the module IR's correctness. RuntimeError is raised on error.

Source code in python/mneme/llvm/module.py
def verify(self):
    """
    Verify the module IR's correctness.  RuntimeError is raised on error.
    """
    with ffi.OutputString() as outmsg:
        if ffi.lib.LLVMPY_VerifyModule(self, outmsg):
            raise RuntimeError(str(outmsg))

parse_assembly(llvmir, context=None)

Create a Module from an LLVM IR string.

Source code in python/mneme/llvm/module.py
def parse_assembly(llvmir, context=None):
    """
    Create a Module from an LLVM IR string.
    """
    if context is None:
        context = get_global_context()
    llvmir = _encode_string(llvmir)
    strbuf = c_char_p(llvmir)
    with ffi.OutputString() as errmsg:
        mod = ModuleRef(ffi.lib.LLVMPY_ParseAssembly(context, strbuf, errmsg), context)
        if errmsg:
            mod.close()
            raise RuntimeError("LLVM IR parsing error\n{0}".format(errmsg))
    return mod

parse_bitcode(bitcode, context=None)

Create a Module from LLVM bitcode (a bytes object).

Source code in python/mneme/llvm/module.py
def parse_bitcode(bitcode, context=None):
    """
    Create a Module from LLVM *bitcode* (a bytes object).
    """
    if context is None:
        context = get_global_context()
    buf = c_char_p(bitcode)
    bufsize = len(bitcode)
    with ffi.OutputString() as errmsg:
        mod = ModuleRef(
            ffi.lib.LLVMPY_ParseBitcode(context, buf, bufsize, errmsg), context
        )
        if errmsg:
            mod.close()
            raise RuntimeError("LLVM bitcode parsing error\n{0}".format(errmsg))
    return mod
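
The two parsers compose with ModuleRef.to_bitcode for a simple round trip; a sketch (the file name is illustrative):

mod = parse_assembly("@g = global i64 7")
mod.to_bitcode("g.bc")  # serialize to disk

with open("g.bc", "rb") as f:
    mod2 = parse_bitcode(f.read())
mod2.verify()
print(mod2.get_global_variable("g").name)  # g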