Example

In this section, we'll develop a C example of a runtime that will be used to run the model file generated by the conversion toolchain explained before. And show how that runtime is integrated inside another program called a daemon that will wait for incoming inputs, finally, provide a Python that's used to test the whole framework implementation.

For starters, here is the list of compiler toolchains to use when cross-compiling the shared library of the runtime, along with the program that will be loading the runtime library and managing IO operations for it.

As previously stated, the runtime will handle data following a specific format. The following two C functions demonstrate the extraction of raw data from incoming messages and the construction of outgoing messages.

The IO data is serialized in MessagePack format, chosen for its ability to minimize memory footprints while offering cross-language support.

The characteristics defining model inputs are:

  • Number of inputs,

  • Rank of each input,

  • Shape of each input (also knows as the dimensions)

  • Each input data: a list of binary arrays.

Input message parser
/**
 * Extract relevant values from input.
 *
 * @param [in] input_packed_message Input datas (NOT freed in this function)
 * @param [in] input_names Array of input names retrieved from the ONNX file
 * @param [in] input_names_len Number of inputs
 * @param [out] returned_inputs Array of input tensors. Should be cleaned by the caller
 * @param [out] returned_input_shapes Array of input shapes. Should be cleaned by the caller
 * @param [out] returned_input_ranks Array of input ranks. Should be cleaned by the caller
 * @param [out] returned_input_sizes Array of input sizes. Should be cleaned by the caller
 * @return exit code: non-zero on error
 */
int runtime_core_parse_input_data(const char *input_packed_message,
                                size_t input_message_length,
                                int32_t input_names_len,
                                uint8_t ***returned_inputs,
                                int64_t ***returned_input_shapes,
                                int32_t **returned_input_ranks,
                                size_t **returned_input_sizes) {
    // Parse message schema
    mpack_reader_t reader;
    mpack_reader_init_data(&reader, input_packed_message, input_message_length);

    // **** Read data according to schema *****

    uint32_t num_tensors = mpack_expect_uint(&reader);
    if (mpack_reader_error(&reader) != mpack_ok) {
        printf("Warning: RUNTIME - Error reading num_tensors\n");
        return EXIT_FAILURE;
    }

    // Read input tensors in place ( do not free )
    uint8_t **inputs = (uint8_t **) malloc(num_tensors * sizeof(void *));
    // Read each tensor separately
    for (size_t tensor_index = 0; tensor_index < num_tensors; tensor_index++) {
        uint32_t input_bin_lenght = mpack_expect_bin_max(&reader, 1024 * 1024 * 100);
        // Read bytes in place to avoid copying
        uint8_t *local_input_data = (uint8_t *) mpack_read_bytes_inplace(&reader, input_bin_lenght);
        if (mpack_reader_error(&reader) != mpack_ok) {
            printf("Error: RUNTIME - Error reading input data\n");
            return EXIT_FAILURE;
        }
        mpack_done_bin(&reader);
        inputs[tensor_index] = (uint8_t *) local_input_data;
    }

    // Read output type (assume ownership of pointer)
    char* output_type = mpack_expect_cstr_alloc(&reader,100); // Max 100 chars
    if (mpack_reader_error(&reader) != mpack_ok) {
        return EXIT_FAILURE;
    }
    
    free(output_type);

    // Read input shapes
    bool shapes_included = mpack_expect_bool(&reader);
    if (!shapes_included) {
        printf("Error: RUNTIME - shapes are not included\n");
        return EXIT_FAILURE;
    }

    int64_t **input_shapes = (int64_t **) malloc(num_tensors * sizeof(int64_t *));
    int32_t *input_ranks = (int32_t *) malloc(num_tensors * sizeof(int32_t));
    size_t *input_sizes = malloc(sizeof(size_t) * input_names_len);
    for (size_t index = 0; index < num_tensors; index++) {
        uint32_t tensor_rank = mpack_expect_u32(&reader);
        input_ranks[index] = tensor_rank;
    }
    for (size_t index = 0; index < num_tensors; index++) {
        input_shapes[index] = malloc(input_ranks[index] * sizeof(int64_t));
        input_sizes[index] = 1;
        for (int32_t rank_index = 0; rank_index < input_ranks[index]; rank_index++) {
            input_shapes[index][rank_index] = mpack_expect_u64(&reader);
            input_sizes[index] *= input_shapes[index][rank_index];
        }
    }

    returned_inputs[0] = inputs;
    returned_input_shapes[0] = input_shapes;
    returned_input_ranks[0] = input_ranks;
    returned_input_sizes[0] = input_sizes;

    // Return
    return 0;
}

