CyberRT-共享内存实现

CyberRT共享内存类图

共享内存消息发布

在这里插入图片描述
数据用共享内存发布时,首先会创建ShmTransmitter对象,包含两个主要成员segment和notifier,Segment用于创建共享内存(上面绿色部分),Notifer 最终构建ReadableInfo通知给其他进程。
使用哪个ConditionNotifier-> notify或MulticastNotifier->notify,是在创建时根据配置文件决定的。
ConditionNotifier 在构建时会创建Indicator对象保存到共享内存中。
调ConditionNotifier-> notify,实际时将ReadableInfo保存到Indicator对象。

ConditionNotifier 共享内存数据接收

在这里插入图片描述
在接收数据时,也会创建同样的共享内存。如果共享内存存在,则直接打开。
在接收端也有同样的共享内存操作ConditionNotifier 。
ShmDispatcher会持有多个通道segment,用std::unordered_map<channelid, segment>表示。
同时启动一个后台线程ThreadFunc 线程轮询处理消息回调。

void ShmDispatcher::ThreadFunc() {
  ReadableInfo readable_info;
  // 轮询处理
  while (!is_shutdown_.load()) {
	// 100ms, Listen会转换100000 ms,对比seq,如果不等处理消息。每次轮询会等待递减50ms。
    if (!notifier_->Listen(100, &readable_info)) {
      ADEBUG << "listen failed.";
      continue;
    }

    if (readable_info.host_id() != host_id_) {
      ADEBUG << "shm readable info from other host.";
      continue;
    }
	//从共享内存Indicator中读出的数据
    uint64_t channel_id = readable_info.channel_id();
    uint32_t block_index = readable_info.block_index();

    {
      ReadLockGuard<AtomicRWLock> lock(segments_lock_);
      if (segments_.count(channel_id) == 0) {
        continue;
      }
      // check block index
      // std::unordered_map<uint64_t, uint32_t> previous_indexes_; 
      // 保存key: channelID, value: block_index
      if (previous_indexes_.count(channel_id) == 0) {
        previous_indexes_[channel_id] = UINT32_MAX;
      }
      uint32_t& previous_index = previous_indexes_[channel_id];
      if (block_index != 0 && previous_index != UINT32_MAX) {
        if (block_index == previous_index) {
          ADEBUG << "Receive SAME index " << block_index << " of channel "
                 << channel_id;
        } else if (block_index < previous_index) {
          ADEBUG << "Receive PREVIOUS message. last: " << previous_index
                 << ", now: " << block_index;
        } else if (block_index - previous_index > 1) {
          ADEBUG << "Receive JUMP message. last: " << previous_index
                 << ", now: " << block_index;
        }
      }
      previous_index = block_index;
	  ReadMessage(channel_id, block_index);
    }
  }
}

MulticastNotifier共享内存数据接收

MulticastNotifier时采用多播socket实现的,默认

std::string mcast_ip("239.255.0.100");
uint16_t mcast_port = 8888;

创建两个socket notify_fd_ 用于发生消息,listen_addr用于接收消息。
在这里插入图片描述
在发送端调用Notify时,时调的MulticastNotifier::Nofify(const ReadableInfo& info)

