【Java并发.5】基础构建模块

  本章会介绍一些最有用的并发构建模块,有丶东西(最后一小节,纯干货)。

5.1  同步容器类

  同步容器类包括 Vector 和 Hashtable ,这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

5.1.1  同步容器类的问题

  同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护符合操作。容器上常见的复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算,例如“若没有则添加”(检查在Map 中是否存在键值 K,如果没有,就加入二元组(K,V))。在同步线程中,当其他线程并发地修改容器时,它们可能会出现出乎预料之外的行为。

  程序清单 5-1: Vector 上可能导致混乱结果的复合操作

  public static Object getLast(Vector list) {
        int lastIndex = list.size() - 1;   //1:  与 2 同步获得lastIndex 
        return list.get(lastIndex);
    }
    public static void dropLast(Vector list) {
        int lastIndex = list.size() - 1;    //2:  与 1 同步获得lastIndex 
        list.remove(lastIndex);
    }

  这些方法看似没问题,但是当出现同步时,同时调用 list.get(lastIndex)  和 list.remove(lastIndex) ,可能 get(lastIndex)将抛出 ArrayIndexOutIfBoundsException

  由于同步容器要遵守同步策略,支持客户端加锁, 程序清单 5-2: 在使用客户端加锁的 Vector 上的复合操作

  public static Object getLast(Vector list) {
        synchronized (list) {
            int lastIndex = list.size() - 1; 
            return list.get(lastIndex);
        }
    }
    public static void dropLast(Vector list) {
        synchronized (list) {
            int lastIndex = list.size() - 1;   
            list.remove(lastIndex);
        }
    }

  在调用 size() 和相应的 get() 之间,Vector 的长度可能会发生变化,这种风险在对 Vector 中元素进行迭代时仍然会出现,如程序清单 5-3:可能抛出 ArrayIndexOutIfBoundsException 的迭代操作

    for (int i =  0; i < vector.size(); i++) {
            doSometing(vector.get(i));
       }

  对此,我们可能需要牺牲一些伸缩性,通过在迭代期间持有 Vector 的锁,可以防止其他线程在迭代期间修改 Vector

    synchronized (vector) {
            for (int i = 0; i < vector.size(); i++) {
                doSometing(vector.get(i));
            }
        }

  

5.1.2  迭代器与 ConcurrentModificationException

  设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是“及时失败”(fail-fast)的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个 ConcurrentModificationException 异常。

  想要避免出现 ConcurrentModificationException 异常,就必须在迭代过程持有容器的锁。  程序清单 5-5: 通过 Iterator 来迭代 List

    List<Widget> widgetList = Collections.synchronizedList(new ArrayList<Widget>);
        ......
        //可能抛出 ConcurrentModificationException
        for (Widget w: widgetList) {
            doSomething(w);
        }

  然而,开发人员并不希望在迭代期间对容器加锁,如果容器规模很大,或者在每个元素上执行操作的时间很长,那么这些线程将长时间等待。

  如果不希望在迭代期间对容器加锁,那么一种替代方法就是“克隆”容器,并在副本上进行迭代。(类似于 ThreadLocal)

5.1.3  隐藏迭代器

  虽然加锁可以防止迭代器抛出异常,但你必须要记住在所有对共享容器进行迭代的地方都需要加锁。然而实际情况要更加复杂,在某些情况下,迭代器会隐藏起来。如程序清单 5-6: 隐藏在字符串连接中的迭代操作

public class HiddenIterator {
    private final Set<Integer> set = new HashSet<>();
    public synchronized void add(Integer i) {
        set.add(i);
    }
    public synchronized void remove(Integer i) {
        set.remove(i);
    }
    public void addTenThings() {
        Random r = new Random();
        for (int i = 0; i < 10; i++) {
            add(r.nextInt());
        }
        System.out.println("Debug: added ten element to " + set);  // set.toString() 会对容器进行迭代
    }
}

  这里得到的教训是,如果状态与保护它的同步代码之间相隔越远,那么开发人员就越容易忘记在访问状态使用正确的同步。

