菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— Bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于ParkingLot的bthread调度brpc源码解析(八)—— 基础类EventDispatcher详解brpc源码解析(九)—— 基础类WorkStealingQueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— Reducer类和Adder类解析brpc源码解析(十二)—— 核心组件bvar详解 AgentGroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— MPSC队列ExecutionQueue详解brpc源码解析(十九)—— 双buffer数据结构DoublyBufferedData详解brpc源码解析(二十)—— 用于访问下游的Channel类详解

brpc源码解析(二)—— brpc收到请求的处理过程

阅读 : 816

作为rpc服务器,在启动过后,最主要的一个过程就是收到请求后的处理,而这就牵涉到一个网络编程相关最基本的部分:如何有效地处理socket传过来地数据,这篇文章就来详细聊一聊brpc是怎么处理收到的请求的。

一、基本设计思路

作为服务器,业界最典型的实现就是区分I/O线程和工作线程,一个或多个I/O线程负责从socket读取数据放入一个队列,然后一堆worker线程来从队列里取数据并处理,或者I/O线程读完数据直接交给worker,此类严格区分I/O线程和worker线程的机制会有几种典型的问题:

1.一个I/O线程同时只能读取一个fd,I/O线程通常只有一个或者几个,如果某个fd上数据量很大,读取很耗时会容易导致其他fd数据的阻塞。
2.如果要操作队列会产生竞争,高并发的时候严重影响性能。
3.当前主流的多核cpu机制下,各个核之间进行cache同步是微妙级的,并不是很快,容易出现cache bouncing,I/O线程将任务交给worker涉及到跨核,效率不是很高,之间的同样cpu缓存到内存之间交换数据也比较慢。

和很多rpc框架不同,brpc有一套独特的机制来解决上述问题,总的来说就是,不区分I/O线程和worker线程,通过全局的EventDispatcher来负责监听fd上的事件,如果fd上已经有bthread在处理了直接返回,否则启动一个bthread处理,被EventDispatcher启动的bthread负责读取fd上的消息,每读到一条就启动bthread去处理,最后一条原地执行,直到读完为止,因为EventDispatcher本身既不负责读消息也不负责处理消息,自身吞吐量可以做得很大,fd间和fd内的消息都能获得并发,所有线程都是wait-free的,这使得brpc在高负载时仍能及时处理不同来源的消息。下面就结合源码详细阐述相关机制。

二、实现细节

在上篇文章里已经介绍过了,服务器启动后会由EventDispatcher监听epoll事件,作为服务器,收到一个新连接属于epoll_in事件,会调用Socket::StartInputEvent进行处理。StartInputEvent函数如下:

StartInputEvent函数里,首先是调用Address方法根据SocketId获取socket s,因为一个fd上会不断地发生事件,socket类里面设置了一个butil::atomic _nevent变量,用来保证对于一个fd,同时只会有一个bthread在处理,当收到事件时,EventDispatcher给nevent加1,只有当加1前的值是0时启动一个bthread处理对应fd上的数据,前值不为0说明已经有bthread在处理该fd上的数据了,可以直接返回,因为正在处理的bthread会一直读下去。总的来说StartInputEvent仅仅是负责判断发生事件的fd上有没有bthread在处理了,没有就原地启动一个,有就直接返回,注意调用的是bthread_start_urgent,这个函数启动bthread会让出当前bthread,也就是官方文档所说的,“EventDispatcher把所在的pthread让给了新建的bthread,使其有更好的cache locality,可以尽快地读取fd上的数据”,不需要pthread切换对性能有帮助”。如果因为某些原因bthread_start_urgent启动失败,则原地执行ProcessEvent兜底。

调用的ProcessEvent如下:

也就是执行socket 里的_on_edge_triggered_events,对于目前收到新连接请求的情况,事件发生在监听端口上,对应的_on_edge_triggered_events是上篇文章提到的StartAccept里赋值的OnNewConnection,也就是来了新连接要调用的回调函数

OnNewConnections内部调用OnNewConnectionsUntilEAGAIN函数,OnNewConnectionsUntilEAGAIN核心如下:

