GStreamer MainLoop
GStreamer 虽然和 glib 是强绑定的关系,但是并不是说应用的主循环就一定得用 GMainLoop 这一套。

如果需要将 glib 的 GMainLoop 替换成其他的消息循环机制,主要的麻烦在于 bus 消息的处理(比如主线程通常要监听 pipeline bus 上的 GST_MESSAGE_ERRORGST_MESSAGE_EOS 消息),因此,需要理解 bus 的消息处理机制。

bus 消息处理

bus 的消息处理相关的信息记录在 bus->priv 字段中,即 GstBusPrivate 实例,bus->privgst_bus_initgst_bus_constructed 中初始化:

gst_bus_init (GstBus * bus)
{
  bus->priv = gst_bus_get_instance_private (bus);
  bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; // True
}

gst_bus_constructed (GObject * object)
{
  GstBus *bus = GST_BUS_CAST (object);
  if (bus->priv->enable_async) {
    // 和定时器没有任何关系,就是一个 socketpair,用 pipe 实现也没任何问题,
    // 其中第一个 fd 为接收端,第二个 fd 为发送端,将接收端 fd 创建 pollfd 并设置
    // POLLIN 监听事件
    bus->priv->poll = gst_poll_new_timer ();
    // 将创建的 pollfd 保存一份到 bus->priv->pollfd 中,既可以通过 gst_bus_get_pollfd
    // 暴露给外部,也可以通过 g_source_add_poll 创建 GSource 事件,并通过 g_source_attach
    // 注册到 GMainLoop 主循环中
    gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
  }
}

gst_bus_dispose 中释放相关资源:

gst_bus_dispose (GObject * object)
{
  GstBus *bus = GST_BUS (object);
  if (bus->priv->queue) {
    if (bus->priv->poll)
      gst_poll_free (bus->priv->poll);
    bus->priv->poll = NULL;
  }
}

需要注意的是,gst_bin_init 创建的 child_bus 默认 enable_async 是 False 的(虽然 gst_bus_initenable_async 设置成了 True,但 gst_bus_set_property 会将它改回 False):

gst_bin_init (GstBin * bin)
{
  GstBus *bus;
  bus = g_object_new (GST_TYPE_BUS, "enable-async", FALSE, NULL);
  bin->child_bus = bus;
}

但是 pipeline 作为 element 关联的 bus enable_async 默认是 True(因为创建时没有指定 enable-async 属性):

gst_pipeline_init (GstPipeline * pipeline)
{
  GstBus *bus;
  bus = gst_bus_new ();
  gst_element_set_bus (GST_ELEMENT_CAST (pipeline), bus);
}

gst_bus_new (void)
{
  GstBus *result;
  result = g_object_new (gst_bus_get_type (), NULL);
  return result;
}

element 的消息通过 gst_bus_post 逐级往上进行投递:

gst_bus_post (GstBus * bus, GstMessage * message)
{
  GstBusSyncReply reply = GST_BUS_PASS;

  if (bus->priv->sync_handler)
    sync_handler = sync_handler_ref (bus->priv->sync_handler);
  
  // 顶层 pipeline 作为 element 关联的 bus 不会设置这个回调,但是作为 bin 创建的
  // child_bus 以及它之下的子 bin 创建的 child_bus 都会设置这个回调,
  // 回调默认设置为 bin_bus_handler,将消息通过 bus(即父 bin 传递给它的 child_bus)
  // 再投递出去,即实现逐级往上的投递机制,
  // 注意 bin_bus_handler 总是返回 GST_BUS_DROP,即消息处理已结束,不要再处理了,
  // 顶层 pipeline 由于没有为关联的 bus 设置 sync_handler,因此消息往上投递的机制
  // 会终止
  if (sync_handler)
    reply = sync_handler->handler (bus, message, sync_handler->user_data);

  // child_bus 默认 enable_async 为 False,因此只有顶层 pipeline 的
  // bus->priv->poll 非 NULL(gst_auto_detect_find_best 为 autovideosink 创建
  // 的 xvimagesink element 会短暂关联一个 gst_bus_new 创建的 bus,但是很快会
  // 在 gst_auto_detect_detect 中被 gst_bin_add 替换成 autovideosink 的 child_bus)
  /* If this is a bus without async message delivery always drop the message */
  if (!bus->priv->poll)
    reply = GST_BUS_DROP;

  switch (reply) {
    case GST_BUS_DROP:
      /* drop the message */
      break;
    case GST_BUS_PASS:
      // 顶层 pipeline 没有定义 sync_handler, 所以默认走这里,即给 socketpair 创建
      // 的 fd 发送端发送一个 ‘W’ 字符
      gst_atomic_queue_push (bus->priv->queue, message);
      gst_poll_write_control (bus->priv->poll);
      break;
    case GST_BUS_ASYNC:
    {
      /* async delivery, we need a mutex and a cond to block on */
      GCond *cond = GST_MESSAGE_GET_COND (message);
      GMutex *lock = GST_MESSAGE_GET_LOCK (message);

      /* now we lock the message mutex, send the message to the async
       * queue. When the message is handled by the app and destroyed,
       * the cond will be signalled and we can continue */
      gst_atomic_queue_push (bus->priv->queue, message);
      gst_poll_write_control (bus->priv->poll);

      /* now block till the message is freed */
      g_cond_wait (cond, lock);
      break;
    }
    default:
      break;
  }
  return TRUE;
}