正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制童谣有助于确保实施同步策略

  容器的 hashCodeequals 等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况。同样 containsAll、removeAll、retainAll 等方法都会对容器进行迭代。

  

5.2  并发容器

  同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。但这种方法的代价是严重降低并发性。在 Java5.0 中增加了 ConcurrentHashMap,用来替代同步且基于散列的 Map,以及 CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的 List 。在新的 ConcurrentMap 接口中增加了对一些常见复合操作的支持,例如“若没有则添加”、替换有条件删除等。

通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

  在 Java5.0 增加了两种新的容器类型: QueueBlockingQueueQueue 用来临时保存一组待处理的元素。它提供了几种实现,包括:ConcurrentLinkedQueue,这是一个传统的先进先出队列,以及 PriorityQueue,这是一个(非并发的)优先队列。Queue 上的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空值。虽然可以用 List 来模拟 Queue 的行为(事实上,正是通过 LinkedList 来实现 Queue的),但还需要一个 Queue 类,因为它能去掉 List 的随机访问需求,从而实现更高效的并发。

  BlockingQueue 扩展了 Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。如果队列已满,那么插入元素的操作将一直阻塞,知道队列中出现可用的空间。在“生产者--消费者”这种设计模式中,阻塞队列是非常有用的。

5.2.1  ConcurrentHashMap

  同步容器类在执行每个操作期间都持有一个锁。例如 HashMap.getList.contains,可能包含大量的工作:当遍历散列桶或链表来查找某个特定的对象时,必须在许多元素上调用 equals(而 equals 本身还包含了一定的计算量)。

  与 HashMap 一样,ConcurrentHashMap 也是一个基于散列的 Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap 并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁

  ConcurrentHashMap 返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。

5.2.2  额外的原子 Map 操作

  由于 ConcurrentHashMap 不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。但是,一些常见的复合操作,例如“若没有则添加”、“若相等则移除(Remove-If-Equal)”、“若相等则替换(Replace-If-Equla)”等,都已经实现为原子操作并且在 ConcurrentMap 的接口中声明。

5.2.3  CopyOnWriteArrayList

  CopyOnWriteArrayList 用于替代同步 List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。

  “写入时复制(Copy-On-Write)” 容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制”容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。

  显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。

  

5.3  阻塞队列 和 生产者--消费者

  阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法。如果队列已经满了,那么 put 方法将阻塞直到有空间可用;如果队列为空,那么 take 方法将会阻塞直到有元素可用。队列可以是有界的也可以是*的,*队列永远都不会充满,因此*队列上的 put 方法永远不会阻塞。

  阻塞队列支持 生产者--消费者 这种设计模式。BlockingQueue 简化了生产者--消费者的实现过程。阻塞队列提供一个 offer 方法,如果数据项不能被添加到队列中,那么将返回一个失败状态。这样你就能够创建更多灵活的策略来处理符合过载的情况,例如减轻负载,将多余的工作项序列化并写入磁盘,减少生产者线程的数量,或者通过某种方式来一致生产者线程。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

5.3.1  示例:桌面搜索

  有一种类型的程序适合被分解为生产者和消费者,例如代理程序,它将扫描本地驱动器上的文件并建立索引以便随后进行搜索,类似于某些桌面搜索程序。

  程序清单 5-8:桌面搜索应用程序中的生产者任务和消费者任务

public class FileCrawler implements Runnable {
    private final BlockingQueue<File> filequeue;
    private final FileFilter fileFilter;
    private final File root;

    public FileCrawler(BlockingQueue<File> filequeue, FileFilter fileFilter, File root) {
        this.filequeue = filequeue;
        this.fileFilter = fileFilter;
        this.root = root;
    }

    @Override
    public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    }

    private void crawl(File root) throws InterruptedException {
        File[] entries = root.listFiles(fileFilter);
        if (entries != null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                } else if (!alreadyIndexed(entry)) {
                    filequeue.put(entry);  //
                }
            }
        }
    }
}