The outputs of a model share similar metadata as the inputs, but with an additional field: Data type. This denotes the data type of that output array as encoded in the ONNX specification. The set of model outputs are then serialized in MessagePack format before being returned.

Output message builder
#include "mpack.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

/**
 * Build output MessagePack from model's outputs.
 * 
 * Schema:
 * 
 * 1. "Outputs" ({OutputName:bin})
 * 2. "OutputRanks" ([num tensors]i32)
 * 3. "OutputShapes" ([num tensors][rank]i64)
 * 4. "OutputDataTypes" ([num tensors]i32)
 *
 * @param [in] output_names Array of output names
 * @param [in] number_outputs Number of outputs
 * @param [in] outputs Array of output tensors
 * @param [in] output_sizes Array of output sizes
 * @param [in] output_shapes Array of output shapes
 * @param [in] output_ranks Array of output ranks
 * @param [in] output_data_types Array of outputs data type
 * @param [out] output_json Output JSON string pointer. Should be cleaned by calling `sclbl_core_free`
 * @param [out] output_json_size length of Output JSON string
 * @return exit code: non-zero on error
 */
int runtime_core_build_output_mpack(char **output_names,
                                     int number_outputs,
                                     uint8_t **outputs,
                                     int64_t *output_sizes,
                                     int64_t **output_shapes,
                                     int32_t *output_ranks,
                                     int32_t *output_data_types,
                                     char **output_buffer,
                                     size_t *output_buffer_size) {

        
        
        // Initialize writer
        mpack_writer_t writer;
        char* mpack_buffer;
        size_t buffer_size;
        mpack_writer_init_growable(&writer,&mpack_buffer,&buffer_size);

        // Start building root map
        mpack_start_map(&writer,4);

        // Write outputs ({OutputName} bin)
            // Map key
        mpack_write_cstr(&writer, "Outputs");
            // Map value
        mpack_start_map(&writer,number_outputs);
        for (int index = 0; index < number_outputs; index++) {
            // Determine output size
            size_t tensor_byte_size = (size_t) output_sizes[index];
            switch (output_data_types[index]) {
                case 1: // onnx::TensorProto_DataType_FLOAT:
                {
                    tensor_byte_size *= sizeof(float);
                    break;
                }
                case 2: // onnx::TensorProto_DataType_UINT8:
                {
                    tensor_byte_size *= sizeof(uint8_t);
                    break;
                }
                case 3: //onnx::TensorProto_DataType_INT8:
                {
                    tensor_byte_size *= sizeof(int8_t);
                    break;
                }
                case 6: //onnx::TensorProto_DataType_INT32:
                {
                    tensor_byte_size *= sizeof(int32_t);
                    break;
                }
                case 7: //onnx::TensorProto_DataType_INT64:
                {
                    tensor_byte_size *= sizeof(int64_t);
                    break;
                }
                case 8: // onnx::TensorProto_DataType_STRING:
                {
                    tensor_byte_size *= sizeof(char);
                    break;
                }
                case 9: //onnx::TensorProto_DataType_BOOL:
                {
                    tensor_byte_size *= sizeof(bool);
                    break;
                }
                case 11: // TensorProto_DataType_DOUBLE:
                {
                    tensor_byte_size *= sizeof(double);
                    break;
                }
                default:continue;
            }
            mpack_write_cstr(&writer, output_names[index]);
            mpack_write_bin(&writer,(const char *) outputs[index],tensor_byte_size);
        }
        mpack_finish_array(&writer); // Finish "Outputs" array

        // Write output ranks ([num tensors]i32)
            // Map key
        mpack_write_cstr(&writer, "OutputRanks");
            // Map value
        mpack_start_array(&writer,number_outputs);
        for (int index = 0; index < number_outputs; index++) {
            mpack_write_i32(&writer,output_ranks[index]);
        }
        mpack_finish_array(&writer); // Finish "OutputRanks" array

        // Write output shapes ([num tensors][rank]i64)
            // Map key
        mpack_write_cstr(&writer, "OutputShapes");
            // Map value
        mpack_start_array(&writer,number_outputs);
        for (int output_index = 0; output_index < number_outputs; output_index++) {
            mpack_start_array(&writer,output_ranks[output_index]);
            for (int rank_index = 0; rank_index < output_ranks[output_index]; rank_index++){
                mpack_write_i64(&writer,output_shapes[output_index][rank_index]);
            }
            mpack_finish_array(&writer); // Finish "OutputShapes" inner array
        }
        mpack_finish_array(&writer); // Finish "OutputShapes" outer array

        // Write output data types ([num tensors]i32)
            // Map key
        mpack_write_cstr(&writer, "OutputDataTypes");
            // Map value
        mpack_start_array(&writer,number_outputs);
        for (int index = 0; index < number_outputs; index++) {
            mpack_write_i32(&writer,output_data_types[index]);
        }
        mpack_finish_array(&writer); // Finish "OutputDataTypes" array

        // Finish building root map
        mpack_finish_map(&writer);

        // Finish writing
        if (mpack_writer_destroy(&writer) != mpack_ok) {
            fprintf(stderr, "An error occurred encoding the data!\n");
            // Free buffer since it was not succesful
            free(writer.buffer);
            // Reset buffer so that it's not used again
            *output_buffer = NULL;
            return 1;
        }

        *output_buffer_size = buffer_size;
        *output_buffer = mpack_buffer;

        return 0;
}

