muduo库-TCP连接
TCP连接中,两端是对等的,TCP协议也没有区分客户端(client)与服务器端(server),但互联网中通信中,往往有一端提供资源给另一端访问,我们把拥有资源的一端称为服务器端,请求资源的一端称为客户端。
对于server,会启用一个监听循环,不断接受client连接请求(三次握手),进行数据通信,通信完成以后断开连接(四次挥手);对于client,在server启用监听循环时,向server发出连接请求,接收server数据,如有必要向server发送数据,通信完成后,断开连接。连接是指物理上一端(client)到另一端(server)的通信链路,通过server端<ip,
port>与客户端<ip, port>,来唯一标识一个TCP连接。
TCP连接有长连接、短连接之分
短连接:client和server建立连接后,一般只传递一次读写操作,然后由client发起close,发送FIN分节,关闭连接。短连接只完成一次read/write操作,就会自动关闭。
长连接:client和server建立连接后,并不会自动关闭,后续的read/write操作会继续用这个连接。长连接没有确切时间限制,可能会长时间存在。
TcpServer类
muduo使用TcpConnection类来管理TCP连接,使用接受器Acceptor来接受连接,连接器Connector发起连接 。TcpServer管理accept获得TcpConnection,生命周期由用户控制。
下图是TcpServer新建连接的相关函数调用顺序。当Channel::handleEvent()的触发条件是listening
socket可读时,表明有新连接请求达到。TcpServer为新连接创建对应的TcpConnection对象。
Acceptor类
Acceptor是TcpServer的一个内部类,主要职责是用来获得新连接的fd。Accetpor封装了服务器监听套接字fd以及相关处理方法。Acceptor类内部其实没有贡献什么核心的处理函数,主要是对其他类的方法调用进行封装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class Acceptor : private noncopyable {public : typedef std::function<void (int sockfd, const InetAddress &)> NewConnectionCallback; Acceptor (EventLoop* loop, const InetAddress& listenAddr, bool reuseport); ~Acceptor (); void setNewConnectionCallback (const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } void listen () ; bool listening () const { return listening_; }private : void handleRead () ; EventLoop *loop_; Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback newConnectionCallback_; bool listening_; int idleFd_; };
如果fd资源不够用了,导致accept(2)/accept4(2)创建连接失败,比如达到系统上限,怎么办?
Accetor用了这样一种技术:先申请一个空闲的fd(idleFd_),等到发生由于fd资源不够用时,就把这个备用fd暂时用于accept接收连接,然后再马上关闭,以防止不断产生可读事件(连接请求),从而回调相同的失败代码。及早建立连接后并关闭连接,让程序不会频繁响应同一个连接请求。
Acceptor封装的重要成员变量
acceptSocket_:服务器监听套接字的文件描述符
acceptChannel_:是一个Channel类,把acceptSocket_及其感兴趣事件和事件对应的处理函数都封装进去。
EventLoop
*loop:监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,其实这个EventLoop就是main
EventLoop。
newConnectionCallback_:
TcpServer构造函数中将TcpServer::newConnection()函数注册给了这个成员变量。这个TcpServer::newConnection函数的功能是公平的选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。
Acceptor封装的重要成员方法
listen( ):该函数底层调用了linux的函数listen(
),开启对acceptSocket_的监听同时将acceptChannel及其感兴趣事件(可读事件)注册到main
EventLoop的事件监听器上。换言之就是让main
EventLoop事件监听器去监听acceptSocket_
handleRead(
):这是一个私有成员方法,这个方法是要注册到acceptChannel_上的,
同时handleRead(
)方法内部还调用了成员变量newConnectionCallback_保存的函数。当main
EventLoop监听到acceptChannel_上发生了可读事件时(新用户连接事件),就是调用这个handleRead(
)方法。
简单来说,handleRead(
)最终实现的功能就是接受新连接,并且以负载均衡的选择方式选择一个sub
EventLoop,并把这个新连接分发到这个subEventLoop上。
Acceptor构造与析构
Acceptor构造时,创建sockfd(套接字),待后续交给TcpServer来start监听套接字。空闲fd指向文件"/dev/null",用来解决服务器fd资源耗尽问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Acceptor::Acceptor (EventLoop *loop, const InetAddress &listenAddr, bool reuseport) : loop_ (loop), acceptSocket_ (sockets::createNonblockingOrDie (listenAddr.family ())), acceptChannel_ (loop, acceptSocket_.fd ()), listening_ (false ), idleFd_ (::open ("/dev/null" , O_RDONLY | O_CLOEXEC)) { assert (idleFd_ >= 0 ); acceptSocket_.setReuseAddr (true ); acceptSocket_.setReusePort (reuseport); acceptSocket_.bindAddress (listenAddr); acceptChannel_.setReadCallback ( std::bind (&Acceptor::handleRead, this )); } Acceptor::~Acceptor () { acceptChannel_.disableAll (); acceptChannel_.remove (); ::close (idleFd_); }
Acceptor监听
Acceptor包含2类监听:
监听套接字,即本地ip地址&端口。
监听通道事件,读事件。
为什么不在构造时,就调用listen监听sockfd呢?
将非必要资源的初始化,延迟到需要时,用户可以通过调用TcpSever::start()来启动。这样,用户可以更灵活控制资源的申请和释放。
Acceptor接受连接
Acceptor内部有一个Channel成员,当Poller监听到有Tcp连接请求时,就通过Channel的可读事件,在loop线程,来回调Acceptor::handleRead()。从而将conn
fd和IP地址传递给上一层TcpServer,用于创建TcpConnection对象管理Tcp连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 void Acceptor::handleRead () { loop_->assertInLoopThread (); InetAddress peerAddr; int connfd = acceptSocket_.accept (&peerAddr); if (connfd >= 0 ) { if (newConnectionCallback_) { newConnectionCallback_ (connfd, peerAddr); } else { sockets::close (connfd); } } else { LOG_SYSERR << "in Acceptor::handleRead" ; if (errno == EMFILE) { ::close (idleFd_); idleFd_ = ::accept (acceptSocket_.fd (), NULL , NULL ); ::close (idleFd_); idleFd_ = ::open ("/dev/null" , O_RDONLY | O_CLOEXEC); } } }
TcpServer类
TcpServer类管理TcpConnection,供用户直接使用,生命周期由用户控制。接口如下,用户只需要设置好callback,然后调用start()即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 class TcpServer : private noncopyable {public : typedef std::function<void (EventLoop*)> ThreadInitCallback; enum Option enum Option { kNoReusePort, kReusePort, }; TcpServer (EventLoop* loop, const InetAddress& listenAddr, const std::string& nameArg, Option option = kNoReusePort); ~TcpServer (); void start () ; void setConnectionCallback (const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback (const MessageCallback & cb) { messageCallback_ = cb; } void setWriteCompleteCallback (const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; }
note
并没有连接关闭的回调,这是由TcpServer::removeConnection()负责的,进而把工作转交给TcpConnection::connectDestroyed(),用户不可更改设置。
TcpServer的构造与析构
TcpServer构造函数主要工作是为成员申请资源,为各回调设置回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 TcpServer::TcpServer (EventLoop* loop, const InetAddress& listenAddr, const std::string& nameArg, Option option) : loop_ (CHECK_NOTNULL (loop)), ipPort_ (listenAddr.toIpPort ()), name_ (nameArg), acceptor_ (new Acceptor (loop, listenAddr, option == kReusePort)), threadPool_ (new EventLoopThreadPool (loop, name_)), connectionCallback_ (defaultConnectionCallback), messageCallback_ (defaultMessageCallback), nextConnId_ (1 ) { acceptor_->setNewConnectionCallback ( std::bind (&TcpServer::newConnection, this , _1, _2)); }
同样是连接回调,TcpServer::newConnection()和connectionCallback_有何区别?
前者是Acceptor发生连接请求事件时,回调,用来新建一个Tcp连接;后者是在TcpServer内部新建连接即调用TcpServer::newConnection()时,回调connectionCallback_。
TcpServer析构工作内容很简单,主要销毁ConnectionMap中所有Tcp连接,而每个Tcp连接用的是一个TcpConnection对象来管理的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 TcpServer::~TcpServer () { loop_->assertInLoopThread (); LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing" ; for (auto & item : connections_) { TcpConnectionPtr conn (item.second) ; item.second.reset (); conn->getLoop ()->runInLoop ( std::bind (&TcpConnection::connectDestroyed, conn)); } }
TcpServer启动Tcp服务器,主要完成
线程池的启动;
Acceptor监听Tcp连接请求。
线程池需要指定其初始数量,当然,这需要在start()之前调用TcpServer::setThreadNum()设置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void TcpServer::start () { if (started_.getAndSet (1 ) == 0 ) { threadPool_->start (threadInitCallback_); assert (!acceptor_->listening ()); loop_->runInLoop ( std::bind (&Acceptor::listen, get_pointer (acceptor_))); } }
TcpServer如何获得新连接的connfd(accept返回值)?
TcpServer内部用Acceptor,保存用户提供的Connection-Callback和MessageCallback,新建TcpConnection对象(newConn())的时候直接传递给TcpConnection的构造函数
如何创建TcpConnection对象?
新连接请求到达时,Acceptor回调newConnection(),通过TcpServer::newConnection创建一个新TcpConnection对象,用于管理一个Tcp连接。
即,TcpServer::newConnection回调顺序 EventLoop => Channel =>
Acceptor => TcpServer
下面是用于创建TcpConnection对象的函数TcpServer::newConnection()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 void TcpServer::newConnection (int sockfd, const InetAddress &peerAddr) { loop_->assertInLoopThread (); EventLoop* ioLoop = threadPool_->getNextLoop (); char buf[32 ]; snprintf (buf, sizeof (buf), "-%s#%d" , ipPort_.c_str (), nextConnId_); ++nextConnId_; std::string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toIpPort (); InetAddress localAddr (sockets::getLocalAddr(sockfd)) ; TcpConnectionPtr conn (new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)) ; connections_[connName] = conn; conn->setConnectionCallback (connectionCallback_); conn->setMessageCallback (messageCallback_); conn->setWriteCompleteCallback (writeCompleteCallback_); conn->setCloseCallback ( std::bind (&TcpServer::removeConnection, this , _1)); ioLoop->runInLoop (std::bind (&TcpConnection::connectEstablished, conn)); }
TcpConnection类
这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。
TcpConnection类是唯一默认用shared_ptr来管理的类,唯一继承自enable_shared_from_this的类。这是因为其生命周期模糊:可能在连接断开时,还有其他地方持有它的引用,贸然delete会造成空悬指针。只有确保其他地方没有持有该对象的引用的时候,才能安全地销毁对象。
我个人觉得TcpConnection类和Acceptor类是兄弟关系,Acceptor用于main
EventLoop中,对服务器监听套接字fd及其相关方法进行封装(监听、接受连接、分发连接给SubEventLoop等),TcpConnection用于SubEventLoop中,对连接套接字fd及其相关方法进行封装(读消息事件、发送消息事件、连接关闭事件、错误事件等)。
TcpConnection的重要变量
ocket_:用于保存已连接套接字文件描述符。
channel_:封装了上面的socket_及其各类事件的处理函数(读、写、错误、关闭等事件处理函数)。这个Channel种保存的各类事件的处理函数是在TcpConnection对象构造函数中注册的。
loop_:这是一个EventLoop*类型,该Tcp连接的Channel注册到了哪一个sub
EventLoop上。这个loop_就是那一个sub EventLoop。
inputBuffer_:这是一个Buffer类,是该TCP连接对应的用户接收缓冲区。
outputBuffer_:也是一个Buffer类,不过是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。
state_:这个成员变量标识了当前TCP连接的状态(Connected、Connecting、Disconnecting、Disconnected)
connetionCallback_、messageCallback_、writeCompleteCallback_、closeCallback_
: 用户会自定义 [连接建立/关闭后的处理函数]
、[收到消息后的处理函数]、[消息发送完后的处理函数]以及Muduo库中定义的[连接关闭后的处理函数]。这四个函数都会分别注册给这四个成员变量保存。
TcpConnection的重要成员方法
handleRead()、handleWrite()、handleClose()、handleError():
这四个函数都是私有成员方法,在一个已经建立好的Tcp连接上主要会发生四类事件:可读事件、可写事件、连接关闭事件、错误事件。当事件监听器监听到一个连接发生了以上的事件,那么就会在EventLoop中调用这些事件对应的处理函数。
handleRead()负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中(inputBuffer_),然后再调用connectionCallback_保存的
[连接建立后的处理函数]。
handleWrite( )负责处理Tcp连接的可写事件。
handleClose(
)负责处理Tcp连接关闭的事件。大概的处理逻辑就是将这个TcpConnection对象中的channel_从事件监听器中移除。然后调用connectionCallback_和closeCallback_保存的回调函数。这closeCallback_中保存的函数是由Muduo库提供的,connectionCallback_保存的回调函数则由用户提供的(可有可无其实)
TcpConnection表示的是“一次Tcp连接”,不可再生,一旦连接断开,该TcpConnection对象就没用了。TcpConnection没用发起连接的功能,构造函数参数是已经建立好连接的socket
fd,初始状态是kConnecting。连接可以是TcpServer或TcpClient发起。
接收到消息时,通过Channel::handleEvent会将可读事件转交给TcpConnection::handleRead处理,而TcpConnection::handleRead又会通过messageCallback_将可读事件转交给TcpServer::messageCallback_,进而传递给用户。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void TcpConnection::handleRead (Timestamp receiveTime) { loop_->assertInLoopThread (); int savedErrno = 0 ; ssize_t n = inputBuffer_.readFd (channel_->fd (), &savedErrno); if (n > 0 ) { messageCallback_ (shared_from_this (), &inputBuffer_, receiveTime); } else if (n == 0 ) { handleClose (); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead" ; handleError (); } }
断开Tcp连接
muduo中有2种关闭连接的方式:
被动关闭:即对端先关闭连接,本地read(2)返回0,触发关闭逻辑,调用handleClose。
主动关闭:利用forceClose()或forceCloseWithDelay()成员函数调用handleClose,强制关闭或强制延时关闭连接。
被动关闭流程见下图,图中“X”表示TcpConnection对象通常在此析构。
info
当连接到来,创建一个TcpConnection对象,立刻用shared_ptr来管理,引用计数为1,并在Channel中维护一个weak_ptr(tie_),将这个shared_ptr对象赋值给tie_,由weak_ptr的特性,引用计数仍然为1
当连接关闭,在handleEvent,将tie_提升,得到一个shared_ptr,这时候引用计数就变成了2,然后erase,引用计数变为1,所以TcpConnection计数变为1,并没有销毁,然后再调用closeCallback_,closeCallback_在TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。
Channel与断开连接
Channel中有关闭连接的事件回调CloseCallback,由Channel::handleEvent()调用,从而触发TcpConnection::handleClose():
调用链路:
Poller::poll()检测到Channel事件就绪 => EventLoop::loop()
=>Channel::handleEvent() => Channel::closeCallback_ =>
TcpConnection::handleClose()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void Channel::handleEvent (Timestamp recevieTime) { std::shared_ptr<void > guard; if (tied_) { guard = tie_.lock (); if (guard) { handleEventWithGuard (recevieTime); } } else { handleEventWithGuard (recevieTime); } }void Channel::handleEventWithGuard (Timestamp receiveTime) { eventHandling_ = true ; LOG_TRACE << reventsToString (); if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { if (logHup_) { LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP" ; } if (closeCallback_) closeCallback_ (); } ... }
TcpConnection与断开连接
TcpConnection中有关闭连接的事件回调CloseCallback,由TcpConnection::handleClose()调用,从而触发TcpServer::removeConnection():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public : void shutdown () ; void forceClose () ; void forceCloseWithDelay (double seconds) ; ... void setCloseCallback (const CloseCallback& cb) { closeCallback_ = cb; } void connectEstablished () ; void connectDestroyed () ; private : void handleClose () ;
被动关闭连接
当收到对端FIN分节时,本地read返回0,,Tcp连接被动关闭,会触发调用本地TcpConnection::handleClose(),其定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void TcpConnection::handleClose () { loop_->assertInLoopThread (); LOG_TRACE << "fd = " << channel_->fd () << " state = " << stateToString (); assert (state_ == kConnected || state_ == kDisconnecting); setState (kDisconnected); channel_->disableAll (); TcpConnectionPtr guardThis (shared_from_this()) ; connectionCallback_ (guardThis); closeCallback_ (guardThis); }
closeCallback_在TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void TcpConnection::connectDestroyed () { loop_->assertInLoopThread (); if (state_ == kConnected) { setState (kDisconnected); channel_->disableAll (); connectionCallback_ (shared_from_this ()); } channel_->remove (); }
主动关闭连接 :
主动关闭连接有两种方式:
强制close连接
关闭一个方向的连接(读或写方向)
强制关闭连接 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void TcpConnection::forceClose () { if (state_ == kConnected || state_ == kDisconnecting) { setState (kDisconnecting); loop_->queueInLoop (std::bind (&TcpConnection::forceCloseInLoop, shared_from_this ())); } }void TcpConnection::forceCloseInLoop () { loop_->assertInLoopThread (); if (state_ == kConnected || state_ == kDisconnecting) { handleClose (); } }
可以看到,除了状态更新,对于关闭连接的真正操作,被动、主动关闭连接都是由handleClose来完成的。
假设想延迟一段时间再关闭连接,可以调用forceCloseWithDelay(),区别在于交给EventLoop:runAfter()延时运行,而不是交给EventLoop::queueInLoop()。
1 2 3 4 5 6 7 8 9 10 void TcpConnection::forceCloseWithDelay (double seconds) { if (state_ == kConnected || state_ == kDisconnecting) { setState (kDisconnecting); loop_->runAfter (seconds, makeWeakCallback (shared_from_this (), &TcpConnection::forceClose)); } }
关闭一个方向的连接 :
TcpConnection::shutdown()用于关闭写半连接,即发送FIN分节,关闭写通道,但仍然可以接收数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 void TcpConnection::shutdown () { if (state_ == kConnected) { setState (kDisconnecting); loop_->runInLoop (std::bind (&TcpConnection::shutDownInLoop, shared_from_this ())); } }
关于close和shutdown的区别,详见Linux
shutdown与clos
TcpServer与断开连接
当新建一个tcp连接时,TcpServer会调用newConnection创建一个新TcpConnection对象管理Tcp连接,并将对象加入自己的ConnectionMap进行管理。
而当tcp连接断开时,需要调用removeConnection进行移除工作,而removeConnection会将工作转交给removeConnectionInLoop,
确保在所属loop线程中执行。
1 2 3 4 5 6 7 8 void TcpServer::removeConnection (const TcpConnectionPtr &conn) { loop_->runInLoop (std::bind (&TcpServer::removeConnectionInLoop, this , conn)); }
removeConnectionInLoop要做的工作是将要移除的tcp连接对应TcpConnection对象,从ConnectionMap移除,然后销毁该对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void TcpServer::removeConnectionInLoop (const TcpConnectionPtr &conn) { loop_->assertInLoopThread (); LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ << "] - connection " << conn->name (); size_t n = connections_.erase (conn->name ()); (void )n; assert (n == 1 ); EventLoop* ioLoop = conn->getLoop (); ioLoop->queueInLoop ( std::bind (&TcpConnection::connectDestroyed, conn)); }
错误处理
用于发送数据的TcpConnection::send()重载了下面几个版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public : ... void send (const void * message, int len) ; void send (const StringPiece& message) ; void send (Buffer* message) ; 3 个send () 重载版本,最终都会转交给sendInLoop (const char *, int ),在所属loop线程中执行发送工作。void TcpConnection::send (const void *message, int len) { send (StringPiece (static_cast <const char *>(message), len)); }void TcpConnection::send (const StringPiece& message) { if (state_ == kConnected) { if (loop_->isInLoopThread ()) { sendInLoop (message); } else { void (TcpConnection::*fp)(const StringPiece& message); fp = &TcpConnection::sendInLoop; loop_->runInLoop ( std::bind (fp, this , message.as_string ())); } } }void TcpConnection::sendInLoop (const StringPiece &message) { sendInLoop (message.data (), message.size ()); }void TcpConnection::send (Buffer *buf) { if (state_ == kConnected) { if (loop_->isInLoopThread ()) { sendInLoop (buf->peek (), buf->readableBytes ()); buf->retrieveAll (); } } }
sendInLoop定义如下,主要是向对端发送一次数据,如果发送完一次,就进行一次回调;如果待发送数据超高水位,就进行高水位回调;如果发生错误,就进行错误回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 void TcpConnection::sendInLoop (const char *data, size_t len) { loop_->assertInLoopThread (); ssize_t nwrote = 0 ; size_t remaining = len; bool faultError = false ; if (state_ == kDisconnected) { LOG_WARN << "disconnected, give up writing" ; return ; } if (!channel_->isWriting () && outputBuffer_.readableBytes () == 0 ) { nwrote = sockets::write (channel_->fd (), data, len); if (nwrote >= 0 ) { remaining = len - nwrote; if (remaining == 0 && writeCompleteCallback_) { loop_->queueInLoop (std::bind (writeCompleteCallback_, shared_from_this ())); } } else { nwrote = 0 ; if (errno != EWOULDBLOCK) { LOG_SYSERR << "TcpConnection::sendInLoop" ; if (errno == EPIPE || errno == ECONNRESET) { faultError = true ; } } } } assert (remaining <= len); if (!faultError && remaining > 0 ) { size_t oldLen = outputBuffer_.readableBytes (); if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop (std::bind (highWaterMarkCallback_, shared_from_this (), oldLen + remaining)); } outputBuffer_.append (static_cast <const char *>(data) + nwrote, remaining); if (!channel_->isWriting ()) { channel_->enableWriting (); } } }
info
HighWaterMarkCallback指的是如果输出缓冲的长度超过用户指定大小,就会触发回调(只在上升沿触发一次)。
用户使用TcpServer与客户端进行通信
假设客户端请求与服务器建立连接,用户(库的使用者)如何在建立连接、发送数据时,执行自定义任务?
可以在TcpServer启动前,利用TcpServer::setConnectionCallback()、TcpServer::setMessageCallback()等设置回调接口注册任务,等Tcp连接达到指定状态后,会回调用户任务。
示例:使用TcpServer接口类创建Tcp连接
例如,用户自定义服务EchoServer回传收到的客户端消息,EchoServer使用TcpServer注册连接回调(setConnectionCallback)和消息回调(setMessageCallback)。
如果需要分阶段向客户端发送数据,还可以注册写完成回调(setWriteCompleteCallback)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 EchoServer::EchoServer (muduo::net::EventLoop *loop, const muduo::net::InetAddress &listenAddr) : server_ (loop, listenAddr, "EchoServer" ) { server_.setConnectionCallback ( std::bind (&EchoServer::onConnection, this , _1)); server_.setMessageCallback ( std::bind (&EchoServer::onMessage, this , _1, _2, _3)); LOG_INFO << server_.name () << " listen on " << listenAddr.toIpPort (); }void EchoServer::start () { server_.start (); }void EchoServer::onConnection (const muduo::net::TcpConnectionPtr &conn) { LOG_INFO << "EchoServer - " << conn->peerAddress ().toIpPort () << " -> " << conn->localAddress ().toIpPort () << " is " << (conn->connected () ? "UP" : "DOWN" ); }void EchoServer::onMessage (const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time) { muduo::string msg (buf->retrieveAllAsString()) ; LOG_INFO << conn->name () << " echo " << msg.size () << " bytes, " << "data received at " << time.toString (); conn->send (msg); }int main () { LOG_INFO << "pid = " << getpid (); muduo::net::EventLoop loop; muduo::net::InetAddress listenAddr (2007 ) ; EchoServer server (&loop, listenAddr) ; server.start (); loop.loop (); return 0 ; }
参考