muduo例子流程解析
muduo例子流程解析
muduo库架构
首先回顾一下muudo库的架构,如下图所示:

muduo库使用了Reactor模式,所谓Reactor模式,是有一个循环的过程,监听对应事件是否触发,触发时调用对应的callback进行处理。
这里的事件在muduo中包括Socket可读写事件、定时器事件。在其他网络库中如libevent也包括了signal、用户自定义事件等。
负责事件循环的部分在muduo命名为EventLoop,其他库如netty、libevent也都有对应的组件。
负责监听事件是否触发的部分,在muduo中叫做Poller。muduo提供了epoll和poll两种来实现,默认是epoll实现。 通过环境变量MUDUO_USE_POLL来决定是否使用poll:
例子
下面用一个简单的例子来分析muduo库的基本使用以及运行流程。
1 | |
这个例子是一个简单的echo服务器,当客户端发送数据时,服务器将数据原样返回。
一个典型的muduo的TcpServer工作流程如下:
- 建立一个事件循环器EventLoop
- 建立对应的业务服务器TcpServer
- 设置TcpServer的Callback
- 启动server
- 开启事件循环
陈硕认为,TCP网络编程的本质是处理三个半事件,即:
- 连接建立事件
- 连接断开事件:包括主动断开和被动断开
- 消息到达,文件描述符可读事件
- 消息发送完毕,文件描述符可写事件,这个算半个事件
连接建立

如果使用纯Linux api编写一个简单的TCP服务器,建立连接通常需要4步:
1 | |
attention
注意下面所说的步骤指的是上图的代码方框编号,而且代码的方框编号不等于执行顺序。
在muduo中,TcpServer对象构建时,也就是图中编号1,TcpServer的属性acceptor同时也被建立,也就是图中编号5。
在Acceptor的构造函数中分别调用了socket函数和bind函数完成了创建socket和绑定地址。(在createNonblockingOrDie函数中调用了socket函数,在bindAddress中调用了bind)
即,当TcpServer server(&loop, listenAddr);执行结束时,监听socket已经建立好,并已绑定到对应地址和端口了。
而server.start()主要做了两个工作:
- 在监听socket上启动listen函数(调用
Acceptor::listen()函数),也就是步骤3; - 将监听socket的可读事件注册到EventLoop中。
1 | |
然后调用loop_->loop(),该函数就会循环的获取事件监听器的监听结果,并且根据监听结果调用注册在事件监听器上的Channel对象的事件处理函数。
说完主要的流程再看看Accepter构造函数中绑定的Accepter::handleRead(),当程序如果执行到了这个函数里面,说明acceptChannel_发生可读事件,程序处理新客户连接请求。该函数首先调用了Linux的函数accept()接受新客户连接。接着调用了TcpServer::newConnection( )函数,这个函数是在编号1中注册给Acceptor并由成员变量newConnectionCallback_保存。
TcpServer::newConnection()的主要功能就是将建立好的连接进行封装(封装TcpConnection对象),并使用选择算法公平的选择一个sub
EventLoop,并调用TcpConnection::connectEstablished()将TcpConnection::channel_注册到刚刚选择的sub
EventLoop上。
消息读取

在MainEventLoop中接受新连接请求之后,将这条Tcp连接封装成TcpConnection对象。TcpConnection对象的内容如上图所示,主要就是封装了连接套接字的fd(上图中的socket_)、连接套接字的channel_等。在TcpConnection的构造函数中会将TcpConnection::handleRead()等四个上图中的蓝色方法注册进这个channel_内。
当TcpConnection对象建立完毕之后,MainEventLoop的Acceptor会将这个TcpConnection对象中的channel_注册到某一个SubEventLoop中。
消息读取逻辑:

