muduo例子流程解析
muduo例子流程解析
muduo库架构
首先回顾一下muudo库的架构,如下图所示:
muduo库使用了Reactor模式,所谓Reactor模式,是有一个循环的过程,监听对应事件是否触发,触发时调用对应的callback进行处理。
这里的事件在muduo中包括Socket可读写事件、定时器事件。在其他网络库中如libevent也包括了signal、用户自定义事件等。
负责事件循环的部分在muduo命名为EventLoop
,其他库如netty、libevent也都有对应的组件。
负责监听事件是否触发的部分,在muduo中叫做Poller。muduo提供了epoll和poll两种来实现,默认是epoll实现。 通过环境变量MUDUO_USE_POLL来决定是否使用poll:
例子
下面用一个简单的例子来分析muduo库的基本使用以及运行流程。
1 |
|
这个例子是一个简单的echo服务器,当客户端发送数据时,服务器将数据原样返回。
一个典型的muduo的TcpServer工作流程如下:
- 建立一个事件循环器EventLoop
- 建立对应的业务服务器TcpServer
- 设置TcpServer的Callback
- 启动server
- 开启事件循环
陈硕认为,TCP网络编程的本质是处理三个半事件,即:
- 连接建立事件
- 连接断开事件:包括主动断开和被动断开
- 消息到达,文件描述符可读事件
- 消息发送完毕,文件描述符可写事件,这个算半个事件
连接建立
如果使用纯Linux api编写一个简单的TCP服务器,建立连接通常需要4步:
1 |
|
attention
注意下面所说的步骤指的是上图的代码方框编号,而且代码的方框编号不等于执行顺序。
在muduo中,TcpServer对象构建时,也就是图中编号1,TcpServer的属性acceptor同时也被建立,也就是图中编号5。
在Acceptor的构造函数中分别调用了socket函数和bind函数完成了创建socket和绑定地址。(在createNonblockingOrDie
函数中调用了socket函数,在bindAddress
中调用了bind
)
即,当TcpServer server(&loop, listenAddr);
执行结束时,监听socket已经建立好,并已绑定到对应地址和端口了。
而server.start()
主要做了两个工作:
- 在监听socket上启动listen函数(调用
Acceptor::listen()
函数),也就是步骤3; - 将监听socket的可读事件注册到EventLoop中。
1 |
|
然后调用loop_->loop()
,该函数就会循环的获取事件监听器的监听结果,并且根据监听结果调用注册在事件监听器上的Channel对象的事件处理函数。
说完主要的流程再看看Accepter
构造函数中绑定的Accepter::handleRead()
,当程序如果执行到了这个函数里面,说明acceptChannel_发生可读事件,程序处理新客户连接请求。该函数首先调用了Linux的函数accept()
接受新客户连接。接着调用了TcpServer::newConnection( )
函数,这个函数是在编号1中注册给Acceptor
并由成员变量newConnectionCallback_
保存。
TcpServer::newConnection()
的主要功能就是将建立好的连接进行封装(封装TcpConnection
对象),并使用选择算法公平的选择一个sub
EventLoop,并调用TcpConnection::connectEstablished()
将TcpConnection::channel_
注册到刚刚选择的sub
EventLoop上。
消息读取
在MainEventLoop中接受新连接请求之后,将这条Tcp连接封装成TcpConnection
对象。TcpConnection
对象的内容如上图所示,主要就是封装了连接套接字的fd(上图中的socket_
)、连接套接字的channel_
等。在TcpConnection
的构造函数中会将TcpConnection::handleRead()
等四个上图中的蓝色方法注册进这个channel_
内。
当TcpConnection
对象建立完毕之后,MainEventLoop的Acceptor
会将这个TcpConnection
对象中的channel_
注册到某一个SubEventLoop中。
消息读取逻辑:
如上图所示,SubEventLoop中的EventLoop::loop()
函数内部会循环的执行上图中的步骤1和步骤2。步骤1就是调用Poller::poll()方法获取事件监听结果,这个事件监听结果是一个Channel集合,每一个Channel封装着
[一个fd] 及 [fd感兴趣的事件] 和
[事件监听器监听到该fd实际发生的事件]。步骤2就是调用每一个Channel的Channel::HandlerEvent
方法。该方法会根据每一个Channel的感兴趣事件以及实际发生的事件调用提前注册在Channel内的对应的事件处理函数(readCallback_
、writeCallback_
、closeCallback_
、errorCallback_
)。
readCallback_
保存的函数其实是TcpConnection::handleRead()
,消息读取的处理逻辑也就是由这个函数提供的,我们稍微剖析一下这个函数:
1 |
|
TcpConnection::handleRead()
函数首先调用Buffer_.readFd(channel_->fd(), &saveErrno)
,该函数底层调用Linux的函数readv( )
,将Tcp接收缓冲区数据拷贝到用户定义的缓冲区中(inputBuffer_)
。如果在读取拷贝的过程中发生了什么错误,这个错误信息就会保存在savedErrno
中。
当readFd()
返回值大于0,说明从接收缓冲区中读取到了数据,那么会接着调用messageCallback_
中保存的用户自定义的读取消息后的处理函数。
readFd()
返回值等于0,说明客户端连接关闭,这时候应该调用TcpConnection::handleClose()
来处理连接关闭事件
readFd()
返回值等于-1,说明发生了错误,调用TcpConnection::handleError( )
来处理savedErrno
的错误事件。Moduo库只支持LT模式,所以读事件不会出现EAGAIN的错误,所以一旦出现错误,说明肯定是比较不好的非正常错误了。而EAGAIN
错误只不过是非阻塞IO调用时的一种常见错误而已。
消息发送
当用户调用了TcpConnetion::send(buf)
函数时,相当于要求muduo库把数据buf发送给该Tcp连接的客户端。此时该TcpConnection
注册在事件监听器上的感兴趣事件中是没有可写事件的。TcpConnection::send(buf)
函数内部其实是调用了Linux的函数write( )
。
- 如果TCP发送缓冲区能一次性容纳buf,那这个
write( )
函数将buf全部拷贝到发送缓冲区中。 - 如果TCP发送缓冲区内不能一次性容纳buf:
- 这时候write(
)函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为
EWOULDBLOCK
。 - 剩余未拷贝到TCP发送缓冲区中的buf数据会被存放在TcpConnection::outputBuffer_中。并且向事件监听器上注册该
TcpConnection::channel_
的可写事件。 - 事件监听器监听到该Tcp连接可写事件,就会调用
TcpConnection::handleWrite( )
函数把TcpConnection::outputBuffer_
中剩余的数据发送出去。 - 在
TcpConnection::handleWrite( )
函数中,通过调用Buffer::writeFd()
函数将outputBuffer_
的数据写入到Tcp发送缓冲区,如果Tcp发送缓冲区能容纳全部剩余的未发送数据,那最好不过了。如果Tcp发送缓冲区依旧没法容纳剩余的未发送数据,那就尽可能地将数据拷贝到Tcp发送缓冲区中,继续保持可写事件的监听。 - 当数据全部拷贝到Tcp发送缓冲区之后,就会调用用户自定义的【写完后的事件处理函数】,并且移除该
TcpConnection
在事件监听器上的可写事件。(移除可写事件是为了提高效率,不会让epoll_wait()
毫无意义的频繁触发可写事件。因为大多数时候是没有数据需要发送的,频繁触发可写事件但又没有数据可写。)
- 这时候write(
)函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为
连接断开
连接被动断开
服务端TcpConnection::handleRead()
中感知到客户端把连接断开了。
TcpConnection::handleRead( )
函数内部调用了Linux的函数readv( )
,当readv( )
返回0的时候,服务端就知道客户端断开连接了。然后就接着调用TcpConnection::handleClose( )
。
上图中的标号1、2、3是函数调用顺序,我们可以看到:
- 在执行
TcpConnection::handle_Close()
的时候,该函数还是在SubEventLoop线程中运行的,接着调用closeCallback_(connPtr)
回调函数,该函数保存的其实是TcpServer::removeConnection()
函数 TcpServer::removeConnection()
函数调用了remvoveConnectionInLoop()
函数,该函数的运行是在MainEventLoop线程中执行的,这里涉及到线程切换技术,后面再讲。removeConnectionInLoop()
函数:TcpServer
对象中有一个connections_
成员变量,这是一个unordered_map
,负责保存【string --> TcpConnection】的映射,其实就是保存着Tcp连接的名字到TcpConnection
对象的映射。因为这个Tcp连接要关闭了,所以也要把这个TcpConnection
对象从connections_
中删掉。然后再调用TcpConnection::connectDestroyed
函数。- 另外为什么
removeConnectionInLoop()
要在MainEventLoop中运行,因为该函数主要是从TcpServer
对象中删除某条数据。而TcpServer
对象是属于MainEventLoop的。这也是贯彻了One Loop Per Thread的理念。 TcpConnection::connectDestroyed()
函数的执行是又跳回到了subEventLoop线程中。该函数就是将Tcp连接的监听描述符从事件监听器中移除。另外SubEventLoop中的Poller类对象还保存着这条Tcp连接的channel_
,所以调用channel_.remove()
将这个Tcp连接的channel
对象从Poller内的数据结构中删除。
服务器主动关闭导致连接断开
当服务器主动关闭时,调用TcpServer::~TcpServer()析构函数。
info
TcpConnection用到智能指针管理,这里涉及到TcpConnection对象的多线程安全问题
这里在提示一下EventLoop::runInLoop()
函数的意义,假如你有一个EventLoop对象
loop_
,当你调用了loop_->runInLoop(function)
函数时,这个function函数的执行会在这个loop_
绑定的线程上运行!
所以我们画了下面这幅图,在创建TcpConnection
对象时,Acceptor
都要将这个对象分发给一个SubEventLoop来管理。这个TcpConnection
对象的一切函数执行都要在其管理的SubEventLoop线程中运行。再一次贯彻One
Loop Per
Thread的设计模式。比如要想彻底删除一个TcpConnection
对象,就必须要调用这个对象的connecDestroyed()
方法,这个方法执行完后才能释放这个对象的堆内存。每个TcpConnection
对象的connectDestroyed()
方法都必须在这个TcpConnection
对象所属的SubEventLoop绑定的线程中执行。
所有上面的TcpServer::~TcpServer()
函数就是干这事儿的,不断循环的让这个TcpConnection
对象所属的SubEventLoop线程执行TcpConnection::connectDestroyed()
函数,同时在MainEventLoop的TcpServer::~TcpServer()
函数中调用item.second.reset()
释放保管TcpConnection
对象的共享智能指针,以达到释放TcpConnection
对象的堆内存空间的目的。
但是这里面其实有一个问题需要解决,TcpConnection::connectDestroyed()
函数的执行以及这个TcpConnection
对象的堆内存释放操作不在同一个线程中运行,所以要考虑怎么保证一个TcpConnectino
对象的堆内存释放操作是在TcpConnection::connectDestroyed()
调用完后。
这个析构函数巧妙利用了共享智能指针的特点,当没有共享智能指针指向这个TcpConnection
对象时(引用计数为0),这个TcpConnection
对象就会被析构删除(堆内存释放)。
我们解读一下TcpServer::~TcpServer()中的代码逻辑:
1 |
|
- 首先
TcpServer::connections_
是一个unordered_map<string, TcpConnectionPtr>
,其中TcpConnectionPtr
的含义是指向TcpConnection
的shared_ptr
。 - 在一开始,每一个
TcpConnection
对象都被一个共享智能指针TcpConnetionPtr
持有,当执行了TcpConnectionPtr conn(item.second)
时,这个TcpConnetion
对象就被conn
和这个item.second
共同持有,但是这个conn
的生存周期很短,只要离开了当前的这一次for循环,conn
就会被释放。 - 紧接着调用
item.second.reset()
释放掉TcpServer
中保存的该TcpConnectino
对象的智能指针。此时在当前情况下,只剩下conn
还持有这个TcpConnection
对象,因此当前TcpConnection
对象还不会被析构。 - 接着调用了
conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
- 这句话的含义是让SubEventLoop线程去执行
TcpConnection::connectDestroyed()
函数。当你把这个conn
的成员函数传进去的时候,conn
所指向的资源的引用计数会加1。因为传给runInLoop的不只有函数,还有这个函数所属的对象conn
。 - SubEventLoop线程开始运行
TcpConnection::connectDestroyed()
- MainEventLoop线程当前这一轮for循环跑完,共享智能指针conn离开代码块,因此被析构,但是
TcpConnection
对象还不会被释放,因为还有一个共享智能指针指向这个TcpConnection
对象,而且这个智能指针在TcpConnection::connectDestroyed()
中,只不过这个智能指针你看不到,它在这个函数中是一个隐式的this的存在。当这个函数执行完后,智能指针就真的被释放了。到此,就没有任何智能指针指向这个TcpConnection对象了。TcpConnection对象就彻底被析构删除了。
如果TcpConnection中有正在发送的数据,怎么保证在触发TcpConnection关闭机制后,能先让TcpConnection先把数据发送完再释放TcpConnection对象的资源?
这个问题就要好好参考这部分代码的设计了,这部分代码也是很值得吸收的精华。
1 |
|
我们先了解一下shared_from_this()
是什么意思,首先TcpConnection
类继承了一个类继承了这个类之后才能使用shared_from_this()函数。
1 |
|
假如我们在TcpConnection
对象(我们管这个对象叫TCA)中的成员函数中调用了shared_from_this()
,该函数可以返回一个shared_ptr
,并且这个shared_ptr
指向的对象是TCA
。
接着这个shared_ptr
就作为channel_
的Channel::tie()
函数的函数参数。
1 |
|
当事件监听器返回监听结果,就要对每一个发生事件的channel
对象调用他们的HandlerEvent()
函数。在这个HandlerEvent
函数中,会先把tie_
这个weak_ptr
提升为强共享智能指针。这个强共享智能指针会指向当前的TcpConnection
对象。就算你外面调用删除析构了其他所有的指向该TcpConnection
对象的智能指针。你只要HandleEventWithGuard()
函数没执行完,你这个TcpConnetion
对象都不会被析构释放堆内存。而HandleEventWithGuard()
函数里面就有负责处理消息发送事件的逻辑。当HandleEventWithGuard()
函数调用完毕,这个guard
智能指针就会被释放。