public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;
    public Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                indexFile(queue.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  程序清单 5-9:启动桌面搜索

public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingDeque<>(BOUND);
        FileFilter filter = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return true;
            }
        };
        for (File root : roots) {
            new Thread(new FileCrawler(queue, filter, root)).start();
        }
        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new Indexer(queue)).start();
        }
    }

5.3.3  双端队列与工作密取

  Java6.0 增加了两种容器类型,Deque(发音为‘deck’)和 BlockingDuque,它们分别对 QueueBlockingQueue 进行了扩展。Deque 是一个双端队列,实现了在队列头和队列尾的高效插入和移除

  正如阻塞队列适用于生产者--消费者模式,双端队列同样适用于另一种相关模式,即工作密取(Work Stealing)。在生产者--消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。

5.4  阻塞方法与中断方法

  线程可能阻塞或暂停执行,原因有多种:等待 I/O 操作结束,等待获得一个锁,等待从 Thread.sleep 方法中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于阻塞状态。 BlockingQueueputtake 方法则会抛出受检查异常 InterruptedException。

  Thread 提供了 interrupt 方法,用于中断线程或者查询线程是否已经中断。当在代码中调用了一个将抛出 InterruptedException 异常的方法时,你自己的方法也就变成了一个阻塞方法,并且需要处理对中断的响应。对此,有两种基本选择:

  • 传递 InterruptedException:避开这个异常通常是最明确的策略----只需要把 InterruptedException 传递给方法的调用者。传递 InterruptedException 的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
  • 恢复中断。有时候不能抛出 InterruptedException。例如当代码是 Runable 的一部分时。在这些情况下,必须捕获 InterruptedException,并通过调用当前线程上的 interupt 方法恢复中断状态,这样在调用栈中更高层的代码将看到引发一个中断。如代码清代 5-10:恢复中断状态以避免屏蔽中断
        public void run() {
                try {
                    doSomething();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 恢复中断状态
                }
        }

5.5  同步工具类

  在容器类中,阻塞队列是一种独特的类:它们不仅能作为保存对象的容器,还能协调生产者和消费者等线程之间的控制流,因为 take 和 put 等方法将阻塞,知道队列达到期望的状态(队列既非空,也非满)。

  阻塞度咧可以作为同步工具类,其他类型的同步工具类还包括 信号量 Semaphore、栅栏 Barrier、闭锁 Latch

5.5.1  闭锁

  闭锁是一种同步工具类,可以延迟线程的进度知道其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态

  闭锁包括一个计数器,该计数器初始化为一个正数,表示需要等待的事件数量。 countDown 方法递减计数器,表示有一个事件已经发生,而 await 方法等待计数器达到零,这表示所有需要等待的事件都已经发生。

  在下程序清单 5-11 中 TestHarness中给出了闭锁的两种常见用法。TestHarness 创建一定数量的线程,利用它们并发地执行指定任务。它使用两个闭锁,分别表示“起始门(Starting Gate)”和“结束门(Ending Gate)”。起始门计数器的初始值为1,而结束们计数器的初始值为工作线程数量。每个工作线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情时将调用结束门的 countDown 方法,这样使主线程高效地等待知道所有工作线程都执行完成,因此统计所消耗的时间。

  程序清单5-11 :在计时测试中使用 CountDownLatch 来启动和停止线程

public class TestHarness {
    public long teimeTasks(int nThreads, final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException e) {
                        // do not 
                    }
                }
            };
            t.start();
        }
        long startTime = System.nanoTime();
        startGate.countDown();  //开启
        endGate.await();        //等待所有线程结束
        long endTime = System.nanoTime();
        return endTime - startTime;
    }
}

5.5.2   FutureTask

  FutureTask 也可以用来做闭锁。FutureTask表示的计算时通过 Callable 来实现的,相当于一种可生成结果的 Runnable,并且可以处于以下三种状态:等待运行,正在运行和运行完成。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。

  FutureTask.get 的行为取决于任务的状态。如果任务已经完成,那么 get 会立即返回结果,否则 get 将阻塞直到任务进入完成状态,然后返回结果或抛出异常。

  FutureTask 在 Executor 框架中表示异步任务, 可以用来做一些时间较长的计算,可以在使用计算结果之前提前启动。 程序清单 5-12:使用FutureTask 来提前加载稍后需要的数据

