Program Listing for File gstreamer_pipeline.cpp

Program Listing for File gstreamer_pipeline.cpp

Return to documentation for file (Src/gstreamer_pipeline.cpp)

module;

#include "kataglyphis_export.h"
#include <atomic>
#include <cstring>
#include <expected>
#include <gst/analytics/analytics.h>
#include <gst/analytics/gsttensor.h>
#include <gst/analytics/gsttensormeta.h>
#include <gst/app/gstappsink.h>
#include <gst/gst.h>
#include <gst/video/video.h>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

module kataglyphis.gstreamer_pipeline;

import kataglyphis.project_config;

namespace kataglyphis::gstreamer {

namespace {
    // Process-wide one-shot guard for gst_init()/gst_deinit(); the mutex
    // serialises (de)initialisation across threads while the atomic allows
    // cheap lock-free "already initialised?" reads elsewhere.
    std::atomic<bool> g_gstreamer_initialized{ false };
    std::mutex g_gstreamer_mutex;
}// namespace

struct GStreamerPipeline::Impl
{
    GstElement *pipeline{ nullptr };
    // appsink/appsrc hold owned references (gst_bin_get_by_name() returns a
    // new ref); they are released in cleanup().
    GstElement *appsink{ nullptr };
    GstElement *appsrc{ nullptr };

    BufferCallback buffer_callback;
    std::atomic<bool> is_playing{ false };
    std::atomic<bool> is_paused{ false };
    std::string caps_string;

    GMainLoop *main_loop{ nullptr };
    std::thread main_loop_thread;

    ~Impl() { cleanup(); }

    // Stops the main loop first, then releases every GStreamer reference we
    // own. Safe to call repeatedly.
    void cleanup()
    {
        // Quit/join the loop before destroying the pipeline so a callback on
        // the loop thread cannot touch a half-destroyed element.
        if (main_loop != nullptr) {
            g_main_loop_quit(main_loop);
            if (main_loop_thread.joinable()) { main_loop_thread.join(); }
            g_main_loop_unref(main_loop);
            main_loop = nullptr;
        }
        // Release our own refs on appsink/appsrc; previously these leaked on
        // repeated create_*() calls.
        if (appsink != nullptr) {
            gst_object_unref(appsink);
            appsink = nullptr;
        }
        if (appsrc != nullptr) {
            gst_object_unref(appsrc);
            appsrc = nullptr;
        }
        if (pipeline != nullptr) {
            gst_element_set_state(pipeline, GST_STATE_NULL);
            gst_object_unref(pipeline);
            pipeline = nullptr;
        }
    }

    // "new-sample" handler: maps the sample, fills a BufferInfo (payload,
    // tensor metadata, caps fields, timestamps) and invokes the user callback
    // while the buffer is still mapped.
    // The appsink "new-sample" signal expects a GstFlowReturn result; the
    // previous void signature left the marshalled return value undefined.
    static auto on_new_sample(GstElement *sink, Impl *self) -> GstFlowReturn
    {
        GstSample *sample = nullptr;
        g_signal_emit_by_name(sink, "pull-sample", &sample);

        if (sample != nullptr) {
            GstBuffer *buffer = gst_sample_get_buffer(sample);
            GstCaps *caps = gst_sample_get_caps(sample);
            GstMapInfo map_info;

            if ((buffer != nullptr) && (gst_buffer_map(buffer, &map_info, GST_MAP_READ) != 0)) {
                BufferInfo buffer_info;
                buffer_info.data = map_info.data;
                buffer_info.size = map_info.size;

                // Copy out any analytics tensor metadata attached to the buffer.
                if (auto *meta = gst_buffer_get_tensor_meta(buffer)) {
                    buffer_info.tensors.reserve(meta->num_tensors);
                    for (gsize i = 0; i < meta->num_tensors; ++i) {
                        const GstTensor *tensor = gst_tensor_meta_get(meta, i);
                        TensorMeta tm;
                        tm.tensor_index = i;
                        tm.num_tensors = meta->num_tensors;
                        tm.data_type = static_cast<int>(tensor->data_type);

                        gsize num_dims = 0;
                        auto *dims = gst_tensor_get_dims(const_cast<GstTensor *>(tensor), &num_dims);
                        for (gsize j = 0; j < num_dims; ++j) { tm.dimensions.push_back(dims[j]); }
                        buffer_info.tensors.push_back(std::move(tm));
                    }
                }

                // Samples may carry no caps; guard before dereferencing
                // (pull_sample() already checked this, but this path passed a
                // possibly-null caps straight to gst_caps_get_structure()).
                if (caps != nullptr) {
                    GstStructure *structure = gst_caps_get_structure(caps, 0);
                    if (structure != nullptr) {
                        gst_structure_get_uint(structure, "width", &buffer_info.metadata.width);
                        gst_structure_get_uint(structure, "height", &buffer_info.metadata.height);

                        guint fps_n;
                        guint fps_d;
                        if (gst_structure_get_fraction(
                              structure, "framerate", reinterpret_cast<gint *>(&fps_n), reinterpret_cast<gint *>(&fps_d)) != 0) {
                            buffer_info.metadata.fps_n = fps_n;
                            buffer_info.metadata.fps_d = fps_d;
                        }

                        const gchar *format = gst_structure_get_string(structure, "format");
                        if (format != nullptr) { buffer_info.metadata.format = format; }
                    }
                }

                buffer_info.metadata.timestamp_ns = GST_BUFFER_PTS(buffer);
                buffer_info.metadata.duration_ns = GST_BUFFER_DURATION(buffer);

                // Callback runs while the buffer is mapped so data/size are valid.
                if (self->buffer_callback) { self->buffer_callback(buffer_info); }

                gst_buffer_unmap(buffer, &map_info);
            }
            gst_sample_unref(sample);
        }
        // Returning OK keeps the stream flowing even when no sample could be
        // pulled (e.g. at EOS).
        return GST_FLOW_OK;
    }

