菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— Bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于ParkingLot的bthread调度brpc源码解析(八)—— 基础类EventDispatcher详解brpc源码解析(九)—— 基础类WorkStealingQueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— Reducer类和Adder类解析brpc源码解析(十二)—— 核心组件bvar详解 AgentGroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— MPSC队列ExecutionQueue详解brpc源码解析(十九)—— 双buffer数据结构DoublyBufferedData详解brpc源码解析(二十)—— 用于访问下游的Channel类详解

brpc源码解析(二十)—— 用于访问下游的Channel类详解

阅读 : 1085

以前写过访问下游的一些源码解析,但更多的是从整个处理流程上过了一遍,能够快速地熟悉处理过程,但对每个组件的具体细节并没有过多深入,有点走马观花的感觉。相比较于那会现在对整个brpc的熟悉程度也高了很多,因此打算换个角度逐个介绍下各个涉及到的组件。

brpc的客户端和服务端的通信都是基于protobuf的,换句话说,不管是什么样的协议,最终都是走到了protobuf的相关接口,比如访问下游,不管是是访问pb service还是http接口,最终都是调用的channel的CallMethod,protobuf里虽然没有rpc的具体实现,但却有rpc的定义,而channel则是protobuf里定义的一个rpc访问相关概念,protobuf定义的相关类主要有三个,brpc也是基于这几个概念实现的rpc,这里贴下官方文档:

1.RpcController:An RpcController mediates a single method call. The primary purpose of the controller is to provide a way to manipulate settings specific to the RPC implementation and to find out about RPC-level errors. The methods provided by the RpcController interface are intended to be a “least common denominator” set of features which we expect all implementations to support. Specific implementations may provide more advanced features (e.g. deadline propagation).

2.RpcChannel:Abstract interface for an RPC channel. An RpcChannel represents a communication line to a Service which can be used to call that Service’s methods. The Service may be running on another machine. Normally, you should not call an RpcChannel directly, but instead construct a stub Service wrapping it.

3.Service:Abstract base interface for protocol-buffer-based RPC services. Services themselves are abstract interfaces (implemented either by servers or as stubs), but they subclass this base interface. The methods of this interface can be used to call the methods of the Service without knowing its exact type at compile time (analogous to Reflection).

文档已经说得很清楚了,这里不再赘述,而其中的channel代表和一台或一组服务器的交互通道,我们可以将channel理解为rpc client。所有类型的调用最终都是会调用channel的CallMethod进行访问,区别只是在于不同的协议打包方式等不一样,

brpc基于pb的protobuf实现了一系列的适用于不同场景的channel,分为两类,一次访问单个下游的基础channel和可以同时访问多个server的组合channel,组合channel包括ParallelChannel、SelectiveChannel和PartitionChannel。这篇文章就来详细聊聊brpc种最常用的基础channel。

一、ChannelBase类定义

所有的Channel都是继承自ChannelBase类,因此可以方便地进行各种channel的组合,ChannelBase定义如下:

整个类很简单,首先是继承自protobuf的RpcChannel和Describable,后者是用来在监控页面上展示相关状态信息的。Weight()函数需要被子类实现,代表当前Channel的权重,之所以要被子类实现是因为不同的channel权重的定义不一样。比如普通Channel的权重是当前Channel连接的所有server的和,而Parallelchannel的权重则是所有子Channel中的最小权重。CheckHealth()健康检查对于不同类型的channel实现也不一样。

二、Channel类基本定义

这个就是基础Channel,也是最常用的Channel,基本定义如下:

注释里给出了一个标准的protobuf rpc调用使用Channel的方式,由于brpc的Channel是继承自protobuf的RpcChannel的,因此可以直接传给protobuf生成的xxx_Stub使用,而在http访问的场景中,我们会直接调用Channel里的方法进行访问。

三、Channel参数

3.1 ChannelOptions定义

Channel在初始化的时候需要一系列的参数,均定义在ChannelOptions里,这里整体过一遍,不得不说brpc的注释是真的挺详细。