如上图所示,SubEventLoop中的EventLoop::loop()函数内部会循环的执行上图中的步骤1和步骤2。步骤1就是调用Poller::poll()方法获取事件监听结果,这个事件监听结果是一个Channel集合,每一个Channel封装着
[一个fd] 及 [fd感兴趣的事件] 和
[事件监听器监听到该fd实际发生的事件]。步骤2就是调用每一个Channel的Channel::HandlerEvent方法。该方法会根据每一个Channel的感兴趣事件以及实际发生的事件调用提前注册在Channel内的对应的事件处理函数(readCallback_、writeCallback_、closeCallback_、errorCallback_)。
readCallback_保存的函数其实是TcpConnection::handleRead(),消息读取的处理逻辑也就是由这个函数提供的,我们稍微剖析一下这个函数:
1 | |
TcpConnection::handleRead()函数首先调用Buffer_.readFd(channel_->fd(), &saveErrno),该函数底层调用Linux的函数readv( ),将Tcp接收缓冲区数据拷贝到用户定义的缓冲区中(inputBuffer_)。如果在读取拷贝的过程中发生了什么错误,这个错误信息就会保存在savedErrno中。
当readFd()返回值大于0,说明从接收缓冲区中读取到了数据,那么会接着调用messageCallback_中保存的用户自定义的读取消息后的处理函数。
readFd()返回值等于0,说明客户端连接关闭,这时候应该调用TcpConnection::handleClose()来处理连接关闭事件
readFd()返回值等于-1,说明发生了错误,调用TcpConnection::handleError( )来处理savedErrno的错误事件。Moduo库只支持LT模式,所以读事件不会出现EAGAIN的错误,所以一旦出现错误,说明肯定是比较不好的非正常错误了。而EAGAIN错误只不过是非阻塞IO调用时的一种常见错误而已。
消息发送
当用户调用了TcpConnetion::send(buf)函数时,相当于要求muduo库把数据buf发送给该Tcp连接的客户端。此时该TcpConnection注册在事件监听器上的感兴趣事件中是没有可写事件的。TcpConnection::send(buf)函数内部其实是调用了Linux的函数write( )。
- 如果TCP发送缓冲区能一次性容纳buf,那这个
write( )函数将buf全部拷贝到发送缓冲区中。 - 如果TCP发送缓冲区内不能一次性容纳buf:
- 这时候write(
)函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为
EWOULDBLOCK。 - 剩余未拷贝到TCP发送缓冲区中的buf数据会被存放在TcpConnection::outputBuffer_中。并且向事件监听器上注册该
TcpConnection::channel_的可写事件。 - 事件监听器监听到该Tcp连接可写事件,就会调用
TcpConnection::handleWrite( )函数把TcpConnection::outputBuffer_中剩余的数据发送出去。 - 在
TcpConnection::handleWrite( )函数中,通过调用Buffer::writeFd()函数将outputBuffer_的数据写入到Tcp发送缓冲区,如果Tcp发送缓冲区能容纳全部剩余的未发送数据,那最好不过了。如果Tcp发送缓冲区依旧没法容纳剩余的未发送数据,那就尽可能地将数据拷贝到Tcp发送缓冲区中,继续保持可写事件的监听。 - 当数据全部拷贝到Tcp发送缓冲区之后,就会调用用户自定义的【写完后的事件处理函数】,并且移除该
TcpConnection在事件监听器上的可写事件。(移除可写事件是为了提高效率,不会让epoll_wait()毫无意义的频繁触发可写事件。因为大多数时候是没有数据需要发送的,频繁触发可写事件但又没有数据可写。)
- 这时候write(
)函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为
连接断开
连接被动断开
服务端TcpConnection::handleRead()中感知到客户端把连接断开了。
TcpConnection::handleRead( )函数内部调用了Linux的函数readv( ),当readv( )返回0的时候,服务端就知道客户端断开连接了。然后就接着调用TcpConnection::handleClose( )。

上图中的标号1、2、3是函数调用顺序,我们可以看到:
- 在执行
TcpConnection::handle_Close()的时候,该函数还是在SubEventLoop线程中运行的,接着调用closeCallback_(connPtr)回调函数,该函数保存的其实是TcpServer::removeConnection()函数 TcpServer::removeConnection()函数调用了remvoveConnectionInLoop()函数,该函数的运行是在MainEventLoop线程中执行的,这里涉及到线程切换技术,后面再讲。removeConnectionInLoop()函数:TcpServer对象中有一个connections_成员变量,这是一个unordered_map,负责保存【string --> TcpConnection】的映射,其实就是保存着Tcp连接的名字到TcpConnection对象的映射。因为这个Tcp连接要关闭了,所以也要把这个TcpConnection对象从connections_中删掉。然后再调用TcpConnection::connectDestroyed函数。- 另外为什么
removeConnectionInLoop()要在MainEventLoop中运行,因为该函数主要是从TcpServer对象中删除某条数据。而TcpServer对象是属于MainEventLoop的。这也是贯彻了One Loop Per Thread的理念。 TcpConnection::connectDestroyed()函数的执行是又跳回到了subEventLoop线程中。该函数就是将Tcp连接的监听描述符从事件监听器中移除。另外SubEventLoop中的Poller类对象还保存着这条Tcp连接的channel_,所以调用channel_.remove()将这个Tcp连接的channel对象从Poller内的数据结构中删除。
服务器主动关闭导致连接断开
当服务器主动关闭时,调用TcpServer::~TcpServer()析构函数。
info
TcpConnection用到智能指针管理,这里涉及到TcpConnection对象的多线程安全问题
这里在提示一下EventLoop::runInLoop()函数的意义,假如你有一个EventLoop对象
loop_,当你调用了loop_->runInLoop(function)函数时,这个function函数的执行会在这个loop_绑定的线程上运行!
所以我们画了下面这幅图,在创建TcpConnection对象时,Acceptor都要将这个对象分发给一个SubEventLoop来管理。这个TcpConnection对象的一切函数执行都要在其管理的SubEventLoop线程中运行。再一次贯彻One
Loop Per
Thread的设计模式。比如要想彻底删除一个TcpConnection对象,就必须要调用这个对象的connecDestroyed()方法,这个方法执行完后才能释放这个对象的堆内存。每个TcpConnection对象的connectDestroyed()方法都必须在这个TcpConnection对象所属的SubEventLoop绑定的线程中执行。

