YOLOv8 Pose Estimation#

Introduction#

In this tutorial, we will show how to use the Accelerator API to perform real-time pose estimation on the MX3 in Python and C++. We will use the YOLOv8m-pose model, which is a relatively large model with high accuracy.

Note

This tutorial assumes a four-chip solution is correctly connected.

Download the Model#

The pre-trained YOLOv8m-pose models are available on the Official YOLOv8m-pose GitHub page in the Pose Estimation section. For the sake of this tutorial, we have already exported the model; it can be found in the compressed folder attached to this tutorial.
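
If you would like to export the model yourself, the snippet below is a minimal sketch of how this can be done with the ultralytics Python package (assumed to be installed; this step is not required if you use the attached files).

from ultralytics import YOLO

# Download the pre-trained pose weights and export them to ONNX
model = YOLO("yolov8m-pose.pt")
model.export(format="onnx")  # writes yolov8m-pose.onnx next to the weights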

Compile the Model#

The YOLOv8m-pose model was exported with the option to include a post-processing section in the model graph. Hence, it needs to be compiled with the Neural Compiler autocrop option. After compilation, the compiler will generate a DFP file for the main section of the model (yolov8m-pose.dfp) and an ONNX file for the cropped post-processing section (yolov8m-pose_post.onnx). The compilation step is typically needed only once and can be done using the Neural Compiler API or Tool.

Hint

You can use the pre-compiled DFP and post-processing section attached to this tutorial and skip the compilation step.

from memryx import NeuralCompiler
nc = NeuralCompiler(num_chips=4, models="yolov8m-pose.onnx", verbose=1, dfp_fname = "yolov8m-pose", autocrop=True)
dfp = nc.run()

Alternatively, in your command line, you need to type,

mx_nc -v -m yolov8m-pose.onnx --autocrop -c 4

This will produce a DFP file ready to be used by the accelerator. In your Python code, you need to point the dfp variable to the generated file path.

dfp = "yolov8m-pose.dfp"

In your C++ code, you need to point to the generated DFP file path,

const fs::path modelPath = "yolov8m-pose.dfp"; // DFP file path

CV Initializations#

In this step, we will import the needed libraries, initialize the CV pipeline, and define common variables.

# OpenCV and helper libraries imports
import sys
import os
from pathlib import Path
from queue import Queue
import cv2 as cv
import numpy as np
from memryx import AsyncAccl
import torchvision.ops as ops
from typing import List
# Connect to the camera
cam = cv.VideoCapture('/dev/video0')
parent_path = Path(__file__).resolve().parent
# Model input size expected by YOLOv8m-pose
model_input_shape = (640, 640)
# The App class (part of the full application code) wraps the CV pipeline and helpers
app = App(cam, model_input_shape, mirror=False)
#include <filesystem>            /* fs::path */
#include <opencv2/opencv.hpp>    /* imshow */
#include <opencv2/imgproc.hpp>   /* cvtcolor */
#include <opencv2/imgcodecs.hpp> /* imwrite */
#include "memx/accl/MxAccl.h"

namespace fs = std::filesystem; // alias used for the file paths below

const fs::path videoPath = "../Friends.mp4"; // Video file path
if(use_cam) { 
    std::cout << "use cam";
    vcap.open(0, cv::CAP_V4L2); 
} else {
    vcap.open(videoPath.c_str()); 
}

Along with the CV initialization, we also initialize the variables needed for storing DFP model information, image manipulation, and FPS calculations.

Initialize the model info variables; this information is obtained after connecting to the accelerator.

// Model information
MX::Types::MxModelInfo model_info; // dfp model info
MX::Types::MxModelInfo post_model_info; // Post-processing model info

Variables for image manipulation:

int model_input_width = 640; 
int model_input_height = 640; 
double origHeight = 0.0;  // Original frame height
double origWidth = 0.0;  // Original frame width

Variables used for pose keypoint calculations:

float box_score = 0.25; // Box confidence threshold
float rat = 0.0; // Aspect ratio for resizing
float kpt_score = 0.5; // Keypoint confidence threshold
float nms_thr = 0.2; // Non-Maximum Suppression threshold
int dets_length = 8400; // Number of candidate detections output by the model
int num_kpts = 17; // Number of keypoints per person
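
The drawing code later in this tutorial refers to KEYPOINT_PAIRS and COLOR_LIST, which are defined in the full application code. As a reference, a plausible Python definition, assuming the standard COCO 17-keypoint order, is sketched below.

# Assumed skeleton connections for the standard COCO 17-keypoint order
KEYPOINT_PAIRS = [
    (0, 1), (0, 2), (1, 3), (2, 4),           # head
    (5, 6), (5, 7), (7, 9), (6, 8), (8, 10),  # arms
    (5, 11), (6, 12), (11, 12),               # torso
    (11, 13), (13, 15), (12, 14), (14, 16),   # legs
]

# One BGR color per keypoint index (cycled with modulo when drawing)
COLOR_LIST = [
    (0, 255, 0), (0, 0, 255), (255, 0, 0), (0, 255, 255), (255, 0, 255),
    (255, 255, 0), (128, 255, 0), (0, 128, 255), (255, 128, 0), (128, 0, 255),
    (0, 255, 128), (255, 0, 128), (128, 128, 255), (255, 128, 128),
    (128, 255, 128), (64, 64, 255), (255, 64, 64),
]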

Define an Input Function#

We need to define an input function for the accelerator to use. In this case, our input function will get a new frame from the cam and pre-process it.

def preprocess_image(self, image):
    
    h, w = image.shape[:2]
    r = min(self.model_input_shape[0] / h, self.model_input_shape[1] / w)
    image_resized = cv.resize(image, (int(w * r), int(h * r)), interpolation=cv.INTER_LINEAR)
    
    # Create a padded image
    padded_img = np.ones((self.model_input_shape[0], self.model_input_shape[1], 3), dtype=np.uint8) * 114
    padded_img[:int(h * r), :int(w * r)] = image_resized

    # Normalize image
    padded_img = padded_img / 255.0
    padded_img = padded_img.astype(np.float32)
    
    # Add new axis and change shape to (640, 640, 1, 3)
    padded_img = np.expand_dims(padded_img, axis=2)  # Adding new axis at the third position
    
    return padded_img,r
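
The accelerator is connected to app.generate_frame later in this tutorial. That method is part of the full application code; a minimal sketch of what it might look like, assuming the App class stores the camera, the mirror flag, the resize ratio, and the capture_queue used by the output function, is shown below.

def generate_frame(self):
    # Read a frame, keep the original for display, and return the preprocessed tensor
    ok, frame = self.cam.read()
    if not ok:
        return None  # assumed to signal the end of the input stream to AsyncAccl
    if self.mirror:
        frame = cv.flip(frame, 1)
    self.capture_queue.put(frame)  # saved for overlaying the results later
    preprocessed, self.ratio = self.preprocess_image(frame)
    return preprocessed
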
bool incallback_getframe(vector<const MX::Types::FeatureMap<float>*> dst, int streamLabel){

    if(runflag.load()){

        cv::Mat inframe;

        bool got_frame = vcap.read(inframe); // Capture a frame from video/camera

        if (!got_frame) {
            std::cout << "\n\n No frame - End of cam? \n\n\n";
            runflag.store(false);
            return false; 
        }

        else{
            // Put the frame in the cap_queue to be overlayed later
            {
                std::lock_guard<std::mutex> flock(frameQueue_mutex); // Lock the queue
                frames_queue.push_back(inframe); // Push the captured frame to the queue
            }
            cv::Mat rgbImage;
            cv::cvtColor(inframe, rgbImage, cv::COLOR_BGR2RGB);
            // Preprocess frame
            cv::Mat preProcframe;
            cv::resize(rgbImage, preProcframe, cv::Size(model_input_width, model_input_height), 0, 0, cv::INTER_LINEAR); // Resize frame to model input size
            cv::Mat floatImage;
            preProcframe.convertTo(floatImage, CV_32F,  1.0 / 255.0); // Normalize frame to float

            // Set preprocessed input data to be sent to the accelerator
            dst[0]->set_data((float*)floatImage.data, false);

            return true;
        }           
    }
    else{
        vcap.release(); // Release video capture resources
        return false;
    }    
}

Define an Output Function#

We also need to define an output function for the accelerator to use. Our output function will post-process the accelerator output and display it on the screen.

def process_model_output(self, *ofmaps):
    
    predict = ofmaps[0].squeeze(0).T  # (8400, 56): 4 box coords + 1 confidence + 17*3 keypoint values
    predict = predict[predict[:, 4] > self.box_score, :]
    scores = predict[:, 4]
    boxes = predict[:, 0:4] / self.ratio

    boxes = self.xywh2xyxy(boxes)

    kpts = predict[:, 5:]
    for i in range(kpts.shape[0]):
        for j in range(kpts.shape[1] // 3):
            if kpts[i, 3*j+2] < self.kpt_score:
                kpts[i, 3*j: 3*(j+1)] = [-1, -1, -1]
            else:
                kpts[i, 3*j] /= self.ratio
                kpts[i, 3*j+1] /= self.ratio 
    idxes = self.nms_process(boxes, scores, self.nms_thr)
    result = {'boxes': boxes[idxes,: ].astype(int).tolist(),
              'kpts': kpts[idxes,: ].astype(float).tolist(),
              'scores': scores[idxes].tolist()}

    img = self.capture_queue.get()

    color = (0,255,0)

    boxes, kpts, scores = result['boxes'], result['kpts'], result['scores']
    for  kpt, score in zip(kpts, scores):

        # Draw keypoints and lines
        # Connect keypoints with lines
        for pair in self.KEYPOINT_PAIRS:
            pt1 = kpt[3 * pair[0]: 3 * (pair[0] + 1)]
            pt2 = kpt[3 * pair[1]: 3 * (pair[1] + 1)]
            if pt1[2] > 0 and pt2[2] > 0:
                cv.line(img, (int(pt1[0]), int(pt1[1])), (int(pt2[0]), int(pt2[1])), (255, 255, 255), 3)

        for idx in range(len(kpt) // 3):
            x, y, score = kpt[3*idx: 3*(idx+1)]
            if score > 0:
                cv.circle(img, (int(x), int(y)), 5, self.COLOR_LIST[idx % len(self.COLOR_LIST)], -1)

    self.show(img)
    return img
// Output callback function
bool outcallback_getmxaoutput(vector<const MX::Types::FeatureMap<float>*> src, int streamLabel) {

    // Extract feature maps from the accelerator output
    for(int i = 0; i < post_model_info.num_out_featuremaps; ++i) {
        src[i]->get_data(ofmap[i], false);
    }

    cv::Mat inframe;
    {
        std::lock_guard<std::mutex> flock(frameQueue_mutex); // Lock the frame queue
        inframe = frames_queue.front(); // Get the first frame from the queue
        frames_queue.pop_front(); // Remove the frame from the queue
    }

    // Create display window if not already created
    if(!window_created) {
        cv::namedWindow("Pose Estimation", cv::WINDOW_NORMAL | cv::WINDOW_KEEPRATIO);
        cv::resizeWindow("Pose Estimation", cv::Size(origWidth, origHeight));
        cv::moveWindow("Pose Estimation", 0, 0);
        window_created = true;
    }  

    std::vector<Box> all_boxes;
    std::vector<float> all_scores;
    std::vector<cv::Rect> cv_boxes;
         
    for (int i = 0; i < dets_length; ++i) {

        float x0 = ofmap[0][dets_length * 0 + i]; // Extract x-coordinate of the bounding box center
        float y0 = ofmap[0][dets_length * 1 + i]; // Extract y-coordinate of the bounding box center
        float w = ofmap[0][dets_length * 2 + i];  // Extract width of the bounding box
        float h = ofmap[0][dets_length * 3 + i];  // Extract height of the bounding box
        float confidence = ofmap[0][dets_length * 4 + i]; // Extract confidence score of the detection


        if (confidence > box_score) { // Check if confidence exceeds threshold
            Box box;
            box.confidence = confidence;

            // Scale box coordinates back to the original image size
            float y_factor = inframe.rows / float(model_input_height); // Calculate scaling factor for height
            float x_factor = inframe.cols / float(model_input_width);  // Calculate scaling factor for width
            x0 = x0 * x_factor; // Scale x-coordinate
            y0 = y0 * y_factor; // Scale y-coordinate
            w = w * x_factor;   // Scale width
            h = h * y_factor;   // Scale height

            int x1 = (int)(x0 - 0.5 * w); // Calculate top-left x-coordinate
            int x2 = (int)(x0 + 0.5 * w); // Calculate bottom-right x-coordinate
            int y1 = (int)(y0 - 0.5 * h); // Calculate top-left y-coordinate
            int y2 = (int)(y0 + 0.5 * h); // Calculate bottom-right y-coordinate

            // Extract keypoints
            for (int j = 0; j < num_kpts; ++j) { 
                float kpt_x = ofmap[0][dets_length * (5 + j * 3) + i] * x_factor; // Scale keypoint x-coordinate
                float kpt_y = ofmap[0][dets_length * (5 + j * 3 + 1) + i] * y_factor; // Scale keypoint y-coordinate
                float kpt_conf = ofmap[0][dets_length * (5 + j * 3 + 2) + i]; // Get keypoint confidence

                if (kpt_conf > kpt_score) { // Check if keypoint confidence exceeds threshold
                    box.keypoints.push_back(std::make_pair(kpt_x, kpt_y)); // Add valid keypoint
                } else {
                    box.keypoints.push_back(std::make_pair(-1, -1)); // Add invalid keypoint as placeholder
                }
            }

            all_boxes.push_back(box); // Store the box with keypoints
            all_scores.push_back(confidence); // Store the box confidence score
            cv_boxes.push_back(cv::Rect(x1, y1, x2 - x1, y2 - y1)); // Store bounding box for NMS
        }
    }

    // Apply Non-Maximum Suppression (NMS) to filter overlapping boxes
    std::vector<int> nms_result;
    cv::dnn::NMSBoxes(cv_boxes, all_scores, box_score, nms_thr, nms_result);

    std::vector<Box> filtered_boxes;
    for (int idx : nms_result) {
        filtered_boxes.push_back(all_boxes[idx]); // Keep only the boxes after NMS
    }

    // Draw keypoints and connections on the frame
    for (const auto &box : filtered_boxes) {
        for (const auto &connection : KEYPOINT_PAIRS) {
            int idx1 = connection.first;
            int idx2 = connection.second;

            if (idx1 < box.keypoints.size() && idx2 < box.keypoints.size()) {
                auto kpt1 = box.keypoints[idx1];
                auto kpt2 = box.keypoints[idx2];

                if (kpt1.first != -1 && kpt1.second != -1 && kpt2.first != -1 && kpt2.second != -1) {
                    cv::line(inframe, cv::Point(kpt1.first, kpt1.second), cv::Point(kpt2.first, kpt2.second), cv::Scalar(255, 255, 255), 3);
                }
            }
        }
        
        // Draw keypoints on the frame
        for (int k = 0; k < box.keypoints.size(); ++k) {
            auto &kpt = box.keypoints[k];
            if (kpt.first != -1 && kpt.second != -1) {
                cv::circle(inframe, cv::Point(kpt.first, kpt.second), 4, COLOR_LIST[k % COLOR_LIST.size()], -1);
            }
        }
    }

    // Calculate FPS once every AVG_FPS_CALC_FRAME_COUNT frames
    frame_count++;
    if (frame_count == 1)
    {
        start_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
    }
    else if (frame_count % AVG_FPS_CALC_FRAME_COUNT == 0)
    {
        std::chrono::milliseconds duration =
            std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()) - start_ms;
        fps_number = (float)AVG_FPS_CALC_FRAME_COUNT * 1000 / (float)(duration.count());
        sprintf(fps_text, "FPS = %.1f", fps_number);
        frame_count = 0;
    }

    //Write FPS values on the display image
    cv::putText(inframe,fps_text,
        cv::Point2i(450, 30), // origin of text (bottom left of textbox)
        cv::FONT_ITALIC,
        0.8, // font scale
        cv::Scalar(255, 255, 255), // color (white)
        2 // thickness
    );

    // Display the frame with the detected keypoints and connections
    cv::imshow("Pose Estimation", inframe);
    
    if (cv::waitKey(1) == 'q') {
        runflag.store(false);
    }

    return true; 
}
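
The Python output function above also relies on two helper methods, self.xywh2xyxy and self.nms_process, which are part of the full application code. Minimal sketches of what they might look like, assuming NumPy array inputs and the torchvision ops imported earlier, are shown below.

import numpy as np
import torch
import torchvision.ops as ops

def xywh2xyxy(self, boxes):
    # Convert (center_x, center_y, width, height) boxes to (x1, y1, x2, y2)
    xyxy = boxes.copy()
    xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
    xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
    xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
    xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2
    return xyxy

def nms_process(self, boxes, scores, nms_thr):
    # Run torchvision's NMS and return the indices of the boxes to keep
    keep = ops.nms(torch.from_numpy(boxes).float(),
                   torch.from_numpy(scores).float(),
                   nms_thr)
    return keep.numpy()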

Connect the Accelerator#

Now, all you need to do is to connect your input and output functions to the AsyncAccl API. The API will take care of the rest.

accl = AsyncAccl(dfp)
accl.set_postprocessing_model('yolov8m-pose_post.onnx', model_idx=0)
accl.connect_input(app.generate_frame)
accl.connect_output(app.process_model_output)
accl.wait()

In C++, we also connect the post-processing ONNX model; accl.connect_post_model will handle running the cropped ONNX section on the output coming from the chip.

MX::Runtime::MxAccl accl;
accl.connect_dfp(modelPath); // Initialize the accelerator with the model

accl.connect_post_model(onnx_postprocessing_model_path, 0); // Connect post-processing model
post_model_info = accl.get_post_model_info(0); // Get post-processing model info

Next, you need to connect your input and output callback functions to the accelerator. The output callback will then directly receive the combined output of the DFP and the post-processing ONNX.

accl.connect_stream(&incallback_getframe, &outcallback_getmxaoutput, 10 /*unique stream ID*/, 0 /*Model ID*/); // Connect input and output streams

std::cout << "Connected stream \n\n\n";
accl.start(); // Start inference
accl.wait();  // Wait for inference to complete
accl.stop();  // Stop the accelerator

The accelerator will automatically call the connected input and output functions in a fully pipelined fashion.

graph LR
    input([Input Function]) --> accl[Accelerator]
    accl --> output([Output Function])
    style input fill:#CFE8FD, stroke:#595959
    style accl fill:#FFE699, stroke:#595959
    style output fill:#A9D18E, stroke:#595959

Third-Party Licenses#

This tutorial uses third-party software, models, and libraries. Below are the details of the licenses for these dependencies:

Summary#

This tutorial showed how to use the Accelerator API to run real-time inference with a pose estimation model. The full code and the compiled DFP are available for download.