澳门新浦京8455com解读Disruptor系列–解读源码(0)之源码导读

澳门新浦京8455com 16

这个故事源自一个很简单的想法:创建一个对开发人员友好的、简单轻量的线程间通讯框架,完全不用锁、同步器、信号量、等待和通知,在Java里开发一个轻量、无锁的线程内通讯框架;并且也没有队列、消息、事件或任何其他并发专用的术语或工具。

本篇文章是后续解读Disruptor源码的导读,适合对Disruptor还不了解的同学。如果有兴趣,还可以看下我之前发的Disruptor系列文章。
要大概弄明白Disruptor是个什么玩意,可以先回答这几个问题:
Disruptor是什么?
为什么要用Disruptor?
Disruptor为什么那么快?

多线程已经成为大多数开发者的兴趣所在了。他们努力尝试想找出最优策略来解决这个问题。过去已经有各种尝试去标准化这些方案。特别是随着大数据,实时分析等问题领域的兴起,又存在着新的挑战。在这个方向需要走的一步是“Doug
Lea”的作品(一部巨作),以并发框架(JSR 166)的形式提供给我们。

只用普通的老式Java接口实现POJO的通讯。

Disruptor是什么?

我也来个一句话总结:Disruptor是LMAX开源的、用于替代并发线程间数据交换的有界队列的、可选无锁的、高性能的线程间通讯框架。

