JCIP chap3 share objects

图片 6

多线程共享变量的情况下,为了保证数据一致性,往往需要对这些变量的访问进行加锁。而锁本身又会带来一些问题和开销。Immutable
Object模式使得我们可以在不使用锁的情况下,既保证共享变量访问的线程安全,又能避免引入锁可能带来的问题和开销。

“同步”确保了操作的原子性执行,但它还有其它重要的方面:memory
visibility.我们不但要确保当一个线程在使用一个对象的时候,其它线程不能修改这个对象,而且还要保证该线程在修改对象状态时,其它线程能够看到该线程对对象所做的改变。

Active Object模式简介

Active
Object模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active
Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc()方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc()的调用方代码是运行在自己的线程上(通常是main线程派生的子线程),而JVM的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc()这个方法所代表的动作(其所定义的功能)的调用方和执行方是运行在不同的线程中的,从而提高了并发性。

再进一步介绍Active
Object模式,我们可先简单地将其核心理解为一个名为ActiveObject的类,该类对外暴露了一些异步方法,如图1所示。

图 1. ActiveObject对象示例

图片 1

doSomething方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething方法会被多个线程调用。这时所需的线程安全控制封装在doSomething方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个Active
Object对象的方法与调用普通Java对象的方法并无太大差别。如清单1所示。

清单 1. Active Object方法调用示例

ActiveObject ao=...;
Future future = ao.doSomething("data");
//执行其它操作
String result = future.get();
System.out.println(result);

Immutable Object模式简介

多线程环境中,一个对象常常会被多个线程共享。这种情况下,如果存在多个线程并发地修改该对象的状态或者一个线程读取该对象的状态而另外一个线程试图修改该对象的状态,我们不得不做一些同步访问控制以保证数据一致性。而这些同步访问控制,如显式锁和CAS操作,会带来额外的开销和问题,如上下文切换、等待时间和ABA问题等。Immutable
Object模式的意图是通过使用对外可见的状态不可变的对象(即Immutable
Object),使得被共享对象“天生”具有线程安全性,而无需额外的同步访问控制。从而既保证了数据一致性,又避免了同步访问控制所产生的额外开销和问题,也简化了编程。

所谓状态不可变的对象,即对象一经创建其对外可见的状态就保持不变,例如Java中的String和Integer。这点固然容易理解,但这还不足以指导我们在实际工作中运用Immutable
Object模式。下面我们看一个典型应用场景,这不仅有助于我们理解它,也有助于在实际的环境中运用它。

一个车辆管理系统要对车辆的位置信息进行跟踪,我们可以对车辆的位置信息建立如清单1所示的模型。

可以通过显式的同步语句或内建类库的同步机制以保证对象的正确发布。

Active Object模式的架构

