muduo库-BlockingQueue和BounderBlockingQueue

muduo库-BlockingQueue和BounderBlockingQueue

无界阻塞队列 BlockingQueue

muduo库的BlcokingQueue实际上用的生产这消费者模型。我们知道生产者消费者模型一般有两种实现方式,可以利用信号量也可以利用条件变量实现,muduo库采用条件变量实现。

BlockingQueue比较简单,它是线程安全的,我们在外部调用它时无需加锁。

BlockingQueue结构如下:

其代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <boost/noncopyable.hpp>
#include <deque>
#include <assert.h>

namespace muduo
{

template<typename T>
class BlockingQueue : boost::noncopyable
{
public:
BlockingQueue()
: mutex_(),
notEmpty_(mutex_),
queue_() //使用deque
{
}

void put(const T& x) //往阻塞队列放任务
{
MutexLockGuard lock(mutex_);
queue_.push_back(x);
notEmpty_.notify(); // wait morphing saves us //不空唤醒
// http://www.domaigne.com/blog/computing/condvars-signal-with-mutex-locked-or-not/
}

#ifdef __GXX_EXPERIMENTAL_CXX0X__
void put(T&& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(std::move(x));
notEmpty_.notify();
}
// FIXME: emplace()
#endif

T take() //取任务
{
MutexLockGuard lock(mutex_); //取任务时也要保证线程安全
// always use a while-loop, due to spurious wakeup
while (queue_.empty()) //如果为空就阻塞在这里啦,用while循环不必多说
{
notEmpty_.wait();
}
assert(!queue_.empty()); //确保队列不空
#ifdef __GXX_EXPERIMENTAL_CXX0X__
T front(std::move(queue_.front()));
#else
T front(queue_.front()); //取出队头
#endif
queue_.pop_front(); //弹出队头。
return front; //返回
}

size_t size() const //返回队列大小
{
MutexLockGuard lock(mutex_);
return queue_.size();
}

private:
mutable MutexLock mutex_;
Condition notEmpty_;
std::deque<T> queue_;
};

}

#endif // MUDUO_BASE_BLOCKINGQUEUE_H

有界阻塞队列 BoundBlockingQueue

BoundBlockingQueue有界阻塞队列,实际上就是实现了一个循环队列。功能和上面的BlockingQueue都是一样的。

muduo库实现该队列实际上是内部把boost::circular_buffer类作为底层数据结构实现的。

其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <boost/circular_buffer.hpp>
#include <boost/noncopyable.hpp>
#include <assert.h>

namespace muduo
{

template<typename T>
class BoundedBlockingQueue : boost::noncopyable
{
public:
explicit BoundedBlockingQueue(int maxSize)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
queue_(maxSize)
{
}

void put(const T& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(x);
notEmpty_.notify();
}

T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(queue_.front());
queue_.pop_front();
notFull_.notify();
return front;
}

bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty();
}

bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}

size_t size() const //返回大小
{
MutexLockGuard lock(mutex_);
return queue_.size();
}

size_t capacity() const //返回容量
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}

private:
mutable MutexLock mutex_;
Condition notEmpty_;
Condition notFull_;
boost::circular_buffer<T> queue_; //使用boost的环形缓冲区
};

}

#endif // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H

总结

这两个阻塞队列其实没啥区别,只是底层采用不同数据结构存储数据而已。

参考:


muduo库-BlockingQueue和BounderBlockingQueue
https://gstarmin.github.io/2023/06/26/muduo库-BlockingQueue和BounderBlockingQueue/
作者
Starmin
发布于
2023年6月26日
许可协议