Java并发之JUC的通关文牒

一、简介

  • java5.0提供了 java.util.concurrent (简称juc)此包中增加了并发编程中很常用的实用工具类,用于定义类似于线程的自定义系统,包括线程池、异步io和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的Colletion实现等。

二、volatile关键字

  1. 内存的可见性

    • 内存可见性(Memory Visibility):是指在多个线程操作共享数据时,一个线程在使用共享数据时,而另一个线程修改了共享数据的状态,这时候需要确保该线程修改完共享数据状态后,其他线程能够感知共享数据状态的变化。者与内存模型有关,模型采用主存与缓存的方式对变量进行操作,每个线程有自己的缓存空间,先从主存读取变量,在缓存中修改,然后回写到主存中。
    • 我们可以使用同步锁机制来保证共享数据安全的发布,但是性能会有所降低。我们还可以使用一种更加轻量级的volatile关键字,他是一种稍弱的同步机制,即在并发环境中,当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值,这就保证了可见性。
  2. 与同步锁区别

    • 不具备互斥性,即同步锁机制下在同步代码块中只有一个线程在执行,而volatile无法保证。
    • 不能保证变量操作的原子性
  3. 测试代码

    public class VolitateTest {
    ​
        public static void main(String[] args) {
    ​
            try {
                /**
                 * 有两个线程:线程demo和主线程main
                 * 在flag变量未被volatile修饰时,demo线程修改flag值,main线程并未感知会出现死循环
                 * 在flag被volatile修饰时,demo线程修改flag值,main线程刷新线程缓存数据获取最新值,执行结束
                 * 原理:我们在定义变量时,会缓存在主存里,每个线程会有线程缓存,线程会先读取主存数据,然后修改数据,回写到主存中
                 * 如果在回写过程中其他线程读取主存数据,读取的就是未被修改的数据。用volatile修饰变量在保证线程可见性可避免该类问题
                 */
                ThreadDemo demo = new ThreadDemo();
                new Thread(demo).start();
    ​
                while (true){
                    if (demo.getFlag()){
                        System.out.println("------falge is change------");
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    ​
        }
    ​
        @Data
        public static class ThreadDemo implements Runnable{
    ​
            private volatile Boolean flag = false;
    //        private  Boolean flag = false;
    ​
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    flag =true;
                    System.out.println("flag="+flag);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    ​
    复制代码

三、CAS算法

  1. 原子性

    • i++操作的原子性问题,i++是非原子操作,实际上分为了三个操作:读-改-写。

      int i=10; 
      ​
      int temp = i;
      i = i+1;
      i=temp;
      复制代码

      这样在并发环境下,多个线程都从主存中读取i值,然后进行修改之后回写主存就会出现重复数据

    • volatile不能解决原子性问题,i++是非原子操作,如果单纯用volatile修饰i,一个线程在执行第二三步阻塞的情况下,其他线程读取的i值其实在主存中是不变的,因为i还没更新主存,这时候其他线程已经读取到i未更新的值再去做i++操作就会出现原子性问题。所以volatile只能保证内存可见性不能保证原子性。

    • 原子变量:java.util.concurrent.atomic 包下提供了一些原子操作的常用类

      1. 原子变量的共享变量全部由volatile修饰,保证内存的可见性
      2. CAS(Compare-And-Swap)算法保证数据的原子性
  2. CAS算法

    • CAS(Compare-And-Swap)算法是一种硬件对并发的支持,针对多处理器操作而设计的处理器的一种特殊指令,用于对管理对共享数据的并发访问,是一种无锁的非阻塞算法的实现。

    • CAS 包含了 3 个操作数:

      1. 需要读取的内存值:V
      2. 进行比较的预估值:A
      3. 修改以后的更新值:B
    • 当且仅当V==A时,CAS通过原子方式更新V值变为B值,否则不进行任何操作。(例如数据库的乐观锁就是采用这种机制)

    • CAS算法模拟

      public class CompareAndSwapTest {
      ​
          public static void main(String[] args) {
              final CompareAndSwap cas = new CompareAndSwap();
              for (int i = 0; i < 10; i++) {
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                        //模拟并发执行CAS设置值,会发现会有部分设置失败的情况
                          int expectedValue = cas.get();
                          boolean b = cas.compareAndSet(expectedValue, (int) (Math.random() * 1000));
                          System.out.println("CompareAndSwap-->" + b);
                      }
                  }).start();
      ​
              }
      ​
          }
      ​
      }
      ​
      ​
      class CompareAndSwap{
          //内存值
          private int value;
      ​
          //获取内存值
          public synchronized int get(){
              return value;
          }
          //比较
          public synchronized int compareAndSwap(int exceptedValue,int newValue){
              int oldValue = value;
              if (oldValue==exceptedValue){
                  this.value = newValue;
              }
              return oldValue;
          }
          //设置
          public synchronized boolean compareAndSet(int exceptedValue,int newValue){
              return exceptedValue==this.compareAndSwap(exceptedValue,newValue);
          }
      }
      复制代码
  3. 原子变量

    • java.util.concurrent.atomic 包下提供了一些原子操作的常用类

      1. AtomicBoolean 、AtomicInteger 、AtomicLong 、 AtomicReference
      2. AtomicIntegerArray 、AtomicLongArray
      3. AtomicMarkableReference
      4. AtomicReferenceArray
      5. AtomicStampedReference
    • 类的小工具包,支持在单个变量上解除锁的线程安全编程。事实上,此包中的类可将 volatile 值、字段和数组元素的概念扩展到那些也提供原子条件更新操作的类。

    • 类 AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 的实例各自提供对相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。

    • AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray 类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方面也引人注目,这对于普通数组来说是不受支持的。

    • 核心方法:boolean compareAndSet(expectedValue, updateValue)(CAS的体现)

四、Concurrent集合

  1. java.util.concurrent

    该包提供了对应的并发集合类,着重讲一下ConcurrentHashMap

    interface non-thread-safe thread-safe
    List ArrayList CopyOnWriteArrayList
    Map HashMap ConcurrentHashMap
    Set HashSet / TreeSet CopyOnWriteArraySet
    Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
    Deque ArrayDeque / LinkedList LinkedBlockingDeque
  2. HashMap

  • HashMap底层基于数组+链表组成的,在1.7与1.8中的具体实现有所不同

  • Base1.7

    1. 数据结构图

    640?wx_fmt=jpeg.png

    1. 实现原理

      image.png

      HashMap中核心的几个成员变量

      1. 初始化桶大小,因为底层是数组,所以这是数组默认的大小。
      2. 桶最大值。
      3. 默认的负载因子(0.75)
      4. table 真正存放数据的数组。
      5. Map 存放数量的大小。
      6. 桶大小,可在初始化时显式指定。
      7. 负载因子,可在初始化时显式指定。

      HashMap的容量大小是固定的,通过构造器初始化指定或者默认初始化等

          public HashMap(int initialCapacity) {
              this(initialCapacity, DEFAULT_LOAD_FACTOR);
          }
      ​
          /**
           * Constructs an empty {@code HashMap} with the default initial capacity
           * (16) and the default load factor (0.75).
           */
          public HashMap() {
              this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
          }
      复制代码

      给定的默认容量为 16,负载因子为 0.75。Map 在使用过程中不断的往里面存放数据,当数量达到了 16 * 0.75 = 12 就需要将当前 16 的容量进行扩容,而扩容这个过程涉及到 rehash、复制数据等操作,所以非常消耗性能。因此通常建议能提前预估 HashMap 的大小最好,尽量的减少扩容带来的性能损耗。

      put方法

      img

      • 判断当前数组是否需要初始化。
      • 如果 key 为空,则 put 一个空值进去。
      • 根据 key 计算出 hashcode。
      • 根据计算出的 hashcode 定位出所在桶。
      • 如果桶是一个链表则需要遍历判断里面的 hashcode、key 是否和传入 key 相等,如果相等则进行覆盖,并返回原来的值。
      • 如果桶是空的,说明当前位置没有数据存入;新增一个 Entry 对象写入当前位置。

      img

      当调用 addEntry 写入 Entry 时需要判断是否需要扩容。

      如果需要就进行两倍扩充,并将当前的 key 重新 hash 并定位。

      而在 createEntry 中会将当前位置的桶传入到新建的桶中,如果当前桶有值就会在位置形成链表。

      get方法

      img

      img

      • 首先也是根据 key 计算出 hashcode,然后定位到具体的桶中。
      • 判断该位置是否为链表。
      • 不是链表就根据 key、key 的 hashcode 是否相等来返回值。
      • 为链表则需要遍历直到 key 及 hashcode 相等时候就返回值。
      • 啥都没取到就直接返回 null 。
  • Base1.8

    1. 当 Hash 冲突严重时,在桶上形成的链表会变的越来越长,这样在查询时的效率就会越来越低;时间复杂度为 O(N)

    2. 数据结构图

      image.png
      几个重要的区别:

      • TREEIFY_THRESHOLD 用于判断是否需要将链表转换为红黑树的阈值。
      • HashEntry 修改为 Node。
    3. 实现原理

      put方法

      final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                         boolean evict) {
              Node<K,V>[] tab; Node<K,V> p; int n, i;
              if ((tab = table) == null || (n = tab.length) == 0)//①
                  n = (tab = resize()).length;
              if ((p = tab[i = (n - 1) & hash]) == null)//②
                  tab[i] = newNode(hash, key, value, null);
              else {
                  Node<K,V> e; K k;
                  if (p.hash == hash &&
                      ((k = p.key) == key || (key != null && key.equals(k))))//③
                      e = p;
                  else if (p instanceof TreeNode)//④
                      e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
                  else {
                      for (int binCount = 0; ; ++binCount) {//⑤
                          if ((e = p.next) == null) {
                              p.next = newNode(hash, key, value, null);
                              if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st⑥
                                  treeifyBin(tab, hash);
                              break;
                          }
                          if (e.hash == hash &&//⑦
                              ((k = e.key) == key || (key != null && key.equals(k))))
                              break;
                          p = e;
                      }
                  }
                  if (e != null) { // existing mapping for key⑧
                      V oldValue = e.value;
                      if (!onlyIfAbsent || oldValue == null)
                          e.value = value;
                      afterNodeAccess(e);
                      return oldValue;
                  }
              }
              ++modCount;
              if (++size > threshold)//⑨
                  resize();
              afterNodeInsertion(evict);
              return null;
          }
      复制代码
    1. 判断当前桶是否为空,空的就需要初始化(resize 中会判断是否进行初始化)。
    2. 根据当前 key 的 hashcode 定位到具体的桶中并判断是否为空,为空表明没有 Hash 冲突就直接在当前位置创建一个新桶即可。
    3. 如果当前桶有值( Hash 冲突),那么就要比较当前桶中的 key、key 的 hashcode 与写入的 key 是否相等,相等就赋值给 e,在第 8 步的时候会统一进行赋值及返回。
    4. 如果当前桶为红黑树,那就要按照红黑树的方式写入数据。
    5. 如果是个链表,就需要将当前的 key、value 封装成一个新节点写入到当前桶的后面(形成链表)。
    6. 接着判断当前链表的大小是否大于预设的阈值,大于时就要转换为红黑树。
    7. 如果在遍历过程中找到 key 相同时直接退出遍历。
    8. 如果 e != null 就相当于存在相同的 key,那就需要将值覆盖。
    9. 最后判断是否需要进行扩容。

    get方法

      public V get(Object key) {
            Node<K,V> e;
            return (e = getNode(hash(key), key)) == null ? null : e.value;
        }
    ​
        /**
         * Implements Map.get and related methods.
         *
         * @param hash hash for key
         * @param key the key
         * @return the node, or null if none
         */
        final Node<K,V> getNode(int hash, Object key) {
            Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (first = tab[(n - 1) & hash]) != null) {
                if (first.hash == hash && // always check first node
                    ((k = first.key) == key || (key != null && key.equals(k))))
                    return first;
                if ((e = first.next) != null) {
                    if (first instanceof TreeNode)
                        return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                    do {
                        if (e.hash == hash &&
                            ((k = e.key) == key || (key != null && key.equals(k))))
                            return e;
                    } while ((e = e.next) != null);
                }
            }
            return null;
        }
    复制代码
    • 首先将 key hash 之后取得所定位的桶。
    • 如果桶为空则直接返回 null 。
    • 否则判断桶的第一个位置(有可能是链表、红黑树)的 key 是否为查询的 key,是就直接返回 value。
    • 如果第一个不匹配,则判断它的下一个是红黑树还是链表。
    • 红黑树就按照树的查找方式返回值。
    • 不然就按照链表的方式遍历匹配返回值。
  1. ConcurrentHashMap

  • Base1.7

    1. 数据结构

    image.png

    如图所示,是由 Segment 数组、HashEntry 组成,和 HashMap 一样,仍然是数组加链表。
    Segment 是 ConcurrentHashMap 的一个内部类继承了ReetrantLock,ConcurrentHashMap 在默认并发级别会创建包含 16 个 Segment 对象的数组。每个 Segment 的成员对象 table 包含若干个散列表的桶。每个桶是由 HashEntry 链接起来的一个链表。
    HashEntry和 HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。

    1. 实现原理

      put方法

      1、根据 hash 值找到对应的 Segment

      public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key);
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }
      复制代码

      2、执行加锁put操作

      final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                  HashEntry<K,V> node = tryLock() ? null :
                      scanAndLockForPut(key, hash, value);
                  V oldValue;
                  try {
                      HashEntry<K,V>[] tab = table;
                      int index = (tab.length - 1) & hash;
                      HashEntry<K,V> first = entryAt(tab, index);
                      for (HashEntry<K,V> e = first;;) {
                          if (e != null) {
                              K k;
                              if ((k = e.key) == key ||
                                  (e.hash == hash && key.equals(k))) {
                                  oldValue = e.value;
                                  if (!onlyIfAbsent) {
                                      e.value = value;
                                      ++modCount;
                                  }
                                  break;
                              }
                              e = e.next;
                          }
                          else {
                              if (node != null)
                                  node.setNext(first);
                              else
                                  node = new HashEntry<K,V>(hash, key, value, first);
                              int c = count + 1;
                              if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                  rehash(node);
                              else
                                  setEntryAt(tab, index, node);
                              ++modCount;
                              count = c;
                              oldValue = null;
                              break;
                          }
                      }
                  } finally {
                      unlock();
                  }
                  return oldValue;
              }
      复制代码

      虽然 HashEntry 中的 value 是用 volatile 关键词修饰的,但是并不能保证并发的原子性,所以 put 操作时仍然需要加锁处理。首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。

      put操作主要分为四步:

      1. 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。
      2. 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
      3. 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
      4. 释放锁

      get方法

      public V get(Object key) {
              Segment<K,V> s; // manually integrate access methods to reduce overhead
              HashEntry<K,V>[] tab;
              int h = hash(key);
              long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
              if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                  (tab = s.table) != null) {
                  for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                           (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                       e != null; e = e.next) {
                      K k;
                      if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                          return e.value;
                  }
              }
              return null;
          }
      复制代码

      只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁

  • Base1.8

  1. 数据结构

    image.png

    和hashmap结构类似,将 1.7 中存放数据的 HashEntry 改为 Node,但作用都是相同的。其中的 val next 都用了 volatile 修饰,保证了可见性,但是抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性

  2. 实现原理

    put方法:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {//①
                Node<K,V> f; int n, i, fh; K fk; V fv;
                if (tab == null || (n = tab.length) == 0)//②
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//③
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)//④
                    tab = helpTransfer(tab, f);
                else if (onlyIfAbsent // check first node without acquiring lock
                         && fh == hash
                         && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                         && (fv = f.val) != null)
                    return fv;
                else {
                    V oldVal = null;
                    synchronized (f) {//⑤
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key, value);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                            else if (f instanceof ReservationNode)
                                throw new IllegalStateException("Recursive update");
                        }
                    }
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)//⑥
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    复制代码
    • 根据 key 计算出 hashcode 。
    • 判断是否需要进行初始化。
    • f 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
    • 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
    • 如果都不满足,则利用 synchronized 锁写入数据。
    • 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

    get方法

    • 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
    • 如果是红黑树那就按照树的方式获取值。
    • 就不满足那就按照链表的方式遍历获取值。
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    复制代码

