ReentrantLock公平锁获取同步状态实现分析

lock()

同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点并将其加入到同步队列,同时会阻塞当前线程,当同步线程释放时,会唤醒等待的节点,使其再次尝试获取同步状态。

同步队列添加节点的过程如下图所示:

image

具体代码如下:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
    	// 判断tail是否为空,若不为空,CAS设置尾结点
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
    	// 若tail为空,则证明需要初始化队列。调用enq()方法
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
                // 构建一个空的Node将head指向这个空Node,并且也让tail指向这个空Node这时tail==head
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 这部分的逻辑与addWaiter()处的一致,只是返回值不同addWaiter()返回当前节点,enq()返回compareAndSetTail()之前的tail
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq()方法,为什么要用一个无限for循环呢?这个点之前我没有注意到,现在仔细想了一下,我认为有以下两点:

  1. 预防编译器报错,我们可以看到这个方法是有一个返回值的,但是在if中并没有返回值,这在编译器中是要报错的,但是加上for循环就不会了。
  2. 当head和tail初始完成之后需要再执行往队列中添加节点和设置tail的操作,并且在多线程的环境中,CAS设置head失败应该继续执行往队列中添加节点和设置tail的操作。
 final void lock() {
     acquire(1);
}

获取同步状态时首先会调用acquire(int arg)方法,我们看看这个方法中的逻辑

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

acquire(int arg)方法调用了四个方法,我们一一来看,首先是tryAcquire()

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
    		// c == 0 表示当前同步状态没有被任何线程持有,可以开始获取同步状态
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
    		// c != 0 表明已有线程获取到了同步状态并判断是否是当前线程获取到了同步状态
    		// 则可以直接令同步状态增加(可重入性),不需要CAS操作,因为是独占锁,获取到同步状态的情况下不会到导致并发问题
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    // 判断是否超过了int的最大值
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

公平锁与非公平锁的不同就是多了hasQueuedPredecessors()方法,这个方法是判断当前节点是否有前驱节点,没有线程持有同步状态时,只有队列为空或者位于队列第一个时才去获取同步状态,否则进行排队。

public final boolean hasQueuedPredecessors() {
    	Node t = tail;
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

接下来看acquire(int arg)方法调用的第三个方法acquireQueued(),若tryAcquire()返回false,即获取同步状态失败时会调用此方法,我们来看看实现逻辑:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取当前节点的前驱节点,判断是否是head节点,如果是,尝试获取同步状态
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 前驱节点不是head节点,或者同步状态获取失败,则将阻塞,等待唤醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire()方法会检查当前节点的前驱节点的等待状态waitStatus(具体看下图),当返回true时,会执行parkAndCheckInterrupt(),将当前节点阻塞。

image

acquireQueued()函数有一个返回值,表示什么意思呢?虽然该函数不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该函数会返回true;否则,返回false。

基于这个返回值,才有了下面的代码,也就是acquire(int arg)方法调用的第四个方法

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

acquireQueued()返回true 时,会调用selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

从代码可知调用park()方法,自己把阻塞起来。park()函数返回有两种情况:

  1. 其他线程调用了unpark(Thread t)
  2. 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。

也正因为LockSupport.park()可能被中断唤醒,acquireQueued()函数才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true。

若在获取同步状态的过程中,抛出异常,则会进入finally中,取消获取同步状态。

unlock()

unlock()会调用tryRelease()方法

public void unlock() {
        sync.release(1);
    }

我们继续往下看

public final boolean release(int arg) {
    // 释放同步状态
    if (tryRelease(arg)) {
        // 释放成功,唤醒其他等待的线程
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

来看看tryRelease()的具体实现

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
     		// 不是获取同步状态的线程调用unlock直接抛异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
     		// 排他锁  不需要CAS设置状态
            setState(c);
            return free;
        }

释放同步状态的过程就简单太多了。暂时就写到这里吧。


ReentrantLock公平锁获取同步状态实现分析
https://www.zhaojun.inkhttps://www.zhaojun.ink/archives/reentrantlock-fair-lock-analysis
作者
卑微幻想家
发布于
2021-06-01
许可协议