(1)connect_timeout_ms是连接超时,注意这个超时时间是rpc框架内部的连接超时,和TCP的连接超时之间是独立的,如果TCP超时比这个connect_timeout_ms小的话会在它未达到的时候由于TCP连接超时而出错。
(2)timeout_ms控制的是一次rpc请求的超时。
(3)backup_request_ms顾名思义是发送备用请求的时间,即一次rpc访问到了一定时间还没返回则选择同一个名字服务内的另一个服务器再发一次,随后会取用先返回的结果。
(4)max_retry是channel上所有RPC的默认最大重试次数,0表示不重试,重试时框架会尽量避开之前尝试过的
server,注意brpc只有在以下条件都满足的情况下才会进行重试:

  • 连接出错。如果server一直没有返回,但连接没有问题,这种情况下不会重试。如果需要在一定时间后发送另一个请求,使用backup request。
  • 没到超时时间。
  • 有剩余重试次数。Controller.set_max_retry(0)或ChannelOptions.max_retry = 0可关闭重试。
  • 重试对错误可能有效。比如请求有错时(EREQUEST)不会重试,因为server总不会接受,没有意义。

用户还可以通过继承baidu::rpc::RetryPolicy自定义重试条件。

(5)protocol用于指定该Channel使用的协议,brpc是个支持多协议的框架,在初始化Channel的时候会利用这个protocal对象从options.proto定义的协议中选取要用的协议,
(6)连接类型connection_type则是Channel使用的连接方式,每个协议都有自己的默连接方式,如果不指定这个字段就是使用默认值。
(7)succeed_without_server可以在ns中没有可用server的情况下创建Channel,待后续添加了server可以正常使用。
其余几个注释说得很明白了,不再赘述,需要注意的是,auth、retry_policy和ns_filter都是裸指针,需要在channel使用的过程中保证它们的生命周期。

顺便提一下Channel中ChannelOption的默认值如下:

3.2 根据ChannelOptions为Channel设置参数

InitChannelOptions则是在Channel init过程中用来解析ChannelOption 进行相关初始化操作的,如下:

以上是协议指定相关的,因为是作为client所以需要指定打包和序列化的方法,_get_method_name是自定义method name,用的比较少,现有的协议基本都是NULL。

然后是连接方式选择相关的,以前的文章也介绍过,在协议支持的前提下如果用户指定了连接方式,则会以用户指定的为准,否则按单连接、连接池、短连接的优先级进行自动选择,因为绝大部分情况下,从性能和开销上来说也是这个顺序。

_preferred_index是优先考虑的协议index,在Channel初始化的过程中这个字段自然是来自于Option指定的协议,这里把这个值保存下来是为了让controller在使用Channel的时候能获取到,进而传导给socket,以指导处理server返回数据时的协议选择。

四、Channel初始化

Channel的初始化可以理解为将Channel连接到服务端,当然实际的连接需要等到用的时候才会真正发生,既可以连接单台服务器,也可以连接一组某个名字服务下的服务器,相关函数定义如下:

连接单台服务器有三种重载,本质上都是指定ip port和option。而根据名字服务连接则是指定名字服务url、负载均衡策略和option。支持的名字服务和负载均衡策略注释里都列出来了。

具体到实现上,不考虑不同参数的重载版本,核心逻辑在以下两个函数:

以上是连接单台服务器使用的init,GlobalInitializeOrDie()是利用pthread_once执行协议和负载均衡方法注册之类的操作,全局只需要执行一次。随后将这个endpoint插入到socketmap里并将对应的socket_id保存到_server_id里供后续使用,socketmap是一个保存某远端到socketid映射的结构,这里的远端指的就是某个确定的ip point,一个名字服务下面的多个ip port属于不同的远端。

以上是用于连接名字服务的init,和连接单机最大的区别就在于不是去创立单台的socket,而是新建一个和名字服务对应的load balancer,后续在实际访问的时候,由controller使用该loadbalancer选取当前请求需要使用的endpoint。