    // Placeholder pad probe; currently passes every item through untouched.
    static auto on_pad_probe([[maybe_unused]] GstPad *pad,
      [[maybe_unused]] GstPadProbeInfo *info,
      [[maybe_unused]] gpointer user_data) -> GstPadProbeReturn
    {
        return GST_PAD_PROBE_OK;
    }
};

// Allocates the pimpl; GStreamer itself is initialised separately via
// initialize_gstreamer().
GStreamerPipeline::GStreamerPipeline() : impl_(std::make_unique<Impl>()) {}

// Defined here (not in the header) so ~unique_ptr<Impl> sees the complete
// Impl type; Impl's destructor runs cleanup().
GStreamerPipeline::~GStreamerPipeline() = default;

// Move construction transfers ownership of the pimpl; the defaulted form is
// identical to moving impl_ by hand.
GStreamerPipeline::GStreamerPipeline(GStreamerPipeline &&other) noexcept = default;

// Move assignment transfers the pimpl; unique_ptr's move-assign is safe even
// for self-assignment, so the defaulted form matches the hand-written guard.
auto GStreamerPipeline::operator=(GStreamerPipeline &&other) noexcept -> GStreamerPipeline & = default;

/// One-time, thread-safe, idempotent process-wide GStreamer initialisation.
/// @param argc/argv optional command-line arguments forwarded to gst_init_check().
/// @return empty on success, InitializationFailed otherwise.
auto GStreamerPipeline::initialize_gstreamer(int *argc, char ***argv) -> std::expected<void, GStreamerError>
{
    const std::scoped_lock guard(g_gstreamer_mutex);

    // Already initialised — nothing to do.
    if (g_gstreamer_initialized.load()) { return {}; }

    GError *init_error = nullptr;
    const auto ok = gst_init_check(argc, argv, &init_error);
    if (ok == 0) {
        if (init_error != nullptr) { g_error_free(init_error); }
        return std::unexpected(GStreamerError::InitializationFailed);
    }

    g_gstreamer_initialized.store(true);
    return {};
}

/// Thread-safe counterpart of initialize_gstreamer(); a no-op when GStreamer
/// was never initialised.
auto GStreamerPipeline::deinitialize_gstreamer() -> void
{
    const std::scoped_lock guard(g_gstreamer_mutex);
    if (!g_gstreamer_initialized.load()) { return; }
    gst_deinit();
    g_gstreamer_initialized.store(false);
}

/// Builds a pipeline from config.pipeline_description via gst_parse_launch().
/// Any previously created pipeline is torn down first.
/// @return empty on success; InitializationFailed if GStreamer is not
///         initialised, PipelineCreationFailed on parse failure.
auto GStreamerPipeline::create_pipeline(const PipelineConfig &config) -> std::expected<void, GStreamerError>
{
    if (!g_gstreamer_initialized.load()) { return std::unexpected(GStreamerError::InitializationFailed); }

    impl_->cleanup();

    GError *error = nullptr;
    impl_->pipeline = gst_parse_launch(config.pipeline_description.c_str(), &error);

    if (error != nullptr) {
        g_error_free(error);
        // gst_parse_launch() may return a partially-constructed pipeline even
        // when it reports an error; drop it so we don't keep a broken graph.
        if (impl_->pipeline != nullptr) {
            gst_object_unref(impl_->pipeline);
            impl_->pipeline = nullptr;
        }
        return std::unexpected(GStreamerError::PipelineCreationFailed);
    }

    if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }

    // TODO(review): tensor-meta support is not implemented yet — the flag is
    // accepted but currently has no effect.
    if (config.enable_tensor_meta) {}

    return {};
}

/// Convenience overload: wraps a raw gst-launch description in a default
/// PipelineConfig and delegates to create_pipeline().
auto GStreamerPipeline::create_pipeline_from_string(const std::string &description)
  -> std::expected<void, GStreamerError>
{
    PipelineConfig cfg{};
    cfg.pipeline_description = description;
    return create_pipeline(cfg);
}

/// Builds an inference pipeline: <source> ! videoconvert ! videoscale !
/// RGB 640x480 ! tensor transforms ! onnxruntime ! [tensor_decoder] ! appsink.
/// @param input_source URI ("scheme://…"), V4L2 device ("/dev/video*"),
///        or a plain file path.
/// @param model_path   path handed to the onnxruntime element.
/// @param input_shape  currently unused — the previous code serialized it into
///        a string but never inserted it into the pipeline description.
///        TODO confirm whether the caps/transform stages should use it.
/// @param output_format "TENSOR" inserts a direct tensor_decoder stage.
auto GStreamerPipeline::create_inference_pipeline(const std::string &input_source,
  const std::string &model_path,
  [[maybe_unused]] const std::vector<std::size_t> &input_shape,
  const std::string &output_format) -> std::expected<void, GStreamerError>
{
    if (!g_gstreamer_initialized.load()) { return std::unexpected(GStreamerError::InitializationFailed); }

    impl_->cleanup();
    // Drop a stale appsink reference from a previous pipeline so repeated
    // calls do not leak it.
    if (impl_->appsink != nullptr) {
        gst_object_unref(impl_->appsink);
        impl_->appsink = nullptr;
    }

    // Pick the source element from the shape of the input string.
    std::string pipeline_desc;
    if (input_source.find("://") != std::string::npos) {
        pipeline_desc = "uridecodebin uri=" + input_source + " ! ";
    } else if (input_source.starts_with("/dev/video")) {
        pipeline_desc = "v4l2src device=" + input_source + " ! ";
    } else {
        pipeline_desc = "filesrc location=" + input_source + " ! decodebin ! ";
    }

    pipeline_desc +=
      "videoconvert ! videoscale ! "
      "video/x-raw,format=RGB,width=640,height=480 ! "
      "tensor_transform mode=transpose option=1:2:0:3 ! "
      "tensor_transform mode=typecast option=float32 ! "
      "onnxruntime model-path="
      + model_path;

    if (output_format == "TENSOR") { pipeline_desc += " ! tensor_decoder mode=direct"; }

    pipeline_desc += " ! appsink name=sink emit-signals=true";

    GError *error = nullptr;
    impl_->pipeline = gst_parse_launch(pipeline_desc.c_str(), &error);

    if (error != nullptr) {
        g_error_free(error);
        // gst_parse_launch() may hand back a partial pipeline alongside the
        // error; release it instead of keeping a broken graph.
        if (impl_->pipeline != nullptr) {
            gst_object_unref(impl_->pipeline);
            impl_->pipeline = nullptr;
        }
        return std::unexpected(GStreamerError::PipelineCreationFailed);
    }

    if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }

    // gst_bin_get_by_name() returns an owned ref; released in Impl::cleanup().
    impl_->appsink = gst_bin_get_by_name(GST_BIN(impl_->pipeline), "sink");
    if (impl_->appsink == nullptr) { return std::unexpected(GStreamerError::ElementCreationFailed); }

    g_object_set(impl_->appsink, "emit-signals", TRUE, nullptr);

    return {};
}

