从任务到线程:Java结构化并发应用程序

绝大许多面世都以透过职务执行的方法来促成的。常常常有二种方法进行职务:串行和相互。

现身设计的面目,正是要把程序的逻辑解释为多少个义务,那几个职责独立而又同盟的到位程序的意义。而内部最关键的地点就是怎样将逻辑上的职务分配到实际的线程中去施行。换来说之,任务是指标,而线程是载体,线程的兑现要以义务为指标。

class SingleThreadWebServer {
    public static void main(String[] args) throws Exception {
        ServerSocket socket = new ServerSocket(80);
        while(true) {
            Socket conn = socket.accept();
            handleRequest(conn);
        }
    }
}

class ThreadPerTaskWebServer {
    public static void main(String[] args) throws Exception {
        ServerSocket socket = new ServerSocket(80);
        while(true) {
            final Socket conn = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(conn);
                }
            };
            new Thread(task).start();
        }
    }
}

1. 在线程中施行职务

并发程序设计的第一步正是要分开任务的疆界,理想图景下正是装有的职分都独立的:每一种职责都以不依靠于任何职责的图景,结果和边际。因为独立的任务是最有利并发设计的。

有一种最自然的职责划分方法正是以单独的顾客诉求为职分边界。每一个客商央浼是单身的,则管理职分伏乞的职分也是独自的。

在划分完职责之后,下一标题正是怎么调节这一个职务,最简便易行的形式就是串行调用全部任务,相当于一个多个的实施。

举例下边包车型客车那个套接字服务程序,每一趟都必须要响应二个伸手,下八个央浼要求等上四个央浼执行完结之后再被拍卖。

public class SingleThreadWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

这种规划当然是无法满意必要的,并发的高吞吐和高响应速度的优势都没发挥出来。

本来上边的那三种方式都以有难点的。单线程的难点正是并发量会是瓶颈,三十多线程版本正是无界定的创立线程会导致财富不足难题。

1.1 展现地成立线程

上述代码的优化版便是为每一个要求都分配独立的线程来实行,也等于每二个乞请职分都是二个独自线程。

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            //为每个请求创建单独的线程任务,保证并发性
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

如此那般设计的亮点在于:

  • 职分管理线程从主线程分离出来,使得主线程不用等待职分完结就能够去神速地去响应下二个诉求,以到达高响应速度;
  • 职分管理能够并行,协理同偶尔候管理三个诉求;
  • 职务管理是线程安全的,因为每一种职分都以单身的。

唯独需求小心的是,任必需需是线程安全的,否者八线程并发时会至极。

Executor 框架

任务是一组逻辑工作单元,而线程是使任务异步履行的编制。

JDK 提供了 Executor 接口:

public interface Executor {
    void execute(Runnable command);
}

即便如此 Executor
接口比较容易,可是却是异步职分奉行框架的根底,该框架能支撑多样差别类型的职分试行政策。它提供了一种标准的艺术把职务的交给进程与实行进度进展了然耦。用
Runnable 来表示职务。Executor
的兑现提供了对生命周期的支撑以至总结消息应用程序管理等编写制定。

Executor
是基于临蓐者购买者方式的,提交任务的操作相当于劳动者,实行职务的线程相当于花费。

基于 Executor 的 WebServer 例子如下:

public class TaskExecutorWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(80);
        while (true) {
            final Socket conn = serverSocket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    handleRequest(conn);
                }
            };
            exec.execute(task);
        }
    }
}

别的能够团结实现 Executor 来决定是现身依旧并行的,如上边代码:

/**
 * 执行已提交的 Runnable 任务的对象。
 * 此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。
 * 通常使用 Executor 而不是显式地创建线程。
 *
 *
 * @author renchunxiao
 *
 */
public class ExecutorDemo {
    public static void main(String[] args) {
        Executor executor = new ThreadExecutor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // do something
            }
        });

        Executor executor2 = new SerialExecutor();
        executor2.execute(new Runnable() {
            @Override
            public void run() {
                // do something
            }
        });

    }
}

/**
 * 创建一个线程来执行 command
 *
 * @author renchunxiao
 *
 */
class ThreadExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

/**
 * 串行执行 command
 *
 * @author renchunxiao
 *
 */
class SerialExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

1.3 无界定创造线程的欠缺

可是上述的方案也许有欠缺的:

  1. 线程的生命周期的开销超大:每创设二个线程都以要消耗大批量的寻思能源;
  2. 财富的花费:活跃的线程要开销内部存款和储蓄器能源,若是有太多的悠闲能源就能使得众多内部存储器资源浪费,招致内部存款和储蓄器财富不足,多线程并发时就能够并发能源强占的标题;
  3. 路不拾遗:可成立线程的个数是有限制的,过多的线程数会招致内部存款和储蓄器溢出;

