
.. _program_listing_file_Src_gstreamer_pipeline.cpp:

Program Listing for File gstreamer_pipeline.cpp
===============================================

|exhale_lsh| :ref:`Return to documentation for file <file_Src_gstreamer_pipeline.cpp>` (``Src/gstreamer_pipeline.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   module;
   
   #include "kataglyphis_export.h"
   #include <atomic>
   #include <cstring>
   #include <expected>
   #include <gst/analytics/analytics.h>
   #include <gst/analytics/gsttensor.h>
   #include <gst/analytics/gsttensormeta.h>
   #include <gst/app/gstappsink.h>
   #include <gst/gst.h>
   #include <gst/video/video.h>
   #include <mutex>
   #include <string>
   #include <thread>
   #include <vector>
   
   module kataglyphis.gstreamer_pipeline;
   
   import kataglyphis.project_config;
   
   namespace kataglyphis::gstreamer {
   
   namespace {
       std::atomic<bool> g_gstreamer_initialized{ false };
       std::mutex g_gstreamer_mutex;
   }// namespace
   
   struct GStreamerPipeline::Impl
   {
       GstElement *pipeline{ nullptr };
       GstElement *appsink{ nullptr };
       GstElement *appsrc{ nullptr };
   
       BufferCallback buffer_callback;
       std::atomic<bool> is_playing{ false };
       std::atomic<bool> is_paused{ false };
       std::string caps_string;
   
       GMainLoop *main_loop{ nullptr };
       std::thread main_loop_thread;
   
       ~Impl() { cleanup(); }
   
       void cleanup()
       {
           if (pipeline != nullptr) {
               gst_element_set_state(pipeline, GST_STATE_NULL);
               gst_object_unref(pipeline);
               pipeline = nullptr;
           }
           if (main_loop != nullptr) {
               g_main_loop_quit(main_loop);
               if (main_loop_thread.joinable()) { main_loop_thread.join(); }
               g_main_loop_unref(main_loop);
               main_loop = nullptr;
           }
       }
   
       static void on_new_sample(GstElement *sink, Impl *self)
       {
           GstSample *sample = nullptr;
           g_signal_emit_by_name(sink, "pull-sample", &sample);
   
           if (sample != nullptr) {
               GstBuffer *buffer = gst_sample_get_buffer(sample);
               GstCaps *caps = gst_sample_get_caps(sample);
               GstMapInfo map_info;
   
               if ((buffer != nullptr) && (gst_buffer_map(buffer, &map_info, GST_MAP_READ) != 0)) {
                   BufferInfo buffer_info;
                   buffer_info.data = map_info.data;
                   buffer_info.size = map_info.size;
   
                   if (auto *meta = gst_buffer_get_tensor_meta(buffer)) {
                       buffer_info.tensors.reserve(meta->num_tensors);
                       for (gsize i = 0; i < meta->num_tensors; ++i) {
                           const GstTensor *tensor = gst_tensor_meta_get(meta, i);
                           TensorMeta tm;
                           tm.tensor_index = i;
                           tm.num_tensors = meta->num_tensors;
                           tm.data_type = static_cast<int>(tensor->data_type);
   
                           gsize num_dims = 0;
                           auto *dims = gst_tensor_get_dims(const_cast<GstTensor *>(tensor), &num_dims);
                           for (gsize j = 0; j < num_dims; ++j) { tm.dimensions.push_back(dims[j]); }
                           buffer_info.tensors.push_back(std::move(tm));
                       }
                   }
   
                   GstStructure *structure = gst_caps_get_structure(caps, 0);
                   if (structure != nullptr) {
                       gst_structure_get_uint(structure, "width", &buffer_info.metadata.width);
                       gst_structure_get_uint(structure, "height", &buffer_info.metadata.height);
   
                       guint fps_n;
                       guint fps_d;
                       if (gst_structure_get_fraction(
                             structure, "framerate", reinterpret_cast<gint *>(&fps_n), reinterpret_cast<gint *>(&fps_d)) != 0) {
                           buffer_info.metadata.fps_n = fps_n;
                           buffer_info.metadata.fps_d = fps_d;
                       }
   
                       const gchar *format = gst_structure_get_string(structure, "format");
                       if (format != nullptr) { buffer_info.metadata.format = format; }
                   }
   
                   buffer_info.metadata.timestamp_ns = GST_BUFFER_PTS(buffer);
                   buffer_info.metadata.duration_ns = GST_BUFFER_DURATION(buffer);
   
                   if (self->buffer_callback) { self->buffer_callback(buffer_info); }
   
                   gst_buffer_unmap(buffer, &map_info);
               }
               gst_sample_unref(sample);
           }
       }
   
       static auto on_pad_probe([[maybe_unused]] GstPad *pad,
         [[maybe_unused]] GstPadProbeInfo *info,
         [[maybe_unused]] gpointer user_data) -> GstPadProbeReturn
       {
           return GST_PAD_PROBE_OK;
       }
   };
   
   GStreamerPipeline::GStreamerPipeline() : impl_(std::make_unique<Impl>()) {}
   
   GStreamerPipeline::~GStreamerPipeline() = default;
   
   GStreamerPipeline::GStreamerPipeline(GStreamerPipeline &&other) noexcept : impl_(std::move(other.impl_)) {}
   
   auto GStreamerPipeline::operator=(GStreamerPipeline &&other) noexcept -> GStreamerPipeline &
   {
       if (this != &other) { impl_ = std::move(other.impl_); }
       return *this;
   }
   
   auto GStreamerPipeline::initialize_gstreamer(int *argc, char ***argv) -> std::expected<void, GStreamerError>
   {
   
       std::scoped_lock lock(g_gstreamer_mutex);
   
       if (g_gstreamer_initialized.load()) { return {}; }
   
       GError *error = nullptr;
       if (gst_init_check(argc, argv, &error) == 0) {
           if (error != nullptr) { g_error_free(error); }
           return std::unexpected(GStreamerError::InitializationFailed);
       }
   
       g_gstreamer_initialized.store(true);
       return {};
   }
   
   auto GStreamerPipeline::deinitialize_gstreamer() -> void
   {
       std::scoped_lock lock(g_gstreamer_mutex);
       if (g_gstreamer_initialized.load()) {
           gst_deinit();
           g_gstreamer_initialized.store(false);
       }
   }
   
   auto GStreamerPipeline::create_pipeline(const PipelineConfig &config) -> std::expected<void, GStreamerError>
   {
   
       if (!g_gstreamer_initialized.load()) { return std::unexpected(GStreamerError::InitializationFailed); }
   
       impl_->cleanup();
   
       GError *error = nullptr;
       impl_->pipeline = gst_parse_launch(config.pipeline_description.c_str(), &error);
   
       if (error != nullptr) {
           g_error_free(error);
           return std::unexpected(GStreamerError::PipelineCreationFailed);
       }
   
       if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }
   
       if (config.enable_tensor_meta) {}
   
       return {};
   }
   
   auto GStreamerPipeline::create_pipeline_from_string(const std::string &description)
     -> std::expected<void, GStreamerError>
   {
       PipelineConfig config;
       config.pipeline_description = description;
       return create_pipeline(config);
   }
   
   auto GStreamerPipeline::create_inference_pipeline(const std::string &input_source,
     const std::string &model_path,
     const std::vector<std::size_t> &input_shape,
     const std::string &output_format) -> std::expected<void, GStreamerError>
   {
   
       if (!g_gstreamer_initialized.load()) { return std::unexpected(GStreamerError::InitializationFailed); }
   
       impl_->cleanup();
   
       std::string shape_str;
       for (std::size_t i = 0; i < input_shape.size(); ++i) {
           if (i > 0) { shape_str += ",";
   }
           shape_str += std::to_string(input_shape[i]);
       }
   
       std::string pipeline_desc;
   
       if (input_source.find("://") != std::string::npos) {
           pipeline_desc = "uridecodebin uri=" + input_source + " ! ";
       } else if (input_source.starts_with("/dev/video")) {
           pipeline_desc = "v4l2src device=" + input_source + " ! ";
       } else {
           pipeline_desc = "filesrc location=" + input_source + " ! decodebin ! ";
       }
   
       pipeline_desc +=
         "videoconvert ! videoscale ! "
         "video/x-raw,format=RGB,width=640,height=480 ! "
         "tensor_transform mode=transpose option=1:2:0:3 ! "
         "tensor_transform mode=typecast option=float32 ! "
         "onnxruntime model-path="
         + model_path;
   
       if (output_format == "TENSOR") { pipeline_desc += " ! tensor_decoder mode=direct"; }
   
       pipeline_desc += " ! appsink name=sink emit-signals=true";
   
       GError *error = nullptr;
       impl_->pipeline = gst_parse_launch(pipeline_desc.c_str(), &error);
   
       if (error != nullptr) {
           g_error_free(error);
           return std::unexpected(GStreamerError::PipelineCreationFailed);
       }
   
       if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }
   
       impl_->appsink = gst_bin_get_by_name(GST_BIN(impl_->pipeline), "sink");
       if (impl_->appsink == nullptr) { return std::unexpected(GStreamerError::ElementCreationFailed); }
   
       g_object_set(impl_->appsink, "emit-signals", TRUE, nullptr);
   
       return {};
   }
   
   auto GStreamerPipeline::start() -> std::expected<void, GStreamerError>
   {
       if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }
   
       GstStateChangeReturn ret = gst_element_set_state(impl_->pipeline, GST_STATE_PLAYING);
   
       if (ret == GST_STATE_CHANGE_FAILURE) { return std::unexpected(GStreamerError::StateChangeFailed); }
   
       impl_->is_playing.store(true);
       impl_->is_paused.store(false);
       return {};
   }
   
   auto GStreamerPipeline::stop() -> std::expected<void, GStreamerError>
   {
       if (impl_->pipeline == nullptr) { return {}; }
   
       GstStateChangeReturn ret = gst_element_set_state(impl_->pipeline, GST_STATE_NULL);
   
       if (ret == GST_STATE_CHANGE_FAILURE) { return std::unexpected(GStreamerError::StateChangeFailed); }
   
       impl_->is_playing.store(false);
       impl_->is_paused.store(false);
       return {};
   }
   
   auto GStreamerPipeline::pause() -> std::expected<void, GStreamerError>
   {
       if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }
   
       GstStateChangeReturn ret = gst_element_set_state(impl_->pipeline, GST_STATE_PAUSED);
   
       if (ret == GST_STATE_CHANGE_FAILURE) { return std::unexpected(GStreamerError::StateChangeFailed); }
   
       impl_->is_paused.store(true);
       return {};
   }
   
   auto GStreamerPipeline::resume() -> std::expected<void, GStreamerError> { return start(); }
   
   auto GStreamerPipeline::is_playing() const -> bool { return impl_->is_playing.load() && !impl_->is_paused.load(); }
   
   auto GStreamerPipeline::is_paused() const -> bool { return impl_->is_paused.load(); }
   
   auto GStreamerPipeline::set_buffer_callback(BufferCallback callback) -> void
   {
       impl_->buffer_callback = std::move(callback);
   
       if (impl_->appsink != nullptr) {
           g_signal_connect(impl_->appsink, "new-sample", G_CALLBACK(&Impl::on_new_sample), impl_.get());
       }
   }
   
   auto GStreamerPipeline::pull_sample([[maybe_unused]] std::uint32_t timeout_ms)
     -> std::expected<BufferInfo, GStreamerError>
   {
   
       if (impl_->appsink == nullptr) { return std::unexpected(GStreamerError::ElementCreationFailed); }
   
       GstSample *sample = nullptr;
       g_signal_emit_by_name(impl_->appsink, "pull-sample", &sample);
   
       if (sample == nullptr) { return std::unexpected(GStreamerError::BufferAllocationFailed); }
   
       GstBuffer *buffer = gst_sample_get_buffer(sample);
       GstCaps *caps = gst_sample_get_caps(sample);
       GstMapInfo map_info;
   
       BufferInfo buffer_info;
   
       if ((buffer != nullptr) && (gst_buffer_map(buffer, &map_info, GST_MAP_READ) != 0)) {
           buffer_info.data = map_info.data;
           buffer_info.size = map_info.size;
   
           if (caps != nullptr) {
               GstStructure *structure = gst_caps_get_structure(caps, 0);
               if (structure != nullptr) {
                   gst_structure_get_uint(structure, "width", &buffer_info.metadata.width);
                   gst_structure_get_uint(structure, "height", &buffer_info.metadata.height);
   
                   const gchar *format = gst_structure_get_string(structure, "format");
                   if (format != nullptr) { buffer_info.metadata.format = format; }
               }
           }
   
           if (auto *meta = gst_buffer_get_tensor_meta(buffer)) {
               for (gsize i = 0; i < meta->num_tensors; ++i) {
                   const GstTensor *tensor = gst_tensor_meta_get(meta, i);
                   TensorMeta tm;
                   tm.tensor_index = i;
                   tm.num_tensors = meta->num_tensors;
                   tm.data_type = static_cast<int>(tensor->data_type);
   
                   gsize num_dims = 0;
                   auto *dims = gst_tensor_get_dims(const_cast<GstTensor *>(tensor), &num_dims);
                   for (gsize j = 0; j < num_dims; ++j) { tm.dimensions.push_back(dims[j]); }
                   buffer_info.tensors.push_back(std::move(tm));
               }
           }
   
           gst_buffer_unmap(buffer, &map_info);
       }
   
       gst_sample_unref(sample);
       return buffer_info;
   }
   
   auto GStreamerPipeline::push_buffer(void *data, std::size_t size, const FrameMetadata &metadata)
     -> std::expected<void, GStreamerError>
   {
   
       if (impl_->appsrc == nullptr) { return std::unexpected(GStreamerError::ElementCreationFailed); }
   
       GstBuffer *buffer = gst_buffer_new_allocate(nullptr, size, nullptr);
       if (buffer == nullptr) { return std::unexpected(GStreamerError::BufferAllocationFailed); }
   
       GstMapInfo map_info;
       if (gst_buffer_map(buffer, &map_info, GST_MAP_WRITE) == 0) {
           gst_buffer_unref(buffer);
           return std::unexpected(GStreamerError::BufferAllocationFailed);
       }
   
       std::memcpy(map_info.data, data, size);
       gst_buffer_unmap(buffer, &map_info);
   
       GST_BUFFER_PTS(buffer) = metadata.timestamp_ns;
       GST_BUFFER_DURATION(buffer) = metadata.duration_ns;
   
       GstFlowReturn ret;
       g_signal_emit_by_name(impl_->appsrc, "push-buffer", buffer, &ret);
       gst_buffer_unref(buffer);
   
       if (ret != GST_FLOW_OK) { return std::unexpected(GStreamerError::StreamError); }
   
       return {};
   }
   
   auto GStreamerPipeline::get_position_ns() const -> std::expected<std::uint64_t, GStreamerError>
   {
       if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }
   
       gint64 position = 0;
       if (gst_element_query_position(impl_->pipeline, GST_FORMAT_TIME, &position) == 0) {
           return std::unexpected(GStreamerError::ResourceNotFound);
       }
   
       return static_cast<std::uint64_t>(position);
   }
   
   auto GStreamerPipeline::get_duration_ns() const -> std::expected<std::uint64_t, GStreamerError>
   {
       if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }
   
       gint64 duration = 0;
       if (gst_element_query_duration(impl_->pipeline, GST_FORMAT_TIME, &duration) == 0) {
           return std::unexpected(GStreamerError::ResourceNotFound);
       }
   
       return static_cast<std::uint64_t>(duration);
   }
   
   auto GStreamerPipeline::seek(std::uint64_t timestamp_ns) -> std::expected<void, GStreamerError>
   {
       if (impl_->pipeline == nullptr) { return std::unexpected(GStreamerError::PipelineCreationFailed); }
   
       if (gst_element_seek_simple(impl_->pipeline,
             GST_FORMAT_TIME,
             static_cast<GstSeekFlags>(GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT),
             static_cast<gint64>(timestamp_ns)) == 0) {
           return std::unexpected(GStreamerError::ResourceNotFound);
       }
   
       return {};
   }
   
   auto GStreamerPipeline::get_caps_string() const -> std::string { return impl_->caps_string; }
   
   auto GStreamerPipeline::get_current_state() const -> int
   {
       if (impl_->pipeline == nullptr) { return GST_STATE_NULL; }
   
       GstState state;
       gst_element_get_state(impl_->pipeline, &state, nullptr, 0);
       return static_cast<int>(state);
   }
   
   auto create_video_inference_pipeline(const std::string &video_source,
     const std::string &model_path,
     std::uint32_t width,
     std::uint32_t height,
     const std::string &output_sink) -> std::expected<GStreamerPipeline, GStreamerError>
   {
   
       auto init_result = GStreamerPipeline::initialize_gstreamer();
       if (!init_result) { return std::unexpected(init_result.error()); }
   
       GStreamerPipeline pipeline;
   
       std::string pipeline_desc;
   
       if (video_source.find("://") != std::string::npos) {
           pipeline_desc = "uridecodebin uri=" + video_source + " ! ";
       } else {
           pipeline_desc = "filesrc location=" + video_source + " ! decodebin ! ";
       }
   
       pipeline_desc += "videoconvert ! videoscale ! "
           "video/x-raw,format=RGB,width=" + std::to_string(width) + 
           ",height=" + std::to_string(height) + " ! "
           "tensor_transform mode=transpose option=1:2:0:3 ! "
           "tensor_transform mode=typecast option=float32 ! "
           "onnxruntime model-path=" + model_path;
   
       if (output_sink == "appsink") {
           pipeline_desc += " ! appsink name=sink emit-signals=true sync=false";
       } else if (output_sink == "fakesink") {
           pipeline_desc += " ! fakesink sync=false";
       }
   
       auto result = pipeline.create_pipeline_from_string(pipeline_desc);
       if (!result) { return std::unexpected(result.error()); }
   
       return pipeline;
   }
   
   auto create_camera_inference_pipeline(const std::string &device,
     const std::string &model_path,
     std::uint32_t width,
     std::uint32_t height,
     std::uint32_t fps) -> std::expected<GStreamerPipeline, GStreamerError>
   {
   
       auto init_result = GStreamerPipeline::initialize_gstreamer();
       if (!init_result) { return std::unexpected(init_result.error()); }
   
       GStreamerPipeline pipeline;
   
       std::string pipeline_desc = 
           "v4l2src device=" + device + " ! "
           "video/x-raw,width=" + std::to_string(width) + 
           ",height=" + std::to_string(height) +
           ",framerate=" + std::to_string(fps) + "/1 ! "
           "videoconvert ! "
           "tensor_transform mode=transpose option=1:2:0:3 ! "
           "tensor_transform mode=typecast option=float32 ! "
           "onnxruntime model-path=" + model_path + " ! "
           "appsink name=sink emit-signals=true sync=false";
   
       auto result = pipeline.create_pipeline_from_string(pipeline_desc);
       if (!result) { return std::unexpected(result.error()); }
   
       return pipeline;
   }
   
   }// namespace kataglyphis::gstreamer
