队列锁-基于数组的锁基本理论文章用意先上代码

基本理论

  • 一种自定义实现可扩展自旋锁的方法
  • 特点:
  1. 在队列中,每个线程检测其前驱线程是否已完成来判断是否轮到自身线程执行
  2. 每个线程在不同的存储单元上旋转,从而降低cache一致性流量。
  3. 提高了临界区的利用率,因为没有必要去判断何时要访问它:每个线程直接由队列中的前驱线程来通知
  4. 提供了先来先服务的公平性

文章用意

在于加深作者本人对锁的实现原理的理解,加深对锁知识点的巩固和认知。当做自己的学习笔记!

知识来源

书籍:《多处理器编程的艺术》

先上代码

public class ALock implements Lock {
    AtomicInteger tail; //所有线程共享的变量
    boolean[] flag;//相关线程是否持有锁标识
    int size;//锁数量大小

    ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>() {
        protected Integer initialValue() {
            return 0;
        }
    };


    public ALock(int capacity) {
        size = capacity;
        tail = new AtomicInteger(0);
        flag = new boolean[capacity];
        flag[0] = true;
    }

    @Override
    public void lock() {
        int slot = tail.getAndIncrement() % size;
        mySlotIndex.set(slot);
        while (!flag[slot]) {
        }
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        int slot = mySlotIndex.get();
        flag[slot] = false;
        flag[(slot + 1) % size] = true;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
复制代码

代码分析

1.模拟单线程锁的获取和释放

public static void main(String[] args) throws InterruptedException {
 ALock lock = new ALock(4);
 System.out.println("业务开始...获取锁");
 lock.lock();
 System.out.println("占锁成功...业务进行中");
 Thread.sleep(1000 * 10);
 System.out.println("业务完成...准备释放锁");
 lock.unlock();
 System.out.println("释放锁成功...业务结束");
}
复制代码

1_1.png
1.1:初始化Alock锁对象信息
锁大小为4
tail尾域初始化为0
flag:0下标值默认为true表示可以获取锁

1_2.png

1.2:锁的占用:lock.lock();
先原子化的获取tail尾域的值(0)然后再自增1然后再取模获取当前线程的槽。
将槽值当做flag的下标传入,如果flag[slot] = true,则表示当前线程有权获取到锁。否则自旋

image.png
1.3:锁的释放:lock.unlock();
首先得到自身线程的槽值,然后将槽的值带入flag下标将其值设置为false(表示锁已释放)再将flag下一个下标值设置为true(表示下一个索引的线程可以获得锁)

2.模拟多线程锁的获取和释放

public static void main(String[] args) throws InterruptedException {
    ALock lock = new ALock(4);
    threadA = new Thread(() -> {
        System.out.println("线程A获取锁开始...");
        lock.lock();
        System.out.println("线程A获取锁成功...");
        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        }
        lock.unlock();
        System.out.println("线程A释放锁成功...");
    }, "A");

    threadB = new Thread(() -> {
        System.out.println("线程B获取锁开始...");
        lock.lock();
        System.out.println("线程B获取锁成功...");
        lock.unlock();
        System.out.println("线程B释放锁成功...");
    }, "B");
    threadA.start();
    threadB.start();
    System.out.println("main主业务执行完成");
}
复制代码

2.1:输出结果:

1-4.png

2.2)逐步分析:

  • 2.2.1:首先线程A获取锁:此时数据结果为:
  • slot的值为0
  • tail尾域值由0->1
  • 然后将线程槽索引设为0
  • 索引值取槽值成功获取锁成功:flag[solt] = true;
  • 并进行模拟业务睡眠十秒。 如下图:

1_5.png

  • 2.2.2:在线程A获取锁睡眠的时候此时线程B也调用了lock方法来获取对象锁。
  • slot的值为1
  • tail尾域由1->2
  • 然后将线程槽索引设为1
  • 索引值取槽值成功获取锁失败:flag[solt] = false,进行while自旋。如下图:

1_6.png
2.2.3:线程A任务完成释放锁:
线程从自身线程本地变量(MySlotIndex)中获取到索引槽值为0
然后获取槽值相关的flag索引信息将其值设置为false(表示当前线程锁已释放,没有资格获取锁)
再获取当前flag数组索引的下一个索引并取模将其值改为true,表示slot+1槽的线程可以获取取(此时前面的B线程不在自旋,获取锁成功)线程B释放锁同理。如下图:

1_7.png

1_8.png

至此所有线程的获取、释放锁完成。

总结:

AtomicInteger域被所有的线程所共享,其初始值为0。为了获得锁,每个线程原子地增加tail域.
所得的结果值称为线程的槽(slot)。槽则被当作布尔数组flag的索引。如果flag[solt]为true,那么槽为solt的线程有权获得锁。
在初始状态时,flag[0]为true。为了获取锁,线程不断地旋转直到它的槽所对应的flag变为true(while语句)。
而在释放锁时,线程把对应于它自己槽的flag设为false,并将下一个槽的flag设为true。
注意:所有的算术运算都对n进行求模,其中n至少应与最大的并发线程数相同。

在ALock算法中,mySlotIndex是线程的局部变量。线程的局部变量与线程的常规变量不同,对于每个局部变量,线程都有它自己独立初始化的副本。局部变量不需要保存在共享存储器中,不需要同步,也不会产生任何一致性流量,因为它们只能被一个线程访问。通常使用get()和set()方法来访问局部变量的值。
数组flag[]是被多个线程所共享的。但在任意给定的时间,由于每个线程都是在一个数组存储单元的本地cache副本上旋转,大大降低了无效流量,从而使得对数组存储单元的争用达到最小
要注意争用仍有可能发生,其原因在于存在着一种称为假共享的现象,当相邻的数据项(如数组元素)共享单一cache线时会发生这种现象。对一个数据项的写将会使该数据项的cache线无效,对于那些恰好进入同一个cache线的未改变但很接近的数据项来说,这种写将会引起正在这些数据上进行旋转的处理器的无效流量。
在图7-8的例子中,访问8个ALock存储单元的线程有可能遇到不必要的无效,因为这些存储单元已被缓存到两个同样的4字线中。避免假共享的一种方法就是填补数组元素,以使不同的元素被映射到不同的cache线中。在类似于C或者C++的低级语言中填补是很容易实现的,在这些语言中程序员可以直接控制对象在存储器中的布局。在图7-8的例子中,可以通过将锁数组的大小增加为原来的4倍,并以4个字来隔开存放存储单元以使得两个存储单元不会落在同一个cache线中,来填补最初的8个ALock存储单元。(通过计算4(i+1) mod 32而不是i+1 mod 8来从单元i增加到下一个单元。)
注:本段总结源自书籍:《多处理器编程的艺术》源语。

1_9.png

扩展知识点

1.何为cache缓存一致性?
简单理解就是:cpu cache(缓存)。

2.为什么需要cpu缓存?
因为CPU的频率太快了,快到主存跟不上,这样在处理器时钟周期内,CPU常常需要等待主存,浪费资源。所以cache的出现,是为了缓解CPU和内存之间速度的不匹配问题(结构:cpu -> cache -> memory)。

3.CPU cache有什么意义?
cache的容量远远小于主存,因此出现cache丢失在所难免,既然cache不能包含CPU所需要的所有数据,那么cache的存在真的有意义吗?当然是有意义的---局部性原理。
 A. 时间局部性:如果某个数据被访问,那么在不久的将来它很可能被再次访问;
 B. 空间局部性:如果某个数据被访问,那么与它相邻的数据很快也可能被访问;