/// Transitions the pipeline to PLAYING and updates the state flags.
/// @return PipelineCreationFailed if no pipeline exists, StateChangeFailed if
///         the transition is rejected.
auto GStreamerPipeline::start() -> std::expected<void, GStreamerError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }

    const GstStateChangeReturn result = gst_element_set_state(impl_->pipeline, GST_STATE_PLAYING);
    if (result == GST_STATE_CHANGE_FAILURE) { return std::unexpected(GStreamerError::StateChangeFailed); }

    impl_->is_playing.store(true);
    impl_->is_paused.store(false);
    return {};
}

/// Transitions the pipeline to NULL (fully stopped). Stopping without a
/// pipeline is a no-op success.
auto GStreamerPipeline::stop() -> std::expected<void, GStreamerError>
{
    if (impl_->pipeline == nullptr) { return {}; }

    const GstStateChangeReturn result = gst_element_set_state(impl_->pipeline, GST_STATE_NULL);
    if (result == GST_STATE_CHANGE_FAILURE) { return std::unexpected(GStreamerError::StateChangeFailed); }

    impl_->is_playing.store(false);
    impl_->is_paused.store(false);
    return {};
}

/// Transitions the pipeline to PAUSED and records the paused flag.
auto GStreamerPipeline::pause() -> std::expected<void, GStreamerError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }

    const GstStateChangeReturn result = gst_element_set_state(impl_->pipeline, GST_STATE_PAUSED);
    if (result == GST_STATE_CHANGE_FAILURE) { return std::unexpected(GStreamerError::StateChangeFailed); }

    impl_->is_paused.store(true);
    return {};
}

// Resuming is a plain start(): sets the pipeline back to PLAYING and clears
// the paused flag.
auto GStreamerPipeline::resume() -> std::expected<void, GStreamerError> { return start(); }

/// True only while actively playing: started and not currently paused.
auto GStreamerPipeline::is_playing() const -> bool
{
    const bool started = impl_->is_playing.load();
    const bool paused = impl_->is_paused.load();
    return started && !paused;
}

// Reports the paused flag set by pause() and cleared by start()/stop().
auto GStreamerPipeline::is_paused() const -> bool { return impl_->is_paused.load(); }

/// Installs the per-buffer callback and wires it to the appsink's
/// "new-sample" signal.
/// @param callback invoked (on the streaming thread) for every sample while
///        the underlying buffer is mapped.
auto GStreamerPipeline::set_buffer_callback(BufferCallback callback) -> void
{
    impl_->buffer_callback = std::move(callback);

    if (impl_->appsink != nullptr) {
        // Remove any handler installed by a previous call so the callback does
        // not fire once per set_buffer_callback() invocation.
        g_signal_handlers_disconnect_by_func(
          impl_->appsink, reinterpret_cast<gpointer>(G_CALLBACK(&Impl::on_new_sample)), impl_.get());
        g_signal_connect(impl_->appsink, "new-sample", G_CALLBACK(&Impl::on_new_sample), impl_.get());
    }
    // NOTE(review): if this is called before an appsink exists, the signal is
    // never connected — presumably callers set the callback after create_*();
    // verify against usage.
}

