Java信号量(Semaphore)简析

前言

  之前做力扣的 1226. 哲学家进餐 时,发现了一个用 Java 信号量来控制同时进入一个临界区的线程数量的操作,遂去看了看 Java 信号量的源码。这个 Semaphore 也是位于 JUC 下面,而且也和 AQS 类密切相关,但是相关的操作和思想和可重入锁和读写锁却都有很大的不同,这个东西还是挺有意思的。

  本文接下来将从源码角度对 Semaphore 的主要操作进行分析,可以结合作者前面的两篇博客对比看它和 ReentrantLock 这样的锁有啥不同:Java并发学习笔记:ReentrantLock | Java并发学习笔记:ReentrantReadWriteLock(良心长文)

信号量的结构和初始化

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    Sync(int permits) {
        setState(permits);
    }
        // 其它函数
    }
    // 构造函数,默认是得到非公平实现
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // 指定公平类型
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    // 通过调用 sync 来进行具体操作
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    // ......
}
复制代码

  由上面的代码不难看出,和 ReentrantLock 一样,信号量也是借助内部类 Sync 来实现它的方法,Sync 类继承了 AQS 类并得到相关的能力。

  信号量默认的构造函数是非公平实现,如下,是 Sync 的两个子类的定义:

// 非公平实现
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
    // 具体代码
}
// 公平实现
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    // 具体代码
}
复制代码

  信号量在初始化时,会将 permits 参数传入,默认创建 NonfairSync 类型的对象,若指定 fair 为 true,则创建 FairSync 类型的对象。不管是哪种公平类型,都会调用super(permits);,从而调用 Sync 类的构造函数,进而调用setState(permits);,最终,指定的 permits 变成了 Sync 类的 state 的值(Sync 继承了 AQS 类)。

获取和释放

  对 Semaphore 的使用主要就是获取和释放两个过程,根据是否响应中断和是否有时间限制以及每次获取和释放的数量,获取和释放有很多版本的函数,但是主要流程都类似,下面通过最基本的 acquire,tryAcquire 和 release 方法来分析:

acquire 方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
// AQS 类中
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted() ||
        (tryAcquireShared(arg) < 0 &&
         acquire(null, arg, true, true, false, 0L) < 0))
        throw new InterruptedException();
}
复制代码

  最终还是先调用自定义的tryAcquireShared(arg)方法,如果返回值小于0,接着调用 acquire 方法,看一下这个tryAcquireShared(arg)方法:

// 非公平类型的实现
static final class NonfairSync extends Sync {
    // ...
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
// Sync 类中定义
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// 公平类型的实现
static final class FairSync extends Sync {
    // ...
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
复制代码

  tryAcquireShared 有公平和非公平两种实现,两种实现的唯一差别就是公平类型会进行if (hasQueuedPredecessors())的检查,就是如果有线程在此线程之前排队,返回 -1,这体现了所谓公平性。

  看代码的具体内容,是将 state 变量进行 CAS 减操作,这和之前的 ReentrantLock 这样的锁就很不一样了,ReentrantLock 是将 state 变量进行加操作。那么如果由int remaining = available - acquires;计算的 remaining 小于 0,直接返回这个负值,在 AQS 类的判断条件里,会接着调用acquire(null, arg, true, true, false, 0L)方法,阻塞这个线程。

  那么这么设计有什么作用呢?比方说初始化一个信号量:Semaphore sen = new Semaphore(2);,acquire 方法决定了,在没调用 release 方法的情况下,同时只有两个线程能走过这段语句,第 3 个,第 4 个线程过来调用 acquire,则在tryAcquireShared里计算的 remaining 都是 -1,会阻塞在这条语句,没办法走下去,也就控制了临界区同时工作的线程数量。

tryAcquire 方法

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
复制代码

  很简单,不管是公平还是非公平,直接获取。

release 方法

// Semaphore 内
public void release() {
    sync.releaseShared(1);
}  
// AQS 内
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}
// Sync 内
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
复制代码

  调用链为:sync.releaseShared(1) -> releaeShared(1) -> sync.tryRelesShared(1),最终还是先调用 Sync 中重写的tryReleaseShared,和其它锁的操作也是恰恰相反,这里是让 state 进行 CAS 加的操作,并且每次 CAS 加成功,tryReleaseShared都会返回 true,从而if (tryReleaseShared(arg))判断为真,进行signalNext(head);来唤醒一个因为调用 acquire 被阻塞的线程。

