Streams#

The C++ runtime APIs and MultiStreamAsyncAccl Python API support multiple “streams”, providing a convenient way to handle multiple data sources (e.g. cameras) in parallel. This is particularly useful for applications that require real-time processing from multiple inputs.

Definition 💡

A stream refers to an input source + output destination pair, such as a camera input to a display window output.

Each stream is bound to its own set of input and output callbacks, which can either be the same functions (and use the stream_id parameter to differentiate between them) or completely separate functions for each stream.

Stream, Thread, Process?#

When designing a system that will process multiple input streams (e.g. cameras), the MemryX Runtime is best utilized with a single process that has connected multiple streams to a single DFP.

The streams will run in their own threads within that one process.

Note

If your target application has multiple input streams that will run through the same neural net model (thus the same DFP), the following situations would be suboptimal:

  • Separate processes for each stream

  • Multiple Accelerator objects for each stream, but in the same process

These would increase the overhead of MXA-Manager and cannot configure stream worker threads for optimizing CPU usage.

Connecting Streams#

Each stream needs to be connected to the accelerator object, registering its callbacks.

Different Callbacks#

Each stream can have its own input and output callback functions defined.

bool input_callback_1(vector<const FeatureMap*> input) {
    // Capture an input from camera 1
    input[0]->set_data(get_next_frame_camera_1());
    return true; // Return true to continue processing
}

void output_callback_1(const vector<FeatureMap*>& output) {
    // Process the output for camera 1
    process_output_camera_1(output[0]);
}

bool input_callback_2(vector<const FeatureMap*> input) {
    // Capture an input from camera 2
    input[0]->set_data(get_next_frame_camera_2());
    return true; // Return true to continue processing
}

void output_callback_2(const vector<FeatureMap*>& output) {
    // Process the output for camera 2
    process_output_camera_2(output[0]);
}

MxAccl accl("my_model.dfp");
accl.connect_stream(&input_callback_1, &output_callback_1, 0); // stream_id 0 for camera 1
accl.connect_stream(&input_callback_2, &output_callback_2, 1); // stream_id 1 for camera 2

// etc...
def input_callback_1():
    frame = get_next_frame_camera_1()  # Input for camera 1
    return frame

def output_callback_1(result):
    display_result_camera_1(result)  # Output for camera 1

def input_callback_2():
    frame = get_next_frame_camera_2()  # Input for camera 2
    return frame

def output_callback_2(result):
    display_result_camera_2(result)  # Output for camera 2

# connect once instance of the stream to each pair of callbacks
accl = MultiStreamAsyncAccl("my_model.dfp")
accl.connect_streams(input_callback_1, output_callback_1, stream_count=1)
accl.connect_streams(input_callback_2, output_callback_2, stream_count=1)

# etc...

Same Callbacks#

You can also use the same functions, but differentiate between streams using the stream ID parameter:

bool input_callback(vector<const FeatureMap*> input, int stream_id) {
    // Capture an input from some source based on the stream_id
    input[0]->set_data(get_next_frame(stream_id)); // Get the next frame for this stream
    return true; // Return true to continue processing
}

void output_callback(const vector<FeatureMap*>& output, int stream_id) {
    // Process the output for the specific stream
    process_output(output[0], stream_id); // Post-process the output and display, etc.
}

MxAccl accl("my_model.dfp");
accl.connect_stream(&input_callback, &output_callback, 0); // stream_id 0
accl.connect_stream(&input_callback, &output_callback, 1); // stream_id 1

// etc...
def input_callback(stream_id):
    frame = get_next_frame(stream_id)  # Input for specific stream
    return frame

def output_callback(result, stream_id):
    display_result(result, stream_id)  # Output for specific stream

accl = MultiStreamAsyncAccl("my_model.dfp")
# Creates two streams with IDs 0 and 1
accl.connect_streams(input_callback, output_callback, stream_count=2)
accl.wait()

Configuring Stream Worker Threads#

In the C++ runtime (not availabe in Python), each stream has a thread in a thread pool. This pool can be configured to values lower than the total number of streams, lessening CPU usage.

Hint

By default, the number of threads in the pool is equal to either the number of streams or the number of CPU cores, whichever is lower.

The number of threads can be configured separately for input and output callbacks, allowing for more flexibility in resource management depending on whether your pre-processing or post-processing is more CPU-intensive.

// create 32 streams
MxAccl accl("my_model.dfp");
for(int i=0; i < 32; i++){
    accl.connect_stream(&input_callback, &output_callback, i);
}

// Limit the stream threadpool to 2 input workers and 4 output workers
accl.set_num_workers(2, 4);

// Start
accl.start();

// etc...

See also

Be sure to check out the multi-stream YOLOv7t and YOLOv8 tutorials.