muduo库-TCP连接

muduo库-TCP连接

TCP连接中,两端是对等的,TCP协议也没有区分客户端(client)与服务器端(server),但互联网中通信中,往往有一端提供资源给另一端访问,我们把拥有资源的一端称为服务器端,请求资源的一端称为客户端。

对于server,会启用一个监听循环,不断接受client连接请求(三次握手),进行数据通信,通信完成以后断开连接(四次挥手);对于client,在server启用监听循环时,向server发出连接请求,接收server数据,如有必要向server发送数据,通信完成后,断开连接。连接是指物理上一端(client)到另一端(server)的通信链路,通过server端<ip, port>与客户端<ip, port>,来唯一标识一个TCP连接。

TCP连接有长连接、短连接之分

短连接:client和server建立连接后,一般只传递一次读写操作,然后由client发起close,发送FIN分节,关闭连接。短连接只完成一次read/write操作,就会自动关闭。

长连接:client和server建立连接后,并不会自动关闭,后续的read/write操作会继续用这个连接。长连接没有确切时间限制,可能会长时间存在。

TcpServer类

muduo使用TcpConnection类来管理TCP连接,使用接受器Acceptor来接受连接,连接器Connector发起连接。TcpServer管理accept获得TcpConnection,生命周期由用户控制。

下图是TcpServer新建连接的相关函数调用顺序。当Channel::handleEvent()的触发条件是listening socket可读时,表明有新连接请求达到。TcpServer为新连接创建对应的TcpConnection对象。

Acceptor类

Acceptor是TcpServer的一个内部类,主要职责是用来获得新连接的fd。Accetpor封装了服务器监听套接字fd以及相关处理方法。Acceptor类内部其实没有贡献什么核心的处理函数,主要是对其他类的方法调用进行封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* TCP连接接受器
* 基础调用为accept(2)/accept4(2)
*/
class Acceptor : private noncopyable
{
public:
typedef std::function<void(int sockfd, const InetAddress &)> NewConnectionCallback;

Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
~Acceptor();

/* 设置新连接回调 */
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }

/* 监听本地端口 */
void listen();

/* 判断当前是否正在监听端口 */
bool listening() const { return listening_; }

private:
void handleRead(); // 处理读事件

EventLoop *loop_; // 所属EventLoop
Socket acceptSocket_; // 专门用于接受连接的套接字(sock fd)
Channel acceptChannel_; // 专门接受连接通道, 监听conn fd
NewConnectionCallback newConnectionCallback_; // 新建连接回调
bool listening_; // 监听状态
int idleFd_; // 空闲fd, 用于fd资源不够用时, 可以空一个出来作为新建连接conn fd
};

如果fd资源不够用了,导致accept(2)/accept4(2)创建连接失败,比如达到系统上限,怎么办?

Accetor用了这样一种技术:先申请一个空闲的fd(idleFd_),等到发生由于fd资源不够用时,就把这个备用fd暂时用于accept接收连接,然后再马上关闭,以防止不断产生可读事件(连接请求),从而回调相同的失败代码。及早建立连接后并关闭连接,让程序不会频繁响应同一个连接请求。

Acceptor封装的重要成员变量

  • acceptSocket_:服务器监听套接字的文件描述符
  • acceptChannel_:是一个Channel类,把acceptSocket_及其感兴趣事件和事件对应的处理函数都封装进去。
  • EventLoop *loop:监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,其实这个EventLoop就是main EventLoop。
  • newConnectionCallback_: TcpServer构造函数中将TcpServer::newConnection()函数注册给了这个成员变量。这个TcpServer::newConnection函数的功能是公平的选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。

