Java 并发

计算机用户想当然地认为他们的系统在一个时间可以做多件事。他们认为,他们可以工作在一个字处理器,而其他应用程序在下载文件,管理打印队列和音频流。即使是单一的应用程序通常也是被期望在一个时间来做多件事。例如,音频流应用程序必须同时读取数字音频,解压,管理播放,并更新显示。即使字处理器应该随时准备响应键盘和鼠标事件,不管多么繁忙,它总是能格式化文本或更新显示。可以做这样的事情的软件称为并发软件(concurrent
software)。

  • 原文链接
  • 译者:靖靖

基本线程

在 Java 平台是完全支持并发编程。自从 5.0
版本以来,这个平台还包括高级并发 API, 主要集中在 java.util.concurrent
包。

并发

线程

一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务。
1.继承Thread类,并且实现run()方法。
2.Runnable接口对象的run()方法。
Runnable更加体现面向对象,Thread是一个线程,Runnable是线程的代码。

进程(Processes )和线程(Threads)

进程和线程是并发编程的两个基本的执行单元。在 Java
中,并发编程主要涉及线程。

一个计算机系统通常有许多活动的进程和线程。在给定的时间内,每个处理器只能有一个线程得到真正的运行。对于单核处理器来说,处理时间是通过时间切片来在进程和线程之间进行共享的。

现在多核处理器或多进程的电脑系统越来越流行。这大大增强了系统的进程和线程的并发执行能力。但即便是没有多处理器或多进程的系统中,并发仍然是可能的。

进程和线程

在并发编程当中,有两个基本的执行单元:进程和线程。在java中,我们大部分关心的是线程。然而进程也很重要。

一个电脑系统通常有许多活跃的进程和线程。在只有一个核心的系统当中,在任意一个时刻,实际上只有一个线程在执行。进程和线程通过操作系统的时间分片特性共享单个核心的处理时间。

定义任务

实现Runnable接口并编写run()方法,Runnable接口只有run()方法。
run()方法是顺序执行的。

进程

进程有一个独立的执行环境。进程通常有一个完整的、私人的基本运行时资源;特别是,每个进程都有其自己的内存空间。

进程往往被视为等同于程序或应用程序。然而,用户将看到一个单独的应用程序可能实际上是一组合作的进程。大多数操作系统都支持进程间通信(
Inter Process Communication,简称 IPC)资源,如管道和套接字。IPC
不仅用于同个系统的进程之间的通信,也可以用在不同系统的进程。

大多数 Java 虚拟机的实现作为一个进程运行。Java
应用程序可以使用 ProcessBuilder 对象创建额外的进程。多进程应用程序超出了本书的讲解范围。

进程

一个进程有独立的执行环境。进程一般有完整的、私有的基本运行资源。尤其要说的每个进程都有自己的独立内存空间。

进程经常被视作一个程序或者是一个应用。然而那些被用户视作的单个应用程序进程也许实际上由一些相互协作的进程组合而成。为了促进进程间的通信,大部分操作系统支持内部进程交流(Inter
Process
Communication[IPC])资源,例如管道和套接字。IPC不仅仅可以在同一个系统的进程间进行通信,也可以在不同的系统的进程间进行通信。

大多数的Java虚拟机的实现都是以单个进程的方式运行的。一个Java应用程序可以用ProcessBuilder对象创建额外的进程。多进程应用程序不再这节课的讨论范围之内。

Runnable对象转变为工作任务

调用Thread的start()方法

线程

线程有时被称为轻量级进程。进程和线程都提供一个执行环境,但创建一个新的线程比创建一个新的进程需要更少的资源。

线程中存在于进程中,每个进程都至少一个线程。线程共享进程的资源,包括内存和打开的文件。这使得工作变得高效,但也存在了一个潜在的问题——通信。

多线程执行是 Java
平台的一个重要特点。每个应用程序都至少有一个线程,或者几个,如果算上“系统”的线程(负责内存管理和信号处理)那就更多。但从程序员的角度来看,你启动只有一个线程,称为主线程。这个线程有能力创建额外的线程。

线程

线程有的时候叫做轻量级的进程。进程和线程都提供一个执行的环境,但是创建一个新的线程所需要的资源比创建一个进程所需要的资源少。

线程存在在进程之中,每一个进程至少有一个线程。线程共享进程的资源,包括内存和打开的文件。这样设计是为了提高效率和交流,但是可能带来隐藏的问题。

多线程执行是Java虚拟机的基本特性。如果你计数系统的线程像内存管理和信号处理一样,那么每一个应用程序至少有一个线程或者是多个。但是从应用程序开发者的角度看,你仅仅只调用了一个叫做main
thread的线程。这个线程有能力去创建额外的线程,我们将在下章节进行讲解。

Executor执行器

Executor在客户端和任务执行之间提供了一个间接层,Executor允许管理异步任务的执行,无需显式地管理线程的生命周期。

  • CachedThreadPool在执行过程中通常会创建与所需数量相同的县城,然后在它回收线程时停止创建新线程。
  • FixedThreadPool使用的Thread对象的数量是有界的。
  • SingleThreadExecutor就是线程数量为1的FixedThreadPool,确保任意时刻再任何线程中都只有未已的任务在运行。

线程对象

每个线程都与 Thread 类的一个实例相关联。有两种使用线程对象来创建并发应用程序的基本策略:

  • 为了直接控制线程的创建和管理,简单地初始化线程,应用程序每次需要启动一个异步任务。
  • 通过传递给应用程序任务给一个 Executor,从而从应用程序的其他部分抽象出线程管理。

线程对象

每一个线程都可以和类Thread的一个实例联系起来。有两种基本的策略使用线程对象创建一个并发的应用程序。

  • 为了直接控制线程的创建和管理,仅仅只在应用程序需要启动一个异步任务的时候实例化线程
  • 为了抽象线程的管理,可以将应用程序的任务加入到executor中。
    本节介绍Thread对象的使用。Executors将和更高级别并行对象一起讨论。

具有返回值的任务

实现Callable接口而不是Runnable接口,实现接口的(具有返回值)call()方法,并且使用ExecutorService.submit()方法调用它。
submit()方法会产生Future对象,可以调用Future对象的get()方法来获取该结果。
可以调用isDone()方法来检查Future是否已经完成。

定义和启动一个线程

有两种方式穿件 Thread 的实例:

  • 提供 Runnable 对象。Runnable 接口定义了一个方法 run
    ,用来包含线程要执行的代码。如 HelloRunnable 所示:

    public class HelloRunnable implements Runnable {
        /* (non-Javadoc)
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            System.out.println("Hello from a thread!");
        }
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            (new Thread(new HelloRunnable())).start();
        }
    }
    

  • 继承 Thread。Thread 类本身是实现 Runnable,虽然它的 run
    方法啥都没干。HelloThread 示例如下:

    public class HelloThread extends Thread {
    
        public void run() {
            System.out.println("Hello from a thread!");
        }
        /**
         * @param args
         */
        public static void main(String[] args) {
            (new HelloThread()).start();
        }
    }
    

请注意,这两个例子调用 start 来启动线程。

第一种方式,它使用 Runnable 对象,在实际应用中更普遍,因为 Runnable
对象可以继承 Thread
以外的类。第二种方式,在简单的应用程序更容易使用,但受限于你的任务类必须是一个
Thread 的后代。本书推荐使用第一种方法,将 Runnable 任务从 Thread
对象分离来执行任务。这不仅更灵活,而且它适用于高级线程管理 API。

Thread 类定义了大量的方法用于线程管理。

线程的定义和启动

一个程序创建一个线程的实例必须提供需要运行的代码。下面有两种实现的方法:

  • 提供一个Runnable的对象。Runnable接口定义了一个方法run,这个方法里面包含了在线程当中执行的代码。Runnable对象可以做为Thread的构造器参数就像下面HelloRunnable的例子:

    ` public class HelloRunnable implements Runnable {
    
          public void run() {
              System.out.println("Hello from a thread!");
          }
    
          public static void main(String args[]) {
             (new Thread(new HelloRunnable())).start();
          }
       }  `
    
  • 继承Thread。类Thread本身就实现了Runnable接口,Thread的run方法什么都没有做。程序可以继承类Thread,实现自己的run方法就像下面的HelloThread例子:

    `public class HelloThread extends Thread {
    
          public void run() {
              System.out.println("Hello from a thread!");
          }
    
          public static void main(String args[]) {
                (new HelloThread()).start();
          }
        } `
    

请注意两个例子都是调用Thread的start的方法启动新的线程。

你应该使用哪一种方法?第一种方法实现Runnable接口,这种方法使用更加普遍,因为除了Thread类,这个对象还可以继承其他类。第二种方法在简单的程序中使用起来更简单,但是有个限制就是你的任务类必须是Thread的子类。这一节主要是聚焦在第一种方法上,它把用Runnable任务和用Thread对象去执行任务区分开来。这种方法不仅仅更加灵活,而且适用性更高,这点在后面更高级别的线程管理APIs中会提到。

类Thread定义了一些有用的线程管理的方法。包括提供一些关于调用方法的线程的信息和影响线程状态的静态方法。被其他线程调用的一些方法也可以管理线程和Thread对象。我们将在接下来的章节中学习它们中的一些方法。

休眠

concurrent包里提供了TimeUnit类,可以调用sleep()方法进行休眠。

Sleep 来暂停执行

Thread.sleep
可以当前线程执行暂停一个时间段,这样处理器时间就可以给其他线程使用。

sleep
有两种重载形式:一个是指定睡眠时间到毫秒,另外一个是指定的睡眠时间为纳秒级。然而,这些睡眠时间不能保证是精确的,因为它们是通过由基础
OS
提供的,并受其限制。此外,睡眠周期也可以通过中断终止,我们将在后面的章节中看到。在任何情况下,你不能假设调用
sleep 会挂起线程用于指定精确的时间段。

SleepMessages 示例使用 sleep 每隔4秒打印一次消息:

public class SleepMessages {