Next is the core implementation of the runtime, which is uses Hailo's version of Onnxruntime under the hood. It includes the implementation of all the functions required by the interface.

Runtime Core
#include <stdio.h>
#include <unistd.h>

#include <onnxruntime/core/session/onnxruntime_c_api.h>
#include <onnxruntime/core/session/onnxruntime_session_options_config_keys.h>

#include <stdbool.h>

#include "runtime_core.h"
#include "runtime_utils.h"

#ifndef ONNXRUNTIME_API_VERSION
#define ONNXRUNTIME_API_VERSION 15
#endif

#define RUNTIME_ORT_CORE_EXEC(return_code, error) ({int32_t code = return_code; if (code != 0) {return code;}})

const OrtApi *api;
OrtSession *session;
OrtAllocator *allocator;
OrtMemoryInfo *memory_info;
OrtRunOptions *run_options;
OrtEnv *env;
OrtSessionOptions *session_options;
char *output_message;
size_t output_message_size;
int return_code;

static int runtime_core_process_status(OrtStatus *status) {
    if (status == NULL) return 0;
    printf("Error: RUNTIME - Message by ORT API: \n%s\n", api->GetErrorMessage(status));;
    api->ReleaseStatus(status);
    return 1;
}

/**
 * Retrieve number of inputs, each input names and its corresponding data type from the ONNX file.
 * \n DISCLAIMER: This can be called only after both `runtime_core_model_read` are executed with success.
 * @param [out] input_names_count Number of inputs
 * @param [out] input_data_types Array of inputs data type based on the onnx.pb-c.h._Onnx__TensorProto__DataType enum
 * @return Array of input names
 */
