GStreamer中如何自定义配置线程优先级

1.引言

如果看了gstreamer官方教程配置多线程出现编译不过的问题了,不妨进来看看这篇文章或许能解决一些编译问题。

GStreamer 本质上是多线程的,并且是完全线程安全的。大多数线程内部对应用程序是隐藏的,这应该使应用程序开发更容易。但是,在某些情况下,应用程序可能希望影响其中的某些部分。 GStreamer 允许应用程序在pipeline的某些部分强制使用多个线程,GStreamer 还可以在创建线程时通知您,以便您可以配置要使用的线程优先级或线程池等内容。

2.GStreamer 中的调度

GStreamer pipeline中的每个element决定如何调度它。element可以选择是基于推还是基于拉的方式调度它们的pad。例如,一个element可以选择启动一个线程以开始从sink pad拉取 或/并 开始从source pad推送。element也可以选择使用上游(upstream )或下游(downstream )线程分别以推和拉模式对数据进行处理。 GStreamer 对element选择如何调度没有任何限制。有关更多详细信息,请参阅插件编写指南。

在任何情况下都会发生的是,某些element将启动一个线程来处理它们的数据,称为“流线程”(“streaming threads”)。当element需要创建一个流线程时,流线程或叫做 GstTask 对象是从 GstTaskPool 创建的。在下一节中,我们将看到如何接收任务和池的通知。

3.在 GStreamer 中配置线程

STREAM_STATUS 消息发布在总线上以通知有关流线程的状态。你将从消息中获得以下信息:

当将要创建新线程时,您将收到 GST_STREAM_STATUS_TYPE_CREATE 类型的通知。然后可以在 GstTask 中配置 GstTaskPool。自定义任务池将为任务提供自定义线程以实现流线程。

  • 如果要配置自定义任务池,则需要同步处理此消息。如果你在此消息返回时未在任务上配置任务池,则任务将使用其默认池。

  • 进入或离开线程时。这是你可以配置线程优先级的时刻。当线程被销毁时,你也会收到通知。

  • 当线程开始、暂停和停止时,你会收到消息。这可用于在 gui 应用程序中可视化流线程的状态。

4.如何提高线程的优先级

请添加图片描述
让我们看一下上面的pipeline。我们希望提高流线程的优先级。appsrc element将启动流线程来将appsrc中src pad上的数据推送给对端queue element的sink pad。改变优先级的流程是这样的:

  • 当从 READY 到 PAUSED 状态时, appsrc 将需要一个流线程来将数据推送到 queue 中。它将发布一条STREAM_STATUS 消息,指示其对流线程的需求。

  • 应用程序将使用同步总线处理程序对 STREAM_STATUS 消息做出反应。然后它将在消息内的 GstTask 上配置自定义GstTaskPool。自定义任务池负责创建线程。在这个例子中,我们将创建一个具有更高优先级的线程。

  • 或者,由于同步消息是在线程上下文中调用的,您可以使用线程 ENTER/LEAVE 通知来更改当前线程的优先级或调度策略。

第一步,我们需要实现一个可以在任务上配置的自定义 GstTaskPool。下面是一个 GstTaskPool 子类的实现,它使用 pthreads 创建一个 SCHED_RR 实时线程。请注意,创建实时线程可能需要额外的权限。

#include <gst/gst.h>
#include <gst/gsttaskpool.h>

#define TEST_TYPE_RT_POOL \
  (test_rt_pool_get_type())

GType test_rt_pool_get_type (void);
GstTaskPool *test_rt_pool_new (void);

typedef struct _GstTaskPool      TestRTPool;
typedef struct _GstTaskPoolClass TestRTPoolClass;

typedef struct
{
    pthread_t thread;
} TaskPoolRTId;

G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);

static void
default_prepare (GstTaskPool * pool, GError ** error)
{
  /* we don't do anything here. We could construct a pool of threads here that
   * we could reuse later but we don't */
}

static void
default_cleanup (GstTaskPool * pool)
{
}