/// Synchronously pulls one sample from the appsink.
/// @param timeout_ms maximum wait for a sample, in milliseconds. The previous
///        implementation ignored this parameter and blocked indefinitely;
///        gst_app_sink_try_pull_sample() now honours it.
/// @return a populated BufferInfo, ElementCreationFailed when no appsink is
///         configured, or BufferAllocationFailed when no sample arrived in time.
/// NOTE(review): BufferInfo.data points at buffer memory that is unmapped
/// before this function returns — callers must not dereference it after the
/// call; fixing that properly needs an owning buffer in the interface.
auto GStreamerPipeline::pull_sample(std::uint32_t timeout_ms)
  -> std::expected<BufferInfo, GStreamerError>
{
    if (impl_->appsink == nullptr) { return std::unexpected(GStreamerError::ElementCreationFailed); }

    GstSample *sample = gst_app_sink_try_pull_sample(
      GST_APP_SINK(impl_->appsink), static_cast<GstClockTime>(timeout_ms) * GST_MSECOND);

    if (sample == nullptr) { return std::unexpected(GStreamerError::BufferAllocationFailed); }

    GstBuffer *buffer = gst_sample_get_buffer(sample);
    GstCaps *caps = gst_sample_get_caps(sample);
    GstMapInfo map_info;

    BufferInfo buffer_info;

    if ((buffer != nullptr) && (gst_buffer_map(buffer, &map_info, GST_MAP_READ) != 0)) {
        buffer_info.data = map_info.data;
        buffer_info.size = map_info.size;

        // Basic frame description from the caps, when present.
        if (caps != nullptr) {
            GstStructure *structure = gst_caps_get_structure(caps, 0);
            if (structure != nullptr) {
                gst_structure_get_uint(structure, "width", &buffer_info.metadata.width);
                gst_structure_get_uint(structure, "height", &buffer_info.metadata.height);

                const gchar *format = gst_structure_get_string(structure, "format");
                if (format != nullptr) { buffer_info.metadata.format = format; }
            }
        }

        // Copy out any analytics tensor metadata attached to the buffer.
        if (auto *meta = gst_buffer_get_tensor_meta(buffer)) {
            for (gsize i = 0; i < meta->num_tensors; ++i) {
                const GstTensor *tensor = gst_tensor_meta_get(meta, i);
                TensorMeta tm;
                tm.tensor_index = i;
                tm.num_tensors = meta->num_tensors;
                tm.data_type = static_cast<int>(tensor->data_type);

                gsize num_dims = 0;
                auto *dims = gst_tensor_get_dims(const_cast<GstTensor *>(tensor), &num_dims);
                for (gsize j = 0; j < num_dims; ++j) { tm.dimensions.push_back(dims[j]); }
                buffer_info.tensors.push_back(std::move(tm));
            }
        }

        gst_buffer_unmap(buffer, &map_info);
    }

    gst_sample_unref(sample);
    return buffer_info;
}

/// Copies `size` bytes from `data` into a newly allocated GstBuffer, stamps it
/// with the supplied timestamps, and pushes it into the appsrc.
/// @return ElementCreationFailed without an appsrc, BufferAllocationFailed on
///         allocation/map failure, StreamError if the push is rejected.
auto GStreamerPipeline::push_buffer(void *data, std::size_t size, const FrameMetadata &metadata)
  -> std::expected<void, GStreamerError>
{
    if (impl_->appsrc == nullptr) { return std::unexpected(GStreamerError::ElementCreationFailed); }

    GstBuffer *out = gst_buffer_new_allocate(nullptr, size, nullptr);
    if (out == nullptr) { return std::unexpected(GStreamerError::BufferAllocationFailed); }

    GstMapInfo map;
    if (gst_buffer_map(out, &map, GST_MAP_WRITE) == 0) {
        gst_buffer_unref(out);// don't leak the buffer on map failure
        return std::unexpected(GStreamerError::BufferAllocationFailed);
    }
    std::memcpy(map.data, data, size);
    gst_buffer_unmap(out, &map);

    GST_BUFFER_PTS(out) = metadata.timestamp_ns;
    GST_BUFFER_DURATION(out) = metadata.duration_ns;

    GstFlowReturn flow = GST_FLOW_OK;
    g_signal_emit_by_name(impl_->appsrc, "push-buffer", out, &flow);
    gst_buffer_unref(out);// "push-buffer" takes its own ref

    if (flow != GST_FLOW_OK) { return std::unexpected(GStreamerError::StreamError); }
    return {};
}

/// Queries the current playback position in nanoseconds.
auto GStreamerPipeline::get_position_ns() const -> std::expected<std::uint64_t, GStreamerError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }

    gint64 pos_ns = 0;
    const auto ok = gst_element_query_position(impl_->pipeline, GST_FORMAT_TIME, &pos_ns);
    if (ok == 0) { return std::unexpected(GStreamerError::ResourceNotFound); }
    return static_cast<std::uint64_t>(pos_ns);
}

