澳门新浦京娱乐游戏Java 9新特性,Part 6: Concurrency

Java语言通过已检测异常语法所提供的静态异常检测功能非常实用,通过它程序开发人员可以用很便捷的方式表达复杂的程序流程。

前言

本文翻译自Jeff Friesen发布在Javaworld的文章Java 9’s other new
enhancements,Part 6:
Concurrency
粗略翻译,仅供学习交流,若有错误之处欢迎指出(这一篇很多描述找不到明确的中文来对应,如果觉得很生涩可以参考下原文来理解)

实际上,如果某个函数预期将返回某种类型的数据,通过已检测异常,很容易就可以扩展这个函数,将所提供的输入不适于所请求的计算的各类情况都通知给调用者,以确保每种情况下都能够触发恰当的动作。而且由Java语言所提供的语法级的异常处理执行让这些异常像返回类型的隐式扩展一样,成为合理的函数签名一部分。

Promise async
generator是ES6之后才被提出来的,他们都能够用来解决以前JS异步调用产生的一系列问题,例如大名鼎鼎的回调地狱!!!


这种异常的抽象对于具有分层结构的程序来说特别方便,调用层只需要知道调用内部层级会出现哪些情况,而不需要了解更多的信息。然后,调用层只需要判定这些情况中的哪些需要其在自身范围内跟进,哪些应该作为其作用范围内的非法情况,递归通知到外部层级。

什么是回调地狱?

JEP 266: More Concurrency
Updates为响应式流(reactive
streams)定义了一个彼此协作的发布-订阅框架,改进了java.util.concurrent.CompletableFuture类,和各种其他改进。这篇文章将带你认识这些并发相关的改进。

这种针对自上而下的流程,识别和处理特殊情况的抽象通常是程序规格最自然的非正式表述方式。因此已检测异常的存在,能够让程序实现在视觉形态上可以尽可能的与最初的程序规格保持一致。

在以前js中,我们是无法知晓一个异步操作是否执行完成,为了在异步操作完成后执行特定的代码,我们需要传入回调函数,请看下面的栗子:

响应式流的发布-订阅框架

Java
9包含了一个响应式流的发布-订阅框架。在这部分,我会先介绍一下响应式流,然后展示这个框架。

举例来说,某个Internet服务的自上而下的规格说明可能会在多个层级中确定一个专用层级用于处理某个自定义的表示请求和响应的协议。可以用如下代码来描述这一层的正常行为:

这是一个简单的例子,在请求完成后执行特定的代码

介绍响应式流

数据处理的发展,从过去的收集数据然后在达到某些阈值后开始串行的处理数据的批处理架构,进化到处理数据尽可能快的面向流的架构。面向流的架构捕捉并处理活的数据,并且非常快的根据处理结果来修改系统(通常发生在几秒内甚至更短)。与之相反,一个批处理系统可能会花上几个小时,几天,甚至是几周来响应。
处理数据流(尤其是未确定容量的“活”的数据)在异步系统里需要格外注意。主要的问题是资源的消费者需要被控制着不被生产快速的数据源头给压倒。异步需要在一个机器上能够并行的使用计算机资源,网络上主机间的协作或者拥有多个CPU,这极大的提高来数据的处理速度。
响应式流为异步系统非阻塞的处理提供来一个标准。响应式流提供了方法来向数据源发送信号,当流的目的地被大量数据压倒(overwhelmed)时,示意数据源减缓生产数据。这个发信号的能力就像一个水管的阀门。关上了这个阀门在减轻了目的地的压力的同时增加了背压(back
pressure, 在数据源后的压力)。
最初的目标是在异步场景中管理数据流的交换,使得目的地不需要被迫缓冲任意数量的数据。换句话说,背压是这个模型里不可缺少的一部分,它调节线程间的队列。注意背压的交流被异步的处理了。
响应式流关注于寻找一个接口、方法和协议的最小集描述这些操作和实体来实现这个目标:非阻塞背压的异步处理流。

String processMessage(String req) {
   MyExpression exp = parseRequest(req);
   MyValue val = elaborate(exp);
   return composeResponse(val);
}
 //我们需要在请求完成后输出请求完成,请看回调法 function show { request => { console.log } /** * 模拟发起一个http请求 * @param {object} data 请求的参数 * @param {function} callBack 回调函数 */ function request { //下面的定时器模拟请求时间 setTimeout(data => { callBack; } show()

探索发布-订阅框架

Java 9提供了发布-订阅框架(即Flow
API)来支持响应式流,包含了java.util.concurrent.Flow,
java.util.concurrent.SubmissionPublisher类。
Flow包含了四个嵌套的静态接口,这些接口的方法创建了流控制组件,在其中发布者可以生产数据项,将会被一个或多个订阅者消费:

发布者(Publisher):生产可以被订阅者接收的数据
订阅者(Subscriber): 接收数据
订阅(Subscription): 发布者和订阅者之间的联系
处理者(processor):一个具体的数据传输程序中发布者和订阅者的结合

发布者实现了Flow.Publisher接口,发送数据流给所有注册了的订阅者。这个接口声明了一个方法,向发布者注册一个订阅者。

void subscribe(Flow.Subscriber<? super T> subscriber)

调用这个方法来为发布者注册一个订阅者。然而,如果订阅者已经注册过或者因为什么政策而注册失败,这个方法就会以IllegalStateException对象调用订阅者的onError()方法。否则,会以一个新的Flow.Subscription对象调用订阅者的onSubscribe()。如果null作为subscriber参数传给subscribe()方法,会抛出一个NullPointerException*异常。