Acceptor封装的重要成员方法

  • listen( ):该函数底层调用了linux的函数listen( ),开启对acceptSocket_的监听同时将acceptChannel及其感兴趣事件(可读事件)注册到main EventLoop的事件监听器上。换言之就是让main EventLoop事件监听器去监听acceptSocket_
  • handleRead( ):这是一个私有成员方法,这个方法是要注册到acceptChannel_上的, 同时handleRead( )方法内部还调用了成员变量newConnectionCallback_保存的函数。当main EventLoop监听到acceptChannel_上发生了可读事件时(新用户连接事件),就是调用这个handleRead( )方法。

简单来说,handleRead( )最终实现的功能就是接受新连接,并且以负载均衡的选择方式选择一个sub EventLoop,并把这个新连接分发到这个subEventLoop上。

Acceptor构造与析构

Acceptor构造时,创建sockfd(套接字),待后续交给TcpServer来start监听套接字。空闲fd指向文件"/dev/null",用来解决服务器fd资源耗尽问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Acceptor.cc
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listening_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) // 申请空闲fd
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{
acceptChannel_.disableAll(); // disable all event of the channel
acceptChannel_.remove(); // remove the channel from poller
::close(idleFd_);
}

Acceptor监听

Acceptor包含2类监听:

  1. 监听套接字,即本地ip地址&端口。
  2. 监听通道事件,读事件。

为什么不在构造时,就调用listen监听sockfd呢?

将非必要资源的初始化,延迟到需要时,用户可以通过调用TcpSever::start()来启动。这样,用户可以更灵活控制资源的申请和释放。

Acceptor接受连接

Acceptor内部有一个Channel成员,当Poller监听到有Tcp连接请求时,就通过Channel的可读事件,在loop线程,来回调Acceptor::handleRead()。从而将conn fd和IP地址传递给上一层TcpServer,用于创建TcpConnection对象管理Tcp连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Acceptor.cc
/**
* 处理读Channel事件, accept连接
* @note 先accept, 然后将相关资源通过回调交由上一层的TcpServer进行处理(管理)
*/
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
// FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr); // 获取连接fd及对端ip地址
if (connfd >= 0)
{
if (newConnectionCallback_)
{ // 创建新连接回调
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{ // 错误
LOG_SYSERR << "in Acceptor::handleRead";
/*
* Read the section named "The special problem of
* accept()ing when you can't" in libev's doc.
* By Marc Lehmann, author of libev.
*
* The per-process limit of open file descriptors has been reached.
*/
if (errno == EMFILE)
{ // 文件描述符资源耗尽错误
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
// reopen /dev/null, it dose not matter whether it succeeds or fails.
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}

TcpServer类

TcpServer类管理TcpConnection,供用户直接使用,生命周期由用户控制。接口如下,用户只需要设置好callback,然后调用start()即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// TcpServer.h
/**
* Tcp Server, 支持单线程和thread-poll模型.
* 接口类, 因此不要暴露太多细节.
*/
class TcpServer : private noncopyable
{
public:
typedef std::function<void (EventLoop*)> ThreadInitCallback;
enum Option
enum Option
{
kNoReusePort, // 不允许重用本地端口
kReusePort, // 允许重用本地端口
};

// TcpServer(EventLoop* loop, const InetAddress& listenAddr);
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option = kNoReusePort);
~TcpServer(); // force out-line dtor, for std::unique_ptr members.

/**
* 如果没有监听, 就启动服务器(监听).
* 多次调用没有副作用.
* 线程安全.
*/
void start();

/**
* 设置连接回调.
* 非线程安全.
*/
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

/**
* 设置消息回调.
* 非线程安全.
*/
void setMessageCallback(const MessageCallback & cb)
{ messageCallback_ = cb; }

/**
* 设置写完成回调.
* 非线程安全.
*/
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }

note

并没有连接关闭的回调,这是由TcpServer::removeConnection()负责的,进而把工作转交给TcpConnection::connectDestroyed(),用户不可更改设置。

TcpServer的构造与析构

TcpServer构造函数主要工作是为成员申请资源,为各回调设置回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)), // 确保loop非空
ipPort_(listenAddr.toIpPort()), // 将Ip, port转换为字符串
name_(nameArg), // 名称
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)), // 初始化事件循环线程池
connectionCallback_(defaultConnectionCallback), // 连接回调为默认连接回调
messageCallback_(defaultMessageCallback), // 消息回调为默认消息回调
nextConnId_(1) // 连接id
{
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2)); // 设置新建连接时的回调
}

