致敬disruptor:CAS实现高效(伪)无锁阻塞队列实践(下篇)

未来已来2018-09-12 09:45

作者:马进


四、    两阶段发布与实现


上文提到,无论是生产者还是消费者,在操作完消息后,都需要通知Entry另一头Disruptor中管这种行为叫做两阶段发布,例如对生产者来说,第一次发布是将消息赋给Entry中的item,第二次发布是通知backDoor中的消费者(如果有)消息到了,如图3所示:


3. 两阶段发布流程


需要特别留意的是,如果我们不在item上做任何互斥,可能出现干等的情况,例如当生产者A发现item不为空,进入阻塞之前,消费者B取走了消息,并向生产者A发出通知,由于这时候A还没有进入阻塞,导致通知失效,A会永无止境地等下去,而且Entry中的消息也会一直为空,新进来的消费者也会一直等下去,进入活锁。

生产者A             消费者B

check item != null?

              check item != null?

              take(item), item = null

              notify producer wakeup

wait consumer notify….

为了避免活锁,我们这里引入JAVA中常用的一个编程技术:双重检查锁定,我们知道Objectwaitnotify需要写在synchronized中,双重检查锁定就是除了在一开始判断item是否为null外,在synchonized内部再判断一次。

下面给出Entry的完整实现,其中publish(T event)为发布消息,take(T event)为取走消息,由于EntryOptimizedQueue的内部类,类型参数T无需再声明。

private class Entry {
private volatile T event = null;
private AtomicReference<Object> backDoor
= new AtomicReference<Object>();
private AtomicReference<Object> frontDoor
= new AtomicReference<Object>();
private int id;

public Entry(int id) {
this.id = id;
}

public void publish(T event) {
this.event = event;
frontDoor.set(null);
Object barrier = backDoor.get();
if (barrier != null) {
synchronized (barrier) {
barrier.notify();
}
}
}

public boolean enterFront(Object barrier) {
return frontDoor.compareAndSet(null, barrier);
}

public boolean enterBack(Object barrier) {
return backDoor.compareAndSet(null, barrier);
}

public T take() {
T e = event;
event = null;
backDoor.set(null);
Object barrier = frontDoor.get();
if (barrier != null) {
synchronized (barrier) {
barrier.notify();
}
}
return e;
}

public int getId() {
return id;
}
}


通过synchronized内部的while判断,保障了生产者和消费者都不会死等,活锁的问题是解决了,但我们也发现我们的代码中再次引入了(虽然没有while循环也无法消除synchronized,但纯粹为了线程同步而加的synchronized严格意义上讲不算锁,如果JAVA能从native层保障waitnotify的原子性,完全可以做到消除synchronized)。所以严格地说,OptimizedQueue没有消除锁,而是将整个队列上的一把大锁细化到了每个Entry中,即所谓的锁细化,锁细化在很大程度上降低了锁冲突概率,并且把加锁的时间控制到了最少,通过上一篇synchronized原理深探,可以知道synchornized的代价在这里非常小(基本不会生成Monitor)。

下面贴出OptimizedQueue的完整实现:

/*
* every producer and consumer must implement this interface
* Barrier is just a object lock for the synchronization between
* producer and consumer
*/
public interface BarrierHolder {
Object getBarrier();
}

public class OptimisticQueue<T> {
private Object[] ringBuffer = null;
private AtomicInteger offerSeq = new AtomicInteger(-1);
private AtomicInteger takeSeq = new AtomicInteger(-1);
private int size;
private int mask;

public OptimisticQueue(int sizePower) {
this.size = 1 << sizePower;
this.ringBuffer = new Object[size];
for (int i = 0; i < size; i++) {
ringBuffer[i] = new Entry(i + 1);
}
this.mask = 0x7FFFFFFF >> (31 - sizePower);
}

@SuppressWarnings("unchecked")
private Entry nextOffer() {
return (Entry) ringBuffer[offerSeq.incrementAndGet() & mask];
}

@SuppressWarnings("unchecked")
private Entry nextTake() {
return (Entry) ringBuffer[takeSeq.incrementAndGet() & mask];
}

public void offer(BarrierHolder holder, T event) throws
InterruptedException {
Entry entry = nextOffer();
Object barrier = holder.getBarrier();
if (!entry.enterFront(barrier)) {
offer(holder, event);
return;
}
if (entry.event != null) {
synchronized (barrier) {
while (entry.event != null) {
barrier.wait();
}
}
}
entry.publish(event);
}

public T take(BarrierHolder consumer) throws InterruptedException {
Entry entry = nextTake();
Object barrier = consumer.getBarrier();
if (!entry.enterBack(barrier))
return take(consumer);
if (entry.event == null) {
synchronized (barrier) {
while (entry.event == null) {
barrier.wait();
}
}
}
return entry.take();
}

private class Entry {
......
}
}


