在多线程开发中,我们常常遇到这样一种场景:一些线程接受用户请求,另外一些线程处理这些请求,之所以把接受请求和处理请求的逻辑分开,一方面是出于资源调度的考虑(用户请求也许很多,但这些请求涉及的资源很少),另一方面也可能是异步响应的需求。这种场景存在于NIO的通信框架,存在于Tomcat的回调处理框架,存在于日志系统的异步flush,存在于各种类型的线程池中。总之,这种典型的生产者消费者场景比比皆是,而生产者消费者模式的核心就是阻塞队列。由于阻塞队列会涉及大量的锁竞争和线程阻塞,都是非常耗费CPU的操作,因此阻塞队列的性能好坏能够在很大程度上决定上层应用的性能瓶颈。
不同语言中,都会有(较为)官方的阻塞队列实现,JAVA中用BlockingQueue这个接口来描述阻塞队列,有数组实现的有界阻塞队列为ArrayBlockingQueue,用链表实现的无界阻塞队列为LinkedBlockingQueue,除此之外还有优先级阻塞队列PriorityBlockingQueue,这些阻塞队列除了自身特有逻辑外,都采用基于锁的悲观并发控制,即标准的生产者消费者模型(见操作系统教材)。这种模型由于锁冲突严重,已经被证明是一种低效的实现。那么它有优化空间吗?优化空间能有多少?
最近很火的Disruptor给了我们一个答案,在使用了Ringbuffer,内存屏障,乐观并发控制等众多优化手段后,Disrupter的阻塞队列与传统的阻塞队列相比有超过10倍的吞吐率。
遗憾的是,Disruptor主要应用于订阅模式,即一条消息被所有消费者消费,另外Disruptor支持构建具有复杂依赖关系的消费者,Storm底层便利用Disruptor来构建本地任务拓扑。而语言自带的阻塞队列一般是一个消息被单个消费者消费,这种场景更加通用。
介绍Disruptor不在本文范围内,有机会的话会另开一篇博文做Disruptor的源码解析,本文想做的是,利用Disruptor中RingBuffer和乐观并发控制的设计思想,实现一个单消息单消费的通用阻塞队列。
RingBuffer是一个首尾相连的环形数组,所谓首尾相连,是指当RingBuffer上的指针越过数组是上界后,继续从数组头开始遍历。因此,RingBuffer中至少有一个指针,来表示RingBuffer中的操作位置。另外,指针的自增操作需要做并发控制,Disruptor和本文的OptimizedQueue都使用CAS的乐观并发控制来保证指针自增的原子性,关于乐观并发控制之后会着重介绍。
Disruptor中的RingBuffer上只有一个指针,表示当前RingBuffer上消息写到了哪里,此外,每个消费者会维护一个sequence表示自己在RingBuffer上读到哪里,从这个角度讲,Disruptor中的RingBuffer上实际有消费者数+1个指针。由于我们要实现的是一个单消息单消费的阻塞队列,只要维护一个读指针(对应消费者)和一个写指针(对应生产者)即可,无论哪个指针,每次读写操作后都自增一次,一旦越界,即从数组头开始继续读写。如图1所示:
图1:RingBuffer结构
RingBuffer的元素个数最好设置为2的整数次幂,这样可以通过与运算获取指针的真实位置(有人问到溢出问题,解释见评论),Disruptor正是这样做的。RingBuffer的实现摘要如下:
private Object[] ringBuffer = null;
private Object[] ringBuffer = null;
private AtomicInteger offerSeq = new AtomicInteger(-1);
private AtomicInteger takeSeq = new AtomicInteger(-1);
private int size;
private int mask;
public OptimisticQueue(int sizePower) {
this.size = 1 << sizePower;
this.ringBuffer = new Object[size];
for (int i = 0; i < size; i++) {
ringBuffer[i] = new Entry(i + 1);
}
this.mask = 0x7FFFFFFF >> (31 - sizePower);
}
@SuppressWarnings("unchecked")
private Entry nextOffer() {
return (Entry) ringBuffer[offerSeq.incrementAndGet() & mask];
}
@SuppressWarnings("unchecked")
private Entry nextTake() {
return (Entry) ringBuffer[takeSeq.incrementAndGet() & mask];
}
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
那么ArrayBlockingQueue中的RingBuffer和我们这里说的RingBuffer有区别吗?
区别很大。
RingBuffer中之所以分离读指针和写指针,为的是使生产者和消费者互不干扰,两者可以同时操作指针,达到完全并发执行。然而ArrayBlockingQueue中对所有的读写操作都进行了加锁互斥,也就是说同一时刻只能有一个生产者或消费者操作,这其实和用一个指针没什么区别,没有发挥RingBuffer的真正优势,同时这也是ArrayBlockingQueue本身的一个瓶颈。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
那么为什么ArrayBlockingQueue中要对所有的操作都做互斥?这个问题一言两语说不清楚,留给读者自己推敲,一言以蔽之:在基于锁的悲观并发控制下,在阻塞队列中互斥所有操作是最正确的做法。
既然基于锁的悲观并发控制需要互斥所有操作,RingBuffer无法发挥其“读写分离”的优势,那么怎样才能不互斥读写操作呢?
这就要说到CAS与乐观并发控制了。
悲观并发控制,是用锁把可能出现并发操作的临界区保护起来,以杜绝临界区内的并发,将并发错误防范于未然,正如其名,是在“悲观”地认为会发生错误而设计的并发控制策略。
乐观并发控制则相反:在临界区的执行过程中,乐观地认为不会发生并发冲突,没有并发错误自然无需加锁。OK,不加锁性能自然好多了,那如果真地发生并发冲突了呢?首先我们需要随时知道并发冲突发生了没有,也就是需要一种并发冲突检测机制,冲突检测可以在执行临界区之前,也可以在执行临界区之后。OK,假设我们现在有一种冲突检测机制,接下来怎么办呢?自然是需要一种冲突解决机制了,不同的冲突检测机制对应不同的冲突解决机制,这要视场景和情况而定。
说的这么玄乎,举个例子:
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
上述代码为JDK中AtomicInteger的原子自增操作,可以看到它依赖的是CAS,即compareAndSet操作来检测自增操作是否有并发冲突,当没有并发冲突时,compareAndSet比较当前值等于自增前的值,并成功将自增后的值赋给自己,若有并发冲突发生,也就是有其他线程已经更改了当前值,compareAndSet会返回false,并重新执行自增操作
上述案例中,CAS是冲突检测机制,重新执行自增操作是冲突解决机制。CAS的冲突检测发生在临界区执行之前(这里的临界区是给自己赋值)。
顺带一提,上述的循环操作是不是有点像spinlock?其实spinlock也是一种乐观并发控制。而Mutex,在JAVA中又叫Monitor这种包含阻塞逻辑的锁才真正对应悲观并发控制。
我这里也有一个冲突检测发生在临界区执行之后的案例:著名的MySQL高可用解决方案Galera是通过对比WriteSet中主键的值来检测是否有不同Master的事务对同一行数据进行了操作,这种检测是发生在事务commit前,同步WriteSet后,这时事务中所有的操作都已执行完毕,若检测到事务冲突,直接进行回滚。这里检测writeset主键值冲突是冲突检测机制,事务回滚是冲突解决机制。
由此可见,虽然悲观并发控制都是基于锁的,但乐观并发控制的方式就多种多样了,需要根据场景对症下药。
我们先来看看怎样通过CAS来检测队列中的并发冲突。
图2. RingBuffer的Entry结构
如图2所示,Entry即RingBuffer中的元素,这里为每个Entry定义一个frontDoor和backDoor,即前门和后门,frontDoor和backDoor各为一个AtomicReference(对象的原子引用,对这个类不熟的Google之),对生产者A,当它要往一个Entry中写消息时,需要先调用frontDoor的CAS(null, A)来检测是否能进入前门,若返回true,表示A获得了写Entry的权利,在A写完Entry后,会将frontDoor的值重置为null。
在A写Entry的这段时间,frontDoor的值为A,其他生产者也想写这个Entry时,会在CAS frontDoor时检测到并发冲突。同理,对消费者B,当它想要消费Entry中的消息时,要通过backDoor.CAS(null, B)来检测是否有读的并发冲突。
前门和后门的AtomicReference就像两个单人通道,保证了一个Entry在一个时间点上只会有一个生产者和消费者来读写消息。
这里有个疑问,CAS实现指针自增可以保证多个生产者或多个消费消费者不会进入一个entry,那这个前门后门的设计是为了什么?我们假设这样一个场景:有entry为10的ringbuffer,2个生产者,生产者1获取entry1消息的速度非常慢(假设该线程因为不明原因卡住了),生产者2同时会快速消费掉entry2-10,绕了一圈后进入entry1获取消息,这时候entry1中就会有2个生产者,也就是说指针上的CAS并不能严格保证一个entry上不会有多个生产者或多个消费者并发访问,但是有了frontDoor这个屏障,在生产者2进入entry1后,发现entry1的前门中已经有了生产者1,即可立即发现冲突。
冲突检测机制有了,还需要一个冲突解决机制,很容易想到,无论对生产者还是消费者,检测到冲突后只要对下一个指针上的Entry执行操作即可。当然对下一个Entry也要做冲突检测。
目前为止,一切看起来很美好,我们用CAS来检测生产者与生产者,消费者与消费者之间的并发冲突,通过指针+1来解决冲突。我们是不是遗漏了什么?
当生产者A进入frontDoor后,如果发现Entry中已经有了消息怎么办?相应的,若消费者B进入backDoor后,发现Entry中还没有消息怎么办?
AS实现指针+1解决了生产者与生产者之间,消费者与消费者之间的并发冲突问题,同一个Entry上的生产者和消费者就不会出现并发冲突吗?
先回答第一个问题,我们知道,任何有界队列都可能出现队列被写满的情况,队列被写满后生产者在写消息时被阻塞,直到队列被消费一点后才被唤醒,把消息塞进队列。相应的,在队列为空时消费者会阻塞,直到队列中有消息后才会被唤醒,来消费消息。这种生产者满则阻塞和消费者空则阻塞的模式正是阻塞队列的名称由来。
当生产者A进入frontDoor后,发现Entry中已经有了消息,生产者满则阻塞,相应的,若消费者B进入backDoor后,发现Entry中还没有消息,消费者空则阻塞。传统的队列中,生产者消费者阻塞都发生在队列全体满或者空的状态后,而我们这种队列的阻塞则是针对每个Entry而言的。
稍作斟酌,将阻塞逻辑细化到每个Entry中,会不会使阻塞频繁发生呢?倘若如此,这种优化就很难笃定地说是一种优化了。这里RingBuffer的作用再次凸显出来,在RingBuffer中读写就像赛跑,只有生产者们超越消费者们一圈,才可能出现上述的生产者满则阻塞,而这时也正是队列被写满的情况,相应的,只有消费者们追上生产者们,才可能出现消费者空则阻塞,这时也正是队列为空的情况。所以不会使阻塞频繁发生。
对第二个问题,同一个Entry上的生产者和消费者实际上是通过Entry上的消息进行线程同步,当生产者A写完消息后,通知消费者B获取消息,或者消费者B消费完消息后,通知生产者A可以写新的消息。两者之间通过wait和notify原语做线程同步,因此不会有并发冲突。
写到这里,我们已经实现了一个基于RingBuffer和CAS乐观并发控制的无锁阻塞队列,接下来我们来聊聊这个队列的实现细节。
相关阅读:
致敬disruptor:CAS实现高效(伪)无锁阻塞队列实践(下篇)
本文来自网易实践者社区,经作者马进授权发布