YOLOv8 Pose Estimation#
Introduction#
In this tutorial, we will show how to use the Accelerator API to perform real-time pose estimation on the MX3 in Python and C++. We will use the YOLOv8m-pose model, a relatively large model with high accuracy.
Note
This tutorial assumes a four-chip solution is correctly connected.
Download the Model#
The pre-trained YOLOv8m-pose model is available on the official YOLOv8 GitHub page in the Pose Estimation section. For the sake of the tutorial, we have already exported the model; it can be found in the compressed folder attached to this tutorial.
Compile the Model#
The YOLOv8m-pose model was exported with the option to include a post-processing section in the model graph. Hence, it needs to be compiled with the Neural Compiler autocrop option. After compilation, the compiler generates the DFP file for the main section of the model (yolov8m-pose.dfp) and an ONNX file for the cropped post-processing section (yolov8m-pose_post.onnx). The compilation step is typically needed only once and can be done using the Neural Compiler API or tool.
Hint
You can use the pre-compiled DFP and post-processing section attached
to this tutorial and skip the compilation step.
from memryx import NeuralCompiler
nc = NeuralCompiler(num_chips=4, models="yolov8m-pose.onnx", verbose=1, dfp_fname = "yolov8m-pose", autocrop=True)
dfp = nc.run()
In your command line, you need to type,
mx_nc -v -m yolov8m-pose.onnx --autocrop -c 4
This will produce a DFP file ready to be used by the accelerator. In your Python code, you need to point the dfp
variable to the generated file path.
dfp = "yolov8m-pose.dfp"
In your C++ code, you need to point to the DFP via the generated file path,
const fs::path modelPath = "yolov8m-pose.dfp"; // DFP file path
CV Initializations#
We will import the needed libraries, initialize the CV pipeline, and define common variables in this step.
# OpenCV and helper libraries imports
import sys
import os
from pathlib import Path
from queue import Queue
import cv2 as cv
import numpy as np
from memryx import AsyncAccl
import torchvision.ops as ops
from typing import List
# Connect to the cam and get its properties
cam = cv.VideoCapture('/dev/video0')
parent_path = Path(__file__).resolve().parent
model_input_shape = (640, 640)
app = App(cam, model_input_shape, mirror=False)
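The App object referenced above wraps the camera handle and the shared state used by the callbacks later in this tutorial. Its full definition ships with the attached code; the following is only a minimal sketch of the constructor and the attributes the later snippets rely on (capture_queue, ratio, the thresholds, KEYPOINT_PAIRS, COLOR_LIST, and show are inferred from how the output function uses them, while the concrete skeleton pairs and colors here are illustrative assumptions). The pre- and post-processing functions shown in the following sections are methods of this class.
class App:
    def __init__(self, cam, model_input_shape, mirror=False):
        self.cam = cam
        self.model_input_shape = model_input_shape
        self.mirror = mirror
        self.capture_queue = Queue()  # Original frames waiting to be overlaid
        self.ratio = 1.0              # Resize ratio set by preprocess_image()

        # Post-processing thresholds (same values as in the C++ code below)
        self.box_score = 0.25
        self.kpt_score = 0.5
        self.nms_thr = 0.2

        # COCO-style skeleton: pairs of keypoint indices to connect with lines (assumed)
        self.KEYPOINT_PAIRS = [
            (0, 1), (0, 2), (1, 3), (2, 4), (5, 6), (5, 7), (7, 9),
            (6, 8), (8, 10), (5, 11), (6, 12), (11, 12), (11, 13),
            (13, 15), (12, 14), (14, 16),
        ]
        # One BGR color per keypoint index (illustrative values)
        self.COLOR_LIST = [(0, 255, 0), (255, 0, 0), (0, 0, 255), (255, 255, 0)]

    def show(self, img):
        # Display the annotated frame; press 'q' to close the window
        cv.imshow("Pose Estimation", img)
        cv.waitKey(1)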
#include <opencv2/opencv.hpp> /* imshow */
#include <opencv2/imgproc.hpp> /* cvtcolor */
#include <opencv2/imgcodecs.hpp> /* imwrite */
#include "memx/accl/MxAccl.h"
const fs::path videoPath = "../Friends.mp4"; // Video file path
if(use_cam) {
    std::cout << "use cam";
    vcap.open(0, cv::CAP_V4L2);
} else {
    vcap.open(videoPath.c_str());
}
Along with the CV initialization, we also initialize the variables used for storing DFP model information, image manipulation, and FPS calculation.
Initialize the model info variables. We get this information after connecting to the accelerator.
// Model information
MX::Types::MxModelInfo model_info; // dfp model info
MX::Types::MxModelInfo post_model_info; // Post-processing model info
Variables for image manipulation
int model_input_width = 640;
int model_input_height = 640;
double origHeight = 0.0; // Original frame height
double origWidth = 0.0; // Original frame width
Variables used for pose keypoint calculations
float box_score = 0.25; // Box confidence threshold
float rat = 0.0; // Aspect ratio for resizing
float kpt_score = 0.5; // Keypoint confidence threshold
float nms_thr = 0.2; // Non-Maximum Suppression threshold
int dets_length = 8400; // Number of candidate detections
int num_kpts = 17; // Number of keypoints
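For reference, the cropped post-processing section outputs a tensor of shape (1, 56, 8400): 8400 candidate detections, each described by 4 box values (center x, center y, width, height), one confidence score, and 17 keypoints stored as (x, y, score) triplets. The constants above index into that layout, as does the Python output function later on. A minimal illustration of the layout (the zero-filled array is just a stand-in for real accelerator output):
import numpy as np

# Stand-in for the post-processing model output of shape (1, 56, 8400)
output = np.zeros((1, 56, 8400), dtype=np.float32)

dets = output.squeeze(0).T              # (8400, 56): one row per candidate detection
boxes = dets[:, 0:4]                    # center-x, center-y, width, height
scores = dets[:, 4]                     # box confidence
kpts = dets[:, 5:].reshape(-1, 17, 3)   # 17 keypoints, each stored as (x, y, score)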
Define an Input Function#
We need to define an input function for the accelerator to use. In this case, our input function will get a new frame from the cam and pre-process it.
def preprocess_image(self, image):
    h, w = image.shape[:2]
    r = min(self.model_input_shape[0] / h, self.model_input_shape[1] / w)
    image_resized = cv.resize(image, (int(w * r), int(h * r)), interpolation=cv.INTER_LINEAR)

    # Create a padded image
    padded_img = np.ones((self.model_input_shape[0], self.model_input_shape[1], 3), dtype=np.uint8) * 114
    padded_img[:int(h * r), :int(w * r)] = image_resized

    # Normalize image
    padded_img = padded_img / 255.0
    padded_img = padded_img.astype(np.float32)

    # Add a new axis to change the shape to (640, 640, 1, 3)
    padded_img = np.expand_dims(padded_img, axis=2)

    return padded_img, r
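preprocess_image is called from the input function that we will later connect to the accelerator as app.generate_frame. The exact implementation is part of the attached code; the sketch below shows what such an input function might look like, assuming the cam, mirror, capture_queue, and ratio attributes from the App sketch above (returning None is assumed to signal the end of the input stream):
def generate_frame(self):
    """Input callback: grab a frame from the camera, queue it for display,
    and return the pre-processed tensor for the accelerator."""
    ok, frame = self.cam.read()
    if not ok:
        return None  # Assumed to end the input stream
    if self.mirror:
        frame = cv.flip(frame, 1)

    # Keep the original frame so the output function can draw on it later
    self.capture_queue.put(frame)

    preprocessed, self.ratio = self.preprocess_image(frame)
    return preprocessed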
bool incallback_getframe(vector<const MX::Types::FeatureMap<float>*> dst, int streamLabel){

    if(runflag.load()){

        cv::Mat inframe;
        bool got_frame = vcap.read(inframe); // Capture a frame from video/camera

        if (!got_frame) {
            std::cout << "\n\n No frame - End of cam? \n\n\n";
            runflag.store(false);
            return false;
        }
        else{
            // Put the frame in the frames_queue to be overlaid later
            {
                std::lock_guard<std::mutex> flock(frameQueue_mutex); // Lock the queue
                frames_queue.push_back(inframe); // Push the captured frame to the queue
            }

            cv::Mat rgbImage;
            cv::cvtColor(inframe, rgbImage, cv::COLOR_BGR2RGB);

            // Preprocess frame
            cv::Mat preProcframe;
            cv::resize(rgbImage, preProcframe, cv::Size(model_input_width, model_input_height), 0, 0, cv::INTER_LINEAR); // Resize frame to model input size
            cv::Mat floatImage;
            preProcframe.convertTo(floatImage, CV_32F, 1.0 / 255.0); // Normalize frame to float

            // Set preprocessed input data to be sent to the accelerator
            dst[0]->set_data((float*)floatImage.data, false);

            return true;
        }
    }
    else{
        vcap.release(); // Release video capture resources
        return false;
    }
}
Define an Output Function#
We also need to define an output function for the accelerator to use. Our output function will post-process the accelerator output and display it on the screen.
def process_model_output(self, *ofmaps):
    predict = ofmaps[0].squeeze(0).T  # (8400, 56)
    predict = predict[predict[:, 4] > self.box_score, :]
    scores = predict[:, 4]
    boxes = predict[:, 0:4] / self.ratio
    boxes = self.xywh2xyxy(boxes)
    kpts = predict[:, 5:]

    for i in range(kpts.shape[0]):
        for j in range(kpts.shape[1] // 3):
            if kpts[i, 3*j+2] < self.kpt_score:
                kpts[i, 3*j: 3*(j+1)] = [-1, -1, -1]
            else:
                kpts[i, 3*j] /= self.ratio
                kpts[i, 3*j+1] /= self.ratio

    idxes = self.nms_process(boxes, scores, self.nms_thr)
    result = {'boxes': boxes[idxes, :].astype(int).tolist(),
              'kpts': kpts[idxes, :].astype(float).tolist(),
              'scores': scores[idxes].tolist()}

    img = self.capture_queue.get()
    color = (0, 255, 0)
    boxes, kpts, scores = result['boxes'], result['kpts'], result['scores']

    for kpt, score in zip(kpts, scores):
        # Draw keypoints and lines
        # Connect keypoints with lines
        for pair in self.KEYPOINT_PAIRS:
            pt1 = kpt[3 * pair[0]: 3 * (pair[0] + 1)]
            pt2 = kpt[3 * pair[1]: 3 * (pair[1] + 1)]
            if pt1[2] > 0 and pt2[2] > 0:
                cv.line(img, (int(pt1[0]), int(pt1[1])), (int(pt2[0]), int(pt2[1])), (255, 255, 255), 3)

        for idx in range(len(kpt) // 3):
            x, y, score = kpt[3*idx: 3*(idx+1)]
            if score > 0:
                cv.circle(img, (int(x), int(y)), 5, self.COLOR_LIST[idx % len(self.COLOR_LIST)], -1)

    self.show(img)
    return img
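The output function above relies on two small helpers, xywh2xyxy and nms_process, which are part of the attached code. A minimal sketch of how they could be implemented as methods of the same class, using the torchvision.ops import from the initialization step (the attached implementation may differ):
import numpy as np
import torch
import torchvision.ops as ops

def xywh2xyxy(self, boxes):
    """Convert boxes from (center-x, center-y, w, h) to (x1, y1, x2, y2)."""
    xyxy = np.copy(boxes)
    xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
    xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
    xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
    xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2
    return xyxy

def nms_process(self, boxes, scores, nms_thr):
    """Run Non-Maximum Suppression and return the indices of the kept boxes."""
    keep = ops.nms(torch.from_numpy(boxes.astype(np.float32)),
                   torch.from_numpy(scores.astype(np.float32)),
                   nms_thr)
    return keep.numpy()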
// Output callback function
bool outcallback_getmxaoutput(vector<const MX::Types::FeatureMap<float>*> src, int streamLabel) {

    // Extract feature maps from the accelerator output
    for(int i = 0; i < post_model_info.num_out_featuremaps; ++i) {
        src[i]->get_data(ofmap[i], false);
    }

    cv::Mat inframe;
    {
        std::lock_guard<std::mutex> flock(frameQueue_mutex); // Lock the frame queue
        inframe = frames_queue.front(); // Get the first frame from the queue
        frames_queue.pop_front(); // Remove the frame from the queue
    }

    // Create display window if not already created
    if(!window_created) {
        cv::namedWindow("Pose Estimation", cv::WINDOW_NORMAL | cv::WINDOW_KEEPRATIO);
        cv::resizeWindow("Pose Estimation", cv::Size(origWidth, origHeight));
        cv::moveWindow("Pose Estimation", 0, 0);
        window_created = true;
    }

    std::vector<Box> all_boxes;
    std::vector<float> all_scores;
    std::vector<cv::Rect> cv_boxes;

    for (int i = 0; i < dets_length; ++i) {
        float x0 = ofmap[0][dets_length * 0 + i]; // Extract x-coordinate of the bounding box center
        float y0 = ofmap[0][dets_length * 1 + i]; // Extract y-coordinate of the bounding box center
        float w = ofmap[0][dets_length * 2 + i]; // Extract width of the bounding box
        float h = ofmap[0][dets_length * 3 + i]; // Extract height of the bounding box
        float confidence = ofmap[0][dets_length * 4 + i]; // Extract confidence score of the detection

        if (confidence > box_score) { // Check if confidence exceeds threshold
            Box box;
            box.confidence = confidence;

            // Scale box coordinates back to the original image size
            float y_factor = inframe.rows / float(model_input_height); // Calculate scaling factor for height
            float x_factor = inframe.cols / float(model_input_width); // Calculate scaling factor for width
            x0 = x0 * x_factor; // Scale x-coordinate
            y0 = y0 * y_factor; // Scale y-coordinate
            w = w * x_factor; // Scale width
            h = h * y_factor; // Scale height

            int x1 = (int)(x0 - 0.5 * w); // Calculate top-left x-coordinate
            int x2 = (int)(x0 + 0.5 * w); // Calculate bottom-right x-coordinate
            int y1 = (int)(y0 - 0.5 * h); // Calculate top-left y-coordinate
            int y2 = (int)(y0 + 0.5 * h); // Calculate bottom-right y-coordinate

            // Extract keypoints
            for (int j = 0; j < num_kpts; ++j) {
                float kpt_x = ofmap[0][dets_length * (5 + j * 3) + i] * x_factor; // Scale keypoint x-coordinate
                float kpt_y = ofmap[0][dets_length * (5 + j * 3 + 1) + i] * y_factor; // Scale keypoint y-coordinate
                float kpt_conf = ofmap[0][dets_length * (5 + j * 3 + 2) + i]; // Get keypoint confidence

                if (kpt_conf > kpt_score) { // Check if keypoint confidence exceeds threshold
                    box.keypoints.push_back(std::make_pair(kpt_x, kpt_y)); // Add valid keypoint
                } else {
                    box.keypoints.push_back(std::make_pair(-1, -1)); // Add invalid keypoint as placeholder
                }
            }

            all_boxes.push_back(box); // Store the box with keypoints
            all_scores.push_back(confidence); // Store the box confidence score
            cv_boxes.push_back(cv::Rect(x1, y1, x2 - x1, y2 - y1)); // Store bounding box for NMS
        }
    }

    // Apply Non-Maximum Suppression (NMS) to filter overlapping boxes
    std::vector<int> nms_result;
    cv::dnn::NMSBoxes(cv_boxes, all_scores, box_score, nms_thr, nms_result);

    std::vector<Box> filtered_boxes;
    for (int idx : nms_result) {
        filtered_boxes.push_back(all_boxes[idx]); // Keep only the boxes after NMS
    }

    // Draw keypoints and connections on the frame
    for (const auto &box : filtered_boxes) {
        for (const auto &connection : KEYPOINT_PAIRS) {
            int idx1 = connection.first;
            int idx2 = connection.second;
            if (idx1 < box.keypoints.size() && idx2 < box.keypoints.size()) {
                auto kpt1 = box.keypoints[idx1];
                auto kpt2 = box.keypoints[idx2];
                if (kpt1.first != -1 && kpt1.second != -1 && kpt2.first != -1 && kpt2.second != -1) {
                    cv::line(inframe, cv::Point(kpt1.first, kpt1.second), cv::Point(kpt2.first, kpt2.second), cv::Scalar(255, 255, 255), 3);
                }
            }
        }

        // Draw keypoints on the frame
        for (int k = 0; k < box.keypoints.size(); ++k) {
            auto &kpt = box.keypoints[k];
            if (kpt.first != -1 && kpt.second != -1) {
                cv::circle(inframe, cv::Point(kpt.first, kpt.second), 4, COLOR_LIST[k % COLOR_LIST.size()], -1);
            }
        }
    }

    // Calculate FPS once every AVG_FPS_CALC_FRAME_COUNT frames
    frame_count++;
    if (frame_count == 1)
    {
        start_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
    }
    else if (frame_count % AVG_FPS_CALC_FRAME_COUNT == 0)
    {
        std::chrono::milliseconds duration =
            std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()) - start_ms;
        fps_number = (float)AVG_FPS_CALC_FRAME_COUNT * 1000 / (float)(duration.count());
        sprintf(fps_text, "FPS = %.1f", fps_number);
        frame_count = 0;
    }

    // Write FPS value on the display image
    cv::putText(inframe, fps_text,
        cv::Point2i(450, 30), // origin of text (bottom left of textbox)
        cv::FONT_ITALIC,
        0.8, // font scale
        cv::Scalar(255, 255, 255), // color (white)
        2 // thickness
    );

    // Display the frame with the detected keypoints and connections
    cv::imshow("Pose Estimation", inframe);
    if (cv::waitKey(1) == 'q') {
        runflag.store(false);
    }

    return true;
}
Connect the Accelerator#
Now, all you need to do is to connect your input and output functions to the AsyncAccl API. The API will take care of the rest.
accl = AsyncAccl(dfp)
accl.set_postprocessing_model('yolov8m-pose_post.onnx', model_idx=0)
accl.connect_input(app.generate_frame)
accl.connect_output(app.process_model_output)
accl.wait()
Here we also have a post-processing ONNX model; accl.connect_post_model will handle running the cropped ONNX post-processing section on the output from the chip.
MX::Runtime::MxAccl accl;
accl.connect_dfp(modelPath); // Initialize the accelerator with the model
accl.connect_post_model(onnx_postprocessing_model_path, 0); // Connect post-processing model
post_model_info = accl.get_post_model_info(0); // Get post-processing model info
Next, you need to connect your input and output callback functions to the MxAccl object. The combined output of the DFP and the post-processing ONNX is then obtained directly in the output callback.
accl.connect_stream(&incallback_getframe, &outcallback_getmxaoutput, 10 /*unique stream ID*/, 0 /*Model ID*/); // Connect input and output streams
std::cout << "Connected stream \n\n\n";
accl.start(); // Start inference
accl.wait(); // Wait for inference to complete
accl.stop(); // Stop the accelerator
The accelerator will automatically call the connected input and output functions in a fully pipelined fashion.
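Putting the Python pieces together, the end-to-end flow looks roughly like the sketch below; the App class and its generate_frame/process_model_output methods are the ones discussed above, and the final cleanup lines are an assumption rather than part of the attached code.
import cv2 as cv
from memryx import AsyncAccl

if __name__ == "__main__":
    cam = cv.VideoCapture('/dev/video0')
    app = App(cam, model_input_shape=(640, 640), mirror=False)

    dfp = "yolov8m-pose.dfp"
    accl = AsyncAccl(dfp)
    accl.set_postprocessing_model('yolov8m-pose_post.onnx', model_idx=0)
    accl.connect_input(app.generate_frame)         # Called whenever the chip needs a frame
    accl.connect_output(app.process_model_output)  # Called whenever outputs are ready
    accl.wait()  # Block until the input function signals the end of the stream

    # Assumed cleanup
    cam.release()
    cv.destroyAllWindows()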
Third-Party Licenses#
This tutorial uses third-party software, models, and libraries. Below are the details of the licenses for these dependencies:
Model: YOLOv8m-pose from the Ultralytics GitHub
License: AGPLv3
Code and Pre/Post-Processing: Some code components, including pre/post-processing, were sourced from the Ultralytics GitHub
License: AGPLv3
Summary#
This tutorial showed how to use the Accelerator API to run real-time inference with a pose estimation model. The full code and the compiled DFP are available for download.