JUC并发-工具类详解一、🎈CountDownLatch(

这是我参与8月更文挑战的第26天,活动详情查看:8月更文挑战

往期推荐

一、🎈CountDownLatch(减法计数器)

1.1 概述

CountDownLatch是一个同步辅助类,允许一个或多个线程等待,一直到其他线程执行的操作完成后再执行

CountDownLatch是通过一个计数器来实现的,计数器的初始值是线程的数量。每当有一个线程执行完毕后,然后通过 CountDown方法来让计数器的值-1,当计数器的值为0时,表示所有线程都执行完毕,然后继续执行 await方法 之后的语句,即在锁上等待的线程就可以恢复工作了。

1.2 类的内部类

CountDownLatch类存在一个内部类Sync,继承自AbstractQueuedSynchronizer,其源代码如下。

private static final class Sync extends AbstractQueuedSynchronizer {
    // 版本号
    private static final long serialVersionUID = 4982264981922014374L;
    
    // 构造器
    Sync(int count) {
        setState(count);
    }
    
    // 返回当前计数
    int getCount() {
        return getState();
    }

    // 试图在共享模式下获取对象状态
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 试图设置状态来反映共享模式下的一个释放
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        // 无限循环
        for (;;) {
            // 获取状态
            int c = getState();
            if (c == 0) // 没有被线程占有
                return false;
            // 下一个状态
            int nextc = c-1;
            if (compareAndSetState(c, nextc)) // 比较并且设置成功
                return nextc == 0;
        }
    }
}

说明: 对`CountDownLatch`方法的调用会转发到对Sync或AQS的方法的调用,
所以,AQS对CountDownLatch提供支持。
复制代码

1.3 类的构造函数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化状态数
    this.sync = new Sync(count);
}


说明:  
该构造函数可以构造一个用给定计数初始化的`CountDownLatch`,
并且构造函数内完成了sync的初始化,并设置了状态数。
复制代码

1.4 CountDownLatch两个核心函数

1.4.1 countDown

  • 递减锁存器的计数,如果计数达到零,则释放所有等待的线程
  • 如果当前计数大于零,则递减。 如果新计数为零,则为线程调度目的重新启用所有等待线程。
  • 如果当前计数为零,则什么也不会发生。
public void countDown() {
    sync.releaseShared(1);
}

说明: 对`countDown`的调用转换为对Sync对象的`releaseShared`(从AQS继承而来)方法的调用。
复制代码
  • releaseShared源码如下
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

说明: 此函数会以共享模式释放对象,
并且在函数中会调用到`CountDownLatch``tryReleaseShared`函数,并且可能会调用AQS的`doReleaseShared`函数。
复制代码
  • tryReleaseShared源码如下
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    // 无限循环
    for (;;) {
        // 获取状态
        int c = getState();
        if (c == 0) // 没有被线程占有
            return false;
        // 下一个状态
        int nextc = c-1;
        if (compareAndSetState(c, nextc)) // 比较并且设置成功
            return nextc == 0;
    }
}

说明: 此函数会试图设置状态来反映共享模式下的一个释放。
复制代码
  • AQS的doReleaseShared的源码如下
private void doReleaseShared() {
    /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
    // 无限循环
    for (;;) {
        // 保存头结点
        Node h = head;
        if (h != null && h != tail) { // 头结点不为空并且头结点不为尾结点
            // 获取头结点的等待状态
            int ws = h.waitStatus; 
            if (ws == Node.SIGNAL) { // 状态为SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续
                    continue;            // loop to recheck cases
                // 释放后继结点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续
                continue;                // loop on failed CAS
        }
        if (h == head) // 若头结点改变,继续循环  
            break;
    }
}

说明: 此函数在共享模式下释放资源。
复制代码

CountDownLatchcountDown调用链:

6.png

1.4.2 await

  • 使当前线程等待直到闩锁倒计时为零,除非线程被中断。
  • 如果当前计数为零,则此方法立即返回。即await 方法阻塞的线程会被唤醒,继续执行
  • 如果当前计数大于零,则当前线程出于线程调度目的而被禁用并处于休眠状态。
public void await() throws InterruptedException {
    // 转发到sync对象上
    sync.acquireSharedInterruptibly(1);
}
说明:对`CountDownLatch`对象的await的调用会转发为对Sync的`acquireSharedInterruptibly`(从AQS继承的方法)方法的调用。
复制代码
  • acquireSharedInterruptibly源码如下:
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}