static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
    GError ** error)
{
  TestRTId *tid;
  gint res;
  pthread_attr_t attr;
  struct sched_param param;

  tid = g_slice_new0 (TestRTId);

  pthread_attr_init (&attr);
  if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)
    g_warning ("setschedpolicy: failure: %p", g_strerror (res));

  param.sched_priority = 50;
  if ((res = pthread_attr_setschedparam (&attr, &param)) != 0)
    g_warning ("setschedparam: failure: %p", g_strerror (res));

  if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)
    g_warning ("setinheritsched: failure: %p", g_strerror (res));

  res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);

  if (res != 0) {
    g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
        "Error creating thread: %s", g_strerror (res));
    g_slice_free (TestRTId, tid);
    tid = NULL;
  }

  return tid;
}

static void
default_join (GstTaskPool * pool, gpointer id)
{
  TestRTId *tid = (TestRTId *) id;

  pthread_join (tid->thread, NULL);

  g_slice_free (TestRTId, tid);
}

static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{
  GstTaskPoolClass *gsttaskpool_class;

  gsttaskpool_class = (GstTaskPoolClass *) klass;

  gsttaskpool_class->prepare = default_prepare;
  gsttaskpool_class->cleanup = default_cleanup;
  gsttaskpool_class->push = default_push;
  gsttaskpool_class->join = default_join;
}

static void
test_rt_pool_init (TestRTPool * pool)
{
}

GstTaskPool *
test_rt_pool_new (void)
{
  GstTaskPool *pool;

  pool = g_object_new (TEST_TYPE_RT_POOL, NULL);

  return pool;
}

编写任务池时要实现的重要功能是“push”功能。 实现上应该启动一个调用给定函数的线程。 更复杂的实现可能希望在池中保留一些线程,因为创建和销毁线程并不总是最快的操作。

在下一步中,我们需要在 appsrc 需要时实际配置自定义任务池。 为此,我们使用同步处理程序拦截 STREAM_STATUS 消息。

static GMainLoop* loop;

static void
on_stream_status (GstBus     *bus,
                  GstMessage *message,
                  gpointer    user_data)
{
  GstStreamStatusType type;
  GstElement *owner;
  const GValue *val;
  GstTask *task = NULL;

  gst_message_parse_stream_status (message, &type, &owner);

  val = gst_message_get_stream_status_object (message);

  /* see if we know how to deal with this object */
  if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {
    task = g_value_get_object (val);
  }

  switch (type) {
    case GST_STREAM_STATUS_TYPE_CREATE:
      if (task) {
        GstTaskPool *pool;

        pool = test_rt_pool_new();

        gst_task_set_pool (task, pool);
      }
      break;
    default:
      break;
  }
}

static void
on_error (GstBus     *bus,
          GstMessage *message,
          gpointer    user_data)
{
  g_message ("received ERROR");
  g_main_loop_quit (loop);
}

static void
on_eos (GstBus     *bus,
        GstMessage *message,
        gpointer    user_data)
{
  g_main_loop_quit (loop);
}


int
main (int argc, char *argv[])
{
  GstElement *bin, *AppSrc;
  GstBus *bus;
  GstStateChangeReturn ret;

  gst_init (&argc, &argv);

  /* create a new bin to hold the elements */
  bin = gst_pipeline_new ("pipeline");
  g_assert (bin);
    
  //pseudo-code

  /* create some elements */
  AppSrc = gst_element_factory_make ("appsrc", "appsrc");
  .............


  /* add objects to the main pipeline */
  gst_bin_add_many (GST_BIN (bin),AppSrc ,..., ..., NULL);

  /* link the elements */
  gst_element_link_many(AppSrc,..., ...,null);

  loop = g_main_loop_new (NULL, FALSE);

  /* get the bus, we need to install a sync handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (bin));

  gst_bus_enable_sync_message_emission (bus);

  gst_bus_add_signal_watch (bus);

  g_signal_connect (bus, "sync-message::stream-status",
      (GCallback) on_stream_status, NULL);
  g_signal_connect (bus, "message::error",
      (GCallback) on_error, NULL);
  g_signal_connect (bus, "message::eos",
      (GCallback) on_eos, NULL);

  /* start playing */
  ret = gst_element_set_state (bin, GST_STATE_PLAYING);
  if (ret != GST_STATE_CHANGE_SUCCESS) {
    g_message ("failed to change state");
    return -1;
  }

  /* Run event loop listening for bus messages until EOS or ERROR */
  g_main_loop_run (loop);

  /* stop the bin */
  gst_element_set_state (bin, GST_STATE_NULL);
  gst_object_unref (bus);
  g_main_loop_unref (loop);

  return 0;
}