static char **runtime_core_get_input_names(int32_t *input_names_count, int32_t **input_data_types) {
    printf("Notice: RUNTIME - Reading input names and data types from ONNX file\n");

    size_t s_input_count;
    int32_t input_count;

    runtime_core_process_status(api->SessionGetInputCount(session, &s_input_count));
    input_count = (int32_t) s_input_count;

    char **tmp_input_names = malloc(sizeof(char *) * input_count);
    char **input_names = malloc(sizeof(char *) * input_count);
    if (input_data_types != NULL)
        input_data_types[0] = malloc(sizeof(int32_t) * input_count);

    for (int i = 0; i < input_count; i++) {
        if (input_data_types != NULL) {
            OrtTypeInfo *type_info;
            OrtTensorTypeAndShapeInfo *type_shape_info;
            ONNXTensorElementDataType onnx_type;
            runtime_core_process_status(api->SessionGetInputTypeInfo(session, i, &type_info));
            runtime_core_process_status(api->CastTypeInfoToTensorInfo(type_info,
                                                                    (const OrtTensorTypeAndShapeInfo **) &type_shape_info));
            runtime_core_process_status(api->GetTensorElementType(type_shape_info, &onnx_type));
            input_data_types[0][i] = (int) onnx_type;
            api->ReleaseTypeInfo(type_info);
        }

        runtime_core_process_status(api->SessionGetInputName(session, i, allocator, tmp_input_names + i));
        input_names[i] = malloc(sizeof(char) * strlen(tmp_input_names[i]));
        strcpy(input_names[i], tmp_input_names[i]);
        runtime_core_process_status(api->AllocatorFree(allocator, tmp_input_names[i]));
    }

    free(tmp_input_names);
    *input_names_count = input_count;

    return input_names;
}

/**
 * Retrieve number of outputs, each output names and its corresponding data type from the ONNX file.
 * @note: This can be called only after both `runtime_core_model_read` are executed with success.
 * @param output_names_count [out] Number of outputs
 * @param output_data_types [out] Array of outputs data type based on the onnx.pb-c.h._Onnx__TensorProto__DataType enum
 * @return Array of output names
 */
static char **runtime_core_get_output_names(int32_t *output_names_count, int32_t **output_data_types) {
    printf("Notice: RUNTIME - Reading output names and data types from ONNX file\n");

    size_t s_output_count;
    int32_t output_count;

    runtime_core_process_status(api->SessionGetOutputCount(session, &s_output_count));
    output_count = (int32_t) s_output_count;

    char **tmp_output_names = malloc(sizeof(char *) * output_count);
    char **output_names = malloc(sizeof(char *) * output_count);
    if (output_data_types != NULL)
        output_data_types[0] = malloc(sizeof(int32_t) * output_count);

    for (int i = 0; i < output_count; i++) {
        if (output_data_types != NULL) {
            OrtTypeInfo *type_info;
            OrtTensorTypeAndShapeInfo *type_shape_info;
            ONNXTensorElementDataType onnx_type;
            runtime_core_process_status(api->SessionGetOutputTypeInfo(session, i, &type_info));
            runtime_core_process_status(api->CastTypeInfoToTensorInfo(type_info,
                                                                    (const OrtTensorTypeAndShapeInfo **) &type_shape_info));
            runtime_core_process_status(api->GetTensorElementType(type_shape_info, &onnx_type));
            output_data_types[0][i] = (int) onnx_type;
            api->ReleaseTypeInfo(type_info);
        }

        runtime_core_process_status(api->SessionGetOutputName(session, i, allocator, tmp_output_names + i));
        output_names[i] = malloc(sizeof(char) * strlen(tmp_output_names[i]));
        strcpy(output_names[i], tmp_output_names[i]);
        runtime_core_process_status(api->AllocatorFree(allocator, tmp_output_names[i]));
    }

    free(tmp_output_names);
    *output_names_count = output_count;

    return output_names;
}