同样是连接回调,TcpServer::newConnection()和connectionCallback_有何区别?

前者是Acceptor发生连接请求事件时,回调,用来新建一个Tcp连接;后者是在TcpServer内部新建连接即调用TcpServer::newConnection()时,回调connectionCallback_。

TcpServer析构工作内容很简单,主要销毁ConnectionMap中所有Tcp连接,而每个Tcp连接用的是一个TcpConnection对象来管理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 析构TcpServer对象, 销毁ConnectionMap中所有连接
*/
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";


// reset all connection of @c connections_
for (auto& item : connections_)
{
TcpConnectionPtr conn(item.second); // shared_ptr manage TcpConnection
item.second.reset();
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
}

TcpServer启动Tcp服务器,主要完成

  1. 线程池的启动;
  2. Acceptor监听Tcp连接请求。

线程池需要指定其初始数量,当然,这需要在start()之前调用TcpServer::setThreadNum()设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 启动TcpServer, 初始化线程池, 连接接受器Accept开始监听(Tcp连接请求)
*/
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);


assert(!acceptor_->listening());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

TcpServer如何获得新连接的connfd(accept返回值)?

TcpServer内部用Acceptor,保存用户提供的Connection-Callback和MessageCallback,新建TcpConnection对象(newConn())的时候直接传递给TcpConnection的构造函数

如何创建TcpConnection对象?

新连接请求到达时,Acceptor回调newConnection(),通过TcpServer::newConnection创建一个新TcpConnection对象,用于管理一个Tcp连接。

即,TcpServer::newConnection回调顺序 EventLoop => Channel => Acceptor => TcpServer

下面是用于创建TcpConnection对象的函数TcpServer::newConnection()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// TcpServer.cc
/**
* 新建一个TcpConnection对象, 用于连接管理.
* @details 新建的TcpConnection对象会加入内部ConnectionMap.
* @param sockfd accept返回的连接fd (accepted socket fd)
* @param peerAddr 对端ip地址信息
* @note 必须在所属loop线程运行
*/
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
loop_->assertInLoopThread();
/* 从EventLoop线程池中,取出一个EventLoop对象构造TcpConnection对象,便于均衡各EventLoop负责的连接数 */
EventLoop* ioLoop = threadPool_->getNextLoop(); // next event loop from the event loop thread pool

