Java多线程(4)——线程协作

挖掘可并发点

使用分治(分而治之)的思想发掘可并发点。
将只能串行的部分与可并发化的部分区分开,再使用工作者线程并发执行。

基于数据的分割实现并发化

程序的原始输入数据规模比较大,比如大文件的下载,从几百万条日志记录中统计信息等。
分割的结果是产生多个 同质 工作者线程。

细节:

  • 工作者线程数量的设置
  • 工作者线程的异常处理
  • 原始输入规模未知
  • 程序的复杂性

基于任务的分割实现并发化

将人物按照一定规则分解成若干子任务,并使用专门的工作者线程去执行子任务。
分割的结果是产生多个 异质 工作者线程。

  • 按任务的资源消耗属性(CPU/IO)
  • 按处理步骤

细节:

  • 程序的复杂性
  • 额外的处理器时间消耗
  • 未必比单线程程序快
  • 考虑从单线程程序向多线程程序进化

合理设置线程数

Amdahl's 定律:

[S_{max} = frac{1}{(P + frac{1-P}{N})} = frac{1}{P} ]

(N):处理器数量
(P):无法串行的耗时占比
(S_{max}):最大提速比

  • 对于 CPU 密集型线程,考虑到这类线程执行任务时消耗的主要是处理器资源, 我们可以将这类线程的线程数设置为(N_{CPU})个。因为 CPU 密集型线程也可能由于某些原因(比如缺页中断/Page Fault)而被切出,为了避免处理器资源浪费,我可以为这类线程设置一个额外的线程,即将线程数设置为(N_{CPU} + 1)
  • 对于 IO 密集型线程,考虑到 IO 操作可能导致上下文切换,设置过多的线程数会导致过多的额外系统开销。因此如果一个这样的工作者线程就足以满足我们的要求,那么就不要设置更多的线程数。如果一个工作者线程仍然不够用,那么我们可以考虑将这类线程的数量设置为 (2xN_{CPU})。这是因为 IO 密集型线程在等待 IO 操作返回结果时是不占用处理器资源的, 因此我们可以为每个处理器安排一个额外的线程以提高处理器资源的利用率。

[N_{threads}=N_{CPU} * U_{CPUT} * (1 + frac{WT}{ST}) ]

(U_{CPU}):cpu 使用率
(WT):程序花费在等待上的时长
(ST):程序实际占用处理器计算的时长

wait/notify

等待(wait):一个线程因其执行目标动作所需的保护条件未满足而被暂停的过程。
通知(notify):一个线程更新了系统的状态,满足了其他线程所需的保护条件,唤醒那些暂停的线程的过程。

synchronized (obj) {
	while (保护条件不成立) {
		obj.wait();
	}
	// 执行任务
}

一个线程只有在持有一个对象的内部锁的情况下才能够调用该对象的wait方法,因此wait方法总是放在相应对象所引导的临界区中。

someObject为 Java 中任意一个类的实例,因执行someObject.wait()而被暂停的线程就称为对象someObject上的等待线程。

someObject. wait()可以被多个线程执行,即存在多个等待线程。someObject上的等待线程可以通过其他线程执行someObject.notify()来唤醒。

someObject.wait()会以原子操作的方式使当前线程暂停并使该线程释放其持有的someObject对应的内部锁。当前线程被暂停的时候其对 someObject.wait()的调用并未返回。其他线程在该线程所 需的保护条件成立的时候执行相应的notify方法,即someObject.notify()可以唤醒someObject上的等待线程。被唤醒的等待线程在其占用处理器继续运行的时候,需要再次申请someObject对应的内部锁。被唤醒的线程在其再次持有内部锁的情况下继续执行someObject.wait()中剩余的指令,直到wait方法返回。

  • 等待线程对保护条件的判断,wait调用应放在相应对象所引导的临界区中的一个循环语句中。
  • 等待线程对保护条件的判断,wait的执行,目标动作的执行应放在同一个对象(内部锁)所引导的临界区中。
  • wait暂停当前线程时释放的锁只是与该wait方法所属对象的内部锁,当前线程所持有的其他内部锁,显示锁并不会被释放。
synchronized (obj) {
	// 更新共享变量
	updateSharedState();
	// 唤醒其他线程
	obj.notify();
}

notify唤醒相应对象上的任意一个等待线程。
notifyAll唤醒相应对象上的所有等待线程。

