一.RTPS协议概述
RTPS协议主要由四个部分组成:
1.发现模块(Discovery)
 发现模块是定义了RTPS的参与者(Participant)获取其他RTPS的参与者(Participant),端点(Endpoint)的协议,使得每个参与者(Participant)能够了解到整个网络中其他参与者的存在并且相互匹配。
 metatraffic通信使得RTPS的参与者(Participant)可以获取到所有Participant,Reader以及Writer的快照并且让本地Reader和远端Writer以及本地Writer和远端Reader之间通信。
 RTPS规范将发现模块拆为两个部分:
 1.1 参与者发现协议(PDP):指定了参与者如何在网络中发现彼此。
 1.2 端点发现协议(EDP):一旦发现彼此,则两个参与者就使用端点发现协议(EDP)交换起包含的端点的信息。
 所有RTPS的实现必须提供简单参与者发现协议(Simple PDP)和简单端点发现协议(Simple EDP)。
2.结构(Structor)
 结构定义了RTPS的各类实体(Entity)与域(Domain)的关系以及各类实体间的关联:

 Entity:所有RTPS实体的父类,每个Entity对象拥有Guid,并且对其他Entity对象可见。Guid属性如下:
struct RTPS_DllAPI GUID_t    // 唯一标识一个Entity
{
    //!Guid prefix    每个Participant和其下的Endpoint都拥有相同的Guid Prefix
    GuidPrefix_t guidPrefix;
    //!Entity id    每个实体的EntityID不同
    EntityId_t entityId;
}
 
 Endpoint:特殊的Entity,是RTPS消息的起点或者终点。Endpoint涉及的属性如下:
| 属性 | 类型 | 含义 | 和DDS的关系 | 
|---|---|---|---|
| unicastLocatorList | Locator_t[*] | 发送RTPS消息的目标Endpoint的单播地址的列表 | 在Discovery过程中配置 | 
| multicastLocatorLi st | Locator_t[*] | 发送RTPS消息的多播地址的列表(可以为空) | 在Discovery过程中配置 | 
| ReliabilityKind | ReliabilityKind_t | 可靠性等级(BestEffort / Reliable) | Qos中指定的 | 
| topicKind | TopicKind_t | 标识当前Endpoint上传输的数据是否有key,key数据的主要类型是InstanceHandle_t,对应了Guid | |
| endpointGroup | EntityId_t | 标记了Endpoint属于哪个RTPSGroup | 和DDS中的Subscriber/Publisher关联 | 
 Participant:是所有Endpoint的容器,共享属性。Participant涉及到的属性如下:
| 属性 | 类型 | 含义 | 和DDS的关系 | 
|---|---|---|---|
| defaultUnicastLocatorList | Locator_t[*] | 默认的单播地址列表(地址+端口) | 在Discovery过程中配置 | 
| defaultMulticastLocat orList | Locator_t[*] | 默认的组播地址列表(地址+端口) | 在Discovery过程中配置 | 
| ProtocolVersion | ProtocolVersion_t | RTPS协议版本号 | |
| VendorId | VendorId_t | RTPS中间商代码 | 需要向OMG组织申请 | 
 Writer:RTPS消息的起点,发送HistoryChange中的CacheChange消息到匹配的Reader的HistoryChange。
 Reader:RTPS消息的终点,接受匹配的Writer发送的CacheChange数据。
 RTPSGroup:主要有两种Group(Publisher和Subscriber)
3.消息(Message)
 消息模块定义了Reader和Writer之间交换的消息格式,主要有消息头(Header)和一系列子消息(SubMessage组成),每个子消息都由一系列子消息元素组成:

Header:
一个消息(Message)中可能包含多个子消息(SubMessage),这些子消息之间可能存在依赖关系/解释关系。
子消息分为实体子消息(Entity SubMessage)和解释子消息(Interpreter SubMessage)。
4.行为(Behavior)
 行为定义了Reader和Writer交换RTPS消息的顺序以及消息引起的相关实体(主要是Reader和Writer)状态的变化,RTPS中主要有行为模块的视线(有状态 Stateful 和 无状态 Stateless)。
 RTPS Writer不控制什么时候从Writer的HistoryCache中删除CacheChange,删除CacheChange的动作是由DDS的Writer来完成的,这个和Qos配置有关,例如如果配置了KEEP_LAST,那么DataWriter只会保留最后一次发送的CacheChange数据,删除之前的Change数据。
 必须实现RTPS MessageReceiver接口(因为涉及到解析子消息时上下文的处理和状态机的切换)
 StatefulWriter和StatefulReader交互数据的流程如下图:
 