/* 设置连接对象名称, 包含基础名称+ip地址+端口号+连接Id
* 因为要作为ConnectionMap的key, 要确保运行时唯一性 */
char buf[32];
snprintf(buf, sizeof(buf), "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;

LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();

InetAddress localAddr(sockets::getLocalAddr(sockfd)); // 本地ip地址信息
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
/* 新建TcpConnection对象, 并加入ConnectionMap */
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
/* 为新建TcpConnection对象设置各种回调 */
conn->setConnectionCallback(connectionCallback_); // 连接回调
conn->setMessageCallback(messageCallback_); // 消息回调
conn->setWriteCompleteCallback(writeCompleteCallback_); // 写完成回调
conn->setCloseCallback( // 关闭连接回调
std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
/* 确认连接是否已建立, 并初始化连接建立后的状态 */
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

TcpConnection类

这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。

TcpConnection类是唯一默认用shared_ptr来管理的类,唯一继承自enable_shared_from_this的类。这是因为其生命周期模糊:可能在连接断开时,还有其他地方持有它的引用,贸然delete会造成空悬指针。只有确保其他地方没有持有该对象的引用的时候,才能安全地销毁对象。

我个人觉得TcpConnection类和Acceptor类是兄弟关系,Acceptor用于main EventLoop中,对服务器监听套接字fd及其相关方法进行封装(监听、接受连接、分发连接给SubEventLoop等),TcpConnection用于SubEventLoop中,对连接套接字fd及其相关方法进行封装(读消息事件、发送消息事件、连接关闭事件、错误事件等)。

TcpConnection的重要变量

  • ocket_:用于保存已连接套接字文件描述符。
  • channel_:封装了上面的socket_及其各类事件的处理函数(读、写、错误、关闭等事件处理函数)。这个Channel种保存的各类事件的处理函数是在TcpConnection对象构造函数中注册的。
  • loop_:这是一个EventLoop*类型,该Tcp连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
  • inputBuffer_:这是一个Buffer类,是该TCP连接对应的用户接收缓冲区。
  • outputBuffer_:也是一个Buffer类,不过是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。
  • state_:这个成员变量标识了当前TCP连接的状态(Connected、Connecting、Disconnecting、Disconnected)
  • connetionCallback_、messageCallback_、writeCompleteCallback_、closeCallback_ : 用户会自定义 [连接建立/关闭后的处理函数] 、[收到消息后的处理函数]、[消息发送完后的处理函数]以及Muduo库中定义的[连接关闭后的处理函数]。这四个函数都会分别注册给这四个成员变量保存。

TcpConnection的重要成员方法

handleRead()、handleWrite()、handleClose()、handleError(): 这四个函数都是私有成员方法,在一个已经建立好的Tcp连接上主要会发生四类事件:可读事件、可写事件、连接关闭事件、错误事件。当事件监听器监听到一个连接发生了以上的事件,那么就会在EventLoop中调用这些事件对应的处理函数。

  • handleRead()负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中(inputBuffer_),然后再调用connectionCallback_保存的 [连接建立后的处理函数]。
  • handleWrite( )负责处理Tcp连接的可写事件。
  • handleClose( )负责处理Tcp连接关闭的事件。大概的处理逻辑就是将这个TcpConnection对象中的channel_从事件监听器中移除。然后调用connectionCallback_和closeCallback_保存的回调函数。这closeCallback_中保存的函数是由Muduo库提供的,connectionCallback_保存的回调函数则由用户提供的(可有可无其实)

TcpConnection表示的是“一次Tcp连接”,不可再生,一旦连接断开,该TcpConnection对象就没用了。TcpConnection没用发起连接的功能,构造函数参数是已经建立好连接的socket fd,初始状态是kConnecting。连接可以是TcpServer或TcpClient发起。

接收到消息时,通过Channel::handleEvent会将可读事件转交给TcpConnection::handleRead处理,而TcpConnection::handleRead又会通过messageCallback_将可读事件转交给TcpServer::messageCallback_,进而传递给用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// TcpConnection.cc
/**
* 从输入缓存inputBuffer_读取数据, 交给回调messageCallback_处理
* @param receiveTime 接收到读事件的时间点
* @details 通常是TcpServer/TcpClient运行回调messageCallback_, 将处理机会传递给用户
*/
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); // 从指定fd读取数据到内部缓冲
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}

断开Tcp连接

muduo中有2种关闭连接的方式:

  1. 被动关闭:即对端先关闭连接,本地read(2)返回0,触发关闭逻辑,调用handleClose。
  2. 主动关闭:利用forceClose()或forceCloseWithDelay()成员函数调用handleClose,强制关闭或强制延时关闭连接。

被动关闭流程见下图,图中“X”表示TcpConnection对象通常在此析构。

info

当连接到来,创建一个TcpConnection对象,立刻用shared_ptr来管理,引用计数为1,并在Channel中维护一个weak_ptr(tie_),将这个shared_ptr对象赋值给tie_,由weak_ptr的特性,引用计数仍然为1