请注意,该程序可能需要 root 权限才能创建实时线程。 当无法创建线程时,状态更改函数将失败,我们在上面的应用程序中捕获了这一点。

当pipeline中有多个线程时,您将收到多个 STREAM_STATUS 消息。 您应该使用消息的所有者(可能是 pad 或启动线程的element)来确定该线程在应用程序上下文中的功能是什么。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/621559.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

R语言:肿瘤突变负荷分析

> merge_maf <- function(metadata, path){ #通过合并path,还有sample sheet前两列得到每一个文件的完整路径 filenames <- file.path(path, metadata$file_id, metadata$file_name, fsep .Platform$file.sep) message (##############…

RabbitMQ的用途

RabbitMQ主要有四个用途&#xff0c;分别是应用解耦、异步提速、削峰填谷、消息分发。详情讲解如下&#xff1a; RabbitMQ介绍、解耦、提速、削峰、分发 详解、RabbitMQ安装 可视化界面讲解 1.应用解耦&#xff1a;提高系统容错性和可维护性 2.异步提速&#xff1a;提升用户体验…

【JVM基础篇】打破双亲委派机制

文章目录 打破双亲委派机制自定义类加载器双亲委派机制核心代码打破双亲委派机制自定义类加载器父类怎么是AppClassLoader呢&#xff1f;两个自定义类加载器加载相同限定名的类&#xff0c;不会冲突吗&#xff1f;拓展类加载器功能 线程上下文类加载器JDBC案例那么问题来了&…

打造本地GPT专业领域知识库AnythingLLM+Ollama

如果你觉得openai的gpt没有隐私&#xff0c;或者需要离线使用gpt&#xff0c;还是打造专业领域知识&#xff0c;可以借用AnythingLLMOllama轻松实现本地GPT. AnythingLLMOllama 实现本地GPT步聚&#xff1a; 1 下载 AnythingLLM软件 AnythingLLM官网地址&#xff1a; Anythi…

C++17新特性 结构化绑定

一、Python中的相似功能 熟悉python的应该对下面的代码很熟悉 def return_multiple_values():return 11, 7x, y return_multiple_values()函数返回一个元组&#xff0c;元组自动分配给了x和y。 二、C11中的元组 c11中就存在类似python元组的概念了&#xff1a; std::tupl…

高速电流反馈运放总结

目录 前言 基础架构 CFB运算放大器拓扑结构的进步 前言 最近项目发现有震荡&#xff0c;发现是电流反馈型运放导致&#xff0c;所以对电流运放的知识做了全面的复习。 基础架构 现在&#xff0c;我们将详细考察高速运算放大器中非常流行的电流反馈(CFB)运算放大器拓扑结 构…

黑盒测试中的边界值分析

黑盒测试是一种基于需求和规格的测试方法&#xff0c;它主要关注软件系统输出的正确性和完整性&#xff0c;而不考虑内部代码的实现方式。在黑盒测试中&#xff0c;边界值分析是一种重要的测试技术&#xff0c;它可以帮助测试人员有效地发现输入和输出的问题。本文将从什么是边…

【数据结构】二叉排序树(查找+插入+删除+效率分析)完整代码+解析

3.1 二叉排序树 3.1.1 定义 二叉排序树的定义 又称二叉查找树&#xff08;BST&#xff0c;Binary Search Tree&#xff09; 二叉排序树是具有以下性质的二叉树&#xff1a; 左子树结点值<根结点值<右子树结点值 进行中序遍历&#xff0c;可以得到一个递增的有序序列。 3…

无需公网IP、无需云服务器,异地组网实现远程直连NAS、游戏联机

手机图片、视频太多&#xff0c;存储空间不够用怎么办?出门在外无法直连家中NAS&#xff0c;远程访问NAS速度慢&#xff1f;自建私有云、多媒体服务器&#xff0c;如何多人远程共享媒体资源&#xff1f;幻兽帕鲁、我的世界、泰拉瑞亚…局域网游戏&#xff0c;想远程多人联机&a…

Golang面向对象编程(二)

文章目录 封装基本介绍封装的实现工厂函数 继承基本介绍继承的实现字段和方法访问细节多继承 封装 基本介绍 基本介绍 封装&#xff08;Encapsulation&#xff09;是面向对象编程&#xff08;OOP&#xff09;中的一种重要概念&#xff0c;封装通过将数据和相关的方法组合在一起…

RobbitMQ基本消息队列的消息接收

1.先给工程引入依赖 父工程有了子工程就不用导了 <!--AMQP依赖&#xff0c;包含RabbitMQ--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2.配置yml…

基于大数据+Hadoop的豆瓣电子图书推荐系统设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行交流合作✌ 主要内容&#xff1a;SpringBoot、Vue、SSM、HLM…

linux学习:多媒体开发库SDL+视频、音频、事件子系统+处理yuv视频源

目录 编译和移植 视频子系统 视频子系统产生图像的步骤 api 初始化 SDL 的相关子系统 使用指定的宽、高和色深来创建一个视窗 surface 使用 fmt 指定的格式创建一个像素点​编辑 将 dst 上的矩形 dstrect 填充为单色 color​编辑 将 src 快速叠加到 dst 上​编辑 更新…

sqli-labs 第十七关

目录 找注入点&#xff1a; 源码分析&#xff1a; 测试&#xff1a; 奇怪现象&#xff1a; &#xff08;1&#xff09;&#xff1a;当我们输入的密码为字符进行注入时。 &#xff08;2&#xff09;&#xff1a;当我们输入的密码为整数时。 产生原因&#xff1a; 解决方法…

Docker:docker在项目中常用的一些命令

简介   Docker 是一个开源的容器化平台&#xff0c;它允许开发者将应用程序及其依赖项打包到一个可移植的容器中&#xff0c;并发布到任何安装了 Docker 引擎的机器上。这些容器是轻量级的&#xff0c;包含了应用程序运行所需的所有东西&#xff0c;如代码、系统库、系统工具…

SpringBoot集成Redis环境搭建及配置详解

前言 Redis作为当前最火的NoSQL数据库&#xff0c;支持很多语言客户端操作Redis。 而SpringBoot作为java当前最火的开发框架&#xff0c;提供了Spring-data-redis框架实现对Redis的各种操作。 在springboot1.5.x版本的默认的Redis客户端都是Jedis实现的&#xff0c;springboot…

大模型时代下两种few shot高效文本分类方法

介绍近年(2022、2024)大语言模型盛行下的两篇文本分类相关的论文&#xff0c;适用场景为few shot。两种方法分别是setfit和fastfit&#xff0c;都提供了python的包使用方便。 论文1&#xff1a;Efficient Few-Shot Learning Without Prompts 题目&#xff1a;无需提示的高效少…

浪潮信息企业级存储逆势增长 市场份额位列中国前二

2023年&#xff0c;中国企业级存储市场竞争激烈&#xff0c;在挑战重重之下&#xff0c;浪潮信息仍然实现逆势增长&#xff0c;销售额增幅达4.7%&#xff0c;市场份额相比2022年扩大0.6%&#xff0c;位列中国前二。另外&#xff0c;在高端和全闪存阵列细分市场&#xff0c;浪潮…

Vue3实战Easy云盘(三):文件删除+文件移动+目录导航+上传优化/文件过滤/搜索

一、文件删除 &#xff08;1&#xff09;选中了之后才可以删除&#xff0c;没有选中时就显示暗调删除按钮 &#xff08;2&#xff09;实现选中高亮功能 &#xff08;3&#xff09;单个删除 &#xff08;4&#xff09;批量删除 Main.vue中 <!-- 按钮3 --><!-- 如果sel…

鸿蒙内核源码分析(用户态锁篇) | 如何使用快锁Futex(上)

快锁上下篇 鸿蒙内核实现了Futex&#xff0c;系列篇将用两篇来介绍快锁&#xff0c;主要两个原因: 网上介绍Futex的文章很少&#xff0c;全面深入内核介绍的就更少&#xff0c;所以来一次详细整理和挖透。涉及用户态和内核态打配合&#xff0c;共同作用&#xff0c;既要说用户…