public class PreLoader {
    private final FutureTask<ProductInfo> futureTask = new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
        @Override
        public ProductInfo call() throws Exception {
            return loadProductInfo();
        }
    });
    private final Thread thread = new Thread(futureTask);
    public void start() {
        thread.start();
    }
    public ProductInfo get() throws InterruptedException{
        try {
            return futureTask.get();   // 记得调用 start() 不然会一直在这里等候
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException) throw (DataLoadException)cause;
            else ;
        }
        return null;
    }
}

   当程序需要 ProductInfo 时,可以调用 get 方法,如果数据已经加载,那么将返回这些数据,否则将等待加载完成后再返回。

5.5.3  信号量

  计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量可以用来实现某种资源池,或者对容器施加边界。

  Semaphore 中管理者一组虚拟的许可(Permit),许可的初始数量可以通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为 1 的 Semaphore 。二值信号量可以用户互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

  如果将 Semaphore 的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用 acquire 方法获取一个许可,在将资源返回给池之后调用 release 释放许可,那么 acquire 将一直阻塞直到资源池不为空。

  程序清单 5-14 使用 Semaphore 为容器设置边界 

public class BoundedHashSet<A> {
    private final Set<A> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<>());
        this.sem = new Semaphore(bound);
    }
    public boolean add(A a) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(a);
            return wasAdded;
        } finally {
            if (!wasAdded) sem.release();
        }
    }
    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) sem.release();
        return wasRemoved;
    }
}

5.5.4  栅栏

  栅栏 Barrier 类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程

  CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。

  当线程到达栅栏位置时将调用 await 方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。

  在程序清单 5-15 的 CellularAutomata 中给出了如何通过栅栏来计算细胞的自动化模拟,例如 Conway 的生命游戏。在把模拟过程并行化,为每个元素(在这个实例中相当于一个细胞)分配一个独立的线程是不现实的,因为这将会产生过多的线程,而在协调这些线程上导致德开销将降低计算性能。合理的做法是:将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并起来。

  程序清单 5-15:通过 CyclicBarrier 协调细胞自动衍生系统中的计算

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count, new Runnable() {  //
            @Override
            public void run() {
                mainBoard.commitNewValues();
            }
        });
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++) {
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++) {
            new Thread(workers[i]).start();
        }
        mainBoard.waitForConvergence();
    }

    private class Worker implements Runnable {
        private final Board board;
        public Worker(Board board) {
            this.board = board;
        }
        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++) {
                    for (int y = 0; y < board.getMaxY; y++) {
                        board.setNewValue(x, y, computeValue(x, y));
                    }
                }
                try {
                    barrier.await();  //
                } catch (InterruptedException e) {
                    return;
                } catch (BrokenBarrierException e) {
                    return;
                }
            }
        }
    }
}

5.6  构建高效且可伸缩的结果缓存

  几乎所有的服务器应用程序都会使用某种形式的缓存。重用之前的计算结果能降低延迟,提高吞吐量,但却需要消耗更多的内存。本节我们将开发一个高效且可伸缩的缓存,用于改进一个高计算开销的函数。

  程序清单 5-16 :使用 HashMap 和同步机制来初始化缓存

public interface Computable<A, V> {          【皱眉】
    V compute(A arg) throws InterruptedException;
}

public class ExpensiveFunction implements Computable<String, BigInteger>{
    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        //经过长时间计算后
        return new BigInteger(arg);
    }
}

