muduo-事件驱动EventLoop
之前说过,muduo网络库处理事件是Reactor模式,one loop per
thread,一个线程一个事件循环。这个循环称为EventLoop,这种以事件为驱动的编程模式,称为事件驱动模式。
这种事件驱动模型要求所有任务是非阻塞的,其典型特点是:
如果一个任务需要很长时间才能完成,或者中间可能导致阻塞,就需要对任务进行分段,将其设置为非阻塞的,每次监听到前次任务完成,触发事件回调,从而接着完成后续任务。例如,要发送一个大文件,可以先发送一段,完成后,在写完成事件回调中又发送下一段,这样每次都发生一段,从而完成整个文件发送。
EventLoop是实现事件驱动模型的关键之一。核心是为线程提供运行循环,不断监听事件、处理事件,为用户提供在loop循环中运行的接口。
EventLoop事件驱动相关类图关系如下:
上面的类图中有两种不同的关系表示:
聚合关系:has-a,表示拥有的关系,两种生命周期没有必然关联,可以独立存在。
组合关系:contain-a,表包含的关系,是一种强聚合关系,强调整体与部分,生命周期一致。
EventLoop类
EventLoop是一个接口类,不宜暴露太多内部细节给客户,接口及其使用应尽量简洁。EventLoop的主要职责是:
提供定时执行用户指定任务的方法,支持一次性、周期执行用户任务;
提供一个运行循环,每当Poller监听到有通道对应事件发生时,会将通道加入激活通道列表,运行循环要不断从取出激活通道,然后调用事件回调处理事件;
每个EventLoop对应一个线程,不允许一对多或者多对一 ,提供判断当前线程是否为创建EventLoop对象的线程的方法;
允许在其他线程中调用EventLoop的public接口,但同时要确保线程安全;
EventLoop类声明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 class EventLoop : public noncopyable {public : typedef std::function<void ()> Functor; EventLoop (); ~EventLoop (); void loop () ; void quit () ; Timestamp pollReturnTime () const { return pollReturnTime_; } int64_t iterator () const { return iteration_; } void runInLoop (Functor cb) ; void queueInLoop (Functor cb) ; size_t queueSize () const ; TimerId runAt (Timestamp time, TimerCallback cb) ; TimerId runAfter (double delay, TimerCallback cb) ; TimerId runEvery (double interval, TimerCallback cb) ; void cancel (TimerId timerId) ; void wakeup () ; void updateChannel (Channel* channel) ; void removeChannel (Channel* channel) ; bool hasChannel (Channel* channel) ; void assertInLoopThread () ; bool isInLoopThread () const ; bool callingPendingFunctors () const { return callingPendingFunctors_; } bool eventHandling () const { return eventHandling_; } void setContext (const boost::any& context) { context_ = context; } const boost::any& getContext () const { return context_; } boost::any* getMutableContext () { return &context_; } static EventLoop* getEventLoopOfCurrentThread () ;private : void abortNotInLoopThread () ; void handleRead () ; void doPendingFunctors () ; void printActiveChannels () const ; typedef std::vector<Channel*> ChannelList; bool looping_; std::atomic<bool > quit_; bool eventHandling_; bool callingPendingFunctors_; int64_t iteration_; const pid_t threadId_; Timestamp pollReturnTime_; std::unique_ptr<Poller> poller_; std::unique_ptr<TimerQueue> timerQueue_; int wakeupFd_; std::unique_ptr<Channel> wakeupChannel_; boost::any context_; ChannelList activeChannels_; Channel* currentActiveChannel_; mutable MutexLock mutex_; std::vector<Functor> pendingFunctors_ GUARDED_BY (mutex_) ; };
EventLoop不可拷贝,因为与之关联的不仅对象本身,还有线程以及thread
local数据等资源。其功能主要分为下面这几大类:
提供运行循环;
运行定时任务,一次性 or 周期;
处理激活通道事件;
线程安全。
对于1,loop()提供运行循环,quit()退出循环,iterator()查询循环次数,wakeup()用于唤醒loop线程,handleRead()读取唤醒消息;
对于2,runInLoop()在loop线程中“立即”运行一次用户任务,runAt()/runAfter()添加一次性定时任务,runEvery()添加周期定时任务,doPendingFunctors()回调所有的pending函数,vector
pendingFunctors_用于排队待处理函数到loop线程执行,queueSize()获取该vector大小;cancel()取消定时任务。
对于3,updateChannel()/removeChannel()/hasChannel()用于通道更新/移除/判断,vector
activeChannels_存储当前所有激活的通道,currentActiveChannel_存储当前正在处理的激活通道;
对于4,isInLoopThread()/assertInLoopThread()判断/断言
当前线程是创建当前EventLoop对象的线程,互斥锁mutex_用来做互斥访问需要保护数据。
值得一提的是,boost::any类型的成员context_用来给用户提供利用EventLoop传数据的方式,相当于C里面的void*,用户可利用boost::any_cast进行转型。
EventLoop构造函数与析构函数
构造函数要点:
检查当前线程是否已经创建了EventLoop对象,遇到错误就终止程序(LOG_FATAL);
记住本对象所属线程id(threadId_);
析构函数要点:
清除当前线程EventLoop指针,便于下次再创建EventLoop对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 __thread EventLoop* t_loopInThisThread = 0 ; EventLoop::EventLoop () : looping_ (false ),threadId_ (CurrentThread::tid ()), { LOG_DEBUG << "EventLoop create " << this << " in thread " << threadId_; if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this ; } } EventLoop::~EventLoop () { LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_ << " destructs in thread " << CurrentThread::tid (); t_loopInThisThread = NULL ; }
attention
再次强调,一个线程只能有一个EventLoop对象。
这里通过thread
local变量t_loopInThisThread指向创建的EventLoop对象,来确保每个线程只有一个EventLoop对象。同一个线程内,可通过static函数getEventLoopOfCurrentThread,返回该EventLoop对象指针。
1 2 3 4 EventLoop *EventLoop::getEventLoopOfCurrentThread () { return t_loopInThisThread; }
assertInLoopThread
有些成员函数只能在EventLoop对象所在线程调用,EventLoop提供了isInLoopThread()、assertInLoopThread(),分别用于判断、断言
当前线程为创建EventLoop对象线程。
当assertInLoopThread()断言失败时,调用abortNotInLoopThread()终止程序(LOG_FATAL)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void EventLoop::assertInLoopThread () { if (!isInLoopThread ()) { abortNotInLoopThread (); } }bool EventLoop::isInLoopThread () const { return threadId_ == CurrentThread::tid (); }void EventLoop::abortNotInLoopThread () { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid (); }
loop循环
提供运行循环,不断监听事件、处理事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void EventLoop::loop () { assert (!looping_); assertInLoopThread (); looping_ = true ; quit_ = false ; LOG_TRACE << "EventLoop " << this << " start looping" ; while (!quit_) { activeChannels_.clear (); pollReturnTime_ = poller_->poll (kPollTimeMs, &activeChannels_); ++iteration_; if (Logger::logLevel () <= Logger::TRACE) { printActiveChannels (); } eventHandling_ = true ; for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent (pollReturnTime_); } currentActiveChannel_ = NULL ; eventHandling_ = false ; doPendingFunctors (); } LOG_TRACE << "EventLoop " << this << " stop looping" ; looping_ = false ; }
loop线程运行事件回调的关键是,用Poller::poll()将激活事件的通道填入通道列表activeChannels_,然后逐一调用每个通道的handleEvent,从而调用为Channel注册的事件回调来处理事件。
添加、更新、删除通道
loop循环用来处理激活事件,用户可以用updateChannel/removeChannel
更新/移除 Poller 监听的通道。
Poller类详见muduo库-I/O复用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void EventLoop::updateChannel (Channel *channel) { assert (channel->ownerLoop () == this ); assertInLoopThread (); poller_->updateChannel (channel); }void EventLoop::removeChannel (Channel *channel) { assert (channel->ownerLoop () == this ); assertInLoopThread (); if (eventHandling_) { assert (currentActiveChannel_ == channel || std::find (activeChannels_.begin (), activeChannels_.end (), channel) == activeChannels_.end ()); } poller_->removeChannel (channel); }
另外,可用hasChannel来判断Poller是否正在监听channel。
1 2 3 4 5 6 7 8 9 10 bool EventLoop::hasChannel (Channel *channel) { assert (channel->ownerLoop () == this ); assertInLoopThread (); return poller_->hasChannel (channel); }
定时任务
EventLoop提供了runAt、runAfter、runEvery三个函数,用于在指定时间点、延迟时间、周期性时间运行用户任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 TimerId EventLoop::runAt (Timestamp time, TimerCallback cb) { return timerQueue_->addTimer (std::move (cb), time, 0.0 ); }TimerId EventLoop::runAfter (double delay, TimerCallback cb) { Timestamp time (addTime(Timestamp::now(), delay)) ; return runAt (time, std::move (cb)); }TimerId EventLoop::runEvery (double interval, TimerCallback cb) { Timestamp time (addTime(Timestamp::now(), interval)) ; return timerQueue_->addTimer (std::move (cb), time, interval); }void EventLoop::cancel (TimerId timerId) { return timerQueue_->cancel (timerId); }
用户运行一个loop线程,并添加定时任务示例:
1 2 3 4 5 6 7 8 void threadFunc () { assert (EventLoop::getEventLoopOfCurrentThread () == NULL ); EventLoop loop; assert (EventLoop::getEventLoopOfCurrentThread () == &loop); loop.runAfter (1.0 , callback); loop.loop (); }
runInLoop与queueInLoop执行用户任务
同样是运行用户任务函数,runInLoop和queueInLoop都可以被多个线程执行,分为2种情况:1)如果当前线程是创建当前EventLoop对象的线程,那么立即执行用户任务;2)如果不是,那么在loop循环中排队执行(本次循环末尾),实际上这点也是由queueInLoop完成的。
queueInLoop只做了runInLoop的第2)种情况的工作,也就是只会在loop循环中排队执行用户任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 void EventLoop::runInLoop (Functor cb) { if (isInLoopThread ()) { cb (); } else { queueInLoop (std::move (cb)); } }void EventLoop::queueInLoop (Functor cb) { { MutexLockGuard lock (mutex_) ; pendingFunctors_.push_back (std::move (cb)); } if (!isInLoopThread () || callingPendingFunctors_) { wakeup (); } }
为什么要对pendingFunctors_加锁?
因为queueInLoop可以被多个线程访问,意味着pendingFunctors_也能被多个线程访问,加锁确保线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 void EventLoop::runInLoop (Functor cb) { if (isInLoopThread ()) { cb (); } else { queueInLoop (std::move (cb)); } }void EventLoop::queueInLoop (Functor cb) { { MutexLockGuard lock (mutex_) ; pendingFunctors_.push_back (std::move (cb)); } if (!isInLoopThread () || callingPendingFunctors_) { wakeup (); } }
这里有几个问题:
为什么要唤醒 EventLoop?
wakeup 是怎么实现的?
pendingFunctors_是如何被消费的?
为什么要唤醒 EventLoop :
我们首先调用了 pendingFunctors_.push_back(cb), 将该函数放在
pendingFunctors_中。EventLoop::loop 的每一轮循环在最后会调用
doPendingFunctors 依次执行这些函数。
而 EventLoop 的唤醒是通过 Poller::poll()中调用poll()/epoll_wait()
实现的,如果此时该 EventLoop 中迟迟没有事件触发,那么
poll()/epoll_wait() 一直就会阻塞。
这样会导致,pendingFunctors_中的任务迟迟不能被执行了。
所以必须要唤醒 EventLoop
,从而让pendingFunctors_中的任务尽快被执行。
wakeup 是怎么实现的 :
muduo 这里采用了对 eventfd 的读写来实现对 EventLoop 的唤醒。
在 EventLoop 建立之后,就创建一个 eventfd,并将其可读事件注册到
EventLoop 中。
wakeup() 的过程本质上是对这个 eventfd 进行写操作,以触发该 eventfd
的可读事件。这样就起到了唤醒 EventLoop 的作用。
1 2 3 4 5 void EventLoop::wakeup () { uint64_t one = 1 ; sockets::write (wakeupFd_, &one, sizeof one); }
很多库为了兼容 macOS,往往使用 pipe 来实现这个功能。muduo 采用了
eventfd,性能更好些,但代价是不能支持 macOS 了。但是muduo
似乎从一开始的定位就只支持 Linux。
pendingFunctors_是如何被消费的 :
下面讲下 doPendingFunctors 的实现,muduo
是如何处理这些待处理的函数的,以及中间用了哪些优化操作。
代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void EventLoop::doPendingFunctors () { std::vector<Functor> functors; callingPendingFunctors_ = true ; { MutexLockGuard lock (mutex_) ; functors.swap (pendingFunctors_); } for (size_t i = 0 ; i < functors.size (); ++i) { functors[i](); } callingPendingFunctors_ = false ; }
从代码可以看到,函数非常简单。大概只有十行代码,但是这十行代码中却有两个非常巧妙的地方。
callingPendingFunctors_的作用
从代码可以看出,如果 callingPendingFunctors_为
false,则说明此时尚未开始执行 doPendingFunctors 函数。
这个有什么作用呢,我们需要结合下 queueInLoop 中,对是否执行 wakeup()
的判断
1 2 3 4 if (!isInLoopThread () || callingPendingFunctors_) { wakeup (); }
这里还需要结合下 EventLoop 循环的实现,其中 doPendingFunctors() 是
每轮循环的最后一步处理。
如果调用 queueInLoop 和 EventLoop 在同一个线程,且
callingPendingFunctors_为 false 时,则说明:此时尚未执行到
doPendingFunctors()。
那么此时即使不用 wakeup,也可以在之后照旧执行 doPendingFunctors()
了。
这么做的好处非常明显,可以减少对 eventfd 的 IO 读写。
锁范围的减小
在此函数中,有一段特别的代码:
1 2 3 4 5 std::vector<Functor> functors; { MutexLockGuard lock (mutex_) ; functors.swap (pendingFunctors_); }
这个作用是 pendingFunctors_和 functors 的内容进行交换,实际上就是此时
functors 持有了 pendingFunctors_的内容,而
pendingFunctors_被清空了。
这个好处是什么呢? 如果不这么做,直接遍历 pendingFunctors_,
然后处理对应的函数。这样的话,锁会一直等到所有函数处理完才会被释放。在此期间,queueInLoop
将不可用。
而以上的写法,可以极大减小锁范围,整个锁的持有时间就是 swap
那一下的时间。待处理函数执行的时候,其他线程还是可以继续调用
queueInLoop。
eventfd唤醒线程
eventfd是Linux(Linux
2.6以后)特有的,专用于事件通知的机制,类似于管道(pipe)、域套接字(UNIX
Domain Socket)。
创建eventfd 函数原型:
1 2 3 4 5 6 7 8 9 10 #include <sys/eventfd.h> int eventfd (unsigned int initval, int flags) ;
创建完event fd后,可用read()读取event
fd,如果fd是阻塞的,read可能阻塞线程;如果event
fd设置了EFD_NONBLOCK,read返回EAGIAN错误。直到另外一个线程对event
fd进行write。
1 2 3 4 5 wakeupChannel_->setReadCallback (std::bind (&EventLoop::handleRead, this )); wakeupChannel_->enableReading ();
eventfd使用示例:
线程1阻塞等待,线程2唤醒线程1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 #include <stdio.h> #include <stdlib.h> #include <sys/eventfd.h> #include <unistd.h> #include <pthread.h> #define __STDC_FORMAT_MACROS #include <inttypes.h> void * thread_func1 (void * arg) { int wakeupfd = *(int *)arg; printf ("thread_func1 start\n" ); uint64_t rdata; int ret = read (wakeupfd, &rdata, sizeof (rdata)); if (ret < 0 ) { perror ("thread_func1 read error" ); pthread_exit (NULL ); } printf ("thread_func1 success to be waked up, rdata = %" PRId64 "\n" , rdata); }void * thread_func2 (void * arg) { int wakeupfd = *(int *)arg; printf ("thread_func2 ready to sleep 1 sec\n" ); sleep (1 ); uint64_t wdata = 10 ; int ret = write (wakeupfd, &wdata, sizeof (wdata)); if (ret < 0 ) { perror ("thread_func2 write error" ); pthread_exit (NULL ); } printf ("thread_func2 success to wake up another thread, wdata = %" PRId64 "\n" , wdata); }int main () { int evfd = eventfd (0 , 0 ); if (evfd < 0 ) { perror ("eventfd error" ); exit (1 ); } pthread_t th1, th2; pthread_create (&th1, NULL , thread_func1, (void *)&evfd); pthread_create (&th2, NULL , thread_func2, (void *)&evfd); pthread_join (th1, NULL ); pthread_join (th2, NULL ); return 0 ; }
EventLoop使用eventfd唤醒loop线程
创建event fd
构造函数中,wakeupFd_ 初值为createEventfd()
1 2 3 4 5 6 7 8 9 10 int createEventfd () { int evtfd = ::eventfd (0 , EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0 ) { LOG_SYSERR << "Failed in eventfd" ; abort (); } return evtfd; }
绑定event fd与唤醒通道wakeupChannel_,利用event
fd构造一个Channel对象后,传递给wakeupChannel_,便于Poller监听、事件回调
1 2 3 4 5 wakeupChannel_->setReadCallback (std::bind (&EventLoop::handleRead, this )); wakeupChannel_->enableReading ();
启动loop循环,可能阻塞在poll()/epoll_wait()
其他线程通过queueInLoop()调用wakeup(),唤醒阻塞的loop线程
1 2 3 4 5 6 7 8 9 10 11 12 13 void EventLoop::wakeup () { uint64_t one = 1 ; ssize_t n = sockets::write (wakeupFd_, &one, sizeof (one)); if (n != sizeof (one)) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8" ; } }
loop线程被唤醒后,读取event fd
1 2 3 4 5 6 7 8 9 10 11 12 13 void EventLoop::handleRead () { uint64_t one = 1 ; ssize_t n = sockets::read (wakeupFd_, &one, sizeof (one)); if (n != sizeof (one)) { LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8" ; } }
参考