SpringBoot + Disruptor = 王炸!

Disruptor 介绍

Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke’s Choice Awards(Duke 选择大奖)。

图片

推荐一套基于 SpringBoot + Vue + uni-app 实现的全套电商系统mall(Github标星60K),前台商城项目和后台管理系统都有了,能支持完整的订单流程!涵盖商品、订单、购物车、权限、优惠券、会员等功能,功能很强大!

  • 项目地址:https://github.com/macrozheng/mall
  • 视频教程:https://www.macrozheng.com/video/

Disruptor 提供的功能类似于 KafkaRocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存)。

  • Github 地址:https://github.com/LMAX-Exchange/disruptor
  • 官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html

Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题。

JDK 中常见的线程安全的队列如下:

队列名字 是否有界
ArrayBlockingQueue 加锁(ReentrantLock 有界
LinkedBlockingQueue 加锁(ReentrantLock 有界
LinkedTransferQueue 无锁(CAS 无界
ConcurrentLinkedQueue 无锁(CAS 无界

从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。

因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。

Disruptor 就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。

不过, Disruptor 的基本使用非常简单,我们最重要的还是要搞懂其原理,明白它是如何被设计成这么厉害的并发框架。

Disruptor 核心概念

  • Event :你可以把 Event 理解为存放在队列中等待消费的消息对象。
  • EventFactory :事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。
  • EventHandler :Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。
  • EventProcessor :EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
  • Disruptor :事件的生产和消费需要用到Disruptor 对象。
  • RingBuffer :RingBuffer(环形数组)用于保存事件。
  • WaitStrategy :等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。
  • Producer :生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
  • ProducerType :指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。
  • Sequencer :Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

图片LMAX Disruptor User Guide

Disruptor 实战

我们要使用 Disruptor 实现一个最基本的生产消费模型的整个步骤是下面这样的(标准的生产消费者模型):

  1. 定义事件(Event) : 你可以把 Event 理解为存放在队列中等待消费的消息对象。
  2. 创建事件工厂 :事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。
  3. 创建处理事件的 Handler :Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。
  4. 创建并启动 Disruptor : 事件的生产和消费需要用到Disruptor 对象。
  5. 发布事件 :发布的事件保存在 Disruptor 的环形数组中。
  6. 关闭 Disruptor :类似于线程池的关闭。

整个步骤看似比较复杂,其实,逻辑还是比较简单的。我们需要围绕事件(Event)和Disruptor来做文章。

我们可以在 Mavan 仓库找到 Disruptor 的最新 jar 包。

Disruptor 的 Maven 仓库地址:https://search.maven.org/artifact/com.lmax/disruptor

Maven :

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

Gradle:

implementation 'com.lmax:disruptor:3.4.4'

1、定义事件

我们先来定义一个代表日志事件的类:LogEvent 。

事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent 对象中就有一个用来表示日志消息内容的属性:message

/**
 *
 * @author Guide哥
 **/
public class LogEvent {
    private String message;
    //省略了 Getter/Setter
}

我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message (可以参考 log4j2 对 Disruptor 的使用)。

2、创建事件工厂

创建一个工厂类 LogEventFactory 用来创建 LogEvent 对象。

LogEventFactory 继承 EventFactory 接口并实现了 newInstance() 方法 。

public class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

3、创建处理事件的 Handler

创建一个用于处理后续发布的事件的类:LogEventHandler 。

LogEventHandler 继承 EventHandler 接口并实现了 onEvent() 方法 。

public class LogEventHandler implements EventHandler<LogEvent> {
    @Override
    public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(logEvent.getMessage());
    }
}

EventHandler 接口的 onEvent() 方法共有 3 个参数:

  • event :待消费/处理的事件
  • sequence :正在处理的事件在环形数组(RingBuffer)中的位置
  • endOfBatch : 表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)
public interface EventHandler<T>
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

4、初始化 Disruptor

我们这里定义一个方法用于获取 Disruptor 对象。

private static Disruptor<LogEvent> getLogEventDisruptor() {
    // 创建 LogEvent 的工厂
    LogEventFactory logEventFactory = new LogEventFactory();
    // Disruptor 的 RingBuffer 缓存大小
    int bufferSize = 1024 * 1024;
    // 生产者的线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };
    //实例化 Disruptor
    return new Disruptor<>(
            logEventFactory,
            bufferSize,
            threadFactory,
            // 单生产者
            ProducerType.SINGLE,
            // 阻塞等待策略
            new BlockingWaitStrategy());
}

Disruptor 的推荐使用的构造函数如下:

public class Disruptor<T> {
  public Disruptor(
          final EventFactory<T> eventFactory,
          final int ringBufferSize,
          final ThreadFactory threadFactory,
          final ProducerType producerType,
          final WaitStrategy waitStrategy)
  {
      this(
          RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
          new BasicExecutor(threadFactory));
  }

......
}

我们需要传递 5 个参数:

  • eventFactory : 我们自定义的时间工厂。
  • ringBufferSize : 指定 RingBuffer 的容量大小。
  • threadFactory :自定义的线程工厂。Disruptor 的默认线程池是自定义的,我们只需要传入线程工厂即可。
  • producerType : 指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。
  • waitStrategy : 等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。

ProducerType 的源码如下,它是一个包含两个变量的枚举类型

public enum ProducerType
{
    SINGLE,
    MULTI
}
  • SINGLE : 单个事件发布者模式,不需要保证线程安全。
  • MULTI :多个事件发布者模式,基于 CAS 来保证线程安全。

WaitStrategy (等待策略)接口的实现类中只有两个方法:

  • waitFor() : 等待新事件的到来。
  • signalAllWhenBlocking() : 唤醒所有等待的消费者。
public interface WaitStrategy
{
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    void signalAllWhenBlocking();
}

WaitStrategy 的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。

图片

除了上面介绍的这个构造函数之外,Disruptor 还有一个只有 3 个参数构造函数。

使用这个构造函数创建的 Disruptor 对象会默认使用 ProducerType.MULTI(多个事件发布者模式)和 BlockingWaitStrategy(阻塞等待策略) 。

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

5、发布事件

//获取 Disruptor 对象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//绑定处理事件的Handler对象
disruptor.handleEventsWith(new LogEventHandler());
//启动 Disruptor
disruptor.start();
//获取保存事件的环形数组(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//发布 10w 个事件
for (int i = 1; i <= 100000; i++) {
    // 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号
    long sequence = ringBuffer.next();
    try {
        LogEvent logEvent = ringBuffer.get(sequence);
        // 初始化 Event,对其赋值
        logEvent.setMessage("这是第%d条日志消息".formatted(i));
    } finally {
        // 发布事件
        ringBuffer.publish(sequence);
    }
}
// 关闭 Disruptor
disruptor.shutdown();

上面的代码中,我们通过 Disruptor 的 handleEventsWith 方法来绑定处理事件的 Handler 对象。

Disruptor 可以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。

就比如下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 。

disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());

6、结果

这是第1条日志消息
这是第2条日志消息
这是第3条日志消息
......
这是第99999条日志消息
这是第100000条日志消息

从打印结果可以看出,我们发布的 10w 个事件已经成功被处理。

总结

Disruptor 提供的功能类似于 KafkaRocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存)。

  • Github 地址:https://github.com/LMAX-Exchange/disruptor
  • 官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html

Disruptor 在无锁的情况下还能保证队列有界,并且还是线程安全的,性能非常强,比较适合单机场景需要使用生产者-消费者模式的项目。

扫码领红包

微信赞赏支付宝扫码领红包

发表回复

后才能评论