muduo-事件驱动EventLoop

muduo-事件驱动EventLoop

之前说过,muduo网络库处理事件是Reactor模式,one loop per thread,一个线程一个事件循环。这个循环称为EventLoop,这种以事件为驱动的编程模式,称为事件驱动模式。

这种事件驱动模型要求所有任务是非阻塞的,其典型特点是: 如果一个任务需要很长时间才能完成,或者中间可能导致阻塞,就需要对任务进行分段,将其设置为非阻塞的,每次监听到前次任务完成,触发事件回调,从而接着完成后续任务。例如,要发送一个大文件,可以先发送一段,完成后,在写完成事件回调中又发送下一段,这样每次都发生一段,从而完成整个文件发送。

EventLoop是实现事件驱动模型的关键之一。核心是为线程提供运行循环,不断监听事件、处理事件,为用户提供在loop循环中运行的接口。

EventLoop事件驱动相关类图关系如下:

上面的类图中有两种不同的关系表示:

聚合关系:has-a,表示拥有的关系,两种生命周期没有必然关联,可以独立存在。

组合关系:contain-a,表包含的关系,是一种强聚合关系,强调整体与部分,生命周期一致。

EventLoop类

EventLoop是一个接口类,不宜暴露太多内部细节给客户,接口及其使用应尽量简洁。EventLoop的主要职责是:

  1. 提供定时执行用户指定任务的方法,支持一次性、周期执行用户任务;
  2. 提供一个运行循环,每当Poller监听到有通道对应事件发生时,会将通道加入激活通道列表,运行循环要不断从取出激活通道,然后调用事件回调处理事件;
  3. 每个EventLoop对应一个线程,不允许一对多或者多对一,提供判断当前线程是否为创建EventLoop对象的线程的方法;
  4. 允许在其他线程中调用EventLoop的public接口,但同时要确保线程安全;