int runtime_core_exec(const char *input_packed_message, size_t input_message_length, char **output_packed_message, size_t* return_message_length){
    printf("Notice: RUNTIME - start core exec.\n");
    int error;
    uint8_t **inputs;
    int64_t **input_shapes;
    int32_t *input_ranks;
    size_t *input_sizes;

    int number_inputs, number_outputs;
    int *input_dtypes, *output_dtypes;

    printf("Notice: RUNTIME - start reading input names.\n");

    char **input_names = runtime_core_get_input_names(&number_inputs, &input_dtypes);

    printf("Notice: RUNTIME - start reading output names.\n");

    char **output_names = runtime_core_get_output_names(&number_outputs, &output_dtypes);

    printf("Notice: RUNTIME - runtime_core_parse_input_data running.\n");
    return_code = runtime_core_parse_input_data(input_packed_message, input_message_length, input_names, number_inputs,
                                              &inputs, &input_shapes, &input_ranks, &input_sizes);
    if (return_code != 0) error = 0;

    // run model
    OrtValue **input_values = malloc(sizeof(OrtValue *) * number_inputs);
    OrtValue **output_values = malloc(sizeof(OrtValue *) * number_outputs);

    printf("Notice: RUNTIME - building input tensors for onnxruntime ...\n");
    for (int i = 0; i < number_inputs; ++i) {
        RUNTIME_ORT_CORE_EXEC(runtime_core_process_status(
                api->CreateTensorWithDataAsOrtValue(memory_info,
                                                    inputs[i],
                                                    input_sizes[i] * runtime_util_get_sizeof_onnx_type(input_dtypes[i]),
                                                    input_shapes[i],
                                                    input_ranks[i],
                                                    input_dtypes[i],
                                                    input_values + i
                )), &error);

    }
    for (int i = 0; i < number_outputs; ++i)
        output_values[i] = NULL;

    printf("Notice: RUNTIME - onnxruntime inferring ...\n");
    // optional, may be helpfull in edge cases
    // runtime_core_process_status(api->RunOptionsUnsetTerminate(run_options));
    RUNTIME_ORT_CORE_EXEC(runtime_core_process_status(api->Run(session, run_options, (const char *const *) input_names,
                                       (const OrtValue *const *) input_values, number_inputs,
                                       (const char *const *) output_names, number_outputs, output_values))
                                       , &error);

    printf("Notice: RUNTIME - cleaning up inputs ...\n");
    // clean up
    // Inputs
    free(input_dtypes);
    free(input_ranks);
    free(input_sizes);
    for (int i = 0; i < number_inputs; i++) {
        free(input_names[i]);
        free(input_shapes[i]);
        api->ReleaseValue(input_values[i]);
    }
    free(input_names);
    free(input_shapes);
    free(inputs);
    free(input_values);

    void **outputs = malloc(sizeof(void *) * number_outputs);
    int64_t **output_shapes = malloc(sizeof(int64_t *) * number_outputs);
    int32_t *output_ranks = malloc(sizeof(int32_t) * number_outputs);
    int64_t *output_sizes = malloc(sizeof(size_t) * number_outputs);

    printf("Notice: RUNTIME - reading output tensors ...\n");
    for (int i = 0; i < number_outputs; ++i) {
        OrtTensorTypeAndShapeInfo *type_shape;

        // get shape information
        RUNTIME_ORT_CORE_EXEC(runtime_core_process_status(api->GetTensorTypeAndShape(output_values[i], &type_shape))
        , &error);

        // get output size
        size_t size;
        RUNTIME_ORT_CORE_EXEC(runtime_core_process_status(api->GetTensorShapeElementCount(type_shape, &size))
        , &error);
        output_sizes[i] = (int64_t) size;

        // get output rank
        size_t rank;
        RUNTIME_ORT_CORE_EXEC(runtime_core_process_status(api->GetDimensionsCount(type_shape, &rank))
        , &error);
        output_ranks[i] = (int32_t) rank;

        // get output shape
        output_shapes[i] = malloc(sizeof(int64_t) * rank);
        RUNTIME_ORT_CORE_EXEC(runtime_core_process_status(api->GetDimensions(type_shape, output_shapes[i], rank))
        , &error);

        // get output value
        size_t output_bytes = size * runtime_util_get_sizeof_onnx_type(output_dtypes[i]);
        outputs[i] = malloc(output_bytes);
        void *tmp;
        RUNTIME_ORT_CORE_EXEC(runtime_core_process_status(api->GetTensorMutableData(output_values[i], &tmp))
        , &error);
        memcpy(outputs[i], tmp, output_bytes);

        api->ReleaseTensorTypeAndShapeInfo(type_shape);
        api->ReleaseValue(output_values[i]);
    }
    free(output_values);

    printf("Notice: RUNTIME - runtime_core_build_output_mpack running ...\n");
    return_code = runtime_core_build_output_mpack(output_names,
                                               number_outputs,
                                               (uint8_t **) outputs,
                                               output_sizes,
                                               output_shapes,
                                               output_ranks,
                                               output_dtypes,
                                               &output_message,
                                               return_message_length);

    printf("Notice: RUNTIME - cleaning up outputs ...\n");
    if (return_code != 0) error = 0;

    // clean up
    // Outputs
    free(output_dtypes);
    free(output_sizes);
    free(output_ranks);
    for (int i = 0; i < number_outputs; i++) {
        free(output_names[i]);
        free(output_shapes[i]);
        free(outputs[i]);
    }
    free(output_names);
    free(output_shapes);
    free(outputs);

    *output_packed_message = output_message;
    return error;
}