说明:`acquireSharedInterruptibly`又调用了
`CountDownLatch`的内部类Sync的`tryAcquireShared`和AQS的`doAcquireSharedInterruptibly`函数。
复制代码
  • tryAcquireShared函数的源码如下:
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
说明: 该函数只是简单的判断AQS的state是否为0,为0则返回1,不为0则返回-1复制代码
  • doAcquireSharedInterruptibly函数的源码如下:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 添加节点至等待队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) { // 无限循环
            // 获取node的前驱节点
            final Node p = node.predecessor();
            if (p == head) { // 前驱节点为头结点
                // 试图在共享模式下获取对象状态
                int r = tryAcquireShared(arg);
                if (r >= 0) { // 获取成功
                    // 设置头结点并进行繁殖
                    setHeadAndPropagate(node, r);
                    // 设置节点next域
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 在获取失败后是否需要禁止线程并且进行中断检查
                // 抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


说明: 在AQS的`doAcquireSharedInterruptibly`中可能会再次调用
`CountDownLatch`的内部类Sync的`tryAcquireShared`方法和AQS的`setHeadAndPropagate`方法。
复制代码
  • setHeadAndPropagate方法源码如下:
private void setHeadAndPropagate(Node node, int propagate) {
    // 获取头结点
    Node h = head; // Record old head for check below
    // 设置头结点
    setHead(node);
    /*
        * Try to signal next queued node if:
        *   Propagation was indicated by caller,
        *     or was recorded (as h.waitStatus either before
        *     or after setHead) by a previous operation
        *     (note: this uses sign-check of waitStatus because
        *      PROPAGATE status may transition to SIGNAL.)
        * and
        *   The next node is waiting in shared mode,
        *     or we don't know, because it appears null
        *
        * The conservatism in both of these checks may cause
        * unnecessary wake-ups, but only when there are multiple
        * racing acquires/releases, so most need signals now or soon
        * anyway.
        */
    // 进行判断
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // 获取节点的后继
        Node s = node.next;
        if (s == null || s.isShared()) // 后继为空或者为共享模式
            // 以共享模式进行释放
            doReleaseShared();
    }
}
说明: 该方法设置头结点并且释放头结点后面的满足条件的结点,该方法中可能会调用到AQS的`doReleaseShared`方法
复制代码
  • doReleaseShared方法源码如下:
private void doReleaseShared() {
    /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
    // 无限循环
    for (;;) {
        // 保存头结点
        Node h = head;
        if (h != null && h != tail) { // 头结点不为空并且头结点不为尾结点
            // 获取头结点的等待状态
            int ws = h.waitStatus; 
            if (ws == Node.SIGNAL) { // 状态为SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续
                    continue;            // loop to recheck cases
                // 释放后继结点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续
                continue;                // loop on failed CAS
        }
        if (h == head) // 若头结点改变,继续循环  
            break;
    }
}

说明: 该方法在共享模式下释放。
复制代码

CountDownLatchawait调用链:

7.png

1.5 CountDownLatch示例

import java.util.concurrent.CountDownLatch;

class MyThread extends Thread {
    private CountDownLatch countDownLatch;
    
    public MyThread(String name, CountDownLatch countDownLatch) {
        super(name);
        this.countDownLatch = countDownLatch;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " doing something");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " finish");
        countDownLatch.countDown();
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MyThread t1 = new MyThread("t1", countDownLatch);
        MyThread t2 = new MyThread("t2", countDownLatch);
        t1.start();
        t2.start();
        System.out.println("Waiting for t1 thread and t2 thread to finish");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }            
        System.out.println(Thread.currentThread().getName() + " continue");        
    }
}


运行结果:
Waiting for t1 thread and t2 thread to finish
t1 doing something
t2 doing something
t1 finish
t2 finish
main continue
复制代码

二、🎀CyclicBarrier(加法计数器)

2.1 概述