当连接关闭,在handleEvent,将tie_提升,得到一个shared_ptr,这时候引用计数就变成了2,然后erase,引用计数变为1,所以TcpConnection计数变为1,并没有销毁,然后再调用closeCallback_,closeCallback_在TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。

Channel与断开连接

Channel中有关闭连接的事件回调CloseCallback,由Channel::handleEvent()调用,从而触发TcpConnection::handleClose():

调用链路:

Poller::poll()检测到Channel事件就绪 => EventLoop::loop() =>Channel::handleEvent() => Channel::closeCallback_ => TcpConnection::handleClose()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Channel.cc
void Channel::handleEvent(Timestamp recevieTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(recevieTime);
}
}
else
{
handleEventWithGuard(recevieTime);
}
}

/**
* 根据不同的激活原因, 调用不同的回调函数
*/
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true; // 正在处理事件
LOG_TRACE << reventsToString(); // 打印fd及就绪事件
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{ // fd挂起(套接字已不在连接中), 并且没有数据可读
if (logHup_)
{ // 打印挂起log
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
// 调用关闭回调
if (closeCallback_) closeCallback_();
}
...
}

TcpConnection与断开连接

TcpConnection中有关闭连接的事件回调CloseCallback,由TcpConnection::handleClose()调用,从而触发TcpServer::removeConnection():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// TcpConnection.h
public:
/* 关闭写半连接 */
void shutdown(); // NOT thread safe, no simultaneous calling
// void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling

/* 强制关闭连接 */
void forceClose();
/* 强制延时关闭连接 */
void forceCloseWithDelay(double seconds);

...

/* Internal use only */
void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }

// called when TcpServer accepts a new connection
void connectEstablished(); // should be called only once per connection
// called when TcpServer has removed me from its map
void connectDestroyed(); // should be called only once per connection

private:
void handleClose(); // 处理关闭连接事件

被动关闭连接

当收到对端FIN分节时,本地read返回0,,Tcp连接被动关闭,会触发调用本地TcpConnection::handleClose(),其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 处理Tcp连接关闭
* @details 更新状态为kDisconnected, 清除所有事件通道监听
* @note 必须在所属loop线程中运行.
*/
void TcpConnection::handleClose()
{
loop_->assertInLoopThread(); // 确保在所属loop线程中运行
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected); // 更新Tcp连接状态
channel_->disableAll(); // 停止监听所有通道事件(读写事件)

TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis); // 连接回调
// must be the last line
closeCallback_(guardThis); // 关闭连接回调
}

closeCallback_在TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 主动销毁当前tcp连接, 移除通道事件
* @note 只有处于已连接状态(kConnected)的tcp连接, 才需要先更新状态, 关闭通道事件监听
*/
void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected) // 只有kConnected的连接, 才有必要采取断开连接动作
{
setState(kDisconnected);
channel_->disableAll(); // 关闭通道事件监听

connectionCallback_(shared_from_this()); // 调用连接回调
}
channel_->remove(); // 从EventLoop和Poller中移除监听通道事件
}

主动关闭连接:

主动关闭连接有两种方式:

  • 强制close连接
  • 关闭一个方向的连接(读或写方向)

强制关闭连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 强制关闭连接, 只对连接为kConnected或kDisconnecting状态才有效
* @details 为防止意外, 动作应该放到loop末尾去做
*/
void TcpConnection::forceClose()
{
// FIXME: use compare and swap
if (state_ == kConnected || state_ == kDisconnecting)
{
setState(kDisconnecting);
loop_->queueInLoop(std::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
}
}

/**
* 在所属loop循环中强制关闭连接, 只对连接为kConnected或kDisconnecting状态才有效
*/
void TcpConnection::forceCloseInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnected || state_ == kDisconnecting)
{
// as if we received 0 byte in handleRead()
handleClose();
}
}

可以看到,除了状态更新,对于关闭连接的真正操作,被动、主动关闭连接都是由handleClose来完成的。

