// Program listing for file Src/webrtc_streamer.cpp
// (extracted from the generated documentation page for this file)
module;
#include "kataglyphis_export.h"
#include <atomic>
#include <chrono>
#include <cstring>
#include <expected>
#include <filesystem>
#include <gst/gst.h>
#include <gst/sdp/sdp.h>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#define GST_USE_UNSTABLE_API
#include <gst/webrtc/webrtc.h>
module kataglyphis.webrtc_streamer;
import kataglyphis.project_config;
import kataglyphis.config_loader;
namespace kataglyphis::webrtc {
namespace {

// Process-wide GStreamer lifecycle guard: initialize()/deinitialize() are
// serialized by g_gstreamer_mutex, and the flag records whether gst_init()
// has already succeeded.
std::atomic<bool> g_gstreamer_initialized{ false };
std::mutex g_gstreamer_mutex;

// Maps a WebRTCError enumerator to a static, human-readable description.
auto error_to_string(WebRTCError error) -> const char *
{
    switch (error) {
    case WebRTCError::InitializationFailed: return "GStreamer initialization failed";
    case WebRTCError::SignallingConnectionFailed: return "Failed to connect to signalling server";
    case WebRTCError::PipelineCreationFailed: return "Failed to create GStreamer pipeline";
    case WebRTCError::NegotiationFailed: return "WebRTC negotiation failed";
    case WebRTCError::MediaError: return "Media error";
    case WebRTCError::LibcameraNotAvailable: return "libcamera not available";
    case WebRTCError::EncoderNotAvailable: return "Video encoder not available";
    case WebRTCError::StateChangeFailed: return "Pipeline state change failed";
    case WebRTCError::InvalidConfiguration: return "Invalid configuration";
    case WebRTCError::Timeout: return "Operation timed out";
    }
    // Defensive fallback for out-of-range enum values.
    return "Unknown error";
}

}// namespace
// Private implementation (pimpl) for WebRTCStreamer.
//
// Owns the GStreamer pipeline, the GLib main loop that dispatches bus
// callbacks, and the user-registered callbacks. `state` is atomic so it can
// be read from any thread; the callback members themselves are guarded by
// `callback_mutex`.
struct WebRTCStreamer::Impl
{
    StreamConfig config;
    GstElement *pipeline{ nullptr };// owning ref, released in cleanup()
    GstElement *webrtcsink{ nullptr };// extra ref from gst_bin_get_by_name(), released in cleanup()
    GMainLoop *main_loop{ nullptr };
    std::thread main_loop_thread;
    std::atomic<StreamState> state{ StreamState::Idle };
    StateCallback state_callback;// guarded by callback_mutex
    ErrorCallback error_callback;// guarded by callback_mutex
    std::string producer_id;
    std::mutex callback_mutex;

    ~Impl() { cleanup(); }

    // Tears everything down. The GLib loop is stopped first so no bus
    // callback can run into a half-destroyed object, then the webrtcsink
    // reference and the pipeline are released. Safe to call repeatedly.
    void cleanup()
    {
        if (main_loop != nullptr) {
            g_main_loop_quit(main_loop);
            if (main_loop_thread.joinable()) { main_loop_thread.join(); }
            g_main_loop_unref(main_loop);
            main_loop = nullptr;
        }
        if (webrtcsink != nullptr) {
            // gst_bin_get_by_name() handed us our own reference; drop it,
            // otherwise the element outlives the pipeline (leak).
            gst_object_unref(webrtcsink);
            webrtcsink = nullptr;
        }
        if (pipeline != nullptr) {
            gst_element_set_state(pipeline, GST_STATE_NULL);
            gst_object_unref(pipeline);
            pipeline = nullptr;
        }
    }

