如果需要将 glib 的 GMainLoop
替换成其他的消息循环机制,主要的麻烦在于 bus 消息的处理(比如主线程通常要监听 pipeline bus 上的 GST_MESSAGE_ERROR
和 GST_MESSAGE_EOS
消息),因此,需要理解 bus 的消息处理机制。
bus 消息处理
bus 的消息处理相关的信息记录在 bus->priv
字段中,即 GstBusPrivate
实例,bus->priv
在 gst_bus_init
和 gst_bus_constructed
中初始化:
gst_bus_init (GstBus * bus)
{
bus->priv = gst_bus_get_instance_private (bus);
bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; // True
}
gst_bus_constructed (GObject * object)
{
GstBus *bus = GST_BUS_CAST (object);
if (bus->priv->enable_async) {
// 和定时器没有任何关系,就是一个 socketpair,用 pipe 实现也没任何问题,
// 其中第一个 fd 为接收端,第二个 fd 为发送端,将接收端 fd 创建 pollfd 并设置
// POLLIN 监听事件
bus->priv->poll = gst_poll_new_timer ();
// 将创建的 pollfd 保存一份到 bus->priv->pollfd 中,既可以通过 gst_bus_get_pollfd
// 暴露给外部,也可以通过 g_source_add_poll 创建 GSource 事件,并通过 g_source_attach
// 注册到 GMainLoop 主循环中
gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
}
}
在 gst_bus_dispose
中释放相关资源:
gst_bus_dispose (GObject * object)
{
GstBus *bus = GST_BUS (object);
if (bus->priv->queue) {
if (bus->priv->poll)
gst_poll_free (bus->priv->poll);
bus->priv->poll = NULL;
}
}
需要注意的是,gst_bin_init
创建的 child_bus
默认 enable_async
是 False 的(虽然 gst_bus_init
将 enable_async
设置成了 True,但 gst_bus_set_property
会将它改回 False):
gst_bin_init (GstBin * bin)
{
GstBus *bus;
bus = g_object_new (GST_TYPE_BUS, "enable-async", FALSE, NULL);
bin->child_bus = bus;
}
但是 pipeline 作为 element 关联的 bus enable_async
默认是 True(因为创建时没有指定 enable-async
属性):
gst_pipeline_init (GstPipeline * pipeline)
{
GstBus *bus;
bus = gst_bus_new ();
gst_element_set_bus (GST_ELEMENT_CAST (pipeline), bus);
}
gst_bus_new (void)
{
GstBus *result;
result = g_object_new (gst_bus_get_type (), NULL);
return result;
}
element 的消息通过 gst_bus_post
逐级往上进行投递:
gst_bus_post (GstBus * bus, GstMessage * message)
{
GstBusSyncReply reply = GST_BUS_PASS;
if (bus->priv->sync_handler)
sync_handler = sync_handler_ref (bus->priv->sync_handler);
// 顶层 pipeline 作为 element 关联的 bus 不会设置这个回调,但是作为 bin 创建的
// child_bus 以及它之下的子 bin 创建的 child_bus 都会设置这个回调,
// 回调默认设置为 bin_bus_handler,将消息通过 bus(即父 bin 传递给它的 child_bus)
// 再投递出去,即实现逐级往上的投递机制,
// 注意 bin_bus_handler 总是返回 GST_BUS_DROP,即消息处理已结束,不要再处理了,
// 顶层 pipeline 由于没有为关联的 bus 设置 sync_handler,因此消息往上投递的机制
// 会终止
if (sync_handler)
reply = sync_handler->handler (bus, message, sync_handler->user_data);
// child_bus 默认 enable_async 为 False,因此只有顶层 pipeline 的
// bus->priv->poll 非 NULL(gst_auto_detect_find_best 为 autovideosink 创建
// 的 xvimagesink element 会短暂关联一个 gst_bus_new 创建的 bus,但是很快会
// 在 gst_auto_detect_detect 中被 gst_bin_add 替换成 autovideosink 的 child_bus)
/* If this is a bus without async message delivery always drop the message */
if (!bus->priv->poll)
reply = GST_BUS_DROP;
switch (reply) {
case GST_BUS_DROP:
/* drop the message */
break;
case GST_BUS_PASS:
// 顶层 pipeline 没有定义 sync_handler, 所以默认走这里,即给 socketpair 创建
// 的 fd 发送端发送一个 ‘W’ 字符
gst_atomic_queue_push (bus->priv->queue, message);
gst_poll_write_control (bus->priv->poll);
break;
case GST_BUS_ASYNC:
{
/* async delivery, we need a mutex and a cond to block on */
GCond *cond = GST_MESSAGE_GET_COND (message);
GMutex *lock = GST_MESSAGE_GET_LOCK (message);
/* now we lock the message mutex, send the message to the async
* queue. When the message is handled by the app and destroyed,
* the cond will be signalled and we can continue */
gst_atomic_queue_push (bus->priv->queue, message);
gst_poll_write_control (bus->priv->poll);
/* now block till the message is freed */
g_cond_wait (cond, lock);
break;
}
default:
break;
}
return TRUE;
}
分析命令行 gst-launch-1.0 filesrc location=1.h264 ! h264parse ! openh264dec ! autovideosink
中窗口关闭消息的处理,可以清晰的看到通过 sync_handler
实现的逐级投递:
#0 wake_event (set=0x55bfd4997a30)
#1 raise_wakeup (set=0x55bfd4997a30)
// 往 bus->priv->poll 对应的 fd 发送端写入字符 ’W‘
#2 gst_poll_write_control (set=0x55bfd4997a30)
// 往 pipeline 关联的 bus 投递消息,即往自身创建的 bus 投递消息,注意这个 bus 并没有设置 sync_handler 回调
// 注意 pipeline 作为 bin 组件通过 gst_bin_init 创建的 child_bus 是给子成员组件投递消息用的,
// 同时 child_bus 通过 gst_bus_set_sync_handler 设置了它的 sync_handler 回调为 bin_bus_handler,
// 回调参数为 pipeline 自身,
// 在 gst_bin_add 中为 bin 添加子 element 时,通过 add_element 方法(gst_bin_add_func)为
// element 关联的 bus 设置为这个 child_bus,
// 而 pipeline 作为 element 在其初始化函数 gst_pipeline_init 中创建并关联了一个新的 bus,也就是说
// pipeline 没有上层 bin 给它传递 child_bus 了,只得自己建一个,当然 pipeline 自身作为 bin 还是得
// 在父类 GstBinClass 方法 gst_bin_class_init 中为下层的 element 创建 child_bus,这一步骤并不会少
#3 gst_bus_post (bus=0x55bfd488c840, message=0x7fdaf8002a30)
// 选择 element->bus 进行消息投递,对应 pipeline 自己创建的 bus
#4 gst_element_post_message_default (element=0x55bfd4739de0, message=0x7fdaf8002a30)
#5 gst_bin_post_message (element=0x55bfd4739de0, msg=0x7fdaf8002a30)
#6 gst_element_post_message (element=0x55bfd4739de0, message=0x7fdaf8002a30)
// 调用父类 GstBinClass 方法 handle_message 默认实现 gst_bin_handle_message_func
#7 gst_bin_handle_message_func (bin=0x55bfd4739de0, message=0x7fdaf8002a30)
// 派生类 GstPipelineClass 覆盖了父类 GstBinClass 方法 handle_message 默认实现
#8 gst_pipeline_handle_message (bin=0x55bfd4739de0, message=0x7fdaf8002a30)
// bin 参数指向顶层 pipeline,这是 gst_bin_init 调用 gst_bus_set_sync_handler 时设置的回调入参
#9 bin_bus_handler (bus=0x55bfd4866540, message=0x7fdaf8002a30, bin=0x55bfd4739de0)
// 往 autovideosink 所属 bin 组件(pipeline)的 bus 投递消息
// 注意 autovideosink 作为 bin 组件通过 gst_bin_init 创建的 child_bus 是给子成员组件投递消息用的,
// 同时 child_bus 通过 gst_bus_set_sync_handler 设置了它的 sync_handler 回调为 bin_bus_handler,
// 回调参数为 autovideosink 自身,
// 在 gst_bin_add 中为 bin 添加子 element 时,通过 add_element 方法(gst_bin_add_func)为
// element 关联的 bus 设置为这个 child_bus,
// 而 autovideosink 作为 element 并没有关联其它 bus
#10 gst_bus_post (bus=0x55bfd4866540, message=0x7fdaf8002a30)
// 选择 element->bus 进行消息投递,对应 pipeline 的 child_bus
#11 gst_element_post_message_default (element=0x55bfd4aea5a0, message=0x7fdaf8002a30)
#12 gst_bin_post_message (element=0x55bfd4aea5a0, msg=0x7fdaf8002a30)
#13 gst_element_post_message (element=0x55bfd4aea5a0, message=0x7fdaf8002a30)
// 调用 GstBinClass 方法 handle_message 默认实现 gst_bin_handle_message_func
#14 gst_bin_handle_message_func (bin=0x55bfd4aea5a0, message=0x7fdaf8002a30)
// bin 参数指向 autovideosink,这是 gst_bin_init 调用 gst_bus_set_sync_handler 时设置的回调入参
#15 bin_bus_handler (bus=0x55bfd4aea7f0, message=0x7fdaf8002a30, bin=0x55bfd4aea5a0)
// 往 xvimagesink 所属 bin 组件(autovideosink)的 bus 投递消息
#16 gst_bus_post (bus=0x55bfd4aea7f0, message=0x7fdaf8002a30)
// 选择 element->bus 进行消息投递,对应 autovideosink 的 child_bus
#17 gst_element_post_message_default (element=0x55bfd4a29c50, message=0x7fdaf8002a30)
#18 gst_element_post_message (element=0x55bfd4a29c50, message=0x7fdaf8002a30)
#19 gst_element_message_full_with_details
(element=0x55bfd4a29c50, type=GST_MESSAGE_ERROR, domain=2733, code=3, text=0x7fdaf80024d0 "Output window was closed")
#20 gst_element_message_full
(element=0x55bfd4a29c50, type=GST_MESSAGE_ERROR, domain=2733, code=3, text=0x7fdaf80024d0 "Output window was closed")
#21 gst_xv_image_sink_handle_xevents (xvimagesink=0x55bfd4a29c50)
#22 gst_xv_image_sink_event_thread (xvimagesink=0x55bfd4a29c50)
bus 消息监听
如果 gst_bus_add_watch
为 bus 增加消息回调,则会将 bus 关联的接收端 fd 监听事件注册到 GMainLoop
主循环。我们要替换 GMainLoop
主循环,关键是截获这个 fd 监听事件的处理,有三种方法,一是将这个 fd 注册到我们新的主循环中,二是注册顶层 pipeline 的 sync_handler
回调,三是将异步消息转成同步信号处理。
第一种方法和原始的 bus 消息处理机制保持一致,底层创建的 socketpair 的接收端 fd 可以通过如下方法拿到:
GPollFD pfd;
gst_bus_get_pollfd(bus, &pfd);
然后结合 gst_bus_pop(bus)
就可以参考 gst_bus_source_dispatch
拿到原始的消息进行处理。
第二种方法是一种比较侵入式的修改,消息的处理在消息投递线程的上下文进行,需要注意锁的处理。
第三种方法将异步消息转成同步信号与注册顶层 pipeline 的 sync_handler
回调类似,此时消息的处理函数在消息投递线程的上下文执行,同样需要注意锁的处理:
bus = gst_pipeline_get_bus(GST_PIPELINE (pipeline));
gst_bus_enable_sync_message_emission(bus);
g_signal_connect (bus, "sync-message", G_CALLBACK(sync_bus_call), NULL);
结合 bus 消息处理 一节对消息处理过程的分析,应该很容易理解其原理,如下源代码所示即其核心处理逻辑:
// gstreamer/gstbus.c
gst_bus_class_init (GstBusClass * klass)
{
/**
* GstBus::sync-message:
* @self: the object which received the signal
* @message: the message that has been posted synchronously
*
* A message has been posted on the bus. This signal is emitted from the
* thread that posted the message so one has to be careful with locking.
*
* This signal will not be emitted by default, you have to call
* gst_bus_enable_sync_message_emission() before.
*/
gst_bus_signals[SYNC_MESSAGE] =
g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
NULL, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
}
gst_bus_post (GstBus * bus, GstMessage * message)
{
GstBusSyncReply reply = GST_BUS_PASS;
if (bus->priv->sync_handler)
sync_handler = sync_handler_ref (bus->priv->sync_handler);
// gst_bus_enable_sync_message_emission 会设置这个
emit_sync_message = bus->priv->num_sync_message_emitters > 0;
// 顶层 pipeline 关联的 bus 没有设置 sync_handler 回调,因此 reply 是 GST_BUS_PASS
/* first call the sync handler if it is installed */
if (sync_handler)
reply = sync_handler->handler (bus, message, sync_handler->user_data);
/* emit sync-message if requested to do so via
gst_bus_enable_sync_message_emission. terrible but effective */
if (emit_sync_message && reply != GST_BUS_DROP
&& (!sync_handler || sync_handler->handler != gst_bus_sync_signal_handler))
// 将异步消息转成同步信号调用,
gst_bus_sync_signal_handler (bus, message, NULL);
}
gst_bus_sync_signal_handler (GstBus * bus, GstMessage * message, gpointer data)
{
GQuark detail = 0;
g_return_val_if_fail (GST_IS_BUS (bus), GST_BUS_DROP);
g_return_val_if_fail (message != NULL, GST_BUS_DROP);
detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
g_signal_emit (bus, gst_bus_signals[SYNC_MESSAGE], detail, message);
return GST_BUS_PASS;
}
参考资料
how to make gstreamer run in separate
https://gstreamer-devel.narkive.com/ZZdwug7h/gst-devel-how-to-make-gstreamer-run-in-separate-thread
最后修改于 2023-06-23