notify方法应尽可能放在靠近临界区结束的地方。等待线程被唤醒后占用处理器继续运行时,如果其他线程持有内部锁,那么这个等待线程可能又被暂停。

内部实现

Java 虚拟机会为每个对象维护一个入口集(Entry Set)用于存储申请该对象内部锁的线程。此外,Java 虚拟机还会为每个对象维护一个名为等待集(Wait Set)的队列,该队列用于存储该对象上的等待线程。
Object.wait()将当前线程暂停并释放相应内部锁的同时会将当前线程(的引用)存入该方法所属对象的等待集中。执行一个对象的notify方法会使该对象的等待集中的一个任意线程被唤醒。被唤醒的线程仍然会停留在相应对象的等待集之中,直到该线程再次持有相应内部锁的时候Object.wait()会使当前线程从其所在的等待集移除,接着Object.wait()调用返回。

问题

  • 过早唤醒:等待线程在其所需的保护条件并未成立的情况下被唤醒的现象。如:某线程更新了共享变量,使得保护条件1得以成立,此时为了唤醒使用该保护条件的所有等待线程,该线程执行了notifyAll。然而,使用保护条件2的线程被唤醒后仍然需要继续等待,浪费资源。
  • 信号丢失:等待线程错过了唤醒信号的现象。如:等待线程在执行wait前未判断保护条件是否已经成立,或使用notify而非notifyAll
  • 欺骗性唤醒:没有执行notify/notifyAll的情况下被唤醒。
  • 上下文切换:wait/notify的使用可能导致较多的上下文切换。使用notify而不是notifyAll,并在执行后尽快释放相应的内部锁。

notify/notifyAll

notify可能导致信号丢失造成正确性问题,notifyAll效率不高,但保证正确。因此,常用做法是优先使用notifyAll,满足以下所有条件时才使用notify

  1. 一次通知至多唤醒一个线程。
  2. 相应对象的等待集中仅包含同质等待线程(保护条件相同 wait调用返回后处理逻辑一致)。

join

使当前线程等待目标线程结束后才能继续进行。

public final void join() throws InterruptedException {
	join(0);
}

// 目标线程终止或 mills 后当前线程继续进行
public final synchronized void join(long millis) throws InterruptedException {
	long base = System.currentTimeMillis();
	long now = 0;

	if (millis < 0) {
		throw new IllegalArgumentException("timeout value is negative");
	}

	if (millis == 0) {
		while (isAlive()) {
			wait(0);
		}
	} else {
		while (isAlive()) {
			long delay = millis - now;
			if (delay <= 0) {
				break;
			}
			wait(delay);
			now = System.currentTimeMillis() - base;
		}
	}
}

Condition

wait/notify过于底层,JDK 1.5 后引入了java.util.concurent.locks.Condition接口。
Condition可作为wait/notify的替代品实现等待通知,为解决过早唤醒问题提供了支持,并解决wait(long)不区分等待是否超时的问题。

public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 返回 true 表示未达最后期限
    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

Condition实例也被称为 条件变量(condition variable) 或者 条件队列(condition queue)

每个Condition实例内部都维护了一个用于存储等待线程的队列。
线程使用Condition实例的await方法会导致其暂停,并被存入该实例的等待队列中。
该实例的signal方法会唤醒对应等待队列中的任一线程,signalAll会唤醒对应等待队列中的所有线程。

private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

public void guaredMethod() throws InterruptedException {
	lock.lock();
	try {
		while (保护条件不成立) {
			condition.await();
		}
		// 执行
	} finally {
		lock.unlock();
	}
}

public void notificationMethod () {
	lock.lock();
	try {
		// 更新共享变量
		condition.signal();
	} finally {
		lock.unlock();
	}
}

CountDownLatch

java.util.concurrent.CountDownLatch可以用来实现一个或多个线程等待其他线程完成一组特定操作(先决条件)后才继续运行。

CountDownLatch内部会维护一个用于表示未完成的先决操作数量的计数器。countDown()每被执行一次,计数器值减一。
CountDownLatch.await()相当于一个受保护方法,保护条件为计数器值为0。
CountDownLatch.countDown()相当于一个通知方法,计数器值为0时唤醒相应实例上的所有等待线程。

public class CountDownLatch {
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        // ...
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }
}

一个CountDownLatch实例只能实现一次等待和唤醒。

如果CountDownLatch内部的计数器由于程序错误而永远无法到达0,那么等待线程会一直处于 WAITING 状态。