CyclicBarrier也叫循环栅栏,是一个可循环利用的屏障。通过它可以实现让一组线程等待至某个状态之后全部同时执行。每个线程在到达栅栏的时候都会调用await()方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。叫做循环是因为当所有等待线程都被释放以后,CyclicBarrier还可以被重用(调用CyclicBarrier的reset()方法)。

2.2 CyclicBarrier 主要方法:

public class CyclicBarrier {

    private int dowait(boolean timed, long nanos); // 供await方法调用 判断是否达到条件 可以往下执行吗
    
    //创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,每执行一次CyclicBarrier就累加1,达到了parties,就会触发barrierAction的执行
    public CyclicBarrier(int parties, Runnable barrierAction) ;
    
    //创建一个新的CyclicBarrier ,参数就是目标障碍数,它将在给定数量的参与方(线程)等待时触发,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句
    public CyclicBarrier(int parties) 
        
	//返回触发此障碍所需的参与方数量。
    public int getParties()
	
    //等待,直到所有各方都在此屏障上调用了await 。
	// 如果当前线程不是最后一个到达的线程,那么它会出于线程调度目的而被禁用并处于休眠状态.直到所有线程都调用了或者被中断亦或者发生异常中断退出
    public int await()
	
    // 基本同上 多了个等待时间 等待时间内所有线程没有完成,将会抛出一个超时异常
    public int await(long timeout, TimeUnit unit)

    //将障碍重置为其初始状态。 
    public void reset()

}
复制代码

2.3 构造函数

  • CyclicBarrier(int, Runnable)型构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
    // 参与的线程数量小于等于0,抛出异常
    if (parties <= 0) throw new IllegalArgumentException();
    // 设置parties
    this.parties = parties;
    // 设置count
    this.count = parties;
    // 设置barrierCommand
    this.barrierCommand = barrierAction;
}

说明: 该构造函数可以指定关联该`CyclicBarrier`的线程数量,并且可以指定在所有线程都进入屏障后的执行动作,
该执行动作由最后一个进行屏障的线程执行。
复制代码
  • CyclicBarrier(int)型构造函数
public CyclicBarrier(int parties) {
    // 调用含有两个参数的构造函数
    this(parties, null);
}

说明: 该构造函数仅仅执行了关联该CyclicBarrier的线程数量,没有设置执行动作。
复制代码

2.4 CyclicBarrier两个核心函数

