Accelerator API (Python)#
The Python Accelerator runtime API is split into three distinct classes.
AsyncAccl is the primary Python API intended for realtime applications.
SyncAccl is for simple testing or batch processing of offline data.
MultiStreamAsyncAccl is an extension of AsyncAccl to make multi-camera scenarios easier to develop for.
- class AsyncAccl(dfp: str | Path | bytes | Dfp, device_ids: List[int] | int = [0], use_model_shape: Tuple[bool, bool] = (True, True), local_mode: bool = False, scheduler_options: SchedulerOptions = SchedulerOptions(frame_limit=600, time_limit=0, stop_on_empty=False, ifmap_queue_size=16, ofmap_queue_size=12), client_options: ClientOptions = ClientOptions(smoothing=False, fps_target=0.0), manager_addr: str = '/run/mxa_manager/', manager_port: int = 10000, ignore_manager: bool = False)#
This class provides an asynchronous API to run models on the MemryX hardware accelerator. The user provides callback functions to feed data and receive outputs from the accelerator, which are then called whenever a model is ready to accept/output data. This pipelines execution of the models and allows the accelerator to run at full speed.
- Parameters:
- dfp : bytes or str
Path to the DFP file generated by the NeuralCompiler, or a byte array representing the DFP content.
- device_ids : list of int, optional
List of MemryX device IDs to be used for executing the DFP. Default is [0].
- use_model_shape : tuple of bool, optional
Tuple in the form (input_shape, output_shape). Specifies whether to enforce the original model input/output shapes (True) or use MXA runtime shapes (False). Default is (True, True).
- local_mode : bool, optional
If True, executes the DFP in local mode, which can improve performance for single-process use. Incompatible with multi-DFP and multi-process use. Default is False.
- scheduler_options : SchedulerOptions, optional
Scheduler configuration corresponding to the SchedulerOptions struct:
frame_limit : int Number of frames to process before swapping out. Default 600.
time_limit : int Maximum time (in milliseconds) before swapping out. Default 0.
stop_on_empty : bool Whether to swap out if the input queue is empty. Default False.
ifmap_queue_size : int Size of the shared input feature map queue. Default 16.
ofmap_queue_size : int Size of the per-client output feature map queues. Default 12.
- client_options : ClientOptions, optional
Client runtime behavior configuration, corresponding to the ClientOptions struct:
smoothing : bool Whether to enable FPS smoothing. Default False.
fps_target : float Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default 0.
- manager_addr : str, optional
Path to the mxa-manager socket. Needed in Docker or managed environments. Default is “/run/mxa_manager/”.
- manager_port : int, optional
Port for mxa-manager connection (used primarily in containerized deployments). Default is 10000.
- ignore_manager : bool, optional
If True, bypasses the manager and forces local mode. May cause crashes if multiple containers/processes attempt to use the same device. Default is False.
Warning
Setting ignore_manager to True disables coordination with other processes and may result in device conflicts if multiple clients or containers access the same device concurrently. Use with caution.
Examples
from tensorflow import keras
import numpy as np
from memryx import NeuralCompiler, AsyncAccl

# Define the callback that will return model input data
def data_source():
    for i in range(10):
        img = np.random.rand(1, 224, 224, 3).astype(np.float32)
        data = keras.applications.mobilenet.preprocess_input(img)
        yield data

# Define the callback that will process the outputs of the model
outputs = []
def output_processor(*logits):
    logits = logits[0]
    preds = keras.applications.mobilenet.decode_predictions(logits)
    outputs.append(preds)

# Compile a MobileNet model for testing.
# Typically, compilation needs to be done only once.
model = keras.applications.MobileNet()
nc = NeuralCompiler(models=model)
dfp = nc.run()

# Accelerate using the MemryX hardware
accl = AsyncAccl(dfp)

# Starts asynchronous execution of input generating callback
accl.connect_input(data_source)

# Starts asynchronous execution of output processing callback
accl.connect_output(output_processor)

# Wait for the accelerator to finish execution
accl.wait()

# Explicitly free up hardware
accl.shutdown()
- connect_input(callback, model_idx=0)#
Sets a callback function to execute when the accelerator is ready to begin processing an input frame for the specified model.
- Parameters:
- callback : callable
A function or bound method that is invoked asynchronously when the accelerator signals readiness to process a new input frame for the model identified by model_idx.
The callback must take no arguments.
It must return either a single np.ndarray (for single-input models) or a sequence of np.ndarray objects (for multi-input models).
The returned arrays must match the data types and shapes expected by the model.
Returning None or raising an exception from the callback signals the end of the input stream for the model.
If a preprocessing model was configured via set_preprocessing_model, the output of callback will first be passed through that model before being sent to the accelerator.
- model_idx : int, optional
Index of the target model to which the input callback should be bound. Default is 0.
- Raises:
- RuntimeError: If the signature of callback contains any parameters
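The input callback contract described above (no arguments, one frame per call, None to end the stream) can be illustrated without hardware. This is a minimal sketch, assuming a hypothetical FrameSource class and a hypothetical drain helper that stands in for the runtime's polling loop. Neither is part of the memryx package, and plain Python lists stand in for np.ndarray frames.

```python
class FrameSource:
    """Hypothetical input source: hands out one frame per call, then None."""

    def __init__(self, frames):
        self._frames = iter(frames)

    def next_frame(self):
        # As a bound method this takes no arguments from the caller.
        # Returning None signals the end of the input stream.
        return next(self._frames, None)


def drain(callback):
    """Hypothetical stand-in for the runtime: call until the stream ends."""
    received = []
    while True:
        frame = callback()
        if frame is None:
            break
        received.append(frame)
    return received


source = FrameSource([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])
frames = drain(source.next_frame)
print(len(frames))
```

With a real accelerator, the bound method would be passed to connect_input (for example, accl.connect_input(source.next_frame)) rather than to drain.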
- connect_output(callback, model_idx=0)#
Sets a callback function to execute when the outputs of the specified model are ready.
- Parameters:
- callback : callable
A function or bound method that is invoked asynchronously when the accelerator finishes processing an input frame for the model identified by model_idx.
The arguments to callback must exactly match the number and order of output feature maps as defined by the model’s port configuration (retrievable via outport_assignment).
No additional parameters beyond the model outputs are allowed in the function signature.
If a post-processing model was previously set using set_postprocessing_model, the model’s raw outputs will first be passed through that post-processing model, and the resulting outputs will be passed to callback instead.
- model_idx : int, optional
Index of the model whose outputs should be routed to callback. Default is 0.
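Because the output callback's parameters must line up with the model's output feature maps, it can help to check a callback's signature before connecting it. The sketch below uses only the standard inspect module. The helper accepts_exactly is hypothetical, not a memryx function, and it is slightly looser than the rule above, since it tolerates extra parameters that have default values.

```python
import inspect


def accepts_exactly(callback, num_outputs):
    """Hypothetical helper: True if `callback` can be called with exactly
    `num_outputs` positional arguments and has no extra required parameters."""
    try:
        inspect.signature(callback).bind(*range(num_outputs))
    except TypeError:
        return False
    return True


results = []


def on_two_outputs(boxes, scores):
    # Named parameters: one per output feature map.
    results.append((boxes, scores))


def on_any_outputs(*ofmaps):
    # Variadic form: accepts any number of output feature maps.
    results.append(ofmaps)


print(accepts_exactly(on_two_outputs, 2))
print(accepts_exactly(on_two_outputs, 3))
print(accepts_exactly(on_any_outputs, 3))
```

The variadic form is the one used by output_processor in the AsyncAccl example, and it avoids hard-coding the number of outputs.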
- stop()#
Send a signal to stop each of the models running on the accelerator. This call blocks until each of the models stops and cleans up its resources.
- wait()#
Make the main thread wait for the accelerator to finish executing all models.
- Raises:
- RuntimeError: If any of the model’s inputs/outputs are left unconnected
- set_preprocessing_model(model_or_path, model_idx=0)#
Supply the path to a model/file that should be run to pre-process the input feature map. This is an optional feature that can be used to automatically run the pre-processing model output by the NeuralCompiler.
Note
This function currently does not support PyTorch models
Warning
This function is currently not available on the ARM platform
- Parameters:
- model_or_path: obj or str
Can be either an already loaded model such as a tf.keras.Model object for Keras, or a str path to a model file.
- model_idx: int
Index of the model on the accelerator whose input feature map should be pre-processed by the supplied model
- set_postprocessing_model(model_or_path, model_idx=0)#
Supply the path to a model/file that should be run to post-process the output feature maps. This is an optional feature that can be used to automatically run the post-processing model output by the NeuralCompiler.
Note
This function currently does not support PyTorch models
Warning
This function is currently not available on the ARM platform
- Parameters:
- model_or_path: obj or str
Can be either an already loaded model such as a tf.keras.Model object for Keras, or a string path to a model file.
- model_idx: int
Index of the model on the accelerator whose output should be post-processed by the supplied model
- inport_assignment(model_idx=0)#
Returns a dictionary which maps input port ids to model input layer names for the model specified by model_idx
- Parameters:
- model_idx: int
Index of the model whose input port assignment is returned
- outport_assignment(model_idx=0)#
Returns a dictionary which maps output port ids to model output layer names for the model specified by model_idx
- Parameters:
- model_idx: int
Index of the model whose output port assignment is returned
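One use for the dictionary returned by outport_assignment is to label the feature maps received in an output callback. The following is a sketch only: the helper label_outputs and the example dictionary are hypothetical, and it assumes that feature maps arrive in ascending port-id order, which the documentation above does not state explicitly.

```python
def label_outputs(assignment, ofmaps):
    """Pair each output feature map with its layer name.

    Assumes (not confirmed by the documentation) that feature maps
    arrive in ascending port-id order.
    """
    if len(assignment) != len(ofmaps):
        raise ValueError("expected one feature map per output port")
    names = [assignment[port] for port in sorted(assignment)]
    return dict(zip(names, ofmaps))


# Hypothetical result of accl.outport_assignment(); real names depend on the model.
example_assignment = {0: "boxes", 1: "scores"}
labeled = label_outputs(example_assignment, ["<boxes fmap>", "<scores fmap>"])
print(labeled)
```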
- class SyncAccl(dfp: str | Path | bytes | Dfp, device_ids: List[int] | int = [0], use_model_shape: Tuple[bool, bool] = (True, True), local_mode: bool = False, scheduler_options: SchedulerOptions = SchedulerOptions(frame_limit=600, time_limit=0, stop_on_empty=False, ifmap_queue_size=16, ofmap_queue_size=12), client_options: ClientOptions = ClientOptions(smoothing=False, fps_target=0.0), manager_addr: str = '/run/mxa_manager/', manager_port: int = 10000, ignore_manager: bool = False)#
This class provides a synchronous API for the MemryX hardware accelerator, which performs input and output sequentially per model. The accelerator is abstracted as a collection of models. You can select the desired model by passing its index to the member function.
- Parameters:
- dfp : bytes or str
Path to the DFP file generated by the NeuralCompiler, or a byte array representing the DFP content.
- device_ids : list of int, optional
List of MemryX device IDs to be used for executing the DFP. Default is [0].
- use_model_shape : tuple of bool, optional
Tuple in the form (input_shape, output_shape). Specifies whether to enforce the original model input/output shapes (True) or use MXA runtime shapes (False). Default is (True, True).
- local_mode : bool, optional
If True, executes the DFP in local mode, which can improve performance for single-process use. Incompatible with multi-DFP and multi-process use. Default is False.
- scheduler_options : SchedulerOptions, optional
Scheduler configuration corresponding to the SchedulerOptions struct:
frame_limit : int Number of frames to process before swapping out. Default 600.
time_limit : int Maximum time (in milliseconds) before swapping out. Default 0.
stop_on_empty : bool Whether to swap out if the input queue is empty. Default False.
ifmap_queue_size : int Size of the shared input feature map queue. Default 16.
ofmap_queue_size : int Size of the per-client output feature map queues. Default 12.
- client_options : ClientOptions, optional
Client runtime behavior configuration, corresponding to the ClientOptions struct:
smoothing : bool Whether to enable FPS smoothing. Default False.
fps_target : float Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default 0.
- manager_addr : str, optional
Path to the mxa-manager socket. Needed in Docker or managed environments. Default is “/run/mxa_manager/”.
- manager_port : int, optional
Port for mxa-manager connection (used primarily in containerized deployments). Default is 10000.
- ignore_manager : bool, optional
If True, bypasses the manager and forces local mode. May cause crashes if multiple containers/processes attempt to use the same device. Default is False.
Warning
Setting ignore_manager to True disables coordination with other processes and may result in device conflicts if multiple clients or containers access the same device concurrently. Use with caution.
Examples
import tensorflow as tf
import numpy as np
from memryx import NeuralCompiler, SyncAccl

# Compile a MobileNet model for testing.
# Typically, compilation needs to be done only once.
model = tf.keras.applications.MobileNet()
nc = NeuralCompiler(models=model)
dfp = nc.run()

# Prepare the input data
img = np.random.rand(1, 224, 224, 3).astype(np.float32)
data = tf.keras.applications.mobilenet.preprocess_input(img)

# Accelerate using the MemryX hardware
accl = SyncAccl(dfp)
outputs = accl.run(data)  # Run sequential acceleration on the input data.

# Explicitly free up hardware
accl.shutdown()
Warning
The MemryX accelerator is a streaming processor that the user can supply with pipelined input data. Using the synchronous API to perform sequential execution of multiple input frames may result in a significant performance penalty. The user is advised to use the send/receive functions on separate threads or to use the asynchronous API interface.
- send(data, model_idx=0, timeout=None)#
Sends input data to the accelerator for the specified model.
For the model identified by model_idx, this method transfers the input data to the accelerator by copying it into the model’s input buffer(s). If the input buffer(s) are full, the call blocks according to the value of timeout.
- Parameters:
- data : np.ndarray or sequence of np.ndarray
The input data to be transferred. This is typically the preprocessed input array (or list/tuple of arrays) expected by the model. For multi-input models, a sequence of arrays must be provided, with each array corresponding to a model input.
- model_idx : int, optional
Index of the model to which the data should be sent. Default is 0.
- timeout : int or None, optional
The maximum time in milliseconds to block if the input buffer is full. If None (default), the call blocks indefinitely until space becomes available. If a positive integer is provided, the call blocks for up to timeout milliseconds, after which it raises a TimeoutError if space is still unavailable.
- Raises:
- TimeoutError
Raised if the input buffer does not become available within the specified timeout.
- receive(model_idx=0, timeout=None)#
Receives output data from the accelerator for the specified model.
For the model identified by model_idx, this method retrieves data from the accelerator’s output buffer(s). If no output data is currently available, the call will block according to the specified timeout policy.
- Parameters:
- model_idx : int, optional
Index of the model from which output data should be retrieved. Default is 0.
- timeout : int or None, optional
Maximum number of milliseconds to wait if no output is immediately available. If None (default), blocks indefinitely until output data becomes available. If a positive integer is provided, the call will block for up to timeout milliseconds before raising a TimeoutError.
- Returns:
- np.ndarray or tuple of np.ndarray
The output data retrieved from the model. For single-output models, a single np.ndarray is returned. For multi-output models, a tuple of arrays is returned, with one entry per output of the specified model.
- Raises:
- TimeoutError
Raised if no output data becomes available within the specified timeout.
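The advice to call send and receive from separate threads can be sketched with the standard threading and queue modules. FakeAccl below is a hypothetical stand-in that mimics only the send/receive call shape and millisecond timeouts described above. It simply doubles each input, whereas a real SyncAccl would run the model. The helper run_pipelined is likewise illustrative.

```python
import queue
import threading


class FakeAccl:
    """Hypothetical stand-in that mimics only the send/receive call shape.

    It doubles each input value; a real SyncAccl runs the model instead.
    """

    def __init__(self, buffer_size=4):
        self._buf = queue.Queue(maxsize=buffer_size)

    def send(self, data, model_idx=0, timeout=None):
        seconds = None if timeout is None else timeout / 1000.0  # ms -> s
        try:
            self._buf.put(data * 2, timeout=seconds)
        except queue.Full:
            raise TimeoutError("input buffer full") from None

    def receive(self, model_idx=0, timeout=None):
        seconds = None if timeout is None else timeout / 1000.0  # ms -> s
        try:
            return self._buf.get(timeout=seconds)
        except queue.Empty:
            raise TimeoutError("no output available") from None


def run_pipelined(accl, frames):
    """Feed frames from one thread while collecting outputs on another."""
    outputs = []

    def producer():
        for frame in frames:
            accl.send(frame)

    def consumer():
        for _ in range(len(frames)):
            outputs.append(accl.receive())

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outputs


pipelined = run_pipelined(FakeAccl(), list(range(10)))
print(pipelined)
```

Because the producer never waits for an output before sending the next frame, the input buffer stays populated, which is the point of the pipelining recommendation.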
- run(inputs, model_idx=0)#
Sends input data to the specified model and retrieves the corresponding outputs.
This method combines send() and receive() into a single sequential operation. It transfers the provided input(s) to the accelerator and blocks until the corresponding outputs are ready.
- Parameters:
- inputs : np.ndarray or list of np.ndarray or list of list of np.ndarray
The preprocessed input data for the model.
- A single np.ndarray is used for single-input models.
- A list of np.ndarray is used for multi-input models.
- A nested list of inputs (e.g., list of list of arrays) can be used to batch multiple input sets for better throughput.
Each individual np.ndarray must match the input shape and data type expected by the model.
Note: Stacking multiple input sets into a single np.ndarray is not supported.
- model_idx : int, optional
Index of the model to which the inputs should be sent. Default is 0.
- Returns:
- np.ndarray or tuple of np.ndarray
The model’s output data. A single np.ndarray is returned for models with one output, or a tuple of arrays for multi-output models.
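Since a stacked batch is not accepted by run, data that is already stacked along a leading axis has to be split into a nested list first. The helper to_input_sets below is a hypothetical sketch for a single-input model. It keeps a leading axis of size 1 on each frame, matching the (1, 224, 224, 3) input used in the SyncAccl example, which is an assumption about what the model expects. Nested Python lists stand in for an np.ndarray here; the same slicing works on arrays.

```python
def to_input_sets(stacked):
    """Split a stacked batch into a nested list: one single-input set per frame.

    Works on anything that supports len() and slicing, such as a list or an
    np.ndarray. Slicing with i:i+1 keeps a leading axis of size 1.
    """
    return [[stacked[i:i + 1]] for i in range(len(stacked))]


# Nested Python lists stand in for a stacked array of three frames.
stacked_frames = [[1, 2], [3, 4], [5, 6]]
input_sets = to_input_sets(stacked_frames)
print(len(input_sets))
```

Each element of input_sets is one input set (a list with a single entry for a single-input model), which corresponds to the "list of list of arrays" form described for the inputs parameter.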
- inport_assignment(model_idx=0)#
Returns a dictionary which maps input port ids to model input layer names for the model specified by model_idx
- Parameters:
- model_idx: int
Index of the model whose input port assignment is returned
- outport_assignment(model_idx=0)#
Returns a dictionary which maps output port ids to model output layer names for the model specified by model_idx
- Parameters:
- model_idx: int
Index of the model whose output port assignment is returned
- class MultiStreamAsyncAccl(dfp: str | Path | bytes | Dfp, device_ids: List[int] | int = [0], stream_workers: int = -1, use_model_shape: Tuple[bool, bool] = (True, True), local_mode: bool = False, scheduler_options: SchedulerOptions = SchedulerOptions(frame_limit=600, time_limit=0, stop_on_empty=False, ifmap_queue_size=16, ofmap_queue_size=12), client_options: ClientOptions = ClientOptions(smoothing=False, fps_target=0.0), manager_addr: str = '/run/mxa_manager/', manager_port: int = 10000, ignore_manager: bool = False)#
This class provides a multi-stream version of the AsyncAccl API. This allows multiple input+output callbacks to be associated with a single model.
- Parameters:
- dfp : bytes or str
Path to the DFP file generated by the NeuralCompiler, or a byte array representing the DFP content.
- device_ids : list of int, optional
List of MemryX device IDs to be used for executing the DFP. Default is [0].
- stream_workers : int, optional
Number of worker threads to use for thread pooling multiple stream threads. If -1, the number of workers is set to (CPU cores - 1), with a minimum of 2. This allows for efficient asynchronous data flow without overwhelming the system. Default is -1.
- use_model_shape : tuple of bool, optional
Tuple in the form (input_shape, output_shape). Specifies whether to enforce the original model input/output shapes (True) or use MXA runtime shapes (False). Default is (True, True).
- local_mode : bool, optional
If True, executes the DFP in local mode, which can improve performance for single-process use. Incompatible with multi-DFP and multi-process use. Default is False.
- scheduler_options : SchedulerOptions, optional
Scheduler configuration corresponding to the SchedulerOptions struct:
frame_limit : int Number of frames to process before swapping out. Default 600.
time_limit : int Maximum time (in milliseconds) before swapping out. Default 0.
stop_on_empty : bool Whether to swap out if the input queue is empty. Default False.
ifmap_queue_size : int Size of the shared input feature map queue. Default 16.
ofmap_queue_size : int Size of the per-client output feature map queues. Default 12.
- client_options : ClientOptions, optional
Client runtime behavior configuration, corresponding to the ClientOptions struct:
smoothing : bool Whether to enable FPS smoothing. Default False.
fps_target : float Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default 0.
- manager_addr : str, optional
Path to the mxa-manager socket. Needed in Docker or managed environments. Default is “/run/mxa_manager/”.
- manager_port : int, optional
Port for mxa-manager connection (used primarily in containerized deployments). Default is 10000.
- ignore_manager : bool, optional
If True, bypasses the manager and forces local mode. May cause crashes if multiple containers/processes attempt to use the same device. Default is False.
Warning
Setting ignore_manager to True disables coordination with other processes and may result in device conflicts if multiple clients or containers access the same device concurrently. Use with caution.
Examples
from tensorflow import keras
import numpy as np
from memryx import NeuralCompiler, MultiStreamAsyncAccl

class Application:
    def __init__(self):
        self.streams = []
        self.streams_idx = []
        self.outputs = []
        for i in range(2):
            self.streams.append(
                [
                    np.random.rand(224, 224, 3).astype(np.float32)
                    for _ in range(10)
                ]
            )
            self.streams_idx.append(0)
            self.outputs.append([])

    # Define the callback that will return model input data
    def data_source(self, stream_idx):
        # Generate inputs based on stream_idx
        if self.streams_idx[stream_idx] == len(self.streams[stream_idx]):
            return None
        self.streams_idx[stream_idx] += 1
        return self.streams[stream_idx][self.streams_idx[stream_idx] - 1]

    # Define the callback that will process the outputs of the model
    def output_processor(self, stream_idx, *outputs):
        logits = np.squeeze(outputs[0], 0)
        preds = keras.applications.mobilenet.decode_predictions(logits)
        # Route outputs based on stream_idx
        self.outputs[stream_idx].append(preds)

# Compile a MobileNet model for testing.
# Typically, compilation needs to be done only once.
model = keras.applications.MobileNet()
nc = NeuralCompiler(models=model, verbose=1)
dfp = nc.run()

# Accelerate using the MemryX hardware
app = Application()
accl = MultiStreamAsyncAccl(dfp)

# Starts asynchronous execution of the input/output callback pair associated with 2 streams
accl.connect_streams(app.data_source, app.output_processor, 2)

# Wait for the accelerator to finish execution
accl.wait()

# Explicitly free up hardware
accl.shutdown()
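The stream_workers parameter description gives a rule for the default worker count when -1 is passed: CPU cores minus one, with a minimum of 2. As a rough illustration of that arithmetic, the hypothetical helper below reproduces the rule with the standard library. How the runtime actually counts cores is not documented here, so os.cpu_count() is an assumption.

```python
import os


def default_stream_workers(cpu_cores=None):
    """Worker count described for stream_workers=-1: cores - 1, at least 2."""
    if cpu_cores is None:
        cpu_cores = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(2, cpu_cores - 1)


print(default_stream_workers(8))
print(default_stream_workers(2))
```

On an 8-core machine this gives 7 workers, while on a 1-core or 2-core machine the minimum of 2 applies.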
- connect_streams(input_callback, output_callback, stream_count, model_idx=0)#
Registers and starts execution of a pair of input and output callback functions that process a specified number of data streams as defined by stream_count for a given model.
- Parameters:
- input_callback : callable
A function or bound method responsible for supplying input data to the model identified by model_idx. It must accept exactly one argument:
stream_idx : int The index of the input stream, ranging from 0 to stream_count - 1, used by the application to select the appropriate data source.
- output_callback : callable
A function or bound method invoked with the output feature maps generated by the model. It must accept at least two arguments:
stream_idx : int The index of the output stream (same as input).
*ofmaps : tuple or unpacked The output feature maps. The function can use a variadic signature (*ofmaps) or named parameters (e.g., fmap0, fmap1, …) depending on how many feature maps the model produces.
- stream_count : int
The number of independent input feature map sources (streams) that the model will process.
- model_idx : int, optional
The index of the model to associate with this input-output callback pair. Each stream for the specified model will be assigned a stream_idx in the range [0, stream_count - 1]. Default is 0.
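The stream_idx routing described above can be exercised without an accelerator. In this sketch, StreamRouter and simulate_streams are hypothetical: the latter is a stand-in for the runtime that polls streams round-robin and echoes each input back as a single output feature map. The real runtime's scheduling order is not specified here and may differ.

```python
class StreamRouter:
    """Hypothetical application object holding per-stream inputs and outputs."""

    def __init__(self, streams):
        self._iters = [iter(frames) for frames in streams]
        self.outputs = [[] for _ in streams]

    def input_callback(self, stream_idx):
        # Exactly one argument besides self; None ends that stream.
        return next(self._iters[stream_idx], None)

    def output_callback(self, stream_idx, *ofmaps):
        # stream_idx comes first, followed by the output feature maps.
        self.outputs[stream_idx].append(ofmaps)


def simulate_streams(input_callback, output_callback, stream_count):
    """Hypothetical stand-in for the runtime: round-robin over the streams
    and echo each input back as a single output feature map."""
    active = set(range(stream_count))
    while active:
        for idx in sorted(active):
            frame = input_callback(idx)
            if frame is None:
                active.discard(idx)
            else:
                output_callback(idx, frame)


router = StreamRouter([["a0", "a1"], ["b0"]])
simulate_streams(router.input_callback, router.output_callback, 2)
print(router.outputs)
```

With a real device, the same two bound methods would be passed to connect_streams together with the stream count.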
- stop()#
Sends a signal to stop each of the models running on the accelerator. This call blocks until each of the models stops and cleans up its resources.
- wait()#
Blocks the application thread until the accelerator finishes executing all models.
- Raises:
- RuntimeError: If any of the model’s inputs/outputs are left unconnected
Runtime Scheduler Configuration#
Some configuration can be passed to the runtime scheduler to control how the Accelerator API behaves relative to other programs or Accelerator API instances. Below is a usage example of how to configure the scheduler options when creating an instance of the Accelerator API.
from memryx import AsyncAccl
from memryx.accl import ClientOptions, SchedulerOptions

accl = AsyncAccl(
    dfp="model.dfp",  # placeholder path to a compiled DFP file
    client_options=ClientOptions(),
    scheduler_options=SchedulerOptions(),
)
- class ClientOptions(smoothing: bool = False, fps_target: float = 0.0)#
Client runtime behavior configuration.
- Parameters:
- smoothing : bool, optional
Whether to enable FPS smoothing. Default is False.
- fps_target : float, optional
Target frames per second. A delay of 1 / fps_target seconds is enforced between frames. Default is 0.0.
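The delay implied by fps_target is simple arithmetic. The hypothetical helper below computes it, under the assumption that the default of 0.0 means no enforced delay. The documentation does not say this explicitly, but 1 / 0 is undefined, so some special case must apply.

```python
def frame_delay_seconds(fps_target):
    """Delay between frames for a given fps_target.

    Assumes that the default of 0.0 means "no enforced delay"; the
    documentation does not state this explicitly.
    """
    if fps_target <= 0:
        return 0.0
    return 1.0 / fps_target


print(frame_delay_seconds(25.0))
```

For example, a target of 25 frames per second corresponds to a delay of 0.04 seconds (40 ms) between frames.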
- class SchedulerOptions(frame_limit: int = 600, time_limit: int = 0, stop_on_empty: bool = False, ifmap_queue_size: int = 16, ofmap_queue_size: int = 12)#
Configuration options for the scheduler.
- Parameters:
- frame_limit : int, optional
Number of frames to process before swapping out. Default is 600.
- time_limit : int, optional
Maximum time (in milliseconds) before swapping out. Default is 0.
- stop_on_empty : bool, optional
Whether to swap out if the input queue is empty. Default is False.
- ifmap_queue_size : int, optional
Size of the shared input feature map queue. Default is 16.
- ofmap_queue_size : int, optional
Size of the per-client output feature map queues. Default is 12.
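To make the three swap-out conditions concrete, the sketch below models them as a plain predicate. It is illustrative only: SchedulerOptionsSketch is a stand-in dataclass that mirrors the documented defaults and is not the memryx class, should_swap_out is a hypothetical policy and not the scheduler's actual logic, and treating a limit of 0 as "disabled" is an assumption.

```python
from dataclasses import dataclass


@dataclass
class SchedulerOptionsSketch:
    """Stand-in mirroring the documented defaults; not the memryx class."""
    frame_limit: int = 600
    time_limit: int = 0          # milliseconds
    stop_on_empty: bool = False
    ifmap_queue_size: int = 16
    ofmap_queue_size: int = 12


def should_swap_out(opts, frames_done, elapsed_ms, input_queue_empty):
    """Illustrative policy only. Assumes a limit of 0 means 'disabled'."""
    if opts.frame_limit > 0 and frames_done >= opts.frame_limit:
        return True
    if opts.time_limit > 0 and elapsed_ms >= opts.time_limit:
        return True
    return opts.stop_on_empty and input_queue_empty


defaults = SchedulerOptionsSketch()
print(should_swap_out(defaults, frames_done=600, elapsed_ms=50,
                      input_queue_empty=False))
```

Under these assumptions, the defaults swap out only on the frame count: the time limit is off and an empty input queue does not trigger a swap.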