CyclicBarrier

java.util.concurrent.CyclicBarrier实现多个线程相互等待执行到某个地方,再同时执行的情况。
Cyclic 意味实例可以重复使用。

使用CyclicBarrier实现等待的线程被成为参与方(party)。参与方执行CyclicBarrier.await()实现等待。最后一个执行await的参与方会唤醒其他所有参与方。最后一个线程自身不会被暂停。

public class CyclicBarrier {

	private static class Generation {
        boolean broken = false;
    }

    private final ReentrantLock lock = new ReentrantLock();
   
    private final Condition trip = lock.newCondition();

    private final int parties;

    private final Runnable barrierCommand;

    private Generation generation = new Generation();

    private int count;

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // ...
    }

    // barrierAction 会在所有 party 调用 await 方法后执行
    // barrierAction 执行结束后其他等待线程才会被唤醒
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public int getParties() {
        return parties;
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    public boolean isBroken() {
        // ...
    }

    public void reset() {
        // ...
    }

    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

CyclicBarrier的内部实现基于Condition,主要开销在于可能产生的上下文切换。
CyclicBarrier内部使用了一个条件变量trip来实现等待/通知。

CyclicBarrier内部实现使用了分代(Generation)用于重复使用。除最后一个线程外的任何一个参与方都相当于一个等待线程,这些线程所使用的保护条件是“当前分代内,尚未执行await方法的参与方个数为0"。

应用场景

  • 使迭代算法并发化
  • 在测试代码中模拟高并发

生产者——消费者模式

生产者(Producer):主要职责是生产产品(Product),产品可以时数据,也可以是任务。
消费者(Consumer):主要职责是消费生产者所生产的产品。既对产品数据加工处理或执行产品任务。

生产者和消费者并发运行在各自的线程。

由于线程之间无法像函数调用那样通过参数直接传递数据,因此生产者和消费者之间需要一个用于传递产品的 传输通道(Channel)
传输通道相当于生产者和消费者之间的缓冲区, 生产者每生产一个产品就将其放入传输通道,消费者则不断地从传输通道中取出产品进行消费, 传输通道通常可以使用一个线程安全的队列来实现。

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);

    boolean offer(E e);

    // 在队列中存入一个元素
    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // 从队列中取出一个元素
    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

阻塞队列常用实现类:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue

通常生产者和消费者的处理能力是不同的,较为常见的情形是生产者的处理能力比消费者的处理能力大。传输通道所起的作用不仅仅作为生产者和消费者之间传递数据的中介,它在一定程度上还起到一个平衡生产者和消费者处理能力的作用。

阻塞队列

阻塞队列:从传输通道中存入一个产品或者取出一个产品时,相应的线程可能因为传输通道中没有产品或者其存储空间已满而被阻塞。

  • 有界队列:存储容量受限 如:ArrayBlockingQueue LinkedBlockingQueue
  • *队列:存储容量最大为Integer.MAX_VALUE 如:LinkedBlockingQueue

ArrayBlockingQueue内部使用数组作为存储空间,空间预先分配,因此puttake操作本身并不会增加垃圾回收的负担。缺点是puttake操作使用同一个锁,从而可能导致锁的高争用,进而导致较多的上下文切换。

LinkedBlockingQueue内部使用链表作为存储空间,空间动态分配,puttake导致链表节点动态创建和移除,可能增加垃圾回收的负担。puttake使用两个锁,size使用AtomicInteger,可能会被生产者和消费者线程争用,可能导致额外开销。

SynchronousQueue,是一种特殊的有界队列, 内部并不维护用于存储队列元素的存储空间。生产者线程执行put时如果没有消费者线程执行take,那么该生产者线程会被暂停,直到有消费者线程执行了 take。反之亦然。因此,SynchronousQueue适合于在消费者处理能力与生产者处理能力相差不大的情况下使用。否则可能导致较多的等待(这意味着上下文切换)。

阻塞队列也支持非阻塞式操作。比如,BlockingQueue接口定义的offerpoll分别相当于puttake的非阻塞版。

LinkedBlockingQueue适合在生产者线程和消费者线程之间的并发程度比较大的 情况下使用。
ArrayBlockingQueue适合在生产者线程和消费者线程之间的并发程度较低的情况 下使用。
SynchronousQueue适合在消费者处理能力与生产者处理能力相差不大的情况下 使用。

