菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— Bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于ParkingLot的bthread调度brpc源码解析(八)—— 基础类EventDispatcher详解brpc源码解析(九)—— 基础类WorkStealingQueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— Reducer类和Adder类解析brpc源码解析(十二)—— 核心组件bvar详解 AgentGroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— MPSC队列ExecutionQueue详解brpc源码解析(十九)—— 双buffer数据结构DoublyBufferedData详解brpc源码解析(二十)—— 用于访问下游的Channel类详解

brpc源码解析(六)—— 基础类socket详解

阅读 : 567

1 基本介绍

Socket在计算机领域通常指套接字,这里说的brpc里面的socket指的是一种用于方便在多线程环境下使用包括套接字在内的fd等资源的通用结构,前面的文章介绍brpc服务器启动以及发送接收请求相关内容的时候就多次设计到了这个数据结构。下面先贴一段官方文档的描述:

和fd相关的数据均在Socket中,是rpc最复杂的结构之一,这个结构的独特之处在于用64位的SocketId指代Socket对象以方便在多线程环境下使用fd。常用的三个方法:
Create:创建Socket,并返回其SocketId。
Address:取得id对应的Socket,包装在一个会自动释放的unique_ptr中(SocketUniquePtr),当Socket被SetFailed后,返回指针为空。只要Address返回了非空指针,其内容保证不会变化,直到指针自动析构。这个函数是wait-free的。
SetFailed:标记一个Socket为失败,之后所有对那个SocketId的Address会返回空指针(直到健康检查成功)。当Socket对象没人使用后会被回收。这个函数是lock-free的。
可以看到Socket类似shared_ptr,SocketId类似weak_ptr,但Socket独有的SetFailed可以在需要时确保Socket不能被继续Address而最终引用计数归0,单纯使用shared_ptr/weak_ptr则无法保证这点,当一个server需要退出时,如果请求仍频繁地到来,对应Socket的引用计数可能迟迟无法清0而导致server无法退出。另外weak_ptr无法直接作为epoll的data,而SocketId可以。这些因素使我们设计了Socket,这个类的核心部分自14年10月完成后很少改动,非常稳定。

可以看到,设计这个类最主要原因是简化多线程环境下对fd等资源的复用,在网络编程里fd是会被频繁复用的。我觉得这段文档描述得很好,用shared_ptr/weak_ptr做了类比,这里顺便简单说下shared_ptr/weak_ptr的机制。这俩都是智能指针,智能指针内存管理要解决的根本问题是:一个堆对象在被多个对象引用的情况下,何时释放资源。我们需要做的就是在最后一个引用它的对象被释放的时候释放它,难点在于确定哪个引用它的对象是被最后释放的。

std::shared_ptr确定最后一个引用它的对象何时被释放的基本想法是:对被管理的资源进行引用计数,当一个shared_ptr对象要共享这个资源的时候,该资源的引用计数加1,当这个对象生命期结束的时候,再把该引用技术减少1。这样当最后一个引用它的对象被释放的时候,资源的引用计数减少到0,此时释放该资源。std::shared_ptr的问题在于环状引用情况下会出现引用计数器无法清零从而导致内存泄漏,而weak_ptr可以解决此问题,weak_ptr对象引用资源时不会增加引用计数,但是它能够通过lock()方法来判断它所管理的资源是否被释放,在需要访问资源的时候weak_ptr为你生成一个shared_ptr,shared_ptr能够保证在shared_ptr没有被释放之前,其所管理的资源是不会被释放的。创建shared_ptr的方法就是lock()方法。换句话说,weak_ptr只有在真正访问资源的时候才会临时申请sharedptr锁住进行访问,如果lock()的时候资源已经释放了是会失败的,但一旦lock成功了能保证使用过程不被释放,此时如果原来的 std::shared_ptr 被销毁,则该对象的生命期将被延长至这个临时的 std::shared_ptr 同样被销毁为止。具体可以参考这篇文章:https://www.cnblogs.com/diegodu/p/6370736.html

上面的官方文档说到了Socket类似shared_ptr,SocketId类似weak_ptr,因为总体机制很类似,socketId是对socket的弱引用,不加引用计数的那种,但SetFailed可以保证引用计数的归零,因为shared_ptr/weak_ptr机制下资源只要源源不断地被新来的请求所引用,计数器就无法归0,而且SocketId可以作为epoll的data。

2 主要函数详解

2.1 Create

这个函数功能很明确,创建Socket,并返回其SocketId,这里说的创建是从使用层面上来讲的,实际上为了节省开销,socket是可以重复使用的。该函数里,主要是从上篇文章解析过的resource pool里取出socket,用传入的option进行各变量的赋值,删除了部分变量赋值和容错代码的核心部分如下:

从resource pool里面获取到一个socket后,将option的各字段赋值给响应变量,最主要的有远端地址,用户回调函数等。bthread_id是一个特殊的同步结构,它可以互斥RPC过程中的不同环节,也可以O(1)时间内找到RPC上下文(即Controller)。创建用作认证的authid,并调用bthread_id_list_init初始化一个用来保存正在处理的rpc请求的调用id。调用MakeSocketId 创建socketID赋值给_this_id,最后也会赋值给参数id,用于外部调用。随后的关键一步是调用ResetFileDescriptor来处理fd,里面有个关键步骤如下:

如果_on_edge_triggered_events不为null,那么会在event dispatcher里面注册边缘触发的EPOLLIN事件,也就是上图的AddConsumer的调用,将fd注册到epoll里,epoll事件的data是socketID,event dispatcher是brpc里用于分发epoll事件的组件,后面的文章再详细介绍。

2.2 Address

Address函数定义如下:

其中SockeUniquePtr就是个unique_ptr,定义如下:

这个函数功能上很明确,就是传入一个socket id,并把ptr指向根据id找到的socket,SocketId = 32-bit version + 32-bit slot,前32位是版本,后32位是resource pool中真正的id,如果socket失效了,会尝试将其销毁并归还到resource pool里面。首先根据slot从resource pool获取socket实例m,如果为null直接返回-1失败。

否则对m的引用数+1,然后判断socketid的版本和socket实际的版本是否匹配匹配,如果匹配说明socket有效,直接把拿到的socket指针包装在SocketUniquePtr里返回。顺便说下,__builtin_expect是gcc指令,和普通的条件语句的区别在于额外告诉编译器m!=NULL大概率为真,用于做指令跳转的优化。如果socketId的版本和socket实际的版本不匹配,说明有操作让socket失效了,后续尝试进行回收操作,先用fetch_sub减掉先加的1,


减掉一之后进行引用计数的判断,注意这个nref是sub之前的,也就是本次操作加完一之后的,如果大于1,说明本次操作之前计数就不为0,因此不进行后续操作直接返回。如果小于1,说明已经被其他地方释放回收了,如果等于1,则尝试进行释放和归还到resource pool,在尝试释放的时候先判断ver2 & 1是否为真,也就是当前版本号是奇数,如果是偶数,加上版本不一致的前提,说明已经被其他地方销毁归还了,只有版本不一致的奇数才有可能需要被回收。内层判断条件ver1 == ver2 || ver1 + 1 == ver2分别对应了一直是奇数没变和ver1是偶数,ver2新增了1的情况;后者相对有点晦涩,比如初始版本是0,已经经过一轮循环成了2,也就是说socket已经变了,但是因为resource pool重用会导致版本一直累加,这个时候用老的socketid去adress先看到了2,但随后ver2可能会为3,虽然整体上都是下一个socket了,但是ver1和ver3对应的sockte是一致的,所以个人认为这两个条件确保了ver1和ver2对应同一个socket,具体可以参考评论区的讨论。内部则是一个有竞争的对_versioned_ref的cas操作,成功了就回收。失败有两种情况,要么是版本变了,说明其他地方已经执行了释放回收,要么是ref变了,也就是预期是0但其实不是,说明有其他地方address发现版本不一致后ref又加了1,这种情况等其他地方的去释放回收就行,之所以逻辑这么复杂是主要是为了实现address的wait-free。

2.2 SetFailed

前面提到了,SetFailed是用于标记和某个id关联的socket失效的,某个socketid一旦被setfailed,之后对其的Adress都会返回null,需要注意的是,这个函数调用不会让对应的socket马上回收,而是在没有人引用它的时候才会被回收。

首先是拿socketid中提取的版本和_versioned_ref中提取的版本做比对,如果不相等,说明已经有其他地方让这个socket失效了,直接返回。然后用compare exchange strong判断版本和引用计数是不是都没变,如果没变,则把版本加一更新到socket里的versionref,在此之后,对该socket的addres就会返回null。如果变了,说明有其他地方对socket进行了操作,需要重新来一次循环,直到版本不匹配或者compare exchange strong成功,成功后进行一系列的释放相关的操作,注意最后调用的ReleaseAdditionalReference,该函数内部是调用Dereference,


使用_recycle_flag来保证只会执行一次附加dereference,调用ReleaseAdditionalReference如果引用本来就是1会直接销毁归还,否则就可以让address在版本不一致并且ref由1->0或者socketuniqueptr调用deleter发现ref由1->0的时候去尝试销毁归还socket。

2.3 Dereference

Dereference顾名思义,用于对对应socket执行减1引用,SocketUniquePtr的deleter和setfailed成功后的ReleaseAdditionalReference会调用它。函数代码如下:

首先是对引用计数执行原子操作减1,减之前引用计数大于1,说明还有人在用,直接返回即可,若等于1,则说明已经没其他地方在用了,因为Create的时候就对引用计数加了1,没其他地方在用则进一步判断进行回收相关操作,注意图中ver id_ver || ver id_ver + 1这个判断条件ver id_ver表示当前版本没有变化,意味着没有被setfailed,属于用完了没其他人用了就回收的情况,ver id_ver + 1则是说明先前被setfailed了,无论哪种情况都需要销毁这个socket并返回给resource pool。内部对_versioned_ref的判断和Address类似,版本变了或者ref数变了说明已经在其他地方dereference或者其他地方后面会进行dereference,不处理直接返回。这部分和Adress的相应部分是有竞争的。