2.4.1 dowait

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 保存当前锁
    final ReentrantLock lock = this.lock;
    // 锁定
    lock.lock();
    try {
        // 保存当前代
        final Generation g = generation;
        
        if (g.broken) // 屏障被破坏,抛出异常
            throw new BrokenBarrierException();

        if (Thread.interrupted()) { // 线程被中断
            // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
            breakBarrier();
            // 抛出异常
            throw new InterruptedException();
        }
        
        // 减少正在等待进入屏障的线程数量
        int index = --count;
        if (index == 0) {  // 正在等待进入屏障的线程数量为0,所有线程都已经进入
            // 运行的动作标识
            boolean ranAction = false;
            try {
                // 保存运行动作
                final Runnable command = barrierCommand;
                if (command != null) // 动作不为空
                    // 运行
                    command.run();
                // 设置ranAction状态
                ranAction = true;
                // 进入下一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction) // 没有运行的动作
                    // 损坏当前屏障
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 无限循环
        for (;;) {
            try {
                if (!timed) // 没有设置等待时间
                    // 等待
                    trip.await(); 
                else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
                    // 等待指定时长
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) { 
                if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
                    // 损坏当前屏障
                    breakBarrier();
                    // 抛出异常
                    throw ie;
                } else { // 不等于当前带后者是屏障被损坏
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    // 中断当前线程
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken) // 屏障被损坏,抛出异常
                throw new BrokenBarrierException();

            if (g != generation) // 不等于当前代
                // 返回索引
                return index;

            if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
                // 损坏屏障
                breakBarrier();
                // 抛出异常
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}
复制代码

dowait方法的逻辑流程:

7.png

2.4.2 nextGeneration

此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中,其源代码如下:

private void nextGeneration() {
    // signal completion of last generation
    // 唤醒所有线程
    trip.signalAll();
    // set up next generation
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 新生一代
    generation = new Generation();
}
复制代码

在此函数中会调用AQS的signalAll方法,即唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。其源代码如下:

public final void signalAll() {
    if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
        throw new IllegalMonitorStateException();
    // 保存condition队列头结点
    Node first = firstWaiter;
    if (first != null) // 头结点不为空
        // 唤醒所有等待线程
        doSignalAll(first);
}

说明: 此函数判断头结点是否为空,即条件队列是否为空,然后会调用`doSignalAll`函数。
复制代码
  • doSignalAll函数源码如下:
private void doSignalAll(Node first) {
    // condition队列的头结点尾结点都设置为空
    lastWaiter = firstWaiter = null;
    // 循环
    do {
        // 获取first结点的nextWaiter域结点
        Node next = first.nextWaiter;
        // 设置first结点的nextWaiter域为空
        first.nextWaiter = null;
        // 将first结点从condition队列转移到sync队列
        transferForSignal(first);
        // 重新设置first
        first = next;
    } while (first != null);
}

此函数会依次将条件队列中的节点转移到同步队列中,会调用到`transferForSignal`函数。
复制代码
  • transferForSignal函数源码如下:
final boolean transferForSignal(Node node) {
    /*
        * If cannot change waitStatus, the node has been cancelled.
        */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
        * Splice onto queue and try to set waitStatus of predecessor to
        * indicate that thread is (probably) waiting. If cancelled or
        * attempt to set waitStatus fails, wake up to resync (in which
        * case the waitStatus can be transiently and harmlessly wrong).
        */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

说明: 此函数的作用就是将处于条件队列中的节点转移到同步队列中,并设置结点的状态信息,其中会调用到`enq`函数。
复制代码
  • enq函数源码如下:
private Node enq(final Node node) {
    for (;;) { // 无限循环,确保结点能够成功入队列
        // 保存尾结点
        Node t = tail;
        if (t == null) { // 尾结点为空,即还没被初始化
            if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
                tail = head; // 头结点与尾结点都指向同一个新生结点
        } else { // 尾结点不为空,即已经被初始化过
            // 将node结点的prev域连接到尾结点
            node.prev = t; 
            if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
                // 设置尾结点的next域为node
                t.next = node; 
                return t; // 返回尾结点
            }
        }
    }
}

说明: 此函数完成了结点插入同步队列的过程
复制代码

newGeneration函数调用链:

9.png

2.5 CyclicBarrier示例

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("集齐七张卡片,参与抽奖");
        });

        for (int i = 1; i <= 7; i++) {
            final int temp=i;
            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+"收集到第"+temp+"张");
                    //阻塞任务线程
                    cyclicBarrier.await();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }


    }
}

程序输出:
Thread-1收集到第2张
Thread-4收集到第5张
Thread-3收集到第4张
Thread-2收集到第3张
Thread-0收集到第1张
Thread-6收集到第7张
Thread-5收集到第6张
集齐七张卡片,参与抽奖
复制代码

三、🩰Semaphore( 信号灯)

3.1 概述

Semaphore:信号量通常用于限制可以访问某些(物理或逻辑)资源的线程数

使用场景

限制资源,如抢位置、限流等。

3.2 Semaphore常用方法

  • void acquire() 许可数-1,从该信号量获取许可证,阻塞直到可用或线程被中断。

  • void acquire(int permits) 许可数 - permits,从该信号量获取给定数量的许可证,阻塞直到可用或线程被中断。

  • int availablePermits() 返回此信号量中当前可用的许可数。

  • void release() 许可数+1,释放许可证,将其返回到信号量。

  • void release(int permits) 许可数+permits,释放给定数量的许可证,将其返回到信号量。

  • boolean hasQueuedThreads() 查询是否有线程正在等待获取许可。

  • int getQueueLength() 返回等待获取许可的线程数的估计

3.3 Semaphore构造函数

  • Semaphore(int)型构造函数
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

说明: 该构造函数会创建具有给定的许可数和非公平的公平设置的`Semaphore`复制代码
  • Semaphore(int, boolean)型构造函数
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

说明: 该构造函数会创建具有给定的许可数和给定的公平设置的`Semaphore`复制代码

3.4 Semaphore两个核心函数

3.4.1 acquire