假设想延迟一段时间再关闭连接,可以调用forceCloseWithDelay(),区别在于交给EventLoop:runAfter()延时运行,而不是交给EventLoop::queueInLoop()。

1
2
3
4
5
6
7
8
9
10
void TcpConnection::forceCloseWithDelay(double seconds)
{
if (state_ == kConnected || state_ == kDisconnecting)
{
setState(kDisconnecting);
loop_->runAfter(seconds,
makeWeakCallback(shared_from_this(),
&TcpConnection::forceClose)); // not forceCloseInLoop to avoid race condition
}
}

关闭一个方向的连接:

TcpConnection::shutdown()用于关闭写半连接,即发送FIN分节,关闭写通道,但仍然可以接收数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 关闭连接写方向, 只有已连接状态才有效
*/
void TcpConnection::shutdown()
{
// FIXME: use compare and swap
if (state_ == kConnected)
{
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(std::bind(&TcpConnection::shutDownInLoop, shared_from_this()));
}
}

关于close和shutdown的区别,详见Linux shutdown与clos

TcpServer与断开连接

当新建一个tcp连接时,TcpServer会调用newConnection创建一个新TcpConnection对象管理Tcp连接,并将对象加入自己的ConnectionMap进行管理。

而当tcp连接断开时,需要调用removeConnection进行移除工作,而removeConnection会将工作转交给removeConnectionInLoop, 确保在所属loop线程中执行。

1
2
3
4
5
6
7
8
/**
* 转交给removeConnectionInLoop, 在所属loop线程中执行
*/
void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{
// FIXME: unsafe
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

removeConnectionInLoop要做的工作是将要移除的tcp连接对应TcpConnection对象,从ConnectionMap移除,然后销毁该对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 在所属loop线程循环中, 排队移除指定tcp连接 conn
* @param conn 指向待移除tcp连接对应TcpConnection对象
*/
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name()); // 从ConnectionMap中擦除待移除TcpConnection对象
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn)); // 在所属loop线程中排队销毁TcpConnection对象
}

错误处理

用于发送数据的TcpConnection::send()重载了下面几个版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public:
...
/* 发送消息给连接对端 */
// void send(std::string&& mesage); // C++11
void send(const void* message, int len);
void send(const StringPiece& message);
// void send(Buffer&& message);
void send(Buffer* message); // this one will swap data

3send() 重载版本,最终都会转交给sendInLoop(const char*, int),在所属loop线程中执行发送工作。


// 转发给send(const StringPiece&), 最终转交给sendInLoop(const char*, int)
void TcpConnection::send(const void *message, int len)
{
send(StringPiece(static_cast<const char*>(message), len));
}

/**
* 转交给 sendInLoop(const char*, int)
* 发送消息给对端, 允许在其他线程调用
* @param message 要发送的消息. StringPiece兼容C/C++风格字符串, 二进制缓存, 提供统一字符串接口
*/
void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{ // 当前线程是所属loop线程
sendInLoop(message);
}
else
{ // 当前线程并非所属loop线程
void (TcpConnection::*fp)(const StringPiece& message);
fp = &TcpConnection::sendInLoop;
loop_->runInLoop(
std::bind(fp,
this, // FIXME
message.as_string()));
}
}
}

// 转交给sendInLoop(const char*, int)
void TcpConnection::sendInLoop(const StringPiece &message)
{
sendInLoop(message.data(), message.size());
}

// 转交给sendInLoop(const char*, int)
// FIXME efficiency!!!
void TcpConnection::send(Buffer *buf)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{
// send all readable bytes
sendInLoop(buf->peek(), buf->readableBytes());
buf->retrieveAll();
}
}
}