五、核心调用函数CallMethod

整个Channel里面,最核心的是CallMethod,无论什么的协议,对于下游的访问最终都会走到整个函数里面,调用完CallMethod,整个请求就算完成了,这个函数输入输出都很明确,注释也说得很清楚,protobuf生成的XXX_stub在发起rpc调用的时候内部也是调的这个函数,如下:

因为controller才是和一次请求强绑定的,CallMethod里的各种操作基本都是围绕controller展开的,首先是一些初始化计时和重试之类的设定,注意这个static_cast,传入的其实是brpc的Controller类型, 它是protobuf RpcController的子类,但是为了使用protobuf的接口,传入的时候用的是父类的指针。

随后设定协议和重试机制,callid是和一次rpc请求一一对应的用于同步的对象,bthread_id_lock_and_reset_range是锁定这个callid并且重置range,这里的range是用来处理版本相关的问题,这里不展开讨论,后面单独聊一聊bthread::Id这个数据结构,这里知道是用于同步的就行,

接下来这一部分是供rpcz等trace功能使用的,在没有sender进行rpc调用拦截并且IsTraceable为true的情况下记录相关信息生成span赋值给对应的controller,一个span对应着一次请求。

随后是针对此次请求的controller进行相关赋值,主要分为两部分,一是各个超时参数如果未设置,使用Channel的option来补上,比较特别的是连接超时,由于连接是各个rpc调用之间大量复用的,单独为cntl设置没有意义,因此直接使用Channel option里的。二是这次请求实际内容相关的response、打包函数、调用的远端方法和鉴权方式的指定。

接着首先是根据Channel指定controller的远端地址,如果是单server直接给对应server的socketid和一个EndPoint类型的地址。如果是根据名字服务访问则给cntl传loadbalancer,用于实际访问的时候选取server。

然后是check如果是pthread模式下,当前pthread code队列是否超限,是的话直接返回失败,brpc提供了FLAGS_usercode_in_pthread来将用户代码运行在pthread里以满足一些特殊需求。

再往后是真正在发请求前check是controller否已经是failed的了,比如其他地方调用controller的setfailed会让FailedInline()返回true,这里用了两次check,中间夹了一个相对耗时的序列化要发送的数据的函数_serialize_request,应该是为了提高发送前拦截的成功率。

最后一个check是判断如果是streaming rpc则关闭重试和backup request,因为目前不支持。

随后则是设置timer和当前rpc请求完成的deadline:_abstime_us,超时和backup request等能力都需要用到timer。分为三种情况,一是backup_request_ms设置有效的情况,二是没有有效backup_request_ms但是有有效timeout_ms地情况,三是二者皆无的情况,如下:

第一种情况会用HandleBackupRequest作为回调set一个用于backup_request的timer,也就是到了指定时间后会发起backup请求,同时也会指定当前rpc请求完成的deadline。

第二种只有超时,因此设定的是HandleTimeout作为回调的timer和相应的额deadline,到点后需要处理超时。
HandleBackupRequest和HandleTimeout如下:

bthread_id_error里会执行相应的真正的处理函数,也就是注释里的的“on_error”。
一切准备工作准备就绪后,就是真正发起访问了,也就是调用controller的IssurRPC,这部分以前的博客讲过,这里就先不展开了,发起访问后会根据是不是异步请求判断是否需要join等待请求完成。

六、Weight()和CheckHealth()

所有的类型的Channel都会实现Weight()和CheckHealth(),而且Weight是强制实现,这两个函数主要用于负载均衡。

  • 对于基础Channel在有lb的情况下Weight()返回的是lb的权重否则是0,这个会被用于组合Channel Weight的计算,比如ParallelChannel的Weight是根据子Channel的Weight计算的。
  • 单个服务器健康检查是通过判断对应的socketid是否有效,基于名字服务的则是根据对应的lb是否能选取到有效的server。

    【参考】
    brpc官方文档