4.1 通用要求(General Requirement)
 Writer:
 RTPSWriter发送HistoryCache中的CacheChange数据时,必须按照SequenceNumber的顺序进行发送。
 如果Reader要求inline Qos,则Writer发送数据的时候必须带上Qos数据(in-line Qos)。
 对于StatefulWriter,必须定周期发送HeartBeat子消息(如果HistoryCache中还有数据)。如果HistoryCache中没有数据了,那么StatefulWriter就不需要发送HeartBeat子消息了。StatefulWriter必须持续发送HeartBeat消息给StatefulReader(如果Reader返回的时候NACK)直到Reader读取了所有数据。
 Writer对于Reader的NACK消息进行响应处理(因为这代表Reader有miss的数据需要Writer进行重发)
 Reader:
 因为Stateless的Reader是完全被动的,只接受数据,不发送响应数据。因此,对Reader的通用要求大部分都是针对StatefulReader的。
 如果reader收到的HeartBeat包没有设置final flag标志位,则Reader对于这包HeartBeat必须做出响应,回复ACK或者NACK SubMessage。
 如果reader有数据没有收到(收到HeartBeat比对seq以后),必须回复Nack包告知Writer有数据包丢失需要重传。这个回复可以延迟以避免网络数据风暴。
4.2 具体实现
Writer:
 StatelessWriter和StatefulWriter的主要区别在于持有对端Reader的方式和信息有所不同,类结构的区别如下图:
 
RTPSWriter的主要属性有下面8种:
| 属性 | 类型 | 含义 | 和DDS的关系 | 
|---|---|---|---|
| pushMode | bool | 当属性为true时,Writer主动将change数据推送给reader,反之,change数据随着心跳包一起发送,并且是作为对Reader请求的应答 | N/A | 
| heartbeatPeriod | Duration_t | 周期发送心跳包消息的间隔(心跳包包含可被reader读取的数据的lastchangenumber) | N/A | 
| nackResponseDelay | Duration_t | 对于Reader发送的NACK消息,允许Writer进行回复的最大延时 | N/A | 
| lastChangeSequenceNumber | SequenceNumber | 用于分配给下一个Change数据的内部递增的序列号 | N/A | 
| writer_cache | HistoryCache | 包含历史CacheChange的容器,每个Writer内部保存 | N/A | 
| nackSuppressionDuration | Duration_t | 针对已经发送的CacheChange,如果在nackSuppressionDuration时间内收到了Reader发送的NACK报文,则可以忽略。 | N/A | 
| dataMaxSize | 可选属性 | N/A | 
RTPSWriter主要提供new构造函数以及new_change函数,new_change函数返回新构造的CacheChange对象。
| 函数名 | 参数 | 类型 | 
|---|---|---|
| new | Writer | |
| attribute_values | 创建Writer以及其父类Endpoint时需要的属性集合 | |
| New_change | CacheChange | |
| kind | ChangeKind_t | |
| data | Data | |
| inlineQos | ParameterList | |
| handle | InstanceHandle_t | 
new操作大致步骤:
1.给如下成员赋值:
 GUID:Entity的GUID
 unicastlocatorlist:数据通信的单播地址列表
 multicastlocatorlist:数据通信的组播地址列表
 reliabilityLevel:可靠性等级(Reliable, BestEffort)
 topicKind: NoKey, WithKey
 pushMode: 主动推送数据还是跟随HeartBeat一起发送数据
 heartbeatPeriod:心跳包间隔
 nackResponseDelay:对于Reader的Nack包的最长回复延时
 nackSuppressionDuration:发送Change数据多少时间内收到Nack子消息是可以忽略的
 lastChangeSequenceNumber:数据包的序列号(递增)
- 创建writer_cache:Writer的HistoryCache(用于存放要发送的CacheChange)
 
New_change的操作步骤大致如下:
 递增lastChangeSequenceNumber
 新建CacheChange对象
 a_change := new CacheChange(kind, this.guid, this.lastChangeSequenceNumber,
						    data, inlineQos, handle);
 
 返回CacheChange对象
StatelessWriter:
1.StatelessWriter不知道匹配的Reader的数量,也不保存每个匹配的RTPSReader的状态。StatelessWriter只保留要发送数据的Reader的Loator信息以便发送数据。
| 属性 | 类型 | 含义 | 和DDS的关系 | 
|---|---|---|---|
| reader_locators | ReaderLocator[*] | StatelessWriter保存发送数据的匹配Reader的locator列表,其中包括单播地址和组播地址 | N/A | 
StatelessWriter适用于内存较小(HistoryCache可以分配的小一点)或者对传输性能高的场合(例如组播的通信方式就适合数据大的场景)
| 函数名 | 参数 | 类型 | 
|---|---|---|
| new | StatelessWriter | |
| Attribute_values | 创建Statelesswriter和父类Endpint需要的参数集合 | |
| reader_locator_add | void | |
| a_locator | Locator_t | |
| reader_locator_remove | void | |
| a_locator | Locator_t | |
| Unsent_change_reset | Void | 
New函数主要是创建了一个内部空的ReaderLocator列表用于后续添加Reader的Locator
reader_locator_add 函数功能是添加一个远端Reader的Locator(单播或者组播)到内部的ReaderLocatorList中
ADD a_locator TO {this.reader_locators};
reader_locator_remove 函数的功能是从内部的ReaderLocatorList中移除一个Locator
REMOVE a_locator FROM {this.reader_locators};
Unsent_change_reset函数的功能是重置内部ReaderLocatorList中每个ReaderLocator中的highestSentChangeSN的值
FOREACH readerLocator in {this.reader_locators} DO
 readerLocator. highestSentChangeSN := 0
ReaderLocator
ReaderLocator是StatelessWriter用于记录所有匹配的远端Reader位置信息的值类型



