sendInLoop定义如下,主要是向对端发送一次数据,如果发送完一次,就进行一次回调;如果待发送数据超高水位,就进行高水位回调;如果发生错误,就进行错误回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 在所属loop线程中, 发送data[len]
* @param data 要发送的缓冲区首地址
* @param len 要发送的缓冲区大小(bytes)
* @details 发生write错误, 如果发送缓冲区未满, 对端已发FIN/RST分节 表明tcp连接发生致命错误(faultError为true)
*/
void TcpConnection::sendInLoop(const char *data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected) // 如果已经断开连接(kDisconnected), 就无需发送, 打印log(LOG_WARN)
{
LOG_WARN << "disconnected, give up writing";
return;
}

// write一次, 往对端发送数据, 后面再看是否发生错误, 是否需要高水位回调
// if no thing output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{ // 如果通道没有使能监听写事件, 并且outputBuffer 没有待发送数据, 就直接通过socket写
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0, error
{
nwrote = 0;
if (errno != EWOULDBLOCK) // EWOULDBLOCK: 发送缓冲区已满, 且fd已设为nonblocking
{ // O_NONBLOCK fd, write block but return EWOULDBLOCK error
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{ // EPIPE: reading end is closed; ECONNRESET: connection reset by peer
faultError = true;
}
}
}
}

// 处理剩余待发送数据
assert(remaining <= len);
if (!faultError && remaining > 0) // 没有故障, 并且还有待发送数据, 可能是发送太快, 对方来不及接收
{ // no error and data remaining to be written
size_t oldLen = outputBuffer_.readableBytes(); // Buffer中待发送数据量

if (oldLen + remaining >= highWaterMark_ // Buffer及当前要发送的数据量之和 超 高水位(highWaterMark)
&& oldLen < highWaterMark_ // 单独的Buffer中待发送数据量 未超 高水位
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
// append data to be written to the output buffer
outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
// enable write event for channel_
if (!channel_->isWriting())
{ // 如果没有在监听通道写事件, 就使能通道写事件
channel_->enableWriting();
}
}
}

info

HighWaterMarkCallback指的是如果输出缓冲的长度超过用户指定大小,就会触发回调(只在上升沿触发一次)。

用户使用TcpServer与客户端进行通信

假设客户端请求与服务器建立连接,用户(库的使用者)如何在建立连接、发送数据时,执行自定义任务?

可以在TcpServer启动前,利用TcpServer::setConnectionCallback()、TcpServer::setMessageCallback()等设置回调接口注册任务,等Tcp连接达到指定状态后,会回调用户任务。

示例:使用TcpServer接口类创建Tcp连接

例如,用户自定义服务EchoServer回传收到的客户端消息,EchoServer使用TcpServer注册连接回调(setConnectionCallback)和消息回调(setMessageCallback)。

如果需要分阶段向客户端发送数据,还可以注册写完成回调(setWriteCompleteCallback)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// examples/simple/echo
EchoServer::EchoServer(muduo::net::EventLoop *loop, const muduo::net::InetAddress &listenAddr)
: server_(loop, listenAddr, "EchoServer")
{
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, _1, _2, _3));

LOG_INFO << server_.name() << " listen on " << listenAddr.toIpPort();
}

void EchoServer::start()
{
server_.start();
}

void EchoServer::onConnection(const muduo::net::TcpConnectionPtr &conn)
{
LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}

void EchoServer::onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time)
{
muduo::string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
<< "data received at " << time.toString();
conn->send(msg);
}

int main()
{
LOG_INFO << "pid = " << getpid();
muduo::net::EventLoop loop; // 创建EventLoop对象
muduo::net::InetAddress listenAddr(2007); // 创建包含ip地址、端口号的对象
EchoServer server(&loop, listenAddr); // 创建用户自定义EchoServer对象
server.start(); // 启动EchoServer服务器
loop.loop(); // 启动loop循环
return 0;
}

参考


muduo库-TCP连接
https://gstarmin.github.io/2023/07/10/muduo库-TCP连接/
作者
Starmin
发布于
2023年7月10日
更新于
2023年7月11日
许可协议