EventLoop类声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/**
* Reactor模式, 每个线程最多一个EventLoop (One loop per thread).
* 接口类, 不要暴露太多细节给客户
*/
class EventLoop : public noncopyable
{
public:
typedef std::function<void()> Functor;

EventLoop();
~EventLoop(); // force out-line dtor, for std::unique_ptr members.

/* loop循环, 运行一个死循环.
* 必须在当前对象的创建线程中运行.
*/
void loop();

/*
* 退出loop循环.
* 如果通过原始指针(raw pointer)调用, 不是100%线程安全;
* 为了100%安全, 最好通过shared_ptr<EventLoop>调用
*/
void quit();

/*
* Poller::poll()返回的时间, 通常意味着有数据达到.
* 对于PollPoller, 是调用完poll(); 对于EPollPoller, 是调用完epoll_wait()
*/
Timestamp pollReturnTime() const { return pollReturnTime_; }

/* 获取loop循环次数 */
int64_t iterator() const { return iteration_; }

/*
* 在loop线程中, 立即运行回调cb.
* 如果没在loop线程, 就会唤醒loop, (排队)运行回调cb.
* 如果用户在同一个loop线程, cb会在该函数内运行; 否则, 会在loop线程中排队运行.
* 因此, 在其他线程中调用该函数是安全的.
*/
void runInLoop(Functor cb);

/* 排队回调cb进loop线程.
* 回调cb在loop中完成polling后运行.
* 从其他线程调用是安全的.
*/
void queueInLoop(Functor cb);

/* 排队的回调cb个数 */
size_t queueSize() const;

// timers

/*
* 在指定时间点运行回调cb.
* 从其他线程调用安全.
*/
TimerId runAt(Timestamp time, TimerCallback cb);

/*
* 在当前时间点+delay延时后运行回调cb.
* 从其他线程调用安全.
*/
TimerId runAfter(double delay, TimerCallback cb);

/*
* 每隔interval sec周期运行回调cb.
* 从其他线程调用安全.
*/
TimerId runEvery(double interval, TimerCallback cb);

/*
* 取消定时器, timerId唯一标识定时器Timer
* 从其他线程调用安全.
*/
void cancel(TimerId timerId);

// internal usage

/* 唤醒loop线程, 没有事件就绪时, loop线程可能阻塞在poll()/epoll_wait() */
void wakeup();
/* 更新Poller监听的channel, 只能在channel所属loop线程中调用 */
void updateChannel(Channel* channel);
/* 移除Poller监听的channel, 只能在channel所属loop线程中调用 */
void removeChannel(Channel* channel);
/* 判断Poller是否正在监听channel, 只能在channel所属loop线程中调用 */
bool hasChannel(Channel* channel);

// pid_t threadId() const { return threadId_; }
/* 断言当前线程是创建当前对象的线程, 如果不是就终止程序(LOG_FATAL) */
void assertInLoopThread();
/* 判断前线程是否创建当前对象的线程.
* threadId_是创建当前EventLoop对象时, 记录的线程tid
*/
bool isInLoopThread() const;
/*
* 判断是否有待调用的回调函数(pending functor).
* 由其他线程调用runAt/runAfter/runEvery, 会导致回调入队列待调用.
*/
bool callingPendingFunctors() const
{ return callingPendingFunctors_; }

/*
* 判断loop线程是否正在处理事件, 执行事件回调.
* loop线程正在遍历,执行激活channels时, eventHandling_会置位; 其余时候, 会清除.
*/
bool eventHandling() const
{ return eventHandling_; }
/* context_ 用于应用程序传参, 由网络库用户定义数据 */
void setContext(const boost::any& context)
{ context_ = context; }
const boost::any& getContext() const
{ return context_; }
boost::any* getMutableContext()
{ return &context_; }

/* 获取当前线程的EventLoop对象指针 */
static EventLoop* getEventLoopOfCurrentThread();

private:
/* 终止程序(LOG_FATAL), 当前线程不是创建当前EventLoop对象的线程时,
* 由assertInLoopThread()调用 */
void abortNotInLoopThread();
/* 唤醒所属loop线程, 也是wakeupFd_的事件回调 */
void handleRead(); // waked up
/* 处理pending函数 */
void doPendingFunctors();
/* 打印激活通道的事件信息, 用于debug */
void printActiveChannels() const; // DEBUG

typedef std::vector<Channel*> ChannelList;

bool looping_; /* atomic, true表示loop循环执行中 */
std::atomic<bool> quit_; /* loop循环退出条件 */
bool eventHandling_; /* atomic, true表示loop循环正在处理事件回调 */
bool callingPendingFunctors_; /* atomic, true表示loop循环正在调用pending函数 */
int64_t iteration_; /* loop迭代次数 */
const pid_t threadId_; /* 线程id, 对象构造时初始化 */
Timestamp pollReturnTime_; /* poll()返回时间点 */
std::unique_ptr<Poller> poller_; /* 轮询器, 用于监听事件 */
std::unique_ptr<TimerQueue> timerQueue_; /* 定时器队列 */
int wakeupFd_; /* 唤醒loop线程的eventfd */
/* 用于唤醒loop线程的channel.
* 不像TimerQueue是内部类, 不应该暴露Channel给客户. */
std::unique_ptr<Channel> wakeupChannel_;
boost::any context_; /* 用于应用程序通过当前对象传参的变量, 由用户定义数据 */

/* 临时辅助变量 */
ChannelList activeChannels_; /* 激活事件的通道列表 */
Channel* currentActiveChannel_; /* 当前激活的通道, 即正在调用事件回调的通道 */

mutable MutexLock mutex_;
/* 待调用函数列表, 存放不在loop线程的其他线程调用 runAt/runAfter/runEvery, 而要运行的函数 */
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};

EventLoop不可拷贝,因为与之关联的不仅对象本身,还有线程以及thread local数据等资源。其功能主要分为下面这几大类:

  1. 提供运行循环;
  2. 运行定时任务,一次性 or 周期;
  3. 处理激活通道事件;
  4. 线程安全。
  • 对于1,loop()提供运行循环,quit()退出循环,iterator()查询循环次数,wakeup()用于唤醒loop线程,handleRead()读取唤醒消息;
  • 对于2,runInLoop()在loop线程中“立即”运行一次用户任务,runAt()/runAfter()添加一次性定时任务,runEvery()添加周期定时任务,doPendingFunctors()回调所有的pending函数,vector pendingFunctors_用于排队待处理函数到loop线程执行,queueSize()获取该vector大小;cancel()取消定时任务。
  • 对于3,updateChannel()/removeChannel()/hasChannel()用于通道更新/移除/判断,vector activeChannels_存储当前所有激活的通道,currentActiveChannel_存储当前正在处理的激活通道;
  • 对于4,isInLoopThread()/assertInLoopThread()判断/断言 当前线程是创建当前EventLoop对象的线程,互斥锁mutex_用来做互斥访问需要保护数据。