当Active
Object模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method
Request)。方法请求对象会被存入Active
Object模式所维护的缓冲区(Activation
Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用Active
Object模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。

Active Object模式的主要参与者有以下几种。其类图如图2所示。

图 2. Active Object模式的类图

(点击图像放大)

图片 2

  • Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法doSomething时,该方法会生成一个相应的MethodRequest实例并将其存储到Scheduler所维护的缓冲区中。doSomething方法的返回值是一个表示其执行结果的外包装对象:Future参与者的实例。异步方法doSomething运行在调用方代码所在的线程中。
  • MethodRequest:负责将调用方代码对Proxy实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将Proxy的异步方法的调用和执行分离成为可能。其call方法会根据其所包含上下文信息调用Servant实例的相应方法。
  • ActivationQueue:负责临时存储由Proxy的异步方法被调用时所创建的MethodRequest实例的缓冲区。
  • Scheduler:负责将Proxy的异步方法所创建的MethodRequest实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的MethodRequest实例进行执行。其调度策略可以根据实际需要来定,如FIFO、LIFO和根据MethodRequest中包含的信息所定的优先级等。
  • Servant:负责对Proxy所暴露的异步方法的具体实现。
  • Future:负责存储和返回Active Object异步方法的执行结果。

Active Object模式的序列图如图3所示。

图 3. Active Object模式的序列图

(点击图像放大)

图片 3

第1步:调用方代码调用Proxy的异步方法doSomething。

第2~7步:doSomething方法创建Future实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest对象。然后以所创建的MethodRequest对象作为参数调用Scheduler的enqueue方法,以将MethodRequest对象存入缓冲区。Scheduler的enqueue方法会调用Scheduler所维护的ActivationQueue实例的enqueue方法,将MethodRequest对象存入缓冲区。

第8步:doSomething返回其所创建的Future实例。

第9步:Scheduler实例采用专门的工作线程运行dispatch方法。

第10~12步:dispatch方法调用ActivationQueue实例的dequeue方法,获取一个MethodRequest对象。然后调用MethodRequest对象的call方法

第13~16步:MethodRequest对象的call方法调用与其关联的Servant实例的相应方法doSomething。并将Servant.doSomething方法的返回值设置到Future实例上。

第17步:MethodRequest对象的call方法返回。

上述步骤中,第1~8步是运行在Active
Object的调用者线程中的,这几个步骤实现了将调用方代码对Active
Object所提供的异步方法的调用封装成对象(Method
Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17步是运行在Active
Object的工作线程中,这些步骤实现从缓冲区中读取Method
Request,并对其进行执行,实现了任务的执行。从而实现了Active
Object对外暴露的异步方法的调用与执行的分离。

如果调用方代码关心Active
Object的异步方法的返回值,则可以在其需要时,调用Future实例的get方法来获得异步方法的真正执行结果。

清单 1. 状态可变的位置信息模型(非线程安全)

public class Location {

    private double x;
    private double y;

    public Location(double x, double y) {
        this.x = x;
        this.y = y;
    }

    public double getX() {
        return x;
    }

    public double getY() {
        return y;
    }

    public void setXY(double x, double y) {
        this.x = x;
        this.y = y;
    }
}

当系统接收到新的车辆坐标数据时,需要调用Location的setXY方法来更新位置信息。显然,清单1中setXY是非线程安全的,因为对坐标数据x和y的写操作不是一个原子操作。setXY被调用时,如果在x写入完毕,而y开始写之前有其它线程来读取位置信息,则该线程可能读到一个被追踪车辆根本不曾经过的位置。为了使setXY方法具备线程安全性,我们需要借助锁进行访问控制。虽然被追踪车辆的位置信息总是在变化,但是我们也可以将位置信息建模为状态不可变的对象,如清单2所示。

3.1. Visibility

Active Object模式实战案例

某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户13612345678给其同事13787654321发送彩信时,可以将接收方号码填写为对方的短号,如776,而非其真实的号码。

该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过Java的对象序列化API,将表示下发彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及Active
Object模式如何帮助我们满足这些考虑。

首先,请求消息缓存到磁盘中涉及文件I/O这种慢的操作,我们不希望它在请求处理的主线程(即Web服务器的工作线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性。并使得Web服务器的工作线程因等待文件I/O而降低了系统的吞吐量。这时,异步处理就派上用场了。Active
Object模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在Web服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在Active
Object工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被Active
Object线程缓存到文件中。如图4所示。

图 4 .异步实现缓存

图片 4

其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如2000个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也存满,依此类推。当这些子目录的个数到达指定数量(如100个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而Active
Object模式中的Proxy参与者可以帮助我们封装并发访问控制。

下面,我们看该案例的相关代码通过应用Active
Object模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的Active
Object模式的客调用方代码。如清单2所示。

清单 2. 彩信下发请求处理的入口类

public class MMSDeliveryServlet extends HttpServlet {

    private static final long serialVersionUID = 5886933373599895099L;

    @Override
    public void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        //将请求中的数据解析为内部对象
        MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
        Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
        Recipient originalNumberRecipient = null;

        try {
            // 将接收方短号转换为长号
            originalNumberRecipient = convertShortNumber(shortNumberRecipient);
        } catch (SQLException e) {

            // 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
            AsyncRequestPersistence.getInstance().store(mmsDeliverReq);

            // 继续对当前请求的其它处理,如给客户端响应
            resp.setStatus(202);
        }

    }

    private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
        MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
        //省略其它代码
        return mmsDeliverReq;
    }

    private Recipient convertShortNumber(Recipient shortNumberRecipient)
            throws SQLException {
        Recipient recipent = null;
        //省略其它代码
        return recipent;
    }

}

清单2中的doPost方法在侦测到短号转换过程中发生的数据库异常后,通过调用AsyncRequestPersistence类的store方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence类相当于Active
Object模式中的Proxy参与者。尽管本案例涉及的是一个并发环境,但从清单2中的代码可见,AsyncRequestPersistence类的调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在AsyncRequestPersistence类之后。

AsyncRequestPersistence类的代码如清单3所示。

清单 3. 彩信下发请求缓存入口类(Active Object模式的Proxy)

// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
    private static final long ONE_MINUTE_IN_SECONDS = 60;
    private final Logger logger;
    private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
    private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);

    // ActiveObjectPattern.Servant
    private final DiskbasedRequestPersistence 
                        delegate = new DiskbasedRequestPersistence();
    // ActiveObjectPattern.Scheduler
    private final ThreadPoolExecutor scheduler;

    private static class InstanceHolder {
        final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
    }

    private AsyncRequestPersistence() {
        logger = Logger.getLogger(AsyncRequestPersistence.class);
        scheduler = new ThreadPoolExecutor(1, 3, 
                60 * ONE_MINUTE_IN_SECONDS,
                TimeUnit.SECONDS,
                // ActiveObjectPattern.ActivationQueue
                new LinkedBlockingQueue(200), 
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t;
                        t = new Thread(r, "AsyncRequestPersistence");
                        return t;
                    }

                });

        scheduler.setRejectedExecutionHandler(
                new ThreadPoolExecutor.DiscardOldestPolicy());

        // 启动队列监控定时任务
        Timer monitorTimer = new Timer(true);
        monitorTimer.scheduleAtFixedRate(
            new TimerTask() {

            @Override
            public void run() {
                if (logger.isInfoEnabled()) {

                    logger.info("task count:" 
                            + requestSubmittedPerIterval
                            + ",Queue size:" 
                            + scheduler.getQueue().size()
                            + ",taskTimeConsumedPerInterval:"
                            + taskTimeConsumedPerInterval.get() 
                            + " ms");
                }

                taskTimeConsumedPerInterval.set(0);
                requestSubmittedPerIterval.set(0);

            }
        }, 0, ONE_MINUTE_IN_SECONDS * 1000);
    }

    public static RequestPersistence getInstance() {
        return InstanceHolder.INSTANCE;
    }

    @Override
    public void store(final MMSDeliverRequest request) {
        /*
         * 将对store方法的调用封装成MethodRequest对象, 并存入缓冲区。
         */
        // ActiveObjectPattern.MethodRequest
        Callable methodRequest = new Callable() {
            @Override
            public Boolean call() throws Exception {
                long start = System.currentTimeMillis();
                try {
                    delegate.store(request);
                } finally {
                    taskTimeConsumedPerInterval.addAndGet(
                            System.currentTimeMillis() - start);
                }

                return Boolean.TRUE;
            }

        };
        scheduler.submit(methodRequest);

        requestSubmittedPerIterval.incrementAndGet();
    }

}

