ReentrantReadWriteLock介绍
在大多数场景下都是读服务多点,写服务相对来说少点,而且读服务不存在竞争的情况,java提供了另外一个接口,就是今天的要说的ReentrantReadWriteLock(读写锁)
读写锁允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞同一时间允许多个线程同时访问
ReentrantReadWriteLock 分为读锁和写锁两个实例,读锁是共享锁,可被多个线程同时使用,写锁是独占锁。持有写锁的线程可以继续获取读锁,反之不行。
读写锁的主要特性如下:
- 公平性:支持公平性和非公平性
- 重入性:支持重入,读写锁最多支持65535个递归写入锁和65535个递归读取锁
- 锁降级:遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成为读锁
源码分析
下面只截图部分源码,后面会按照功能来详细分析源码
//内部类读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//内部类写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
//构造函数,默认非公平
public ReentrantReadWriteLock() {
this(false);
}
//构造函数,公平
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
//返回写操作的锁
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
//返回读操作的锁
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer{
/**
* 省略其余源代码
*/
}
public static class WriteLock implements Lock, java.io.Serializable{
/**
* 省略其余源代码
*/
}
public static class ReadLock implements Lock, java.io.Serializable {
/**
* 省略其余源代码
*/
}
从上面的代码我们看出ReentrantReadWriteLock和ReentrantLock一样,大部分功能都是由Sync来完成的,所以ReentrantReadWriteLock实际上只有一个锁,只是在获取读锁和写锁的方式 不一样。读写锁其实就是ReadLock、WriteLock两个类,这两个类实现Lock接口
在ReentrantLock中使用state来表示同步状态,该值表示锁被一个线程重复获取的次数,但是读写锁内部维护的是是一对锁,那么如何用一个变量来表示了,大神采用的是高16位和低16位的方式来切分为2部分,高16位 表示读,低16位表示写,其运算过程源码如下:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
假如当前同步状态为N,那么写状态等于N& 0x0000FFFF(将高16位全部抹去),读状态等于S >>> 16,相当于无符号补0右移16位)
写锁
写锁的获取
写锁其实就是一个可重入的排它锁,实现写锁是通过重写AQS中的tryAcquire来实现的,其源码如下:
//WriteLock
public void lock() {
sync.acquire(1);
}
//AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
//获取当前的同步状态(也可以说是获取当前锁的个数)
int c = getState();
//获取写锁的次数
int w = exclusiveCount(c);
//如果已经有线程持有了锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//返回false的则表示无法获取写锁,有如下两种情况
//c != 0 && w == 0 表示存在读锁,则获取写锁失败
//c != 0 && w == 0 && current != getExclusiveOwnerThread()//当前线程不是已经获取写锁的线程,而是被其他线程持有写锁,则写锁获取失败
//如果写锁为0(也就是存在读锁)或者持有锁的线程不是当前线程就返回失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果写锁的数量大于最大数()超出最大范围
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//这里不需要 CAS,能到这里的,只可能是写锁重入,不然在上面的 if 就拦截了
setState(c + acquires);
return true;
}
//写锁未被任何线程获取,当前线程可获取写锁
if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))
return false;
//设置获取锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
来看下writerShouldBlock这个方法,
static final class FairSync extends Sync {
//公平模式下,那么如果阻塞队列有线程等待的话,就乖乖去排队
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
.....
}
static final class NonfairSync extends Sync {
//非公平模式下,lock 的时候就可以直接用 CAS 去抢锁,抢不到再排队
final boolean writerShouldBlock() {
return false; // writers can always barge
}
......
}
写锁释放
写锁的释放是重写AQS的tryRelease方法,源码为:
//WriteLock
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
// 1. 释放锁
if (tryRelease(arg)) {
// 2. 如果独占锁释放"完全",唤醒后继节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//ReentrantReadWriteLock.Sync
//释放锁很简单,因为写锁是独占锁,state减1就可以了
protected final boolean tryRelease(int releases) {
//释放的线程不是锁的持有这,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//同步状态减去写状态
int nextc = getState() - releases;
//如果 exclusiveCount(nextc) == 0,也就是说包括重入的,所有的写锁都释放了,
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
写锁释放锁的整个过程和独占锁ReentrantLock相似,每次释放均是减少写状态,当写状态为0时表示 写锁已经完全释放了,从而等待的其他线程可以继续访问读写锁,获取同步状态
读锁
读锁获取
相对于写锁来说,读锁不是独占式的,同一个时刻读锁可以被多个线程获取,是一种共享式锁,源码如下:
//ReadLock
public void lock() {
sync.acquireShared(1);
}
//AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
然后我们会进入到ReentrantReadWriteLock中的内部类Sync的方法tryAcquireShared,源码如下:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
//当前线程
Thread current = Thread.currentThread();
//exclusiveCount(c)计算写锁
//如果存在写锁,且获得写锁的线程不是当前线程,直接返回-1,即获取读锁失败
int c = getState();
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
//读锁的获取次数
int r = sharedCount(c);
//读锁获取是否需要被阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT &&//判断溢出,最大为65535
compareAndSetState(c, c + SHARED_UNIT)) {//利用CAS将高16位加1,低16位不变,如果成功就表示获取了读锁
if (r == 0) {
//r == 0 说明此线程是第一个获取读锁的,或者前面的读锁都释放了
//记录 firstReader 为当前线程,及其持有的读锁数量:1
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//说明是 firstReader 重入获取读锁(这非常简单,count 加 1 结束)
firstReaderHoldCount++;
} else {
//cachedHoldCounter 用于缓存最后一个获取读锁的线程
//如果 cachedHoldCounter 缓存的不是当前线程,设置为缓存当前线程的 HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)//到这里,那么就是 cachedHoldCounter 缓存的是当前线程,但是 count 为 0,
readHolds.set(rh);
rh.count++;
}
// return 大于 0 的数,代表获取到了共享锁
return 1;
}
return fullTryAcquireShared(current);
}
上述代码的最后有一个fullTryAcquireShared方法,那如何才能进入这个方法了
首先if分支中的readerShouldBlock方法要返回true,这里就要分为2种情况来看了
- FairSync
在公平模式中的源码如下:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
即阻塞队列中有其他元素在等待锁。
- NonFairSync
在非公平模式下的源码如下:
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
即判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿。
其次就是compareAndSetState(c, c + SHARED_UNIT) 这里 CAS 失败,存在竞争,可能是和另一个读锁获取竞争,当然也可能是和另一个写锁获取操作竞争。
然后就是调用fullTryAcquireShared()方法再次尝试,其源码如下:
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {//说明其他线程获取了写锁
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
//这里有2层含义:
//1.exclusiveCount(c) == 0:写锁没有被占用
//2.readerShouldBlock()为true:说明阻塞队列中有其他线程在等待
// Make sure we're not acquiring read lock reentrantly
// firstReader 线程重入读锁,直接到下面的 CAS
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
/// cachedHoldCounter 缓存的不是当前线程
// 那么到 ThreadLocal 中获取当前线程的 HoldCounter
// 如果当前线程从来没有初始化过 ThreadLocal 中的值,get() 会执行初始化
rh = readHolds.get();
// 如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么执行 remove
// 然后往下两三行,乖乖排队去
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
//上面的代码其实只需要知道一点就是处理读锁重入的就可以了
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 这里 CAS 成功,那么就意味着成功获取读锁了
// 下面需要做的是设置 firstReader 或 cachedHoldCounter
if (sharedCount(c) == 0) {
// 如果发现 sharedCount(c) 等于 0,就将当前线程设置为 firstReader
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 下面这几行,就是将 cachedHoldCounter 设置为当前线程
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
需要注意的是 当写锁被其他线程获取后,读锁获取失败,否则获取成功利用CAS更新同步状态。另外,当前同步状态需要加上SHARED_UNIT((1 << SHARED_SHIFT)即0x00010000)的原因这是我们在上面所说的同步状态的高16位用来表示读锁被获取的次数。如果CAS失败或者已经获取读锁的线程再次获取读锁时,是靠fullTryAcquireShared方法实现的
读锁释放
读锁释放的实现主要通过方法tryReleaseShared,源码如下:
//ReadLock
public void unlock() {
sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 这句代码其实唤醒 获取写锁的线程,往下看就知道了
return true;
}
return false;
}
//Sync
protected final boolean tryReleaseShared(int unused) {
//当前线程
Thread current = Thread.currentThread();
//如果想要释放锁的线程为第一个获取锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
// 如果等于 1,那么这次解锁后就不再持有锁了,把 firstReader 置为 null,给后来的线程用
// 为什么不顺便设置 firstReaderHoldCount = 0?因为没必要,其他线程使用的时候自己会设值
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 这一步将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了
readHolds.remove();
if (count <= 0)
//lock() 一次,unlock() 好几次,抛出异常
throw unmatchedUnlockException();
}
//count减1
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了
// 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程
return nextc == 0;
}
}
读锁释放的过程还是比较简单的,主要就是将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉。
然后是在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程。
HoldCounter
在读锁获取锁和释放锁的过程中,我们一直都可以看到一个变量rh (HoldCounter ),该变量在读锁中扮演着非常重要的作用。
我们了解读锁的内在机制其实就是一个共享锁,为了更好理解HoldCounter ,我们暂且认为它不是一个锁的概念,而相当于一个计数器。一次共享锁的操作就相当于在该计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。我们先看HoldCounter的定义:
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
HoldCounter 定义非常简单,就是一个计数器count 和线程 id tid 两个变量。按照这个意思我们看到HoldCounter 是需要和某给线程进行绑定了,我们知道如果要将一个对象和线程绑定仅仅有tid是不够的,而且从上面的代码我们可以看到HoldCounter 仅仅只是记录了tid,根本起不到绑定线程的作用。那么怎么实现呢?答案是ThreadLocal,定义如下:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
通过上面代码HoldCounter就可以与线程进行绑定了。故而,HoldCounter应该就是绑定线程上的一个计数器,而ThradLocalHoldCounter则是线程绑定的ThreadLocal。从上面我们可以看到ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们(尽管GC能够智能的发现这种引用而回收它们,但是这需要一定的代价),所以其实这样做只是为了帮助GC快速回收对象而已。
锁降级
读写锁有一个特性就是锁降级,那何为锁降级?
遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级,锁降级一定要遵循一定的次序
在获取读锁的方法tryAcquireShared(int unused)中,有一段代码就是来判读锁降级的:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 看下这里返回 false 的情况:
// c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有)
// c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁
// 也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
...
}
...
}
锁降级中读锁的获取释放为必要?肯定是必要的。
试想,假如当前线程A不获取读锁而是直接释放了写锁,这个时候另外一个线程B获取了写锁,那么这个线程B对数据的修改是不会对当前线程A可见的。如果获取了读锁,则线程B在获取写锁过程中判断如果有读锁还没有释放则会被阻塞,只有当前线程A释放读锁后,线程B才会获取写锁成功。