订阅者实现Flow.Subscriber<T>接口,并订阅一个发布者来获取数据。这个接口声明了onSubscribe()和一些其他方法:

void onSubscribe(Flow.Subscription subscription)
void onComplete()
void onError(Throwable throwable)
void onNext(T item)
  • onSubscribe()被调用来确认注册,它接收一个subscription作为参数,subscription的方法允许向发布者请求新的数据,也可以请求发布者不再发送数据。
  • onComplete()当订阅者知道没有更多的Subscriber的方法会被调用时就会调用这个方法。这个方法后不能有其他Subscriber的方法被调用。
  • onError()在发布者或subscription发生不可恢复的异常时被调用。这个方法后不会有其他Subscriber的方法被调用。
  • onNext()当有了下一个数据项的时候被调用。如果这个方法抛出一个异常,结果不能被保证,也许会导致订阅被取消。

一个subscription提供了发布者和订阅者的联系,让订阅者只接收请求的数据项并且可以在任何时候取消。subscriptions实现了Flow.Subscription接口,声明了两个方法:

void request(long n)
void cancel()

request()方法为这个subscription向当前没有满的需求增加n个数据项。如果n小于等于0,会以IllegalArgumentException异常调用订阅者的onError()方法。否则,订阅者会被接收n次onNext()的调用(或者提前终止,则小于n)。传递Long.MAX_VALUE给n意味着不限制调用次数。

cancel()会引起订阅者逐渐的停止接收数据项,尽最大的努力,在cancel()调用后也许会接收额外的数据项。

最后,一个processor就是一个数据传输的函数,用来操作流,不需要改变订阅者和发布者。一个或多个processors的链存在于订阅者和发布者之间来传输数据。发布者和订阅者不依赖于transformation(s)。JDK不提供具体的processors的实现,你必须通过实现Processor接口来创建自己的processor。

SubmissionPublisher实现了Flow.Publisher异步的提交非空的数据给当前的订阅者直到它关闭。每个当前的订阅者都以相同的顺序接收新提交的数据项,除非丢弃或者遇到了异常。SubmissionPublisher提供了三个构造器来实例化一个Submission
publisher。最简单的(没有参数)的构造器构造一个依赖于ForkJoinPool.commonPool()方法来提供异步的向订阅者传递数据的操作(除非它不支持并行,这种情况下会new一个Thread对象来运行每个任务)。
Listing 1 展示了FlowDemo程序的源码,用来说明SubmissionPublisher和Java
9的响应流发布-订阅框架的其他方面。
Listing 1. FlowDemo.java

import java.util.Arrays;

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;  

public class FlowDemo
{
   public static void main(String[] args)
   {
      // Create a publisher.

      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

      // Create a subscriber and register it with the publisher.

      MySubscriber<String> subscriber = new MySubscriber<>();
      publisher.subscribe(subscriber);

      // Publish several data items and then close the publisher.

      System.out.println("Publishing data items...");
      String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
                         "jul", "aug", "sep", "oct", "nov", "dec" };
      Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
      publisher.close();

      try
      {
         synchronized("A")
         {
            "A".wait();
         }
      }
      catch (InterruptedException ie)
      {
      }
   }
}

class MySubscriber<T> implements Subscriber<T>
{
   private Subscription subscription;

   @Override
   public void onSubscribe(Subscription subscription)
   {
      this.subscription = subscription;
      subscription.request(1);
   }

   @Override
   public void onNext(T item)
   {
      System.out.println("Received: " + item);
      subscription.request(1);
   }

   @Override
   public void onError(Throwable t)
   {
      t.printStackTrace();
      synchronized("A")
      {
         "A".notifyAll();
      }
   }

   @Override
   public void onComplete()
   {
      System.out.println("Done");
      synchronized("A")
      {
         "A".notifyAll();
      }
   }
}

我使用了Objectwait()notifyAll()方法来让main线程等待onComplete()结束。否则,你可能不会看到任何订阅者的输出。

Interned strings, locks, and more

JVM在字符串常量池中维持着一些字面常量的String对象。每一个intern
String对象都只有一份拷贝。比如说,每一个“A”字面常量都对应着一个一个包含A的interned
String对象,同步代码获得了关联唯一的interned
“A”对象的锁。我可以使用notify()代替notifyAll(),在这个例子没有影响。

输出如下:

Publishing data items...
Received: jan
Received: feb
Received: mar
Received: apr
Received: may
Received: jun
Received: jul
Received: aug
Received: sep
Received: oct
Received: nov
Received: dec
Done

了解更多响应式编程
查看Rahul Srivastava的Reactive Programming with JDK 9 Flow
API这篇文章来了解更多发布-订阅框架信息(包含一个processor的demo)。也可以查看JDK
9的Flow API文档。

除此之外,还需要能够识别各种出错的情况,每种情况可能都会导致不同的与客户端的交互方式。假设:

一次回调当然简单,如果是在这次请求完成后需要立即发起下一次请求呢?例如需要请求request10次,必须在上次请求完成后才能进行下一次请求,来看看
回调地狱 是怎么样的

CompletableFuture改进

