muduo例子流程解析

muduo例子流程解析

muduo库架构

首先回顾一下muudo库的架构,如下图所示:

muduo库使用了Reactor模式,所谓Reactor模式,是有一个循环的过程,监听对应事件是否触发,触发时调用对应的callback进行处理。

这里的事件在muduo中包括Socket可读写事件、定时器事件。在其他网络库中如libevent也包括了signal、用户自定义事件等。

负责事件循环的部分在muduo命名为EventLoop,其他库如netty、libevent也都有对应的组件。

负责监听事件是否触发的部分,在muduo中叫做Poller。muduo提供了epoll和poll两种来实现,默认是epoll实现。 通过环境变量MUDUO_USE_POLL来决定是否使用poll:

例子

下面用一个简单的例子来分析muduo库的基本使用以及运行流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
int main()
{
EventLoop loop;
//这个EventLoop就是main EventLoop,即负责循环事件监听处理新用户连接事件的事件循环器。第一章概述篇的图2里面的EventLoop1就是我们的main EventLoop。

InetAddress addr(4567);
//InetAddress其实是对socket编程中的sockaddr_in进行封装,使其变为更友好简单的接口而已。

EchoServer server(&loop, addr, "EchoServer-01");
//EchoServer类,自己等一下往下翻一下。

server.start();
//启动TcpServer服务器

loop.loop(); //执行EventLoop::loop()函数,这个函数在概述篇的EventLoop小节有提及,自己去看一下!!
return 0;
}

class EchoServer
{
public:
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);
// 将用户定义的连接事件处理函数注册进TcpServer中,TcpServer发生连接事件时会执行onConnection函数。

server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);
//将用户定义的可读事件处理函数注册进TcpServer中,TcpServer发生可读事件时会执行onMessage函数。


server_.setThreadNum(3);
//设置sub reactor数量,你这里设置为3,就和概述篇图2中的EventLoop2 EventLoop3 EventLoop4对应,有三个sub EventLoop。
}
void start(){
server_.start();
}
private:
void onConnection(const TcpConnectionPtr &conn)
{
//用户定义的连接事件处理函数:当服务端接收到新连接建立请求,则打印Connection UP,如果是关闭连接请求,则打印Connection Down
if (conn->connected())
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
else
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}

void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp time)
{
//用户定义的可读事件处理函数:当一个Tcp连接发生了可读事件就把它这个接收到的消息原封不动的还回去
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
conn->shutdown();
}
EventLoop *loop_;
TcpServer server_;
};

这个例子是一个简单的echo服务器,当客户端发送数据时,服务器将数据原样返回。

一个典型的muduo的TcpServer工作流程如下:

  1. 建立一个事件循环器EventLoop
  2. 建立对应的业务服务器TcpServer
  3. 设置TcpServer的Callback
  4. 启动server
  5. 开启事件循环

陈硕认为,TCP网络编程的本质是处理三个半事件,即:

  1. 连接建立事件
  2. 连接断开事件:包括主动断开和被动断开
  3. 消息到达,文件描述符可读事件
  4. 消息发送完毕,文件描述符可写事件,这个算半个事件

连接建立

如果使用纯Linux api编写一个简单的TCP服务器,建立连接通常需要4步:

1
2
3
4
- 创建socket: socket() // 调用socket函数建立监听socket
- 绑定地址: bind() // 调用bind函数将socket和地址绑定
- 监听: listen() // 调用listen函数监听socket
- 接受连接: accept() // 调用accept函数接受连接

attention

注意下面所说的步骤指的是上图的代码方框编号,而且代码的方框编号不等于执行顺序。

在muduo中,TcpServer对象构建时,也就是图中编号1,TcpServer的属性acceptor同时也被建立,也就是图中编号5。 在Acceptor的构造函数中分别调用了socket函数和bind函数完成了创建socket绑定地址。(在createNonblockingOrDie函数中调用了socket函数,在bindAddress中调用了bind

即,当TcpServer server(&loop, listenAddr);执行结束时,监听socket已经建立好,并已绑定到对应地址和端口了。

server.start()主要做了两个工作:

  1. 在监听socket上启动listen函数(调用Acceptor::listen()函数),也就是步骤3;
  2. 将监听socket的可读事件注册到EventLoop中。
1
2
3
4
5
6
7
8
9
10
11
12
13
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);

assert(!acceptor_->listenning());

//让这个EventLoop,也就是mainloop来执行Acceptor的listen函数,开启服务端监听
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