  还是上面那个例子:初始化一个信号量:Semaphore sen = new Semaphore(2);,acquire 方法决定了,在没调用 release 方法的情况下,同时只有两个线程能走过这段语句,但是现在第 1 个线程工作完了,调用 release 方法,信号量对应的 state 大于 0 了,从而一个被阻塞的线程可以被唤醒,走过 acquire 语句,进入临界区。

信号量的应用

  以作者目前的水平来看,信号量主要有控制临界区线程数量和作为同步控制机制两种应用:

控制临界区线程数量

  以 1226. 哲学家进餐 为例,可以用Semaphore limit = new Semaphore(3);控制下面的代码段只有三个线程能同时进入,每个线程完成了工作后,调用limit.release();来让 state 自增 1,这可以让一个阻塞在limit.acquire();的线程进来吃饭。

Semaphore limit = new Semaphore(3);
// 控制同时吃饭的人数
limit.acquire();
// 吃饭的相关操作代码
// ...
// 完成了吃饭的动作,释放信号量,让别的线程可以进来
limit.release();
复制代码

  当然,如果你把信号量的 state 初始化为 1,其实就相当于一个独占锁了。

作为同步控制机制

  可以用信号量作为多线程编程的同步控制机制,比方说 1115. 交替打印FooBar 这道题:两个线程共享一个 FooBar 类型的实例,一个线程调用 foo 方法,另一个线程调用 bar 方法,要求打印出 n 个 "FooBar",也就是要控制两个线程轮流执行,就可以用信号量来处理:

class FooBar {
    private int n;
    Semaphore foo = new Semaphore(1);
    Semaphore bar = new Semaphore(0);
    public FooBar(int n) {
        this.n = n;
    }
    public void foo(Runnable printFoo) throws InterruptedException {
        for (int i = 0; i < n; i++) {
            foo.acquire();
            printFoo.run();
            bar.release();
        }
    }

    public void bar(Runnable printBar) throws InterruptedException {
        for (int i = 0; i < n; i++) {
            bar.acquire();
            printBar.run();
            foo.release();
        }
    }
}
复制代码

  由于要先打印 foo,再打印 bar,所以把 foo 对应的信号量初始化为 1,bar 对应的信号量初始化为 0。即使 bar 线程先进来,但是它调用bar.acquire();肯定会阻塞的,因为 bar 信号量初始化为 0,必须得等 foo 线程先打印一次,调用bar.release();,bar 线程才能越过bar.acquire();,进行下面的操作。

  由于两个信号量的 state 最多只能是 1,所以一个线程打印之后,调用对应信号量的 acquire 肯定是阻塞了,必须等待另一个线程打印了之后调用 release 方法才能走下去,这样子,就实现了两个线程轮流打印。

  通过把线程对应的信号量的 state 初始化为 0,满足某条件后,再调用该信号量的 release 方法,就可以对线程的工作顺序进行控制。

  再比如 1116. 打印零与奇偶数 ,也可以用信号量来做:

class ZeroEvenOdd {
    private int n;
    Semaphore zero = new Semaphore(1);
    Semaphore odd = new Semaphore(0);
    Semaphore even = new Semaphore(0);
    public ZeroEvenOdd(int n) {
        this.n = n;
    }
    public void zero(IntConsumer printNumber) throws InterruptedException {
        for (int i = 0;i < n;++i) {
            zero.acquire();
            printNumber.accept(0);
            // release 对应的信号量
            if (i % 2 == 0) {
                odd.release();
            }
            else {
                even.release();
            }
        }
    }

    public void even(IntConsumer printNumber) throws InterruptedException {
        for (int i = 2;i <= n;i += 2) {
            even.acquire();
            printNumber.accept(i);
            zero.release();
        }
    }

    public void odd(IntConsumer printNumber) throws InterruptedException {
        for (int i = 1;i <= n;i += 2) {
            odd.acquire();
            printNumber.accept(i);
            zero.release();
        }
    }
}
复制代码

  当然,以上两题也可以用synchronized + obj.wait() + obj.notifyAll()来做(这种方法可以参考我的这篇博客:力扣多线程题目合辑 ),不过那样代码会很长,用信号量做,代码很简洁,也很好理解。

总结

  本文先对 Java 信号量的结构,初始化,获取和释放进行了源码级的分析,又通过例子列举了 Java 信号量的作用,这块源码只有 700 来行,想看懂也比较简单。