五、并发小工具

  1. CountDownLatch同步计数器(闭锁)

    • 概念:CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

    • API接口

      接口 说明
      void await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。(可多个线程同时执行该方法,为避免线程意外中断,产生阻塞,通常写在finally中)
      boolean await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
      void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
      long getCount() 返回当前计数。
    • 应用场景

      1. 某一线程开始运行前,需要其他n个线程执行完毕,可将CountDownLatch计数器初始化为new CountDownLatch(n),每当一个线程执行完毕,调用countDownLatch.countDown()方法将计数器减1,直到计数器数变为0,await()的线程就会被唤醒,执行逻辑,比如开启多个子线程去处理业务逻辑,等待所有子线程的业务逻辑处理完毕,主线程统一放行。
      2. 多个线程在某一时刻同时开始执行,类似于赛跑起点,等待枪响同时开跑。首先初始化一个共享new CountDownLatch(1),多线程执行任务前先countDownLatch.await(),主线程countDownLatch.countDown()时,计数器变为0,多个线程同时被唤醒。
    • 不足之处:CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

    • 示例代码

      执行结果为开启的子线程先打印日志,唤醒主线程以后主线程日志再打印。

      public class CountDownLatchTest {
      ​
          public static void main(String[] args) {
              //初始化计数器
              CountDownLatch latch = new CountDownLatch(1);
      ​
              try {
                  CountDownLatchDemo dl = new CountDownLatchDemo(latch);
      ​
                  new Thread(dl).start();
                  latch.await();
                  System.out.println("---main thread is over---");
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
      ​
          }
      }
      ​
      class CountDownLatchDemo implements Runnable{
      ​
          private CountDownLatch latch;
      ​
          public CountDownLatchDemo(CountDownLatch latch){
              this.latch=latch;
          }
      ​
          @Override
          public void run() {
              try {
                  //子线程睡眠,模拟业务处理延迟
                  Thread.sleep(10000);
                  System.out.println("---i sleep 10000---");
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              //睡眠完毕,开启门闩,唤醒主线程打印完成日志
              latch.countDown();
          }
      }
      复制代码
  2. CyclicBarrier同步屏障

  • 概念:CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

  • API接口

    接口 说明
    CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
    CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
    public int await() throws InterruptedException, BrokenBarrierException 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。返回值为到达线程的当前索引,索引 getParties() - 1 指示将到达的第一个线程,0表示最后一个线程
    public int await(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException, ITimeoutException 相比较上一个await方法多了一个超时时间
    public void reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在由于其他原因造成损坏之后,实行重置可能会变得很复杂;
    public boolean isBroken() 查询此屏障是否处于损坏状态。
    public int getNumberWaiting() 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
    public int getParties() 返回要求启动此 barrier 的参与者数目。一般初始化时指定
  • 应用场景:可以用于多线程计算数据,最后合并计算结果的场景。

  • 示例代码

    执行结果为所有子线程日志打印完毕,再打印预定义线程的日志信息。

    public class CyclicBarrierTest {
    ​
        public static void main(String[] args) {
    ​
            //开启5个线程,并指定预定义操作
            CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
                System.out.println("---所有线程都到达了屏障,开启主线任务---");
            });
    ​
            CyclicBarrierDemo demo = new CyclicBarrierDemo(cyclicBarrier);
    ​
            //开启5个线程执行任务
            for (int i = 0; i < 5; i++) {
                new Thread(demo).start();
            }
        }
    }
    ​
    class CyclicBarrierDemo implements Runnable{
    ​
        CyclicBarrier cyclicBarrier;
    ​
        public CyclicBarrierDemo(CyclicBarrier cyclicBarrier){
            this.cyclicBarrier = cyclicBarrier;
        }
    ​
        @Override
        public void run() {
            //到达屏障的线程索引
            try {
                System.out.println("---ThreadName:"+Thread.currentThread().getName()+"已到达---");
                //延迟500模拟业务逻辑处理延迟
                Thread.sleep(500);
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
    ​
        }
    }
    复制代码
  • 与CountDownLatch区别

    1. CountDownLatch:一个线程(或者多个线程), 等待另外N个线程完成某个事情之后才能执行。而这N个线程通过调用CountDownLatch.countDown()方法 来告知“某件事件”完成,即计数减一。而一个线程(或者多个线程)则通过CountDownLatch.awiat( ) 进入等待状态,直到 CountDownLatch的计数为0时,才会全部被唤醒
    2. CyclicBarrier : N个线程相互等待,任何一个线程完成某个事情之前,所有的线程都必须等待。
    3. CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
    4. CountDownLatch只能使用一次,CyclicBarrier则可以通过reset( )方法重置后,重新使用。所以CyclicBarrier可以用于更复杂的业务场景。例如:计算错误,可以重置计数器,并让线程重新执行一次。
  1. Semaphore信号量

    • 概念:用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。简单来说就是可以允许多少个线程同时访问资源

    • API接口

      1. 获取许可接口

        接口 说明
        public void acquire() throws InterruptedException 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断
        public void acquire(int permits) throws InterruptedException 从信号量获取多个许可,在提供这些许可钱一直将线程阻塞,否则线程中断
        public void acquireUninterruptibly() 从此信号量中获取许可,在有可用的许可前将其阻塞。不可中断
        public void acquireUninterruptibly(int permits) 获取多个许可,不可中断
        public boolean tryAcquire() 非阻塞方式获取许可
        public boolean tryAcquire(int permits) 非阻塞方式获取指定数量的许可
        public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
      2. 释放许可

        接口 说明
        public void release( ) 释放一个许可
        public void release(int permits) 释放指定数量许可
      3. 监控

        接口 说明
        public int availablePermits( ) 返回此信号量中当前可用的许可数
        public int drainPermits() 获取并返回立即可用的所有许可
        public final int getQueueLength() 返回正在等待获取的线程的估计数目。该值仅是估计的数字,因为在此方法遍历内部数据结构的同时,线程的数目可能动态地变化。此方法用于监视系统状态,不用于同步控制。
        public final boolean hasQueuedThreads() 查询是否有线程正在等待获取。
        public boolean isFair() 如果此信号量的公平设置为 true,则返回 true。
    • 使用场景:主要用于流量控制,比如限制一个方法最多可以多少个线程同时访问,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控

  2. Exchanger数据交换

    • 概念:Exchanger的功能是使2个线程之间交换数据,Exchanger 提供一个同步点,在这个同步点处,两个线程可以交换彼此数据。即一个线程调用了exchange( )方法交换数据,到达了同步点,然后就会一直阻塞等待另一个线程调用exchange( )方法来交换数据。所以,要注意exchange( )方法是有阻塞的特性。

    • API接口

      接口 说明
      public V exchange(V x) throws InterruptedException 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
      public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException 等待另一个线程到达此交换点(除非当前线程被中断,或者超出了指定的等待时间),然后将给定的对象传送给该线程,同时接收该线程的对
    • 应用场景:在两个线程处理逻辑过程中,需要交换数据来决定下一步业务处理逻辑时可采用此种方法。

    • 代码示例

      public static void main(String[] args) {
              //创建一个exchanger对象,并指定交换的数据类型
              Exchanger<String> exchanger = new Exchanger<>();
      ​
              new Thread(() -> {
                  try {
                      String a = "this is thread A";
                      String exchange = exchanger.exchange(a);
                      System.out.println("thread A after exchange data :" + exchange);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }).start();
      ​
              new Thread(()->{
                  try {
                      String b =  "this is thread B";
                      String exchange = exchanger.exchange(b);
                      System.out.println("thread B after exchange data :" + exchange);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }).start();
          }
      }
      复制代码

      输出结果:

      thread A after exchange data :this is thread B
      thread B after exchange data :this is thread A
      复制代码

六、Lock同步锁

  1. 线程安全的实现方式

    同步方法、同步代码块、同步锁。前面两种都是通过synchronized修饰。

  2. synchronized与lock区别

    • synchronized是一个关键字,只能修饰块状代码,lock是一个接口类,可灵活的实现不同范围的上锁和释放锁
    • synchronized无法尝试获取锁,lock提供了尝试获取锁tryLock()、超时失效tryLock(long, TimeUnit)、可中断尝试lockInterruptibly()等灵活的获取锁的方式,一定程度避免线程阻塞。同样的synchronized可自动释放锁,lock需要手动释放锁
    • synchronized是非公平锁,lock在创建时可指定公平或者非公平锁
    • 线程通信synchronized使用object类中的waite阻塞和notify、notifyAll唤醒,lock使用Condition中的await阻塞和signal、signalAll唤醒
  3. 锁的实例代码(经典的生产消费者,涉及加锁以及锁的释放、虚假唤醒等)

    public class ProductorAndConsumerTest {
    ​
        public static void main(String[] args) {
            Seller seller = new Seller();
    ​
            //进货线程
            new Thread(new Producter(seller),"ThreadA").start();
    ​
            //卖货线程
            new Thread(new Consumer(seller),"ThreadB").start();
    ​
            //进货线程
            new Thread(new Producter(seller),"ThreadC").start();
    ​
            //卖货线程
            new Thread(new Consumer(seller),"ThreadD").start();
        }
    }
    ​
    /**
     * 销售员:进货/卖货
     */
    class Seller{
        private int product = 0;
        Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
    ​
        public void get(){
            lock.lock();
            try {
                while (product >=1){//为避免线程虚假唤醒,用while替代if notifyAll和signalAll都有可能出现虚假唤醒使用时注意
                    try {
                        System.out.println("仓库已满");
    //                  synchronized线程等待用  this.wait();
                        condition.await();
                    } catch (InterruptedException e) {
    ​
                    }
                }
                System.out.println(Thread.currentThread().getName() + " : " + ++product);
    //           synchronized线程唤醒用 this.notifyAll();
                condition.signalAll();
            }finally {
                lock.unlock();
            }
    ​
        }
    ​
        public void sale(){
            lock.lock();
            try {
                while (product<=0){
                    System.out.println("缺货");
                    try {
    //                    this.wait();
                        condition.await();
                    } catch (InterruptedException e) {
                    }
                }
                System.out.println(Thread.currentThread().getName()+" : "+ --product);
    //            this.notifyAll();
                condition.signalAll();
            }finally {
                lock.unlock();
            }
        }
    }
    ​
    /**
     * 生产者
     */
    class Producter implements Runnable{
    ​
        private Seller seller;
    ​
        public Producter(Seller seller) {
             this.seller = seller;
        }
    ​
        @Override
        public void run() {
    ​
            for (int i = 0; i < 20; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
    ​
                }
                seller.get();
            }
        }
    }
    ​
    /**
     * 消费者
     */
    class Consumer implements Runnable{
    ​
        private Seller seller;
    ​
        public Consumer(Seller seller) {
            this.seller = seller;
        }
    ​
        @Override
        public void run() {
    ​
            for (int i = 0; i < 20; i++) {
    ​
                seller.sale();
            }
        }
    }
    复制代码
  4. lock的实现类ReentrantLock&ReentrantReadWriteLock

    • ReentrantLock是一种互斥锁,即每次只有一个线程访问共享数据,上述案例采用的就是这种互斥锁。与之对应的是共享锁ReentrantReadWriteLock,他维护了两个锁:读锁和写锁。虽是共享锁,但是写锁也是独占锁,在修改共享数据时也是与其他线程互斥的。读锁是共享的,可以同时有大量线程读取共享数据,适用于大量读取的操作

    • 读写锁是为了区分读线程和写线程,使得对资源的控制更加地合理,因为读操作不会涉及到并发安全问题,所以多个读线程可以同时获得读锁,但是读线程占用锁时,写线程不能访问资源。写线程占用锁时,读线程和其他写线程不能访问资源,这就既保证了读线程总能读到最新数据。资源的方法控制也更加合理。

    • ReentrantReadWriteLock使用案例代码

      public class ReadWriteLockTest {
      ​
          public static void main(String[] args) {
              ReadWriteLockDemo demo = new ReadWriteLockDemo();
              //一个写入线程
               new Thread(()->{
                   demo.setNum((int)(Math.random() * 100));
               },"写线程").start();
      ​
               //50个读取线程
              for (int i = 0; i < 50; i++) {
                  new Thread(()->{
                      demo.getNum();
                  },"读线程").start();
              }
          }
      }
      ​
      ​
      class ReadWriteLockDemo{
          int num = 0;
          //可通过fair参数来构建公平锁和非公平锁,不指定默认false
          ReentrantReadWriteLock rw =  new ReentrantReadWriteLock();
      ​
          public void getNum() {
              //读锁
              rw.readLock().lock();
      ​
              try {
                  System.out.println(Thread.currentThread().getName()+"读取 : " + num);
              }finally {
                  rw.readLock().unlock();
              }
      ​
          }
      ​
          public void setNum(int num) {
              //写锁
              rw.writeLock().lock();
      ​
              try {
                  this.num = num;
                  System.out.println(Thread.currentThread().getName() + "写入 :" + this.num);
              }finally {
                  rw.writeLock().unlock();
              }
      ​
      ​
      ​
      ​
          }
      }
      复制代码
  5. StampedLock

    • StampedLock是为了优化可重入读写锁性能的一个锁实现工具,jdk8开始引入,相比于普通的ReentranReadWriteLock主要多了一种乐观读的功能,在API上增加了stamp的入参和返回值,不支持重入
    • 在ReentrantReadWriteLock读写是互斥的,相当于是一次悲观的读锁,StampedLock是一种乐观的读锁,即在加锁前先判断有没有写入新值,如果写入,则加入悲观读锁,读的过程中禁止写入,显然乐观读锁的并发性能更好
  6. 示例代码

    class StampedLockDemo{
    ​
        int num = 0;
    ​
        StampedLock lock = new StampedLock();
    ​
        public void getNum() {
            //读锁
            long version = lock.tryOptimisticRead();//获取一个乐观锁
    ​
            if (!lock.validate(version)){//检查是否被修改,如果被修改,则加上悲观读锁,否则正常执行逻辑
                long l = lock.readLock();
                try {
                    System.out.println(Thread.currentThread().getName()+"读取 : " + num);
                }finally {
                    lock.unlockRead(l);
                }
            }
    ​
        }
    ​
        public void setNum(int num) {
            //写锁
            long l = lock.writeLock();
    ​
            try {
                this.num = num;
                System.out.println(Thread.currentThread().getName() + "写入 :" + this.num);
            }finally {
                lock.unlockWrite(l);
            }
    ​
        }
    }
    复制代码

七、线程八锁

  1. 关键因素

    • 非同步方法不具备排他性,即可以多个线程同时访问
    • 同步锁或者同步方法中非静态方法同步锁对象为实例对象本身,即我们new的对象,静态方法的锁对象为类对象本身,即Class实例
    • 同步锁具有互斥排他性,所以在某一时刻内,只能有一个线程持有锁。
  2. 验证代码

    /*
     * 题目:判断打印的 "one" or "two" ?
     *
     * 1. 两个普通同步方法,两个线程,标准打印, 打印? //one  two
     * 2. 新增 Thread.sleep() 给 getOne() ,打印? //one  two
     * 3. 新增普通方法 getThree() , 打印? //three  one   two
     * 4. 两个普通同步方法,两个 Number 对象,打印?  //two  one
     * 5. 修改 getOne() 为静态同步方法,打印?  //two   one
     * 6. 修改两个方法均为静态同步方法,一个 Number 对象?  //one   two
     * 7. 一个静态同步方法,一个非静态同步方法,两个 Number 对象?  //two  one
     * 8. 两个静态同步方法,两个 Number 对象?   //one  two
     *
     * 线程八锁的关键:
     * ①非静态方法的锁默认为  this,  静态方法的锁为 对应的 Class 实例
     * ②某一个时刻内,只能有一个线程持有锁,无论几个方法。
     */
    public class Thread8MonitorTest {
        public static void main(String[] args) {
            Number number = new Number();
            Number number2 = new Number();
    ​
            new Thread(new Runnable() {
                @Override
                public void run() {
                    number.getOne();
                }
            }).start();
    ​
            new Thread(new Runnable() {
                @Override
                public void run() {
    //        number.getTwo();
                    number2.getTwo();
                }
            }).start();
    ​
        /*new Thread(new Runnable() {
          @Override
          public void run() {
            number.getThree();
          }
        }).start();*/
    ​
        }
    ​
    }
    ​
    class Number{
    ​
        public static synchronized void getOne(){//Number.class
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
            }
    ​
            System.out.println("one");
        }
    ​
        public synchronized void getTwo(){//this
            System.out.println("two");
        }
    ​
        public void getThree(){
            System.out.println("three");
        }
    }
    复制代码

八、线程池

  1. 线程

    • 概念:程序运行的最小单位。进程是一个应用程序的运行,线程是应用程序中运行的最小单位

    • 生命周期:新建、就绪、运行、阻塞、销毁

      1. 新建:就是刚使用new方法,new出来的线程
      2. 就绪:就是调用的线程的start()方法后,这时候线程处于等待CPU分配资源阶段,谁先抢的CPU资源,谁开始执行
      3. 运行:当就绪的线程被调度并获得CPU资源时,便进入运行状态,run方法定义了线程的操作和功能
      4. 阻塞:在运行状态的时候,可能因为某些原因导致运行状态的线程变成了阻塞状态,比如sleep()、wait()之后线程就处于了阻塞状态,这个时候需要其他机制将处于阻塞状态的线程唤醒,比如调用notify或者notifyAll()方法。唤醒的线程不会立刻执行run方法,它们要再次等待CPU分配资源进入运行状态
      5. 销毁:如果线程正常执行完毕后或线程被提前强制性的终止或出现异常导致结束,那么线程就要被销毁,释放资源

      img

  2. 线程池

    • 概念:在高并发情况下,不停的创建销毁线程是非常耗费性能的,线程池提供了一个线程队列,队列中保持所有等待的线程,避免了创建和销毁的性能损失,提高响应速度

    • 线程池结构:

      java.util.concurrent.Executor:负责线程使用与调度的跟接口
        |--ExecutorService:子接口,线程池的主要接口
          |--ThreadPoolExecutor:线程池的实现类
          |--ScheduledExecutorService:子接口,负责线程调度
            |--ScheduledThreadPoolExecutor:继承 ThreadPoolExecutor, 实现 ScheduledExecutorService
      复制代码
    • 线程的创建&常见线程池

      1. Executors:是我们创建线程池的工具类,给我们提供了各种常见线程池的创建方法。类似的有集合工具了:Collections、数组工具类:Arrays、文件工具类:Files等

      2. 常见线程池

        • Executors.newCachedThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量(常用)
        • Executors.newFixedThreadPool():固定大小的线程池
        • Executors.newSingleThreadExecutor():创建单个线程池,只有一个线程
        • Executors.newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务
    • 线程池主要参数

      1. corePoolSize:线程池核心线程大小,即线程池中维护最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize

      2. maximumPoolSize:线程池最大线程数量,一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接将任务交给这个空闲线程来执行,如果没有则会缓存到工作队列(后面会介绍)中,如果工作队列满了,才会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。

      3. keepAliveTime:空闲线程存活时间,一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定

      4. unit:空闲线程存活时间单位,即keepAliveTime时间单位

      5. workQueue:工作队列,新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:

        • ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略
        • LinkedBlockingQuene:基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
        • SynchronousQuene:一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
        • PriorityBlockingQueue:具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
      6. threadFactory: 线程工厂,创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等

      7. handler :拒绝策略,当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:

        • CallerRunsPolicy:该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
        • AbortPolicy:该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
        • DiscardPolicy:该策略下,直接丢弃任务,什么都不做。
        • DiscardOldestPolicy:该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
    • 线程池的使用示例代码:1、创建线程池 2、添加任务 3、关闭线程池

      public class ThreadPoolTest {
      ​
          public static void main(String[] args) throws Exception{
      ​
              //1、创建线程池(固定大小线程池)
              ExecutorService pool = Executors.newFixedThreadPool(5);
              //调度线程池
              ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
              //2、添加任务,选择Runable或者Callable
              /**
               * Runable方式
               */
              for (int i = 0; i < 100; i++) {
                  pool.submit(() -> {
                      //打印当前线程名
                      System.out.println(Thread.currentThread().getName());
                  });
              }
      ​
              /**
               * Callable方式:对比Runable方式,可有返回值,可抛出异常
               */
              for (int i = 0; i < 5; i++) {
                  /**
                   * 第一个参数:callable实现
                   * 第二个参数:延迟时间
                   * 第三个参数:时间单位
                   */
                  ScheduledFuture<Integer> schedule = scheduledPool.schedule(() -> (int) (Math.random() * 100), 2, TimeUnit.SECONDS);
                  System.out.println(schedule.get());
              }
              /**
               * 周期执行
               * 第一个参数:Runable实现
               * 第二个参数:初始延迟时间
               * 第三个参数:周期时间,即5秒执行一次
               * 第三个参数:时间单位
               */
              scheduledPool.scheduleAtFixedRate(() -> System.out.println((int) (Math.random() * 100)),1,5,TimeUnit.SECONDS);
      ​
              //3、关闭线程池
              pool.shutdown();
              //测试周期延迟,可暂时不关闭线程池
              scheduledPool.shutdown();
          }
      }
      复制代码

九、ForkJoin分支/合并框架

  1. 概念

    从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。主要两个步骤:1、任务拆分 2、结果合并

    image-20210910155412266.png

  2. 工作窃取模式(work-stealing)

    工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:

    img

    假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

  3. 类结构&API接口

    • 类结构

    image-20210910160027664.png

    • API接口

      ForkJoinTask代表运行在ForkJoinPool中的任务。

      主要方法:

      • fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
      • join() 当任务完成的时候返回计算结果。
      • invoke() 开始执行任务,如果必要,等待计算完成。

      子类:

      • RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
      • RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)
  4. 代码示例

    public class TestForkJoinPool {
      
      public static void main(String[] args) {
        Instant start = Instant.now();
        //创建pool
        ForkJoinPool pool = new ForkJoinPool();
        
        ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L);
        //执行任务
        Long sum = pool.invoke(task);
        
        System.out.println(sum);
        
        Instant end = Instant.now();
        
        System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());
      }
      
    ​
    /**
     *RecursiveAction无返回值
      RecursiveTask有返回值
     *
    **/
    class ForkJoinSumCalculate extends RecursiveTask<Long>{
    ​
      private static final long serialVersionUID = -259195479995561737L;
      
      private long start;
      private long end;
      
      private static final long THURSHOLD = 10000L;  //临界值
      
      public ForkJoinSumCalculate(long start, long end) {
        this.start = start;
        this.end = end;
      }
    ​
      @Override
      protected Long compute() {
        long length = end - start;
        
        if(length <= THURSHOLD){
          long sum = 0L;
          
          for (long i = start; i <= end; i++) {
            sum += i;
          }
          
          return sum;
        }else{
          long middle = (start + end) / 2;
          
          ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle); 
          left.fork(); //进行拆分,同时压入线程队列
          
          ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
          right.fork(); //
          
          return left.join() + right.join();
        }
      }
      
    }
    复制代码