流量控制与信号量

使用*队列作为传输通道的一个好处是put操作并不会导致生产者线程被阻塞。但是在队列积压的情况下,*队列中存储的元素可能越来越多,最终导致这些元素所占用的资源过多。因此,一般我们在使用*队列作为传输通道的时候会同时限制生产者的生产速率, 即进行流量控制以避免传输通道中积压过多的产品。

java.util.concurrent.Semaphore可以用来实现流量控制。将代码所访问的特定资源或者执行特定操作的机会统一看作一种资源,这种资源被称为虚拟资源(Virtual Resource)。Semaphore相当于虚拟资源配额管理器,可以用来控制同一时间内对虚拟资源的访问次数。因此代码在访问虚拟资源前必须先申请相应的配额,并在资源访问结束后返还相应的配额。
Semaphore.acquire()/release()分别用于申请配额和返还配额。Semaphore.acquire()在成功获得一个配额后会立即返回。如果当前的可用配额不足,那么会使其执行线程暂停。Semaphore内部会维护一个等待队列用于存储这些被暂停的线程。acquire在其返回之前总是会将当前的可用配额减少1。release会使当前可用配额增加1,并唤醒相应实例的等待队列中的一个任意等待线程。

public class SemaphoreBasedChannel<P> implements Channel<P> {
	private final BlockingQueue<P> queue;
	private final Semaphore semaphore;

	public SemaphoreBasedChannel (BlockingQueue<P> queue, int flowLimit) {
		this(queue, flowLimit, false);
	}

	public SemaphoreBasedChannel (BlockingQueue<P> queue, int flowLimit, boolean isFair) {
		this.queue = queue;
		this.semaphore = new Semaphore(flowLimit, isFair);
	}

	@Override
	public P take() throws InterruptedException {
		return queue.take();
	}

	@Override
	public void put (P product) throws InterruptedException {
		semaphore.acquire();	// 申请一个配额
		try {
			queue.put(product);	// 访问虚拟资源
		} finally {
			semaphore.release();	// 返还一个配额
		}
	}
}

创建Sempaphore实例时如果构造器中参数permits值为1,那么所创建的实例相当于一个互斥锁。

管道

PipedOutputStreamPipedInputSteam时生产者——消费者模式的一个具体例子。

PipedOutputStreamPipedInputStream分别是OutputStreamInputStream的一个子类,它们可用来实现线程间的直接输出和输入。一个线程的输出可作为另外一个线程的输入,而不必借用文件、数据库 、网络连接等其他数据交换中介。

PipedOutputStream相当于生产者,产品是字节形式的数据;PipedInputStream相当于消费者。PipedInputStream内部使用byte型数组维护了一个循环缓冲区,这个缓冲区相当于传输通道。
在使用PipedOutputStreamPipedInputStream进行输出、输入操作前,PipedOutputStream实例和PipedInputStream实例需要建立起关联(Connect)。建立关联的PipedOutputStream实例和PipedInputStream实例就像一条输送水流的管道,管道的一端连着注水口,另一端连着出水口。

注意:

  1. PipedOutputStreamPipedInputStream适合在两个线程间使用,即适用于单生产者单消费者的情形。

输出异常的处理。如果生产者线程在其执行过程中出现了不可恢复的异常,那么 消费者线程就会永远也无法读取到新的数据。生产者线程出现异常时需要通过某种方式通知相应的消费者线程,通常可以关闭PipedOutputStream实例来实现。

双缓冲

缓冲区相当于数据源与数据使用方之间的数据容器。在多线程环境下,有时候我们会使用两个(或更多)缓冲区来实现数据从数据源到数据使用方的移动。
其中一个缓冲区填充满来白数据源的数据后可以被数据使用方进行“消费",而另外一个缓冲区则用来填充来自数据源的新的数据。

因此,当消费者线程消费一个已填充的缓冲区时,另外一个缓冲区可以由生产者线程进行填充,从而实现了数据生成与消费的并发。这种缓冲技术就被称为双缓冲(Double Buffering)。

java.util.concurrent.Exchanger可以用来实现双缓冲。Exchanger相当于一个只有两个参与方的CyclicBarrierExchanger.exchange(V)相当于CyclicBarrier.await()

通常,初始状态下生产者和消费者各自创建一个空的缓冲区。消费者线程执行Exchanger.exchange(V x)时将参数x指定为一个空的或者已经使用过的缓冲区,生产者线程执行Exchanger.exchange(V x)时将参数x指定为一个已经填充完毕的缓冲区。