采纳制造线程来抨击的例子中,最醒指标正是一再成立死循环的线程,最终引致整个计算机的能源都耗尽。

线程池

线程池正是线程的能源池,能够经过 Executors
中的静态工厂方法来成立线程池。

  • newFixedThreadPool。创制固定长度的线程池,每一回提交职分创立叁个线程,直到达到线程池的最大数据,线程池的大大小小不再变化。
  • newSingleThreadExecutor。单个线程池。
  • newCachedThreadPool。根据职分范围变动的线程池。
  • newScheduledThreadPool。创设固定长度的线程池,以延缓或依期的艺术来试行职务。

JVM 唯有在全部非守护线程整体安息后才会退出,所以,如若不恐怕精确的关门
Executor,那么 JVM 就无法收场。

为了消除试行服务的生命周期难点,有个增添 Executor 接口的新接口
ExecutorService。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService 生命周期有二种情景:运维、关闭、已消声匿迹。ExecutorService在起来成立时处于运维处境。shutdown
方法会平缓关闭:不在选拔新的职分,并且等待已经实践的职分实践到位(富含这么些还没开头的天职卡塔尔国。shutdownNow
方法将强行关闭:它将尝试撤除全体运营中的义务,而且不再运维队列中从不起头的天职。全体任务都奉行到位后步向到已消声匿迹景况。

2.Executor框架

职分是一组逻辑工作单元,而线程则是职责异步实践的建制。为了让职责越来越好地分配到线程中执行,java.util.concurrent提供了Executor框架。

Executor基于临蓐者-消费者格局:提交职责的操作相当于劳动者(生成待完毕的劳作单元),奉行义务的线程则一定于消费者(施行完那个职业单元)。

将上述的服务端代码改换为Executor框架如下:

public class TaskExecutionWebServer {
    //设定线程池大小;
    private static final int NTHREADS = 100;
    private static final Executor exec
            = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

Callable 和 Future

Executor 框架使用 Runnable 作为主导的天职表示情势。Runnable
是一种有局限性的抽象,它的 run 方法无法再次回到值和抛出一个受检查卓殊。

众多职务实际是存在延时的乘除,比方数据库查询,从网络获取能源。对于那几个职务,Callable
是更加好的聊以自慰,它以为 call 将回到五个值,并且只怕抛出特别。

Executor
实施的天职有八个生命周期阶段:创设、提交、先河和实现。由于有些任务要求非常短日子有希望希望收回,在
Executor 框架个中,已提交未起先的义务能够裁撤。

Future
表示三个义务的生命周期,而且提供了相应的法门来剖断是不是业已到位或注销,以至获得任务的结果和撤回义务等。

2.1 线程池

Executor的本色正是处理和调解线程池。所谓线程池正是指管理一组同构职业线程的能源池。线程池和任务队列相反相成:职分队列中保存着具备带实施的职分,而线程池中负有能够去推行职责的行事线程,职业线程从职分队列中世界一个职分实践,试行义务完成之后在重返线程池中等待下二个职分的赶来。

任务池的优势在于:

  1. 由此复用现成线程而不是创立新的线程,减少创设线程时的支付;
  2. 复用现存线程,能够一直实践职责,幸免因创造线程而让义务等待,升高响应速度。

Executor能够创立的线程池共有各个:

  1. newFixedThreadPool,即定位大小的线程池,假如有线程因发生了丰盛而夭亡,会创制新的线程代替:
  2. newCachedThreadPool,即支持缓存的线程池,假使线程池的层面超过了必要的范围,就能回笼空闲线程,要是供给大增,则会扩展线程池的范畴;
  3. newScheduledThreadPool,固定大小的线程池,並且以延时要么准期的章程实施;
  4. newSingleThreadExecutor,单线程方式,串行实践职分;

2.2 Executor的生命周期

那边须要单独说下Executor的生命周期。由于JVM独有在非守护线程全体停下才会脱离,所以只要没得法退出Executor,就能够形成JVM一定要荒谬甘休。可是Executor是接纳异步的不二秘技实行线程,并不能够立即掌握所有线程的景观。为了越来越好的管理Executor的生命周期,Java1.5伊始提供了Executor的恢宏接口ExecutorService

ExecutorService提供了三种形式关闭措施:

  • shutdown:
    平缓的闭馆进度,即不再接受新的职务,等到已提交的职分奉行达成后关门进度池;
  • shutdownNow: 立即关闭全数职分,无论是或不是再试行;

服务端ExecutorService版的落到实处如下:

public class LifecycleWebServer {
    private final ExecutorService exec = Executors.newCachedThreadPool();

    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() {
                        handleRequest(conn);
                    }
                });
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }

    public void stop() {
        exec.shutdown();
    }

    private void log(String msg, Exception e) {
        Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
    }

    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }

    interface Request {
    }

    private Request readRequest(Socket s) {
        return null;
    }

    private void dispatchRequest(Request r) {
    }

    private boolean isShutdownRequest(Request r) {
        return false;
    }
}