此方法从信号量获取一个(多个)许可,在提供一个许可前一直将线程阻塞,或者线程被中断,其源码如下:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

说明: 该方法中将会调用Sync对象的`acquireSharedInterruptibly`
(从AQS继承而来的方法)方法,而`acquireSharedInterruptibly`方法在`CountDownLatch`中已经进行了分析,在此不再累赘。
复制代码

3.4.2 release

此方法释放一个(多个)许可,将其返回给信号量,源码如下:

public void release() {
    sync.releaseShared(1);
}

说明: 该方法中将会调用Sync对象的`releaseShared`
(从AQS继承而来的方法)方法,而`releaseShared`方法在`CountDownLatch`中已经进行了分析,在此不再累赘。
复制代码

3.5 Semaphore示例

/**
 * @Author: Akiang
 * @Date: 2021-08-26 09:03
 * version 1.0
 */
public class SemaphoreDemo1 {
    public static void main(String[] args) {
        // 10台电脑
        Semaphore semaphore = new Semaphore(10);

        // 20 个小伙伴想要上网
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                try {
                    //等待获取许可证
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到了电脑");
                    //抢到的小伙伴,迅速就开打啦 这里就模拟个时间哈,
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //打完几把游戏的小伙伴 女朋友来找了 溜啦溜啦 希望大家都有人陪伴
                    System.out.println("女朋友来找,"+Thread.currentThread().getName() + "离开了");
                    semaphore.release();//释放资源,离开了就要把电脑让给别人啦。
                }
            }, String.valueOf(i)).start();
        }
    }
}

复制代码

四、 🏏简单讲述 | Phaser & Exchanger

4.1 Phaser

Phaser一种可重用的同步屏障,功能上类似于CyclicBarrier和CountDownLatch,但使用上更为灵活。非常适用于在多线程环境下同步协调分阶段计算任务(Fork/Join框架中的子任务之间需同步时,优先使用Phaser)

  • 函数列表:
//构造方法
public Phaser() {
    this(null, 0);
}
public Phaser(int parties) {
    this(null, parties);
}
public Phaser(Phaser parent) {
    this(parent, 0);
}
public Phaser(Phaser parent, int parties)
//注册一个新的party
public int register()
//批量注册
public int bulkRegister(int parties)
//使当前线程到达phaser,不等待其他任务到达。返回arrival phase number
public int arrive() 
//使当前线程到达phaser并撤销注册,返回arrival phase number
public int arriveAndDeregister()
/*
 * 使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。
 * 如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。
 * 如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。
 */
public int arriveAndAwaitAdvance()
//等待给定phase数,返回下一个 arrival phase number
public int awaitAdvance(int phase)
//阻塞等待,直到phase前进到下一代,返回下一代的phase number
public int awaitAdvance(int phase) 
//响应中断版awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException
//使当前phaser进入终止状态,已注册的parties不受影响,如果是分层结构,则终止所有phaser
public void forceTermination()

复制代码
  • 示例:
public class PhaserDemo {

    private static Phaser phaser = new MyPhaser();

    //自定义一个移相器来自定义输出
    static class MyPhaser extends Phaser {
        /**
         * @deprecated 在即将到来的阶段提前时执行操作并控制终止的可覆盖方法。 此方法在推进此移相器的一方到达时调用(当所有其他等待方处于休眠状态时)。
         *             如果此方法返回true ,则此移相器将在提前时设置为最终终止状态,并且对isTerminated后续调用将返回 true。
         * @param phase 进入此方法的当前阶段号,在此移相器前进之前
         * @param registeredParties 当前注册方的数量
         * @return
         */
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            if (phase == 0) {
                System.out.println("所有人都到达了网吧,准备开始开黑!!!");
                return false;
            } else if (phase == 1) {
                System.out.println("大家都同意,一起去次烧烤咯!!!");
                return false;
            } else if (phase == 2) {
                System.out.println("大家一起回寝室!!!");
                return true;
            }
            return true;
        }
    }

    //构建一个线程任务
    static class DoSomeThing implements Runnable {
        @Override
        public void run() {
            /**
             * 向此移相器添加一个新的未到达方
             */
            phaser.register();
            System.out.println(Thread.currentThread().getName() + "从家里出发,准备去学校后街上网开黑!!!");
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + "上着上着饿了,说去次烧烤吗?");
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + "烧烤次完了");
            phaser.arriveAndAwaitAdvance();
        }
    }

    public static void main(String[] args) throws Exception {
        DoSomeThing thing = new DoSomeThing();
        new Thread(thing, "小明").start();
        new Thread(thing, "小王").start();
        new Thread(thing, "小李").start();
    }
}
/**
 * 小李从家里出发,准备去学校后街上网开黑!!!
 * 小王从家里出发,准备去学校后街上网开黑!!!
 * 小明从家里出发,准备去学校后街上网开黑!!!
 * 所有人都到达了网吧,准备开始开黑!!!
 * 小李上着上着饿了,说去次烧烤吗?
 * 小明上着上着饿了,说去次烧烤吗?
 * 小王上着上着饿了,说去次烧烤吗?
 * 大家都同意,一起去次烧烤咯!!!
 * 小明烧烤次完了
 * 小李烧烤次完了
 * 小王烧烤次完了
 * 大家一起回寝室!!!
 */