首先accept监听fd来得到clinet fd

然后调用Create新建一个socket来具体处理本次新连接的后续消息,在里面添加了一个新的epoll_in 事件,注册的处理函数是InputMessenger::OnNewMessages,注意和前面的OnNewConnections对比,前面是处理新连接,这个是新连接处理后来处理新连接上的新消息。在InputMessenger::OnNewMessages函数里面,核心是读取消息然后交由相应协议的处理函数,进而调用用户服务里的实际业务函数。

对于clinet fd上收到的epoll_in事件,入口仍然是StartInputEvent,只不过随后的ProcessEvent调用的处理函数是InputMessenger::OnNewMessages,核心部分代码如下:

一直读取直到读完。

做一次读取

messenger是InputMessenger对象,负责从fd上切割和处理消息。上篇文章介绍了brpc的自定义协议,每个协议其中最重要的就是ParseXXXmessage和ProcessXXX,分别对应处理消息的两个基本步骤,Parse和Process,Parse一般是把消息从二进制流上切割下来,运行时间较固定;Process则是进一步解析消息(比如反序列化为protobuf)后调用用户回调,时间不确定。因为brpc是支持单端口多协议的,因此InputMessenger会逐一尝试用户指定的种协议的回调,当某一个Parse成功切割下一个消息后,调用对应的ProcessXXX。由于一个连接上往往只有一种消息格式,InputMessenger会记录下上次的选择,而避免每次都重复尝试。


QueueMessage是启动一个bthread执行ProcessInputMessage来处理一个msg。若连续从某个fd读取出n个消息(n > 1),InputMessenger会启动n-1个bthread分别处理前n-1个消息,最后一个消息则会在原地被Process。
根据handler里的解析器处理消息得到解析后的结果
ProcessInputMessage如下,调用的是msg->_process,也就是对应协议的process_request来处理cut下来的message:

这里实现的关键就是last_msg,定义如下,是一个自定义deleter的unique_ptr:

每次循环都会去执行对last_msg的处理,如果last_msg非null,则新启一个bthread去处理,在第一次循环,也就是处理第一个msg的时候,last_msg为null,随后的每一次循环都会先调用QueueMessage处理last_msg,

随后last_msg置成这次循环的msg,待下次循环处理,分别会对msg->_process (消息处理函数)和msg->_arg 赋值供后续上述ProcessInputMessage调用

所以最后一次循环完了后还剩一个last_msg没有处理,怎么办呢,last_msg的deleter是RunlastMessage,如下:

RunlastMessage跟queueMessage一样都是调用的ProcessInputMessage,析构的时候会自动调用deleter,从而实现最后一个msg原地调用。
Msg->_process是对应协议的process_request请求处理函数,其中最终会调用用户函数,以http协议为例,Msg->_proces指向的是协议注册时我们看到的ProcessHttpRequest,

可以看到msg作为参数进来后,通过msg->arg()拿到了指向server的指针,msg->arg()返回的就是msg->_arg,前面图里可以看到msg->_arg 所赋予的是handler 的arg,回忆一下,handler 的arg其实就是所启动的server的指针,是在BuildAcceptor()里面添加各种handler的时候赋值的:

ProcessHttpRequest核心处理部分如下:
先拿到MethodProperty sp

得到service和method

调用业务代码,比如echo示例:


具体的打包发送则是通过调用done->Run(),在推荐用法里,由RAII机制保证,也就是上图的done_guard。

三、总结

brpc在处理接收到的请求的时候,不区分io线程核worker线程,二是利用EventDispatcher基于边缘触发的epoll做event的分发,这样并发可以做得很大,具体的消息切割、协议相关的请求处理、以及后续的用户逻辑均交由独立的bthread来处理,实现了wait-free,不用担心某个请求io耗时过大造成批量IO阻塞,也没有生产者消费者队列所带来的瓶颈。并且是让出当前pthread的方式进行调度,保证了cache locality。以上消息处理上的优势也是brpc高性能的基石之一。