muduo库-事件通道Channel
之前说过,Poller的存在是为了监听事件,那么具体监听什么事件呢?
这就需要用到Channel类,它是muduo库负责注册读写事件的类,并保存了fd读写事件发生时调用的回调函数,如果poll/epoll有读写事件发生则将这些事件添加到对应的通道中。一个通道对应唯一EventLoop,一个EventLoop可以有多个通道 。
一个通道对应唯一EventLoop,一个EventLoop可以有多个通道。
一个Channel对象绑定了一个fd(文件描述符),可以用来监听发生在fd上的事件,事件包括空事件(不监听)、可读事件、写完成事件 。当fd上被监听事件就绪时,对应Channel对象就会被Poller放入激活队列(activeChannels_) ,进而在loop循环中调用封装在Channel的相应回调来处理事件。
attention
Channel类不负责fd的生存期,fd的生存期是由socket决定的,断开连接关闭描述符。
Channel可以通过EventLoop,向Poller更新自己关心的(监听)事件(通过map
Poller::channels_存储)。具体来说,对于PollPoller对象,会同步更新(poll)传给内核的poll事件数组pollfds_;对于EPollPoller对象,会同步更新(epoll)传递给内核的epoll事件数组events_;
可以这样理解,poll/epoll监听的是fd(上指定的事件pollfd.events),Poller监听的是Channel对象(上指定的事件events_),当监听到事件就绪时,将对应通道加入激活通道队列,在EventLoop的loop循环中依次调用Channel中注册的事件回调。
EventLoop、Poller、Channel这3个类构成了Reactor模式的核心,其时序关系如下图:
Channel 类
每个Channel对象从始至终只负责一个文件描述符(fd)的IO事件分发,但不拥有fd,也不会在析构时关闭fd 。而是由诸如TcpConnection、Acceptor、EventLoop等,这样需要监听指定文件描述符上事件的类,将fd通过构造函数传递给Channel。
Channel会把不同的IO事件分发为不同的回调,如ReadCallback、WriteCallback,回调对象类型用std::function<>表示,用来定义某个可调用类型。
事件回调类型:
1 2 3 4 #include <functional> typedef std::function<void ()> EventCallback;typedef std::function<void (Timestamp)> ReadEventCallback;
Channel成员函数主要包括:
设置事件处理的回调函数setCallback(如setReadCallback);
开启fd关心的事件events_,可调用enable(如enableReading),该fd及关心的事件会注册到Poller中进行监听;
关闭fd关心的事件events_,可调用disable*(如disableReading),会更新该fd在Poller中监听的事件;
关闭fd关心的所有事件events_,可调用disableAll,会更新该fd在Poller中监听的事件;
删除对fd的监听,会将其从Poller的ChannelMap中移除;
Poller监听到Channel事件被激活时,将其加入到激活列表,在EventLoop中回调handleEvent。
Channel类内部结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 class Channel : private noncopyable {public : typedef std::function<void ()> EventCallback; typedef std::function<void (Timestamp)> ReadEventCallback; Channel (EventLoop* loop, int fd__); ~Channel () void handleEvent (Timestamp recevieTime) ; void setReadCallback (ReadEventCallback cb) { readCallback_ = std::move (cb); } void setWriteCallback (EventCallback cb) { writeCallback_ = std::move (cb); } void setCloseCallback (EventCallback cb) { closeCallback_ = std::move (cb); } void setErrorCallback (EventCallback cb) { errorCallback_ = std::move (cb); } void tie (const std::shared_ptr<void >&) ; int fd () const { return fd_; } int events () const { return events_; } void set_revents (int revt) { revents_ = revt; } bool isNoneEvent () const { return events_ == kNoneEvent; } void enableReading () { events_ |= kReadEvent; update (); } void disableReading () { events_ &= ~kReadEvent; update (); } void enableWriting () { events_ |= kWriteEvent; update (); } void disableWriting () { events_ &= ~kWriteEvent; update (); } void disableAll () { events_ = kNoneEvent; update (); } bool isWriting () const { return events_ & kWriteEvent; } bool isReading () const { return events_ & kReadEvent; } int index () { return index_; } void set_index (int idx) { index_ = idx; } string reventsToString () const ; string eventsToString () const ; void doNotLogHup () { logHup_ = false ; } EventLoop* ownerLoop () { return loop_; } void remove () ;private : static string eventsToString (int fd, int ev) ; void update () ; void handleEventWithGuard (Timestamp receiveTime) ; static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; EventLoop* loop_; const int fd_; int events_; int revents_; int index_; bool logHup_; std::weak_ptr<void > tie_; bool tied_; bool eventHandling_; bool addedToLoop_; ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; };
handleEvent 处理事件
处理激活的Channel事件,由Poller更新激活的Channel列表,EventLoop::loop()根据激活Channel列表,逐个执行Channel中已注册好的相应回调。实际事件处理工作,由handleEventWithGuard完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void Channel::handleEvent (Timestamp recevieTime) { std::shared_ptr<void > guard; if (tied_) { guard = tie_.lock (); if (guard) { handleEventWithGuard (recevieTime); } } else { handleEventWithGuard (recevieTime); } }
handleEventWithGuard
识别事件并回调
根据不同的激活原因,调用不的回调函数。这些回调函数,是在持有Channel对象,需要进行事件监听的class中进行设置,比如TcpConnection,EventLoop,Acceptor,TimerQueue等。而有些回调函数,经过层层传递,会呈现可网络库的调用者,比如TcpConnection会将处理一个socket
fd的读事件回调(新建连接请求),传递给TcpServer::newConnection,这样用户就能通过TcpServer::setConnectionCallback设置其回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void Channel::handleEventWithGuard (Timestamp receiveTime) { eventHandling_ = true ; LOG_TRACE << reventsToString (); if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { if (logHup_) { LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP" ; } if (closeCallback_) closeCallback_ (); } if (revents_ & POLLNVAL) { LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL" ; } if (revents_ & (POLLERR | POLLNVAL)) { if (errorCallback_) errorCallback_ (); } if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) { if (readCallback_) readCallback_ (receiveTime); } if (revents_ & POLLOUT) { if (writeCallback_) writeCallback_ (); } eventHandling_ = false ; }
update 更新通道
通过EventLoop对象,传递给Poller对象,然后更新其监听的通道列表中对应通道。支持ADD/MOD操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 void Channel::update () { addedToLoop_ = true ; loop_->updateChannel (this ); }void EventLoop::updateChannel (Channel *channel) { assert (channel->ownerLoop () == this ); assertInLoopThread (); poller_->updateChannel (channel); }void PollPoller::updateChannel (Channel *channel) { Poller::assertInLoopThread (); LOG_TRACE << "fd = " << channel->fd () << " events = " << channel->events (); if (channel->index () < 0 ) { assert (channels_.find (channel->fd ()) == channels_.end ()); struct pollfd pfd; pfd.fd = channel->fd (); pfd.events = static_cast <short >(channel->events ()); pfd.revents = 0 ; pollfds_.push_back (pfd); int idx = static_cast <int >(pollfds_.size ()) - 1 ; channel->set_index (idx); channels_[pfd.fd] = channel; } else { assert (channels_.find (channel->fd ()) != channels_.end ()); assert (channels_[channel->fd ()] == channel); int idx = channel->index (); assert (0 <= idx && idx < static_cast <int >(pollfds_.size ())); struct pollfd & pfd = pollfds_[idx]; assert (pfd.fd == channel->fd () || pfd.fd == -channel->fd () - 1 ); pfd.fd = channel->fd (); pfd.events = static_cast <short >(channel->events ()); pfd.revents = 0 ; if (channel->isNoneEvent ()) { pfd.fd = -channel->fd () - 1 ; } } }
remove 移除通道
与update类似,也是通过EventLoop传递给Poller对象,将当前通道从Poller的事件列表中删除。支持DEL操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 void Channel::update () { addedToLoop_ = true ; loop_->updateChannel (this ); }void EventLoop::updateChannel (Channel *channel) { assert (channel->ownerLoop () == this ); assertInLoopThread (); poller_->updateChannel (channel); }void PollPoller::removeChannel (Channel *channel) { Poller::assertInLoopThread (); LOG_TRACE << "fd = " << channel->fd (); assert (channels_.find (channel->fd ()) != channels_.end ()); assert (channels_[channel->fd ()] == channel); assert (channel->isNoneEvent ()); int idx = channel->index (); assert (0 <= idx && idx < static_cast <int >(pollfds_.size ())); const struct pollfd & pfd = pollfds_[idx]; (void )pfd; assert (pfd.fd == -channel->fd () - 1 && pfd.events == channel->events ()); size_t n = channels_.erase (channel->fd ()); assert (n == 1 ); (void )n; if (implicit_cast <size_t >(idx) == pollfds_.size () - 1 ) { pollfds_.pop_back (); } else { int channelAtEnd = pollfds_.back ().fd; iter_swap (pollfds_.begin () + idx, pollfds_.end () - 1 ); if (channelAtEnd < 0 ) { channelAtEnd = -channelAtEnd - 1 ; } channels_[channelAtEnd]->set_index (idx); pollfds_.pop_back (); } }
参考
muduo笔记
网络库(三)事件通道Channel