springboot~disruptor异步队列 Disruptor Java内置队列的问题 伪共享 伪共享问题 的解决方案 Disruptor框架是如何解决伪共享问题的? 例子

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

Java内置队列的问题

介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。
springboot~disruptor异步队列
Disruptor
Java内置队列的问题
伪共享
伪共享问题 的解决方案
Disruptor框架是如何解决伪共享问题的?
例子

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

但是对 volatile类型的变量进行 CAS 操作,存在伪共享问题,下面介绍一下

伪共享

CPU的缓存系统是以缓存行(cache line)为单位存储的,一般的大小为64bytes。在多线程程序的执行过程中,存在着一种情况,多个需要频繁修改的变量存在同一个缓存行当中。

假设:有两个线程分别访问并修改X和Y这两个变量,X和Y恰好在同一个缓存行上,这两个线程分别在不同的CPU上执行。那么每个CPU分别更新好X和Y时将缓存行刷入内存时,发现有别的修改了各自缓存行内的数据,这时缓存行会失效,从L3中重新获取。这样的话,程序执行效率明显下降。为了减少这种情况的发生,其实就是避免X和Y在同一个缓存行中,可以主动添加一些无关变量将缓存行填充满,比如在X对象中添加一些变量,让它有64 Byte那么大,正好占满一个缓存行。
springboot~disruptor异步队列
Disruptor
Java内置队列的问题
伪共享
伪共享问题 的解决方案
Disruptor框架是如何解决伪共享问题的?
例子

伪共享问题 的解决方案

简单的说,就是 以空间换时间: 使用占位字节,将变量的所在的 缓冲行 塞满。
disruptor 无锁框架就是这么干的。

Disruptor框架是如何解决伪共享问题的?

在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。对于生产者和消费者来说,Sequence标示着它们的事件序号。

例子

/**
 * 停车场问题.
 * 1) 事件对象Event
 * 2)三个消费者Handler
 * 3)一个生产者Processer
 * 4)执行Main方法
 */
public class DisruptorCar {
    private static final Integer NUM = 1; // 1,10,100,1000

    /**
     * 测试入口 ,
     * 一个生产者(汽车进入停车场);
     * 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机)
     * 前两个消费者同步执行,都有结果了再执行第三个消费者
     */
    @Test
     public  void main() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        int bufferSize = 2048; // 2的N次方
        try {
            // 创建线程池,负责处理Disruptor的四个消费者
            ExecutorService executor = Executors.newFixedThreadPool(4);

            // 初始化一个 Disruptor
            Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
                @Override
                public MyInParkingDataEvent newInstance() {
                    return new MyInParkingDataEvent(); // Event 初始化工厂
                }
            }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());

            // 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
            EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
                    new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());

            // 当上面两个消费者处理结束后在消耗 smsHandler
            MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
            handlerGroup.then(myParkingDataSmsHandler);

            // 启动Disruptor
            disruptor.start();

            CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了
            // 生产者生成数据
            executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
            countDownLatch.await(); // 等待生产者结束

            disruptor.shutdown();
            executor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
    }

    public class MyInParkingDataEvent {

        private String carLicense; // 车牌号

        public String getCarLicense() {
            return carLicense;
        }

        public void setCarLicense(String carLicense) {
            this.carLicense = carLicense;
        }

    }

    /**
     * Handler 第一个消费者,负责保存进场汽车的信息
     */
    public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
            long threadId = Thread.currentThread().getId(); // 获取当前线程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
            System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense));
        }

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            this.onEvent(myInParkingDataEvent);
        }

    }

    /**
     * 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统)
     */
    public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 获取当前线程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
            System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense));
        }

    }

    /**
     * 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。
     */
    public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 获取当前线程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
            System.out.println(String.format("Thread Id %s 给  %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense));
        }

    }

    /**
     * 生产者,进入停车场的车辆
     */
    public class MyInParkingDataEventPublisher implements Runnable {

        private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作
        private Disruptor<MyInParkingDataEvent> disruptor;

        public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
                                             Disruptor<MyInParkingDataEvent> disruptor) {
            this.countDownLatch = countDownLatch;
            this.disruptor = disruptor;
        }

        @Override
        public void run() {
            MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
            try {
                for (int i = 0; i < NUM; i++) {
                    disruptor.publishEvent(eventTranslator);
                    Thread.sleep(1000); // 假设一秒钟进一辆车
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown(); // 执行完毕后通知 await()方法
                System.out.println(NUM + "辆车已经全部进入进入停车场!");
            }
        }

    }

    class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {

        @Override
        public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
            this.generateData(myInParkingDataEvent);
        }

        private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
            myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int) (Math.random() * 100000)); // 随机生成一个车牌号
            System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event");
            return myInParkingDataEvent;
        }

    }

}