YOLOv8 Pose Estimation#

Introduction#

In this tutorial, we will show how to use the Acclerator API to perform real-time pose estimation on MX3 in Python and C++. We will use the YOLOv8m model, which is a relatively large model with great accuracy.

Note

This tutorial assumes a four-chip solution is correctly connected.

Download & Run

Download

This tutorial provides a high-level overview of the application’s key components. To run the full application, download the complete code package and the compiled DFP. After downloading, refer to the Run section below for step-by-step instructions.

Run

Requirements

Install the required dependencies:

pip install opencv-python==4.11.0.86
pip install ultralytics==8.3.161

Run Command

Run the Python script:

python run_pose_estimation.py

Requirements Install the required dependencies and build the project:

# Install OpenCV (ensure version compatibility)
sudo apt-get install libopencv-dev

# Build the C++ project
mkdir build && cd build
cmake ..
make

Run Command

Run the C++ executable:

./poseEstimation

1. Download the Model#

The YOLOv8m-pose pre-trained models are available on the Official YOLOv8m-pose GitHub page in the Pose Estimation section. For the sake of the tutorial, we exported and pre-compiled the model for the user to download. The model can be found in the compressed folder attached to this tutorial.

2. Compile the Model#

The YOLOv8m-pose model was exported with the option to include a post-processing section in the model graph. Hence, it needed to be compiled with the neural compiler autocrop option. After the compilation, the compiler will generate the DFP file for the main section of the model (YOLO_v8_medium_pose_640_640_3_onnx.dfp) and the cropped post-processing section of the model (YOLO_v8_medium_pose_640_640_3_onnx_post.onnx). The compilation step is typically needed once and can be done using the Neural Compiler API or Tool.

Hint

You can use the pre-compiled DFP and post-processing section attached for this tutorial and skip the compilation step.

from memryx import NeuralCompiler
nc = NeuralCompiler(num_chips=4, models="YOLO_v8_medium_pose_640_640_3_onnx.onnx", verbose=1, autocrop=True)
dfp = nc.run()

In your command line, you need to type,

mx_nc -v -m YOLO_v8_medium_pose_640_640_3_onnx.onnx --autocrop -c 4

This will produce a DFP file ready to be used by the accelerator. In your Python code, you need to point the dfp, post_model variables to the generated file paths.

dfp = "YOLO_v8_medium_pose_640_640_3_onnx.dfp"
post_model = "YOLO_v8_medium_pose_640_640_3_onnx_post.onnx"

In your Python code, you need to point the dfp, post_model variables to the generated file paths.

dfp = "YOLO_v8_medium_pose_640_640_3_onnx.dfp"
post_model = "YOLO_v8_medium_pose_640_640_3_onnx_post.onnx"

In your C++ code, you need to point the dfp and post-processing section via a generated file path,

// Model file paths
const fs::path modelPath = "YOLO_v8_medium_pose_640_640_3_onnx.dfp"; // Path to DFP model

3. CV Initializations#

We will import the needed libraries, intialize the CV pipeline, and define common variables in this step.

# OpenCV and helper libraries imports
import sys
import os
from pathlib import Path
from queue import Queue
import cv2 as cv
import numpy as np
from memryx import AsyncAccl
import torchvision.ops as ops
from typing import List
import argparse

# Connect to the camera and initialize the app
cam = cv.VideoCapture(0)
parent_path = Path(__file__).resolve().parent
model_input_shape = (640, 640)
app = App(cam, model_input_shape, mirror=False, src_is_cam=True)
#include <opencv2/opencv.hpp>    /* imshow */
#include <opencv2/imgproc.hpp>   /* cvtcolor */
#include <opencv2/imgcodecs.hpp> /* imwrite */
#include "memx/accl/MxAccl.h"

fs::path videoPath;
if(use_cam) { 
    std::cout << "use cam";
    vcap.open(0, cv::CAP_V4L2); // Open camera
} else {
    vcap.open(videoPath.c_str()); // Open video file
}

Along with necessary CV initialization, we also initalize necessary varibles for storing DFP model information, image manipulations, and FPS calculations.

Initialize model info variables - we get this information after connecting to the accelerator:

// Model information containers
MX::Types::MxModelInfo model_info; // Information for the DFP model
MX::Types::MxModelInfo post_model_info; // Information for post-processing model

Variables for image manipulations:

// Model input parameters
int model_input_width = 640; 
int model_input_height = 640; 
double origHeight = 0.0;  // Original frame height
double origWidth = 0.0;  // Original frame width