然后调用loop_->loop(),该函数就会循环的获取事件监听器的监听结果,并且根据监听结果调用注册在事件监听器上的Channel对象的事件处理函数。

说完主要的流程再看看Accepter构造函数中绑定的Accepter::handleRead(),当程序如果执行到了这个函数里面,说明acceptChannel_发生可读事件,程序处理新客户连接请求。该函数首先调用了Linux的函数accept()接受新客户连接。接着调用了TcpServer::newConnection( )函数,这个函数是在编号1中注册给Acceptor并由成员变量newConnectionCallback_保存。

TcpServer::newConnection()的主要功能就是将建立好的连接进行封装(封装TcpConnection对象),并使用选择算法公平的选择一个sub EventLoop,并调用TcpConnection::connectEstablished()TcpConnection::channel_注册到刚刚选择的sub EventLoop上。

消息读取

在MainEventLoop中接受新连接请求之后,将这条Tcp连接封装成TcpConnection对象。TcpConnection对象的内容如上图所示,主要就是封装了连接套接字的fd(上图中的socket_)、连接套接字的channel_等。在TcpConnection的构造函数中会将TcpConnection::handleRead()等四个上图中的蓝色方法注册进这个channel_内。 当TcpConnection对象建立完毕之后,MainEventLoop的Acceptor会将这个TcpConnection对象中的channel_注册到某一个SubEventLoop中。

消息读取逻辑:

如上图所示,SubEventLoop中的EventLoop::loop()函数内部会循环的执行上图中的步骤1步骤2步骤1就是调用Poller::poll()方法获取事件监听结果,这个事件监听结果是一个Channel集合,每一个Channel封装着 [一个fd] 及 [fd感兴趣的事件] 和 [事件监听器监听到该fd实际发生的事件]。步骤2就是调用每一个Channel的Channel::HandlerEvent方法。该方法会根据每一个Channel的感兴趣事件以及实际发生的事件调用提前注册在Channel内的对应的事件处理函数(readCallback_writeCallback_closeCallback_errorCallback_)。

readCallback_保存的函数其实是TcpConnection::handleRead(),消息读取的处理逻辑也就是由这个函数提供的,我们稍微剖析一下这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void TcpConnection::handleRead(TimeStamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if(n > 0) //从fd读到了数据,并且放在了inputBuffer_上
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if(n == 0)
handleClose();
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}

TcpConnection::handleRead()函数首先调用Buffer_.readFd(channel_->fd(), &saveErrno),该函数底层调用Linux的函数readv( ),将Tcp接收缓冲区数据拷贝到用户定义的缓冲区中(inputBuffer_)。如果在读取拷贝的过程中发生了什么错误,这个错误信息就会保存在savedErrno中。 当readFd()返回值大于0,说明从接收缓冲区中读取到了数据,那么会接着调用messageCallback_中保存的用户自定义的读取消息后的处理函数。 readFd()返回值等于0,说明客户端连接关闭,这时候应该调用TcpConnection::handleClose()来处理连接关闭事件 readFd()返回值等于-1,说明发生了错误,调用TcpConnection::handleError( )来处理savedErrno的错误事件。Moduo库只支持LT模式,所以读事件不会出现EAGAIN的错误,所以一旦出现错误,说明肯定是比较不好的非正常错误了。而EAGAIN错误只不过是非阻塞IO调用时的一种常见错误而已。

消息发送

当用户调用了TcpConnetion::send(buf)函数时,相当于要求muduo库把数据buf发送给该Tcp连接的客户端。此时该TcpConnection注册在事件监听器上的感兴趣事件中是没有可写事件的。TcpConnection::send(buf)函数内部其实是调用了Linux的函数write( )

  • 如果TCP发送缓冲区能一次性容纳buf,那这个write( )函数将buf全部拷贝到发送缓冲区中。
  • 如果TCP发送缓冲区内不能一次性容纳buf:
    • 这时候write( )函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为EWOULDBLOCK
    • 剩余未拷贝到TCP发送缓冲区中的buf数据会被存放在TcpConnection::outputBuffer_中。并且向事件监听器上注册该TcpConnection::channel_的可写事件
    • 事件监听器监听到该Tcp连接可写事件,就会调用TcpConnection::handleWrite( )函数把TcpConnection::outputBuffer_中剩余的数据发送出去。
    • TcpConnection::handleWrite( )函数中,通过调用Buffer::writeFd()函数将outputBuffer_的数据写入到Tcp发送缓冲区,如果Tcp发送缓冲区能容纳全部剩余的未发送数据,那最好不过了。如果Tcp发送缓冲区依旧没法容纳剩余的未发送数据,那就尽可能地将数据拷贝到Tcp发送缓冲区中,继续保持可写事件的监听。
    • 当数据全部拷贝到Tcp发送缓冲区之后,就会调用用户自定义的【写完后的事件处理函数】,并且移除该TcpConnection在事件监听器上的可写事件。(移除可写事件是为了提高效率,不会让epoll_wait()毫无意义的频繁触发可写事件。因为大多数时候是没有数据需要发送的,频繁触发可写事件但又没有数据可写。)