听起来是不是感觉高大上?是不是觉得很好奇怎么个高性能?是不是还有点云里雾里咋交换数据?这也是我最初的感觉。
慢慢来,我们逐步分解这些名词。
Disruptor:首先我们看看Disruptor这个词。熟悉星际迷航(Star
Trek)的同学可能对这个词会更熟悉,Disruptor(裂解炮)和Phaser(相位枪)都是星际迷航中的武器,Phaser是联邦武器,而Disruptor是克林贡的等价物。看起来Disruptor和Phaser是类似的,那Phaser又是啥呢?在jdk1.7中,Doug
Lea大师为我们带来了Phaser,一个可重用的同步屏障,功能类似CyclicBarrier和CountDownLatch,但是使用更加灵活。
LMAX团队之所以为Disruptor起了这个名字,一方面是因为Disruptor和Phaser在处理依赖图表(或者说是消费链、多阶段处理、流水线)时有类似的地方;另一方面,Disruptor这个词本身就有分裂、破坏的意思,很符合Disruptor中的设计思想对于传统并发编程思想的某种“对立”–并发编程总是在想如何利用多线程去拥有更大的吞吐量、更少的延迟,而Disruptor却通过非常多的性能测试发现,在并发编程中正是过多的线程间通讯、争抢导致了性能瓶颈,而最终选择了单线程的处理逻辑去完成业务处理。
LMAX:LMAX是英国一个交易量很大的金融交易所,也是开源Disruptor的组织。
开源:https://github.com/LMAX-Exchange/disruptor
并发线程间数据交换的有界队列
:在并发编程时,经常需要在线程间交换数据,如任务分发、事件传递、状态变更等,
这时通常需要多个线程去访问一个线程安全的数据结构,一般选择使用队列(Queue)实现。队列在容量上又分为有界和无界,使用无界队列通常都需要保证生产者生产速度小于消费者消费速度,否则将由于内存耗尽导致灾难结果。为了避免这种情况,队列通常是有界的。Java中经常使用BlockingQueue作为并发线程使用的有界队列,使用put()、take()在队列满、空的情况下进行等待,非常适合多线程间的数据共享,实现方式一般有ArrayBlockingQueue和LinkedBlockingQueue。
线程间通讯:等同于并发线程间的数据交换。
可选无锁:使用Disruptor唯一可能遇到Java锁的时候,就是在消费者等待可用事件进行消费时。而Disruptor为这个等待过程,编写了包括使用锁和不使用锁的多种策略,可根据不同场景和需求进行选择。
高性能:单生产者+单消费者性能测试:
使用LinkedBlockingQueue(https://github.com/LMAX-Exchange/disruptor/blob/master/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueThroughputTest.java

Starting Queue tests
Run 0, BlockingQueue=3,687,315 ops/sec
Run 1, BlockingQueue=2,721,088 ops/sec
Run 2, BlockingQueue=3,471,017 ops/sec
Run 3, BlockingQueue=5,347,593 ops/sec
Run 4, BlockingQueue=5,316,321 ops/sec
Run 5, BlockingQueue=5,449,591 ops/sec
Run 6, BlockingQueue=5,173,305 ops/sec

使用常用的带Translator的Disruptor测试(澳门新浦京8455com ,https://github.com/LMAX-Exchange/disruptor/blob/master/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java)

Starting Disruptor tests
Run 0, Disruptor=20,222,446 ops/sec
Run 1, Disruptor=70,671,378 ops/sec
Run 2, Disruptor=37,878,787 ops/sec
Run 3, Disruptor=37,105,751 ops/sec
Run 4, Disruptor=38,986,354 ops/sec
Run 5, Disruptor=27,578,599 ops/sec
Run 6, Disruptor=34,281,796 ops/sec

综上的名词解释,聪明如你,想必已经明白个大概了。如果还是不明白,肯定是我讲的不好,请留言。

现在开始区分并发和并行性。这些只是不同的策略,而且市面上有很多框架提供,都能帮我们达到相同的目的。但在选择的时候如果能同时知道他们内部的实现细节对我们也是大有好处的。本文将要探究JVM线程池和线程共享的一些稳定有效的选项。当然,随着多核处理器的广泛使用,新的问题也随之而来。开发人员也开始思考利用高级硬件的“mechanical
sympathy”(译者注:表示底层硬件的运作方式以及与硬件运行方式协同的软件编程)来提高性能。

它可能跟Akka的类型化actor类似,但作为一个必须超级轻量,并且要针对单台多核计算机进行优化的新框架,那个可能有点过了。

为什么要用Disruptor?

那为什么要用Disruptor呢?或者说Disruptor解决了什么问题呢?
一般情况下,新的发明创造都是伴随着对旧有事物的痛点产生的。在并发线程间交换数据这个问题上,使用传统的阻塞队列有什么问题呢?
根据LMAX团队在2011年发布的论文,可简单归结为以下几个问题:

  1. 锁的成本:
    传统阻塞队列使用锁保证线程安全。而锁通过操作系统内核的上下文切换实现,会暂停线程去等待锁直到释放。执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态。这种状态会导致高水平的写入争用。
  2. 伪共享问题导致的性能低下。
  3. 队列是垃圾的重要来源,队列中的元素和用于存储元素的节点对象需要进行频繁的重新分配。

论文中给出的,循环5亿次64位计数操作使用时间对比:

澳门新浦京8455com 1

执行5亿次64位计数操作使用时间对比

Disruptor通过良好的设计,最大限度解决了上述三个问题。
好奇如你,难道就不想知道其中奥妙吗?

个人以为,当讨论线程池时,目前广泛应用的主要有下述机制:

当actor跨越不同JVM实例(在同一台机器上,或分布在网络上的不同机器上)的进程边界时,Akka框架很善于处理进程间的通讯。

Disruptor为什么那么快?

简单来说,Disruptor在设计上有以下几点优势:

  1. 内部数据存储使用环形缓冲(Ring
    Buffer),在启动时进行对象内存分配,这个对象并非数据本身,而只是一个数据容器。这个容器由用户提供,在Disruptor运行时,生产者负责拿到容器设置好数据,消费者再去可用的容器中拿到数据完成消费。这样分配对象内存,将有极大可能让这些对象内存在主存中连续分配,从而支持了CPU缓存位置预测。否则每次new一个对象,很难知道对象内存分配到哪了。这样做好有个好处,环形缓冲在JVM生命周期中通常是永生的,GC的压力更小
  2. 尽量使用无锁设计,合理使用CAS。举两个例子。
    a.
    通过区分是否允许并发生产者,细分单生产者模式和多生产者模式,单生产者单线程更新游标,多生产者模式使用CAS更新游标位置。
    b.
    在阻塞型等待策略中,将等待分两部分。第一部分,消费者等待生产者发布新数据,由于不确定等待时间,使用锁的条件等待。而第二部分,消费者等待上一组消费者(Disruptor支持链式消费,类似Phaser分阶段处理)
    完成此数据消费时,由于已确定有可消费数据,且假定通常的消费时间较短,所以使用自旋(忙循环)来避免上下文切换导致的性能开销。能不用锁就不用锁,即便要用锁,也要将使用锁的粒度用的最小。
  3. 优化数据结构,解决伪共享问题。Java中通过填充缓存行,来解决伪共享问题的思路,现在可能已经是老生常谈,连Java8中都新增了sun.misc.Contended注解来避免伪共享问题。但在Disruptor刚出道那会儿,能够以“机械同情(mechanical
    sympathy)”的思考方式,来优化Java数据结构,这恐怕还很新潮。
  4. 优化其他实现细节。举几个例子。
    a.
    如一个RingBuffer,容量bufferSize为2的幂。使用sequence表示事件序号,可通过sequence & (bufferSize - 1)定位元素的index,比普通的求余取模(%)要快得多。事先计算好bufferSize以2为底的对数indexShift,可通过sequence >>> indexShift快速计算出sequence/bufferSize的商flag(其实相当于当前sequence在环形跑道上跑了几圈,在数据生产时要设置好flag,在数据消费时就可以通过比对flag判断此位置的数据是不是本圈的数据),比除法要快得多。
    b.
    合理使用Unsafe,实现更加高效地内存管理和原子访问。Unsafe封装了一些类似C/C++中的指针操作,除了JDK,在Netty、Spring、Kafka、Storm等非常多的流行开源项目中都使用了Unsafe。

简单讲到这,后续开始分享解读Disruptor源码的内容。


参考资料
1.Java8使用@sun.misc.Contended避免伪共享 –
http://www.jianshu.com/p/c3c108c3dcfd
2.写Java也得了解CPU–CPU缓存
http://www.cnblogs.com/techyc/p/3607085.html
3.linux查看CPU高速缓存(cache)信息
http://www.cnblogs.com/kekukele/p/3829369.html
4.理解 CPU Cache
http://wsfdl.com/linux/2016/06/11/%E7%90%86%E8%A7%A3CPU%E7%9A%84cache.html
5.关于CPU Cache — 程序猿需要知道的那些事
http://cenalulu.github.io/linux/all-about-cpu-cache/
6.Mechanical Sympathy – Memory Barriers/Fences
https://mechanical-sympathy.blogspot.tw/2011/07/memory-barriersfences.html
7.《Java高并发程序设计》- 4.4.3 Java中的指针:Unsafe类

  1. Executor框架提供的线程。
  2. LMAX的Ring Buffer概念 (译者注:Ring
    Buffer即环形缓冲,LMAX是一种新型零售金融交易平台,其框架能以低延迟产生大量交易,LMAX建立在JVM平台上。
  3. 基于Actor(事件)的实现。

但对于那种只需要线程间通讯的小型项目而言,用Akka类型化actor可能有点儿像用牛刀杀鸡,不过类型化actor仍然是一种理想的实现方式。

并发框架下的线程池选项:

我花了几天时间,用动态代理,阻塞队列和缓存线程池创建了一个解决方案。

首先,我个人不赞同使用当下流行的线程池概念,而应该使用工作队列的概念。简而言之,在一个执行框架可供选择的各种不同选项都是基于某种顺序数据结构,如数组或队列(阻塞或非阻塞)之类的,比如ConcurrentLinkedQueue(并发链式队列),ArrayBlockingQueue(数组阻塞队列),
LinkedBlockingQueue(链式阻塞队列)等等。文档表明,尽管它们的使用环境各不相同,但他们隐含的本质/数据结构有相同的性质,如顺序插入和遍历。

图一是这个框架的高层次架构:

澳门新浦京8455com 2

澳门新浦京8455com 3
图一: 框架的高层次架构

优势:

SPSC队列是指单一生产者/单一消费者队列。MPSC队列是指多生产者/单一消费者队列。

  1. 减少线程创建导致的延迟
  2. 通过优化线程数量,可以解决资源不足的问题。

派发线程负责接收Actor线程发送的消息,并把它们派发到对应的SPSC队列中去。

这些可以使应用程序和服务器应用响应更快。使用线程池看似一个很不错的解决方案但是却有一个根本性的缺陷:连续争用问题。这里是Java中关于一些并发框架下线程池选项的讨论。

接收到消息的Actor线程用其中的数据调用相应的actor实例中的方法。借助其他actor的代理,actor实例可以将消息发送到MPSC队列中,然后消息会被发送给目标actor线程。

Disruptor(环形缓冲):

我创建了一个简单的例子来测试,就是下面这个打乒乓球的程序:

(LMAX的一个基于环形缓冲区的高性能进程间消息库)LMAX的开发人员使用一个干扰框架来解决连续争用问题,这个框架是基于一个叫环形缓冲的数据结构。它可能是线程间发送消息的最有效方式了。它是队列的一种替代实现方式,但它又和SEDA和Actors(译者注:这两种都是和Disruptor类似的并发模型)有一些共同特征。向Disruptor中放入消息需要两步,第一步申请一个环形缓冲的槽位,槽位可为用户提供写对应数据的记录。然后需要提交该条记录,为了能灵活使用内存,2步法是必须的。只有经过提交,这条消息才能对消费者线程可见。下图描述了环状缓冲这个数据结构(Disruptor的核心):

public interface PlayerA (
  void pong(long ball); //发完就忘的方法调用 
}
public interface PlayerB {   
  void ping(PlayerA playerA, long ball); //发完就忘的方法调用 
}    
public class PlayerAImpl implements PlayerA {    
  @Override    
  public void pong(long ball) {    
  }    
}
public class PlayerBImpl implements PlayerB {   
  @Override    
  public void ping(PlayerA playerA, long ball) {    
    playerA.pong(ball);    
  }    
}
public class PingPongExample {   
  public void testPingPong() {
    // 管理器隐藏了线程间通讯的复杂性
    // 控制actor代理,actor实现和线程  
    ActorManager manager = new ActorManager();
    // 在管理器内注册actor实现 
    manager.registerImpl(PlayerAImpl.class);    
    manager.registerImpl(PlayerBImpl.class);
    //创建actor代理。代理会将方法调用转换成内部消息。 
    //会在线程间发给特定的actor实例。    
    PlayerA playerA = manager.createActor(PlayerA.class);    
    PlayerB playerB = manager.createActor(PlayerB.class);    
    for(int i = 0; i < 1000000; i++) {    
       playerB.ping(playerA, i);     
   }    
}

(译者注:LMAX的核心是一个业务逻辑处理器,而该业务逻辑处理器的核心是Disruptor,这是一个并发组件,能够在无锁的情况下实现网络的Queue并发操作)

经过测试,速度大约在每秒500,000
次乒/乓左右;还不错吧。然而跟单线程的运行速度比起来,我突然就感觉没那么好了。在 单线程中运行的代码每秒速度能达到20亿
(2,681,850,373)!

澳门新浦京8455com 4

居然差了5,000
多倍。太让我失望了。在大多数情况下,单线程代码的效果都比多线程代码更高效。

Disruptor在多核平台上能达到很低的延迟同时又有高吞吐量,尽管线程程间需要共享数据以及传递消息。

我开始找原因,想看看我的乒乓球运动员们为什么这么慢。经过一番调研和测试,我发现是阻塞队列的问题,我用来在actor间传递消息的队列影响了性能。

它的独特之处在于锁和免争用结构。它甚至不使用CAS或内存保护。若想了解关于它的更多细节,这里有一篇不错的文章和官网。使用Disruptor的一个缺点(事实上也算不上缺点)是,你需要提前告知Disruptor应用程序完成任务所需要的线程数。

澳门新浦京8455com 5

基于事件:

图 2: 只有一个生产者和一个消费者的SPSC队列

对于传统线程池机制,一个强大的替代方案就是基于事件模型。这种基于事件的线程轮询/线程池/线程调度机制在函数式编程中很常见。关于这个概念的一个非常流行的实现是基于actor的系统(译者注:Scala的并发系统),Akka已成为其实际上的标准。(译者注:Akka,一种善于处理进程间通信的框架)

所以我发起了一场竞赛,要将它换成Java里最快的队列。我发现了Nitsan
Wakart的 博客 。他发了几篇文章介绍单一生产者/单一消费者(SPSC)无锁队列的实现。这些文章受到了Martin
Thompson的演讲 终极性能的无锁算法的启发。

Actors是非常轻量级的并发实体。它们使用一种事件驱动的接收循环来异步处理消息。消息模式匹配可以很方便地表述一个actor的行为。它们提高了抽象级别从而使写,测试,理解和维护并发/分布式系统更加容易。让你专注于工作流——消息如何流入系统——而不是低层次的基本概念如线程,锁以及套接字IO。一个线程可以分配多个或单个actor,而且两种模型都是依需要选择的。

跟基于私有锁的队列相比,无锁队列的性能更优。在基于锁的队列中,当一个线程得到锁时,其它线程就要等着锁被释放。而在无锁的算法中,某个生产者线程生产消息时不会阻塞其它生产者线程,消费者也不会被其它读取队列的消费者阻塞。

像Akka这种基于actor的系统的优势有如下所列:

在Martin
Thompson的演讲以及在Nitsan的博客中介绍的SPSC队列的性能简直令人难以置信—— 超过了100M
ops/sec。比JDK的并发队列实现还要快10倍
(在4核的 Intel Core i7 上的性能大约在 8M ops/sec 左右)。

  • 可封装
  • 可监督
  • 可配置执行
  • 位置透明
  • 重试机制

我怀着极大的期望,将所有actor上连接的链式阻塞队列都换成了无锁的SPSC队列。可惜,在吞吐量上的性能测试并没有像我预期的那样出现大幅提升。不过很快我就意识到,瓶颈并不在SPSC队列上,而是在多个生产者/单一消费者(MPSC)那里。

注:调试一个基于actor的系统是一个非常艰难的事情。

用SPSC队列做MPSC队列的任务并不那么简单;在做put操作时,多个生产者可能会覆盖掉彼此的值。SPSC
队列就没有控制多个生产者put操作的代码。所以即便换成最快的SPSC队列,也解决不了我的问题。

Disruptor使用一个线程一个消费者模式,不同于Actors使用N个线程M个消费者模式。比如,你可以拥有任意多的actors,然后它们会被分散到一些数目固定的线程中(通常是一个核一个线程),至于其他的部分,actor模型就和Disruptor模型差不多了;特别是用于批处理的时候。

为了处理多个生产者/单一消费者的情况,我决定启用LMAX
Disruptor ——一个基于环形缓冲区的高性能进程间消息库。

我最初在因特网上搜多到的答案也说明开源空间中关于确定JVM选项基准的贡献还是有一些的。其中一个选项是ExecutorBenchmarkt。它是一个并行任务的开源测试框架。它是用Scala编写的,但是可以用于Java和Scala负载。

澳门新浦京8455com 6

简而言之,快速发展的软硬件行业在呈现新挑战给我们的同时也提供了大量解决应用程序容错性和响应性的方法。对于不可预知的少量线程,我建议使用JDK并发框架中的线程池机制。对于大量规模相似的任务,个人建议使用Disruptor。Disruptor的确是有一点学习曲线,但在性能和扩展性方面的收获远远值得投入的学习时间。在应用程序需要某种重试或管理机制,以及分布式任务时,建议使用Akka的Actor模型。尽管结果还有可能被其它因素所影响,你还是会选择map
reduce或fork/join模型或是其它自定义实现一个分布式应用程序。

图3: 单一生产者和单一消费者的LMAX Disruptor

借助Disruptor,很容易实现低延迟、高吞吐量的线程间消息通讯。它还为生产者和消费者的不同组合提供了不同的用例。几个线程可以互不阻塞地读取环形缓冲中的消息:

澳门新浦京8455com 7

图 4: 单一生产者和两个消费者的LMAX Disruptor    

下面是有多个生产者写入环形缓冲区,多个消费者从中读取消息的场景。

澳门新浦京8455com 8

图 5: 两个生产者和两个消费者的LMAX Disruptor

经过对性能测试的快速搜索,我找到了 三个发布者和一个消费者的吞吐量测试。
这个真是正合我意,它给出了下面这个结果:

LinkedBlockingQueue Disruptor
Run 0 4,550,625 ops/sec 11,487,650 ops/sec
Run 1 4,651,162 ops/sec 11,049,723 ops/sec
Run 2 4,404,316 ops/sec 11,142,061 ops/sec

在3 个生产者/1个 消费者场景下,
Disruptor要比LinkedBlockingQueue快两倍多。然而这跟我所期望的性能上提升10倍仍有很大差距。

这让我觉得很沮丧,并且我的大脑一直在搜寻解决方案。就像命中注定一样,我最近不在跟人拼车上下班,而是改乘地铁了。突然灵光一闪,我的大脑开始将车站跟生产者消费者对应起来。在一个车站里,既有生产者(车和下车的人),也有消费者(同一辆车和上车的人)。

我创建了
Railway类,并用AtomicLong追踪从一站到下一站的列车。我先从简单的场景开始,只有一辆车的铁轨。

public class RailWay {  
 private final Train train = new Train();  
 // stationNo追踪列车并定义哪个车站接收到了列车
 private final AtomicInteger stationIndex = new AtomicInteger();
// 会有多个线程访问这个方法,并等待特定车站上的列车 
public Train waitTrainOnStation(final int stationNo) {

   while (stationIndex.get() % stationCount != stationNo) {
    Thread.yield(); // 为保证高吞吐量的消息传递,这个是必须的。
                   //但在等待列车时它会消耗CPU周期 
   }  
   // 只有站号等于stationIndex.get() % stationCount时,这个忙循环才会返回

   return train;
 }
// 这个方法通过增加列车的站点索引将这辆列车移到下一站
  public void sendTrain() {
    stationIndex.getAndIncrement();
   }
  }

为了测试,我用的条件跟在Disruptor性能测试中用的一样,并且也是测的SPSC队列——测试在线程间传递long值。我创建了下面这个Train类,其中包含了一个long数组:

public class Train {   
  //   
  public static int CAPACITY = 2*1024;
  private final long[] goodsArray; // 传输运输货物的数组

  private int index;

  public Train() {   
      goodsArray = new long[CAPACITY];     
 }

 public int goodsCount() { //返回货物数量    
  return index;    
 }    
 public void addGoods(long i) { // 向列车中添加条目    
  goodsArray[index++] = i;    
 }    
 public long getGoods(int i) { //从列车中移走条目    
  index--;    
  return goodsArray[i];    
 }    
}

然后我写了一个简单的测试 :两个线程通过列车互相传递long值。

澳门新浦京8455com 9

图 6: 使用单辆列车的单一生产者和单一消费者Railway

public void testRailWay() {   
  final Railway railway = new Railway();    
  final long n = 20000000000l;    
  //启动一个消费者进程 
  new Thread() {    
   long lastValue = 0;
   @Override   
   public void run() {    
    while (lastValue < n) {    
      Train train = railway.waitTrainOnStation(1); //在#1站等列车
      int count = train.goodsCount();    
      for (int i = 0; i < count; i++) {    
        lastValue = train.getGoods(i); // 卸货   
      }    
      railway.sendTrain(); //将当前列车送到第一站 
     }    
   }    
 }.start();

final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
 Train train = railway.waitTrainOnStation(0); // 在#0站等列车    
 int capacity = train.getCapacity();    
 for (int j = 0; j < capacity; j++) {    
   train.addGoods((int)i++); // 将货物装到列车上 
 }    
 railway.sendTrain();
 if (i % 100000000 == 0) { //每隔100M个条目测量一次性能 
    final long duration = System.nanoTime() - start;    
    final long ops = (i * 1000L * 1000L * 1000L) / duration;    
    System.out.format("ops/sec = %,dn", ops);    
    System.out.format("trains/sec = %,dn", ops / Train.CAPACITY);    
    System.out.format("latency nanos = %.3f%nn", 
    duration / (float)(i) * (float)Train.CAPACITY);    
  }    
 }    
}

在不同的列车容量下运行这个测试,结果惊着我了:

容量 吞吐量: ops/sec 延迟: ns
1 5,190,883 192.6
2 10,282,820 194.5
32 104,878,614 305.1
256 344,614,640 742. 9
2048 608,112,493 3,367.8
32768 767,028,751 42,720.7

在列车容量达到32,768时,两个线程传送消息的吞吐量达到了767,028,751
ops/sec。比Nitsan博客中的SPSC队列快了几倍。

继续按铁路列车这个思路思考,我想知道如果有两辆列车会怎么样?我觉得应该能提高吞吐量,同时还能降低延迟。每个车站都会有它自己的列车。当一辆列车在第一个车站装货时,第二辆列车会在第二个车站卸货,反之亦然。

澳门新浦京8455com 10

图 7: 使用两辆列车的单一生产者和单一消费者Railway

下面是吞吐量的结果:

容量 吞吐量: ops/sec 延时: ns
1 7,492,684 133.5
2 14,754,786 135.5
32 174,227,656 183.7
256 613,555,475 417.2
2048 940,144,900 2,178.4
32768 797,806,764 41,072.6

结果是惊人的;比单辆列车的结果快了1.4倍多。列车容量为一时,延迟从192.6纳秒降低到133.5纳秒;这显然是一个令人鼓舞的迹象。

因此我的实验还没结束。列车容量为2048的两个线程传递消息的延迟为2,178.4
纳秒,这太高了。我在想如何降低它,创建一个有很多辆列车 的例子:

澳门新浦京8455com 11

图 8: 使用多辆列车的单一生产者和单一消费者Railway 

我还把列车容量降到了1个long值,开始玩起了列车数量。下面是测试结果:

列车数量 吞吐量: ops/sec 延迟: ns
2 10,917,951 91.6
32 31,233,310 32.0
256 42,791,962 23.4
1024 53,220,057 18.8
32768 71,812,166 13.9

用32,768 列车在线程间发送一个long值的延迟降低到了13.9
纳秒。通过调整列车数量和列车容量,当延时不那么高,吞吐量不那么低时,吞吐量和延时就达到了最佳平衡。

对于单一生产者和单一消费者(SPSC)而言,这些数值很棒;但我们怎么让它在有多个生产者和消费者时也能生效呢?答案很简单,添加更多的车站!

澳门新浦京8455com 12

图 9:一个生产者和两个消费者的Railway

每个线程都等着下一趟列车,装货/卸货,然后把列车送到下一站。在生产者往列车上装货时,消费者在从列车上卸货。列车周而复始地从一个车站转到另一个车站。

为了测试单一生产者/多消费者(SPMC)
的情况,我创建了一个有8个车站的Railway测试。
一个车站属于一个生产者,而另外7个车站属于消费者。结果是:

列车数量 = 256 ,列车容量 = 32:

 ops/sec = 116,604,397     延迟(纳秒) = 274.4

列车数量= 32,列车容量= 256:

 ops/sec = 432,055,469     延迟(纳秒) = 592.5

如你所见,即便有8个工作线程,测试给出的结果也相当好–
32辆容量为256个long的列车吞吐量为432,055,469
ops/sec。在测试期间,所有CPU内核的负载都是100%。

澳门新浦京8455com 13

图 10:在测试有8个车站的Railway 期间的CPU 使用情况

在玩这个Railway算法时,我几乎忘了我最初的目标:提升多生产者/单消费者情况下的性能。

澳门新浦京8455com 14

图 11:三个生产者和一个消费者的 Railway 

我创建了3个生产者和1个消费者的新测试。每辆列车一站一站地转圈,而每个生产者只给每辆车装1/3容量的货。消费者取出每辆车上三个生产者给出的全部三项货物。性能测试给出的平均结果如下所示:

 ops/sec = 162,597,109  列车/秒 = 54,199,036     延迟(纳秒) = 18.5

结果相当棒。生产者和消费者工作的速度超过了160M ops/sec。

为了填补差异,下面给出相同情况下的Disruptor结果- 3个生产者和1个消费者:

Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec

下面是另一个批量消息的Disruptor 3P:1C 测试 (10 条消息每批):

Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec;

最后是用带LinkedBlockingQueue 实现的Disruptor 在3P:1C场景下的测试结果:

Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec

如你所见,Railway方式的平均吞吐量是162,597,109
ops/sec,而Disruptor在同样的情况下的最好结果只有128,205,128
ops/sec。至于 LinkedBlockingQueue,最好的结果只有4,546,281 ops/sec。

Railway算法为事件批处理提供了一种可以显著增加吞吐量的简易办法。通过调整列车容量或列车数量,很容易达成想要的吞吐量/延迟。

另外,
当同一个线程可以用来消费消息,处理它们并向环中返回结果时,通过混合生产者和消费者,Railway也能用来处理复杂的情况:

澳门新浦京8455com 15

图 12: 混合生产者和消费者的Railway

最后,我会提供一个经过优化的超高吞吐量 单生产者/单消费者测试:

澳门新浦京8455com 16

图 13:单个生产者和单个消费者的Railway

它的平均结果为:吞吐量超过每秒15亿 (1,569,884,271)次操作,延迟为1.3
微秒。如你所见,本文开头描述的那个规模相同的单线程测试的结果是每秒2,681,850,373。

你自己想想结论是什么吧。

我希望将来再写一篇文章,阐明如何用Queue和
BlockingQueue接口支持Railway算法,用来处理不同的生产者和消费者组合。敬请关注。

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图