Variables used for pose keypoints calculations

float box_score = 0.25; // Threshold for box confidence
float rat = 0.0; // Aspect ratio used during resizing
float kpt_score = 0.5; // Threshold for keypoint confidence
float nms_thr = 0.2; // IoU threshold for Non-Maximum Suppression (NMS)
int dets_length = 8400; // Number of detections
int num_kpts = 17; // Number of keypoints in pose estimation

4. Define an Input Function#

We need to define an input function for the accelerator to use. In this case, our input function will get a new frame from the cam or video and pre-process it.

def preprocess_image(self, image):
    # Resize and pad the image to fit the model input shape
    h, w = image.shape[:2]
    r = min(self.model_input_shape[0] / h, self.model_input_shape[1] / w)
    image_resized = cv.resize(image, (int(w * r), int(h * r)), interpolation=cv.INTER_LINEAR)
    
    # Create a padded image
    padded_img = np.ones((self.model_input_shape[0], self.model_input_shape[1], 3), dtype=np.uint8) * 114
    padded_img[:int(h * r), :int(w * r)] = image_resized

    # Normalize image to [0, 1] range
    padded_img = padded_img / 255.0
    padded_img = padded_img.astype(np.float32)
    
    # Change the shape to (1, 3, 640, 640)
    padded_img = np.transpose(padded_img, (2, 0, 1))  # Change shape to (3, 640, 640)
    padded_img = np.expand_dims(padded_img, axis=0)  # Add batch dimension to make it (1, 3, 640, 640)
    
    return padded_img, r
// Callback function for processing input frames
bool incallback_getframe(std::vector<const MX::Types::FeatureMap*> dst, int streamLabel){

    if(runflag.load()){
        cv::Mat inframe;


	while(true){
            // Capture a frame from video/camera
            bool got_frame = vcap.read(inframe);

            if (!got_frame) {
                std::cout << "\n\n No frame - End of cam? \n\n\n";
                runflag.store(false);
                return false; 
            } else {
                // Push the captured frame to the queue
                {
                    std::lock_guard<std::mutex> flock(frameQueue_mutex);
		            // only push this frame if there's space
		            if(use_cam && (frames_queue.size() >= max_backlog)){
			            // drop it and capture the next frame instead
			            continue;
		            } else {
                        frames_queue.push_back(inframe);
		            }
                }

                // Convert frame to RGB and preprocess for model input
                cv::Mat rgbImage;
                cv::cvtColor(inframe, rgbImage, cv::COLOR_BGR2RGB);
                cv::Mat preProcframe;
                // cv::resize(rgbImage, preProcframe, 1.0, cv::Size(model_input_width, model_input_height), cv::Scalar(0, 0, 0), true, false);
                cv::dnn::blobFromImage(rgbImage, preProcframe, 1.0, cv::Size(model_input_width, model_input_height), cv::Scalar(0, 0, 0), true, false);
                cv::Mat floatImage;
                preProcframe.convertTo(floatImage, CV_32F,  1.0 / 255.0); // Normalize the frame

                // Set preprocessed input data to accelerator
                dst[0]->set_data((float*)floatImage.data);
                return true;
            }
	}
    } else {
        vcap.release(); // Release video resources if runflag is false
        return false;
    }
}

5. Define an Output Function#

We also need to define an output function for the accelerator to use. Our output function will post-process the accelerator output and display it on the screen.