AsyncRequestPersistence类所实现的接口RequestPersistence定义了Active
Object对外暴露的异步方法:store方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单4所示。

清单 4. RequestPersistence接口源码

public interface RequestPersistence {

     void store(MMSDeliverRequest request);
}

AsyncRequestPersistence类的实例变量scheduler相当于Active
Object模式中的Scheduler参与者实例。这里我们直接使用了JDK1.5引入的Executor
Framework中的ThreadPoolExecutor。在ThreadPoolExecutor类的实例化时,其构造器的第5个参数(BlockingQueue<Runnable>
workQueue)我们指定了一个有界阻塞队列:new
LinkedBlockingQueue<Runnable>(200)。该队列相当于Active
Object模式中的ActivationQueue参与者实例。

AsyncRequestPersistence类的实例变量delegate相当于Active
Object模式中的Servant参与者实例。

AsyncRequestPersistence类的store方法利用匿名类生成一个java.util.concurrent.Callable实例methodRequest。该实例相当于Active
Object模式中的MethodRequest参与者实例。利用闭包(Closure),该实例封装了对store方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence类的store方法通过调用scheduler的submit方法,将methodRequest送入ThreadPoolExecutor所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor是Scheduler参与者的一个“近似”实现。ThreadPoolExecutor的submit方法相对于Scheduler的enqueue方法,该方法用于接纳MethodRequest对象,以将其存入缓冲区。当ThreadPoolExecutor当前使用的线程数量小于其核心线程数量时,submit方法所接收的任务会直接被新建的线程执行。当ThreadPoolExecutor当前使用的线程数量大于其核心线程数时,submit方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor的这种任务处理机制,并不妨碍我们将它用作Scheduler的实现。

methodRequest的call方法会调用delegate的store方法来真正实现请求缓存功能。delegate实例对应的类DiskbasedRequestPersistence是请求消息缓存功能的真正实现者。其代码如清单5所示。

清单 5. DiskbasedRequestPersistence类的源码

public class DiskbasedRequestPersistence implements RequestPersistence {
    // 负责缓存文件的存储管理
    private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
    private final Logger logger = Logger
                                     .getLogger(DiskbasedRequestPersistence.class);

