菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— Bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于ParkingLot的bthread调度brpc源码解析(八)—— 基础类EventDispatcher详解brpc源码解析(九)—— 基础类WorkStealingQueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— Reducer类和Adder类解析brpc源码解析(十二)—— 核心组件bvar详解 AgentGroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— MPSC队列ExecutionQueue详解brpc源码解析(十九)—— 双buffer数据结构DoublyBufferedData详解brpc源码解析(二十)—— 用于访问下游的Channel类详解

brpc源码解析(十四)—— 核心组件bvar详解 sampler详解

阅读 : 805

这篇来聊一下bvar另外一个很重要的基础组件sampler,也就是采集器,像window等和时间窗口相关的类型需要使用sampler来定时采集,每一个需要采集的bvar对应一个sampler,sampler相关代码在sampler.h和sampler.cpp里,主要包括:
(1)Sample:用于保存采样值的结构体。
(2)SamplerCollector:全局的用来遍历所有的Sampler并执行采样操作的结构。
(3)Sampler:采样器的基类
(4)ReducerSampler:继承自Sampler的用于对Reducer类型的bvar进行采样的采样器

下面就分别介绍下这几种结构的实现。

1. Sample

采集到的采样值使用一个简单的结构体Sample保存,包含值和时间,如下:

2. SamplerCollector

在介绍采样器本身之前,先说下SamplerCollector类,这个类是一个用来遍历所有的Sampler并执行采样操作的全局结构,定义如下:

一眼看过去是不是有点奇怪,SamplerCollector为什么要继承Reducer,这也正是设计比较巧妙的一个地方,注释里也提到了,对于需要定期执行的操作,一种常见的做法是用定时器线程,但一旦sampler多了,效率上是不尽人意的,这里把SamplerCollector定义成模板参数为Sampler指针和一个特定运算符CombineSampler的Reducer。

首先我们来看下这个CombineSampler运算符,做了一个合并,也就是把s1插到s2之前:

而在SamplerCollector的构造函数里,最主要的就是调用pthread_create启动线程执行sampling_thread,实质上就是调用run(),run()函数也是采样的核心函数,首先是一些值的初始化:

Root是整个collector的根节点,也就是串联起来的sampler的起点,consecutive_nosleep是保存连续没有sleep的次数,设定是每隔一秒采集一次,如果一秒内都没有执行完所有的sampler这个值就会加1,表明延迟了,PassiveStatus类型的cumulated_time和使用它的usage则是用来监控执行时间的。

随后是一个死循环,只要_stop不为true就一直执行定时采集任务,采集的部分如下:

首先是调用reset拿到一个sampler的指针s,SamplerCollector作为一个Reducer,reset实质上调用的是combiner的reset,前面的文章介绍过,如下:

根据CombineSampler运算符不难理解,就是把所有的Sampler串起来返回后并重置为NULL,随后将这些sampler指针插入到root节点后面,简单来说就是每次采样前会将新增的sampler加到队列里。

随后则是遍历链表进行采样,需要注意的是会有加锁操作,因为需要和sampler其他的诸如destroy的函数互斥,然后判断s->_used,如果为false,说明已经执行了destroy,从链表移除并delete,否则采样后解锁,nremoved和nsampled这两个计数器没发现在其他哪里使用了,有点奇怪。

一轮采样完成后则是耗时的计算和判断,简单来说就是以函数开头获取的时间为基准时间,纪录本轮耗时,并判断是否超过1s,如果没超过,sleep掉剩余的部分,如果超过了,consecutive_nosleep加1,表明本次出现了延迟,不会sleep而是直接进入了下一轮采集,并且如果达到阈值会打warning日志,没超过则consecutive_nosleep清0。

3. Sampler和ReducerSampler

3.1 基类Sampler


Sampler则是真正和要采集的bvar一一对应的采样器结构的基类,和以前介绍过的agent一样,这里又是奇异递归模板模式,因为全局上sampler都是串起来由一个线程统一调度的,所以要用到链表。包含三个成员函数,分别如下:
(1)take_sample():虚函数,需要继承类实现,真正的采样逻辑就在里面。
(2)schedule():把当前sampler全局注册以实现周期性地调用take_sample():

这个函数很简单,就是调用get_leaky_singleton获得一个全局的单例后调用<<操作符把this指针写进这个单例的tls存储,get_leaky_singleton()的具体实现如下,主要是使用phtread once实现:

(3)destroy:去除这个sampler,停止采样,实质是将_used置为false,这样collector在执行的时候会将其移出链表:

3.2 继承类ReducerSampler

因为只有reducer有window类的需求,继承自Sampler的目前只有ReducerSampler,定义如下:

3.2.1 成员变量


(1)_reducer:sampler对应reducer的指针.
(2)_q:保存采样结果的队列。
(3)_window_size:当前Sampler的窗口指导大小,称他为指导大小是因为实际大小是由_q的大小决定的,在采样过程中会根据_window_size来调整_q的大小。

3.2.2 构造函数


构造函数的入参是reducer的指针,赋值给_reducer,_window_size初始值为1,该值可以通过函数改变,构造函数会调用一次采样函数。

3.2.3 成员函数

(1)take_sample:采样直接执行的采样函数,reduce sampler本身会在构造的时候调用一次,其余都是sampler collector去定期调用。该函数首先会判断队列容量是否小于_window_size,如果是则扩充队列大小,实现上是新建一个BoundedQueue将item挪过去然后交换,老的队列会在析构的时候释放空间,如下:

Check完大小后就是真正的采样过程,如下:

根据操作符是否可以逆向来确定调用的reducer函数,butil::is_same::value如果为true表明没有InvOp,也就是不能逆向,比如maxer miner这种reducer,此类reducer只能每次保存完值重置,取值的时候也需要汇总时段内的所有sampler才能得到值,而有inverse op的,以adder为例,每次记录当前值就行,不需要重置,取值的时候只需首尾要减一下即可。最后通过elim_push将sampler存入队列,elim_push在push前会检测是否full,如果是会pop掉队首。

(2)get_value,用于获取某个时间点到现在的sample值,如下:

主要逻辑也是根据是否能够逆向操作采用不同的方式得到时间点对应的值,比如,调用get_value(10, result),对于maxer,那么就是10秒内的最大值,需要使用10秒内的所有采样汇总得到,对于adder,是10秒内的累加值,只需要将最近的值减去十秒前的值即可。

(3)set_window_size:改变window大小,由WindowBase(window类型基类)直接调用,比较简单,如下:

(4)get_samples:返回从某个时间点开始的所有sample,如下:

4. 总结

基础组件sampler是bvar中很重要的一个组件,bvar除了是一个性能优秀的计数器,设计的最重要的使用场景就是监控,定时采集数据是很重要的一环sampler就扮演了这么一个角色,通过一个全局单例以遍历链表的方式来实现对所有sampler的采集。