Accelerator API (Python)

The Python Accelerator runtime API is split into three classes:

  • AsyncAccl is the primary Python API intended for realtime applications.

  • SyncAccl is for simple testing or batch processing of offline data.

  • MultiStreamAsyncAccl is an extension of AsyncAccl to make multi-camera scenarios easier to develop for.

class AsyncAccl(dfp: str | Path | bytes | Dfp, device_ids: List[int] | int = [0], use_model_shape: Tuple[bool, bool] = (True, True), local_mode: bool = False, scheduler_options: SchedulerOptions = SchedulerOptions(frame_limit=600, time_limit=0, stop_on_empty=False, ifmap_queue_size=16, ofmap_queue_size=12), client_options: ClientOptions = ClientOptions(smoothing=False, fps_target=0.0), manager_addr: str = '/run/mxa_manager/', manager_port: int = 10000, ignore_manager: bool = False)

This class provides an asynchronous API to run models on the MemryX hardware accelerator. The user provides callback functions to feed data and receive outputs from the accelerator, which are then called whenever a model is ready to accept/output data. This pipelines execution of the models and allows the accelerator to run at full speed.

Parameters:
dfp : bytes or str

Path to the DFP file generated by the NeuralCompiler, or a byte array representing the DFP content.

device_ids : list of int, optional

List of MemryX device IDs to be used for executing the DFP. Default is [0].

use_model_shape : tuple of bool, optional

Tuple in the form (input_shape, output_shape). Specifies whether to enforce the original model input/output shapes (True) or use MXA runtime shapes (False). Default is (True, True).

local_mode : bool, optional

If True, executes the DFP in local mode, which can improve performance for single-process use. Incompatible with multi-DFP and multi-process use. Default is False.

scheduler_options : SchedulerOptions, optional

Scheduler configuration corresponding to the SchedulerOptions struct:

  • frame_limit : int Number of frames to process before swapping out. Default 600.

  • time_limit : int Maximum time (in milliseconds) before swapping out. Default 0.

  • stop_on_empty : bool Whether to swap out if the input queue is empty. Default False.

  • ifmap_queue_size : int Size of the shared input feature map queue. Default 16.

  • ofmap_queue_size : int Size of the per-client output feature map queues. Default 12.

client_options : ClientOptions, optional

Client runtime behavior configuration, corresponding to the ClientOptions struct:

  • smoothing : bool Whether to enable FPS smoothing. Default False.

  • fps_target : float Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default 0.

manager_addr : str, optional

Path to the mxa-manager socket. Needed in Docker or managed environments. Default is “/run/mxa_manager/”.

manager_port : int, optional

Port for mxa-manager connection (used primarily in containerized deployments). Default is 10000.

ignore_manager : bool, optional

If True, bypasses the manager and forces local mode. May cause crashes if multiple containers/processes attempt to use the same device. Default is False.

Warning

Setting ignore_manager to True disables coordination with other processes and may result in device conflicts if multiple clients or containers access the same device concurrently. Use with caution.

Examples

from tensorflow import keras
import numpy as np
from memryx import NeuralCompiler, AsyncAccl

# Define the callback that will return model input data
def data_source():
    for i in range(10):
        img = np.random.rand(1, 224, 224, 3).astype(np.float32)
        data = keras.applications.mobilenet.preprocess_input(img)
        yield data

# Define the callback that will process the outputs of the model
outputs = []

def output_processor(*logits):
    logits = logits[0]
    preds = keras.applications.mobilenet.decode_predictions(logits)
    outputs.append(preds)

# Compile a MobileNet model for testing.
# Typically, compilation needs to be done only once.
model = keras.applications.MobileNet()
nc = NeuralCompiler(models=model)
dfp = nc.run()

# Accelerate using the MemryX hardware
accl = AsyncAccl(dfp)

# Starts asynchronous execution of input generating callback
accl.connect_input(data_source)

# Starts asynchronous execution of output processing callback
accl.connect_output(output_processor)

# Wait for the accelerator to finish execution
accl.wait()

# Explicitly free up hardware
accl.shutdown()

connect_input(callback, model_idx=0)

Sets a callback function to execute when the accelerator is ready to begin processing an input frame for the specified model.

Parameters:
callback : callable

A function or bound method that is invoked asynchronously when the accelerator signals readiness to process a new input frame for the model identified by model_idx.

  • The callback must take no arguments.

  • It must return either a single np.ndarray (for single-input models) or a sequence of np.ndarray objects (for multi-input models).

  • The returned arrays must match the data types and shapes expected by the model.

Returning None or raising an exception from the callback signals the end of the input stream for the model.

If a preprocessing model was configured via set_preprocessing, the output of callback will first be passed through that model before being sent to the accelerator.

model_idx : int, optional

Index of the target model to which the input callback should be bound. Default is 0.

Raises:
RuntimeError: If the signature of callback contains any parameters
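The input callback contract described above (no arguments, one frame per call, None to end the stream) can be sketched without hardware. This is only an illustration: `make_input_callback` is a hypothetical helper, not part of the MemryX API, and the string frames stand in for the np.ndarray objects a real model would expect.

```python
import inspect

def make_input_callback(frames):
    """Wrap an iterable of frames in a zero-argument callback.

    Hypothetical helper: returns the next frame on each call and None
    once the frames are exhausted, which signals the end of the stream.
    """
    iterator = iter(frames)

    def callback():
        return next(iterator, None)

    return callback

# Stand-in frames; in practice these would be preprocessed np.ndarray objects.
source = make_input_callback(["frame0", "frame1"])

# The callback must take no arguments.
assert len(inspect.signature(source).parameters) == 0

# With a real accelerator instance this would be registered as:
#     accl.connect_input(source)
```

Keeping the iterator inside a closure leaves the callback signature empty, which is what connect_input checks for.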

connect_output(callback, model_idx=0)

Sets a callback function to execute when the outputs of the specified model are ready.

Parameters:
callback : callable

A function or bound method that is invoked asynchronously when the accelerator finishes processing an input frame for the model identified by model_idx.

  • The arguments to callback must exactly match the number and order of output feature maps as defined by the model’s port configuration (retrievable via outport_assignment).

  • No additional parameters beyond the model outputs are allowed in the function signature.

If a post-processing model was previously set using set_postprocessing, the model’s raw outputs will first be passed through that post-processing model, and the resulting outputs will be passed to callback instead.

model_idx : int, optional

Index of the model whose outputs should be routed to callback. Default is 0.
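Because the output callback's parameters must line up with the model's output feature maps, it can help to check a callback's signature before connecting it. The sketch below uses only the standard library. `accepts_outputs` and the `ports` mapping are hypothetical, and the check is deliberately lenient: it does not reject extra parameters that have default values.

```python
import inspect

def accepts_outputs(callback, num_outputs):
    """Return True if callback can be called with num_outputs positional arguments."""
    try:
        inspect.signature(callback).bind(*([None] * num_outputs))
    except TypeError:
        return False
    return True

# Hypothetical port assignment for a two-output model, in the shape
# returned by outport_assignment(): {port id: output layer name}.
ports = {0: "boxes", 1: "scores"}

def on_outputs(boxes, scores):
    # One parameter per output feature map, in port order.
    return boxes, scores

def on_any(*ofmaps):
    # A variadic signature accepts any number of outputs.
    return ofmaps

assert accepts_outputs(on_outputs, len(ports))
assert accepts_outputs(on_any, len(ports))
assert not accepts_outputs(lambda only_one: None, len(ports))
```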

stop()

Send a signal to stop each of the models running on the accelerator. This call blocks until each of the models stops and cleans up its resources.

wait()

Make the main thread wait for the accelerator to finish executing all models.

Raises:
RuntimeError: If any model's inputs or outputs are left unconnected
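Since wait() can raise, one way to make sure the hardware is still released is to wrap the accelerator in a context manager that always calls shutdown(), the call the examples on this page end with. `released` is a hypothetical helper, not part of the MemryX API.

```python
from contextlib import contextmanager

@contextmanager
def released(accl):
    """Yield accl and call its shutdown() on exit, even after an error."""
    try:
        yield accl
    finally:
        accl.shutdown()

# Usage with a real accelerator (not executed here):
#     with released(AsyncAccl(dfp)) as accl:
#         accl.connect_input(data_source)
#         accl.connect_output(output_processor)
#         accl.wait()
```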

set_preprocessing_model(model_or_path, model_idx=0)

Supply the path to a model/file that should be run to pre-process the input feature map. This is an optional feature that can be used to automatically run the pre-processing model output by the NeuralCompiler.

Note

This function currently does not support PyTorch models

Warning

This function is currently not available on the ARM platform

Parameters:
model_or_path: obj or str

Can be either an already loaded model such as a tf.keras.Model object for Keras, or a str path to a model file.

model_idx: int

Index of the model on the accelerator whose input feature map should be pre-processed by the supplied model

set_postprocessing_model(model_or_path, model_idx=0)

Supply the path to a model/file that should be run to post-process the output feature maps. This is an optional feature that can be used to automatically run the post-processing model output by the NeuralCompiler.

Note

This function currently does not support PyTorch models

Warning

This function is currently not available on the ARM platform

Parameters:
model_or_path: obj or str

Can be either an already loaded model such as a tf.keras.Model object for Keras, or a string path to a model file.

model_idx: int

Index of the model on the accelerator whose output should be post-processed by the supplied model

inport_assignment(model_idx=0)

Returns a dictionary which maps input port ids to model input layer names for the model specified by model_idx

Parameters:
model_idx: int

Index of the model whose input port assignment is returned

outport_assignment(model_idx=0)

Returns a dictionary which maps output port ids to model output layer names for the model specified by model_idx

Parameters:
model_idx: int

Index of the model whose output port assignment is returned
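Both assignment functions return a dictionary keyed by port id, so finding the port for a given layer name means searching the values. A minimal sketch follows. `port_for_layer` and the layer names in the sample mapping are made up for illustration.

```python
def port_for_layer(assignment, layer_name):
    """Return the port id mapped to layer_name, given a {port id: layer name} dict."""
    for port_id, name in assignment.items():
        if name == layer_name:
            return port_id
    raise KeyError(layer_name)

# Hypothetical mapping in the documented shape; real values come from
# accl.outport_assignment(model_idx).
assignment = {0: "conv_pw_13", 1: "predictions"}
print(port_for_layer(assignment, "predictions"))  # prints 1
```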

class SyncAccl(dfp: str | Path | bytes | Dfp, device_ids: List[int] | int = [0], use_model_shape: Tuple[bool, bool] = (True, True), local_mode: bool = False, scheduler_options: SchedulerOptions = SchedulerOptions(frame_limit=600, time_limit=0, stop_on_empty=False, ifmap_queue_size=16, ofmap_queue_size=12), client_options: ClientOptions = ClientOptions(smoothing=False, fps_target=0.0), manager_addr: str = '/run/mxa_manager/', manager_port: int = 10000, ignore_manager: bool = False)

This class provides a synchronous API for the MemryX hardware accelerator, which performs input and output sequentially per model. The accelerator is abstracted as a collection of models; select a model by passing its index (model_idx) to the member functions.

Parameters:
dfp : bytes or str

Path to the DFP file generated by the NeuralCompiler, or a byte array representing the DFP content.

device_ids : list of int, optional

List of MemryX device IDs to be used for executing the DFP. Default is [0].

use_model_shape : tuple of bool, optional

Tuple in the form (input_shape, output_shape). Specifies whether to enforce the original model input/output shapes (True) or use MXA runtime shapes (False). Default is (True, True).

local_mode : bool, optional

If True, executes the DFP in local mode, which can improve performance for single-process use. Incompatible with multi-DFP and multi-process use. Default is False.

scheduler_options : SchedulerOptions, optional

Scheduler configuration corresponding to the SchedulerOptions struct:

  • frame_limit : int Number of frames to process before swapping out. Default 600.

  • time_limit : int Maximum time (in milliseconds) before swapping out. Default 0.

  • stop_on_empty : bool Whether to swap out if the input queue is empty. Default False.

  • ifmap_queue_size : int Size of the shared input feature map queue. Default 16.

  • ofmap_queue_size : int Size of the per-client output feature map queues. Default 12.

client_options : ClientOptions, optional

Client runtime behavior configuration, corresponding to the ClientOptions struct:

  • smoothing : bool Whether to enable FPS smoothing. Default False.

  • fps_target : float Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default 0.

manager_addr : str, optional

Path to the mxa-manager socket. Needed in Docker or managed environments. Default is “/run/mxa_manager/”.

manager_port : int, optional

Port for mxa-manager connection (used primarily in containerized deployments). Default is 10000.

ignore_manager : bool, optional

If True, bypasses the manager and forces local mode. May cause crashes if multiple containers/processes attempt to use the same device. Default is False.

Warning

Setting ignore_manager to True disables coordination with other processes and may result in device conflicts if multiple clients or containers access the same device concurrently. Use with caution.

Examples

import tensorflow as tf
import numpy as np
from memryx import NeuralCompiler, SyncAccl

# Compile a MobileNet model for testing.
# Typically, compilation needs to be done only once.
model = tf.keras.applications.MobileNet()
nc = NeuralCompiler(models=model)
dfp = nc.run()

# Prepare the input data
img = np.random.rand(1, 224, 224, 3).astype(np.float32)
data = tf.keras.applications.mobilenet.preprocess_input(img)

# Accelerate using the MemryX hardware
accl = SyncAccl(dfp)
outputs = accl.run(data)  # Run sequential acceleration on the input data.

# Explicitly free up hardware
accl.shutdown()

Warning

The MemryX accelerator is a streaming processor that can be supplied with pipelined input data. Using the synchronous API to run multiple input frames sequentially may incur a significant performance penalty. Prefer calling the send/receive functions from separate threads, or use the asynchronous API.
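One possible shape for the threaded send/receive pattern mentioned in the warning is sketched below. `pipelined_run` is a hypothetical helper, not part of the MemryX API. It assumes the model yields exactly one output set per input frame, and it does no error handling if a send fails partway through.

```python
import threading

def pipelined_run(accl, frames, model_idx=0):
    """Send frames from a worker thread while receiving on the calling thread.

    Assumes the model produces exactly one output set per input frame.
    """
    frames = list(frames)

    def sender():
        for frame in frames:
            accl.send(frame, model_idx=model_idx)

    worker = threading.Thread(target=sender, daemon=True)
    worker.start()
    # Receiving concurrently keeps the input buffer from staying full.
    results = [accl.receive(model_idx=model_idx) for _ in frames]
    worker.join()
    return results

# Usage with a real accelerator (not executed here):
#     accl = SyncAccl(dfp)
#     outputs = pipelined_run(accl, preprocessed_frames)
```

Compared with calling run() once per frame, this lets new inputs enter the accelerator while earlier outputs are still being collected.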

send(data, model_idx=0, timeout=None)

Sends input data to the accelerator for the specified model.

For the model identified by model_idx, this method transfers the input data to the accelerator by copying it into the model’s input buffer(s). If the input buffer(s) are full, the call blocks according to the value of timeout.

Parameters:
data : np.ndarray or sequence of np.ndarray

The input data to be transferred. This is typically the preprocessed input array (or list/tuple of arrays) expected by the model. For multi-input models, a sequence of arrays must be provided, with each array corresponding to a model input.

model_idx : int, optional

Index of the model to which the data should be sent. Default is 0.

timeout : int or None, optional

The maximum time in milliseconds to block if the input buffer is full. If None (default), the call blocks indefinitely until space becomes available. If a positive integer is provided, the call blocks for up to timeout milliseconds, after which it raises a TimeoutError if space is still unavailable.

Raises:
TimeoutError

Raised if the input buffer does not become available within the specified timeout.

receive(model_idx=0, timeout=None)

Receives output data from the accelerator for the specified model.

For the model identified by model_idx, this method retrieves data from the accelerator’s output buffer(s). If no output data is currently available, the call will block according to the specified timeout policy.

Parameters:
model_idx : int, optional

Index of the model from which output data should be retrieved. Default is 0.

timeout : int or None, optional

Maximum number of milliseconds to wait if no output is immediately available. If None (default), blocks indefinitely until output data becomes available. If a positive integer is provided, the call will block for up to timeout milliseconds before raising a TimeoutError.

Returns:
np.ndarray or tuple of np.ndarray

The output data retrieved from the model. For single-output models, a single np.ndarray is returned. For multi-output models, a tuple of arrays is returned, with one entry per output of the specified model.

Raises:
TimeoutError

Raised if no output data becomes available within the specified timeout.
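The timeout behaviour of receive() can be used to collect whatever outputs are pending and then stop waiting. The sketch below is illustrative only: `drain_outputs` is a hypothetical helper, and it assumes the exception raised is Python's built-in TimeoutError, which this page does not state explicitly.

```python
def drain_outputs(accl, model_idx=0, timeout_ms=100):
    """Collect outputs until receive() times out, then return what arrived."""
    outputs = []
    while True:
        try:
            outputs.append(accl.receive(model_idx=model_idx, timeout=timeout_ms))
        except TimeoutError:
            # No output arrived within timeout_ms milliseconds.
            return outputs
```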

run(inputs, model_idx=0)

Sends input data to the specified model and retrieves the corresponding outputs.

This method combines send() and receive() into a single sequential operation. It transfers the provided input(s) to the accelerator and blocks until the corresponding outputs are ready.

Parameters:
inputs : np.ndarray or list of np.ndarray or list of list of np.ndarray

The preprocessed input data for the model.

  • A single np.ndarray is used for single-input models.

  • A list of np.ndarray is used for multi-input models.

  • A nested list of inputs (e.g., list of list of arrays) can be used to batch multiple input sets for better throughput.

Each individual np.ndarray must match the input shape and data type expected by the model.

Note: Stacking multiple input sets into a single np.ndarray is not supported.

model_idx : int, optional

Index of the model to which the inputs should be sent. Default is 0.

Returns:
np.ndarray or tuple of np.ndarray

The model’s output data. A single np.ndarray is returned for models with one output, or a tuple of arrays for multi-output models.
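For the nested-list form of inputs, the sketch below groups per-input frame sequences into input sets. It assumes that the outer list holds one entry per input set and each inner list holds one array per model input. That reading of "list of list of arrays" is an assumption, and `as_input_sets` is a hypothetical helper.

```python
def as_input_sets(*per_input_frames):
    """Group per-input frame sequences into a list of input sets.

    as_input_sets(a_frames, b_frames) -> [[a0, b0], [a1, b1], ...]
    """
    lengths = {len(frames) for frames in per_input_frames}
    if len(lengths) > 1:
        raise ValueError("every model input needs the same number of frames")
    return [list(input_set) for input_set in zip(*per_input_frames)]

# Stand-in values; real entries would be np.ndarray objects with the
# shape and dtype each model input expects.
batch = as_input_sets(["a0", "a1"], ["b0", "b1"])
print(batch)  # prints [['a0', 'b0'], ['a1', 'b1']]

# With a real accelerator (not executed here):
#     outputs = accl.run(batch)
```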

inport_assignment(model_idx=0)

Returns a dictionary which maps input port ids to model input layer names for the model specified by model_idx

Parameters:
model_idx: int

Index of the model whose input port assignment is returned

outport_assignment(model_idx=0)

Returns a dictionary which maps output port ids to model output layer names for the model specified by model_idx

Parameters:
model_idx: int

Index of the model whose output port assignment is returned

class MultiStreamAsyncAccl(dfp: str | Path | bytes | Dfp, device_ids: List[int] | int = [0], stream_workers: int = -1, use_model_shape: Tuple[bool, bool] = (True, True), local_mode: bool = False, scheduler_options: SchedulerOptions = SchedulerOptions(frame_limit=600, time_limit=0, stop_on_empty=False, ifmap_queue_size=16, ofmap_queue_size=12), client_options: ClientOptions = ClientOptions(smoothing=False, fps_target=0.0), manager_addr: str = '/run/mxa_manager/', manager_port: int = 10000, ignore_manager: bool = False)

This class provides a multi-stream version of the AsyncAccl API. This allows multiple input+output callbacks to be associated with a single model.

Parameters:
dfp : bytes or str

Path to the DFP file generated by the NeuralCompiler, or a byte array representing the DFP content.

device_ids : list of int, optional

List of MemryX device IDs to be used for executing the DFP. Default is [0].

stream_workers : int, optional

Number of worker threads in the pool that services the stream threads. If -1, the number of workers is set to (CPU cores - 1), with a minimum of 2. This allows for efficient asynchronous data flow without overwhelming the system. Default is -1.

use_model_shape : tuple of bool, optional

Tuple in the form (input_shape, output_shape). Specifies whether to enforce the original model input/output shapes (True) or use MXA runtime shapes (False). Default is (True, True).

local_mode : bool, optional

If True, executes the DFP in local mode, which can improve performance for single-process use. Incompatible with multi-DFP and multi-process use. Default is False.

scheduler_options : SchedulerOptions, optional

Scheduler configuration corresponding to the SchedulerOptions struct:

  • frame_limit : int Number of frames to process before swapping out. Default 600.

  • time_limit : int Maximum time (in milliseconds) before swapping out. Default 0.

  • stop_on_empty : bool Whether to swap out if the input queue is empty. Default False.

  • ifmap_queue_size : int Size of the shared input feature map queue. Default 16.

  • ofmap_queue_size : int Size of the per-client output feature map queues. Default 12.

client_options : ClientOptions, optional

Client runtime behavior configuration, corresponding to the ClientOptions struct:

  • smoothing : bool Whether to enable FPS smoothing. Default False.

  • fps_target : float Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default 0.

manager_addr : str, optional

Path to the mxa-manager socket. Needed in Docker or managed environments. Default is “/run/mxa_manager/”.

manager_port : int, optional

Port for mxa-manager connection (used primarily in containerized deployments). Default is 10000.

ignore_manager : bool, optional

If True, bypasses the manager and forces local mode. May cause crashes if multiple containers/processes attempt to use the same device. Default is False.

Warning

Setting ignore_manager to True disables coordination with other processes and may result in device conflicts if multiple clients or containers access the same device concurrently. Use with caution.
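The stream_workers default described above, (CPU cores - 1) with a minimum of 2, amounts to the small calculation below. This mirrors the documented rule only. How the runtime actually counts cores is not stated here, so the use of `os.cpu_count()` is an assumption and `default_stream_workers` is a hypothetical name.

```python
import os

def default_stream_workers(cpu_count=None):
    """Mirror the documented rule: (CPU cores - 1), with a minimum of 2."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(2, cpu_count - 1)

print(default_stream_workers(8))  # prints 7
print(default_stream_workers(2))  # prints 2
```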

Examples

from tensorflow import keras
import numpy as np
from memryx import NeuralCompiler, MultiStreamAsyncAccl

class Application:
    def __init__(self):
        self.streams = []
        self.streams_idx = []
        self.outputs = []
        for i in range(2):
            self.streams.append(
                [
                    np.random.rand(224, 224, 3).astype(np.float32)
                    for _ in range(10)
                ]
            )
            self.streams_idx.append(0)
            self.outputs.append([])

    # Define the callback that will return model input data
    def data_source(self, stream_idx):
        # Generate inputs based on stream_idx
        if self.streams_idx[stream_idx] == len(self.streams[stream_idx]):
            return None
        self.streams_idx[stream_idx] += 1
        return self.streams[stream_idx][self.streams_idx[stream_idx] - 1]

    # Define the callback that will process the outputs of the model
    def output_processor(self, stream_idx, *outputs):
        logits = np.squeeze(outputs[0], 0)
        preds = keras.applications.mobilenet.decode_predictions(logits)
        # Route outputs based on stream_idx
        self.outputs[stream_idx].append(preds)

# Compile a MobileNet model for testing.
# Typically, compilation needs to be done only once.
model = keras.applications.MobileNet()
nc = NeuralCompiler(models=model, verbose=1)
dfp = nc.run()

# Accelerate using the MemryX hardware
app = Application()
accl = MultiStreamAsyncAccl(dfp)

# Starts asynchronous execution of the input/output callback pair for 2 streams
accl.connect_streams(app.data_source, app.output_processor, 2)

# Wait for the accelerator to finish execution
accl.wait()

# Explicitly free up hardware
accl.shutdown()

connect_streams(input_callback, output_callback, stream_count, model_idx=0)

Registers and starts execution of a pair of input and output callback functions that process a specified number of data streams as defined by stream_count for a given model.

Parameters:
input_callback : callable

A function or bound method responsible for supplying input data to the model identified by model_idx. It must accept exactly one argument:

  • stream_idx : int The index of the input stream, ranging from 0 to stream_count - 1, used by the application to select the appropriate data source.

output_callback : callable

A function or bound method invoked with the output feature maps generated by the model. It must accept at least two arguments:

  • stream_idx : int The index of the output stream (same as input).

  • *ofmaps : tuple or unpacked The output feature maps. The function can use a variadic signature (*ofmaps) or named parameters (e.g., fmap0, fmap1, …) depending on how many feature maps the model produces.

stream_count : int

The number of independent input feature map sources (streams) that the model will process.

model_idx : int, optional

The index of the model to associate with this input-output callback pair. Each stream for the specified model will be assigned a stream_idx in the range [0, stream_count - 1]. Default is 0.
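As a complement to the class-level example, the sketch below shows the named-parameter form of the output callback for a model assumed to have two outputs. `StreamRouter` is a hypothetical helper and the string frames are stand-ins for np.ndarray inputs. Returning None from the input callback follows the pattern used in the example earlier in this section.

```python
from collections import deque

class StreamRouter:
    """Hypothetical helper pairing per-stream sources with per-stream results."""

    def __init__(self, sources):
        # One queue of pending frames and one result list per stream.
        self.pending = [deque(frames) for frames in sources]
        self.results = [[] for _ in sources]

    def input_callback(self, stream_idx):
        # Returning None marks this stream as finished.
        pending = self.pending[stream_idx]
        return pending.popleft() if pending else None

    def output_callback(self, stream_idx, fmap0, fmap1):
        # Named parameters: this variant assumes a model with two outputs.
        self.results[stream_idx].append((fmap0, fmap1))

router = StreamRouter([["cam0_a", "cam0_b"], ["cam1_a"]])
# With a real accelerator (not executed here):
#     accl.connect_streams(router.input_callback, router.output_callback, 2)
```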

stop()

Sends a signal to stop each of the models running on the accelerator. This call blocks until each of the models stops and cleans up its resources.

wait()

Blocks the application thread until the accelerator finishes executing all models.

Raises:
RuntimeError: If any model's inputs or outputs are left unconnected

Runtime Scheduler Configuration

Some configuration can be passed to the runtime scheduler to control how the Accelerator API behaves relative to other programs or Accelerator API instances. Below is a usage example of how to configure the scheduler options when creating an instance of the Accelerator API.

from memryx import AsyncAccl
from memryx.accl import ClientOptions, SchedulerOptions

accl = AsyncAccl(
    "model.dfp",  # placeholder path to a compiled DFP file
    client_options=ClientOptions(),
    scheduler_options=SchedulerOptions(),
)

class ClientOptions(smoothing: bool = False, fps_target: float = 0.0)

Client runtime behavior configuration.

Parameters:
smoothing : bool, optional

Whether to enable FPS smoothing. Default is False.

fps_target : float, optional

Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default is 0.0.
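The relationship between fps_target and the enforced delay can be written out as a small calculation. The sketch assumes that the default of 0 means no delay is enforced, which this page implies but does not state. `frame_delay_seconds` is a hypothetical name.

```python
def frame_delay_seconds(fps_target):
    """Delay between frames implied by fps_target (1 / fps_target seconds).

    Assumption: a target of 0 (the default) means no enforced delay.
    """
    if fps_target < 0:
        raise ValueError("fps_target must not be negative")
    return 0.0 if fps_target == 0 else 1.0 / fps_target

print(frame_delay_seconds(25.0))  # prints 0.04
```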

class SchedulerOptions(frame_limit: int = 600, time_limit: int = 0, stop_on_empty: bool = False, ifmap_queue_size: int = 16, ofmap_queue_size: int = 12)

Configuration options for the scheduler.

Parameters:
frame_limit : int, optional

Number of frames to process before swapping out. Default is 600.

time_limit : int, optional

Maximum time (in milliseconds) before swapping out. Default is 0.

stop_on_empty : bool, optional

Whether to swap out if the input queue is empty. Default is False.

ifmap_queue_size : int, optional

Size of the shared input feature map queue. Default is 16.

ofmap_queue_size : int, optional

Size of the per-client output feature map queues. Default is 12.
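When only a few scheduler settings need to change, one option is to start from the documented defaults and apply overrides as keyword arguments. The helper below is a hypothetical convenience, not part of the MemryX API. It only builds a dictionary, so it runs without the runtime installed.

```python
# Defaults copied from the SchedulerOptions signature documented above.
SCHEDULER_DEFAULTS = {
    "frame_limit": 600,
    "time_limit": 0,
    "stop_on_empty": False,
    "ifmap_queue_size": 16,
    "ofmap_queue_size": 12,
}

def scheduler_kwargs(**overrides):
    """Return the documented defaults with overrides applied, rejecting unknown names."""
    unknown = sorted(set(overrides) - set(SCHEDULER_DEFAULTS))
    if unknown:
        raise TypeError(f"unknown scheduler option(s): {', '.join(unknown)}")
    return {**SCHEDULER_DEFAULTS, **overrides}

kwargs = scheduler_kwargs(frame_limit=100, stop_on_empty=True)
# With the runtime installed (not executed here):
#     from memryx.accl import SchedulerOptions
#     options = SchedulerOptions(**kwargs)
```

Rejecting unknown names early catches misspelled options before they reach the constructor.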