连接断开

连接被动断开

服务端TcpConnection::handleRead()中感知到客户端把连接断开了。 TcpConnection::handleRead( )函数内部调用了Linux的函数readv( ),当readv( )返回0的时候,服务端就知道客户端断开连接了。然后就接着调用TcpConnection::handleClose( )

上图中的标号1、2、3是函数调用顺序,我们可以看到:

  1. 在执行TcpConnection::handle_Close()的时候,该函数还是在SubEventLoop线程中运行的,接着调用closeCallback_(connPtr)回调函数,该函数保存的其实是TcpServer::removeConnection()函数
  2. TcpServer::removeConnection()函数调用了remvoveConnectionInLoop()函数,该函数的运行是在MainEventLoop线程中执行的,这里涉及到线程切换技术,后面再讲。 removeConnectionInLoop()函数:TcpServer对象中有一个connections_成员变量,这是一个unordered_map,负责保存【string --> TcpConnection】的映射,其实就是保存着Tcp连接的名字到TcpConnection对象的映射。因为这个Tcp连接要关闭了,所以也要把这个TcpConnection对象从connections_中删掉。然后再调用TcpConnection::connectDestroyed函数。
  3. 另外为什么removeConnectionInLoop()要在MainEventLoop中运行,因为该函数主要是从TcpServer对象中删除某条数据。而TcpServer对象是属于MainEventLoop的。这也是贯彻了One Loop Per Thread的理念。
  4. TcpConnection::connectDestroyed()函数的执行是又跳回到了subEventLoop线程中。该函数就是将Tcp连接的监听描述符从事件监听器中移除。另外SubEventLoop中的Poller类对象还保存着这条Tcp连接的channel_,所以调用channel_.remove()将这个Tcp连接的channel对象从Poller内的数据结构中删除。

服务器主动关闭导致连接断开

当服务器主动关闭时,调用TcpServer::~TcpServer()析构函数。

info

TcpConnection用到智能指针管理,这里涉及到TcpConnection对象的多线程安全问题

这里在提示一下EventLoop::runInLoop()函数的意义,假如你有一个EventLoop对象 loop_,当你调用了loop_->runInLoop(function)函数时,这个function函数的执行会在这个loop_绑定的线程上运行!

所以我们画了下面这幅图,在创建TcpConnection对象时,Acceptor都要将这个对象分发给一个SubEventLoop来管理。这个TcpConnection对象的一切函数执行都要在其管理的SubEventLoop线程中运行。再一次贯彻One Loop Per Thread的设计模式。比如要想彻底删除一个TcpConnection对象,就必须要调用这个对象的connecDestroyed()方法,这个方法执行完后才能释放这个对象的堆内存。每个TcpConnection对象的connectDestroyed()方法都必须在这个TcpConnection对象所属的SubEventLoop绑定的线程中执行。

所有上面的TcpServer::~TcpServer()函数就是干这事儿的,不断循环的让这个TcpConnection对象所属的SubEventLoop线程执行TcpConnection::connectDestroyed()函数,同时在MainEventLoop的TcpServer::~TcpServer()函数中调用item.second.reset()释放保管TcpConnection对象的共享智能指针,以达到释放TcpConnection对象的堆内存空间的目的。

但是这里面其实有一个问题需要解决,TcpConnection::connectDestroyed()函数的执行以及这个TcpConnection对象的堆内存释放操作不在同一个线程中运行,所以要考虑怎么保证一个TcpConnectino对象的堆内存释放操作是在TcpConnection::connectDestroyed()调用完后。

这个析构函数巧妙利用了共享智能指针的特点,当没有共享智能指针指向这个TcpConnection对象时(引用计数为0),这个TcpConnection对象就会被析构删除(堆内存释放)。