值得一提的是,boost::any类型的成员context_用来给用户提供利用EventLoop传数据的方式,相当于C里面的void*,用户可利用boost::any_cast进行转型。

EventLoop构造函数与析构函数

构造函数要点:

  • 检查当前线程是否已经创建了EventLoop对象,遇到错误就终止程序(LOG_FATAL);
  • 记住本对象所属线程id(threadId_);

析构函数要点:

  • 清除当前线程EventLoop指针,便于下次再创建EventLoop对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
__thread EventLoop* t_loopInThisThread = 0; // thread local变量, 指向当前线程创建的EventLoop对象

EventLoop::EventLoop()
: looping_(false),
threadId_(CurrentThread::tid()),
{
LOG_DEBUG << "EventLoop create " << this << " in thread " << threadId_;
if (t_loopInThisThread) // 当前线程已经包含了EventLoop对象
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else // 当前线程尚未包含EventLoop对象
{
t_loopInThisThread = this;
}
}

EventLoop::~EventLoop()
{
LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
<< " destructs in thread " << CurrentThread::tid();
t_loopInThisThread = NULL;
}

attention

再次强调,一个线程只能有一个EventLoop对象。

这里通过thread local变量t_loopInThisThread指向创建的EventLoop对象,来确保每个线程只有一个EventLoop对象。同一个线程内,可通过static函数getEventLoopOfCurrentThread,返回该EventLoop对象指针。

1
2
3
4
EventLoop *EventLoop::getEventLoopOfCurrentThread() // static
{
return t_loopInThisThread;
}

assertInLoopThread

有些成员函数只能在EventLoop对象所在线程调用,EventLoop提供了isInLoopThread()、assertInLoopThread(),分别用于判断、断言 当前线程为创建EventLoop对象线程。

当assertInLoopThread()断言失败时,调用abortNotInLoopThread()终止程序(LOG_FATAL)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void EventLoop::assertInLoopThread() // 断言当前线程(tid())是调用当前EventLoop对象的持有者线程(threadId_)
{
if (!isInLoopThread())
{
abortNotInLoopThread(); // 断言失败则终止程序
}
}

bool EventLoop::isInLoopThread() const // 判断当前线程是否为当前EventLoop对象的持有者线程
{ return threadId_ == CurrentThread::tid(); }

void EventLoop::abortNotInLoopThread() // LOG_FATAL 终止程序
{
LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
<< " was created in threadId_ = " << threadId_
<< ", current thread id = " << CurrentThread::tid();
}

loop循环

提供运行循环,不断监听事件、处理事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 真正的工作循环.
* 获得所有当前激活事件的通道,用Poller->poll()填到activeChannels_,
* 然后调用Channel::handleEvent()处理每个激活通道.
*
* 最后排队运行所有pending函数, 通常是其他线程通过loop来调用运行用户任务
*/
void EventLoop::loop()
{
assert(!looping_); // to avoid reduplicate loop
assertInLoopThread(); // to avoid new EventLoop() and loop() are not one thread
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_)
{
activeChannels_.clear(); // 清除激活事件的通道列表
// 监听所有通道, 可能阻塞线程, 所有激活事件对应通道会填入activeChannels_
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_; // 循环次数+1
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
// 处理所有激活事件

eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
// 通过Channel::handleEvent回调事件处理函数
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;

// 运行pending函数, 由其他线程请求调用的用户任务
doPendingFunctors();
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

loop线程运行事件回调的关键是,用Poller::poll()将激活事件的通道填入通道列表activeChannels_,然后逐一调用每个通道的handleEvent,从而调用为Channel注册的事件回调来处理事件。

添加、更新、删除通道

loop循环用来处理激活事件,用户可以用updateChannel/removeChannel 更新/移除 Poller 监听的通道。

Poller类详见muduo库-I/O复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 根据具体poller对象, 来更新通道.
* 会修改poller对象监听的通道数组.
* @note 必须在channel所属loop线程运行
*/
void EventLoop::updateChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}