十、CompletableFuture

  1. 概念

    CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。(java 11的CompletableFuture新增了一些方法)。CompletableFuture实现了Future和CompletionStage两个接口。CompletableFuture强大的功能大多来自CompletionStage

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
    复制代码

    CompletionStage任务关系可分为如下几种:

    • 串行执行关系
    • 并行执行关系
    • AND 汇聚关系
    • OR 汇聚关系
  2. API介绍

    • 创建
      public static <U> CompletableFuture<U> completedFuture(U value)//同步创建带有默认值实例
      /**
      异步创建实例
      **/
      public static CompletableFuture<Void> runAsync(Runnable runnable)
      public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
      public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
      public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
      复制代码

      示例代码:

      CompletableFuture.completedFuture(10);
      CompletableFuture.supplyAsync(() -> (int) (Math.random() * 100));//没有指定线程池默认ForkJoinPool
      CompletableFuture.runAsync(()-> System.out.println("指定线程池"), Executors.newSingleThreadExecutor());
      复制代码
      • 没有指定线程池情况,默认使用ForkJoinPool.commonPool() ,线程数默认cpu核数。在任务比较复杂的情况下,共享线程池一旦有任务阻塞,其他任务将没有机会执行,所以对于复杂业务建议指定线程池构建构建实例,进行资源隔离,避免互相干扰
      • runAsync方法不支持返回值。
      • supplyAsync可以支持返回值。
    • 设置任务结果
       boolean complete(T value)//设置执行结果,若返回true说明设置成功,false设置失败
       boolean completeExceptionally(Throwable ex)//设置异常信息
      复制代码

      设置返回结果:输出:修改完的结果 : true

      try {
                  CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "初始化结果");
                  boolean complete = supplyAsync.complete("修改完的结果");
                  System.out.println(supplyAsync.get() + " : " + complete);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (ExecutionException e) {
                  e.printStackTrace();
              }
      复制代码

      设置异常信息:输出:自定义异常信息

      try {
                  CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "初始化结果");
                  boolean b = supplyAsync.completeExceptionally(new RuntimeException("啊,我挂了!"));
                  System.out.println(supplyAsync.get() + " : " + b);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (ExecutionException e) {
                  e.printStackTrace();
              }
      复制代码

      注意:1、只能设置结果一次,并且不能修改,重复设置会返回false

      2、设置异常信息,调用get()返回的是异常,需要捕获异常信息。

    • 串行执行关系
      //参数为Function函数,输入即为上一步的输出结果,并且可以返回计算后的结果
      public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
      public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
      public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
      //参数为Consumer函数,输入为上一步的输出结果,对此结果进行消费,无返回值
      public CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
      public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
      public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
      //入参为Runnable,并不受上一步结果的影响,等上一步完成即处理预定义操作
      public CompletableFuture<Void> thenRun(Runnable action)
      public CompletableFuture<Void> thenRunAsync(Runnable action)
      public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
      //与thenApply相似参数为Function函数,返回为CompletionStage类型,有点类似Stream里面的flatMap做对象转换
      public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
      public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
      public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
      复制代码

      实例代码

      CompletableFuture<Integer> async = CompletableFuture.supplyAsync(() -> (int) (Math.random() * 100));
      ​
              async.thenAccept(System.out::println);
              //输出结果为下一步的输入
              async.thenApply(p->{
                          System.out.println("thenApply 执行");
                          return "thenApply执行后结果:"+ p;
                      })
                      //入参为上一步的输出,并打印消费
                      .thenAccept(System.out::println)
                      //开启新线程,打印输出
                      .thenRun(()-> System.out.println("run 执行结果"));
              //转换数据类型,并开启了新的线程
              CompletableFuture<String> compose = async.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " & thenCompose 执行"));
              compose.thenAccept(System.out::println);
              compose.join();
      复制代码

      注意:

      • xxx():表示该方法将继续在已有的线程中执行;

      • xxxAsync():表示将异步在线程池中执行。

      • 重载方法中有Executor入参,即可以指定线程池类型

    • AND 汇聚关系:所有任务执行完毕汇总以后,执行下一个任务,有点类似CountDownLatch
      //参数为CompletionStage和BiFunction,两个线程执行完毕之后的结果作为输入,并且有返回值
      public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
        
      //参数为CompletionStage和BiConsumer,两个线程执行完毕之后的结果作为输入,无返回值
      public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)//参数为CompletionStage和Runnable,两个线程执行完毕之后,执行runable预定义操作
      public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
        
      //静态方法,不定参,可以合并多个CompletableFuture,但是返回值为void,可以用来实现CountDownLatch,所有线程处理完成统一执行后续任务。
      public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 
      复制代码

      示例代码

      //创建两个CompletableFuture
      CompletableFuture<Integer> async = CompletableFuture.supplyAsync(() -> 10);
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "输出结果:");
      //先合并两个线程执行结果,并打印输出
      async.thenCombine(future,(a,f)->f+a)
              .thenAccept(System.out::println);
      //消费两个线程执行结果
      async.thenAcceptBoth(future,(a,f)->System.out.println(f+a));
      //等待两个线程执行完毕,执行预定义操作
      async.runAfterBoth(future,()-> System.out.println("两个线程执行完毕之后的预定义操作。。。。"));
      //合并多个线程
      CompletableFuture<Void> all = CompletableFuture.allOf(async, future);
      复制代码
    • OR 汇聚关系:多个任务中任意任务完成即可进行下一步
      //参数为CompletionStage和Function,任意线程执行结束,获取结果执行业务逻辑,有返回值
      public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
      //参数为CompletionStage和Consumer,任意线程执行结束,获取结果进行消费,无返回值
      public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)//参数为CompletionStage和Runnable,任意线程执行结束,执行runable预定义操作
      public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
        
      //不定参,任意任务出结果即可
      public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
      复制代码

      示例代码

      //创建两个CompletableFuture并延迟几秒模拟不同线程处理的先后顺序
              CompletableFuture<String> async = CompletableFuture.supplyAsync(() ->{
                  try {
                      Thread.sleep(500);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println("async is complete...");
                  return "async is complete...";
              });
              CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      ​
                  try {
                      Thread.sleep(800);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println("future is complete...");
                  return "future is complete...";
              });
      ​
              //获取两个线程最先运行完毕的结果并返回打印
              async.applyToEither(future,p->p)
                      .thenAccept(System.out::println);
              //获取两个线程最先运行完毕的结果并消费
              async.acceptEither(future,System.out::println);
              //两个线程有一个执行完即执行预定义操作
              async.runAfterEither(future,()->System.out.println("任意线程执行完,执行预定义操作。。。。"));
      ​
              CompletableFuture.anyOf(async,future)
                      .thenAccept(System.out::println);
      复制代码
    • 异常处理

      有时我们需要捕获线程运行中的异常信息来进行其他业务处理,普通的try显得很冗余,CompletableFuture为我们提供了优雅的处理异常的方法。

      //参数为BiConsumer,有点想finally代码块无论是否发生异常都会执行
      public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)//参数为Function,有点类似catch代码块,捕获异常,然后返回处理结果
      public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)//handle与whenComplete相比有返回值
      public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) 
      复制代码

      示例代码

      CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> {
                  throw new RuntimeException("hello is exception...");
      //            return "hello world";
              });
              //可消费结果并处理异常,无返回值
              async.whenComplete((s,e)->{
                  //消费结果
                  System.out.println(s + " && deal result...");
                  //处理异常
                  System.out.println(e.getMessage() + " && deal exception...");
              });
              
              //捕获异常并处理
              async.exceptionally(e->e.getMessage())
                      .thenAccept(System.out::println);
              //有返回值
              async.handle((s,e)->{
                  System.out.println(s);
                  System.out.println(e);
                  return StringUtils.isEmpty(s)?e.getMessage() + " && deal exception..." : s + " && deal data...";
              })
              .thenAccept(System.out::println);
      ​
              async.join();
      复制代码

源码地址:源码地址