所有上面的TcpServer::~TcpServer()函数就是干这事儿的,不断循环的让这个TcpConnection对象所属的SubEventLoop线程执行TcpConnection::connectDestroyed()函数,同时在MainEventLoop的TcpServer::~TcpServer()函数中调用item.second.reset()释放保管TcpConnection对象的共享智能指针,以达到释放TcpConnection对象的堆内存空间的目的。
但是这里面其实有一个问题需要解决,TcpConnection::connectDestroyed()函数的执行以及这个TcpConnection对象的堆内存释放操作不在同一个线程中运行,所以要考虑怎么保证一个TcpConnectino对象的堆内存释放操作是在TcpConnection::connectDestroyed()调用完后。
这个析构函数巧妙利用了共享智能指针的特点,当没有共享智能指针指向这个TcpConnection对象时(引用计数为0),这个TcpConnection对象就会被析构删除(堆内存释放)。
我们解读一下TcpServer::~TcpServer()中的代码逻辑:
1 | |
- 首先
TcpServer::connections_是一个unordered_map<string, TcpConnectionPtr>,其中TcpConnectionPtr的含义是指向TcpConnection的shared_ptr。 - 在一开始,每一个
TcpConnection对象都被一个共享智能指针TcpConnetionPtr持有,当执行了TcpConnectionPtr conn(item.second)时,这个TcpConnetion对象就被conn和这个item.second共同持有,但是这个conn的生存周期很短,只要离开了当前的这一次for循环,conn就会被释放。 - 紧接着调用
item.second.reset()释放掉TcpServer中保存的该TcpConnectino对象的智能指针。此时在当前情况下,只剩下conn还持有这个TcpConnection对象,因此当前TcpConnection对象还不会被析构。 - 接着调用了
conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn)); - 这句话的含义是让SubEventLoop线程去执行
TcpConnection::connectDestroyed()函数。当你把这个conn的成员函数传进去的时候,conn所指向的资源的引用计数会加1。因为传给runInLoop的不只有函数,还有这个函数所属的对象conn。 - SubEventLoop线程开始运行
TcpConnection::connectDestroyed() - MainEventLoop线程当前这一轮for循环跑完,共享智能指针conn离开代码块,因此被析构,但是
TcpConnection对象还不会被释放,因为还有一个共享智能指针指向这个TcpConnection对象,而且这个智能指针在TcpConnection::connectDestroyed()中,只不过这个智能指针你看不到,它在这个函数中是一个隐式的this的存在。当这个函数执行完后,智能指针就真的被释放了。到此,就没有任何智能指针指向这个TcpConnection对象了。TcpConnection对象就彻底被析构删除了。
如果TcpConnection中有正在发送的数据,怎么保证在触发TcpConnection关闭机制后,能先让TcpConnection先把数据发送完再释放TcpConnection对象的资源?
这个问题就要好好参考这部分代码的设计了,这部分代码也是很值得吸收的精华。
1 | |
我们先了解一下shared_from_this()是什么意思,首先TcpConnection类继承了一个类继承了这个类之后才能使用shared_from_this()函数。
1 | |
假如我们在TcpConnection对象(我们管这个对象叫TCA)中的成员函数中调用了shared_from_this(),该函数可以返回一个shared_ptr,并且这个shared_ptr指向的对象是TCA。
接着这个shared_ptr就作为channel_的Channel::tie()函数的函数参数。
1 | |
当事件监听器返回监听结果,就要对每一个发生事件的channel对象调用他们的HandlerEvent()函数。在这个HandlerEvent函数中,会先把tie_这个weak_ptr提升为强共享智能指针。这个强共享智能指针会指向当前的TcpConnection对象。就算你外面调用删除析构了其他所有的指向该TcpConnection对象的智能指针。你只要HandleEventWithGuard()函数没执行完,你这个TcpConnetion对象都不会被析构释放堆内存。而HandleEventWithGuard()函数里面就有负责处理消息发送事件的逻辑。当HandleEventWithGuard()函数调用完毕,这个guard智能指针就会被释放。