/**
 * Initialize OnnxRuntime variables.
 * \n It limits number of threads to 1, logging level to errors and optimization level to ORT_ENABLE_ALL.
 * @return 0 (always)
 */
int runtime_core_runtime_init(int acceleration, int runtime_count) {
    printf("Notice: RUNTIME - Initializing OnnxRuntime I\n");

    long number_of_processors = sysconf(_SC_NPROCESSORS_ONLN);

    // get the api
    api = OrtGetApiBase()->GetApi(ONNXRUNTIME_API_VERSION);

    // create the environment
    runtime_core_process_status(api->CreateEnv(ORT_LOGGING_LEVEL_FATAL, "nx_ort", &env));
    printf("Notice: RUNTIME - Initializing OnnxRuntime II\n");

    // Create session options
    runtime_core_process_status(api->CreateSessionOptions(&session_options));

    // choices: ORT_DISABLE_ALL, ORT_ENABLE_BASIC, ORT_ENABLE_EXTENDED, ORT_ENABLE_ALL
    runtime_core_process_status(api->SetSessionGraphOptimizationLevel(session_options, ORT_ENABLE_ALL));

    printf("Notice: RUNTIME - Initializing OnnxRuntime III\n");

    // Divide threads equally between runtimes
    const int max_threads_per_runtime = 8;
    int interop_threads = number_of_processors/runtime_count;

    // Clamp value between 1 and max_threads_per_runtime
    if (interop_threads < 1) {
        interop_threads = 1;
    } else if (interop_threads > max_threads_per_runtime) {
        interop_threads = max_threads_per_runtime;
    }

    runtime_core_process_status(api->SetIntraOpNumThreads(session_options, interop_threads));
    runtime_core_process_status(api->SetInterOpNumThreads(session_options, 1));
    runtime_core_process_status(api->SetSessionExecutionMode(session_options, ORT_PARALLEL));

    printf("Notice: RUNTIME - Initializing OnnxRuntime II\n");

    if(acceleration >=1 ){
#ifdef ONNXRUNTIME_HAILO
    runtime_core_process_status(api->SessionOptionsAppendExecutionProvider_Hailo(session_options, true));
#endif
    }
    char **providers;
    int number_providers;
    runtime_core_process_status(api->GetAvailableProviders(&providers, &number_providers));
    for (int i = 0; i < number_providers; ++i)
        printf("Notice: RUNTIME - Provider id: %i - name: %s\n", i, providers[i]);
    runtime_core_process_status(api->ReleaseAvailableProviders(providers, number_providers));

    // create run options
    runtime_core_process_status(api->CreateRunOptions(&run_options));

    return 0;
}