    // Atomically swaps in the new state and, if it changed, notifies the
    // registered state callback. The callback is copied under the lock and
    // invoked outside it: reading it unlocked would race with
    // set_state_callback(), and calling it locked could deadlock if the
    // callback re-enters the streamer.
    void set_state(StreamState new_state)
    {
        const StreamState old_state = state.exchange(new_state);
        if (old_state == new_state) { return; }
        StateCallback callback_copy;
        {
            std::scoped_lock lock(callback_mutex);
            callback_copy = state_callback;
        }
        if (callback_copy) { callback_copy(old_state, new_state); }
    }

    // Switches the stream into the Error state and forwards the error (plus
    // optional detail text) to the registered error callback, if any.
    // Same copy-under-lock pattern as set_state() to avoid the data race.
    void report_error(WebRTCError error, const std::string &message = "")
    {
        set_state(StreamState::Error);
        ErrorCallback callback_copy;
        {
            std::scoped_lock lock(callback_mutex);
            callback_copy = error_callback;
        }
        if (callback_copy) {
            std::string full_message = error_to_string(error);
            if (!message.empty()) { full_message += ": " + message; }
            callback_copy(error, full_message);
        }
    }

    // Builds the source portion of the gst-launch pipeline description for
    // the configured video source (libcamera, V4L2, or test pattern).
    auto build_source_element() const -> std::string
    {
        std::ostringstream ss;
        switch (config.source) {
        case VideoSource::Libcamera:
            // libcamerasrc for Raspberry Pi cameras
            ss << "libcamerasrc";
            if (!config.camera_id.empty()) { ss << " camera-name=\"" << config.camera_id << "\""; }
            ss << " ! video/x-raw,width=" << config.width << ",height=" << config.height
               << ",framerate=" << config.framerate << "/1";
            break;
        case VideoSource::V4L2:
            ss << "v4l2src device=" << config.v4l2_device << " ! video/x-raw,width=" << config.width
               << ",height=" << config.height << ",framerate=" << config.framerate << "/1";
            break;
        case VideoSource::TestPattern:
            ss << "videotestsrc is-live=true pattern=ball"
               << " ! video/x-raw,width=" << config.width << ",height=" << config.height
               << ",framerate=" << config.framerate << "/1";
            break;
        }
        return ss.str();
    }

    // Assembles the complete gst-launch pipeline description:
    // source ! videoconvert ! webrtcsink (which handles encoding, payloading
    // and signalling internally).
    auto build_pipeline_description() -> std::string
    {
        std::ostringstream ss;
        // Source
        ss << build_source_element();
        // Video conversion for compatibility
        ss << " ! videoconvert";
        // webrtcsink handles encoding, payloading, and signalling automatically
        ss << " ! webrtcsink name=ws";
        // Configure signalling server URI
        ss << " signaller::uri=" << config.signalling_server_uri;
        // Constrain webrtcsink's output caps according to the encoder choice.
        switch (config.encoder) {
        case VideoEncoder::H264_Hardware:
        case VideoEncoder::H264_Software:
            ss << " video-caps=\"video/x-h264\"";
            break;
        case VideoEncoder::VP8:
            ss << " video-caps=\"video/x-vp8\"";
            break;
        case VideoEncoder::VP9:
            ss << " video-caps=\"video/x-vp9\"";
            break;
        }
        // Use the first configured STUN server, or Google's public one.
        if (!config.stun_servers.empty()) {
            ss << " stun-server=" << config.stun_servers[0];
        } else {
            ss << " stun-server=stun://stun.l.google.com:19302";
        }
        return ss.str();
    }

    // Bus handler for error messages: logs, forwards to report_error(), and
    // frees the GError/debug strings owned by the caller after parsing.
    static auto on_bus_error(GstBus * /* bus */, GstMessage *msg, gpointer user_data) -> gboolean
    {
        auto *impl = static_cast<Impl *>(user_data);
        GError *error = nullptr;
        gchar *debug_info = nullptr;
        gst_message_parse_error(msg, &error, &debug_info);
        std::string error_message = (error != nullptr) ? error->message : "Unknown error";
        if (debug_info != nullptr) { error_message += " (" + std::string(debug_info) + ")"; }
        g_printerr("Pipeline error: %s\n", error_message.c_str());
        impl->report_error(WebRTCError::MediaError, error_message);
        if (error != nullptr) { g_error_free(error); }
        if (debug_info != nullptr) { g_free(debug_info); }
        return TRUE;
    }