Java
8引入了CompletableFuture<T>类,可能是java.util.concurrent.Future<T>明确的完成版(设置了它的值和状态),也可能被用作java.util.concurrent.CompleteStage。支持future完成时触发一些依赖的函数和动作。Java
9引入了一些
CompletableFuture*的改进:

  • 支持delays和timeouts
  • 提升了对子类化的支持
  • 新的工厂方法
  • parseRequest可能会识别出“语法问题”
    • 这种情况下,应该立即中断通信流;
  • 当某个请求所假定的可用资源不可用时,elaborate可能会识别出这个请求的“资源问题”
    • 在这种情况下,我们希望通过底层的传输协议(如HTTP
      404错误)通知上层这种资源缺乏的情况
  • 假如某个用户试图执行她没有权限执行的操作时,elaborate可能还会识别出“授信问题”
    • 在这种情况下,在我们自定义的协议中,会给客户端一个特定的响应
 //我们需要在请求完成后输出请求完成,请看回调法 function show { request => { console.log request => { console.log request => { console.log request => { console.log request => { console.log //这才第五次..... }) }) }) }) }) } /** * 模拟发起一个http请求 * @param {object} data 请求的参数 * @param {function} callBack 回调函数 */ function request { //下面的定时器模拟请求时间 setTimeout(data => { callBack; } show()

支持delays和timeouts

Java 9
用基于时间的一些方法扩展了CompletableFutrue,使future可以在一个确定的时长后以一个值完成或异常地完成。

  • CompletableFuture<T> completeOnTimeout(T value, long
    timeout, TimeUnit unit):

    在timeout(单位在java.util.concurrent.Timeunits
    units中,比如MILLISECONDS)前以给定的value完成这个CompletableFutrue。返回这个CompletableFutrue
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit):如果没有在给定的timeout内完成,就以java.util.concurrent.TimeoutException完成这个CompletableFutrue,并返回这个CompletableFutrue

假设我们创造一个CompletableFutrue来从一些推荐服务里获取剧院的推荐。我们引入来一个方法,在推荐服务没有及时地提供期望的结果的时候加载静态的推荐信息:

Supplier<List<Theatre>> invokeRecommendationService = ...
CompletableFuture.supplyAsync(invokeRecommendationService)
                 .completeOnTimeout(Collections.singletonList(cats), 1, TimeUnit.SECONDS)
                 .thenAccept(showRecommendationsToUser);

如果推荐服务在1秒内向我们提供了推荐信息,我们就向用户展示这些信息。否则,我们就推荐用户查看Cats)(假设它正在上映)。新的方法completeOnTimeout(T
value, long timeout, TimeUnit
unit)
当它没有在timeout前完成时,以指定的value完成CompletableFuture
也许你不想提供一个静态的推荐。在这种情况下,你可以通过orTimeout(long
timeout, TimeUnit unit)
抛出一个TimeoutException

CompletableFuture.supplyAsync(invokeRecommendationService)
                 .orTimeout(1, TimeUnit.SECONDS)
                 .thenAccept(showRecommendationsToUser);

除此以外,还增加了一对静态的delayedExecutor()方法。每个方法返回一个java.util.concurrent.Executor,允许一个任务在一个确定的时长后执行:

Executor delayedExecutor(long delay, TimeUnit unit)
Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)

利用已检测异常,我们可以用下面这种方式表示这一层级的代码:

这才第5次回调,但是代码的可读性已经极差了!

增强了对子类化的支持

做了许多改进使得CompletableFuture可以被更简单的继承。比如,你也许想重写新的Executor
defaultExecutor()
方法来代替默认的executor。
另一个新的使子类化更容易的方法是CompletableFuture is <U>
CompletableFuture<U> newIncompleteFuture()
。查看JDK
9的文档来了解更多关于这些方法的信息。

代码片段1:

让我们先看看 Promise async generator怎么解决这个问题,后面再说其使用方式

新的工厂方法

Java 8引入了<U> CompletableFuture<U> completedFuture(U
value)
工厂方法来返回一个已经以给定value完成了的CompletableFuture。Java
9以一个新的<U> CompletableFuture<U> failedFuture(Throwable
ex)
来补充了这个方法,可以返回一个以给定异常完成的CompletableFuture
除此以外,Java
9引入了下面这对stage-oriented工厂方法,返回完成的或异常完成的completion
stages:

  • <U> CompletionStage<U> completedStage(U value):返回一个新的以指定value完成的CompletionStage
    ,并且只支持CompletionStage里的接口。
  • <U> CompletionStage<U> failedStage(Throwable ex):返回一个新的以指定异常完成的CompletionStage
    ,并且只支持CompletionStage里的接口。
MyExpression parseRequest(String req) throws MySyntaxException { ... }
String composeResponse(MyValue val) { ... }
String composeErrorResponse(MyCredentialException ce) { ... }

MyValue elaborate(MyExpression exp) throws MyCredentialException, MyResourceException { ... }

String processMessage(String req) throws MySyntaxException, MyResourceException {
   MyExpression exp = parseRequest(req);
   try {
       MyValue val = elaborate(exp);
       return composeResponse(val);
   } catch (MyCredentialException ce) {
       return composeErrorResponse(ce);
   }
}

首先 Promise 篇

其他的并发改进

许多实现的改进从Java 8开始积累,直到Java
9引入。这些改进有的很小,比如有的改进只是包含了Javadoc的改写。
比如,下面的这个新的构造器,加入了java.util.concurrent.ForkJoinPool类,创造了一个fork/join池:

ForkJoinPool(int parallelism,
             ForkJoinPool.ForkJoinWorkerThreadFactory factory,
             Thread.UncaughtExceptionHandler handler,
             boolean asyncMode,
             int corePoolSize,
             int maximumPoolSize,
             int minimumRunnable,
             Predicate<? super ForkJoinPool> saturate,
             long keepAliveTime,
             TimeUnit unit)

还有,java.util.concurrent.atomic.AtomicReferenceArray<E>类的boolean
weakCompareAndSet(int i, E expectedValue, E
newValue)
被弃用,加入了下面的这些方法:

E compareAndExchange(int i, E expectedValue, E newValue)
E compareAndExchangeAcquire(int i, E expectedValue, E newValue)
E compareAndExchangeRelease(int i, E expectedValue, E newValue)
E getAcquire(int i)
E getOpaque(int i)
E getPlain(int i)
void setOpaque(int i, E newValue)
void setPlain(int i, E newValue)
void setRelease(int i, E newValue)
E weakCompareAndSetAcquire(int i, E expectedValue, E newValue)
boolean weakCompareAndSetPlain(int i, E expectedValue, E newValue)
E weakCompareAndSetRelease(int i, E expectedValue, E newValue)
E weakCompareAndSetVolatile(int i, E expectedValue, E newValue)

如果没有已检测异常,想要保存同样的信息,我们就需要引入专用的类型表示每种可能出现的特殊情况的函数输出。这些类型让我们可以保存所有可能的情况,包括在正常情况下所生成的值。

 //我们需要在请求完成后输出请求完成,请看回调法 function show { request.then( resolve => { console.log; return request.then( resolve => { console.log; return request.then( resolve => { console.log; return request.then( resolve => { console.log; return request.then( resolve => { console.log; return request } /** * 模拟发起一个http请求 * @param {object} data 请求的参数 * @param {function} callBack 回调函数 */ function request { return new Promise( resolve => { //下面的定时器模拟请求时间 setTimeout(data => { resolve } show()

小结

程序里的并发和并行的使用在持续不断的进化,这需要类库的持续不断的改进来支持。Java
9通过它的发布-订阅框架、CompletableFuture和其他的类的改进迈出了一大步。

此外,为了达到和基于类型的执行相同的层次,我们必须要扩展输出类型,封装这些类型所有可用的操作,这样才能将所有情况都考虑在内。

虽然还是很长,但是至少嵌套很少了,可读性也比之前更高

Unfortunately, Java seems not to supply ready-made mechanisms for
defining aggregate outcome types of this kind, that is, something like:

再来看看 async

不幸的是,Java看起来还没有现成的机制来定义下面这种聚合输出类型集合:

切记,async必须和Promise配合使用

Outcome<T, Exc1, Exc2, Exc3>

 //我们需要在请求完成后输出请求完成,请看回调法 async function show { let result = await request console.log; result = await request console.log; result = await request console.log; result = await request console.log; result = await request console.log; } /** * 模拟发起一个http请求 * @param {object} data 请求的参数 * @param {function} callBack 回调函数 */ function request { return new Promise( resolve => { //下面的定时器模拟请求时间 setTimeout(data => { resolve } show()

在上面的例子中,T是正常的返回值,增加的Exc1,Exc2等则是可能会出现的错误情况,这样这些输出中只有一个能够在返回时传递返回值。

代码是不是更加简短了?而且看起来和同步一样,事实上,这就是使用同步的方式写异步代码,这代码也是同步执行的

Java中最类似的工具就是Java
8的CompletionStage<T>,它封装了函数可能抛出的异常并且负责保证在检测到异常的情况下,跳过对前置输出的进一步操作。但是这个接口旨在启用“一元”风格的代码,将异常作为与正常工作流程完全分离的计算的某一方面隐藏。因此,这个工具是为了处理那些不需要恢复的异常而设计,并不适用于自定义已检测异常,因为已检测异常是工作流程不可分割的一部分。因此尽管CompletionStage<T> 可以在保持其他类型异常的同时,选择性的处理某些类型的异常,这种处理并不能在任意特定的情景下执行。

最后看看 generator

因此,如果要用CompletionStage<T>对我们之前的情况建模并保持基于类型的执行,就需要在基础类型T中包含我们的已检测异常同时还要保留专用的输出类型。

 //我们需要在请求完成后输出请求完成,请看回调法 function* show() { let a1 = yield request => { console.log; let a2 = yield request => { console.log; let a3 = yield request => { console.log; let a4 = yield request => { console.log; let a5 = yield request => { console.log; } /** * 模拟发起一个http请求 * @param {object} data 请求的参数 * @param {function} callBack 回调函数 */ function request { //下面的定时器模拟请求时间 setTimeout => { callBack; } let a = show; a.next;

坚持原生方式并引入定制化的专用输出类型后(同时仍然利用Java
8语法的优势),代码展示如下:

以上是异步编程的ES6解决方案,接下来让我们把这3种方式都详细了解下

代码片段2:

一.Promise

class ProcessingOutcome {
   private String resp;
   private MySyntaxErrorNotif se;
   private MyResourceErrorNotif re;

   ......
}

class ParsingOutcome {
   private MyExpression exp;
   private MySyntaxErrorNotif se;

   ......

   public ElaborationOutcome applyElaboration(
           Function<MyExpression,  ElaborationOutcome> elabFun) {
       if (se != null) {
           return new ExtendedElaborationOutcome(se);
       } else {
           return elabFun.apply(exp);
       }
   }
}

