YOLOv8 Pose Estimation#
Introduction#
In this tutorial, we will show how to use the Accelerator API to perform real-time pose estimation on the MX3 in Python and C++. We will use the YOLOv8m-pose model, which is a relatively large model with high accuracy.
Note
This tutorial assumes a four-chip solution is correctly connected.
Download & Run
Download
This tutorial provides a high-level overview of the application’s key components. To run the full application, download the complete code package and the compiled DFP. After downloading, refer to the Run section below for step-by-step instructions.
Run
Requirements
Install the required dependencies:
pip install opencv-python==4.11.0.86
pip install ultralytics==8.3.161
Run Command
Run the Python script:
python run_pose_estimation.py
Requirements
Install the required dependencies and build the project:
# Install OpenCV (ensure version compatibility)
sudo apt-get install libopencv-dev
# Build the C++ project
mkdir build && cd build
cmake ..
make
Run Command
Run the C++ executable:
./poseEstimation
1. Download the Model#
The YOLOv8m-pose pre-trained models are available on the Official YOLOv8m-pose GitHub page in the Pose Estimation section. For the sake of this tutorial, we exported and pre-compiled the model for you to download; it can be found in the compressed folder attached to this tutorial.
2. Compile the Model#
The YOLOv8m-pose model was exported with the option to include a post-processing section in the model graph. Hence, it needs to be compiled with the Neural Compiler's autocrop option. The compiler will generate the DFP file for the main section of the model (YOLO_v8_medium_pose_640_640_3_onnx.dfp) and the cropped post-processing section of the model (YOLO_v8_medium_pose_640_640_3_onnx_post.onnx). The compilation step is typically needed only once and can be done using the Neural Compiler API or Tool.
Hint
You can use the pre-compiled DFP and post-processing section attached to this tutorial and skip the compilation step.
from memryx import NeuralCompiler
nc = NeuralCompiler(num_chips=4, models="YOLO_v8_medium_pose_640_640_3_onnx.onnx", verbose=1, autocrop=True)
dfp = nc.run()
Alternatively, using the command-line tool, type:
mx_nc -v -m YOLO_v8_medium_pose_640_640_3_onnx.onnx --autocrop -c 4
This will produce a DFP file ready to be used by the accelerator. In your Python code, you need to point the dfp and post_model variables to the generated file paths.
dfp = "YOLO_v8_medium_pose_640_640_3_onnx.dfp"
post_model = "YOLO_v8_medium_pose_640_640_3_onnx_post.onnx"
In your C++ code, you need to point the DFP and post-processing model paths to the generated files:
// Model file paths
const fs::path modelPath = "YOLO_v8_medium_pose_640_640_3_onnx.dfp"; // Path to DFP model
const fs::path onnx_postprocessing_model_path = "YOLO_v8_medium_pose_640_640_3_onnx_post.onnx"; // Path to the cropped post-processing model
3. CV Initializations#
In this step, we will import the needed libraries, initialize the CV pipeline, and define common variables.
# OpenCV and helper libraries imports
import sys
import os
from pathlib import Path
from queue import Queue
import cv2 as cv
import numpy as np
from memryx import AsyncAccl
import torchvision.ops as ops
from typing import List
import argparse
# Connect to the camera and initialize the app
cam = cv.VideoCapture(0)
parent_path = Path(__file__).resolve().parent
model_input_shape = (640, 640)
app = App(cam, model_input_shape, mirror=False, src_is_cam=True)
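The packaged script can also read from a video file instead of the camera. As a small illustrative sketch (the exact flags in run_pose_estimation.py may differ), the argparse import above can be used to switch between the two sources, mirroring the use_cam/videoPath logic in the C++ code below:
# Hypothetical command-line switch between camera and video file
parser = argparse.ArgumentParser(description="YOLOv8m-pose estimation on MX3")
parser.add_argument("--video", type=str, default=None, help="Path to a video file; the camera is used when omitted")
args = parser.parse_args()
src_is_cam = args.video is None
cam = cv.VideoCapture(0) if src_is_cam else cv.VideoCapture(args.video)
app = App(cam, model_input_shape, mirror=False, src_is_cam=src_is_cam)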
#include <opencv2/opencv.hpp> /* imshow */
#include <opencv2/imgproc.hpp> /* cvtcolor */
#include <opencv2/imgcodecs.hpp> /* imwrite */
#include "memx/accl/MxAccl.h"
fs::path videoPath;
if(use_cam) {
std::cout << "use cam";
vcap.open(0, cv::CAP_V4L2); // Open camera
} else {
vcap.open(videoPath.c_str()); // Open video file
}
Along with the CV initialization, we also initialize the variables needed for storing DFP model information, image manipulation, and FPS calculation.
Initialize model info variables - we get this information after connecting to the accelerator:
// Model information containers
MX::Types::MxModelInfo model_info; // Information for the DFP model
MX::Types::MxModelInfo post_model_info; // Information for post-processing model
Variables for image manipulations:
// Model input parameters
int model_input_width = 640;
int model_input_height = 640;
double origHeight = 0.0; // Original frame height
double origWidth = 0.0; // Original frame width
Variables used for pose keypoint calculations:
float box_score = 0.25; // Threshold for box confidence
float rat = 0.0; // Aspect ratio used during resizing
float kpt_score = 0.5; // Threshold for keypoint confidence
float nms_thr = 0.2; // IoU threshold for Non-Maximum Suppression (NMS)
int dets_length = 8400; // Number of detections
int num_kpts = 17; // Number of keypoints in pose estimation
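The drawing code in the next steps also relies on a keypoint skeleton (KEYPOINT_PAIRS) and a color palette (COLOR_LIST) that are defined in the full code package. A minimal Python sketch of one common choice, assuming the standard 17-keypoint COCO ordering used by YOLOv8-pose:
# Keypoint index pairs to connect when drawing the skeleton (COCO order:
# 0 nose, 1-2 eyes, 3-4 ears, 5-6 shoulders, 7-8 elbows, 9-10 wrists,
# 11-12 hips, 13-14 knees, 15-16 ankles)
KEYPOINT_PAIRS = [
    (0, 1), (0, 2), (1, 3), (2, 4),            # head
    (5, 6), (5, 7), (7, 9), (6, 8), (8, 10),   # shoulders and arms
    (5, 11), (6, 12), (11, 12),                # torso
    (11, 13), (13, 15), (12, 14), (14, 16)     # legs
]
# Example BGR colors, cycled per keypoint index
COLOR_LIST = [(0, 0, 255), (0, 255, 0), (255, 0, 0), (0, 255, 255), (255, 0, 255), (255, 255, 0)]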
4. Define an Input Function#
We need to define an input function for the accelerator to use. In this case, our input function will get a new frame from the camera or video and pre-process it.
def preprocess_image(self, image):
# Resize and pad the image to fit the model input shape
h, w = image.shape[:2]
r = min(self.model_input_shape[0] / h, self.model_input_shape[1] / w)
image_resized = cv.resize(image, (int(w * r), int(h * r)), interpolation=cv.INTER_LINEAR)
# Create a padded image
padded_img = np.ones((self.model_input_shape[0], self.model_input_shape[1], 3), dtype=np.uint8) * 114
padded_img[:int(h * r), :int(w * r)] = image_resized
# Normalize image to [0, 1] range
padded_img = padded_img / 255.0
padded_img = padded_img.astype(np.float32)
# Change the shape to (1, 3, 640, 640)
padded_img = np.transpose(padded_img, (2, 0, 1)) # Change shape to (3, 640, 640)
padded_img = np.expand_dims(padded_img, axis=0) # Add batch dimension to make it (1, 3, 640, 640)
return padded_img, r
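For reference, the input function that is later connected as app.generate_frame combines frame capture with this pre-processing. A minimal sketch, assuming the App class keeps the camera handle in self.cam and stores the original frames in self.capture_queue for the output step (the packaged code may differ slightly):
def generate_frame(self):
    # Grab the next frame from the camera or video source
    got_frame, frame = self.cam.read()
    if not got_frame:
        return None  # Returning no frame signals the end of the input stream
    # Keep the original frame so the output function can draw on it
    self.capture_queue.put(frame)
    # Pre-process the frame and remember the resize ratio for post-processing
    preprocessed, self.ratio = self.preprocess_image(frame)
    return preprocessed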
// Callback function for processing input frames
bool incallback_getframe(std::vector<const MX::Types::FeatureMap*> dst, int streamLabel){
if(runflag.load()){
cv::Mat inframe;
while(true){
// Capture a frame from video/camera
bool got_frame = vcap.read(inframe);
if (!got_frame) {
std::cout << "\n\n No frame - End of cam? \n\n\n";
runflag.store(false);
return false;
} else {
// Push the captured frame to the queue
{
std::lock_guard<std::mutex> flock(frameQueue_mutex);
// only push this frame if there's space
if(use_cam && (frames_queue.size() >= max_backlog)){
// drop it and capture the next frame instead
continue;
} else {
frames_queue.push_back(inframe);
}
}
// Convert frame to RGB and preprocess for model input
cv::Mat rgbImage;
cv::cvtColor(inframe, rgbImage, cv::COLOR_BGR2RGB);
cv::Mat preProcframe;
cv::dnn::blobFromImage(rgbImage, preProcframe, 1.0, cv::Size(model_input_width, model_input_height), cv::Scalar(0, 0, 0), true, false);
cv::Mat floatImage;
preProcframe.convertTo(floatImage, CV_32F, 1.0 / 255.0); // Normalize the frame
// Set preprocessed input data to accelerator
dst[0]->set_data((float*)floatImage.data);
return true;
}
}
} else {
vcap.release(); // Release video resources if runflag is false
return false;
}
}
5. Define an Output Function#
We also need to define an output function for the accelerator to use. Our output function will post-process the accelerator output and display it on the screen.
def process_model_output(self, *ofmaps):
# Process model output (keypoints and bounding boxes)
predict = ofmaps[0].squeeze(0).T # Shape: [8400, 56]
predict = predict[predict[:, 4] > self.box_score, :] # Filter boxes by confidence score
scores = predict[:, 4]
boxes = predict[:, 0:4] / self.ratio
boxes = self.xywh2xyxy(boxes) # Convert bounding box format
# Process keypoints
kpts = predict[:, 5:]
for i in range(kpts.shape[0]):
for j in range(kpts.shape[1] // 3):
if kpts[i, 3*j+2] < self.kpt_score: # Filter keypoints by confidence score
kpts[i, 3*j: 3*(j+1)] = [-1, -1, -1]
else:
kpts[i, 3*j] /= self.ratio
kpts[i, 3*j+1] /= self.ratio
idxes = self.nms_process(boxes, scores, self.nms_thr) # Apply NMS
result = {'boxes': boxes[idxes,: ].astype(int).tolist(),
'kpts': kpts[idxes,: ].astype(float).tolist(),
'scores': scores[idxes].tolist()}
img = self.capture_queue.get() # Get the frame from the queue
self.capture_queue.task_done()
# Draw keypoints and bounding boxes on the image
color = (0,255,0)
boxes, kpts, scores = result['boxes'], result['kpts'], result['scores']
for kpt, score in zip(kpts, scores):
# Draw lines connecting keypoints
for pair in self.KEYPOINT_PAIRS:
pt1 = kpt[3 * pair[0]: 3 * (pair[0] + 1)]
pt2 = kpt[3 * pair[1]: 3 * (pair[1] + 1)]
if pt1[2] > 0 and pt2[2] > 0:
cv.line(img, (int(pt1[0]), int(pt1[1])), (int(pt2[0]), int(pt2[1])), (255, 255, 255), 3)
# Draw individual keypoints
for idx in range(len(kpt) // 3):
x, y, score = kpt[3*idx: 3*(idx+1)]
if score > 0:
cv.circle(img, (int(x), int(y)), 5, self.COLOR_LIST[idx % len(self.COLOR_LIST)], -1)
self.show(img) # Display the image
return img
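The output function above also uses two small helpers, xywh2xyxy and nms_process, that ship with the full code package. A minimal sketch of how they could be implemented, assuming the boxes are NumPy arrays in (center-x, center-y, width, height) format and reusing the torchvision.ops import from Step 3:
import torch

def xywh2xyxy(self, boxes):
    # Convert boxes from (cx, cy, w, h) to (x1, y1, x2, y2)
    xyxy = boxes.copy()
    xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2
    xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2
    xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2
    xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2
    return xyxy

def nms_process(self, boxes, scores, nms_thr):
    # Run Non-Maximum Suppression and return the indices of the kept boxes
    keep = ops.nms(torch.from_numpy(boxes).float(), torch.from_numpy(scores).float(), nms_thr)
    return keep.numpy()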
// Output callback function to process model output
bool outcallback_getmxaoutput(std::vector<const MX::Types::FeatureMap*> src, int streamLabel) {
// Get data from the feature maps
for(int i = 0; i < post_model_info.num_out_featuremaps; ++i) {
src[i]->get_data(ofmap[i]);
}
// Get the input frame from the queue
cv::Mat inframe;
{
std::lock_guard<std::mutex> flock(frameQueue_mutex);
inframe = frames_queue.front();
frames_queue.pop_front();
}
// Create a window for displaying results if not created yet
if(!window_created) {
cv::namedWindow("Pose Estimation", cv::WINDOW_NORMAL | cv::WINDOW_KEEPRATIO);
cv::resizeWindow("Pose Estimation", cv::Size(origWidth, origHeight));
cv::moveWindow("Pose Estimation", 0, 0);
window_created = true;
}
// Process detection results and perform NMS
std::vector<Box> all_boxes;
std::vector<float> all_scores;
std::vector<cv::Rect> cv_boxes;
// Loop through each detection
#pragma omp parallel for num_threads(2)
for (int i = 0; i < dets_length; ++i) {
// Extract bounding box coordinates and confidence score
float x0 = ofmap[0][dets_length * 0 + i];
float y0 = ofmap[0][dets_length * 1 + i];
float w = ofmap[0][dets_length * 2 + i];
float h = ofmap[0][dets_length * 3 + i];
float confidence = ofmap[0][dets_length * 4 + i];
if (confidence > box_score) {
Box box;
box.confidence = confidence;
// Scale box coordinates back to original image size
float y_factor = inframe.rows / float(model_input_height);
float x_factor = inframe.cols / float(model_input_width);
x0 = x0 * x_factor;
y0 = y0 * y_factor;
w = w * x_factor;
h = h * y_factor;
int x1 = (int)(x0 - 0.5 * w);
int x2 = (int)(x0 + 0.5 * w);
int y1 = (int)(y0 - 0.5 * h);
int y2 = (int)(y0 + 0.5 * h);
// Extract keypoints for pose estimation
for (int j = 0; j < num_kpts; ++j) {
float kpt_x = ofmap[0][dets_length * (5 + j * 3) + i] * x_factor;
float kpt_y = ofmap[0][dets_length * (5 + j * 3 + 1) + i] * y_factor;
float kpt_conf = ofmap[0][dets_length * (5 + j * 3 + 2) + i];
// Add keypoints if confidence is above threshold
if (kpt_conf > kpt_score) {
box.keypoints.push_back(std::make_pair(kpt_x, kpt_y));
} else {
box.keypoints.push_back(std::make_pair(-1, -1)); // Invalid keypoint
}
}
#pragma omp critical
{
all_boxes.push_back(box);
all_scores.push_back(confidence);
cv_boxes.push_back(cv::Rect(x1, y1, x2 - x1, y2 - y1));
}
}
}
// Apply Non-Maximum Suppression (NMS) to reduce overlapping boxes
std::vector<int> nms_result;
cv::dnn::NMSBoxes(cv_boxes, all_scores, box_score, nms_thr, nms_result);
// Keep only the filtered boxes after NMS
std::vector<Box> filtered_boxes;
for (int idx : nms_result) {
filtered_boxes.push_back(all_boxes[idx]);
}
// Draw keypoints and connections (skeleton) on the frame
for (const auto &box : filtered_boxes) {
for (const auto &connection : KEYPOINT_PAIRS) {
int idx1 = connection.first;
int idx2 = connection.second;
if (idx1 < box.keypoints.size() && idx2 < box.keypoints.size()) {
auto kpt1 = box.keypoints[idx1];
auto kpt2 = box.keypoints[idx2];
if (kpt1.first != -1 && kpt1.second != -1 && kpt2.first != -1 && kpt2.second != -1) {
cv::line(inframe, cv::Point(kpt1.first, kpt1.second), cv::Point(kpt2.first, kpt2.second), cv::Scalar(255, 255, 255), 3);
}
}
}
// Draw individual keypoints
for (int k = 0; k < box.keypoints.size(); ++k) {
auto &kpt = box.keypoints[k];
if (kpt.first != -1 && kpt.second != -1) {
cv::circle(inframe, cv::Point(kpt.first, kpt.second), 4, COLOR_LIST[k % COLOR_LIST.size()], -1);
}
}
}
// Calculate FPS every AVG_FPS_CALC_FRAME_COUNT frames
frame_count++;
if (frame_count == 1) {
start_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
} else if (frame_count % AVG_FPS_CALC_FRAME_COUNT == 0) {
std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()) - start_ms;
fps_number = (float)AVG_FPS_CALC_FRAME_COUNT * 1000 / (float)(duration.count());
sprintf(fps_text, "FPS = %.1f", fps_number);
frame_count = 0;
}
// Display FPS on the frame
cv::putText(inframe, fps_text, cv::Point2i(450, 30), cv::FONT_ITALIC, 0.8, cv::Scalar(255, 255, 255), 2);
// Show the frame with keypoints and bounding boxes
cv::imshow("Pose Estimation", inframe);
// Exit if 'q' key is pressed
if (cv::waitKey(1) == 'q') {
runflag.store(false);
}
return true;
}
6. Connect the Accelerator#
Now, all you need to do is to connect your input and output functions to the AsyncAccl API. The API will take care of the rest.
def run_mxa(dfp, post_model, app):
# Initialize the accelerator and set up model paths
accl = AsyncAccl(dfp)
accl.set_postprocessing_model(post_model, model_idx=0)
accl.connect_input(app.generate_frame)
accl.connect_output(app.process_model_output)
accl.wait() # Wait for the accelerator to finish
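As a usage sketch, the pieces from the previous steps are then tied together by pointing run_mxa at the files generated in Step 2 and the App instance from Step 3:
dfp = "YOLO_v8_medium_pose_640_640_3_onnx.dfp"
post_model = "YOLO_v8_medium_pose_640_640_3_onnx_post.onnx"
run_mxa(dfp, post_model, app)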
Here we also have a post-processing ONNX model, and accl.connect_post_model will handle the post-processing of the output from the chip:
// Initialize the MemryX accelerator
MX::Runtime::MxAccl accl{fs::path(modelPath)};
accl.connect_post_model(fs::path(onnx_postprocessing_model_path)); // Connect the post-processing model
post_model_info = accl.get_post_model_info(0); // Get post-processing model info
model_info = accl.get_model_info(0); // Get main model info
model_input_height = model_info.in_featuremap_shapes[0][0]; // Set model input height
model_input_width = model_info.in_featuremap_shapes[0][1]; // Set model input width
Next, you need to connect your input and output functions to the MxAccl API. The output from the DFP and the post-processing ONNX model will then be obtained directly:
// Connect input and output streams to the accelerator
accl.connect_stream(&incallback_getframe, &outcallback_getmxaoutput, 10 /*unique stream ID*/, 0 /*Model ID*/);
std::cout << "Connected stream \n\n\n";
accl.start(); // Start inference
accl.wait(); // Wait for inference to complete
accl.stop(); // Stop the accelerator
The accelerator will automatically call the connected input and output functions in a fully pipelined fashion.
Third-Party Licenses#
This tutorial uses third-party software, models, and libraries. Below are the details of the licenses for these dependencies:
Model: YOLOv8m-pose from the Ultralytics GitHub
License: AGPLv3
Code and Pre/Post-Processing: Some code components, including pre/post-processing, were sourced from their GitHub
License: AGPLv3
Summary#
This tutorial showed how to use the Accelerator API to run real-time inference with a pose estimation model.