def process_model_output(self, *ofmaps):
    # Process model output (keypoints and bounding boxes)
    predict = ofmaps[0].squeeze(0).T  # Shape: [8400, 56]
    predict = predict[predict[:, 4] > self.box_score, :]  # Filter boxes by confidence score
    scores = predict[:, 4]
    boxes = predict[:, 0:4] / self.ratio

    boxes = self.xywh2xyxy(boxes)  # Convert bounding box format

    # Process keypoints
    kpts = predict[:, 5:]
    for i in range(kpts.shape[0]):
        for j in range(kpts.shape[1] // 3):
            if kpts[i, 3*j+2] < self.kpt_score:  # Filter keypoints by confidence score
                kpts[i, 3*j: 3*(j+1)] = [-1, -1, -1]
            else:
                kpts[i, 3*j] /= self.ratio
                kpts[i, 3*j+1] /= self.ratio 
    idxes = self.nms_process(boxes, scores, self.nms_thr)  # Apply NMS
    result = {'boxes': boxes[idxes,: ].astype(int).tolist(),
              'kpts': kpts[idxes,: ].astype(float).tolist(),
              'scores': scores[idxes].tolist()}

    img = self.capture_queue.get()  # Get the frame from the queue
    self.capture_queue.task_done()

    # Draw keypoints and bounding boxes on the image
    color = (0,255,0)
    boxes, kpts, scores = result['boxes'], result['kpts'], result['scores']
    for  kpt, score in zip(kpts, scores):
        # Draw lines connecting keypoints
        for pair in self.KEYPOINT_PAIRS:
            pt1 = kpt[3 * pair[0]: 3 * (pair[0] + 1)]
            pt2 = kpt[3 * pair[1]: 3 * (pair[1] + 1)]
            if pt1[2] > 0 and pt2[2] > 0:
                cv.line(img, (int(pt1[0]), int(pt1[1])), (int(pt2[0]), int(pt2[1])), (255, 255, 255), 3)

        # Draw individual keypoints
        for idx in range(len(kpt) // 3):
            x, y, score = kpt[3*idx: 3*(idx+1)]
            if score > 0:
                cv.circle(img, (int(x), int(y)), 5, self.COLOR_LIST[idx % len(self.COLOR_LIST)], -1)

    self.show(img)  # Display the image
    return img
// Output callback function to process model output
bool outcallback_getmxaoutput(std::vector<const MX::Types::FeatureMap*> src, int streamLabel) {

    // Get data from the feature maps
    for(int i = 0; i < post_model_info.num_out_featuremaps; ++i) {
        src[i]->get_data(ofmap[i]);
    }

    // Get the input frame from the queue
    cv::Mat inframe;
    {
        std::lock_guard<std::mutex> flock(frameQueue_mutex);
        inframe = frames_queue.front();
        frames_queue.pop_front();
    }

    // Create a window for displaying results if not created yet
    if(!window_created) {
        cv::namedWindow("Pose Estimation", cv::WINDOW_NORMAL | cv::WINDOW_KEEPRATIO);
        cv::resizeWindow("Pose Estimation", cv::Size(origWidth, origHeight));
        cv::moveWindow("Pose Estimation", 0, 0);
        window_created = true;
    }

    // Process detection results and perform NMS
    std::vector<Box> all_boxes;
    std::vector<float> all_scores;
    std::vector<cv::Rect> cv_boxes;
    
    // Loop through each detection
    #pragma omp parallel for num_threads(2)
    for (int i = 0; i < dets_length; ++i) {

        // Extract bounding box coordinates and confidence score
        float x0 = ofmap[0][dets_length * 0 + i];
        float y0 = ofmap[0][dets_length * 1 + i];
        float w = ofmap[0][dets_length * 2 + i];
        float h = ofmap[0][dets_length * 3 + i];
        float confidence = ofmap[0][dets_length * 4 + i];

        if (confidence > box_score) {
            Box box;
            box.confidence = confidence;

            // Scale box coordinates back to original image size
            float y_factor = inframe.rows / float(model_input_height);
            float x_factor = inframe.cols / float(model_input_width);
            x0 = x0 * x_factor;
            y0 = y0 * y_factor;
            w = w * x_factor;
            h = h * y_factor;

            int x1 = (int)(x0 - 0.5 * w);
            int x2 = (int)(x0 + 0.5 * w);
            int y1 = (int)(y0 - 0.5 * h);
            int y2 = (int)(y0 + 0.5 * h);

            // Extract keypoints for pose estimation
            for (int j = 0; j < num_kpts; ++j) { 
                float kpt_x = ofmap[0][dets_length * (5 + j * 3) + i] * x_factor;
                float kpt_y = ofmap[0][dets_length * (5 + j * 3 + 1) + i] * y_factor;
                float kpt_conf = ofmap[0][dets_length * (5 + j * 3 + 2) + i];

                // Add keypoints if confidence is above threshold
                if (kpt_conf > kpt_score) {
                    box.keypoints.push_back(std::make_pair(kpt_x, kpt_y));
                } else {
                    box.keypoints.push_back(std::make_pair(-1, -1)); // Invalid keypoint
                }
            }

	    #pragma omp critical
	    {
            	all_boxes.push_back(box);
            	all_scores.push_back(confidence);
            	cv_boxes.push_back(cv::Rect(x1, y1, x2 - x1, y2 - y1));
	    }
        }
    }

    // Apply Non-Maximum Suppression (NMS) to reduce overlapping boxes
    std::vector<int> nms_result;
    cv::dnn::NMSBoxes(cv_boxes, all_scores, box_score, nms_thr, nms_result);

    // Keep only the filtered boxes after NMS
    std::vector<Box> filtered_boxes;
    for (int idx : nms_result) {
        filtered_boxes.push_back(all_boxes[idx]);
    }

    // Draw keypoints and connections (skeleton) on the frame
    for (const auto &box : filtered_boxes) {
        for (const auto &connection : KEYPOINT_PAIRS) {
            int idx1 = connection.first;
            int idx2 = connection.second;

            if (idx1 < box.keypoints.size() && idx2 < box.keypoints.size()) {
                auto kpt1 = box.keypoints[idx1];
                auto kpt2 = box.keypoints[idx2];

                if (kpt1.first != -1 && kpt1.second != -1 && kpt2.first != -1 && kpt2.second != -1) {
                    cv::line(inframe, cv::Point(kpt1.first, kpt1.second), cv::Point(kpt2.first, kpt2.second), cv::Scalar(255, 255, 255), 3);
                }
            }
        }

        // Draw individual keypoints
        for (int k = 0; k < box.keypoints.size(); ++k) {
            auto &kpt = box.keypoints[k];
            if (kpt.first != -1 && kpt.second != -1) {
                cv::circle(inframe, cv::Point(kpt.first, kpt.second), 4, COLOR_LIST[k % COLOR_LIST.size()], -1);
            }
        }
    }

    // Calculate FPS every AVG_FPS_CALC_FRAME_COUNT frames
    frame_count++;
    if (frame_count == 1) {
        start_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
    } else if (frame_count % AVG_FPS_CALC_FRAME_COUNT == 0) {
        std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()) - start_ms;
        fps_number = (float)AVG_FPS_CALC_FRAME_COUNT * 1000 / (float)(duration.count());
        sprintf(fps_text, "FPS = %.1f", fps_number);
        frame_count = 0;
    }

    // Display FPS on the frame
    cv::putText(inframe, fps_text, cv::Point2i(450, 30), cv::FONT_ITALIC, 0.8, cv::Scalar(255, 255, 255), 2);

    // Show the frame with keypoints and bounding boxes
    cv::imshow("Pose Estimation", inframe);

    // Exit if 'q' key is pressed
    if (cv::waitKey(1) == 'q') {
        runflag.store(false);
    }

    return true; 
}

6. Connect the Accelerator#

Now, all you need to do is to connect your input and output functions to the AsyncAccl API. The API will take care of the rest.

run_mxa(dfp, post_model, app):
# Initialize the accelerator and set up model paths
accl = AsyncAccl(dfp)
accl.set_postprocessing_model(post_model, model_idx=0)
accl.connect_input(app.generate_frame)
accl.connect_output(app.process_model_output)
accl.wait()  # Wait for the accelerator to finish

Here we also have a post-processing ONNX model, and :code: accl.connect_post_model will handle the post-processing of the output from the chip:

// Initialize the MemryX accelerator
MX::Runtime::MxAccl accl{fs::path(modelPath)};

accl.connect_post_model(fs::path(onnx_postprocessing_model_path)); // Connect the post-processing model
post_model_info = accl.get_post_model_info(0); // Get post-processing model info

model_info = accl.get_model_info(0); // Get main model info
model_input_height = model_info.in_featuremap_shapes[0][0]; // Set model input height
model_input_width = model_info.in_featuremap_shapes[0][1]; // Set model input width

Next, you need to connect your input and output functions to the AsyncAccl API. The output from the DFP and post-processing ONNX will then be directly obtained:

// Connect input and output streams to the accelerator
accl.connect_stream(&incallback_getframe, &outcallback_getmxaoutput, 10 /*unique stream ID*/, 0 /*Model ID*/);

std::cout << "Connected stream \n\n\n";
accl.start(); // Start inference
accl.wait();  // Wait for inference to complete
accl.stop();  // Stop the accelerator

The accelerator will automatically call the connected input and output functions in a fully pipelined fashion.

graph LR input([Input Function]) --> accl[Accelerator] accl --> output([Output Function]) style input fill:#CFE8FD, stroke:#595959 style accl fill:#FFE699, stroke:#595959 style output fill:#A9D18E, stroke:#595959

Third-Party Licenses#

This tutorial uses third-party software, models, and libraries. Below are the details of the licenses for these dependencies:

Summary#

This tutorial showed how to use a Accelerator API to run a real-time inference using a Pose Estimation model.