复制代码

4.2 Exchanger

Exchanger允许两个线程在某个汇合点交换对象,在某些管道设计时比较有用

Exchanger提供了一个同步点,在这个同步点,一对线程可以交换数据。每个线程通过exchange()方法的入口提供数据给他的伙伴线程,并接收他的伙伴线程提供的数据并返回。

当两个线程通过Exchanger交换了对象,这个交换对于两个线程来说都是安全的。Exchanger可以认为是SynchronousQueue的双向形式,在运用到遗传算法和管道设计的应用中比较有用。

  • Exchanger实现机制
for (;;) {
    if (slot is empty) { // offer
        // slot为空时,将item 设置到Node 中        
        place item in a Node;
        if (can CAS slot from empty to node) {
            // 当将node通过CAS交换到slot中时,挂起线程等待被唤醒
            wait for release;
            // 被唤醒后返回node中匹配到的item
            return matching item in node;
        }
    } else if (can CAS slot from node to empty) { // release
         // 将slot设置为空
        // 获取node中的item,将需要交换的数据设置到匹配的item
        get the item in node;
        set matching item in node;
        // 唤醒等待的线程
        release waiting thread;
    }
    // else retry on CAS failure
}
复制代码
  • Exchanger示例

来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。

    static class Producer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        Producer(String name, Exchanger<Integer> exchanger) {
            super("Producer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            for (int i=1; i<5; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = i;
                    System.out.println(getName()+" 交换前:" + data);
                    data = exchanger.exchange(data);
                    System.out.println(getName()+" 交换后:" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        Consumer(String name, Exchanger<Integer> exchanger) {
            super("Consumer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (true) {
                data = 0;
                System.out.println(getName()+" 交换前:" + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(getName()+" 交换后:" + data);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        new Producer("", exchanger).start();
        new Consumer("", exchanger).start();
        TimeUnit.SECONDS.sleep(7);
        System.exit(-1);
    }
}

其结果可能如下:

Consumer- 交换前:0
Producer- 交换前:1
Consumer- 交换后:1
Consumer- 交换前:0
Producer- 交换后:0
Producer- 交换前:2
Producer- 交换后:0
Consumer- 交换后:2
Consumer- 交换前:0
Producer- 交换前:3
Producer- 交换后:0
Consumer- 交换后:3
Consumer- 交换前:0
Producer- 交换前:4
Producer- 交换后:0
Consumer- 交换后:4
Consumer- 交换前:0
复制代码

五、CyclicBarrier与CountDownLatch的区别

  • CountDownLatch的await()方法阻塞的是主线程或调用await()的线程,而CyclicBarrier的await()阻塞的是任务线程,主线程或调用线程不受影响。

  • CountDownLatch无法重置计数次数,而CyclicBarrier可以通过reset()方法来重用

  • CountDownLatchCyclicBarrier都是用作多线程同步,CountDownLatch基于AQS,CyclicBarrier基于ReentrantLockCondition

参考:《Java并发编程的艺术》

JUC系列(七)| JUC三大常用工具类CountDownLatch、CyclicBarrier、Semaphore