Callback Functions

With an asynchronous code structure, manually managing the input and output threads can be cumbersome. With the MemryX Runtime, the preferred approach is to use callback functions: user-defined functions that the Runtime calls automatically when the chip is ready for new input or has output available.

Description

For example, an input callback function might capture an image from a camera, while an output callback function might display the results on a screen. The Runtime will handle the threading and calling of these functions automatically, enabling the hardware to run in a fully pipelined fashion.

from memryx import AsyncAccl

def input_callback():
    frame = get_next_frame()   # Input
    return frame

def output_callback(result):
    display_result(result)     # Output

# Connect the callbacks to the Accelerator
accl = AsyncAccl("my_model.dfp")
accl.connect_input(input_callback)
accl.connect_output(output_callback)

# Main thread can continue doing whatever else now

(AsyncAccl API reference)

bool input_callback(vector<const FeatureMap*> &input) {
    // Capture an input from some source
    // Here we use an OpenCV cv::Mat as an example
    cv::Mat img = get_next_frame();

    // Set the data for input feature map 0
    input[0]->set_data(img.data);

    // Return true to indicate success
    // Return false to indicate program shutdown
    return true;
}

bool output_callback(vector<const FeatureMap*> &output) {
    // Have a destination for the output data (an OpenCV cv::Mat in this case)
    cv::Mat result(my_ofmap_size);

    // Get the data from the runtime
    output[0]->get_data(result.data);

    // Display the results on a screen
    display_result(result);

    // The function is declared bool, so it must return a value
    return true;
}

// Connect the callbacks to the Accelerator
MX::Runtime::MxAccl accl("my_model.dfp");
accl.connect_stream(&input_callback, &output_callback);
accl.start();

// Main thread can continue doing whatever else now

(MxAccl API reference)

In this simplified example, the MemryX runtime calls input_callback automatically whenever the chip is ready for new input, streams the data to and from the chip using its own internal threads, and then calls output_callback once the chip has finished processing the input and has output available.
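To make the calling pattern concrete, here is a minimal sketch of a callback-driven runtime built from plain Python threads. ToyAccl and model_fn are hypothetical names invented for this illustration. This is not the MemryX implementation, and the sketch assumes that returning None from the input callback ends the stream. It only mimics the connect_input / connect_output / wait shape used above.

```python
import threading
from queue import Queue


class ToyAccl:
    """Hypothetical stand-in for a callback-driven runtime (not MemryX code)."""

    def __init__(self, model_fn):
        self._model_fn = model_fn           # pretend "chip": any function of one frame
        self._in_flight = Queue(maxsize=4)  # bounded buffer between the two threads
        self._input_cb = None
        self._output_cb = None
        self._threads = []

    def connect_input(self, callback):
        self._input_cb = callback

    def connect_output(self, callback):
        # Simplification: start both threads once the output side is connected,
        # so connect_input must be called first.
        self._output_cb = callback
        for target in (self._input_loop, self._output_loop):
            thread = threading.Thread(target=target)
            thread.start()
            self._threads.append(thread)

    def _input_loop(self):
        # Keep asking the user for frames until the callback returns None.
        while True:
            frame = self._input_cb()
            self._in_flight.put(frame)
            if frame is None:
                return

    def _output_loop(self):
        # Hand each result to the user; None is the end-of-stream marker.
        while True:
            frame = self._in_flight.get()
            if frame is None:
                return
            self._output_cb(self._model_fn(frame))

    def wait(self):
        for thread in self._threads:
            thread.join()


frames = iter(range(5))
results = []


def input_callback():
    return next(frames, None)  # None once the source is exhausted


def output_callback(result):
    results.append(result)


accl = ToyAccl(model_fn=lambda x: x * 2)
accl.connect_input(input_callback)
accl.connect_output(output_callback)
accl.wait()
print(results)  # [0, 2, 4, 6, 8]
```

The user code never creates a thread: it only supplies the two callbacks, and the input and output loops run independently of each other.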

Tips & Tricks

Original Image Queue

In the common case of combining inference output and the original captured image, use a queue that exists outside of the callback functions, and push/pop to it within the callbacks.

For example, in Python:

from queue import Queue

from memryx import AsyncAccl

orig_frame_queue = Queue()

def input_callback():
    frame = get_next_frame()      # Input
    orig_frame_queue.put(frame)   # Store the frame in the queue
    return frame                  # Gets sent to the accelerator

def output_callback(result):
    original_frame = orig_frame_queue.get()  # Get the original frame from the queue
    draw_results(original_frame, result)     # Draw inference results (e.g. bounding boxes) on the original image

accl = AsyncAccl("my_model.dfp")
accl.connect_input(input_callback)
accl.connect_output(output_callback)
accl.wait()

Warning

Don’t Break The Pipeline!

With global queues, take care not to create synchronization points between the input and output threads that could block them from running in parallel!

For example, do not have a CPU/GPU-style synchronous loop elsewhere in your code, such as:

to_input_queue = Queue()
from_output_queue = Queue()

def input_callback():
    frame = to_input_queue.get()      # Get the frame from the input queue
    return frame                      # Send to the accelerator

def output_callback(result):
    from_output_queue.put(result)     # Put the result in the output queue

accl = AsyncAccl("my_model.dfp")
accl.connect_input(input_callback)
accl.connect_output(output_callback)

while True:
    frame = get_next_frame()          # Get next input from, e.g. a camera
    to_input_queue.put(frame)         # Add to input callback queue
    result = from_output_queue.get()  # Wait for output callback's data **BLOCKING!!**
    draw_results(frame, result)       # Draw results on the original frame

The above code makes the main loop wait for each result before it submits the next frame, so the input callback sits idle on to_input_queue while the output is pending. Only one frame is in flight at a time, which breaks the pipeline and negates the benefits of using callbacks.
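The effect can be seen without any hardware. The sketch below is a toy model, not MemryX code: chip is a hypothetical stand-in that adds a fixed delay per frame, and peak_in_flight is a helper invented here to count how many frames are submitted but not yet collected. It compares the lockstep loop above with a version where submission and collection run independently.

```python
import threading
import time
from queue import Queue


def peak_in_flight(num_frames, lockstep):
    """Run a toy pipeline and return the most frames ever in flight at once."""
    to_chip, from_chip = Queue(), Queue()
    lock = threading.Lock()
    count = {"now": 0, "peak": 0}

    def chip():
        # Hypothetical stand-in for the accelerator: fixed latency per frame.
        for _ in range(num_frames):
            frame = to_chip.get()
            time.sleep(0.005)
            from_chip.put(frame)

    def submit(frame):
        with lock:
            count["now"] += 1
            count["peak"] = max(count["peak"], count["now"])
        to_chip.put(frame)

    def collect():
        result = from_chip.get()
        with lock:
            count["now"] -= 1
        return result

    def submit_all():
        for frame in range(num_frames):
            submit(frame)

    worker = threading.Thread(target=chip)
    worker.start()
    if lockstep:
        # Anti-pattern: wait for each result before submitting the next frame.
        for frame in range(num_frames):
            submit(frame)
            collect()
    else:
        # Pipelined: submission and collection run independently.
        producer = threading.Thread(target=submit_all)
        producer.start()
        for _ in range(num_frames):
            collect()
        producer.join()
    worker.join()
    return count["peak"]


print(peak_in_flight(8, lockstep=True))   # 1
print(peak_in_flight(8, lockstep=False))  # greater than 1; exact value depends on scheduling
```

In the lockstep case the count can never exceed one, no matter how fast the source is. Moving frame capture into the input callback and drawing into the output callback removes the wait that causes this.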

Many Streams, Same Callbacks

If you have multiple streams of data (e.g. multiple cameras), you can either define a separate pair of callback functions for each stream, or use a single pair of callbacks that use the stream_id callback parameter to tell the streams apart.

In C++, this is part of the regular MxAccl API, while in Python you’ll need to use the MultiStreamAsyncAccl variant of the AsyncAccl class.

from memryx import MultiStreamAsyncAccl

def input_callback(stream_id):
    frame = get_next_frame(stream_id)  # Input for specific stream
    return frame

def output_callback(result, stream_id):
    display_result(result, stream_id)  # Output for specific stream

accl = MultiStreamAsyncAccl("my_model.dfp")
# Creates two streams with IDs 0 and 1
accl.connect_streams(input_callback, output_callback, stream_count=2)
accl.wait()

bool input_callback(vector<const FeatureMap*> input, int stream_id) {
    // Capture an input from some source based on the stream_id
    cv::Mat img = get_next_frame(stream_id);
    input[0]->set_data(img.data);
    return true;
}

bool output_callback(vector<const FeatureMap*> output, int stream_id) {
    cv::Mat result(my_ofmap_size);
    output[0]->get_data(result.data);
    display_result(result, stream_id);

    // The function is declared bool, so it must return a value
    return true;
}

MX::Runtime::MxAccl accl("my_model.dfp");
// Create two streams with IDs 0 and 1
accl.connect_stream(&input_callback, &output_callback, 0);
accl.connect_stream(&input_callback, &output_callback, 1);
accl.start();
accl.wait();
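The two tips above can be combined: with shared callbacks, one original-frame queue per stream_id keeps each stream's frames paired with its own results. The following is a sketch under stated assumptions, not MemryX code. The names sources, get_next_frame, toy_run and paired are hypothetical, and toy_run is a single-threaded stand-in that simply calls the callbacks in turn so the example runs without hardware.

```python
from queue import Queue

STREAM_COUNT = 2

# One queue of original frames per stream, keyed by stream_id.
orig_frame_queues = {sid: Queue() for sid in range(STREAM_COUNT)}

# Hypothetical frame sources; a real application would read from cameras.
sources = {0: iter(["a0", "a1"]), 1: iter(["b0", "b1", "b2"])}

paired = []  # (stream_id, original_frame, result) tuples


def get_next_frame(stream_id):
    return next(sources[stream_id], None)  # None when the source is exhausted


def input_callback(stream_id):
    frame = get_next_frame(stream_id)
    if frame is not None:
        orig_frame_queues[stream_id].put(frame)  # remember it for this stream only
    return frame


def output_callback(result, stream_id):
    original = orig_frame_queues[stream_id].get()  # oldest frame of the same stream
    paired.append((stream_id, original, result))


def toy_run(stream_count):
    """Hypothetical driver: round-robin over streams, 'inference' is str.upper."""
    active = set(range(stream_count))
    while active:
        for sid in sorted(active):
            frame = input_callback(sid)
            if frame is None:
                active.discard(sid)  # this stream has ended
                continue
            output_callback(frame.upper(), sid)


toy_run(STREAM_COUNT)
for stream_id, original, result in paired:
    print(stream_id, original, result)
```

A single shared queue would mix frames from different cameras as soon as the streams run at different rates, while per-stream queues avoid that and add no synchronization between the input and output sides.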