class ElaborationOutcome {
   private MyValue val;
   private MyCredentialErrorNotif ce;
   private MyResourceErrorNotif re;

   ......

   public ProcessingOutcome applyProtocol(
           Function<MyValue, String> composeFun,
           Function<MyCredentialErrorNotif, String> composeErrorFun) {
       if (re != null) {
           return new ProcessingOutcome(re);
       } else if (ce != null) {
           return new ProcessingOutcome(composeErrorFun.apply(ce));
       } else {
           return new ProcessingOutcome(composeFun.apply(val));
       }
   }
}

class ExtendedElaborationOutcome extends ElaborationOutcome {
   private MySyntaxErrorNotif se;

   ......

   public ProcessingOutcome applyProtocol(
           Function<MyValue, String> composeFun,
           Function<MyCredentialErrorNotif, String> composeErrorFun) {
       if (se != null) {
           return new ProcessingOutcome(se);
       } else {
           return super.applyProtocol(composeFun, composeErrorFun);
       }
   }
}

ParsingOutcome parseRequest(String req) { ... }
String composeResponse(MyValue val) { ... }
String composeErrorResponse(MyCredentialErrorNotif ce) { ... }

ElaborationOutcome elaborate(MyExpression exp) { ... }

ProcessingOutcome processMessage(String req) {
   ParsingOutcome expOutcome = parseRequest(req);
   ElaborationOutcome valOutcome = expOutcome.applyElaboration(exp -> elaborate(exp));
   ProcessingOutcome respOutcome = valOutcome.applyProtocol(
       val -> composeResponse(val), ce -> composeErrorResponse(ce));
   return respOutcome;
}

关于Promise的一些原型,函数,请移步 官方链接

实际上,通过比较代码片段1和代码片段2我们可以看到已检测异常这个特性实际上只是一种语法糖,旨在用前一种较短的语法重写之后这段代码,同时又保留了基于类型的执行的所有优点。

Promise的中文名,也就是承诺,保证,

不过,这个特性有一个令人讨厌的问题:它只能在同步代码中使用。

大家可以将Promise理解为JS的一个承诺,也就是对异步操作的一个承诺,咱先不管异步操作是否能够执行成功,使用Promise的所有异步操作,JS已经承诺处理了,咱就通过Promise的状态来知晓异步操作的执行结果。

如果在我们的流程中,即使很简单的子任务都可能会引入异步的API调用并且可能有较大的延迟,那么我们可能不希望让处理线程一直保持等待直到异步计算完成(仅考虑性能和可扩展性因素)。

一个 Promise有以下几种状态:

因此,在每个调用层级中,可能会在异步API调用之后执行的代码都不得不移到回调函数中。这样,就无法再用代码片段1中的简单递归结构启用静态异常检测。

pending: 初始状态,既不是成功,也不是失败状态。 fulfilled:
表示着操作完成,状态成功。 rejected: 意味着操作失败。

造成的后果就是,在异步代码中,能够保证每种错误情况最终会被处理的唯一方法可能只有将各种函数输出封装到专用的返回类型中。

pending 状态的 Promise 对象可能会变为fulfilled
状态并传递一个值给相应的状态处理方法,也可能变为失败状态并传递失败信息。当其中任一种情况出现时,Promise
对象的 then 方法绑定的处理方法就会被调用

幸运的是,利用Java 8
JDK,我们可以以一种能够保留代码结构的方式对在流程中引入异步性负责。例如,假设elaborate函数需要异步处理。那么就可以将其重写为返回一个CompletableFuture对象,代码将变成:

上文提到Promise的原型中的函数then,then可以接收2个参数

代码片段3:

第一个参数resolve 是对成功的一个处理,类型为Function。你可以在其中做
一些异步成功后的操作

class ProcessingOutcome {
   private String resp;
   private MySyntaxErrorNotif se;
   private MyResourceErrorNotif re;

   ......
}

class ParsingOutcome {
   private MyExpression exp;
   private MySyntaxErrorNotif se;

   ......

   public CompletableFuture<ElaborationOutcome> applyElaboration(
           Function<MyExpression, CompletableFuture<ElaborationOutcome>> elabFun) {
       if (se != null) {
           return CompletableFuture.completedFuture(new ExtendedElaborationOutcome(se));
       } else {
           return elabFun.apply(exp);
       }
   }
}

class ElaborationOutcome {
   private MyValue val;
   private MyCredentialErrorNotif ce;
   private MyResourceErrorNotif re;

   ......

   public ProcessingOutcome applyProtocol(
           Function<MyValue, String> composeFun,
           Function<MyCredentialErrorNotif, String> composeErrorFun) {
       if (re != null) {
           return new ProcessingOutcome(re);
       } else if (ce != null) {
           return new ProcessingOutcome(composeErrorFun.apply(ce));
       } else {
           return new ProcessingOutcome(composeFun.apply(val));
       }
   }
}

class ExtendedElaborationOutcome extends ElaborationOutcome {
   private MySyntaxErrorNotif se;

   ......

   public ProcessingOutcome applyProtocol(
           Function<MyValue, String> composeFun,
           Function<MyCredentialErrorNotif, String> composeErrorFun) {
       if (se != null) {
           return new ProcessingOutcome(se);
       } else {
           return super.applyProtocol(composeFun, composeErrorFun);
       }
   }
}