    @Override
    public void store(MMSDeliverRequest request) {
        // 申请缓存文件的文件名
        String[] fileNameParts = storage.apply4Filename(request);
        File file = new File(fileNameParts[0]);
        try {
            ObjectOutputStream objOut = new ObjectOutputStream(
            new FileOutputStream(file));
            try {
                objOut.writeObject(request);
            } finally {
                objOut.close();
            }
        } catch (FileNotFoundException e) {
            storage.decrementSectionFileCount(fileNameParts[1]);
            logger.error("Failed to store request", e);
        } catch (IOException e) {
            storage.decrementSectionFileCount(fileNameParts[1]);
            logger.error("Failed to store request", e);
        }

    }

    class SectionBasedDiskStorage {
        private Deque sectionNames = new LinkedList();
        /*
         * Key->value: 存储子目录名->子目录下缓存文件计数器
         */
        private Map sectionFileCountMap 
                        = new HashMap();
        private int maxFilesPerSection = 2000;
        private int maxSectionCount = 100;
        private String storageBaseDir = System.getProperty("user.dir") + "/vpn";

        private final Object sectionLock = new Object();

        public String[] apply4Filename(MMSDeliverRequest request) {
            String sectionName;
            int iFileCount;
            boolean need2RemoveSection = false;
            String[] fileName = new String[2];
            synchronized (sectionLock) {
                //获取当前的存储子目录名
                sectionName = this.getSectionName();
                AtomicInteger fileCount;
                fileCount = sectionFileCountMap.get(sectionName);
                iFileCount = fileCount.get();
                //当前存储子目录已满
                if (iFileCount >= maxFilesPerSection) {
                    if (sectionNames.size() >= maxSectionCount) {
                        need2RemoveSection = true;
                    }
                    //创建新的存储子目录
                    sectionName = this.makeNewSectionDir();
                    fileCount = sectionFileCountMap.get(sectionName);

                }
                iFileCount = fileCount.addAndGet(1);

            }

            fileName[0] = storageBaseDir + "/" + sectionName + "/"
                + new DecimalFormat("0000").format(iFileCount) + "-"
                + request.getTimeStamp().getTime() / 1000 + "-" 
                               + request.getExpiry()
                + ".rq";
            fileName[1] = sectionName;

            if (need2RemoveSection) {
                //删除最老的存储子目录
                String oldestSectionName = sectionNames.removeFirst();
                this.removeSection(oldestSectionName);
            }

            return fileName;
        }

        public void decrementSectionFileCount(String sectionName) {
            AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
            if (null != fileCount) {
                fileCount.decrementAndGet();
            }
        }

        private boolean removeSection(String sectionName) {
            boolean result = true;
            File dir = new File(storageBaseDir + "/" + sectionName);
            for (File file : dir.listFiles()) {
                result = result && file.delete();
            }
            result = result && dir.delete();
            return result;
        }

        private String getSectionName() {
            String sectionName;

            if (sectionNames.isEmpty()) {
                sectionName = this.makeNewSectionDir();

            } else {
                sectionName = sectionNames.getLast();
            }

            return sectionName;
        }

        private String makeNewSectionDir() {
            String sectionName;
            SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
            sectionName = sdf.format(new Date());
            File dir = new File(storageBaseDir + "/" + sectionName);
            if (dir.mkdir()) {
                sectionNames.addLast(sectionName);
                sectionFileCountMap.put(sectionName, new AtomicInteger(0));
            } else {
                throw new RuntimeException(
                                 "Cannot create section dir " + sectionName);
            }

            return sectionName;
        }
    }
}

methodRequest的call方法的调用者代码是运行在ThreadPoolExecutor所维护的工作者线程中,这就保证了store方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor所维护的工作线程负责将请求消息序列化到磁盘文件中。

DiskbasedRequestPersistence类的store方法中调用的SectionBasedDiskStorage类的apply4Filename方法包含了一些多线程同步控制代码(见清单5)。这部分控制由于是封装在DiskbasedRequestPersistence的内部类中,对于该类之外的代码是不可见的。因此,AsyncRequestPersistence的调用方代码无法知道该细节,这体现了Active
Object模式对并发访问控制的封装。

清单 2. 状态不可变的位置信息模型

public final class Location {
    public final double x;
    public final double y;

    public Location(double x, double y) {
        this.x = x;
        this.y = y;
    }
}

使用状态不可变的位置信息模型时,如果车辆的位置发生变动,则更新车辆的位置信息是通过替换整个表示位置信息的对象(即Location实例)来实现的。如清单3所示。

