读写分离功能的背景及架构
当前联邦生产集群的各个子集群只有Active NameNode在工作,当读写任务变得繁忙的时候,只有一个Active负责处理的话,此时集群的响应和处理能力业务侧感知会明显下降,为此,我们将引入Observer架构,实现读写功能的分离,使得Active只负责写请求,而读请求由Observer去负责,并且我们可以引入多个Observer,这将大大增加集群的读写吞吐量和性能,具体架构变更如下:

实现原理
讲解Client、Router、NameNode等子模块如何实现该功能。
RPC原理介绍
纯裸TCP的粘包问题
假设我们需要在 A 电脑的进程发一段数据到 B 电脑的进程,我们一般会在代码里使用 socket 进行编程。

使用纯裸的 TCP 进行字节流的传输会导致无法辨认消息的边界,即粘包现象,发送端的数据包在接收端被不完整地分割或者多个数据包被合并在一起,导致数据边界混乱,接收端无法正确区分数据。
比如,当我们选择使用 TCP 发送 “夏洛"和"特烦恼” 的时候,接收端收到的就是 “夏洛特烦恼” ,这时候接收端没发区分你是想要表达 “夏洛”+“特烦恼” 还是 “夏洛特”+“烦恼” 。

所以我们会把每条要发送的数据都包装一下,比如加入 消息头 ,消息头里写清楚一个完整的包长度是多少,根据这个长度可以继续接收数据,截取出来后它们就是我们真正要传输的 消息体 。

而这里头提到的 消息头 ,还可以放各种东西,比如消息体是否被压缩过和消息体格式之类的,只要上下游都约定好了,互相都认就可以了,这就是所谓的 协议。
协议有很多种,比如 HTTP 和 RPC。
网络的分层图

上面介绍的TCP是传输层的协议,而基于 TCP 造出来的 HTTP 和各类 RPC 协议,它们都只是定义了不同消息格式的 应用层协议 而已。
HTTP
HTTP(Hyper Text Transfer Protocol)协议又叫做 超文本传输协议 。我们用的比较多,平时上网在浏览器上敲个网址就能访问网页,这里用到的就是 HTTP 协议。
RPC
定义
RPC(Remote Procedure Call)又叫做 远程过程调用,它本身并不是一个具体的协议,而是一种 调用方式 。
举个例子,我们平时调用一个 本地方法 就像下面这样。
 res = localFunc(req)
如果现在这不是个本地方法,而是个远端服务器暴露出来的一个方法remoteFunc,如果我们还能像调用本地方法那样去调用它,这样就可以屏蔽掉一些网络细节,用起来更方便,岂不美哉?
res = remoteFunc(req)

基于这个思路,大佬们造出了非常多款式的 RPC 协议,比如比较有名的gRPC,thrift。
值得注意的是,虽然大部分 RPC 协议底层使用 TCP,但实际上 它们不一定非得使用 TCP,改用 UDP 或者 HTTP,其实也可以做到类似的功能。
RPC结构图
RPC 的 核心功能图:

- client以本地调用的方式调用远程服务;
- client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体(序列化):RpcRequest;
- client stub找到远程服务的地址,并将消息发送到服务提供端;
- Server Stub收到消息将消息反序列化为 Java 对象: RpcRequest;根据RpcRequest中的类、方法、方法参数等信息调用本地的方法;
- Server Stub得到方法执行结果并将组装成能够进行网络传输的消息体:RpcResponse(序列化)发送至消费方;
- client stub接收到消息并将消息反序列化为 Java 对象:RpcResponse,这样也就得到了最终结果。
有了HTTP,为什么还使用RPC
RPC出现时间更早

在Client/Server (C/S) 架构下,客户端(Client) 需要跟服务端(Server) 建立连接收发消息,不涉及标准化,主要使用RPC协议。
在 Browser/Server (B/S) 架构下,比如说Chrome浏览器,需要访问各个公司的服务器,这时候需要统一标准,不然大家没法交流,主要使用HTTP协议。
底层连接形式
底层连接形式,RPC使用连接池,性能更高。

以主流的 HTTP1.1 协议为例,其默认在建立底层 TCP 连接之后会一直保持这个连接(keep alive),之后的请求和响应都会复用这条连接。
而 RPC 协议,也跟 HTTP 类似,也是通过建立 TCP 长链接进行数据交互,但不同的地方在于,RPC 协议一般还会再建个 连接池,在请求量大的时候,建立多条连接放在池内,要发数据的时候就从池里取一条连接出来,用完放回去,下次再复用,可以说非常环保。
传输的内容
HTTP的消息头含有更多的内容,内容非常多的冗余,显得非常啰嗦。


而 RPC,因为它定制化程度更高,消息头冗余较少,可以采用体积更小的 Protobuf 或其他序列化协议去保存结构体数据,同时也不需要像 HTTP 那样考虑各种浏览器行为,比如 302 重定向跳转啥的。因此性能也会更好一些,这也是在公司内部微服务中抛弃 HTTP,选择使用 RPC 的最主要原因。