    /**
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy",
                "A kid will eat ivy too" };

        for (int i = 0; i < importantInfo.length; i++) {
            // Pause for 4 seconds
            Thread.sleep(4000);
            // Print a message
            System.out.println(importantInfo[i]);
        }
    }
}

请注意 main 声明抛出 InterruptedException。当 sleep
是激活的时候,若有另一个线程中断当前线程时,则 sleep
抛出异常。由于该应用程序还没有定义的另一个线程来引起的中断,所以考虑捕捉
InterruptedException。

用Sleep方法暂停线程的执行

Thread的sleep方法调用会让当前执行的线程暂停一段特定的时间。这是让运行在电脑系统上的应用或者其他应用的其他线程可以占用处理器时间的有效方式。Sleep方法也可以让线程一步一步的执行,就像下面例子中展示的,或者是等待另外一个有时间需求的线程,就像后面章节中介绍的SimpleThreads例子。

提供了两个重载的sleep方法:也是休眠时间单位是微秒,另外一个休眠的时间单位是纳米。然而,这些休眠的时间不能保证是准确的,因为它们受限于操作系统之下的硬件设备。同时,休眠过程中也可以被中断终止,我们将在后面的章节看到。在任何情况下,你不可以认为调用sleep方法可以让线程暂停指定的准确的时间。

SleepMessages例子用sleep方法实现每4秒打印消息:

    `public class SleepMessages {

        public static void main(String args[]) throws InterruptedException {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };

            for (int i = 0;
                 i < importantInfo.length;
                 i++) {
                //Pause for 4 seconds
                Thread.sleep(4000);
                //Print a message
                System.out.println(importantInfo[i]);
            }
        }
    }`

注音main函数声明抛出InterruptedException。当一个线程在睡眠当中,另外一个线程中断这个线程,那么sleep方法将抛出InterruptedException。因为这个程序没有定义另外一个线程去调用中断,所以就没有写catch语句去捕获InterruptedException。

优先级

可以通过getPriority()方法读取现有线程的优先级,setPriority()方法设置优先级。

中断(interrupt)

中断是表明一个线程,它应该停止它正在做和将要做事的时。线程通过在 Thread
对象调用 interrupt 来实现线程的中断。为了中断机制能正常工作,被中断的线程必须支持自己的中断。

中断

一个中断是告诉一个线程它应该暂停它正在做的事情。线程怎么去回应这个中断完全取决于开发者的决定,但是让线程终止也是很正常的决定。这是在这一节着重介绍的用处。一个线程通过调用Thread对象的interrupt方法发送一个中断给到需要中断的线程。为了让中断机制运行正确,被中断的线程必须支持自己的中断。

让步

Thread.yield()

支持中断

如何实现线程支持自己的中断?这要看是什么它目前正在做。如果线程频繁调用抛出InterruptedException
的方法,它只要在 run 方法捕获了异常之后返回即可。例如 :

for (int i = 0; i < importantInfo.length; i++) {
    // Pause for 4 seconds
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        // We've been interrupted: no more messages.
        return;
    }
    // Print a message
    System.out.println(importantInfo[i]);
}

很多方法都会抛出 InterruptedException,如
sleep,被设计成在收到中断时立即取消他们当前的操作并返回。

若线程长时间没有调用方法抛出 InterruptedException
的话,那么它必须定期调用 Thread.interrupted ,在接收到中断后返回 true。

for (int i = 0; i < inputs.length; i++) {
    heavyCrunch(inputs[i]);
    if (Thread.interrupted()) {
        // We've been interrupted: no more crunching.
        return;
    }
}

在这个简单的例子中,代码简单地测试该中断,如果已接收到中断线程就退出。在更复杂的应用程序,它可能会更有意义抛出一个
InterruptedException:

if (Thread.interrupted()) {
    throw new InterruptedException();
}

支持中断

一个线程怎样支持它自己的中断列?这个依赖于它正在做什么。假如说一个线程经常调用抛出InterruptedException的方法,在它捕获这个异常之后它仅仅是从run方法中返回。例如,假如在SleepMessages例子中的主要循环语句在Runnable对象的run方法当中。然后它可以被修改成下面的形式支持中断:

    `for (int i = 0; i < importantInfo.length; i++) {
        // Pause for 4 seconds
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            // We've been interrupted: no more messages.
            return;
        }
        // Print a message
        System.out.println(importantInfo[i]);
    }`

很多方法抛出InterruptedException,例如sleep方法,这些方法被设计成当它们收到中断时立马取消当前的操作和立即返回。

假如有个线程没有调用抛出InterruptedException的方法,那它怎样去响应中断列?那么它必须定期的去调用Thread的interrupted方法,这个方法在该线程设置了中断的情况下返回true。例如:

    `for (int i = 0; i < inputs.length; i++) {
        heavyCrunch(inputs[i]);
        if (Thread.interrupted()) {
            // We've been interrupted: no more crunching.
            return;
        }
    }`

在这个简单的例子当中,代码仅仅检查中断如果那个线程已经接收了中断,那么这个check返回true。在很多复杂的应用当中,抛出一个InterruptedException更加让人明白:

    `if (Thread.interrupted()) {
        throw new InterruptedException();
    }`

这个例子让中断处理的代码集中到catch的语句当中。

后台线程

程序在运行时在后台提供一种通用服务的线程,当所有非后台线程结束时,程序也就终止了,例如main().
线程启动之前调用setDaemon(true)方法,设置为后台线程。
可以调用isDaemon()方法确定线程是否是一个后台线程

中断状态标志

中断机制是使用被称为中断状态的内部标志实现的。调用 Thread.interrupt
可以设置该标志。当一个线程通过调用静态方法 Thread.interrupted
检查中断,中断状态被清除。非静态 isInterrupted
方法,它是用于线程来查询另一个线程的中断状态,不会改变中断状态标志。

按照惯例,任何方法因抛出一个 InterruptedException
退出都会清除中断状态。当然,它可能因为另一个线程调用 interrupt
而让那个中断状态立即被重新设置。

中断状态标志

中断原理是用内部的一个叫做中断状态的标志来实现的。调用Thread的interrupt方法会设置这个标志。当一个线程通过调用Thread的静态方法interrupted检查中断时,中断状态会被清除。一个线程会用另外一个线程的非静态的isInterrupted方法来查询它的中断状态,这个操作不会改变另外一个线程的中断状态标志。

按照规定,任何一个可以抛出InterruptedException的方法在抛出InterruptedException之后会清除中断状态。然而,也很有可能,这个中断状态会立马被另外一个线程调用interrupt方法设置。

加入一个线程

一个线程可以在其他线程之上调用join()方法,其效果是等待一段时间直到第二个线程结束才继续执行。对join()方法的调用可以被中断,做法是在调用线程上调用interrupt()方法

join 方法

join 方法允许一个线程等待另一个完成。假设 t 是一个 Thread 对象,

t.join();

它会导致当前线程暂停执行直到 t 线程终止。join
允许程序员指定一个等待周期。与 sleep
一样,等待时间是依赖于操作系统的时间,不能假设 join 等待时间是精确的。

像 sleep 一样,join 响应中断并通过 InterruptedException 退出。

Join方法

Join方法让一个线程可以等待另外一个线程执行完。如果t是一个正在执行的线程对象,

    `t.join();`

这个调用会让当前线程停止执行直到t执行完成。join的方法的重载让开发者可以指定特定的等待时间。然而和sleep方法一样,join方法等待的时间依赖于操作系统,因此你不可以认为join方法会准确的等待你所指定的时间。

就像sleep方法,join通过抛出InterruptedException来响应中断。

共享受限资源

序列化访问共享资源,即在给定时刻只允许一个任务访问共享资源(加锁)。

SimpleThreads 示例

SimpleThreads 示例,有两个线程,第一个线程是每个 Java
应用程序都有主线程。主线程创建的 Runnable 对象
MessageLoop,并等待它完成。如果 MessageLoop
需要很长时间才能完成,主线程就中断它。

该 MessageLoop
线程打印出一系列消息。如果中断之前就已经打印了所有消息,则 MessageLoop
线程打印一条消息并退出。

public class SimpleThreads {
      // Display a message, preceded by
    // the name of the current thread
    static void threadMessage(String message) {
        String threadName =
            Thread.currentThread().getName();
        System.out.format("%s: %s%n",
                          threadName,
                          message);
    }

    private static class MessageLoop
        implements Runnable {
        public void run() {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };
            try {
                for (int i = 0;
                     i < importantInfo.length;
                     i++) {
                    // Pause for 4 seconds
                    Thread.sleep(4000);
                    // Print a message
                    threadMessage(importantInfo[i]);
                }
            } catch (InterruptedException e) {
                threadMessage("I wasn't done!");
            }
        }
    }

    public static void main(String args[])
        throws InterruptedException {

        // Delay, in milliseconds before
        // we interrupt MessageLoop
        // thread (default one hour).
        long patience = 1000 * 60 * 60;

        // If command line argument
        // present, gives patience
        // in seconds.
        if (args.length > 0) {
            try {
                patience = Long.parseLong(args[0]) * 1000;
            } catch (NumberFormatException e) {
                System.err.println("Argument must be an integer.");
                System.exit(1);
            }
        }

        threadMessage("Starting MessageLoop thread");
        long startTime = System.currentTimeMillis();
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        // loop until MessageLoop
        // thread exits
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait maximum of 1 second
            // for MessageLoop thread
            // to finish.
            t.join(1000);
            if (((System.currentTimeMillis() - startTime) > patience)
                  && t.isAlive()) {
                threadMessage("Tired of waiting!");
                t.interrupt();
                // Shouldn't be long now
                // -- wait indefinitely
                t.join();
            }
        }
        threadMessage("Finally!");
    }
}

SimpleThreads例子

下面的这个例子把这部分的一些概念综合起来进行展示。SimpleThreads包含两个线程。第一个线程是每个Java程序都会有的主线程。主线程通过Runnable对象创建了一个叫MessageLoop的新线程,然后主线程等待这个线程完成。如果MessageLoop这个线程花费了太久都没有完成,那么主线程就会中断这个线程。

MessageLoop线程会打印一系列的消息。如果在打印完所有消息之前中断这个线程,MessageLoop线程将打印一个消息然后退出。

    `public class SimpleThreads {
        // Display a message, preceded by
        // the name of the current thread
        static void threadMessage(String message) {
            String threadName =
                Thread.currentThread().getName();
            System.out.format("%s: %s%n",
                              threadName,
                              message);
        }

        private static class MessageLoop
            implements Runnable {
            public void run() {
                String importantInfo[] = {
                    "Mares eat oats",
                    "Does eat oats",
                    "Little lambs eat ivy",
                    "A kid will eat ivy too"
                };
                try {
                    for (int i = 0;
                         i < importantInfo.length;
                         i++) {
                        // Pause for 4 seconds
                        Thread.sleep(4000);
                        // Print a message
                        threadMessage(importantInfo[i]);
                    }
                } catch (InterruptedException e) {
                    threadMessage("I wasn't done!");
                }
            }
        }

        public static void main(String args[])
            throws InterruptedException {

            // Delay, in milliseconds before
            // we interrupt MessageLoop
            // thread (default one hour).
            long patience = 1000 * 60 * 60;

            // If command line argument
            // present, gives patience
            // in seconds.
            if (args.length > 0) {
                try {
                    patience = Long.parseLong(args[0]) * 1000;
                } catch (NumberFormatException e) {
                    System.err.println("Argument must be an integer.");
                    System.exit(1);
                }
            }

            threadMessage("Starting MessageLoop thread");
            long startTime = System.currentTimeMillis();
            Thread t = new Thread(new MessageLoop());
            t.start();

            threadMessage("Waiting for MessageLoop thread to finish");
            // loop until MessageLoop
            // thread exits
            while (t.isAlive()) {
                threadMessage("Still waiting...");
                // Wait maximum of 1 second
                // for MessageLoop thread
                // to finish.
                t.join(1000);
                if (((System.currentTimeMillis() - startTime) > patience)
                      && t.isAlive()) {
                    threadMessage("Tired of waiting!");
                    t.interrupt();
                    // Shouldn't be long now
                    // -- wait indefinitely
                    t.join();
                }
            }
            threadMessage("Finally!");
        }
    }`

同步

Java以提供关键字synchronized的形式,为防止资源冲突提供了内置支持。
共享资源一般是以对象形式存在的内存片段,但也可以是文件,输入输出端口,或者是打印机。
每个访问临界共享资源的方法都必须被同步

同步(Synchronization)

线程间的通信主要是通过共享访问字段以及其字段所引用的对象来实现的。这种形式的通信是非常有效的,但可能导致2种可能的错误:线程干扰(thread
interference)和内存一致性错误(memory consistency
errors)。同步就是要需要避免这些错误的工具。

但是,同步可以引入线程竞争(thread
contention),当两个或多个线程试图同时访问相同的资源时,并导致了 Java
运行时执行一个或多个线程更慢,或甚至暂停他们的执行。饥饿(Starvation)和活锁
(livelock) 是线程竞争的表现形式。

同步

线程通信从根本上是通过对属性和对象引用的属性的共享访问实现的。这种形式的通信效率特别高,但是会带来两种可能的错误:线程干扰和内存一致性错误。防止这类错误发生的工具就是同步。

然而,同步又会引入线程竞争问题,这种问题在两个或者多个线程同时去访问相同的资源的时候会发生,会让Java执行一些线程变的更慢甚至可能暂停它们的执行。饥饿和活锁是线程竞争的表现形式。可以在章节活锁中了解关于这方面更多信息。

这一节主要是讲解下面这些话题:

  • 线程干扰是描述多线程访问共享数据时错误是怎么引入的。
  • 内存一致性错误描述的是共享内存的不一致性错误。
  • 同步方法描述的是一种有效的防止线程干扰和内存一致性错误的方法。
  • 隐式锁和同步描述的是一种更加普遍的基于隐士锁的同步方法。
  • 原子性讨论的是不能被其他线程干扰的操作的大体概念。

显示Lock对象

java.util.concurrent类库定义有java.util.concurrent.locks中的显式的互斥机制。

线程干扰

描述当多个线程访问共享数据时是错误如何出现。

考虑下面的一个简单的类 Counter:

public class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}

其中的 increment 方法用来对 c 加1;decrement 方法用来对 c 减
1。然而,有多个线程中都存在对某个 Counter
对象的引用,那么线程间的干扰就可能导致出现我们不想要的结果。

线程间的干扰出现在多个线程对同一个数据进行多个操作的时候,也就是出现了“交错”。这就意味着操作是由多个步骤构成的,而此时,在这多个步骤的执行上出现了叠加。

Counter类对象的操作貌似不可能出现这种“交错(interleave)”,因为其中的两个关于c
的操作都很简单,只有一条语句。然而,即使是一条语句也是会被虚拟机翻译成多个步骤的。在这里,我们不深究虚拟机具体上上面的操作翻译成了什么样的步骤。只需要知道即使简单的
c++ 这样的表达式也是会被翻译成三个步骤的:

  1. 获取 c 的当前值。
  2. 对其当前值加 1。
  3. 将增加后的值存储到 c 中。

表达式 c–
也是会被按照同样的方式进行翻译,只不过第二步变成了减1,而不是加1。

假定线程 A 中调用 increment 方法,线程 B 中调用 decrement
方法,而调用时间基本上相同。如果 c 的初始值为
0,那么这两个操作的“交错”顺序可能如下:

  1. 线程A:获取 c 的值。
  2. 线程B:获取 c 的值。
  3. 线程A:对获取到的值加1;其结果是1。
  4. 线程B:对获取到的值减1;其结果是-1。
  5. 线程A:将结果存储到 c 中;此时c的值是1。
  6. 线程B:将结果存储到 c 中;此时c的值是-1。

这样线程 A 计算的值就丢失了,也就是被线程 B
的值覆盖了。上面的这种“交错”只是其中的一种可能性。在不同的系统环境中,有可能是
B
线程的结果丢失了,或者是根本就不会出现错误。由于这种“交错”是不可预测的,线程间相互干扰造成的
bug 是很难定位和修改的。

线程干扰

下面有个叫做Counter的简单类

    `class Counter {
        private int c = 0;

        public void increment() {
            c++;
        }

        public void decrement() {
            c--;
        }

        public int value() {
            return c;
        }

    }`

Counter类的每次increment的方法的调用会让c的值加一,decrement的方法的调用会让c的值减一。然而,如果这个Counter对象被多线程引用,线程之间的干扰让它没有按照预期的运行。

当两个操作在不同的线程间在同样的数据上交替运行时会产生线程干扰。也就是说这两个操作包含多个步骤,然后步骤的序列产生了重叠。

看起来似乎在Counter的实例对象上面的操作交替进行是不可能的,因为两个在变量c上面的操作都是单个的、简单的语句。然而,尽管简单的语句也可以被虚拟机转化成为多个步骤执行。我们不需要检查到底虚拟机会花了多少步,只需要知道表达式c++被分解成3部就足够了:

  1. 取得c现在的值。
  2. 在取得的值上面增加1.
  3. 在把增加之后的值存储到c中。

表达式c–也会被按照相同的方式分解,除了把第二步的增加换为减少就行了。

假定线程A调用了increment方法,在同一时刻,线程B调用了decrement方法。如果c的初始值是0,它们交替执行的序列可能是下面这样:

  1. 线程A:取得c的值。
  2. 线程B:取得c的值。
  3. 线程A:增加取得的值;结果是1。
  4. 线程B:减少取得的值;结果是-1。
  5. 线程A:存储结果到c中;c的值现在是1。
  6. 线程B:存储结果到c中;c的值现在是-1。

线程A执行的结果被线程B重写了。这种交替序列仅仅是其中的一种可能。在不同的情况下,可能是线程B的结果丢失,或者是得到预期的结果。因为线程干扰的bugs是不可预测的。

原子性和易变性

原子操作是不能被线程调度机制中断的操作,原子性可以应用于除long和doubel之外的基本类型之上的’简单操作’

原子类:AtomicInteger、AtomicLong、AtomicReference原子性变量类

临界区:防止多个线程同时访问方法内部的部分代码而不是访问整个方法,通过这种方式分离出来的代码段被称为临界区。

synchronized(syncObject){
  //This code can be accessed
  //by only one task at a time
} 

内存一致性错误

介绍了通过共享内存出现的不一致的错误。

内存一致性错误(Memory consistency
errors)发生在不同线程对同一数据产生不同的“看法”。导致内存一致性错误的原因很复杂,超出了本书的描述范围。庆幸的是,程序员并不需要知道出现这些原因的细节。我们需要的是一种可以避免这种错误的方法。

避免出现内存一致性错误的关键在于理解 happens-before
关系。这种关系是一种简单的方法,能够确保一条语句对内存的写操作对于其它特定的语句都是可见的。为了理解这点,我们可以考虑如下的示例。假定定义了一个简单的
int 类型的字段并对其进行了初始化:

int counter = 0;

该字段由两个线程共享:A 和 B。假定线程 A 对 counter 进行了自增操作:

counter++;

然后,线程 B 打印 counter 的值:

System.out.println(counter);

如果以上两条语句是在同一个线程中执行的,那么输出的结果自然是1。但是如果这两条语句是在两个不同的线程中,那么输出的结构有可能是0。这是因为没有保证线程
A 对 counter 的修改对线程 B
来说是可见的。除非程序员在这两条语句间建立了一定的 happens-before 关系。

我们可以采取多种方式建立这种 happens-before
关系。使用同步就是其中之一,这点我们将会在下面的小节中看到。

到目前为止,我们已经看到了两种建立这种 happens-before 的方式:

  • 当一条语句中调用了 Thread.start 方法,那么每一条和该语句已经建立了
    happens-before 的语句都和新线程中的每一条语句有着这种
    happens-before。引入并创建这个新线程的代码产生的结果对该新线程来说都是可见的。
  • 当一个线程终止了并导致另外的线程中调用 Thread.join
    的语句返回,那么此时这个终止了的线程中执行了的所有语句都与随后的
    join 语句随后的所有语句建立了这种 happens-before
    。也就是说终止了的线程中的代码效果对调用 join 方法的线程来说是可见。

关于哪些操作可以建立这种
happens-before,更多的信息请参阅“java.util.concurrent
包的概要说明”。

内存一致性错误

数据一致性错误发生在不同的线程去读相同的数据时,读到的数据不一致。引起内存一致性错误的原因很复杂超出了这篇教程的范围。幸运的是,开发者不需要详细知道这些原因。开发者需要知道的是如何去避免这些错误。

避免内存一致性错误的关键是理解happens-before关系。这个关系仅仅是保证一块内存被一个特定的语句的写操作对另外一个特定的语句是可见的。为了理解上面的这句话,我们来看下下面的例子。假定定义了一个简单的int类型的属性,且初始值是0:

    `int counter = 0;`

这个属性在线程A和B之间是共享的。假定线程A增加了counter的值:

    `counter++;`

然后在很短的时间内,线程B打印了counter的值:

    `System.out.println(counter);`

如果这个两个语句在同一个线程当中执行,那么这个属性被打印出来的值肯定是‘’1‘’。但是如果两个语句在不同的线程当中执行,那么打印出来的值可能就是“0”,因为没有保证线程A对属性counter的改变对线程B是可见的,除非开发者在这两条语句之间建立了happens-before的关系。

有数种建立happens-before关系的行为。其中一种就是同步,这个我们将在接下来的章节当中看到。

我们已经看到下面两种行为会创建happens-before关系:

  • 当一个语句调用Thread.start方法时,那么每一个对这条语句有happens-before关系的语句对这个新线程执行的语句都有happens-before关系。
  • 当一个线程终止执行且在另外一个线程中调用Thread.join返回时,然后所有的在这个终止的线程中执行的语句都对那个被join的线程的接下来执行的语句有happens-before关系。在这个线程中代码的变化对被join的线程就是可见的。

所有的创建happens-before关系的行为.

线程本地存储

防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储。
创建和管理线程本地存储可以由java.lang.ThreadLocal类来实现。.

同步方法

描述了一个简单的做法,可以有效防止线程干扰和内存一致性错误。

Java 编程语言中提供了两种基本的同步用语:同步方法(synchronized
methods)和同步语句(synchronized
statements)。同步语句相对而言更为复杂一些,我们将在下一小节中进行描述。本节重点讨论同步方法。

我们只需要在声明方法的时候增加关键字 synchronized 即可:

public class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}

如果 count 是 SynchronizedCounter
类的实例,设置其方法为同步方法将有两个效果:

  • 首先,不可能出现对同一对象的同步方法的两个调用的“交错”。当一个线程在执行一个对象的同步方式的时候,其他所有的调用该对象的同步方法的线程都会被挂起,直到第一个线程对该对象操作完毕。
  • 其次,当一个同步方法退出时,会自动与该对象的同步方法的后续调用建立
    happens-before 关系。这就确保了对该对象的修改对其他线程是可见的。

注意:构造函数不能是 synchronized ——在构造函数前使用 synchronized
关键字将导致语义错误。同步构造函数是没有意义的。这是因为只有创建该对象的线程才能调用其构造函数。

警告:在创建多个线程共享的对象时,要特别小心对该对象的引用不能过早地“泄露”。例如,假定我们想要维护一个保存类的所有实例的列表
instances。我们可能会在构造函数中这样写到:

instances.add(this);

但是,其他线程可会在该对象的构造完成之前就访问该对象。

同步方法是一种简单的可以避免线程相互干扰和内存一致性错误的策略:如果一个对象对多个线程都是可见的,那么所有对该对象的变量的读写都应该是通过同步方法完成的(一个例外就是
final
字段,他在对象创建完成后是不能被修改的,因此,在对象创建完毕后,可以通过非同步的方法对其进行安全的读取)。这种策略是有效的,但是可能导致“活跃度(liveness)”问题。这点我们会在本课程的后面进行描述。

同步方法

Java编程语言提供了两种编程方式:同步方法和同步代码块。其中复杂的同步代码块在下个章节进行讲解。这个章节讲解同步方法。

仅仅只需要在方法声明前面加上关键字synchronized,就也可以把这个方法变成同步的:

    `public class SynchronizedCounter {
        private int c = 0;

        public synchronized void increment() {
            c++;
        }

        public synchronized void decrement() {
            c--;
        }

        public synchronized int value() {
            return c;
        }
    }`

如果count是SynchronizedCounter的一个实例,把这些方法变成同步的有下面两个影响:

  • 首先,交替调用同一个对象的同步方法是不可能的。当一个线程在执行一个对象的同步方法时,所有其他再调用这个对象的同步方法的线程都将被阻塞直到第一个调用的线程执行完成。
  • 其次,当一个同步方法退出时,它就自动对后面的同一个对象的同步方法的调用建立了happens-before关系。这样子保证了对象的状态的改变对所有的线程都是可见的。

记住构造器方法是不可以同步的,在构造器方法前面加关键字synchronized是有语法错误的。同步构造器方法没有任何意义,因为只有创建这个对象的线程在这个创建这个对象的时候才有访问它的权限。

*警告:当创建一个在多个线程中共享的对象时,要特别小心对象的引用提早泄漏出去。举个例子,假定你想让一个叫instances的List包含每一个类的实例。你也许会在你的构造器中加入下面的代码,但是然后其他线程可以在对象的构造完成之前用instances去访问对象(has
issue,need optimize):

          `instances.add(this);`

同步方法是一个简单的防止线程干扰和内存一致性错误的策略;如果一个对象对一个以上的线程是可见的,那么这个对象的变量的所有的读和写操作是通过同步方法完成的。(一个重要的特例:当对象创建之后不可以被修改的final属性是可以被非同步的方法安全的读的。)这个策略很有效,但是可能会产生并发活跃性的问题,我们将在后面的章节看到。

线程之间的协作

内部锁和同步

描述了一个更通用的同步方法,并介绍了同步是如何基于内部锁的。

同步是构建在被称为“内部锁(intrinsic lock)”或者是“监视锁(monitor
lock)”的内部实体上的。(在 API
中通常被称为是“监视器(monitor)”。)内部锁在两个方面都扮演着重要的角色:保证对对象状态访问的排他性和建立也对象可见性相关的重要的“
happens-before。

每一个对象都有一个与之相关联动的内部锁。按照传统的做法,当一个线程需要对一个对象的字段进行排他性访问并保持访问的一致性时,他必须在访问前先获取该对象的内部锁,然后才能访问之,最后释放该内部锁。在线程获取对象的内部锁到释放对象的内部锁的这段时间,我们说该线程拥有该对象的内部锁。只要有一个线程已经拥有了一个内部锁,其他线程就不能再拥有该锁了。其他线程将会在试图获取该锁的时候被阻塞了。

当一个线程释放了一个内部锁,那么就会建立起该动作和后续获取该锁之间的
happens-before 关系。

内部锁和同步

同步时建立在一个内部实体周围也就是大家知道的内部锁或者叫监控锁。(API文档里面几次把这实体叫做监控)内部锁不仅仅在同步方面(强制排它的访问对象的状态)起到作用,同时也在建立happens-before关系(对可见性是必须的)起到作用。

每一个对象都有一个和它相关联的内部锁。一个需要排它和一致访问对象属性的线程在访问对象属性之前必须要先获得对象的内部锁,然后完成之后释放内部锁。也就是线程在持有这个对象的内部锁在获得和释放这个锁之间。只要一个线程持有了一个内部锁,那么其他的线程就都不可以拿到这个内部锁。其他的线程会被阻塞当它们尝试去获得这个锁时。

当一个线程释放一个内部锁时,在这个动作和任意后面的获得这个锁的动作建立了happens-before关系。

wait()和notify()

wait()被调用时,线程的执行被挂起,对象上的锁被释放。
可以通过notify() 或者notifyAll(),或者令时间到期,从wait()中恢复执行。
只能在同步控制方法或同步控制块里调用wait()、notify()和notifyAll()

同步方法中的锁

当一个线程调用一个同步方法的时候,他就自动地获得了该方法所属对象的内部锁,并在方法返回的时候释放该锁。即使是由于出现了没有被捕获的异常而导致方法返回,该锁也会被释放。

我们可能会感到疑惑:当调用一个静态的同步方法的时候会怎样了,静态方法是和类相关的,而不是和对象相关的。在这种情况下,线程获取的是该类的类对象的内部锁。这样对于静态字段的方法是通过一个和类的实例的锁相区分的另外的锁来进行的。

在同步方法中的锁

当一个线程调用一个同步方法时,它会自动去获得这个方法的对象的内部锁,然后当这个方法退出时释放这个锁,即使是这个方法的退出是因为没有捕获的异常引起的。

你也许想知道当一个同步的静态方法被调用时会发生什么,因为一个静态的方法是和类相关联的,不是对象。在这种情况下,线程获得的是和这个类相关的对象的内部锁。因此控制访问类的静态属性的锁是和任何类的实例的锁是不一样的。

线程内部的数据共享

ThreadLocal线程局部变量。一个ThreadLocal代表一个变量,ThreadLocal每个使用该变量的线程提供独立的变量副本。
ThreadLocal类中有一个Map,用于存储每一个线程的变量副本,Map中元素的键位线程对象,而值对应线程的变量副本。

同步语句

另外一种创建同步代码的方式就是使用同步语句。和同步方法不同,使用同步语句是必须指明是要使用哪个对象的内部锁:

public void addName(String name) {
    synchronized(this) {
        lastName = name;
        nameCount++;
    }
    nameList.add(name);
}

在上面的示例中,方法 addName 需要对 lastName 和 nameCount
的修改进行同步,还要避免同步调用其他对象的方法(在同步代码段中调用其他对象的方法可能导致“活跃度(Liveness)”中描述的问题)。如果没有使用同步语句,那么将不得不使用一个单独的,未同步的方法来完成对
nameList.add 的调用。

在改善并发性时,巧妙地使用同步语句能起到很大的帮助作用。例如,我们假定类
MsLunch 有两个实例字段,c1 和
c2,这两个变量绝不会一起使用。所有对这两个变量的更新都需要进行同步。但是没有理由阻止对
c1 的更新和对 c2
的更新出现交错——这样做会创建不必要的阻塞,进而降低并发性。此时,我们没有使用同步方法或者使用和this
相关的锁,而是创建了两个单独的对象来提供锁。

public class MsLunch {
    private long c1 = 0;
    private long c2 = 0;
    private Object lock1 = new Object();
    private Object lock2 = new Object();

    public void inc1() {
        synchronized(lock1) {
            c1++;
        }
    }

    public void inc2() {
        synchronized(lock2) {
            c2++;
        }
    }
}

采用这种方式时需要特别的小心。我们必须绝对确保相关字段的访问交错是完全安全的。

同步代码块

另外一个种建立同步代码的的方法是同步代码块。和同步方法不一样,同步代码块必须指定哪一个对象的内部锁:

          `public void addName(String name) {
                  synchronized(this) {
                      lastName = name;
                      nameCount++;
                  }
                  nameList.add(name);
           }`

在这个例子中,addName方法需要同步的改变lastName和nameCount的属性的值,但是同时又需要避免其他对象方法的同步调用。(通过同步代码调用其他对象的方法会带来像章节Liveness。)如果没有同步代码,则必须要有单独的非同步的方法去调用nameList.add。

同步代码块用细粒度的锁可以提高并发。举个例子,类MsLunch有两个实例属性,c1和c2,它们从来不回被同时使用。所有的这两个属性的更新必须是同步的,如果c1的更新和c2
的更新用的是同一个对象的锁,那么在c1和c2交替更新时会造成不必要的阻塞从而降低了并发。我们可以创建两个对象单独的提供锁来代替使用同步方法获使用this锁。

          `public class MsLunch {
                    private long c1 = 0;
                    private long c2 = 0;
                    private Object lock1 = new Object();
                    private Object lock2 = new Object();

                    public void inc1() {
                          synchronized(lock1) {
                                  c1++;
                          }
                    }

                    public void inc2() {
                          synchronized(lock2) {
                                  c2++;
                          }
                    }
          }`

使用这种非常谨慎的方式。你一定可以完全确认交替的去访问受影响的属性是线程安全的。

重入同步

回想一下,一个线程是不可以获得一个被其他线程持有的锁。但是一个线程可以获得他自己持有的锁。允许一个线程可以获得多次获得同一个锁让重入同步成为可能。这个描述的是这样一个场景,一个同步的代码直接或者间接的调用一个包含同步的代码的方法,且这两个同步包含的同一个锁。如果没有重入同步特性,同步代码不得不做很多额外的措施去避免自己把自己阻塞。

生产者与消费者

重入同步(Reentrant Synchronization)

回忆前面提到的:线程不能获取已经被别的线程获取的锁。但是线程可以获取自身已经拥有的锁。允许一个线程能重复获得同一个锁就称为重入同步(reentrant
synchronization)。它是这样的一种情况:在同步代码中直接或者间接地调用了还有同步代码的方法,两个同步代码段中使用的是同一个锁。如果没有重入同步,在编写同步代码时需要额外的小心,以避免线程将自己阻塞。

原子访问

在编写程序的过程中,一个原子是操作是指一次执行完所有的操作。一个原子操作不可以在中间停下来:要么全部执行完成,要么都没有执行。原子操作带来的影响只有当它全部完成了以后才是可见的。

我们已经看过一个这样的自增表达式,例如c++,这不是一个原子操作。即使是非常简单的表达式也可以定义复杂的操作,这个操作可以被分解成其它的不同的操作。然而你可以定义一组操作是原子的。

  • 对于引用变量和大多数的简单类型的变量(除了long和double类型)的读和写操作都是原子类型的。

  • 对于所有声明为volatile的变量(包括long和double类型)的读和写操作都是原子类型的。

原子操作不可以被打断,所以使用原子操作不会受线程干扰的影响。然而,这并不排除所有需要同步的原子操作的错误,因为内存一致性错误还是存在。使用volatile变量降低了内存一致性错误出现的风险,因为任何对volatile变量的写操作都对后续的这个变量的读操作建立了happens-before的关系。也就是说volatile变量的变化对其他的线程总是可见的。更重要的是,当一个线程读一个volatile的变量时,不仅仅可以看到最新的变化,连代码的副作用导致的变化也可以看到。

使用简单的原子变量访问比用同步代码去控制变量访问更高效,但是开发者需要考虑的更多去避免内存一致性错误。这么做是否值得取决于应用的大小和复杂性。

在java.util.concurrent的包中的一些类提供了原子的方法,这些方法不依赖同步。我们将在后续章节中讨论。

死锁

某个任务在等待另一个任务,而后者又等待别的任务,这样一直下去,直到这个链条上的任务又在等待第一个任务释放锁。这得到了一个任务之间相互等待的连续循环,没有哪个线程能继续。

原子访问

介绍了不会被其他线程干扰的做法的总体思路。

在编程中,原子性动作就是指一次性有效完成的动作。原子性动作是不能在中间停止的:要么一次性完全执行完毕,要么就不执行。在动作没有执行完毕之前,是不会产生可见结果的。

通过前面的示例,我们已经发现了诸如 c++
这样的自增表达式并不属于原子操作。即使是非常简单的表达式也包含了复杂的动作,这些动作可以被解释成许多别的动作。然而,的确存在一些原子操作的:

  • 对几乎所有的原生数据类型变量(除了 long he
    double)的读写以及引用变量的读写都是原子的。
  • 对所有声明为 Volatile
    的变量的读写都是原子的,包括 long 和 double 类型。

原子性动作是不会出现交错的,因此,使用这些原子性动作时不用考虑线程间的干扰。然而,这并不意味着可以移除对原子操作的同步。因为内存一致性错误还是有可能出现的。使用
volatile 变量可以减少内存一致性错误的风险,因为任何对 volatile 变
量的写操作都和后续对该变量的读操作建立了 happens-before
关系。这就意味着对 volatile
类型变量的修改对于别的线程来说是可见的。更重要的是,这意味着当一个线程读取一个
volatile
类型的变量时,他看到的不仅仅是对该变量的最后一次修改,还看到了导致这种修改的代码带来的其他影响。

使用简单的原子变量访问比通过同步代码来访问变量更高效,但是需要程序员的更多细心考虑,以避免内存一致性错误。这种额外的付出是否值得完全取决于应用程序的大小和复杂度。

活跃性

一个并发应用及时执行任务的能力叫做活跃性。这个章节主要介绍最常见的活跃性问题死锁,然后会简短的介绍两种其他的活跃性问题,饥饿和活锁。

活跃度(Liveness)

一个并行应用程序的及时执行能力被称为它的活跃度(liveness)。本节将介绍最常见的一种活跃度的问题——死锁,以及另外两个活跃度的问题——饥饿和活锁。

死锁

死锁描述的是这样一个场景,二个或者多个线程因为互相等待而永久阻塞。举个例子。

A和B是朋友,同时也是非常有礼貌的信徒。一个严格的有礼貌的规则是当你向你的朋友鞠躬时,你必须保持鞠躬的动作直到你的朋友有机会向你鞠躬回礼。不幸的是,这个规则没有考虑两个朋友同时向对方鞠躬的情况。下面的这个死锁例子模拟了这种情况:

    `public class Deadlock {
        static class Friend {
            private final String name;
            public Friend(String name) {
                this.name = name;
            }
            public String getName() {
                return this.name;
            }
            public synchronized void bow(Friend bower) {
                System.out.format("%s: %s"
                    + "  has bowed to me!%n", 
                    this.name, bower.getName());
                bower.bowBack(this);
            }
            public synchronized void bowBack(Friend bower) {
                System.out.format("%s: %s"
                    + " has bowed back to me!%n",
                    this.name, bower.getName());
            }
        }

        public static void main(String[] args) {
            final Friend alphonse =
                new Friend("Alphonse");
            final Friend gaston =
                new Friend("Gaston");
            new Thread(new Runnable() {
                public void run() { alphonse.bow(gaston); }
            }).start();
            new Thread(new Runnable() {
                public void run() { gaston.bow(alphonse); }
            }).start();
        }
    }`

当这个死锁例子运行时,两个线程极有可能阻塞当它们尝试去调用bowBack方法时。两个线程都会一直阻塞下去,因为它们都在等待对方鞠躬回礼。

死锁(Deadlock)

死锁是指两个或两个以上的线程永远被阻塞,一直等待对方的资源。

下面是一个例子。

Alphonse 和 Gaston
是朋友,都很有礼貌。礼貌的一个严格的规则是,当你给一个朋友鞠躬时,你必须保持鞠躬,直到你的朋友鞠躬回给你。不幸的是,这条规则有个缺陷,那就是如果两个朋友同一时间向对方鞠躬,那就永远不会完了。这个示例应用程序中,死锁模型是这样的:

public class Deadlock {
    static class Friend {
        private final String name;

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public synchronized void bow(Friend bower) {
            System.out.format("%s: %s" + "  has bowed to me!%n", this.name, bower.getName());
            bower.bowBack(this);
        }

        public synchronized void bowBack(Friend bower) {
            System.out.format("%s: %s" + " has bowed back to me!%n", this.name, bower.getName());
        }
    }

    public static void main(String[] args) {
        final Friend alphonse = new Friend("Alphonse");
        final Friend gaston = new Friend("Gaston");
        new Thread(new Runnable() {
            public void run() {
                alphonse.bow(gaston);
            }
        }).start();
        new Thread(new Runnable() {
            public void run() {
                gaston.bow(alphonse);
            }
        }).start();
    }
}

当他们尝试调用 bowBack
两个线程将被阻塞。无论是哪个线程永远不会结束,因为每个线程都在等待对方鞠躬。这就是死锁了。

饥饿和活锁

饥饿和活锁没有死锁那么常见,但也是每一个并发程序设计者可能会遇到的问题。

饥饿和活锁(Starvation and Livelock)

饥饿和活锁虽比死锁问题稍微不常见点,但这些是在并发软件种每一个设计师仍然可能会遇到的问题。

饥饿

饥饿描述的是一种线程不能够获得定期访问共享资源的场景,且不能够取得进展。饥饿发生在共享资源被“贪婪的”线程长期占用造成共享资源不可用的情况下。例如,假如一个对象提供一个同步方法,然后这个同步方法经常需要很长的时间才能返回。如果一个线程经常调用这个方法,其他需要经常同步访问这个对象的线程就会经常被阻塞。

饥饿(Starvation)

饥饿描述了这样一个情况,一个线程不能获得定期访问共享资源,于是无法继续执行。这种情况一般出现在共享资源被某些“贪婪”线程占用,而导致资源长时间不被其他线程可用。例如,假设一个对象提供一个同步的方法,往往需要很长时间返回。如果一个线程频繁调用该方法,其他线程若也需要频繁的同步访问同一个对象通常会被阻塞。

活锁

一个线程经常是去响应另外一个线程的操作。如果这个另外的一个线程的执行又是响应另外的一个线程,然后活锁可能就产生了。和死锁一样,活锁线程也不能够取得进一步的进展。然而这些线程并没有被阻塞——它们仅仅是因为太忙而不能去互相响应然后继续工作。这就和两个人在一条走廊里面尝试去超过对方:A移到他的左边让B通过,同事B也同时移动他的右边让A通过。他们仍然阻塞对方。

活锁(Livelock)

一个线程常常处于响应另一个线程的动作,如果其他线程也常常处于该线程的动作,那么就可能出现活锁。与死锁、活锁的线程一样,程序无法进一步执行。然而,线程是不会阻塞的,他们只是会忙于应对彼此的恢复工作。现实种的例子是,两人面对面试图通过一条走廊:
Alphonse 移动到他的左则让路给 Gaston ,而 Gaston 移动到他的右侧想让
Alphonse
过去,两个人同时让路,但其实两人都挡住了对方没办法过去,他们仍然彼此阻塞。

警戒块

线程经常需要去调整它们的行为。最常见的调整方式是警戒块。这个块在可以继续执行之前通过轮询一个必须为true的条件开始。为了保证这个做正确有一些步骤需要做。

假定,例如guardedJoy是一个在共享变量joy被另外一个线程设置了以后才会继续执行的一个方法。这个方法理论上可以一直循环直到条件满足,但是轮询是很浪费的,因为它在等待的过程中一直在执行。

    `public void guardedJoy() {
        // Simple loop guard. Wastes
        // processor time. Don't do this!
        while(!joy) {}
        System.out.println("Joy has been achieved!");
    }`

一个更有效的警戒方式是调用Object的wait方法挂起当前线程。wait方法的调用直到另外一个线程发布通知之后,才会返回,这个通知可能是一些事件已经发生,尽管这个事件可能不是这个线程等待的:

          `public synchronized void guardedJoy() {
                   // This guard only loops once for each special event, which may not
                   // be the event we're waiting for.
                   while(!joy) {
                         try {
                              wait();
                        } catch (InterruptedException e) {}
                   }
                   System.out.println("Joy and efficiency have been achieved!");
           }`

注意:你需要在检测特定的条件的循环中调用wait方法。不要认为这个中断是你等待的特定的条件或者这个条件这个条件仍然是正确的。


像许多其他挂起执行的方法一样,wait方法也会抛出InterruptedException。在上面的这个例子中,我们可以忽略这个异常,变量joy的值是我们唯一关心的。

为什么这个版本的guardedJoy方法需要用关键字synchronized修饰咧?假定d是我们调用wait方法的对象。当一个线程调用
d的wait方法时,它必须要先获得d的对象内部锁否则会抛出异常——调用wait方法前,必须要先获得对象的内部锁。在同步方法内部调用wait时一种简单的获得内部锁的方法。

当wait方法被调用时,线程释放了内部锁,挂起了执行。在将来的某个时间,另外一个线程将获得同一个内部锁,调用该对象的notifyAll方法,告诉所有等待在这个锁上面的线程重要的事情放生了:

         `public synchronized notifyJoy() {
                joy = true;
                notifyAll();
         }`�

在第二个线程已经释放锁之后的某个时间,第一个线程重新获得了锁,从wait方法中返回然后继续执行。


注意:有另外一种通知的方法,notify,这个方法只唤醒一个线程。因为notify这个方法不允许你指定将要被唤醒的线程,所以notify方法只适用于大规模的并行应用程序,那种大量的做类似工作的
线程。在这种应用程序中,你不需要去关心哪一个线程被唤醒。


让我们用警戒块来创建一个生产者——消费者应用程序。这中应用程序在两个线程当中共享数据:生产者线程,负责创建数据;消费者线程,负责消费数据。两个线程用一个共享的对象进行交流。协调是必不可少的:消费者线程在生产者传递数据之前不可以尝试获得数据,生产者在消费者还没有获得就数据之前不可以尝试去传递新数据。

在下面的这个例子中,数据时一系列的文本消息,这些数据在Drop对象中共享。

    `public class Drop {
        // Message sent from producer
        // to consumer.
        private String message;
        // True if consumer should wait
        // for producer to send message,
        // false if producer should wait for
        // consumer to retrieve message.
        private boolean empty = true;

        public synchronized String take() {
            // Wait until message is
            // available.
            while (empty) {
                try {
                    wait();
                } catch (InterruptedException e) {}
            }
            // Toggle status.
            empty = true;
            // Notify producer that
            // status has changed.
            notifyAll();
            return message;
        }

        public synchronized void put(String message) {
            // Wait until message has
            // been retrieved.
            while (!empty) {
                try { 
                    wait();
                } catch (InterruptedException e) {}
            }
            // Toggle status.
            empty = false;
            // Store message.
            this.message = message;
            // Notify consumer that status
            // has changed.
            notifyAll();
        }
    }`

消费者线程在类Producer中进行定义,它发送一系列常见的消息。DONE字符串表示所有的消息已经被发送完成。为了模仿真实世界中应用程序的不可预知性,生产者线程在发送消息之间停止随机的时间间隔。

    `import java.util.Random;

    public class Producer implements Runnable {
        private Drop drop;

        public Producer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };
            Random random = new Random();

            for (int i = 0;
                 i < importantInfo.length;
                 i++) {
                drop.put(importantInfo[i]);
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) {}
            }
            drop.put("DONE");
        }
    }`

消费者线程在类Consumer中定义,它仅仅获取这些数据然后打印出来直到收到DONE字符串。这个线程也停止随机时间。

    `import java.util.Random;

    public class Consumer implements Runnable {
        private Drop drop;

        public Consumer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            Random random = new Random();
            for (String message = drop.take();
                 ! message.equals("DONE");
                 message = drop.take()) {
                System.out.format("MESSAGE RECEIVED: %s%n", message);
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) {}
            }
        }
    }`

最后,下面是启动生产线程和消费着线程的类ProducerConsumerExample。

    `public class ProducerConsumerExample {
        public static void main(String[] args) {
            Drop drop = new Drop();
            (new Thread(new Producer(drop))).start();
            (new Thread(new Consumer(drop))).start();
        }
    }`

注意:类Drop这样写是为了展示警戒块。为了避免重复造轮子,在去编写你的共享对象时吗,先去Java
Collections
Framework
中检查一下已经存在的数据结构。如果你想了解更多的信息,请跳转到章节Questions
and
Exercises


Guarded Blocks

多线程之间经常需要协同工作,最常见的方式是使用 Guarded
Blocks,它循环检查一个条件(通常初始值为
true),直到条件发生变化才跳出循环继续执行。在使用 Guarded Blocks
时有以下几个步骤需要注意:

假设 guardedJoy 方法必须要等待另一线程为共享变量 joy
设值才能继续执行。那么理论上可以用一个简单的条件循环来实现,但在等待过程中
guardedJoy 方法不停的检查循环条件实际上是一种资源浪费。

public void guardedJoy() {
    // Simple loop guard. Wastes
    // processor time. Don't do this!
    while(!joy) {}
    System.out.println("Joy has been achieved!");
}

更加高效的保护方法是调用 Object.wait 将当前线程挂起,直到有另一线程发起事件通知(尽管通知的事件不一定是当前线程等待的事件)。

public synchronized void guardedJoy() {
    // This guard only loops once for each special event, which may not
    // be the event we're waiting for.
    while(!joy) {
        try {
            wait();
        } catch (InterruptedException e) {}
    }
    System.out.println("Joy and efficiency have been achieved!");
}

注意:一定要在循环里面调用 wait
方法,不要想当然的认为线程唤醒后循环条件一定发生了改变。

和其他可以暂停线程执行的方法一样,wait 方法会抛出
InterruptedException,在上面的例子中,因为我们关心的是 joy
的值,所以忽略了 InterruptedException。

为什么 guardedJoy 是 synchronized 的?假设 d 是用来调用 wait
的对象,当一个线程调用 d.wait,它必须要拥有
d的内部锁(否则会抛出异常),获得 d 的内部锁的最简单方法是在一个
synchronized 方法里面调用 wait。

当一个线程调用 wait
方法时,它释放锁并挂起。然后另一个线程请求并获得这个锁并调用
Object.notifyAll 通知所有等待该锁的线程。

public synchronized notifyJoy() {
    joy = true;
    notifyAll();
}

当第二个线程释放这个该锁后,第一个线程再次请求该锁,从 wait
方法返回并继续执行。

注意:还有另外一个通知方法,notify(),它只会唤醒一个线程。但由于它并不允许指定哪一个线程被唤醒,所以一般只在大规模并发应用(即系统有大量相似任务的线程)中使用。因为对于大规模并发应用,我们其实并不关心哪一个线程被唤醒。

现在我们使用 Guarded blocks
创建一个生产者/消费者应用。这类应用需要在两个线程之间共享数据:生产者生产数据,消费者使用数据。两个线程通过共享对象通信。在这里,线程协同工作的关键是:生产者发布数据之前,消费者不能够去读取数据;消费者没有读取旧数据前,生产者不能发布新数据。

在下面的例子中,数据通过 Drop 对象共享的一系列文本消息:

public class Drop {
      // Message sent from producer
    // to consumer.
    private String message;
    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty = true;

    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty) {
            try { 
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }
}

Producer 是生产者线程,发送一组消息,字符串 DONE
表示所有消息都已经发送完成。为了模拟现实情况,生产者线程还会在消息发送时随机的暂停。

public class Producer implements Runnable {
    private Drop drop;

    public Producer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy",
                "A kid will eat ivy too" };
        Random random = new Random();

        for (int i = 0; i < importantInfo.length; i++) {
            drop.put(importantInfo[i]);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {
            }
        }
        drop.put("DONE");
    }
}

Consumer 是消费者线程,读取消息并打印出来,直到读取到字符串 DONE
为止。消费者线程在消息读取时也会随机的暂停。

public class Consumer implements Runnable {
    private Drop drop;

    public Consumer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        Random random = new Random();
        for (String message = drop.take(); !message.equals("DONE"); message = drop.take()) {
            System.out.format("MESSAGE RECEIVED: %s%n", message);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {
            }
        }
    }
}

ProducerConsumerExample 是主线程,它启动生产者线程和消费者线程。

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Drop drop = new Drop();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}

不可变对象

如果一个对象在创建之后,它的状态就不可以改变了,那么这个对象就是不可变对象。对不可变对象的最大依赖被广泛认为是创建简单可靠代码的合理策略。

不可变对象在并发应用当中特别有用。因为它们的状态无法改变,它们不可能受线程干扰和状态不一致的影响。

开发者经常不情愿的去使用不可变对象,比起更新对象的开销它们更担心的是创建一个新的对象的开销。创建新对象的影响经常被估高了,这些影响可以被一些高效的不可变对像抵消。这包括减少由于gc或者为了消除让那些可变对象在并发中运行的代码的开销。

下面的几个小节从可变的对象的类当中得到不可变对象的类。通过这么做,让大家知道这种通用的转换规则,同时也展示了一些不可变对象的优点。

不可变对象(Immutable Objects)

如果一个对象它被构造后其,状态不能改变,则这个对象被认为是不可变的(immutable
)。不可变对象的好处是可以创建简单的、可靠的代码。

不可变对象在并发应用种特别有用。因为他们不能改变状态,它们不能被线程干扰所中断或者被其他线程观察到内部不一致的状态。

程序员往往不愿使用不可变对象,因为他们担心创建一个新的对象要比更新对象的成本要高。实际上这种开销常常被过分高估,而且使用不可变对象所带来的一些效率提升也抵消了这种开销。例如:使用不可变对象降低了垃圾回收所产生的额外开销,也减少了用来确保使用可变对象不出现并发错误的一些额外代码。

接下来看一个可变对象的类,然后转化为一个不可变对象的类。通过这个例子说明转化的原则以及使用不可变对象的好处。

一个同步类的例子

类SynchronizedRGB定义了代表颜色的对象。每一个对象代表一种颜色,由三个int类型代表颜色的和颜色的名字组成。

    `public class SynchronizedRGB {

        // Values must be between 0 and 255.
        private int red;
        private int green;
        private int blue;
        private String name;

        private void check(int red,
                           int green,
                           int blue) {
            if (red < 0 || red > 255
                || green < 0 || green > 255
                || blue < 0 || blue > 255) {
                throw new IllegalArgumentException();
            }
        }

        public SynchronizedRGB(int red,
                               int green,
                               int blue,
                               String name) {
            check(red, green, blue);
            this.red = red;
            this.green = green;
            this.blue = blue;
            this.name = name;
        }

        public void set(int red,
                        int green,
                        int blue,
                        String name) {
            check(red, green, blue);
            synchronized (this) {
                this.red = red;
                this.green = green;
                this.blue = blue;
                this.name = name;
            }
        }

        public synchronized int getRGB() {
            return ((red << 16) | (green << 8) | blue);
        }

        public synchronized String getName() {
            return name;
        }

        public synchronized void invert() {
            red = 255 - red;
            green = 255 - green;
            blue = 255 - blue;
            name = "Inverse of " + name;
        }
    }`

使用类SynchronizedRGB必须特别小心,避免出现名字和颜色的状态不统一的情况。假定,例如,一个线程执行了下面的代码:

    `SynchronizedRGB color =
        new SynchronizedRGB(0, 0, 0, "Pitch Black");
    ...
    int myColorInt = color.getRGB();      //Statement 1
    String myColorName = color.getName(); //Statement 2`

如果另外一个线程在语句一和语句二之间调用了color的set方法,myColorInt的值将和myColorName的值不符。为了避免这种情况发生,这个两条语句必须绑在一起执行:

    `synchronized (color) {
        int myColorInt = color.getRGB();
        String myColorName = color.getName();
    } `

这一类的不一致只可能在可变的对象中出现,这对于类SynchronizedRGB的不可变版本来说不是问题。

一个同步类的例子

SynchronizedRGB
是表示颜色的类,每一个对象代表一种颜色,使用三个整形数表示颜色的三基色,字符串表示颜色名称。

public class SynchronizedRGB {
    // Values must be between 0 and 255.
    private int red;
    private int green;
    private int blue;
    private String name;

    private void check(int red,
                       int green,
                       int blue) {
        if (red < 0 || red > 255
            || green < 0 || green > 255
            || blue < 0 || blue > 255) {
            throw new IllegalArgumentException();
        }
    }

    public SynchronizedRGB(int red,
                           int green,
                           int blue,
                           String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }

    public void set(int red,
                    int green,
                    int blue,
                    String name) {
        check(red, green, blue);
        synchronized (this) {
            this.red = red;
            this.green = green;
            this.blue = blue;
            this.name = name;
        }
    }

    public synchronized int getRGB() {
        return ((red << 16) | (green << 8) | blue);
    }

    public synchronized String getName() {
        return name;
    }

    public synchronized void invert() {
        red = 255 - red;
        green = 255 - green;
        blue = 255 - blue;
        name = "Inverse of " + name;
    }
}

使用 SynchronizedRGB
时需要小心,避免其处于不一致的状态。例如一个线程执行了以下代码:

SynchronizedRGB color =
    new SynchronizedRGB(0, 0, 0, "Pitch Black");
...
int myColorInt = color.getRGB();      //Statement 1
String myColorName = color.getName(); //Statement 2

如果有另外一个线程在 Statement 1 之后、Statement 2 之前调用了 color.set
方法,那么 myColorInt 的值和 myColorName
的值就会不匹配。为了避免出现这样的结果,必须要像下面这样把这两条语句绑定到一块执行:

synchronized (color) {
    int myColorInt = color.getRGB();
    String myColorName = color.getName();
}

这种不一致的问题只可能发生在可变对象上。

定义不可变对象的策略

下面几条规则定义了一个简单的创建不可变对象的策略。不是所有记录在案的不可变对象都遵循这些规则。但这不是指这些类的创建者很粗心——他们也许有更好的理由去保证这些类的实例在创建之后不会改变状态。然而这些策略需要复杂的分析,不适合初学者。

  1. 不提供改变属性或者对象引用属性的setter方法。
  2. 所有的属性都声明为final和private的。
  3. 不允许子类重写方法。最简单实现这个的方法是声明类为final的。
    一个更加复杂的方法是声明构造器是private的,在工厂方法中创建实例。
  4. 如果实例属性中包含可变对象,不允许这些对象被改变:
  • 不提供改变这些对象的方法。
  • 不要共享可变对象的引用。决不存储传递给构造器的外部可变的对象的引用;如果需要,创建副本,存储副本的引用。同样,创建你的内部可变对象的引用时,要避免直接使用原来的方法。

将这些策略应用到类SynchronizedRGB的结果如果:

  1. 在这个类当中有两个setter方法。第一个方法set可以任意改变对象的状态,在不可变版本的类当中需要删除。第二个方法invert可以转化为创建一个新对象而不是改变现有的对象。
  2. 所有的属性已经是private的了;它们需要进一步声明为final的。
  3. 将类声明为final的。
  4. 只有一个属性指向这个对象,且这个对象本身是不可变的。所以防范改变包含的可变的对象是不需要的。

下面是改变之后的ImmutableRGB:

    `final public class ImmutableRGB {

        // Values must be between 0 and 255.
        final private int red;
        final private int green;
        final private int blue;
        final private String name;

        private void check(int red,
                           int green,
                           int blue) {
            if (red < 0 || red > 255
                || green < 0 || green > 255
                || blue < 0 || blue > 255) {
                throw new IllegalArgumentException();
            }
        }

        public ImmutableRGB(int red,
                            int green,
                            int blue,
                            String name) {
            check(red, green, blue);
            this.red = red;
            this.green = green;
            this.blue = blue;
            this.name = name;
        }


        public int getRGB() {
            return ((red << 16) | (green << 8) | blue);
        }

        public String getName() {
            return name;
        }

        public ImmutableRGB invert() {
            return new ImmutableRGB(255 - red,
                           255 - green,
                           255 - blue,
                           "Inverse of " + name);
        }
    }`

定义不可变对象的策略

以下的一些创建不可变对象的简单策略。并非所有不可变类都完全遵守这些规则,不过这不是编写这些类的程序员们粗心大意造成的,很可能的是他们有充分的理由确保这些对象在创建后不会被修改。但这需要非常复杂细致的分析,并不适用于初学者。

  • 不要提供 setter 方法。(包括修改字段的方法和修改字段引用对象的方法)
  • 将类的所有字段定义为 final、private 的。
  • 不允许子类重写方法。简单的办法是将类声明为
    final,更好的方法是将构造函数声明为私有的,通过工厂方法创建对象。
  • 如果类的字段是对可变对象的引用,不允许修改被引用对象。
    • 不提供修改可变对象的方法。
    • 不共享可变对象的引用。当一个引用被当做参数传递给构造函数,而这个引用指向的是一个外部的可变对象时,一定不要保存这个引用。如果必须要保存,那么创建可变对象的拷贝,然后保存拷贝对象的引用。同样如果需要返回内部的可变对象时,不要返回可变对象本身,而是返回其拷贝。

将这一策略应用到 SynchronizedRGB 有以下几步:

  • SynchronizedRGB 类有两个 setter 方法。第一个 set
    方法只是简单的为字段设值,第二个 invert
    方法修改为创建一个新对象,而不是在原有对象上修改。
  • 所有的字段都已经是私有的,加上 final 即可。
  • 将类声明为 final 的
  • 只有一个字段是对象引用,并且被引用的对象也是不可变对象。

经过以上这些修改后,我们得到了 ImmutableRGB:

public class ImmutableRGB {
      // Values must be between 0 and 255.
    final private int red;
    final private int green;
    final private int blue;
    final private String name;

    private void check(int red,
                       int green,
                       int blue) {
        if (red < 0 || red > 255
            || green < 0 || green > 255
            || blue < 0 || blue > 255) {
            throw new IllegalArgumentException();
        }
    }

    public ImmutableRGB(int red,
                        int green,
                        int blue,
                        String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }

    public int getRGB() {
        return ((red << 16) | (green << 8) | blue);
    }

    public String getName() {
        return name;
    }

    public ImmutableRGB invert() {
        return new ImmutableRGB(255 - red,
                       255 - green,
                       255 - blue,
                       "Inverse of " + name);
    }
}

高级别的并发对象

到目前为止,从一开始我们主要集中在讲解Java平台的部分低级别的API。对于一些基本的任务这些API可以胜任,但是一些更高级的任务需要更高级别的构建块。这对于充分利用当今的多处理器和多核系统的大规模并发应用来说尤其如此。

在本节中,我们将介绍Java平台5.0版本中引入的一些高级并发功能。这些功能中的大多数都是在新包java.util.concurrent中实现的。在Java容器框架中也有新的并发数据结构。

  • Lock
    objects支持简化许多应用程序的锁定方法。
  • Executors定义了一种高级别的启动和管理线程的API。包java.util.concurrent的Executor的实现提供了使用于大型应用程序的线程池管理。
  • 并发集合使管理大量数据变得更容易,可以极大的减少同步的需要。
  • 原子变量有最小化同步的特性,且可以帮助避免内存一致性错误。
  • 在JDK7中的ThreadLocalRandom类提供了在多线程中高效产生随机数的方式。

高级并发对象

目前为止,之前的教程都是重点讲述了最初作为 Java 平台一部分的低级别
API。这些API
对于非常基本的任务来说已经足够,但是对于更高级的任务就需要更高级的
API。特别是针对充分利用了当今多处理器和多核系统的大规模并发应用程序。
本章,我们将着眼于 Java 5.0
新增的一些高级并发特征。大多数功能已经在新的java.util.concurrent
包中实现。Java 集合框架中也定义了新的并发数据结构。

锁对象

同步代码依赖一种简单的重入锁。这种锁极易使用但是也有很多限制。包java.util.concurrent.locks支持更加复杂的锁方式。我们不会去检查这个包的详情,我们主要是集中在基础的接口Lock 上面。

锁对象和同步代码的隐士锁一样工作。和隐士锁一样,在同一时刻,只有一个线程可以占有锁对象。锁对象同时也支持wait或notify原理,通过相关联的Condition对象实现。

锁对象相比于隐士锁最大的优势是锁对象有尝试获得锁然后退出的能力。tryLock方法会立即退出或者在一个指定的超时时间后退出。lockInterruptibly方法在它获得锁之前如果另外一个线程发送了中断会退出。

让我们来用锁对象解决在Liveness章节中遇到的死锁问题。A和B通过训练知道什么时候别人会鞠躬。我们通过要求我们的朋友对象在继续进行鞠躬之前必须获得两个参与者的锁来模拟这种改进。下面是改进的类SafeLock的源代码。为了展示这种方式的多功能性,我们假定A和B非常喜欢他们新的安全的鞠躬的能力,他们不停的像对方鞠躬。

    `import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.Random;

    public class Safelock {
        static class Friend {
            private final String name;
            private final Lock lock = new ReentrantLock();

            public Friend(String name) {
                this.name = name;
            }

            public String getName() {
                return this.name;
            }

            public boolean impendingBow(Friend bower) {
                Boolean myLock = false;
                Boolean yourLock = false;
                try {
                    myLock = lock.tryLock();
                    yourLock = bower.lock.tryLock();
                } finally {
                    if (! (myLock && yourLock)) {
                        if (myLock) {
                            lock.unlock();
                        }
                        if (yourLock) {
                            bower.lock.unlock();
                        }
                    }
                }
                return myLock && yourLock;
            }

            public void bow(Friend bower) {
                if (impendingBow(bower)) {
                    try {
                        System.out.format("%s: %s has"
                            + " bowed to me!%n", 
                            this.name, bower.getName());
                        bower.bowBack(this);
                    } finally {
                        lock.unlock();
                        bower.lock.unlock();
                    }
                } else {
                    System.out.format("%s: %s started"
                        + " to bow to me, but saw that"
                        + " I was already bowing to"
                        + " him.%n",
                        this.name, bower.getName());
                }
            }

            public void bowBack(Friend bower) {
                System.out.format("%s: %s has" +
                    " bowed back to me!%n",
                    this.name, bower.getName());
            }
        }

        static class BowLoop implements Runnable {
            private Friend bower;
            private Friend bowee;

            public BowLoop(Friend bower, Friend bowee) {
                this.bower = bower;
                this.bowee = bowee;
            }

            public void run() {
                Random random = new Random();
                for (;;) {
                    try {
                        Thread.sleep(random.nextInt(10));
                    } catch (InterruptedException e) {}
                    bowee.bow(bower);
                }
            }
        }


        public static void main(String[] args) {
            final Friend alphonse =
                new Friend("Alphonse");
            final Friend gaston =
                new Friend("Gaston");
            new Thread(new BowLoop(alphonse, gaston)).start();
            new Thread(new BowLoop(gaston, alphonse)).start();
        }
    }`

锁对象

提供了可以简化许多并发应用的锁的惯用法。

同步代码依赖于一种简单的可重入锁。这种锁使用简单,但也有诸多限制。java.util.concurrent.locks
包提供了更复杂的锁。这里会重点关注其最基本的接口 Lock。 Lock
对象作用非常类似同步代码使用的内部锁。如同内部锁,每次只有一个线程可以获得
Lock 对象。通过关联 Condition 对象,Lock 对象也支持 wait/notify 机制。

Lock
对象之于隐式锁最大的优势在于,它们有能力收回获得锁的尝试。如果当前锁对象不可用,或者锁请求超时(如果超时时间已指定),tryLock
方法会收回获取锁的请求。如果在锁获取前,另一个线程发送了一个中断,lockInterruptibly
方法也会收回获取锁的请求。

让我们使用 Lock 对象来解决我们在活跃度中见到的死锁问题。Alphonse 和
Gaston 已经把自己训练成能注意到朋友何时要鞠躬。我们通过要求 Friend
对象在双方鞠躬前必须先获得锁来模拟这次改善。下面是改善后模型的源代码
Safelock :

public class Safelock {
    static class Friend {
        private final String name;
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public boolean impendingBow(Friend bower) {
            Boolean myLock = false;
            Boolean yourLock = false;
            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                if (!(myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        bower.lock.unlock();
                    }
                }
            }
            return myLock && yourLock;
        }

        public void bow(Friend bower) {
            if (impendingBow(bower)) {
                try {
                    System.out.format("%s: %s has" + " bowed to me!%n", this.name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                System.out.format(
                        "%s: %s started" + " to bow to me, but saw that" + " I was already bowing to" + " him.%n",
                        this.name, bower.getName());
            }
        }

        public void bowBack(Friend bower) {
            System.out.format("%s: %s has" + " bowed back to me!%n", this.name, bower.getName());
        }
    }

    static class BowLoop implements Runnable {
        private Friend bower;
        private Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }

        public void run() {
            Random random = new Random();
            for (;;) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {
                }
                bowee.bow(bower);
            }
        }
    }

    public static void main(String[] args) {
        final Friend alphonse = new Friend("Alphonse");
        final Friend gaston = new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}

Executors

执行器(Executors)

为加载和管理线程定义了高级 API。Executors 的实现由 java.util.concurrent
包提供,提供了适合大规模应用的线程池管理。

在之前所有的例子中,Thread 对象表示的线程和 Runnable
对象表示的线程所执行的任务之间是紧耦合的。这对于小型应用程序来说没问题,但对于大规模并发应用来说,合理的做法是将线程的创建与管理和程序的其他部分分离开。封装这些功能的对象就是执行器,接下来的部分将讲详细描述执行器。

Executor的一些接口

包java.util.concurrent定义了3个executor接口:

  • Executor,一个可以支持启动新任务的简单接口。
  • ExecutorService是Executor的子接口,它增加了有助于管理生命周期的功能,包括单个任务和执行器本身。
  • ScheduledExecutorService是ExecutorService的子接口,支持未来和/或定期执行任务。

通常,引用执行器对象的变量被声明为这三种接口类型之一,而不是执行器类类型。

执行器接口

在 java.util.concurrent 中包括三个执行器接口:

  • Executor,一个运行新任务的简单接口。
  • ExecutorService,扩展了 Executor
    接口。添加了一些用来管理执行器生命周期和任务生命周期的方法。
  • ScheduledExecutorService,扩展了 ExecutorService。支持 future
    和(或)定期执行任务。

通常来说,指向 executor
对象的变量应被声明为以上三种接口之一,而不是具体的实现类

Executor接口

Executor接口提供了一个简单的execute方法,这个方法被设计成代替常见的线程创建方式。如果r是一个Runnable对象,且e是一个Executor对象,你可以用替换

    `(new Thread(r)).start();`

            `e.execute(r);`

然而execute的定义不太具体。低级别的方式创建一个线程然后立即启动它。依赖于Executor的实现,execute方法可以做相同的事情,但是这个更像是使用一个已经存在的线程去跑r或者是说把r放到一个等待队列中,等到有空闲的工作线程。我们将在章节Thread
Pools中讲述工作线程。

Executor 接口

Executor 接口只有一个 execute
方法,用来替代通常创建(启动)线程的方法。例如:r 是一个 Runnable
对象,e 是一个 Executor 对象。可以使用

e.execute(r);

代替

(new Thread(r)).start();

但 execute 方法没有定义具体的实现方式。对于不同的 Executor 实现,execute
方法可能是创建一个新线程并立即启动,但更有可能是使用已有的工作线程运行r,或者将
r放入到队列中等待可用的工作线程。(我们将在线程池一节中描述工作线程。)

ExecutorService接口

ExecutorService接口提供类似于execute的更通用的submit方法。和execute方法一样,submit方法接受Runnable的对象,同时也接受Callable 对象,这个对象允许任务有返回值。submit方法返回一个Future 的对象,这个对象用来获得Callable的返回值,管理Callable和Runnable任务的状态。

ExecutorService 接口也提供方法提交大量的Callable对象。最后,ExecutorService提供一些管理executor停止执行的方法。为了支持立即的停止执行,任务需要正确的处理中断。

ExecutorService 接口

ExecutorService 接口在提供了 execute 方法的同时,新加了更加通用的 submit
方法。submit 方法除了和 execute 方法一样可以接受 Runnable
对象作为参数,还可以接受 Callable 对象作为参数。使用
Callable对象可以能使任务返还执行的结果。通过 submit
方法返回的Future 对象可以读取 Callable 任务的执行结果,或是管理 Callable
任务和 Runnable 任务的状态。 ExecutorService 也提供了批量运行 Callable
任务的方法。最后,ExecutorService
还提供了一些关闭执行器的方法。如果需要支持即时关闭,执行器所执行的任务需要正确处理中断。

ScheduledExecutorService Interface

ScheduledExecutorService 接口schedule方法,这个方法可以在指定的延迟之后执行Runnable或者是Callable任务。而且,接口还定义了scheduleAtFixedRate和schedulerWithFixedDelay方法,这些方法可以在指定的时间间隔内重复执行。

ScheduledExecutorService 接口

ScheduledExecutorService 扩展 ExecutorService接口并添加了 schedule
方法。调用 schedule 方法可以在指定的延时后执行一个Runnable 或者 Callable
任务。ScheduledExecutorService
接口还定义了按照指定时间间隔定期执行任务的 scheduleAtFixedRate 方法和
scheduleWithFixedDelay 方法。

线程池

大多数在包java.util.concurrent中的executor的实现都使用了线程池,线程池当中包含工作线程。这种线程和Runable任务,Callable任务是不同的,它们经常被用来执行多任务。

使用工作线程最小化线程创建所带来的开销。线程对象需要使用大量的内存,在大规模的应用中,多线程对象的分配和释放需要消耗大量的内存。

一种常见的线程池类型是固定大小的线程池。这种类型的线程池当中总是保持指定大小的线程数运行着;如果一个线程在使用当中因为某些原因被终止了,它将自动的被一个新的线程替代。任务通过一个内部的队列提交到线程池当中,当提交的任务数超过线程数时,这些超过的任务会进入到队列当中去。

固定线程池的一个重要优点是应用程序可以优雅降级的使用它。为了理解这个,假设一个网页服务应用程序用单独的线程处理每个http的请求。如果应用程序为每个新的http请求都创建一个新的线程,紧接着系统会立即收到很多线程,当所有这些线程的开销超过系统的容量时应用程序会突然停止响应所有的请求。因为线程创建的数量是有限制的,应用程序将不能够按照http请求进来的速度响应它们,但是应用程序会以自己所能处理的最快速度去响应它们。

一个简单的创建固定大小的线程池的方法是调用java.util.concurrent.Executors
的newFixedThreadPool 工厂方法。这个类同时也提供了下列的这些工厂方法:

  • newCachedThreadPool
    方法创建一个有着可以扩展大小的线程池的执行器。此执行器适用于启动许多短时间任务的应用程序。

  • newSingleThreadExecutor 方法创建一个在同一时刻只执行单个任务的执行器。

  • 数个工厂方法是上述执行器的ScheduledExecutorService数个版本。

如果上述executors提供的工厂方法没有一个满足你的要求,创建
java.util.concurrent.ThreadPoolExecutor 或java.util.concurrent.ScheduledThreadPoolExecutor
的实例将给你更多的选择。

线程池

线程池是最常见的一种执行器的实现。

在 java.util.concurrent
包中多数的执行器实现都使用了由工作线程组成的线程池,工作线程独立于所它所执行的
Runnable 任务和 Callable 任务,并且常用来执行多个任务。

使用工作线程可以使创建线程的开销最小化。在大规模并发应用中,创建大量的
Thread 对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。

一种最常见的线程池是固定大小的线程池。这种线程池始终有一定数量的线程在运行,如果一个线程由于某种原因终止运行了,线程池会自动创建一个新的线程来代替它。需要执行的任务通过一个内部队列提交给线程,当没有更多的工作线程可以用来执行任务时,队列保存额外的任务。

使用固定大小的线程池一个很重要的好处是可以实现优雅退化(degrade
gracefully)。例如一个 Web 服务器,每一个 HTTP
请求都是由一个单独的线程来处理的,如果为每一个 HTTP
都创建一个新线程,那么当系统的开销超出其能力时,会突然地对所有请求都停止响应。如果限制
Web
服务器可以创建的线程数量,那么它就不必立即处理所有收到的请求,而是在有能力处理请求时才处理。

创建一个使用线程池的执行器最简单的方法是调用 java.util.concurrent.Executors 的 newFixedThreadPool 方法。Executors
类还提供了下列一下方法:

  • newCachedThreadPool 方法创建了一个可扩展的线程池。适合用来启动很多短任务的应用程序。
  • newSingleThreadExecutor 方法创建了每次执行一个任务的执行器。
  • 还有一些 ScheduledExecutorService 执行器创建的工厂方法。

如果上面的方法都不满足需要,可以尝试 java.util.concurrent.ThreadPoolExecutor 或者java.util.concurrent.ScheduledThreadPoolExecutor。

Fork/Join

Fork/Join是一种实现了ExecutorService的框架,它可以帮助你充分利用多处理器的优势。它是为那些可以递归的分解成更小的任务的工作而设计的。它的目的是使用所有可用的处理器的能力来增强应用的性能。

和其它ExecutorService的实现一样,fork/join框架将任务分配给在线程池中的工作线程。但是fork/join框架的区别是它使用了work-stealing算法。工作线程在做完自己的任务之后可以偷其它繁忙的线程的任务来做。

fork/join框架的核心是类是ForkJoinPool ,它是类AbstractExecutorService的扩展。ForkJoinPool实现了work-stealing算法,它可以执行ForkJoinTask

Fork/Join

该框架是 JDK 7 中引入的并发框架。

fork/join 框架是 ExecutorService
接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。

类似于 ExecutorService 接口的其他实现,fork/join
框架会将任务分发给线程池中的工作线程。fork/join
框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。

fork/join 框架的核心是 ForkJoinPool 类,它是对 AbstractExecutorService
类的扩展。ForkJoinPool 实现了工作窃取算法,并可以执行ForkJoinTask 任务。

基本使用方法

使用fork/join框架的第一步是编写工作的一段的代码。你的代码应该和下面的这些代码类似:

if (my portion of the work is small enough) do the work directly else split my work into two pieces invoke the two pieces and wait for the results

把这段代码放在ForkJoinTask的子类当中,典型的使用更加专业的类,RecursiveTask (可以返回结果)或者是RecursiveAction 。

在你的ForkJoinTask的子类准备好了之后,创建一个代表所有的需要做的工作的对象,然后把它给到ForkJoinPool实例的invoke方法。

基本使用方法

使用 fork/join
框架的第一步是编写执行一部分工作的代码。你的代码结构看起来应该与下面所示的伪代码类似:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

翻译为中文为:

if (当前这个任务工作量足够小)
    直接完成这个任务
else
    将这个任务或这部分工作分解成两个部分
    分别触发(invoke)这两个子任务的执行,并等待结果

你需要将这段代码包裹在一个 ForkJoinTask
的子类中。不过,通常情况下会使用一种更为具体的的类型,或者是 RecursiveTask(会返回一个结果),或者是 RecursiveAction。
当你的 ForkJoinTask
子类准备好了,创建一个代表所有需要完成工作的对象,然后将其作为参数传递给一个ForkJoinPool
实例的 invoke() 方法即可。

模糊处理

为了帮你理解fork/join框架是如何工作的,看一下下面的例子。假定你要模糊处理一张图片。一个整数类型的数组代表原始的图片,每一个单个的整数代表单个像素的颜色的值。经过模糊处理的目标图片也是用与原图片相同大小的整数数组表示的。

模糊处理是通过一次处理原数组的一个像素来完成的。每一个像素取它的周围的像素的平均值
(红色、绿色和蓝色分别平均),然后将结果放到目标数组当中。因为一张图片是一个很大的数组,这样子处理会消耗很长的时间。你可以使用fork/join框架充分利用多处理器系统的并发处理能力。下面是一个可能的实现:

`public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;

// Processing window size; should be odd.
private int mBlurWidth = 15;

public ForkBlur(int[] src, int start, int length, int[] dst) {
    mSource = src;
    mStart = start;
    mLength = length;
    mDestination = dst;
}

protected void computeDirectly() {
    int sidePixels = (mBlurWidth - 1) / 2;
    for (int index = mStart; index < mStart + mLength; index++) {
        // Calculate average.
        float rt = 0, gt = 0, bt = 0;
        for (int mi = -sidePixels; mi <= sidePixels; mi++) {
            int mindex = Math.min(Math.max(mi + index, 0),
                                mSource.length - 1);
            int pixel = mSource[mindex];
            rt += (float)((pixel & 0x00ff0000) >> 16)
                  / mBlurWidth;
            gt += (float)((pixel & 0x0000ff00) >>  8)
                  / mBlurWidth;
            bt += (float)((pixel & 0x000000ff) >>  0)
                  / mBlurWidth;
        }

        // Reassemble destination pixel.
        int dpixel = (0xff000000     ) |
               (((int)rt) << 16) |
               (((int)gt) <<  8) |
               (((int)bt) <<  0);
        mDestination[index] = dpixel;
    }
}`

未完待续~~~

模糊图片的例子

想要了解 fork/join
框架的基本工作原理,接下来的这个例子会有所帮助。假设你想要模糊一张图片。原始的
source 图片由一个整数的数组表示,每个整数表示一个像素点的颜色数值。与
source 图片相同,模糊之后的 destination 图片也由一个整数数组表示。
对图片的模糊操作是通过对 source
数组中的每一个像素点进行处理完成的。处理的过程是这样的:将每个像素点的色值取出,与周围像素的色值(红、黄、蓝三个组成部分)放在一起取平均值,得到的结果被放入
destination
数组。因为一张图片会由一个很大的数组来表示,这个流程会花费一段较长的时间。如果使用
fork/join
框架来实现这个模糊算法,你就能够借助多处理器系统的并行处理能力。下面是上述算法结合
fork/join 框架的一种简单实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    // Processing window size; should be odd.
    private int mBlurWidth = 15;

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }

            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }

  ...

接下来你需要实现父类中的 compute()
方法,它会直接执行模糊处理,或者将当前的工作拆分成两个更小的任务。数组的长度可以作为一个简单的阀值来判断任务是应该直接完成还是应该被拆分。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }

    int split = mLength / 2;

    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

如果前面这个方法是在一个 RecursiveAction
的子类中,那么设置任务在ForkJoinPool
中执行就再直观不过了。通常会包含以下一些步骤:

  1. 创建一个表示所有需要完成工作的任务。// source image pixels are in
    src // destination image pixels are in dst ForkBlur fb = new
    ForkBlur(src, 0, src.length, dst);
  2. 创建将要用来执行任务的 ForkJoinPool。ForkJoinPool pool = new
    ForkJoinPool();
  3. 执行任务。pool.invoke(fb);

想要浏览完成的源代码,请查看 ForkBlur示例,其中还包含一些创建
destination 图片文件的额外代码。

标准实现

除了能够使用 fork/join
框架来实现能够在多处理系统中被并行执行的定制化算法(如前文中的
ForkBlur.java 例子),在 Java SE 中一些比较常用的功能点也已经使用
fork/join 框架来实现了。在 Java SE 8
中,java.util.Arrays 类的一系列parallelSort() 方法就使用了 fork/join
来实现。这些方法与 sort() 方法很类似,但是通过使用 fork/join框
架,借助了并发来完成相关工作。在多处理器系统中,对大数组的并行排序会比串行排序更快。这些方法究竟是如何运用
fork/join 框架并不在本教程的讨论范围内。想要了解更多的信息,请参见 Java
API 文档。 其他采用了 fork/join
框架的方法还包括java.util.streams包中的一些方法,此包是作为 Java SE 8
发行版中 Project Lambda 的一部分。想要了解更多信息,请参见 Lambda
表达式一节。

并发集合

并发集合简化了大型数据集合管理,且极大的减少了同步的需求。

java.util.concurrent 包囊括了 Java
集合框架的一些附加类。它们也最容易按照集合类所提供的接口来进行分类:

  • BlockingQueue 定义了一个先进先出的数据结构,当你尝试往满队列中添加元素,或者从空队列中获取元素时,将会阻塞或者超时。
  • ConcurrentMap 是 java.util.Map 的子接口,定义了一些有用的原子操作。移除或者替换键值对的操作只有当
    key 存在时才能进行,而新增操作只有当 key
    不存在时。使这些操作原子化,可以避免同步。ConcurrentMap
    的标准实现是 ConcurrentHashMap,它是 HashMap 的并发模式。
  • ConcurrentNavigableMap 是 ConcurrentMap
    的子接口,支持近似匹配。ConcurrentNavigableMap 的标准实现是
    ConcurrentSkipListMap,它是 TreeMap 的并发模式。

所有这些集合,通过在集合里新增对象和访问或移除对象的操作之间,定义一个happens-before
的关系,来帮助程序员避免内存一致性错误。

原子变量

java.util.concurrent.atomic 包定义了对单一变量进行原子操作的类。所有的类都提供了
get 和 set 方法,可以使用它们像读写 volatile
变量一样读写原子类。就是说,同一变量上的一个 set 操作对于任意后续的 get
操作存在 happens-before 关系。原子的 compareAndSet
方法也有内存一致性特点,就像应用到整型原子变量中的简单原子算法。

为了看看这个包如何使用,让我们返回到最初用于演示线程干扰的 Counter 类:

class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}

使用同步是一种使 Counter 类变得线程安全的方法,如 SynchronizedCounter:

class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}

对于这个简单的类,同步是一种可接受的解决方案。但是对于更复杂的类,我们可能想要避免不必要同步所带来的活跃度影响。将
int 替换为 AtomicInteger 允许我们在不进行同步的情况下阻止线程干扰,如
AtomicCounter:

import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}

并发随机数

并发随机数(JDK7)提供了高效的多线程生成伪随机数的方法。

在 JDK7 中,java.util.concurrent 包含了一个相当便利的类 ThreadLocalRandom,可以在当应用程序期望在多个线程或
ForkJoinTasks 中使用随机数时使用。

对于并发访问,使用 TheadLocalRandom 代替 Math.random()
可以减少竞争,从而获得更好的性能。

你只需调用 ThreadLocalRandom.current(),
然后调用它的其中一个方法去获取一个随机数即可。下面是一个例子:

int r = ThreadLocalRandom.current() .nextInt(4, 77);

源码

本章例子的源码,可以在  中
com.waylau.essentialjava.concurrency 包下找到。

参考

  • 更多内容可移步至笔者所著开源书《Java
    编程要点》
You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图