3.1.1. Stale Data

小结

本篇介绍了Active
Object模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对Active
Object模式进行评价,并结合本文案例介绍实际运用Active
Object模式时需要注意的一些事项。

清单 3. 在使用不可变对象的情况下更新车辆的位置信息

public class VehicleTracker {

    private Map<String, Location> locMap 
        = new ConcurrentHashMap();

    public void updateLocation(String vehicleId, Location newLocation) {
        locMap.put(vehicleId, newLocation);
    }

}

因此,所谓状态不可变的对象并非指被建模的现实世界实体的状态不可变,而是我们在建模的时候的一种决策:现实世界实体的状态总是在变化的,但我们可以用状态不可变的对象来对这些实体进行建模。

3.1.2. Non-atomic 64-bit Operations(针对没有声明为volatile的double和long)

Java内存模型要求读取和写入操作都是原子性的,但对于非volitile的long和double变量,jvm通过两个32位操作来完成一个64位的读写。
因此,当在一个多线程程序中有double和long型的共享变量是,对它的读写一定要加volatile或通过锁来同步

Immutable Object模式的架构

Immutable Object模式的主要参与者有以下几种。其类图如图1所示。

图 1. Immutable Object模式的类图

图片 5

  • ImmutableClass:负责存储一组不可变状态的类。该类不对外暴露任何可以修改其状态的方法,其主要方法及职责如下:getStateX,getStateN:这些getter方法返回该类所维护的状态相关变量的值。这些变量在对象实例化时通过其构造器的参数获得值。getStateSnapshot:返回该类维护的一组状态的快照。
  • Manipulator:负责维护ImmutableClass所建模的现实世界实体状态的变更。当相应的现实世界实体状态变更时,该类负责生成新的ImmutableClass的实例,以反映新的状态。changeStateTo:根据新的状态值生成新的ImmutableClass的实例。

不可变对象的使用主要包括以下几种类型:

获取单个状态的值:调用不可变对象的相关getter方法即可实现。

获取一组状态的快照:不可变对象可以提供一个getter方法,该方法需要对其返回值做防御性拷贝或者返回一个只读的对象,以避免其状态对外泄露而被改变。

生成新的不可变对象实例:当被建模对象的状态发生变化的时候,创建新的不可变对象实例来反映这种变化。

Immutable Object模式的典型交互场景如图2所示:

图 2. Immutable Object模式的序列图

图片 6

1~4、客户端代码获取ImmutableClass的各个状态值。

5、客户端代码调用Manipulator的changeStateTo方法来更新应用的状态。

6、Manipulator创建新的ImmutableClass实例以反映应用的新状态。

7~9、客户端代码获取新的ImmutableClass实例的状态快照。

一个严格意义上不可变对象要满足以下所有条件:

1) 类本身使用final修饰:防止其子类改变其定义的行为;

2) 所有字段都是用final修饰的:使用final修饰不仅仅是从语义上说明被修饰字段的引用不可改变。更重要的是这个语义在多线程环境下由JMM(Java
Memory
Model)保证了被修饰字段的所引用对象的初始化安全,即final修饰的字段在其它线程可见时,它必定是初始化完成的。相反,非final修饰的字段由于缺少这种保证,可能导致一个线程“看到”一个字段的时候,它还未被初始化完成,从而可能导致一些不可预料的结果。

3) 在对象的创建过程中,this关键字没有泄露给其它类:防止其它类(如该类的匿名内部类)在对象创建过程中修改其状态。

4)
任何字段,若其引用了其它状态可变的对象(如集合、数组等),则这些字段必须是private修饰的,并且这些字段值不能对外暴露。若有相关方法要返回这些字段值,应该进行防御性拷贝(Defensive
Copy)。

3.1.3. Locking and Visibility

Immutable Object模式实战案例

某彩信网关系统在处理由增值业务提供商(VASP,Value-Added Service
Provider)下发给手机终端用户的彩信消息时,需要根据彩信接收方号码的前缀(如1381234)选择对应的彩信中心(MMSC,Multimedia
Messaging Service
Center),然后转发消息给选中的彩信中心,由其负责对接电信网络将彩信消息下发给手机终端用户。彩信中心相对于彩信网关系统而言,它是一个独立的部件,二者通过网络进行交互。这个选择彩信中心的过程,我们称之为路由(Routing)。而手机号前缀和彩信中心的这种对应关系,被称为路由表。路由表在软件运维过程中可能发生变化。例如,业务扩容带来的新增彩信中心、为某个号码前缀指定新的彩信中心等。虽然路由表在该系统中是由多线程共享的数据,但是这些数据的变化频率并不高。因此,即使是为了保证线程安全,我们也不希望对这些数据的访问进行加锁等并发访问控制,以免产生不必要的开销和问题。这时,Immutable
Object模式就派上用场了。