bool MulticastNotifier::Notify(const ReadableInfo& info) {
  if (is_shutdown_.load()) {
    return false;
  }

  std::string info_str;
  info.SerializeTo(&info_str);
  ssize_t nbytes =
      sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,
             (struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
  return nbytes > 0;
}

接收端用同样的方式轮询

bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {
  if (is_shutdown_.load()) {
    return false;
  }

  if (info == nullptr) {
    AERROR << "info nullptr.";
    return false;
  }

  struct pollfd fds;
  fds.fd = listen_fd_;
  fds.events = POLLIN;
  int ready_num = poll(&fds, 1, timeout_ms);
  if (ready_num > 0) {
    char buf[32] = {0};  // larger than ReadableInfo::kSize
    ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
    if (nbytes == -1) {
      AERROR << "fail to recvfrom, " << strerror(errno);
      return false;
    }
    return info->DeserializeFrom(buf, nbytes);
  } else if (ready_num == 0) {
    ADEBUG << "timeout, no readableinfo.";
  } else {
    if (errno == EINTR) {
      AINFO << "poll was interrupted.";
    } else {
      AERROR << "fail to poll, " << strerror(errno);
    }
  }
  return false;
}
bool Block::TryLockForWrite() {
  int32_t rw_lock_free = kRWLockFree;
  //lock_num_ == rw_lock_free, kWriteExclusive赋值给lock_num_,返回true
  //lock_num_ != rw_lock_free, lock_num_赋值给rw_lock_free,返回false
  if (!lock_num_.compare_exchange_weak(rw_lock_free, kWriteExclusive,
                                       std::memory_order_acq_rel,
                                       std::memory_order_relaxed)) {
    ADEBUG << "lock num: " << lock_num_.load();
    return false;
  }
  return true;
}

总结
1、CyberRT的共享内存读写都时需要加锁的。
2、每次写数据可以是不连续的block
3、每次当Block.lock_num_= 0:空闲,>0:有读操作, -1 : 写操作。
效率不是高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/180112.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高通OTA升级方案介绍

高通OTA升级方案介绍 1. 高通LE OTA1.1 背景1.2 Recovery系统 2. SDX12 OTA方案3 OTA包的加密 3UK Penetration Test对于OTA升级也有严格的安全要求&#xff0c;下面是几条用例要求&#xff1a; Firmware: A sufficiently strong signing key MUST be in use. Signing keys MUS…

kubernetes 部署 spinnaker

spinnaker简介 Spinnaker 是一个开源、多云持续交付平台&#xff0c;它将强大而灵活的管道管理系统与主要云提供商的集成相结合。Spinnaker 提供应用程序管理和部署&#xff0c;帮助您快速、自信地发布软件变更。 Spinnaker 提供了两组核心的功能&#xff1a; 应用管理与应用程…

信息系统的安全保护等级的五个级别

信息系统的安全保护等级分为五级&#xff1a;第一级为自主保护级、第二级为指导保护级、第三级为监督保护级、第四级为强制保护级、第五级为专控保护级。 法律依据&#xff1a;《信息安全等级保护管理办法》第四条 信息系统的安全保护等级分为以下五级&#xff1a;   &#…

外贸自建站服务器怎么选?网站搭建的工具?

外贸自建站服务器用哪个好&#xff1f;如何选海洋建站的服务器&#xff1f; 外贸自建站是企业拓展海外市场的重要手段之一。而在这个过程中&#xff0c;选择一个适合的服务器对于网站的稳定运行和优化至关重要。海洋建站将为您介绍如何选择适合的外贸自建站服务器。 外贸自建…

SAP LU04记账更改通知单创建转储单报错:L3094 记帐修改没有份存在

解决办法&#xff1a; 使用事务码LU02&#xff0c;修改过账更改状态&#xff0c;将过账更改状态改为U&#xff0c;强制关闭 1. LU04 查找记账更改通知单号 2. 事务码LU02修改状态 这个时候再用LU04去查看的时候&#xff0c;就不会再显示了

一个ETL流程搞定数据脱敏

数据脱敏是什么&#xff1f; 数据脱敏是指在数据处理过程中&#xff0c;通过一系列的技术手段去除或者替换敏感信息&#xff0c;以保护个人隐私和敏感信息的安全的过程。数据脱敏通常在数据共享、数据分析和软件测试等场景下使用&#xff0c;它旨在降低数据泄露和滥用的风险。…

Sealos 云操作系统私有化部署教程

Sealos 私有云已经正式发布了&#xff0c;它为企业用云提供了一种革命性的新方案。Sealos 的核心优势在于&#xff0c;它允许企业在自己的机房中一键构建一个功能与 Sealos 公有云完全相同的私有云。这意味着企业可以在自己的控制和安全范围内&#xff0c;享受到公有云所提供的…

4.22每日一题(累次积分的计算:交换次序)

注&#xff1a;因为 是积不出的函数&#xff0c;所以先不用算&#xff0c;最后发现&#xff0c;出现dx与dy可以相互抵消&#xff0c;即可算出答案

【TypeScrpt算法】算法的复杂度分析

算法的复杂度分析 什么是算法复杂度&#xff1f; 不同的算法&#xff0c;其实效率是不一样的 让我举一个案例来比较两种不同的算法在查找数组中给定元素的时间复杂度 [1,2,3,4,5,6,7,...9999,n] 顺序查找 这种方法从头到尾遍历整个数组&#xff0c;依次比较每个元素和给定元…

Jenkins+Maven+Gitlab+Tomcat 自动化构建打包、部署

JenkinsMavenGitlabTomcat 自动化构建打包、部署 1、环境需求 本帖针对的是Linux环境&#xff0c;Windows或其他系统也可借鉴。具体只讲述Jenkins配置以及整个流程的实现。 1.JDK&#xff08;或JRE&#xff09;及Java环境变量配置&#xff0c;我用的是JDK1.8.0_144&#xff0…

Talk | UCSB博士生宋珍巧:基于人工智能的功能性蛋白质设计

本期为TechBeat人工智能社区第549期线上Talk。 北京时间11月22日(周三)20:00&#xff0c;UC Santa Barbara博士生—宋珍巧的Talk已准时在TechBeat人工智能社区开播&#xff01; 她与大家分享的主题是: “基于人工智能的功能性蛋白质设计”&#xff0c;介绍了如何利用机器学习算…

好用的局域网监控软件推荐

局域网监控软件是一种用于监控局域网内计算机使用情况的软件&#xff0c;可以帮助企业管理者更好地了解员工的工作状态和行为&#xff0c;规范上网行为并保护企业网络资源。 一、域之盾软件 这是一款专业的上网监控软件&#xff0c;它支持多种操作系统和平台&#xff0c;可以全…

CodeWhisperer 体验总结

CodeWhisperer 体验总结 | CodeWhisperer 是一款亚马逊新推出的通用代码生成器 可以实时进行代码数据的提供 还可以定义安全问题 CodeWhisperer 对个人用户是免费使用 企业用户需要订阅使用 亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档、开发案例…

【精选】改进的YOLOv5:红外遥感图像微型目标的高效识别系统

1.研究背景与意义 随着科技的不断发展&#xff0c;红外遥感技术在军事、安防、环境监测等领域中得到了广泛应用。红外遥感图像具有独特的优势&#xff0c;可以在夜间或恶劣天气条件下获取目标信息&#xff0c;因此在小目标检测方面具有重要的应用价值。然而&#xff0c;由于红…

当当网获得dangdang商品详情商品列表API 测试请求入口

item_get-获得dangdang商品详情 获取商品详情 item_search-按关键字搜索dangdang商品 获取商品列表 公共参数 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;secretString是调用密钥api_nameString是API接口名称&#xff08;包括在请…

python数据结构与算法-13_高级排序算法-快速排序

快速排序 快速排序名字可不是盖的&#xff0c;很多程序语言标准库实现的内置排序都有它的身影&#xff0c;我们就直奔主题吧。 和归并排序一样&#xff0c;快排也是一种分而治之(divide and conquer)的策略。归并排序把数组递归成只有单个元素的数组&#xff0c;之后再不断两两…

PC端页面进去先出现加载效果

自定义指令v-loading&#xff0c;只需要绑定Boolean即可 v-loading“loading” <el-table :data"list" border style"width: 100%" v-loading"loading"><el-table-column align"center" label"序号" width"5…

java--static修饰成员变量

1.static 叫静态&#xff0c;可以修饰成员变量、成员方法。 2.成员变量按照有无static修饰&#xff0c;分为两种&#xff1a; ①类变量&#xff1a;有static修饰&#xff0c;属于类&#xff0c;在计算机里只有一份&#xff0c;会被类的全部对象共享(不管那个类调用的&#x…

脸爱云一脸通智慧管理平台未授权访问

声明 本文仅用于技术交流&#xff0c;请勿用于非法用途 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任。 一、漏洞概述 脸爱云一脸通智慧管理平台存在严重漏洞&#xff0c;允许…

数据结构与算法编程题13

设计算法将一个带头结点的单链表A分解为两个具有相同结构的链表B、C&#xff0c;其中B表的结点为A表中值小于零的结点&#xff0c;而C表的结点为A表中值大于零的结点&#xff08;链表A中的元素为非零整数&#xff0c;要求B、C表利用A表的结点&#xff09; for example: A -1 2 …