    // Bus handler for state-changed messages. Only transitions of the
    // top-level pipeline are mapped onto StreamState.
    static auto on_bus_state_changed(GstBus * /* bus */, GstMessage *msg, gpointer user_data) -> gboolean
    {
        auto *impl = static_cast<Impl *>(user_data);
        if (GST_MESSAGE_SRC(msg) != GST_OBJECT(impl->pipeline)) { return TRUE; }
        GstState old_state;
        GstState new_state;
        GstState pending;
        gst_message_parse_state_changed(msg, &old_state, &new_state, &pending);
        g_print(
            "Pipeline state: %s -> %s\n", gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));
        if (new_state == GST_STATE_PLAYING) {
            impl->set_state(StreamState::Streaming);
        } else if (new_state == GST_STATE_PAUSED && impl->state.load() == StreamState::Streaming) {
            impl->set_state(StreamState::Paused);
        }
        return TRUE;
    }

    // Bus handler for end-of-stream: marks the stream disconnected.
    static auto on_bus_eos(GstBus * /* bus */, GstMessage * /* msg */, gpointer user_data) -> gboolean
    {
        auto *impl = static_cast<Impl *>(user_data);
        g_print("End of stream\n");
        impl->set_state(StreamState::Disconnected);
        return TRUE;
    }