维护路由表可以被建模为一个不可变对象,如清单4所示。

清单 4. 使用不可变对象维护路由表

public final class MMSCRouter {
    // 用volatile修饰,保证多线程环境下该变量的可见性
    private static volatile MMSCRouter instance = new MMSCRouter();
         //维护手机号码前缀到彩信中心之间的映射关系
    private final Map<String, MMSCInfo> routeMap;

    public MMSCRouter() {
        // 将数据库表中的数据加载到内存,存为Map
        this.routeMap = MMSCRouter.retrieveRouteMapFromDB();
    }

    private static Map<String, MMSCInfo> retrieveRouteMapFromDB() {
        Map<String, MMSCInfo> map = new HashMap<String, MMSCInfo>();
        // 省略其它代码
        return map;
    }

    public static MMSCRouter getInstance() {

        return instance;
    }

    /**
     * 根据手机号码前缀获取对应的彩信中心信息
     * 
     * @param msisdnPrefix
     *          手机号码前缀
     * @return 彩信中心信息
     */
    public MMSCInfo getMMSC(String msisdnPrefix) {
        return routeMap.get(msisdnPrefix);

    }

    /**
     * 将当前MMSCRouter的实例更新为指定的新实例
     * 
     * @param newInstance
     *          新的MMSCRouter实例
     */
    public static void setInstance(MMSCRouter newInstance) {
        instance = newInstance;
    }

    private static Map<String, MMSCInfo> deepCopy(Map<String, MMSCInfo> m) {
        Map<String, MMSCInfo> result = new HashMap<String, MMSCInfo>();
        for (String key : m.keySet()) {
            result.put(key, new MMSCInfo(m.get(key)));
        }
        return result;
    }

    public Map<String, MMSCInfo> getRouteMap() {
        //做防御性拷贝
        return Collections.unmodifiableMap(deepCopy(routeMap));
    }

}

而彩信中心的相关数据,如彩信中心设备编号、URL、支持的最大附件尺寸也被建模为一个不可变对象。如清单5所示。

清单 5. 使用不可变对象表示彩信中心信息

public final class MMSCInfo {
    /**
     * 设备编号
     */
    private final String deviceID;
    /**
     * 彩信中心URL
     */
    private final String url;
    /**
     * 该彩信中心允许的最大附件大小
     */
    private final int maxAttachmentSizeInBytes;

    public MMSCInfo(String deviceID, String url, int maxAttachmentSizeInBytes) {
        this.deviceID = deviceID;
        this.url = url;
        this.maxAttachmentSizeInBytes = maxAttachmentSizeInBytes;
    }

    public MMSCInfo(MMSCInfo prototype) {
        this.deviceID = prototype.deviceID;
        this.url = prototype.url;
        this.maxAttachmentSizeInBytes = prototype.maxAttachmentSizeInBytes;
    }

    public String getDeviceID() {
        return deviceID;
    }

    public String getUrl() {
        return url;
    }

    public int getMaxAttachmentSizeInBytes() {
        return maxAttachmentSizeInBytes;
    }

}

彩信中心信息变更的频率也同样不高。因此,当彩信网关系统通过网络(Socket连接)被通知到这种彩信中心信息本身或者路由表变更时,网关系统会重新生成新的MMSCInfo和MMSCRouter来反映这种变更。如清单6所示。

清单 6. 处理彩信中心、路由表的变更

/**
 * 与运维中心(Operation and Maintenance Center)对接的类
 *
 */
public class OMCAgent extends Thread{

  @Override
  public void run() {
     boolean isTableModificationMsg=false;
     String updatedTableName=null;
      while(true){
        //省略其它代码
        /*
         * 从与OMC连接的Socket中读取消息并进行解析,
         * 解析到数据表更新消息后,重置MMSCRouter实例。
         */
        if(isTableModificationMsg){
            if("MMSCInfo".equals(updatedTableName)){
                MMSCRouter.setInstance(new MMSCRouter());
            }
        }
        //省略其它代码
      }
  }

}

上述代码会调用MMSCRouter的setInstance方法来替换MMSCRouter的实例为新创建的实例。而新创建的MMSCRouter实例通过其构造器会生成多个新的MMSCInfo的实例。