我们解读一下TcpServer::~TcpServer()中的代码逻辑:

1
2
3
4
5
6
7
8
9
10
TcpServer::~TcpServer()
{
//connections类型为std::unordered_map<std::string, TcpConnectionPtr>;
for(auto &item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();
conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
}
}
  1. 首先TcpServer::connections_是一个unordered_map<string, TcpConnectionPtr>,其中TcpConnectionPtr的含义是指向TcpConnectionshared_ptr
  2. 在一开始,每一个TcpConnection对象都被一个共享智能指针TcpConnetionPtr持有,当执行了TcpConnectionPtr conn(item.second)时,这个TcpConnetion对象就被conn和这个item.second共同持有,但是这个conn的生存周期很短,只要离开了当前的这一次for循环,conn就会被释放。
  3. 紧接着调用item.second.reset()释放掉TcpServer中保存的该TcpConnectino对象的智能指针。此时在当前情况下,只剩下conn还持有这个TcpConnection对象,因此当前TcpConnection对象还不会被析构。
  4. 接着调用了conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
  5. 这句话的含义是让SubEventLoop线程去执行TcpConnection::connectDestroyed()函数。当你把这个conn的成员函数传进去的时候,conn所指向的资源的引用计数会加1。因为传给runInLoop的不只有函数,还有这个函数所属的对象conn
  6. SubEventLoop线程开始运行TcpConnection::connectDestroyed()
  7. MainEventLoop线程当前这一轮for循环跑完,共享智能指针conn离开代码块,因此被析构,但是TcpConnection对象还不会被释放,因为还有一个共享智能指针指向这个TcpConnection对象,而且这个智能指针在TcpConnection::connectDestroyed()中,只不过这个智能指针你看不到,它在这个函数中是一个隐式的this的存在。当这个函数执行完后,智能指针就真的被释放了。到此,就没有任何智能指针指向这个TcpConnection对象了。TcpConnection对象就彻底被析构删除了。
如果TcpConnection中有正在发送的数据,怎么保证在触发TcpConnection关闭机制后,能先让TcpConnection先把数据发送完再释放TcpConnection对象的资源?

这个问题就要好好参考这部分代码的设计了,这部分代码也是很值得吸收的精华。

1
2
3
4
5
6
7
8
9
/***** TcpConnection.cc *****/
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); //向poller注册channel的epollin事件
//新连接建立,执行回调
connectionCallback_(shared_from_this());
}

我们先了解一下shared_from_this()是什么意思,首先TcpConnection类继承了一个类继承了这个类之后才能使用shared_from_this()函数。

1
class TcpConnection :public std::enable_shared_from_this<TcpConnection>

假如我们在TcpConnection对象(我们管这个对象叫TCA)中的成员函数中调用了shared_from_this(),该函数可以返回一个shared_ptr,并且这个shared_ptr指向的对象是TCA

接着这个shared_ptr就作为channel_Channel::tie()函数的函数参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*****  Channel.h   ******/
std::weak_ptr<void> tie_;
/***** Channel.cc ******/
void Channel::tie(const shared_ptr<void>& obj)
{
tie_ = obj;
tied_ = true;
}
void Channel::HandlerEvent(TimeStamp receiveTime)
{
if(tied_){
shared_ptr<void> guard = tie_.lock();
if (guard)
HandleEventWithGuard(receiveTime);
}
else{
。。。。一般不会执行到这里其实。我实在想不到正常运行的情况下怎么会执行到这里,可能是我比较菜。
HandleEventWithGuard(receiveTime);
}
}

当事件监听器返回监听结果,就要对每一个发生事件的channel对象调用他们的HandlerEvent()函数。在这个HandlerEvent函数中,会先把tie_这个weak_ptr提升为强共享智能指针。这个强共享智能指针会指向当前的TcpConnection对象。就算你外面调用删除析构了其他所有的指向该TcpConnection对象的智能指针。你只要HandleEventWithGuard()函数没执行完,你这个TcpConnetion对象都不会被析构释放堆内存。而HandleEventWithGuard()函数里面就有负责处理消息发送事件的逻辑。当HandleEventWithGuard()函数调用完毕,这个guard智能指针就会被释放。

转载自万字长文梳理Muduo库核心代码及优秀编程细节思想剖析


muduo例子流程解析
https://gstarmin.github.io/2023/09/19/muduo例子流程解析/
作者
Starmin
发布于
2023年9月19日
更新于
2023年9月23日
许可协议