public class Memoizer1<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new HashMap<>(); //使用HashMap 存储
    private final Computable<A, V> c;
    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);  
        if (result == null) {   // 单例容器,最好有长时间自动销毁功能
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

  在程序清单 5-16 中 Memoizer1 给出一种尝试:使用 HashMap 来保存计算结果,compute 检查是否有缓存,有则返回,没有则计算后保存缓存再返回。

  但是 HashMap 不是线程安全的,因此要确保两个线程同时访问 HashMap,Memoizer1 给出一种保守的方法,即对整个 compute 方法进行同步,可见这有明显的伸缩性问题。

  下面的 程序清单 5-17 中 Memoizer2 用 ConcurrentHashMap 来代替 HashMap 来改进 Memoizer1 中的并发行为。Memoizer2 比 Memoizer1 有着更好的并发行为:多线程可以并发地使用它。但它作为缓存仍然存在一些不足----当两个线程同时调用 compute 时存在一个漏洞,可能会导致计算得到相同值。

  程序清单 5-17 :用 ConcurrentHashMap 替换 HashMap

public class Memoizer2<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<>(); //使用HashMap 存储
    private final Computable<A, V> c;
    public Memoizer2(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {  //去掉同步
        V result = cache.get(arg);
        if (result == null) {   
            result = c.compute(arg); //假设这出现并发情况,很可能会进行两次计算,不符合要求
            cache.put(arg, result);
        }
        return result;
    }
}

  Memoizer2 的问题在于,如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行,那么很可能会重复计算。我们希望实现 “线程A在计算 f(21)”,另一个线程查找 f(21) 时,它能够知道  f(21) 正在进行,并且等线程 A 计算结束后,再去查询缓存  f(21) 的结果。

  我们已经知道有个类能基本实现这个功能:FutureTask。FutureTask 表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果有结果可用,那么 FutureTask.get 将立即返回结果,否则它会一直阻塞,直到结果计算出来并返回。

  程序清单 5-18 中 Memoizer3 将用于缓存值的 Map 重新定义为 ConcurrentHashMap<A, Future<V>>,来替换原来的 ConcurrentHashMap<A, V> 。

  程序清单 5-18:基于 FutureTask 的 Memoizing 封装器

public class Memoizer3<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;
    public Memoizer3(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> result = cache.get(arg);
        if (result == null) {
            FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            });
            result = ft;
            cache.put(arg, ft); //还是可能出现  并发问题
            ft.run();
        }
        try {
            return result.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

  Memoizer3 的实现几乎是完美的:它表现出了非常好的并发性(基本上是源于 ConcurrentHashMap 高效的并发性)。但它还有一个小缺陷,即仍然存在两个线程计算出相同值的漏洞。由于 compute 方法中的 if 代码块仍然是非原子的“先检查再执行”操作。因此两个线程仍然可能在同一时间调用 compute 来计算相同的值,即二者没有在缓存中找到期望的值,因此都开始计算。

  Memoizer3 中存在这个问题的原因是,复合操作(“若没有则添加”)是在底层的Map 对象上执行的,而这个对象无法通过加锁来确保原子性。程序清单 5-19 中的 Memoizer 使用了 ConcurrentMap 中的原子方法 putIfAbsent,避免了Memoizer3  中的漏洞。

  程序清单 5-19 Memoizer的最终实现

public class Memoizer<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;
    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> result = cache.get(arg);
        if (result == null) {
            FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            });
            result = cache.putIfAbsent(arg, ft);
            if (result == null) {
                result = ft;
                ft.run();
            }
        }
        try {
            return result.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

  在完成并发缓存的实现后,就可以为在实际中使用了。程序清单 5-20 :在因式分解 servlet 中使用 Memoizer 来缓存结果

public class Factorizer implements Servlet {
    private final Computable<BigInteger, BigInteger[]> c = new Computable<BigInteger, BigInteger[]>() {
        @Override
        public BigInteger[] compute(BigInteger arg) throws InterruptedException {
            return factor(arg);
        }
    };
    private final Computable<BigInteger, BigInteger[]> cache = new Memoizer<>(c);
    public void service(ServletRequest req, ServletResponse resp) {
        try {
            BigInteger i = extractFromRequest(req);
            encodeInroResponse(resp, cache.compute(i)); //使用自己写的缓存工具
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}