分析命令行 gst-launch-1.0 filesrc location=1.h264 ! h264parse ! openh264dec ! autovideosink 中窗口关闭消息的处理,可以清晰的看到通过 sync_handler 实现的逐级投递:

            #0  wake_event (set=0x55bfd4997a30)
            #1  raise_wakeup (set=0x55bfd4997a30)
            // 往 bus->priv->poll 对应的 fd 发送端写入字符 ’W‘
            #2  gst_poll_write_control (set=0x55bfd4997a30)

        // 往 pipeline 关联的 bus 投递消息,即往自身创建的 bus 投递消息,注意这个 bus 并没有设置 sync_handler 回调
        // 注意 pipeline 作为 bin 组件通过 gst_bin_init 创建的 child_bus 是给子成员组件投递消息用的,
        // 同时 child_bus 通过 gst_bus_set_sync_handler 设置了它的 sync_handler 回调为 bin_bus_handler,
        // 回调参数为 pipeline 自身,
        // 在 gst_bin_add 中为 bin 添加子 element 时,通过 add_element 方法(gst_bin_add_func)为
        // element 关联的 bus 设置为这个 child_bus,
        // 而 pipeline 作为 element 在其初始化函数 gst_pipeline_init 中创建并关联了一个新的 bus,也就是说
        // pipeline 没有上层 bin 给它传递 child_bus 了,只得自己建一个,当然 pipeline 自身作为 bin 还是得
        // 在父类 GstBinClass 方法 gst_bin_class_init 中为下层的 element 创建 child_bus,这一步骤并不会少
        #3  gst_bus_post (bus=0x55bfd488c840, message=0x7fdaf8002a30)
        // 选择 element->bus 进行消息投递,对应 pipeline 自己创建的 bus
        #4  gst_element_post_message_default (element=0x55bfd4739de0, message=0x7fdaf8002a30)
        #5  gst_bin_post_message (element=0x55bfd4739de0, msg=0x7fdaf8002a30)
        #6  gst_element_post_message (element=0x55bfd4739de0, message=0x7fdaf8002a30)
        // 调用父类 GstBinClass 方法 handle_message 默认实现 gst_bin_handle_message_func
        #7  gst_bin_handle_message_func (bin=0x55bfd4739de0, message=0x7fdaf8002a30)
        // 派生类 GstPipelineClass 覆盖了父类 GstBinClass 方法 handle_message 默认实现
        #8  gst_pipeline_handle_message (bin=0x55bfd4739de0, message=0x7fdaf8002a30)
        // bin 参数指向顶层 pipeline,这是 gst_bin_init 调用 gst_bus_set_sync_handler 时设置的回调入参
        #9  bin_bus_handler (bus=0x55bfd4866540, message=0x7fdaf8002a30, bin=0x55bfd4739de0)

    // 往 autovideosink 所属 bin 组件(pipeline)的 bus 投递消息
    // 注意 autovideosink 作为 bin 组件通过 gst_bin_init 创建的 child_bus 是给子成员组件投递消息用的,
    // 同时 child_bus 通过 gst_bus_set_sync_handler 设置了它的 sync_handler 回调为 bin_bus_handler,
    // 回调参数为 autovideosink 自身,
    // 在 gst_bin_add 中为 bin 添加子 element 时,通过 add_element 方法(gst_bin_add_func)为
    // element 关联的 bus 设置为这个 child_bus,
    // 而 autovideosink 作为 element 并没有关联其它 bus
    #10 gst_bus_post (bus=0x55bfd4866540, message=0x7fdaf8002a30)
    // 选择 element->bus 进行消息投递,对应 pipeline 的 child_bus
    #11 gst_element_post_message_default (element=0x55bfd4aea5a0, message=0x7fdaf8002a30)
    #12 gst_bin_post_message (element=0x55bfd4aea5a0, msg=0x7fdaf8002a30)
    #13 gst_element_post_message (element=0x55bfd4aea5a0, message=0x7fdaf8002a30)
    // 调用 GstBinClass 方法 handle_message 默认实现 gst_bin_handle_message_func
    #14 gst_bin_handle_message_func (bin=0x55bfd4aea5a0, message=0x7fdaf8002a30)
    // bin 参数指向 autovideosink,这是 gst_bin_init 调用 gst_bus_set_sync_handler 时设置的回调入参
    #15 bin_bus_handler (bus=0x55bfd4aea7f0, message=0x7fdaf8002a30, bin=0x55bfd4aea5a0)