ParsingOutcome parseRequest(String req) { ... }
String composeResponse(MyValue val) { ... }
String composeErrorResponse(MyCredentialErrorNotif ce) { ... }
CompletableFuture<ElaborationOutcome> elaborate(MyExpression exp) { ... }
CompletableFuture<ProcessingOutcome> processMessage(String req) {
   ParsingOutcome expOutcome = parseRequest(req);
   CompletableFuture<ElaborationOutcome> valFutOutcome = expOutcome.applyElaboration(exp -> elaborate(exp));
   CompletableFuture<ProcessingOutcome> respFutOutcome = valFutOutcome.thenApply(outcome -> outcome.applyProtocol(
           val -> composeResponse(val), ce -> composeErrorResponse(ce)));
   return respFutOutcome;
}

第二个参数reject是对失败的一个处理,类型为Function。你可以在其中做
一些异步失败后的操作

在引入异步调用的同时保留代码结构是一个非常理想的功能。实际上,底层的执行到底是在同一个线程内还是(一次或多次)切换到不同的线程也许并不总是那么重要的方面。在我们最初的自上而下的规范中,并没有提及线程相关的事宜而且我们只是假设了一个比较显而易见的效率方面的潜在需求。在这里,适当的错误处理当然是更加重要的一个方面。

一般情况下then我们只会传一个参数,也就是默认的成功处理,失败处理会使用
catch函数

如果我们能够在有底层线程切换的情况下保留住代码片段1的代码结构,就像我们保留代码片段2的结构那样,就可能会获得最优的代码展示。

catch函数只有一个参数,也就是代表失败的reject

换句话说,既然代码片段2中的代码可以用更加简单的基于可检测异常的表示形式替换,为什么代码片段3中稍作变化的代码就不可以呢?

来看看then catch的使用案例

