一.RTPS协议概述
RTPS协议主要由四个部分组成:
1.发现模块(Discovery)
发现模块是定义了RTPS的参与者(Participant)获取其他RTPS的参与者(Participant),端点(Endpoint)的协议,使得每个参与者(Participant)能够了解到整个网络中其他参与者的存在并且相互匹配。
metatraffic通信使得RTPS的参与者(Participant)可以获取到所有Participant,Reader以及Writer的快照并且让本地Reader和远端Writer以及本地Writer和远端Reader之间通信。
RTPS规范将发现模块拆为两个部分:
1.1 参与者发现协议(PDP):指定了参与者如何在网络中发现彼此。
1.2 端点发现协议(EDP):一旦发现彼此,则两个参与者就使用端点发现协议(EDP)交换起包含的端点的信息。
所有RTPS的实现必须提供简单参与者发现协议(Simple PDP)和简单端点发现协议(Simple EDP)。
2.结构(Structor)
结构定义了RTPS的各类实体(Entity)与域(Domain)的关系以及各类实体间的关联:
Entity:所有RTPS实体的父类,每个Entity对象拥有Guid,并且对其他Entity对象可见。Guid属性如下:
struct RTPS_DllAPI GUID_t // 唯一标识一个Entity
{
//!Guid prefix 每个Participant和其下的Endpoint都拥有相同的Guid Prefix
GuidPrefix_t guidPrefix;
//!Entity id 每个实体的EntityID不同
EntityId_t entityId;
}
Endpoint:特殊的Entity,是RTPS消息的起点或者终点。Endpoint涉及的属性如下:
属性 | 类型 | 含义 | 和DDS的关系 |
---|---|---|---|
unicastLocatorList | Locator_t[*] | 发送RTPS消息的目标Endpoint的单播地址的列表 | 在Discovery过程中配置 |
multicastLocatorLi st | Locator_t[*] | 发送RTPS消息的多播地址的列表(可以为空) | 在Discovery过程中配置 |
ReliabilityKind | ReliabilityKind_t | 可靠性等级(BestEffort / Reliable) | Qos中指定的 |
topicKind | TopicKind_t | 标识当前Endpoint上传输的数据是否有key,key数据的主要类型是InstanceHandle_t,对应了Guid | |
endpointGroup | EntityId_t | 标记了Endpoint属于哪个RTPSGroup | 和DDS中的Subscriber/Publisher关联 |
Participant:是所有Endpoint的容器,共享属性。Participant涉及到的属性如下:
属性 | 类型 | 含义 | 和DDS的关系 |
---|---|---|---|
defaultUnicastLocatorList | Locator_t[*] | 默认的单播地址列表(地址+端口) | 在Discovery过程中配置 |
defaultMulticastLocat orList | Locator_t[*] | 默认的组播地址列表(地址+端口) | 在Discovery过程中配置 |
ProtocolVersion | ProtocolVersion_t | RTPS协议版本号 | |
VendorId | VendorId_t | RTPS中间商代码 | 需要向OMG组织申请 |
Writer:RTPS消息的起点,发送HistoryChange中的CacheChange消息到匹配的Reader的HistoryChange。
Reader:RTPS消息的终点,接受匹配的Writer发送的CacheChange数据。
RTPSGroup:主要有两种Group(Publisher和Subscriber)
3.消息(Message)
消息模块定义了Reader和Writer之间交换的消息格式,主要有消息头(Header)和一系列子消息(SubMessage组成),每个子消息都由一系列子消息元素组成:
Header:
一个消息(Message)中可能包含多个子消息(SubMessage),这些子消息之间可能存在依赖关系/解释关系。
子消息分为实体子消息(Entity SubMessage)和解释子消息(Interpreter SubMessage)。
4.行为(Behavior)
行为定义了Reader和Writer交换RTPS消息的顺序以及消息引起的相关实体(主要是Reader和Writer)状态的变化,RTPS中主要有行为模块的视线(有状态 Stateful 和 无状态 Stateless)。
RTPS Writer不控制什么时候从Writer的HistoryCache中删除CacheChange,删除CacheChange的动作是由DDS的Writer来完成的,这个和Qos配置有关,例如如果配置了KEEP_LAST,那么DataWriter只会保留最后一次发送的CacheChange数据,删除之前的Change数据。
必须实现RTPS MessageReceiver接口(因为涉及到解析子消息时上下文的处理和状态机的切换)
StatefulWriter和StatefulReader交互数据的流程如下图:
4.1 通用要求(General Requirement)
Writer:
RTPSWriter发送HistoryCache中的CacheChange数据时,必须按照SequenceNumber的顺序进行发送。
如果Reader要求inline Qos,则Writer发送数据的时候必须带上Qos数据(in-line Qos)。
对于StatefulWriter,必须定周期发送HeartBeat子消息(如果HistoryCache中还有数据)。如果HistoryCache中没有数据了,那么StatefulWriter就不需要发送HeartBeat子消息了。StatefulWriter必须持续发送HeartBeat消息给StatefulReader(如果Reader返回的时候NACK)直到Reader读取了所有数据。
Writer对于Reader的NACK消息进行响应处理(因为这代表Reader有miss的数据需要Writer进行重发)
Reader:
因为Stateless的Reader是完全被动的,只接受数据,不发送响应数据。因此,对Reader的通用要求大部分都是针对StatefulReader的。
如果reader收到的HeartBeat包没有设置final flag标志位,则Reader对于这包HeartBeat必须做出响应,回复ACK或者NACK SubMessage。
如果reader有数据没有收到(收到HeartBeat比对seq以后),必须回复Nack包告知Writer有数据包丢失需要重传。这个回复可以延迟以避免网络数据风暴。
4.2 具体实现
Writer:
StatelessWriter和StatefulWriter的主要区别在于持有对端Reader的方式和信息有所不同,类结构的区别如下图:
RTPSWriter的主要属性有下面8种:
属性 | 类型 | 含义 | 和DDS的关系 |
---|---|---|---|
pushMode | bool | 当属性为true时,Writer主动将change数据推送给reader,反之,change数据随着心跳包一起发送,并且是作为对Reader请求的应答 | N/A |
heartbeatPeriod | Duration_t | 周期发送心跳包消息的间隔(心跳包包含可被reader读取的数据的lastchangenumber) | N/A |
nackResponseDelay | Duration_t | 对于Reader发送的NACK消息,允许Writer进行回复的最大延时 | N/A |
lastChangeSequenceNumber | SequenceNumber | 用于分配给下一个Change数据的内部递增的序列号 | N/A |
writer_cache | HistoryCache | 包含历史CacheChange的容器,每个Writer内部保存 | N/A |
nackSuppressionDuration | Duration_t | 针对已经发送的CacheChange,如果在nackSuppressionDuration时间内收到了Reader发送的NACK报文,则可以忽略。 | N/A |
dataMaxSize | 可选属性 | N/A |
RTPSWriter主要提供new构造函数以及new_change函数,new_change函数返回新构造的CacheChange对象。
函数名 | 参数 | 类型 |
---|---|---|
new | Writer | |
attribute_values | 创建Writer以及其父类Endpoint时需要的属性集合 | |
New_change | CacheChange | |
kind | ChangeKind_t | |
data | Data | |
inlineQos | ParameterList | |
handle | InstanceHandle_t |
new操作大致步骤:
1.给如下成员赋值:
GUID:Entity的GUID
unicastlocatorlist:数据通信的单播地址列表
multicastlocatorlist:数据通信的组播地址列表
reliabilityLevel:可靠性等级(Reliable, BestEffort)
topicKind: NoKey, WithKey
pushMode: 主动推送数据还是跟随HeartBeat一起发送数据
heartbeatPeriod:心跳包间隔
nackResponseDelay:对于Reader的Nack包的最长回复延时
nackSuppressionDuration:发送Change数据多少时间内收到Nack子消息是可以忽略的
lastChangeSequenceNumber:数据包的序列号(递增)
- 创建writer_cache:Writer的HistoryCache(用于存放要发送的CacheChange)
New_change的操作步骤大致如下:
递增lastChangeSequenceNumber
新建CacheChange对象
a_change := new CacheChange(kind, this.guid, this.lastChangeSequenceNumber,
data, inlineQos, handle);
返回CacheChange对象
StatelessWriter:
1.StatelessWriter不知道匹配的Reader的数量,也不保存每个匹配的RTPSReader的状态。StatelessWriter只保留要发送数据的Reader的Loator信息以便发送数据。
属性 | 类型 | 含义 | 和DDS的关系 |
---|---|---|---|
reader_locators | ReaderLocator[*] | StatelessWriter保存发送数据的匹配Reader的locator列表,其中包括单播地址和组播地址 | N/A |
StatelessWriter适用于内存较小(HistoryCache可以分配的小一点)或者对传输性能高的场合(例如组播的通信方式就适合数据大的场景)
函数名 | 参数 | 类型 |
---|---|---|
new | StatelessWriter | |
Attribute_values | 创建Statelesswriter和父类Endpint需要的参数集合 | |
reader_locator_add | void | |
a_locator | Locator_t | |
reader_locator_remove | void | |
a_locator | Locator_t | |
Unsent_change_reset | Void |
New函数主要是创建了一个内部空的ReaderLocator列表用于后续添加Reader的Locator
reader_locator_add 函数功能是添加一个远端Reader的Locator(单播或者组播)到内部的ReaderLocatorList中
ADD a_locator TO {this.reader_locators};
reader_locator_remove 函数的功能是从内部的ReaderLocatorList中移除一个Locator
REMOVE a_locator FROM {this.reader_locators};
Unsent_change_reset函数的功能是重置内部ReaderLocatorList中每个ReaderLocator中的highestSentChangeSN的值
FOREACH readerLocator in {this.reader_locators} DO
readerLocator. highestSentChangeSN := 0
ReaderLocator
ReaderLocator是StatelessWriter用于记录所有匹配的远端Reader位置信息的值类型