int runtime_core_model_read(const char *file_path) {
    printf("Notice: RUNTIME - Reading ONNX file from '%s' ...\n", file_path);

    // Create a session
    runtime_core_process_status(api->CreateSession(env, file_path, session_options, &session));

    // create allocator
    runtime_core_process_status(api->CreateCpuMemoryInfo(OrtArenaAllocator, OrtMemTypeDefault, &memory_info));
    runtime_core_process_status(api->CreateAllocator(session, memory_info, &allocator));

    return 0;
}

int runtime_core_free() {
    printf("Notice: RUNTIME - Freeing output JSON returned by the runtime ...\n");

    // optional, may be helpfull in edge cases
    // runtime_core_process_status(api->RunOptionsSetTerminate(run_options));

    free(output_message);
    output_message = NULL;
    output_message_size = 0;
    printf("Notice: RUNTIME - Free output completed ...\n");

    return 0;
}


int runtime_core_finalize() {
    printf("Notice: RUNTIME - Releasing all objects created by ORT API ...\n");

    api->ReleaseRunOptions(run_options);
    api->ReleaseMemoryInfo(memory_info);
    api->ReleaseAllocator(allocator);
    api->ReleaseEnv(env);
    api->ReleaseSession(session);
    api->ReleaseSessionOptions(session_options);
    return 0;
}

const char *runtime_error_message(){
    return "Not implemented.";
}

const char *runtime_version(){
    return "0.1.0";
}

const char *runtime_name(){
    return "ORT";
}

Instructions on how to build the whole shared library along with all the needed dependencies can be found here.

Testing the runtime

Now that we built the runtime's shared library, and we can load it using the adequate Daemon program, we can test the optimized model generated by the conversion toolchain block using the following Python script:

Python script to test the runtime
from PIL import Image
import numpy as np
import msgpack
from os.path import dirname, abspath, join
import struct
import cv2

_here = dirname(abspath(__file__))

# Initialize vars
engine_pipe_name = join(_here, 'engine_pipe')
module_pipe_name = join(_here, 'module_pipe')
shm_path = 1

input_image_path = join(_here, 'image.jpg') # TODO: Change this to the path of your image
height, width, means, stds, nchw = 320, 320, 0, 1, True # TODO: Change these values to match your model

def get_input_image():
    image = Image.open(input_image_path).convert('RGB')
    image = image.resize((width, height))
    image = np.array(image)
    image = (image - means) / stds
    image = image.astype('float32')
    image = np.expand_dims(image, axis=0)
    if nchw:
        image = np.transpose(image, (0, 3, 1, 2)) # NHWC -> NCHW
    return image

def build_input_message():
    image = get_input_image() # NCHW
    height, width = image.shape[2], image.shape[3]
    nms_sensitivity = np.array([0.5], dtype='float32')
    mask = np.ones((height, width), dtype='bool')

    num_tensors = 3
    input_shapes = [image.shape, nms_sensitivity.shape, mask.shape]
    input_ranks = [len(shape) for shape in input_shapes]
    input_shapes = [*image.shape, *nms_sensitivity.shape, *mask.shape]
                  
    # image to bytes
    image_list = image.flatten().tolist()
    image_bytes = struct.pack("f" * len(image_list),*image_list)
    # nms to bytes
    nms_list = nms_sensitivity.flatten().tolist()
    nms_bytes = struct.pack("f" * len(nms_list),*nms_list)
    # mask to bytes
    mask_list = mask.flatten().tolist()
    mask_bytes = struct.pack("f" * len(mask_list),*mask_list)

    input_data = [image_bytes, nms_bytes, mask_bytes]
    # msgpack data: [number of tensors, tensors, 'json', True, tensor ranks, tensor shapes]
    input_message = [num_tensors]
    for input_d in input_data:
        input_message.append(input_d)
    input_message.extend(['json', True, *input_ranks, *input_shapes])
    packer : msgpack.Packer = msgpack.Packer(use_bin_type=True)
    msgpack_data : bytearray = bytearray()
    for val in input_message:
        packed_val = packer.pack(val)
        msgpack_data.extend(packed_val)
    return msgpack_data