我们并不是说要试图正式面对问题,也不是说可以对语言做扩展以支持上述情况。我们只是先讨论一下如果有这样的扩展该多好。

 function show { //正常的请求成功操作 request.then( resolve => { console.log //在then里面同时做成功和失败的操作 request.then( resolve => { //这儿是成功 console.log; }, reject => { //这儿是失败 console.log //正常的请求失败操作 request.catch( reject => { console.log } /** * 模拟发起一个http请求 * @param {object} data 请求的参数 * @param {function} callBack 回调函数 */ function request { return new Promise => { //下面的定时器模拟请求时间 setTimeout(data => { // resolve reject } show()

为了阐明这个问题,假设Java可以识别一个函数是异步的但仍然是顺序执行的。例如,使用如下方式编写函数(使用一个神奇的关键字seq

使用 Promise
能够使你的异步操作变得更加优雅,可读性也比较高,同时,Promise在ES6的各大插件中也使用的相当广泛,所以掌握
Promise是非常有必要的

CompletableFuture<T> seq fun(A1 a1, A2 a2) { … }

二.async / await

我们可以让JVM以某种方式强制返回的CompletableFuture对象只完成一次(通过丢弃后续的虚假调用);这会被看作是这个函数的“正式”终止,不管实际的线程调用情况如何。

想详细了解更多,请移步官方文档

然后,编译器将允许我们使用好像由下述简化的签名所定义的fun函数(用另外一个神奇的关键字async):

async关键字用来定义一个function,用来标识此函数是一个异步函数

T async fun(A1 a1, A2 a2);

切记 切记 切记, await 关键字仅仅在 async function中有效。如果在 async
function函数体外使用 await ,你只会得到一个语法错误SyntaxError

有了这个签名,我们就可以像同步函数那样调用这个函数,不过JVM必须要负责提取fun函数之后所有制定的代码,并且在“正式”终止后(如,在CompletableFuture对象完成之后)在适当的线程中执行这些代码。

async关键字放在函数的声明之前,例如:

这种代码转换将递归地应用到函数调用栈中的所有函数之上。实际上,如果在定义一个新的函数时使用了fun函数的简化签名,新函数就需要强制包含async关键字,以表明这一函数本质上是异步的(虽然仍是顺序执行)。

 async function test => { } async data => { } class Test { async show() { } }

另外,调用如下签名的方法后,递归的传递将会终止

无论是普通的函数,Lambda表达式,或是ES6的class,该关键字都是放置在函数的声明之前

void async fun(A1 a1, A2 a2);

在调用声明了async的函数时,会返回一个Promise对象

以便调用线程(可能属于某个ExecutorService对象)可以完成其他的工作。

而await关键字则是放置在异步操作的调用之前,await会使得async函数在执行到异步操作时暂停代码执行,直到异步操作返回的Promise状态更改为
fulfilled 或
rejected,此时代码会继续执行,并自动解析Promise返回的值,无论是成功还是失败,都会解析到await关键字前面定义接收的参数,请看例子:

可以很方便地扩展上述假想的功能以支持已检测异常。在实践中,通过如下形式的函数定义:

class Test { /** * 线程休眠 * @param {number} timer 休眠毫秒数 */ Sleep { return new Promise( resolve => { setTimeout => { resolve }}let T = new Test();async function show() { console.log; await T.Sleep console.log

CompletableFuture<Outcome<T, E1, E2>> seq fun(A1 a1, A2
a2) { … }

上面的实例调用show函数,会立即打印出 第一次,延时1000毫秒后,会打印出
第二次

其中,Outcome是某个返回类型和异常的标准包装器,异常可以是一个或多个,编译器会把它看做由下述经过简化的签名所定义,从而允许我们使用这个函数:

原理嘛,就是模仿Java的线程休眠函数Sleep

T async fun(A1 a1, A2 a2) throws E1, E2;

在打印了 第一次
后,会调用T的Sleep函数,Sleep函数返回了一个Promise,在定时器执行完毕后调用Promise的resolve函数,会将Promise
的状态更改为
fulfilled,此时await检测到Promise的状态更改,继续执行代码,输出 第二次

利用这个语法,代码片段3的等价版本可以简化如下:

三 . generator

代码片段4:

generator是ES6标准引入的新的数据类型,使用方式是在函数名前加上*,generator和普通的函数差不多,但是可以返回多次。

MyExpression parseRequest(String req) throws MySyntaxException { ... }
String composeResponse(MyValue val) { ... }
String composeErrorResponse(MyCredentialException ce) { ... }

CompletableFuture<Outcome<MyValue, MyCredentialException, MyResourceException>> seq elaborate(MyExpression exp) { ... }
/*
   equivalent to:
   MyValue async elaborate(MyExpression exp) throws MyCredentialException, MyResourceException;
*/

String async processMessage(String req) throws MySyntaxException, MyResourceException {
   MyExpression exp = parseRequest(req);
   try {
       MyValue val = elaborate(exp);
       return composeResponse(val);
   } catch (MyCredentialException ce) {
       return composeErrorResponse(ce);
   }
}

嗯,所有的函数都有默认的返回值,如果没有明确定义,那就会返回undefined,generator也不例外!

而且,在elaborate中引入异步性的基础上,将代码片段1转化为代码片段4是很自然的事情。

generator使用yield关键字来中途返回值,请看案例

是否有什么其他的方法能够达成与可用的语法能达成的相似的目标(服从合理的妥协)?

 function* a { let sum = yield num + 1 console.log;//2 此处是next传入的值 let sum2 = yield sum + 2 } let result = a; let r = result.next;//此处返回第一次yield的值 2 console.log;//此处返回第二次yield的值 4 console.log;//此处并没有第三次yield

我们需要实现一种机制,通过这种机制,某个异步调用之后的所有代码都会在这个调用在其所在的线程中产生输出后被分割(比如,通过将其转入一个回调)并执行。

第一次输出的是第一次yield的值,此时num为调用a函数时传入的值
1,yield返回了num+1,所以第一行打印的对象 value值是 2

作为一种直观的方法,一种可能可行的尝试(只要每一层级的异步调用数量都比较少,这种尝试就是可行的)包含如下步骤:

第二次打印的是sum值,也就是第一个yield关键字前面接收的值,此值是下面result.next传入的
,next函数传入的参数,会赋值到相应的yield关键字左边接收的那个变量,在上方案例,也就是sum变量

  1. 首先,从工作流程的同步展示开始(如代码片段1所示),然后识别出可能会变成异步的函数(在这个例子中即指:evaluate以及相应的processMessage方法本身)
  2. 如果多个可能的异步调用存在于同一个函数中,就需要合理安排代码,可以通过引入中间函数的方式,每个函数中间仅包含一个可能的异步调用,所有其他的异步调用则作为返回前的最后一步操作被调用。(在我们的简单示例中,不需要做任何安排)
  3. 用这样的方式转化代码,每个可能成为异步函数并且参与了内部(inner)函数调用的外部(outer)函数都将会被分割为“outerBefore”和“outerAfter”两部分。outerBefore将包含所有在内部函数之前执行的代码,然后调用内部函数作为其最后一步操作;另一方面,outerAfter则将调用outerBefore作为其第一个操作,然后执行全部剩余代码。需要注意的是,这样造成的后果就是outerBeforeouterAfter将共享相同的参数。在我们的示例中,将会生成如下代码:代码片段5:

    MyExpression parseRequest(String req) throws MySyntaxException { ... }
    String composeResponse(MyValue val) { ... }
    String composeErrorResponse(MyCredentialException ce) { ... }
    
    String processMessage(String req) throws MySyntaxException, MyResourceException {
       return processMessageAfter(req);
    }
    String processMessageAfter(String req) throws MySyntaxException, MyResourceException {
       try {
           MyValue val = processMessageBefore(req);
           return composeResponse(val);
       } catch (MyCredentialException ce) {
           return composeErrorResponse(ce);
       }
    }
    
    MyValue processMessageBefore(String req)
           throws MySyntaxException, MyResourceException, MyCredentialException {
       MyExpression exp = parseRequest(req);
       return elaborate(exp);
    
    }
    
    MyValue elaborate(MyExpression exp) throws MyCredentialException, MyResourceException {
       return elaborateAfter(exp);
    }
    
    MyValue elaborateAfter(MyExpression exp) throws MyCredentialException, MyResourceException { ... }
    
    ......
    
  4. 引入专用的类用来包含由“xxxBefore”和“xxxAfter”组成的函数对,然后用一个临时实例调用任意函数对。我们的代码可能会扩展成如下形式:代码片段6:

    String processMessage(String req) throws MySyntaxException, MyResourceException {
       return new ProtocolHandler().processMessageAfter(req);
    }
    
    class ProtocolHandler {
    
       MyExpression parseRequest(String req) throws MySyntaxException { ... }
       String composeResponse(MyValue val) { ... }
       String composeErrorResponse(MyCredentialException ce) { ... }
    
       String processMessageAfter(String req) throws MySyntaxException, MyResourceException {
           try {
               MyValue val = processMessageBefore(req);
               return composeResponse(val);
           } catch (MyCredentialException ce) {
               return composeErrorResponse(ce);
           }
       }
    
       MyValue processMessageBefore(String req)
               throws MySyntaxException, MyResourceException, MyCredentialException {
           MyExpression exp = parseRequest(req);
           return elaborate(exp);
       }
    }
    
    MyValue elaborate(MyExpression exp) throws MyCredentialException, MyResourceException {
       return new ExpressionHandler().elaborateAfter(exp);
    }
    
    class ExpressionHandler {
       MyValue elaborateAfter(MyExpression exp) throws MyCredentialException, MyResourceException { ... }
    
       ......
    
    }
    
  5. 用适当的代理对象替代前一步引入的示例;代理的工作包括集合所有“xxxAfter”函数然后只在相关的“xxxBefore”函数完成后再调用它们(在“xxxBefore”函数完成的线程中)。最后这一步主要考虑将最内部函数转换为异步函数。最终的代码如下所示:代码片段7:

    String processMessage(String req) throws MySyntaxException, MyResourceException {
       ProtocolHandler proxy = createProxy(new ProtocolHandler());
       return proxy.processMessageAfter(req);
    }
    
    class ProtocolHandler {
    
       MyExpression parseRequest(String req) throws MySyntaxException { ... }
       String composeResponse(MyValue val) { ... }
       String composeErrorResponse(MyCredentialException ce) { ... }
       String processMessageAfter(String req) throws MySyntaxException, MyResourceException {
           try {
               MyValue val = processMessageBefore(req);
               return composeResponse(val);
           } catch (MyCredentialException ce) {
               return composeErrorResponse(ce);
           }
       }
    
       MyValue processMessageBefore(String req)
               throws MySyntaxException, MyResourceException, MyCredentialException {
           MyExpression exp = parseRequest(req);
           return elaborate(exp);
       }
    
    }
    
    MyValue elaborate(MyExpression exp) throws MyCredentialException, MyResourceException {
       ExpressionHandler proxy = createProxy(new ExpressionHandler());
       return proxy.elaborateAfter(exp);
    }
    
    class ExpressionHandler {
    
       MyValue elaborateAfter(MyExpression exp) throws MyCredentialException, MyResourceException { ... }
    
       ......
    
    }
    

第三次输出的是a函数中第二个yield返回的值,此时sum为第一次next传入的2,所以此次返回的值是2+2,所以值也就是
4

即使涉及到的转化全部完成之后,最终生成的代码作为初始规范的自然映射仍然具有较强的可读性。

第四次输出的是最后一个next(),但是上方generator并没有相应的yield返回,所以此时的value为undefined

作为附注,我们认为这种方法确实可行,特别是,对于代理工作的需求是可行的,本质上来说,代理用如下方式重写了“xxxBefore”和“xxxAfter”方法。

yield返回的值是一个对象,其中有done和value两个属性,

(让我们考虑一下示例中ProtocolHandler的代理)

done
表示该generator是否执行完毕,当没有yield返回时,done的值为true,也就是代表当前generator执行完毕
value表示此次yield关键字右方表达式返回的值,当没有yield时,value为undefined

  • Proxy.processMessageAfter:[此方法必须是这个代理的首次调用]
    • 记录获取到的参数
    • 查找上一个被调用的代理(如果存在)并通知它;记录查找到的信息;然后将当前代理设置为最后一个被调用的代理;
    • 用获取到的参数调用ProtocolHandler.processMessageBefore;
    • 如果某一方法已经调用了下一个代理并且发送通知,不再做任何事情;
    • 否则同步终止该方法;调用onCompleted (如下所示)并将方法的结果传递给它。
  • Proxy.processMessageBefore:[必须要从ProtocolHandler.processMessageAfter内部调用此方法,这样我们就会在onCompleted 方法内(如下所示)并且方法的结果也会被保留]
    • 回放保存的输出结果

generator是支持迭代器操作的,例:

除此之外:

 function* a { let sum = yield num + 1 console.log;//2 此处是next传入的值 let sum2 = yield sum + 2 } let result = a; for  { console.log; }
  • Proxy.onCompleted:
    • 记录作为参数获取的方法结果;
    • 将当前方法设置为被调用的最后一个代理;
    • 用调用Proxy.processMessageAfter时获取并保存的参数调用ProtocolHandler.processMessageAfter方法;
    • 如果某一方法已经调用了下一个代理并且发布通知,就不再做任何事情;不过,需要注意的是,要通知下一个代理它的前置代理并不是当前方法,而是当前方法的前置代理。
    • 其他情况下,这个方法将同步终止;如果有前置代理,则调用前置代理的onCompleted方法并将当前方法的输出作为参数传入。

事实证明generator是实现了迭代器的接口的!

以上只是一个不完全的概括。

嗯,关于generator的实际应用场景,我是没有遇见的,不过听说
async/await是generator的语法糖??

我们试图用这些理念用来创造一个完整的解决方案。目前的阶段性成果是可以应用于具体场景的一种实验性技术。

总结

这一预想的技术隐含着在易用性方面的许多妥协,这可能会限制其在有限的一些场景下的吸引力。在我们的场景中,已经证明我们在这种技术上所花费的努力是值得的。

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。

感兴趣的读者可以从这里找到关于我们的技术的详细介绍,除此之外还包含一个对易用性利弊的全面讨论。

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图