muduo库-TimerQueue定时器队列
网络编程中,有一类非常重要的事件,跟IO事件没有直接联系,而是内部产生的事件,即定时事件。
muduo网络库中的定时功能是如何实现的呢?
传统的Reactor通过控制select和poll的等待时间,来实现定时,而Linux中,可以用timerfd来实现。前面讲过,timerfd是Linux特有的定时器,能有效融入select/poll/epoll框架,来做超时事件处理。
timerfd简要介绍
timerfd的特点是有一个与之关联fd,可绑定Channel,交由Poller监听感兴趣的事件(读、写等)。
timerfd 3个接口:
timerfd_create,timerfd_settime,timerfd_gettime。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include <sys/timerfd.h> int timerfd_create (int clockid, int flags) ;int timerfd_settime (int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value) ;int timerfd_gettime (int fd, struct itimerspec *curr_value) ;
定时功能相关类
muduo库中,定时相关功能由3个class实现:TimerID、Timer、TimerQueue。用户可见的只有TimerId。
Timestamp类是时间戳类,用来保存超时时刻(精确到1us),保存的是UTC时间,即从
Unix Epoch(1970-01-01 00:00:00)到指定时间的微秒数。
Timer类对应一个超时任务,保存了超时时刻Timestamp,超时回调函数,以及超时任务类型(一次
or 周期)。
TimerId类用于保存Timer对象,以及独一无二的id。
TimerQueue类用于设置所有超时任务(Timer),需要高效组织尚未到期的Timer,快速查找已到期Timer,以及高效添加和删除Timer。TimerQueue用std::set存储
,set会对Timer按到期时间先后顺序进行二叉搜索树排序,时间复杂度O(logN)。
TimerQueue的定时接口并不是直接暴露给库的使用者的,而是通过EventLoop的runAfter和runEvery来运行用户任务的。其中,runAfter延迟固定秒数后运行一次指定用户任务;runEvery延迟固定秒数后运行用户任务,后续以指定周期运行用户任务。
TimerQueue回调用户代码onTimer()的时序:
时序图里的TimerQueue获取超时Timer(getExpired())后,User及onTimer()是指用户自定义的超时处理函数,并非库本身的。
与普通Channel事件一样,超时任务TimerQueue也会使用一个Channel,专门用于绑定timerfd,交由Poller监听,发生可读事件(代表超时)后加入激活通道列表,然后EventLoop::loop()逐个Channel调用对应的回调,从而处理超时事件。
note
一个EventLoop只持有一个TimerQueue对象,而TimerQueue通过std::set持有多个Timer对象,但只会设置一个Channel。
Timer类
Timer类代表一个超时任务,但并不直接绑定Channel。Timer主要包含超时时刻(expiration_),超时回调(callback_),周期时间值(interval_),全局唯一id(sequence_)。
其声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class Timer : noncopyable {public : Timer (TimerCallback cb, Timestamp when, double interval) : callback_ (std::move (cb)), expiration_ (when), interval_ (interval), repeat_ (interval > 0.0 ), sequence_ (s_numCreated_.incrementAndGet ()) { } void run () const { callback_ (); } Timestamp expiration () const { return expiration_; } bool repeat () const { return repeat_; } int64_t sequence () const { return sequence_; } void restart (Timestamp now) ; static int64_t numCreated () { return s_numCreated_.get (); }private : const TimerCallback callback_; Timestamp expiration_; const double interval_; const bool repeat_; const int64_t sequence_; static AtomicInt64 s_numCreated_; };
每当创建一个新Timer对象时,原子变量s_numCreated_就会自增1,作为全剧唯一序列号sequence_,用来标识该Timer对象。
周期Timer
创建Timer时,超时时刻when决定了回调超时事件时间点,而interval决定了Timer是一次性的,还是周期性的。如果是周期性的,会在TimerQueue::reset中,调用Timer::restart,在当前时间点基础上,重启定时器。
restart函数
restart重启Timer,根据Timer是否为周期类型,分为两种情况:
周期Timer,restart将重置超时时刻expiration_为当前时间 +
周期间隔时间;
非周期Timer,即一次性Timer,将restart将expiration_置为无效时间(默认自UTC
Epoch以来的微妙数为0);
1 2 3 4 5 6 7 8 9 10 11 void Timer::restart (Timestamp now) { if (repeat_) { expiration_ = addTime (now, interval_); } else { expiration_ = Timestamp::invalid (); } }
TimerId类
TimerId来主要用来作为Timer的唯一标识,用于取消(canceling)Timer。
TimerId类声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class TimerId : public muduo::copyable {public : TimerId () : timer_ (NULL ), sequence_ (0 ) { } TimerId (Timer* timer, int64_t seq) : timer_ (timer), sequence_ (seq) { } friend class TimerQueue ;private : Timer* timer_; int64_t sequence_; };
注意:TimerId并不直接生成Timer序列号sequence_,这是由Timer来生成的,通过构造函数传递给TimerId。而生成Timer标识的方式,在Timer类介绍中也提到过,只需要创建一个Timer对象即可,然后通过Timer::sequence()方法就可以取得该序列号。
TimerQueue类
定时器队列TimerQueue是定时功能的核心,由所在EventLoop持有,绑定一个Channel,同时维护多个定时任务(Timer)。为用户(EventLoop)提供添加定时器(addTimer)、取消定时器(cancel)接口。
同样是定时,TimerQueue与Timer有什么区别?
TimerQueue包含2个Timer集合:
timers_定时器集合:包含用户添加的所有Timer对象,std::set会用AVL搜索树,对集合元素按时间戳(Timestamp)从小到大顺序;
activeTimers_激活定时器集合:包含激活的Timer对象,与timers_包含的Timer对象相同,个数也相同,std::set会根据Timer*指针大小,对元素进行排序;
cancelingTimers_取消定时器集合:包含所有取消的Timer对象,与activeTimers_相对。
注意:timers_和activeTimers_的类型并不相同,只是包含的Timer*相同。cancelingTimers_和activeTimers_的类型相同。
这也是TimerQueue并非Timer的原因,TimerQueue是一个Timer集合,根据其时间戳大小进行排序,更像是一个队列,先到期的先触发超时事件。因此,可称为Timer队列,即TimerQueue 。
调用TimerQueue::addTimer的,只有EventLoop中这3个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 TimerId EventLoop::runAt (Timestamp time, TimerCallback cb) { return timerQueue_->addTimer (std::move (cb), time, 0.0 ); }TimerId EventLoop::runAfter (double delay, TimerCallback cb) { Timestamp time (addTime(Timestamp::now(), delay)) ; return runAt (time, std::move (cb)); }TimerId EventLoop::runEvery (double interval, TimerCallback cb) { Timestamp time (addTime(Timestamp::now(), interval)) ; return timerQueue_->addTimer (std::move (cb), time, interval); }
下面是TimerQueue中,3个集合相关的类型及成员定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 typedef std::pair<Timestamp, Timer*> Entry;typedef std::set<Entry> TimerList;typedef std::pair<Timer*, int64_t > ActiveTimer;typedef std::set<ActiveTimer> ActiveTimerSet; TimerList timers_; ActiveTimerSet activeTimers_;bool callingExpiredTimers_; ActiveTimerSet cancelingTimers_;
下面是TimerQueue中,3个集合相关的类型及成员定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 typedef std::pair<Timestamp, Timer*> Entry;typedef std::set<Entry> TimerList;typedef std::pair<Timer*, int64_t > ActiveTimer;typedef std::set<ActiveTimer> ActiveTimerSet; TimerList timers_; ActiveTimerSet activeTimers_;bool callingExpiredTimers_; ActiveTimerSet cancelingTimers_;
TimerQueue声明
除了前面提到的3个集合相关类型及成员,其他成员函数和变量声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class TimerQueue : noncopyable {public : explicit TimerQueue (EventLoop* loop) ; ~TimerQueue (); TimerId addTimer (TimerCallback cb, Timestamp when, double interval) ; void cancel (TimerId) ;private : ... void addTimerInLoop (Timer* timer) ; void cancelInLoop (TimerId timerId) ; void handleRead () ; std::vector<Entry> getExpired (Timestamp now) ; void reset (const std::vector<Entry>& expired, Timestamp now) ; bool insert (Timer* timer) ; EventLoop* loop_; const int timerfd_; Channel timerfdChannel_; ... }
TimerQueue所属EventLoop对象,通过一个EventLoop*来传递,注意这是一个raw
pointer,而非smart
pointer。EventLoop对象与TimerQueue对象生命周期相同,而且只会通过EventLoop对象来调用TimerQueue对象方法,因此不存在与之相关的内存泄漏或非法访问的问题。
TimerQueue构造函数
1 2 3 4 5 6 7 8 9 10 11 TimerQueue::TimerQueue (EventLoop *loop) : loop_ (loop),timerfd_ (createTimerfd ()),timerfdChannel_ (loop, timerfd_),timers_ (),callingExpiredTimers_ (false ) { timerfdChannel_.setReadCallback (std::bind (&TimerQueue::handleRead, this )); timerfdChannel_.enableReading (); }
构造TimerQueue对象时,就会绑定TimerQueue所属EventLoop,即创建TimerQueue的EventLoop对象。
另外,调用Channel::enableReading(),会将通道事件加入Poller的监听通道列表中。
交给Poller监听的timerfd,是由createTimerfd创建的:
1 2 3 4 5 6 7 8 9 10 int createTimerfd () { int timerfd = ::timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0 ) { LOG_SYSFATAL << "Failed in timerfd_create" ; } return timerfd; }
构造TimerQueue对象时,就会绑定TimerQueue所属EventLoop,即创建TimerQueue的EventLoop对象。
另外,调用Channel::enableReading(),会将通道事件加入Poller的监听通道列表中。
交给Poller监听的timerfd,是由createTimerfd创建的:
1 2 3 4 5 6 7 8 9 10 int createTimerfd () { int timerfd = ::timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0 ) { LOG_SYSFATAL << "Failed in timerfd_create" ; } return timerfd; }
TimerQueue构造函数
1 2 3 4 5 6 7 8 9 10 11 TimerQueue::TimerQueue (EventLoop *loop) : loop_ (loop),timerfd_ (createTimerfd ()),timerfdChannel_ (loop, timerfd_),timers_ (),callingExpiredTimers_ (false ) { timerfdChannel_.setReadCallback (std::bind (&TimerQueue::handleRead, this )); timerfdChannel_.enableReading (); }
构造TimerQueue对象时,就会绑定TimerQueue所属EventLoop,即创建TimerQueue的EventLoop对象。
另外,调用Channel::enableReading(),会将通道事件加入Poller的监听通道列表中。
交给Poller监听的timerfd,是由createTimerfd创建的:
1 2 3 4 5 6 7 8 9 10 int createTimerfd () { int timerfd = ::timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0 ) { LOG_SYSFATAL << "Failed in timerfd_create" ; } return timerfd; }
TimerQueue析构
析构有2点需要注意:
在remove绑定的通道前,要先disableAll停止监听所有通道事件;
timers_中Timer对象是在TimerQueue::addTimer中new出来的,需要手动delete;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 TimerQueue::~TimerQueue () { timerfdChannel_.disableAll (); timerfdChannel_.remove (); ::close (timerfd_); for (const Entry& timer : timers_) { delete timer.second; } }
TimerQueue重要接口
addTimer 添加定时器
注意到addTimer
会在构造一个Timer对象后,将其添加到timers_的工作转交给addTimerInLoop完成了。这是为什么?
因为调用EventLoop::runAt/runEvery的线程,可能并非TimerQueue的loop线程,而修改TimerQueue数据成员时,必须在所属loop线程中进行,因此需要通过loop_->runInLoop将工作转交给所属loop线程。
runInLoop:如果当前线程是所属loop线程,则直接运行函数;如果不是,就排队到所属loop线程末尾,等待运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 TimerId TimerQueue::addTimer (TimerCallback cb, Timestamp when, double interval) { Timer* timer = new Timer (std::move (cb), when, interval); loop_->runInLoop (std::bind (&TimerQueue::addTimerInLoop, this , timer)); return TimerId (timer, timer->sequence ()); }void TimerQueue::addTimerInLoop (Timer *timer) { loop_->assertInLoopThread (); bool earliestChanged = insert (timer); if (earliestChanged) { resetTimerfd (timerfd_, timer->expiration ()); } }
addTimerInLoop的主要工作由2个函数来完成:insert,resetTimerfd。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 bool TimerQueue::insert (Timer *timer) { loop_->assertInLoopThread (); assert (timers_.size () == activeTimers_.size ()); bool earliestChanged = false ; Timestamp when = timer->expiration (); TimerList::iterator it = timers_.begin (); if (it == timers_.end () || when < it->first) { earliestChanged = true ; } { std::pair<TimerList::iterator, bool > result = timers_.insert (Entry (when, timer)); assert (result.second); (void )result; } { std::pair<ActiveTimerSet::iterator, bool > result = activeTimers_.insert (ActiveTimer (timer, timer->sequence ())); assert (result.second); (void )result; } assert (timers_.size () == activeTimers_.size ()); return earliestChanged; }
cancel 取消定时器
一个已超时的定时器,会通过TimerQueue::getExpired自动清除,但一个尚未到期的定时器如何取消?
可以通过调用TimerQueue::cancel。类似于addTimer,cancel也可能在别的线程被调用,因此需要将其转交给cancelInLoop执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 void TimerQueue::cancel (TimerId timerId) { loop_->runInLoop ( std::bind (&TimerQueue::cancelInLoop, this , timerId)); }void TimerQueue::cancelInLoop (TimerId timerId) { loop_->assertInLoopThread (); assert (timers_.size () == activeTimers_.size ()); ActiveTimer timer (timerId.timer_, timerId.sequence_) ; ActiveTimerSet::const_iterator it = activeTimers_.find (timer); if (it != activeTimers_.end ()) { size_t n = timers_.erase (Entry (it->first->expiration (), it->first)); assert (n == 1 ); (void )n; delete it->first; activeTimers_.erase (it); } else if (callingExpiredTimers_) { cancelingTimers_.insert (timer); } assert (timers_.size () == activeTimers_.size ()); }
handleRead处理TimerQueue上所有超时任务
handleRead有几个要点:
必须在所在loop线程运行;
可能不止一个定时任务超时,可用getExpired()获取;
所有超时任务执行完后,重置周期定时任务,释放一次性定时任务;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void TimerQueue::handleRead () { loop_->assertInLoopThread (); Timestamp now (Timestamp::now()) ; readTimerfd (timerfd_, now); std::vector<Entry> expired = getExpired (now); callingExpiredTimers_ = true ; cancelingTimers_.clear (); for (const Entry& it : expired) { it.second->run (); } callingExpiredTimers_ = false ; reset (expired, now); }
getExpired以参数时间点now为界限,查找set
timers_中所有超时定时任务(Timer)。set会对timers_元素进行排序,std::set::lower_bound()会找到第一个时间点
< now时间点的定时任务。
getExpired调用reset重置所有超时的周期定时任务,释放超时的一次性任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 std::vector<TimerQueue::Entry> TimerQueue::getExpired (Timestamp now) { assert (timers_.size () == activeTimers_.size ()); std::vector<Entry> expired; Entry sentry (now, reinterpret_cast <Timer*>(UINTPTR_MAX)) ; TimerList::iterator end = timers_.lower_bound (sentry); assert (end == timers_.end () || now < end->first); std::copy (timers_.begin (), end, back_inserter (expired)); timers_.erase (timers_.begin (), end); for (const Entry& it : expired) { ActiveTimer timer (it.second, it.second->sequence()) ; size_t n = activeTimers_.erase (timer); assert (n == 1 ); (void )n; } assert (timers_.size () == activeTimers_.size ()); return expired; }void TimerQueue::reset (const std::vector<Entry> &expired, Timestamp now) { Timestamp nextExpire; for (const Entry& it : expired) { ActiveTimer timer (it.second, it.second->sequence()) ; if (it.second->repeat () && cancelingTimers_.find (timer) == cancelingTimers_.end ()) { it.second->restart (now); insert (it.second); } else { delete it.second; } } if (!timers_.empty ()) { nextExpire = timers_.begin ()->second->expiration (); } if (nextExpire.valid ()) { resetTimerfd (timerfd_, nextExpire); } }
参考
转载自muduo笔记
网络库(四)TimerQueue定时器队列