def parse_output_message(byte_data):
    output = msgpack.unpackb(byte_data)
    outputs = output['Outputs'] # dict
    output_ranks = output['OutputRanks']
    output_shapes = output['OutputShapes']
    output_dtypes = output['OutputDataTypes']

    output_sizes = [np.prod(shape) for shape in output_shapes]
    arrays = [struct.unpack("f" * size, output) for size, output in zip(output_sizes, outputs.values())]
    arrays = [np.array(array).reshape(shape) for array, shape in zip(arrays, output_shapes)]
    return arrays

def visualize_bboxes(bboxes, img_path, width, height):
    if not (bboxes.shape[1] == 6 and len(bboxes.shape) == 2):
        raise ValueError('Invalid bboxes shape. Expected (N, 6) where N is the number of bboxes.')
    
    img = cv2.imread(img_path)
    img = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)

    for bbox in bboxes:
        x1, y1, x2, y2, score, class_id = bbox
        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(img, f'{int(class_id)}', (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

    # save the image
    cv2.imwrite('output.jpg', img)


# Create the pipes
import os
if os.path.exists(engine_pipe_name):
    os.unlink(engine_pipe_name)
os.mkfifo(engine_pipe_name, mode=0o666)

if os.path.exists(module_pipe_name):
    os.unlink(module_pipe_name)
os.mkfifo(module_pipe_name, mode=0o666)

# Create the shared memory
import sysv_ipc
shm_key = sysv_ipc.ftok('/tmp', shm_path)
shm = sysv_ipc.SharedMemory(shm_key, flags=sysv_ipc.IPC_CREAT | 0o666, size=1024*1024*10)
shm_id = shm.id

# Print engine connection info and wait for the engine to connect
print(f'Engine paramters (engine_pipe_name, module_pipe_name, shm_id, shm_key): {engine_pipe_name} {module_pipe_name} {shm_id} {shm_key}')

# Wait for the engine to connect
print("Opening pipe to engine...")
with open(engine_pipe_name, 'wb') as pipe:
    pipe.write(bytes(1))
    pipe.flush()
    
# Wait for the engine to connect
print("Waiting for the engine to connect...")
with open(module_pipe_name, 'rb') as pipe:
    pipe.read(1)

# Wait for the engine to connect
try:
    print("Engine connected. Starting the module...")
    while True:
        # print('Creating input message...')
        # Send data to the engine through shm
        msg = build_input_message()
        msg_len = struct.pack("<I",len(msg))
        # Send through shared memory
        shm.write(msg_len)
        shm.write(msg,offset=4)
        print(f'Sent {len(msg)} bytes to shared memory')

        # Signal the engine to start
        # print("Signaling the engine to start...")
        with open(engine_pipe_name, 'wb') as f:
            f.write(b'a')
            f.flush()

        # Start wait for signal from engine
        # print("Waiting for the engine to finish...")
        with open(module_pipe_name, 'rb') as f:
            f.read(1)

        # Get the output from the shared memory
        # print("Reading output from shared memory...")
        output_len = shm.read(4)
        output_len = struct.unpack("<I",output_len)[0]
        output = shm.read(output_len, offset=4)
        print(f'Received {output_len} bytes from shared memory')
        arrays = parse_output_message(output)

        # Visualize the output
        bboxes = arrays[0]
        visualize_bboxes(bboxes, input_image_path, 320, 320)

except KeyboardInterrupt:
    pass

# Clean up
os.remove(engine_pipe_name)
os.remove(module_pipe_name)
shm.detach()

Last updated