// 往 xvimagesink 所属 bin 组件(autovideosink)的 bus 投递消息
#16 gst_bus_post (bus=0x55bfd4aea7f0, message=0x7fdaf8002a30)
// 选择 element->bus 进行消息投递,对应 autovideosink 的 child_bus
#17 gst_element_post_message_default (element=0x55bfd4a29c50, message=0x7fdaf8002a30)
#18 gst_element_post_message (element=0x55bfd4a29c50, message=0x7fdaf8002a30)
#19 gst_element_message_full_with_details
    (element=0x55bfd4a29c50, type=GST_MESSAGE_ERROR, domain=2733, code=3, text=0x7fdaf80024d0 "Output window was closed")
#20 gst_element_message_full
    (element=0x55bfd4a29c50, type=GST_MESSAGE_ERROR, domain=2733, code=3, text=0x7fdaf80024d0 "Output window was closed")
#21 gst_xv_image_sink_handle_xevents (xvimagesink=0x55bfd4a29c50)
#22 gst_xv_image_sink_event_thread (xvimagesink=0x55bfd4a29c50)

bus 消息监听

如果 gst_bus_add_watch 为 bus 增加消息回调,则会将 bus 关联的接收端 fd 监听事件注册到 GMainLoop 主循环。我们要替换 GMainLoop 主循环,关键是截获这个 fd 监听事件的处理,有三种方法,一是将这个 fd 注册到我们新的主循环中,二是注册顶层 pipeline 的 sync_handler 回调,三是将异步消息转成同步信号处理。

第一种方法和原始的 bus 消息处理机制保持一致,底层创建的 socketpair 的接收端 fd 可以通过如下方法拿到:

GPollFD pfd;
gst_bus_get_pollfd(bus, &pfd);

然后结合 gst_bus_pop(bus) 就可以参考 gst_bus_source_dispatch 拿到原始的消息进行处理。

第二种方法是一种比较侵入式的修改,消息的处理在消息投递线程的上下文进行,需要注意锁的处理。

第三种方法将异步消息转成同步信号与注册顶层 pipeline 的 sync_handler 回调类似,此时消息的处理函数在消息投递线程的上下文执行,同样需要注意锁的处理:

bus = gst_pipeline_get_bus(GST_PIPELINE (pipeline));
gst_bus_enable_sync_message_emission(bus);
g_signal_connect (bus, "sync-message", G_CALLBACK(sync_bus_call), NULL);

结合 bus 消息处理 一节对消息处理过程的分析,应该很容易理解其原理,如下源代码所示即其核心处理逻辑:

// gstreamer/gstbus.c

gst_bus_class_init (GstBusClass * klass)
{
  /**
   * GstBus::sync-message:
   * @self: the object which received the signal
   * @message: the message that has been posted synchronously
   *
   * A message has been posted on the bus. This signal is emitted from the
   * thread that posted the message so one has to be careful with locking.
   *
   * This signal will not be emitted by default, you have to call
   * gst_bus_enable_sync_message_emission() before.
   */
  gst_bus_signals[SYNC_MESSAGE] =
      g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
      NULL, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
}

gst_bus_post (GstBus * bus, GstMessage * message)
{
  GstBusSyncReply reply = GST_BUS_PASS;

  if (bus->priv->sync_handler)
    sync_handler = sync_handler_ref (bus->priv->sync_handler);

  // gst_bus_enable_sync_message_emission 会设置这个
  emit_sync_message = bus->priv->num_sync_message_emitters > 0;

  // 顶层 pipeline 关联的 bus 没有设置 sync_handler 回调,因此 reply 是 GST_BUS_PASS
  /* first call the sync handler if it is installed */
  if (sync_handler)
    reply = sync_handler->handler (bus, message, sync_handler->user_data);

  /* emit sync-message if requested to do so via
     gst_bus_enable_sync_message_emission. terrible but effective */
  if (emit_sync_message && reply != GST_BUS_DROP
      && (!sync_handler || sync_handler->handler != gst_bus_sync_signal_handler))
    // 将异步消息转成同步信号调用,
    gst_bus_sync_signal_handler (bus, message, NULL);
}

gst_bus_sync_signal_handler (GstBus * bus, GstMessage * message, gpointer data)
{
  GQuark detail = 0;

  g_return_val_if_fail (GST_IS_BUS (bus), GST_BUS_DROP);
  g_return_val_if_fail (message != NULL, GST_BUS_DROP);

  detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));

  g_signal_emit (bus, gst_bus_signals[SYNC_MESSAGE], detail, message);

  return GST_BUS_PASS;
}

参考资料

how to make gstreamer run in separate
https://gstreamer-devel.narkive.com/ZZdwug7h/gst-devel-how-to-make-gstreamer-run-in-separate-thread


最后修改于 2023-06-23