    // Bus handler for element messages; watches webrtcsink's consumer
    // added/removed notifications to track peer connections.
    static auto on_bus_element(GstBus * /* bus */, GstMessage *msg, gpointer user_data) -> gboolean
    {
        auto *impl = static_cast<Impl *>(user_data);
        const GstStructure *structure = gst_message_get_structure(msg);
        if (structure == nullptr) { return TRUE; }
        const gchar *name = gst_structure_get_name(structure);
        if (g_str_has_prefix(name, "webrtcsink")) {
            g_print("WebRTC event: %s\n", name);
            // Check for connection established
            if (g_str_has_suffix(name, "consumer-added")) {
                g_print("WebRTC peer connected!\n");
                impl->set_state(StreamState::Streaming);
            } else if (g_str_has_suffix(name, "consumer-removed")) {
                g_print("WebRTC peer disconnected\n");
            }
        }
        return TRUE;
    }
};
// Default-constructs the streamer with a fresh, unconfigured implementation.
WebRTCStreamer::WebRTCStreamer() : impl_(std::make_unique<Impl>()) {}
// Defaulted destructor; Impl's destructor performs the actual teardown.
WebRTCStreamer::~WebRTCStreamer() = default;
// Move constructor: takes over the other streamer's implementation. The
// moved-from object must only be destroyed or assigned to afterwards — its
// impl_ is null and the member functions dereference it unconditionally.
WebRTCStreamer::WebRTCStreamer(WebRTCStreamer &&other) noexcept : impl_(std::move(other.impl_)) {}
// Move assignment: steals the other streamer's implementation. The
// moved-from streamer must only be destroyed or reassigned afterwards.
auto WebRTCStreamer::operator=(WebRTCStreamer &&other) noexcept -> WebRTCStreamer &
{
    if (this == &other) { return *this; }
    impl_ = std::move(other.impl_);
    return *this;
}
// One-time GStreamer initialization (idempotent, serialized by
// g_gstreamer_mutex). Verifies that a WebRTC sink element is available,
// preferring webrtcsink and falling back to webrtcbin.
//
// Returns InitializationFailed when gst_init_check() fails or neither
// WebRTC element can be found.
auto WebRTCStreamer::initialize(int *argc, char ***argv) -> std::expected<void, WebRTCError>
{
    std::scoped_lock guard(g_gstreamer_mutex);
    if (g_gstreamer_initialized.load()) { return {}; }

    GError *init_error = nullptr;
    if (gst_init_check(argc, argv, &init_error) == 0) {
        if (init_error != nullptr) {
            g_printerr("GStreamer init failed: %s\n", init_error->message);
            g_error_free(init_error);
        }
        return std::unexpected(WebRTCError::InitializationFailed);
    }

    // Prefer webrtcsink (automatic signalling); fall back to webrtcbin.
    GstElementFactory *factory = gst_element_factory_find("webrtcsink");
    if (factory != nullptr) {
        g_print("Using webrtcsink (automatic signalling)\n");
    } else {
        g_printerr("webrtcsink element not found. Checking for webrtcbin fallback...\n");
        factory = gst_element_factory_find("webrtcbin");
        if (factory == nullptr) {
            g_printerr("Neither webrtcsink nor webrtcbin found!\n");
            gst_deinit();
            return std::unexpected(WebRTCError::InitializationFailed);
        }
        g_print("Using webrtcbin (manual signalling required)\n");
    }
    gst_object_unref(factory);
    g_gstreamer_initialized.store(true);
    return {};
}
// Shuts GStreamer down again if (and only if) initialize() succeeded.
// Serialized against initialize() by the same mutex.
auto WebRTCStreamer::deinitialize() -> void
{
    std::scoped_lock guard(g_gstreamer_mutex);
    if (!g_gstreamer_initialized.load()) { return; }
    gst_deinit();
    g_gstreamer_initialized.store(false);
}
// Validates the configuration, builds the GStreamer pipeline from a
// gst-launch description, and attaches bus watches for error, state-changed,
// EOS, and element messages.
//
// Returns InitializationFailed when GStreamer was never initialized,
// InvalidConfiguration for zero-sized video or a missing signalling URI, and
// PipelineCreationFailed when gst_parse_launch() rejects the description.
auto WebRTCStreamer::configure(const StreamConfig &config) -> std::expected<void, WebRTCError>
{
    if (!g_gstreamer_initialized.load()) { return std::unexpected(WebRTCError::InitializationFailed); }
    // Validate configuration before tearing down any existing pipeline.
    if (config.width == 0 || config.height == 0 || config.framerate == 0) {
        return std::unexpected(WebRTCError::InvalidConfiguration);
    }
    if (config.signalling_server_uri.empty()) { return std::unexpected(WebRTCError::InvalidConfiguration); }
    impl_->cleanup();
    impl_->config = config;
    impl_->set_state(StreamState::Idle);
    // Generate a producer ID if not specified.
    if (config.producer_id.empty()) {
        impl_->producer_id =
            "stream-" + std::to_string(std::chrono::steady_clock::now().time_since_epoch().count() % 100000);
    } else {
        impl_->producer_id = config.producer_id;
    }
    // Build and parse the pipeline description.
    std::string pipeline_desc = impl_->build_pipeline_description();
    g_print("Pipeline: %s\n", pipeline_desc.c_str());
    GError *error = nullptr;
    impl_->pipeline = gst_parse_launch(pipeline_desc.c_str(), &error);
    if (error != nullptr) {
        std::string error_msg = error->message;
        g_printerr("Pipeline creation failed: %s\n", error_msg.c_str());
        g_error_free(error);
        // gst_parse_launch() may return a partially-constructed pipeline even
        // when it reports an error; release it so it cannot leak.
        if (impl_->pipeline != nullptr) {
            gst_object_unref(impl_->pipeline);
            impl_->pipeline = nullptr;
        }
        return std::unexpected(WebRTCError::PipelineCreationFailed);
    }
    if (impl_->pipeline == nullptr) { return std::unexpected(WebRTCError::PipelineCreationFailed); }
    // Look up the webrtcsink element by the name given in the description
    // (gst_bin_get_by_name() returns a new reference).
    impl_->webrtcsink = gst_bin_get_by_name(GST_BIN(impl_->pipeline), "ws");
    if (impl_->webrtcsink != nullptr) { g_print("Found webrtcsink element\n"); }
    // Route bus messages to the Impl callbacks; dispatch happens on the GLib
    // main loop started in start().
    GstBus *bus = gst_element_get_bus(impl_->pipeline);
    gst_bus_add_signal_watch(bus);
    g_signal_connect(bus, "message::error", G_CALLBACK(Impl::on_bus_error), impl_.get());
    g_signal_connect(bus, "message::state-changed", G_CALLBACK(Impl::on_bus_state_changed), impl_.get());
    g_signal_connect(bus, "message::eos", G_CALLBACK(Impl::on_bus_eos), impl_.get());
    g_signal_connect(bus, "message::element", G_CALLBACK(Impl::on_bus_element), impl_.get());
    gst_object_unref(bus);
    return {};
}
// Starts the configured pipeline: spins up a GLib main loop thread (needed
// for the bus signal watches), sets the pipeline to PLAYING, and waits up to
// 10 seconds for the transition to complete.
//
// Returns PipelineCreationFailed when configure() has not produced a
// pipeline, and StateChangeFailed when the PLAYING transition fails. On
// failure the main loop and its thread are torn down again so they cannot
// leak across repeated start() attempts.
auto WebRTCStreamer::start() -> std::expected<void, WebRTCError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(WebRTCError::PipelineCreationFailed); }
    impl_->set_state(StreamState::Connecting);
    // Stop and release the main loop (used on failure paths and to guard
    // against a second start() overwriting a still-running loop).
    auto teardown_loop = [this]() -> void {
        if (impl_->main_loop != nullptr) {
            g_main_loop_quit(impl_->main_loop);
            if (impl_->main_loop_thread.joinable()) { impl_->main_loop_thread.join(); }
            g_main_loop_unref(impl_->main_loop);
            impl_->main_loop = nullptr;
        }
    };
    teardown_loop();
    // Run a GLib main loop in a background thread so bus callbacks fire.
    impl_->main_loop = g_main_loop_new(nullptr, FALSE);
    impl_->main_loop_thread = std::thread([this]() -> void { g_main_loop_run(impl_->main_loop); });
    // Start the pipeline
    g_print("Starting pipeline...\n");
    g_print("Connecting to signalling server: %s\n", impl_->config.signalling_server_uri.c_str());
    GstStateChangeReturn ret = gst_element_set_state(impl_->pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE) {
        g_printerr("Failed to start pipeline\n");
        impl_->set_state(StreamState::Error);
        teardown_loop();// don't leave the loop thread running on failure
        return std::unexpected(WebRTCError::StateChangeFailed);
    }
    // Wait for pipeline to reach PLAYING state (with timeout)
    GstState state;
    ret = gst_element_get_state(impl_->pipeline, &state, nullptr, 10 * GST_SECOND);
    if (ret == GST_STATE_CHANGE_FAILURE) {
        impl_->set_state(StreamState::Error);
        teardown_loop();
        return std::unexpected(WebRTCError::StateChangeFailed);
    }
    g_print("Pipeline started, waiting for peers to connect...\n");
    return {};
}
// Gracefully stops the pipeline: sends EOS, waits briefly for it to
// propagate, drops the pipeline to NULL, and shuts down the GLib main loop.
// A no-op success when no pipeline exists.
auto WebRTCStreamer::stop() -> std::expected<void, WebRTCError>
{
    if (impl_->pipeline == nullptr) { return {}; }
    g_print("Stopping pipeline...\n");
    // Send EOS to gracefully stop
    gst_element_send_event(impl_->pipeline, gst_event_new_eos());
    // Wait briefly for EOS to propagate; the popped message (if any) is
    // owned by us and must be unreffed, otherwise it leaks.
    GstBus *bus = gst_element_get_bus(impl_->pipeline);
    GstMessage *eos_msg = gst_bus_timed_pop_filtered(bus, GST_SECOND, GST_MESSAGE_EOS);
    if (eos_msg != nullptr) { gst_message_unref(eos_msg); }
    gst_object_unref(bus);
    GstStateChangeReturn ret = gst_element_set_state(impl_->pipeline, GST_STATE_NULL);
    impl_->set_state(StreamState::Idle);
    // Stop main loop
    if (impl_->main_loop != nullptr) {
        g_main_loop_quit(impl_->main_loop);
        if (impl_->main_loop_thread.joinable()) { impl_->main_loop_thread.join(); }
        g_main_loop_unref(impl_->main_loop);
        impl_->main_loop = nullptr;
    }
    if (ret == GST_STATE_CHANGE_FAILURE) { return std::unexpected(WebRTCError::StateChangeFailed); }
    return {};
}
// Pauses the pipeline (PLAYING -> PAUSED) and records the Paused state.
auto WebRTCStreamer::pause() -> std::expected<void, WebRTCError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(WebRTCError::PipelineCreationFailed); }
    const GstStateChangeReturn result = gst_element_set_state(impl_->pipeline, GST_STATE_PAUSED);
    if (result == GST_STATE_CHANGE_FAILURE) { return std::unexpected(WebRTCError::StateChangeFailed); }
    impl_->set_state(StreamState::Paused);
    return {};
}
// Resumes a paused pipeline (PAUSED -> PLAYING). The stream state is updated
// by the bus state-changed handler once the transition actually happens.
auto WebRTCStreamer::resume() -> std::expected<void, WebRTCError>
{
    if (impl_->pipeline == nullptr) { return std::unexpected(WebRTCError::PipelineCreationFailed); }
    const GstStateChangeReturn result = gst_element_set_state(impl_->pipeline, GST_STATE_PLAYING);
    if (result == GST_STATE_CHANGE_FAILURE) { return std::unexpected(WebRTCError::StateChangeFailed); }
    return {};
}
// Returns the current stream state (thread-safe atomic read).
auto WebRTCStreamer::get_state() const -> StreamState { return impl_->state.load(); }
// Convenience check: true exactly when the state is Streaming.
auto WebRTCStreamer::is_streaming() const -> bool { return impl_->state.load() == StreamState::Streaming; }
// Returns the producer ID chosen during configure() (user-set or generated).
auto WebRTCStreamer::get_producer_id() const -> std::string { return impl_->producer_id; }
// Registers the callback invoked on stream-state transitions.
auto WebRTCStreamer::set_state_callback(StateCallback callback) -> void
{
    std::scoped_lock guard(impl_->callback_mutex);
    impl_->state_callback = std::move(callback);
}
// Registers the callback invoked when an error is reported.
auto WebRTCStreamer::set_error_callback(ErrorCallback callback) -> void
{
    std::scoped_lock guard(impl_->callback_mutex);
    impl_->error_callback = std::move(callback);
}
// Records the requested bitrate in the stored configuration only.
// NOTE(review): this does not alter an already-running pipeline — applying
// the value at runtime would need access to the encoder element.
auto WebRTCStreamer::set_bitrate(std::uint32_t bitrate_kbps) -> std::expected<void, WebRTCError>
{
    impl_->config.bitrate_kbps = bitrate_kbps;
    // Note: Runtime bitrate changes would require encoder element access
    return {};
}
// Factory functions
// Convenience factory: libcamera source with hardware H.264 encoding,
// initialized, configured, and ready to start().
auto create_libcamera_webrtc_stream(const std::string &signalling_server,
    std::uint32_t width,
    std::uint32_t height,
    std::uint32_t fps) -> std::expected<WebRTCStreamer, WebRTCError>
{
    if (auto init_result = WebRTCStreamer::initialize(); !init_result) {
        return std::unexpected(init_result.error());
    }
    StreamConfig config;
    config.source = VideoSource::Libcamera;
    config.encoder = VideoEncoder::H264_Hardware;
    config.signalling_server_uri = signalling_server;
    config.width = width;
    config.height = height;
    config.framerate = fps;
    WebRTCStreamer streamer;
    if (auto configure_result = streamer.configure(config); !configure_result) {
        return std::unexpected(configure_result.error());
    }
    return streamer;
}
// Convenience factory: V4L2 device source with hardware H.264 encoding,
// initialized, configured, and ready to start().
auto create_v4l2_webrtc_stream(const std::string &signalling_server,
    const std::string &device,
    std::uint32_t width,
    std::uint32_t height,
    std::uint32_t fps) -> std::expected<WebRTCStreamer, WebRTCError>
{
    if (auto init_result = WebRTCStreamer::initialize(); !init_result) {
        return std::unexpected(init_result.error());
    }
    StreamConfig config;
    config.source = VideoSource::V4L2;
    config.encoder = VideoEncoder::H264_Hardware;
    config.signalling_server_uri = signalling_server;
    config.v4l2_device = device;
    config.width = width;
    config.height = height;
    config.framerate = fps;
    WebRTCStreamer streamer;
    if (auto configure_result = streamer.configure(config); !configure_result) {
        return std::unexpected(configure_result.error());
    }
    return streamer;
}
// Convenience factory: 1280x720@30 test-pattern source with software H.264
// encoding — useful for verifying signalling without camera hardware.
auto create_test_webrtc_stream(const std::string &signalling_server) -> std::expected<WebRTCStreamer, WebRTCError>
{
    if (auto init_result = WebRTCStreamer::initialize(); !init_result) {
        return std::unexpected(init_result.error());
    }
    StreamConfig config;
    config.source = VideoSource::TestPattern;
    config.encoder = VideoEncoder::H264_Software;
    config.signalling_server_uri = signalling_server;
    config.width = 1280;
    config.height = 720;
    config.framerate = 30;
    WebRTCStreamer streamer;
    if (auto configure_result = streamer.configure(config); !configure_result) {
        return std::unexpected(configure_result.error());
    }
    return streamer;
}
// Translates a loaded config::WebRTCConfig into the StreamConfig consumed by
// WebRTCStreamer, combining it with the caller's source/encoder choice.
auto create_stream_config_from_webrtc_config(const config::WebRTCConfig &webrtc_config,
    VideoSource source,
    VideoEncoder encoder) -> StreamConfig
{
    StreamConfig stream_config;
    stream_config.source = source;
    stream_config.encoder = encoder;
    // Signalling and ICE servers come straight from the loaded config.
    stream_config.signalling_server_uri = webrtc_config.signaling_server_url;
    stream_config.stun_servers = webrtc_config.stun_servers;
    stream_config.turn_servers = webrtc_config.turn_servers;
    // Video parameters use the config's defaults.
    stream_config.width = webrtc_config.video.default_width;
    stream_config.height = webrtc_config.video.default_height;
    stream_config.framerate = webrtc_config.video.default_framerate;
    stream_config.bitrate_kbps = webrtc_config.video.default_bitrate_kbps;
    return stream_config;
}
// Factory that loads streaming parameters from a JSON config file, falling
// back to the library defaults when the file cannot be loaded, then builds a
// configured (but not yet started) streamer.
auto create_webrtc_stream_from_config(const std::filesystem::path &config_path,
    VideoSource source,
    VideoEncoder encoder) -> std::expected<WebRTCStreamer, WebRTCError>
{
    // Initialize GStreamer if not already done
    auto init_result = WebRTCStreamer::initialize();
    if (!init_result) { return std::unexpected(init_result.error()); }
    // Load configuration from JSON file
    auto config_result = config::load_webrtc_config(config_path);
    if (!config_result) {
        // path::c_str() is wchar_t-based on Windows, which would break the
        // printf-style "%s"; convert to a narrow string explicitly.
        g_printerr("Failed to load config from %s, using defaults\n", config_path.string().c_str());
        // Use default config if loading fails
        config_result = config::get_default_webrtc_config();
    }
    // Create StreamConfig from WebRTCConfig
    StreamConfig stream_config = create_stream_config_from_webrtc_config(config_result.value(), source, encoder);
    // Create and configure the streamer
    WebRTCStreamer streamer;
    auto configure_result = streamer.configure(stream_config);
    if (!configure_result) { return std::unexpected(configure_result.error()); }
    return streamer;
}
}// namespace kataglyphis::webrtc