/**
* 根据具体poller对象, 来删除通道.
* 会删除poller对象监听的通道数组.
* @note 如果待移除通道正在激活事件队列, 应该先从激活事件队列中移除
*/
void EventLoop::removeChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
}
poller_->removeChannel(channel);
}

另外,可用hasChannel来判断Poller是否正在监听channel。

1
2
3
4
5
6
7
8
9
10
/**
* 判断poller是否正在监听通道channel
* @note 必须在channel所属loop线程运行
*/
bool EventLoop::hasChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
return poller_->hasChannel(channel);
}

定时任务

EventLoop提供了runAt、runAfter、runEvery三个函数,用于在指定时间点、延迟时间、周期性时间运行用户任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 定时功能,由用户指定绝对时间
* @details 每为定时器队列timerQueue添加一个Timer,
* timerQueue内部就会新建一个Timer对象, TimerId就保含了这个对象的唯一标识(序列号)
* @param time 时间戳对象, 单位1us
* @param cb 超时回调函数. 当前时间超过time代表时间时, EventLoop就会调用cb
* @return 一个绑定timerQueue内部新增的Timer对象的TimerId对象, 用来唯一标识该Timer对象
*/
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
return timerQueue_->addTimer(std::move(cb), time, 0.0);
}

/**
* 定时功能, 由用户相对时间, 通过runAt实现
* @param delay 相对时间, 单位s, 精度1us(小数)
* @param cb 超时回调
*/
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, std::move(cb));
}

/**
* 定时功能, 由用户指定周期, 重复运行
* @param interval 运行周期, 单位s, 精度1us(小数)
* @param cb 超时回调
* @return 一个绑定timerQueue内部新增的Timer对象的TimerId对象, 用来唯一标识该Timer对象
*/
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(std::move(cb), time, interval);
}

/**
* 取消指定定时器
* @param timerId Timer id, 唯一对应一个Timer对象
*/
void EventLoop::cancel(TimerId timerId)
{
return timerQueue_->cancel(timerId);
}

用户运行一个loop线程,并添加定时任务示例:

1
2
3
4
5
6
7
8
void threadFunc()
{
assert(EventLoop::getEventLoopOfCurrentThread() == NULL); // 断言当前线程没有创建EventLoop对象
EventLoop loop; // 创建EventLoop对象
assert(EventLoop::getEventLoopOfCurrentThread() == &loop); // 断言当前线程创建了EventLoop对象
loop.runAfter(1.0, callback); // 1sec后运行callback
loop.loop(); // 启动loop循环
}

runInLoop与queueInLoop执行用户任务

同样是运行用户任务函数,runInLoop和queueInLoop都可以被多个线程执行,分为2种情况:1)如果当前线程是创建当前EventLoop对象的线程,那么立即执行用户任务;2)如果不是,那么在loop循环中排队执行(本次循环末尾),实际上这点也是由queueInLoop完成的。

queueInLoop只做了runInLoop的第2)种情况的工作,也就是只会在loop循环中排队执行用户任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 执行用户任务
* @param cb 用户任务函数
* @note 可以被多个线程执行:
* 如果当前线程是创建当前EventLoop对象的线程,直接执行;
* 否则,用户任务函数入队列pendingFunctors_成为一个pending functor,在loop循环中排队执行
*/
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}

/**
* 排队进入pendingFunctors_,等待执行
* @param cb 用户任务函数
* @note 如果当前线程不是创建当前EventLoop对象的线程 或者正在调用pending functor,
* 就唤醒loop线程,避免loop线程阻塞.
*/
void EventLoop::queueInLoop(Functor cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}

if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}

为什么要对pendingFunctors_加锁?

因为queueInLoop可以被多个线程访问,意味着pendingFunctors_也能被多个线程访问,加锁确保线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 执行用户任务
* @param cb 用户任务函数
* @note 可以被多个线程执行:
* 如果当前线程是创建当前EventLoop对象的线程,直接执行;
* 否则,用户任务函数入队列pendingFunctors_成为一个pending functor,在loop循环中排队执行
*/
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}