本案例中,MMSCInfo是一个严格意义上的不可变对象。虽然MMSCRouter对外提供了setInstance方法用于改变其静态字段instance的值,但它仍然可视作一个等效的不可变对象。这是因为,setInstance方法仅仅是改变instance变量指向的对象,而instance变量采用Volatile修饰保证了其在多线程之间的内存可见性,这意味着setInstance对instance变量的改变无需加锁也能保证线程安全。而其它代码在调用MMSCRouter的相关方法获取路由信息时也无需加锁。

从图1的类图上看,OMCAgent类(见清单6)是一个Manipulator参与者实例,而MMSCInfo、MMSCRouter是一个ImmutableClass参与者实例。通过使用不可变对象,我们既可以应对路由表、彩信中心这些不是非常频繁的变更,又可以使系统中使用路由表的代码免于并发访问控制的开销和问题。

3.1.4. Volatile Variables

锁能确保可见性和原子性,volitale只能保证可见性

Immutable Object模式的评价与实现考量

不可变对象具有天生的线程安全性,多个线程共享一个不可变对象的时候无需使用额外的并发访问控制,这使得我们可以避免显式锁(Explicit
Lock)等并发访问控制的开销和问题,简化了多线程编程。

Immutable Object模式特别适用于以下场景。

被建模对象的状态变化不频繁:正如本文案例所展示的,这种场景下可以设置一个专门的线程(Manipulator参与者所在的线程)用于在被建模对象状态变化时创建新的不可变对象。而其它线程则只是读取不可变对象的状态。此场景下的一个小技巧是Manipulator对不可变对象的引用采用volatile关键字修饰,既可以避免使用显式锁(如synchronized),又可以保证多线程间的内存可见性。

同时对一组相关的数据进行写操作,因此需要保证原子性:此场景为了保证操作的原子性,通常的做法是使用显式锁。但若采用Immutable
Object模式,将这一组相关的数据“组合”成一个不可变对象,则对这一组数据的操作就可以无需加显式锁也能保证原子性,既简化了编程,又提高了代码运行效率。本文开头所举的车辆位置跟踪的例子正是这种场景。

使用某个对象作为安全的HashMap的Key:我们知道,一个对象作为HashMap的Key被“放入”HashMap之后,若该对象状态变化导致了其Hash
Code的变化,则会导致后面在用同样的对象作为Key去get的时候无法获取关联的值,尽管该HashMap中的确存在以该对象为Key的条目。相反,由于不可变对象的状态不变,因此其Hash
Code也不变。这使得不可变对象非常适于用作HashMap的Key。

Immutable Object模式实现时需要注意以下几个问题:

被建模对象的状态变更比较频繁:此时也不见得不能使用Immutable
Object模式。只是这意味着频繁创建新的不可变对象,因此会增加GC(Garbage
Collection)的负担和CPU消耗,我们需要综合考虑:被建模对象的规模、代码目标运行环境的JVM内存分配情况、系统对吞吐率和响应性的要求。若这几个方面因素综合考虑都能满足要求,那么使用不可变对象建模也未尝不可。

使用等效或者近似的不可变对象:有时创建严格意义上的不可变对象比较难,但是尽量向严格意义上的不可变对象靠拢也有利于发挥不可变对象的好处。

防御性拷贝:如果不可变对象本身包含一些状态需要对外暴露,而相应的字段本身又是可变的(如HashMap),那么在返回这些字段的方法还是需要做防御性拷贝,以避免外部代码修改了其内部状态。正如清单4的代码中的getRouteMap方法所展示的那样。

3.2. Publication and Escape

发布一个对象是指该对象在它当前的上下文范围之外是可用的,如存储一个引用,使其他代码能够访问、通过非私有的方法返回自己、或者是将自己传递给另一个类的方法中(特别注意!!)。

//Publishing an Object
//发布一个对象,通过静态字段引用它
public static Set<Secret> knownSecrets;

public void initialize(){
    knownSecrets = new HashSet<Secret>();
}

//Allowing Internal Mutable State to Escape.Don't do this.
//私有变量通过公共方法逃逸

class UnsafeStates{
    private String[] states = new String[]{"ak","al",...};

    public String[] getStates(){
        return states;
    }
}

发布一个对象,有可能同时会发布它字段锁引用的对象,这些对象通过它的公有变量或共有的get方法发布出来,要小心无意的泄漏。

小心,不要在你的构造方法里面泄漏了你的对象!!!

public class ThisEscape{
    public ThisEscape(EventSource source){
        source.registerListener(new EventListener{
            public void onEvent(Event e){
                doSomething(e);
            }
        });
    }
}