比照CyclicBarrier来说,生产者线程和消费者线程都执行到Exchanger.exchange(V)相当于这两个线程都到达了集合点,此时生产者线程和消费者线程各自对exchange调用就会返回。返回值是对方线程执行该方法时所指定的参数x的值。

因此,exchange的返回就造成一种生产者线程和消费者线程之间交换缓冲区的效果,即消费者线程向生产者线程提供(通过x的值)的是一个空的(或者已经使用过的)的缓冲区,而生产者线程向消费者线程提供(x)的则是一个已经填充完毕的缓冲区。这就好比当面交易的情况下,交易双方“一手交钱,一手交货"。

这样,生产者线程和消费者线程之间通过不断地交换缓冲区就实现了将生产者所生产的一个个产品传递给消费者的效果。因此,Exchanger从逻辑上可以被看作一种SynchronousQueue,其内部也不维护用于存储产品的存储空间。在单生产者单消费者模式中,可以使用Exchanger作为传输通道。

线程中断

Java 线程中断机制相当于 Java 线程与线程间协作的一套协议框架。中断(Interrupt)可被看作由一个线程发送给另外一个线程的一种指示(Indication),该指示用于表示发起线程希望目标线程的操作。
中断仅仅代表发起线程的一个诉求,而这个诉求能否被满足则取决了目标线程自身。

Java 平台会为每个线程维护一个被称为中断标记的布尔型状态变量用于表示相应线程是否接收到了中断。目标线程可以通过Thread.currentThread().isInterrupted()来获取中断标记值。也可以通过Thread.interrupted()来获取并重置该线程的标记值。

目标线程检查中断标记后所执行的操作,被称为中断响 应。

  • 无影响。originator 调用 target.interrupt()不会对 target 的运行产生任何影响。InputStream.read()ReentrantLock.lock()以及申请内部锁等阻塞方法/操作就属于这种类型。
  • 取消任务的运行。所执行的任务被取消(中止),继续处理其他任务。
  • 工作者线程停止。生命周期状态变更为 TERMINATED。

线程停止

需要主动停止线程的一些典型场景如下:

  • 服务或系统关闭
  • 错误处理
  • 用户取消任务

通用且优雅地(目标线程只有在其处理完所有待处理的任务后才能停止)实现线程停止:使用布尔型变量作为线程停止标记,使用原子变量表示目标线程待处理任务的数量。关闭时,将标志置为false。而run方法每次取出一个任务前判断停止标记和待处理任务的数量。

public class TerminatableTaskRunner implements TaskRunnerSpec {
  protected final BlockingQueue<Runnable> channel;
  // 线程停止标记
  protected volatile boolean inUse = true;
  // 待处理任务计数器
  public final AtomicInteger reservations = new AtomicInteger(0);
  private volatile Thread workerThread;
  public TerminatableTaskRunner(BlockingQueue<Runnable> channel) {
    this.channel = channel;
    this.workerThread = new WorkerThread();
  }

  public TerminatableTaskRunner() {
    this(new LinkedBlockingQueue<Runnable>());
  }

  @Override
  public void init() {
    final Thread t = workerThread;
    if (null != t) {
      t.start();
    }
  }

  @Override
  public void submit(Runnable task) throws InterruptedException {
    channel.put(task);
    reservations.incrementAndGet();
  }

  public void shutdown() {
    Debug.info("Shutting down service...");
    inUse = false;
    final Thread t = workerThread;
    if (null != t) {
      t.interrupt();
    }
  }

  public void cancelTask() {
    Debug.info("Canceling in progress task...");
    workerThread.interrupt();
  }

  class WorkerThread extends Thread {
    @Override
    public void run() {
      Runnable task = null;
      try {
        for (;;) {
          // 线程不再被需要,且无待处理任务
          if (!inUse && reservations.get() <= 0) {
            break;
          }
          task = channel.take();
          try {
            task.run();
          } catch (Throwable e) {
            e.printStackTrace();
          }
          // 使待处理任务数减少1
          reservations.decrementAndGet();
        }
      } catch (InterruptedException e) {
        workerThread = null;
      }
      Debug.info("worker thread terminated.");
    }
  }
}





参考资料:《Java 多线程编程实战指南(核心篇)》 黄文海 著