/**
* 排队进入pendingFunctors_,等待执行
* @param cb 用户任务函数
* @note 如果当前线程不是创建当前EventLoop对象的线程 或者正在调用pending functor,
* 就唤醒loop线程,避免loop线程阻塞.
*/
void EventLoop::queueInLoop(Functor cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}

if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}

这里有几个问题:

  • 为什么要唤醒 EventLoop?
  • wakeup 是怎么实现的?
  • pendingFunctors_是如何被消费的?

为什么要唤醒 EventLoop:

我们首先调用了 pendingFunctors_.push_back(cb), 将该函数放在 pendingFunctors_中。EventLoop::loop 的每一轮循环在最后会调用 doPendingFunctors 依次执行这些函数。

而 EventLoop 的唤醒是通过 Poller::poll()中调用poll()/epoll_wait() 实现的,如果此时该 EventLoop 中迟迟没有事件触发,那么 poll()/epoll_wait() 一直就会阻塞。 这样会导致,pendingFunctors_中的任务迟迟不能被执行了。

所以必须要唤醒 EventLoop ,从而让pendingFunctors_中的任务尽快被执行。

wakeup 是怎么实现的:

muduo 这里采用了对 eventfd 的读写来实现对 EventLoop 的唤醒。

在 EventLoop 建立之后,就创建一个 eventfd,并将其可读事件注册到 EventLoop 中。

wakeup() 的过程本质上是对这个 eventfd 进行写操作,以触发该 eventfd 的可读事件。这样就起到了唤醒 EventLoop 的作用。

1
2
3
4
5
void EventLoop::wakeup()
{
uint64_t one = 1;
sockets::write(wakeupFd_, &one, sizeof one);
}

很多库为了兼容 macOS,往往使用 pipe 来实现这个功能。muduo 采用了 eventfd,性能更好些,但代价是不能支持 macOS 了。但是muduo 似乎从一开始的定位就只支持 Linux。

pendingFunctors_是如何被消费的

下面讲下 doPendingFunctors 的实现,muduo 是如何处理这些待处理的函数的,以及中间用了哪些优化操作。

代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;

callingPendingFunctors_ = true;

{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}

从代码可以看到,函数非常简单。大概只有十行代码,但是这十行代码中却有两个非常巧妙的地方。

  1. callingPendingFunctors_的作用

从代码可以看出,如果 callingPendingFunctors_为 false,则说明此时尚未开始执行 doPendingFunctors 函数。

这个有什么作用呢,我们需要结合下 queueInLoop 中,对是否执行 wakeup() 的判断

1
2
3
4
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}

这里还需要结合下 EventLoop 循环的实现,其中 doPendingFunctors() 是 每轮循环的最后一步处理。

如果调用 queueInLoop 和 EventLoop 在同一个线程,且 callingPendingFunctors_为 false 时,则说明:此时尚未执行到 doPendingFunctors()。

那么此时即使不用 wakeup,也可以在之后照旧执行 doPendingFunctors() 了。

这么做的好处非常明显,可以减少对 eventfd 的 IO 读写。

  1. 锁范围的减小

在此函数中,有一段特别的代码:

1
2
3
4
5
std::vector<Functor> functors;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

这个作用是 pendingFunctors_和 functors 的内容进行交换,实际上就是此时 functors 持有了 pendingFunctors_的内容,而 pendingFunctors_被清空了。

这个好处是什么呢? 如果不这么做,直接遍历 pendingFunctors_, 然后处理对应的函数。这样的话,锁会一直等到所有函数处理完才会被释放。在此期间,queueInLoop 将不可用。

而以上的写法,可以极大减小锁范围,整个锁的持有时间就是 swap 那一下的时间。待处理函数执行的时候,其他线程还是可以继续调用 queueInLoop。

eventfd唤醒线程

eventfd是Linux(Linux 2.6以后)特有的,专用于事件通知的机制,类似于管道(pipe)、域套接字(UNIX Domain Socket)。

创建eventfd 函数原型:

1
2
3
4
5
6
7
8
9
10
#include <sys/eventfd.h>
/* 创建一个文件描述符(event fd), 用于事件通知
* initval 计数初值
* flags 标志位, 如果没用到可设为0, 也可以用以下选项 按位或 取值:
* EFD_CLOEXEC 为新建的fd设置close-on-exec(FD_CLOEXEC), 等效于以O_CLOEXEC方式open(2)
* EFD_NONBLOCK 等效于fcntl(2)设置O_NONBLOCK
* EFD_SEMAPHORE 将eventfd当信号量一样调用, read 将导致计数-1, write 将导致计数+1; 如果没指定该标志, read将返回8byte计数值, 且计数值归0, write将计数值+指定值.
* 返回 新建的fd, 用于事件通知, 绑定到一个eventfd对象; 失败, 返回-1
*/
int eventfd(unsigned int initval, int flags);

创建完event fd后,可用read()读取event fd,如果fd是阻塞的,read可能阻塞线程;如果event fd设置了EFD_NONBLOCK,read返回EAGIAN错误。直到另外一个线程对event fd进行write。

1
2
3
4
5
// 为wakeupChannel_设置读回调
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 使能wakeupChannel_读事件
wakeupChannel_->enableReading();

eventfd使用示例:

线程1阻塞等待,线程2唤醒线程1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <pthread.h>

#define __STDC_FORMAT_MACROS // for 跨平台打印
#include <inttypes.h>

void* thread_func1(void* arg) /* 等待线程 */
{
int wakeupfd = *(int*)arg;
printf("thread_func1 start\n");
uint64_t rdata;
int ret = read(wakeupfd, &rdata, sizeof(rdata));
if (ret < 0) {
perror("thread_func1 read error");
pthread_exit(NULL);
}

printf("thread_func1 success to be waked up, rdata = %" PRId64 "\n", rdata);
}

void* thread_func2(void* arg) /* 唤醒线程 */
{
int wakeupfd = *(int*)arg;
printf("thread_func2 ready to sleep 1 sec\n");
sleep(1);
uint64_t wdata = 10;
int ret = write(wakeupfd, &wdata, sizeof(wdata));
if (ret < 0) {
perror("thread_func2 write error");
pthread_exit(NULL);
}
printf("thread_func2 success to wake up another thread, wdata = %" PRId64 "\n", wdata);
}

/* 创建2个线程,thread_func1阻塞等待eventfd,thread_func2唤醒等等eventfd的线程 */
int main()
{
int evfd = eventfd(0, 0);
if (evfd < 0) {
perror("eventfd error");
exit(1);
}
pthread_t th1, th2;
pthread_create(&th1, NULL, thread_func1, (void*)&evfd);
pthread_create(&th2, NULL, thread_func2, (void*)&evfd);
pthread_join(th1, NULL);
pthread_join(th2, NULL);
return 0;
}

EventLoop使用eventfd唤醒loop线程

  1. 创建event fd

构造函数中,wakeupFd_ 初值为createEventfd()

1
2
3
4
5
6
7
8
9
10
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_SYSERR << "Failed in eventfd";
abort();
}
return evtfd;
}
  1. 绑定event fd与唤醒通道wakeupChannel_,利用event fd构造一个Channel对象后,传递给wakeupChannel_,便于Poller监听、事件回调
1
2
3
4
5
// 为wakeupChannel_设置读回调
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 使能wakeupChannel_读事件
wakeupChannel_->enableReading();
  1. 启动loop循环,可能阻塞在poll()/epoll_wait()
  2. 其他线程通过queueInLoop()调用wakeup(),唤醒阻塞的loop线程
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 其他线程唤醒等待在wakeupFd_上的线程, 产生读就绪事件.
* @note write将添加8byte数据到内部计数器. 被唤醒线程必须调用read读取8byte数据.
*/
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
  1. loop线程被唤醒后,读取event fd
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 处理wakeupChannel_读事件
* @note read wakeupfd_
*/
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}

参考


muduo-事件驱动EventLoop
https://gstarmin.github.io/2023/07/03/muduo-EventLoop/
作者
Starmin
发布于
2023年7月3日
许可协议