2.3 延迟职责和周期性职责

Java中提供Timer来施行延时职务和周期职务,然而Timer类有以下的后天不良:

  1. 提姆er只会创建三个线程来施行任务,借使有三个TimerTask实施时间太长,就能潜濡默化到任何TimerTask的准时精度;
  2. Timer不会捕捉TimerTask未定义的十一分,所以当有极其抛出到Timer中时,Timer就能够崩溃,何况也无计可施恢复生机,就能够影响到已经被调解可是并未有施行的任务,变成“线程败露”。

提出采用ScheduledThreadPoolExecutor来代替Timer类。

3. Callable & Future

如上文所说,Executor以Runnable的样式描述职分,不过Runnable有相当的大的局限性:

  • 不曾重回值,只是试行职责;
  • 不可能管理被抛出的足够;

为了弥补以上的标题,Java中布置了另一种接口Callable

public interface Callable<V> {
    V call() throws Exception;
}

Callable支持任务有重回值,并匡助特别的抛出。固然愿意得到子线程的实行结果,那Callable将比Runnable更为切合。

任由Callable照旧Runnable都以对于职务的指雁为羹描述,即注解职务的限量:有威名赫赫的起源,而且都会在肯定原则下甘休。

Executor框架下所实行的职分都有四种生命周期:

  • 创建;
  • 提交;
  • 开始;
  • 完成;

对于贰个已提交但尚未曾从头的义务,是足以任何时候被甘休;不过借使一个任务现已假使已经上马实践,就不得不等到其相应中断时再收回;当然,对于几个早就履行到位的职分,对其打消职务是一贯不别的作用的。

既然职分有生命周期,那要如何本领明白一个任务当前的生命周期状态吧?Callable既然有重回值,怎样去在主线程中获取子线程的重回值呢?为了消除这么些标题,就须求Future类的支持。

public interface Future<V> {
    //取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 任务是否被取消
    boolean isCancelled();
    // 任务是否完成
    boolean isDone();
    // 获得任务的返回值
    V get() throws InterruptedException, ExecutionException;
    // 在超时期限内等待返回值
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future类表示职务生命周期状态,其取名突显了职责的生命周期只可以前进不可能后退。

Future类提供格局查询职分状态外,还提供get办法赢得职分的重临值,可是get方法的行事决定于职分境况:

  • 假如职务现已做到,get方法则会立刻回去;
  • 假使职责还在施行中,get方法规会拥塞直到职分到位;
  • 举个例子任务在推行的进度中抛出十分,get方法会将该特别封装为ExecutionException中,并能够透过getCase方法得到具体极度原因;

假若将多少个Callable对象提交给ExecutorService,submit方法就能重回叁个Future对象,通过这几个Future对象就足以在主线程中获得该职务的动静,并赢得再次来到值。

除了这几个之外,能够显式地把Runnable和Callable对象封装成FutureTask对象,FutureTask不光世袭了Future接口,也世襲Runnable接口,所以能够一向调用run方法推行。

既然如此是出新管理,当然会蒙受一次性交给一组职分的情景,此时能够选取CompletionService,Completion瑟维斯能够精晓为Executor和BlockingQueue的构成:当一组职分被交给后,CompletionService将奉公守法职务成功的各类将任务的Future对象归入队列中。

CompletionService的接口如下:

public interface CompletionService<V> {

    Future<V> submit(Callable<V> task);

    Future<V> submit(Runnable task, V result);
    //如果队列为空,就会阻塞以等待有任务被添加
    Future<V> take() throws InterruptedException;

    //如果队列为空,就会返回null;
    Future<V> poll();

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

请介意take方法和poll方法的分别。

而外利用CompletionService来叁个三个得到成功职分的Future对象外,还是能够调用ExecutorSerive的invokeAll()方法。

invokeAll扶助限期提交一组任务(职责的联谊),并拿走叁个Future数组。invokeAll方法将如约职务集合迭代器的次第将职务对应的Future对象归入数组中,这样就足以把传播的职分(Callable)和结果(Future)联系起来。当全部义务实行完成,或许逾期,再也许被搁浅时,invokeAll将赶回Future数组。

当invokeAll方法重回时,每一个职分依然正常完成,要么被撤回,即都感觉止的境况了。

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图