/// Queries the total stream duration in nanoseconds.
auto GStreamerPipeline::get_duration_ns() const -> std::expected<std::uint64_t, GStreamerError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }

    gint64 dur_ns = 0;
    const auto ok = gst_element_query_duration(impl_->pipeline, GST_FORMAT_TIME, &dur_ns);
    if (ok == 0) { return std::unexpected(GStreamerError::ResourceNotFound); }
    return static_cast<std::uint64_t>(dur_ns);
}

/// Seeks to an absolute position (nanoseconds), flushing and snapping to the
/// nearest keyframe.
auto GStreamerPipeline::seek(std::uint64_t timestamp_ns) -> std::expected<void, GStreamerError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }

    const auto flags = static_cast<GstSeekFlags>(GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT);
    const auto ok =
      gst_element_seek_simple(impl_->pipeline, GST_FORMAT_TIME, flags, static_cast<gint64>(timestamp_ns));
    if (ok == 0) { return std::unexpected(GStreamerError::ResourceNotFound); }
    return {};
}

// Returns the cached caps string.
// NOTE(review): caps_string is never written in this translation unit, so
// this likely always returns an empty string — verify intended population.
auto GStreamerPipeline::get_caps_string() const -> std::string { return impl_->caps_string; }

auto GStreamerPipeline::get_current_state() const -> int
{
    if (impl_->pipeline == nullptr) { return GST_STATE_NULL; }

    GstState state;
    gst_element_get_state(impl_->pipeline, &state, nullptr, 0);
    return static_cast<int>(state);
}

/// Factory: builds a file/URI video pipeline that scales frames to
/// width x height RGB, runs them through onnxruntime, and terminates in the
/// requested sink ("appsink" or "fakesink"; anything else gets no sink stage).
auto create_video_inference_pipeline(const std::string &video_source,
  const std::string &model_path,
  std::uint32_t width,
  std::uint32_t height,
  const std::string &output_sink) -> std::expected<GStreamerPipeline, GStreamerError>
{
    if (auto init = GStreamerPipeline::initialize_gstreamer(); !init) { return std::unexpected(init.error()); }

    // URI sources go through uridecodebin, everything else is treated as a file.
    const bool is_uri = video_source.find("://") != std::string::npos;
    std::string desc = is_uri ? "uridecodebin uri=" + video_source + " ! "
                              : "filesrc location=" + video_source + " ! decodebin ! ";

    desc += "videoconvert ! videoscale ! ";
    desc += "video/x-raw,format=RGB,width=" + std::to_string(width) + ",height=" + std::to_string(height) + " ! ";
    desc += "tensor_transform mode=transpose option=1:2:0:3 ! ";
    desc += "tensor_transform mode=typecast option=float32 ! ";
    desc += "onnxruntime model-path=" + model_path;

    if (output_sink == "appsink") {
        desc += " ! appsink name=sink emit-signals=true sync=false";
    } else if (output_sink == "fakesink") {
        desc += " ! fakesink sync=false";
    }

    GStreamerPipeline pipeline;
    if (auto created = pipeline.create_pipeline_from_string(desc); !created) {
        return std::unexpected(created.error());
    }
    return pipeline;
}

/// Factory: builds a V4L2 camera pipeline at the requested resolution and
/// frame rate, feeding onnxruntime and ending in a signalling appsink.
auto create_camera_inference_pipeline(const std::string &device,
  const std::string &model_path,
  std::uint32_t width,
  std::uint32_t height,
  std::uint32_t fps) -> std::expected<GStreamerPipeline, GStreamerError>
{
    if (auto init = GStreamerPipeline::initialize_gstreamer(); !init) { return std::unexpected(init.error()); }

    std::string desc = "v4l2src device=" + device + " ! ";
    desc += "video/x-raw,width=" + std::to_string(width);
    desc += ",height=" + std::to_string(height);
    desc += ",framerate=" + std::to_string(fps) + "/1 ! ";
    desc += "videoconvert ! ";
    desc += "tensor_transform mode=transpose option=1:2:0:3 ! ";
    desc += "tensor_transform mode=typecast option=float32 ! ";
    desc += "onnxruntime model-path=" + model_path + " ! ";
    desc += "appsink name=sink emit-signals=true sync=false";

    GStreamerPipeline pipeline;
    if (auto created = pipeline.create_pipeline_from_string(desc); !created) {
        return std::unexpected(created.error());
    }
    return pipeline;
}

}// namespace kataglyphis::gstreamer