总结

本文介绍了Immutable
Object模式的意图及架构。并结合笔者工作经历提供了一个实际的案例用于展示使用该模式的典型场景,在此基础上对该模式进行了评价并分享在实际运用该模式时需要注意的事项。

3.2.1. Safe Construction Practices

从构造函数中发布一个对象(匿名内部类实现),会导致当前对象也跟着发布,但是当前对象的构造函数还没有返回,即它不一定被正确的构建就已经发布了

Do not allow the this reference to escape during construction.

一个常见错误是在构造函数中起一个线程,这样会提前发布this引用。

//ThisEscape 改进版本
public class SafeListener{
    private final EventListener listener;

    private SafeListener(){
        listener = new EventListener(){
            public void onEvent(Event e){
                doSomething(e);
            }
        };
    }

    public static SafeListener newInstance(EventSource source){
        SafeListener safe = new SafeListener();
        source.registerListener(safe.listener);
        return safe;
    }
}

3.3. Thread Confinement

3.3.1. Ad-hoc Thread Confinement

3.3.2. Stack Confinement

就是本地变量

//本地原是类型和引用类型的线程封闭
public int loadTheArk(Collection<Animal> candidates){
    SortedSet<Animals> animals;
    int numPairs = 0;
    Animal candidate = null;
    animals = new TreeSet<Animal>(new SpeciesGenderComparator());
    animals.addAll(candidates);
    for(Animal a:animals){
        if(candidate == null || !candidate.isPotentialMate(a))
            candidate = a;
        esle{
            ark.load(new AnimalPair(candidate,a));
            ++ numPairs;
            candidates = null;
        }
    }
    return numPairs;
}

3.3.3. ThreadLocal

//using ThreadLocal to ensure thread confinement
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>(){
    public Connection initialValue(){
        return DriverManager.getConnection(dburl);
    }
};

public static Connection getConnection(){
    return connectionHolder.get();
}

该技术适用于频繁申请临时对象的操作,即每来一个线程,就分配一个临时对象的情况。但是不要滥用,会增加耦合性。

3.4. Immutability

不可变对象总是线程安全的。

//一个不可变对象
@Immutable
public final class /threeStooges{
    private fianl Set<String> stooges = new HashSet<String>();

    public ThreeStooges(){
        stooges.add("Moe");
        atooges.add("Larry");
        stooges.add("Curly");
    }
    public boolean isStooge(String name){
        return stooges.contain(name);
    }
}

3.4.1 Final Fields

就像对象的字段如果不需要保持对外可见,用private修饰一样,若字段不需要修改,那么也要设置为final

3.4.2 Example: Using Volatile to Publish Immutable Objects

3.5. Safe Publication

3.5.1 Improper Publication:When Good Objects Go Bad

3.5.2 Immutable Objects and initialization Safety

3.5.3 Safe Publication Idioms

要安全发布一个对象,对该对象的引用与对象的状态必须同时对其他线程可见。一个正确构建的对象可以通过如下步骤被安全发布:

  • 使用静态初始化器初始化一个对象引用
  • 将引用存储为volatile或AtomicReference
  • 将引用存为一个正确构建的对象的final字段
  • 将引用存为一个字段,该字段用适当的锁来保护

推荐使用静态初始化器,如:

public static Holder holder = new Holder(12);

这样发布的对象一定是安全的。

3.5.4 Efectively immutable Objects

事实不可变对象(对象在发布后就不需要改变,但是技术上是可变的),在发布之后不会被修改的对象,那么只要保证安全发布就好了,由于后续不需修改,也就不用同步机制来保证可见性。例如,Date就是可变对象,但是假如你不去修改它,那么就不需要锁来保证可见性。

3.5.5 Mutable Objects

对象的发布需求取决于它的可变性:

  • 不可变对象可以通过任何机制发布
  • 事实不可变对象必须安全发布
  • 可变对象必须安全发布,而且必须线程安全或通过锁来保护

3.5.6 Sharing Objects Safely

当你发布一个对象,你要指定该对象应该被怎样访问

在并发程序中,使用和共享对象的最有效的策略是:

  • 线程封闭:该对象为该线程独有,其他线程无法修改
  • 只读共享:其它线程无法修改该对象,也就不会有线程问题,可以用不可变对象或事实不可变对象
  • 线程安全共享:一个线程安全的对象使用内部同步,那么多线程可以自由的获取或修改它而不需要额外的同步机制
  • 监视状态:一个受监视的对象只能通过持有正确锁的线程来访问。
You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图