而我们的HADOOP生产集群的存储系统,服务端和客户端进行远程调用时使用就是Protobuf序列化协议。
protobuf序列化
在hadoop中,大量需要进行RPC调用传输的数据参数是通过protobuf协议去序列化。
使用protobuf进行序列化有如下好处:
- protobuf序列化和反序列化更加高效,简洁,字段在传输过程中只传输序列号
- protobuf支持跨语言,可以根据 .proto文件的配置自动生成各种编程语言的数据结构代码,简化开发过程。
为了在联邦集群中支持observer功能,引入消息namespaceStateIds,主要保存各个ns的最新的state id。
RouterFederatedStateProto
message RouterFederatedStateProto {
  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
}
RpcRequestHeaderProto
message RpcRequestHeaderProto { // the header for the RpcRequest
  enum OperationProto {
    RPC_FINAL_PACKET        = 0; // The final RPC Packet
    RPC_CONTINUATION_PACKET = 1; // not implemented yet
    RPC_CLOSE_CONNECTION     = 2; // close the rpc connection
  }
  optional RpcKindProto rpcKind = 1;
  optional OperationProto rpcOp = 2;
  required sint32 callId = 3; // a sequence number that is sent back in response
  required bytes clientId = 4; // Globally unique client ID
  // clientId + callId uniquely identifies a request
  // retry count, 1 means this is the first retry
  optional sint32 retryCount = 5 [default = -1];
  optional RPCTraceInfoProto traceInfo = 6; // tracing info
  optional RPCCallerContextProto callerContext = 7; // call context
  optional int64 stateId = 8; // The last seen Global State ID
  // Alignment context info for use with routers.
  // The client should not interpret these bytes, but only forward bytes
  // received from RpcResponseHeaderProto.routerFederatedState.
  optional bytes routerFederatedState = 9;
}
RpcResponseHeaderProto
message RpcResponseHeaderProto {
  /**
    * 
    * RpcStastus - success or failure
    * The reponseHeader's errDetail,  exceptionClassName and errMsg contains
    * further details on the error
    **/
  enum RpcStatusProto {
   SUCCESS = 0;  // RPC succeeded
   ERROR = 1;    // RPC or error - connection left open for future calls
   FATAL = 2;    // Fatal error - connection closed
  }
  enum RpcErrorCodeProto {
   // Non-fatal Rpc error - connection left open for future rpc calls
   ERROR_APPLICATION = 1;      // RPC Failed - rpc app threw exception
   ERROR_NO_SUCH_METHOD = 2;   // Rpc error - no such method
   ERROR_NO_SUCH_PROTOCOL = 3; // Rpc error - no such protocol
   ERROR_RPC_SERVER  = 4;      // Rpc error on server side
   ERROR_SERIALIZING_RESPONSE = 5; // error serializign response
   ERROR_RPC_VERSION_MISMATCH = 6; // Rpc protocol version mismatch
   // Fatal Server side Rpc error - connection closed
   FATAL_UNKNOWN = 10;                   // unknown Fatal error
   FATAL_UNSUPPORTED_SERIALIZATION = 11; // IPC layer serilization type invalid
   FATAL_INVALID_RPC_HEADER = 12;        // fields of RpcHeader are invalid
   FATAL_DESERIALIZING_REQUEST = 13;     // could not deserilize rpc request
   FATAL_VERSION_MISMATCH = 14;          // Ipc Layer version mismatch
   FATAL_UNAUTHORIZED = 15;              // Auth failed
  }
  required uint32 callId = 1; // callId used in Request
  required RpcStatusProto status = 2;
  optional uint32 serverIpcVersionNum = 3; // Sent if success or fail
  optional string exceptionClassName = 4;  // if request fails
  optional string errorMsg = 5;  // if request fails, often contains strack trace
  optional RpcErrorCodeProto errorDetail = 6; // in case of error
  optional bytes clientId = 7; // Globally unique client ID
  optional sint32 retryCount = 8 [default = -1];
  optional int64 stateId = 9; // The last written Global State ID
  // Alignment context info for use with routers.
  // The client should not interpret these bytes, but only
  // forward them to the router using RpcRequestHeaderProto.routerFederatedState.
    optional bytes routerFederatedState = 10;
}
状态流转
下图为rbf架构下的状态id流转

Client模块
故障转移代理提供类
当客户端执行hdfs请求时,比如hdfs dfs -cat /t.sh,由于是高可用模式,请求首先会通过故障转移代理提供类获取一个NN的代理,这样客户端就知道请求应该发向哪个NN,在联邦集群中,NN则换成了Router。
目前生产环境常用的有
dfs.client.failover.proxy.provider=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
在初始化ConfiguredFailoverProxyProvider类的时候,会获取所有的NN的代理,并且可以random打散,这对rbf集群的router负载均衡非常有用。
而为了在rbf集群中支持读写分离,读请求发送到observer,写请求发送到active,我们需要变更配置类,如下,
dfs.client.failover.proxy.provider=org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider
RouterObserverReadConfiguredFailoverProxyProvider主要继承RouterObserverReadProxyProvider,之所以扩展RouterObserverReadProxyProvider,是因为单独使用RouterObserverReadProxyProvider不支持高可用模式。
这是因为RouterObserverReadProxyProvider使用IPFailoverProxyProvider去初始化NN代理,只能识别单个ip。
    public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> xface,
                                           HAProxyFactory<T&g

















