* 添加节点到等待队列, 如果有必须就初始化Node. * 自选CAS添加等待队列尾部. 直到成功 * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node){ for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
* Creates and enqueues node for current thread and given mode. * 创建node并入队, 设置当期线程和指定锁的类型 是独占还是共享 * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode){ Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 尝试快速设置 将node加入队列尾. 如果失败了就交个end方法. 自旋去 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
* * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protectedfinalbooleantryAcquire(int acquires){ //acquires =1 final Thread current = Thread.currentThread(); //获得当期线程 int c = getState(); // 获得同步状态 if (c == 0) { // 0代表锁并未被占用 //如果等待队列中. 在当期线程前没有其他线程. 则用CAS获得锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //更新成功. 将锁的占有线程设置为当期线程 setExclusiveOwnerThread(current); returntrue; } } //c!=0 说明锁被占用. 判断是否是重入的情况 elseif (current == getExclusiveOwnerThread()) { // state+1 int nextc = c + acquires; // 重入次数过大. 溢出了 if (nextc < 0) thrownew Error("Maximum lock count exceeded"); //设置状态 setState(nextc); returntrue; } returnfalse; }
publicfinalbooleanhasQueuedPredecessors(){ // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus;//获得node的前节点的等待状态 if (ws == Node.SIGNAL) // 如果是SIGNAL(-1): 后置节点需要唤醒. 自己就可以放心阻塞了. 将来会唤醒我的. /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; if (ws > 0) { // 也就是CANCELLED 取消状态. 说明前置节点因为中断或者取消. 需要在等待队列中移除 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. * 知道遇到前置节点的状态小于等为止. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 将前置节点的后置节点设置成自己. pred.next = node; } else { //如果既不是SIGNAL也不是CANCELED. 那么就将前置节点的waitStatus设置成SIGNAL. /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; } //阻塞在这里. 并检查线程是否被中断. privatefinalbooleanparkAndCheckInterrupt(){ LockSupport.park(this); return Thread.interrupted(); }
protectedfinalbooleantryRelease(int releases){ int c = getState() - releases; // 将当期状态-1 //获得锁的线程和释放锁的线程不一样. 抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) thrownew IllegalMonitorStateException(); boolean free = false; // c ==0 表示锁处于空闲状态了 if (c == 0) { free = true; // 将锁的拥有者设置为null setExclusiveOwnerThread(null); } // 将状态设置为 更新状态值 setState(c); return free; }
privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. 获得首节点的状态 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 将状态设置为0 表示后置节点即将被唤醒
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. * */ Node s = node.next; //得到后置节点 //当后置节点为null或者s.waitStats=1 也就是取消状态时 if (s == null || s.waitStatus > 0) { s = null; //从尾部开始向前遍历. 直到找到一个waitStatus<=0的. 并将其唤醒 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //正常状态下 直接唤醒后置节点的线程 if (s != null) LockSupport.unpark(s.thread); }
privatevoiddoAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //将当期线程加入等待队列尾部. 和独占锁的流程一样, 区别在于mode是共享锁类型 boolean failed = true; try { for (;;) { // node也就是当期节点的前置节点 final Node p = node.predecessor(); if (p == head) { // 当期节点的前置节点如果是head int r = tryAcquireShared(arg); // 尝试获得共享锁 这里当然也是失败的 if (r >= 0) { //成功 键 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 如果node的前置节点不是head节点. or 获取共享锁失败. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) thrownew InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } //这个方法和独占锁一样, 都是判断自己是否要阻塞 privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus;// 前置节点的等待状态 if (ws == Node.SIGNAL) // SIGNAL的化 自己就可以安心等待被唤醒了 /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; if (ws > 0) { // 如果是CANCELED. 进入循环向前查找. 知道找到一个<0的节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //都不是 也就是锁状态可能是 0 -2 -3 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 将前置节点设置为SIGNAL. } returnfalse; } //将自己阻塞掉. 唤醒时会返回中断状态. privatefinalbooleanparkAndCheckInterrupt(){ LockSupport.park(this); return Thread.interrupted(); }
publicfinalbooleanreleaseShared(int arg){ if (tryReleaseShared(arg)) { //尝试一次释放锁 doReleaseShared(); returntrue; } returnfalse; } protectedbooleantryReleaseShared(int releases){ // Decrement count; signal when transition to zero // 自旋的释放锁 知道count ==0 for (;;) { int c = getState(); // 获取共享锁状态 if (c == 0) returnfalse; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } privatevoiddoReleaseShared(){ /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. * 释放锁之后 将等待队列中的线程唤醒 */ for (;;) { // 唤醒下一个等待的节点. 自旋是为了保证state能够设置成功. Node h = head; // 拿到首节点 if (h != null && h != tail) { //哨兵 int ws = h.waitStatus; // 获得首节点的等待状态 if (ws == Node.SIGNAL) { // ==SIGNAL CAS的将状态设置为0 直到成功 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //唤醒 } elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } //唤醒逻辑 privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; //获得当期节点的等待状态 if (ws < 0) // 小于0 将其设置为0 compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; // 获得后置节点 if (s == null || s.waitStatus > 0) { //如果是null或者 是CANCELED. 从尾部开始向前遍历. 直到找到下于等于0的节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒他 LockSupport.unpark(s.thread); }
//将当期节点设置为head. 并尝试唤醒并释放后续节点 privatevoidsetHeadAndPropagate(Node node, int propagate){ Node h = head; // Record old head for check below setHead(node);// 将自己设置为head /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway */ //判断是否要唤醒后置节点. if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //唤醒后置节点. 也就是唤醒B if (s == null || s.isShared()) doReleaseShared(); } }
近期评论