五、    使用场景

本文中实现的OptimizedQueue的使用场景有如下限制:

1 OptimizedQueue类似于JDK中的ArrayBlockingQueue,是有界数组,当队列中挤压的消息数大于数组长度时,生产者再想塞入消息会阻塞。

2 OptimizedQueue的参数为RingBuffer长度的自然对数,例如当参数为10时,RingBuffer长度为1024,这样设计是为了用更高效的方式获取真实指针位置。

3 OptimizedQueue的生产者和消费者的个数都必须小于RingBuffer长度,试想,倘若生产者的个数大于RingBuffer长度,可能出现某个生产者因为RingBuffer中所有frontDoor都被占用而一直找不到槽位,以至于进入死循环。消费者亦然。

OptimizedQueue的实际限制其实只有3,而在大多数应用场景中,都能保障3的条件,因为线程是非常消耗资源的对象,相对的RingBufferEntry只会消耗内存中非常少量的空间,在使用OptimizedQueue需要保证RingBuffer长度大于生产者消费者上限。

阻塞队列一个典型的应用是线程池,例如在网易分布式数据库DDB中,一个简单的select语句在创建完执行计划后,可能要将SQL下发给多个mysql节点,这里就需要线程池来完成多个mysql节点上的查询,而单个client连接数有1024的最大限制,这种情况下只要保证RingBuffer长度大于1024,并且连接池中的线程个数设上限为1023,即可用OptimizedQueue构建线程池。

六、    Benchmark

最后我们来看看OptimizedQueue在性能上究竟比JDK的自带阻塞队列高出多少。

为了衡量队列性能,设计了一个简单的生产者消费者场景,生产者初始化一个整数2000,并从1一直累加到2000,然后将2000放入队列,生产者从队列中取出2000,也同样从1累加到2000,累加完后吞吐量+1,这个场景本身没什么逻辑,只是为了测量性能而设计。这里就不再贴代码了,附件中的源码中包含完整的benchmark实现。

用于对比的是OptimizedQueueJDK中的ArrayBlockingQueue,因为ArrayBlockingQueue也是数组实现,而且相比无界队列LinkedBlockingQueue性能更好。性能指标为队列在各种生产者和消费者个数下的吞吐率(实际上是单位时间内生产者完成的计算量)。

测试分别在两个机器上进行,第一个是公司的办公机,性能较差,第二个是我自己的笔记本,因为比较新,性能好。

办公机的性能测试结果如下:

配置烂,用eclipse写代码没法网页听歌,干什么都卡的一B

我的笔记本的测试结果如下:


i7 2.9GHZ 双核,平时用起来很流畅

在我的办公机上,两种队列性能差距非常明显,尤其是10个生产者和1个消费者的场景下,OptimizedQueueArrayBlockingQueue有将近60的性能差距。相对的,在我的笔记本上,两者差距也存在,但不那么明显了,两者差距大小应该和业务场景,机器配置甚至CPU架构都相关。

值得关注的是表格中标红的数据,用办公机在1个生产者和10个消费者的测试场景下,ArrayBlockingQueue的吞吐率反而高出很多,而在接下来的测试中,TPS直接降到了个位数,而在我自己性能更好的笔记本中,同等场景下的吞吐率也才只有31W。因只能用异常来形容这组数据了。ArrayBlockingQueue是用ReentrantLock来实现的,大概是恰巧踩到了什么优化点。当然我们不能期待现实环境中总能遇到这样的情况。另外,这几组数据中,使用ArrayBlockingQueue在生产者个数大于消费者个数时性能较差,各种原因值得玩味。



相关阅读:

致敬disruptor:CAS实现高效(伪)无锁阻塞队列实践(上篇)

本文来自网易实践者社区,经作者马进授权发布