[Java 并发编程] 19. Read/Write Lock

2020-09-03

读写锁比其他同步锁要更加复杂一些。多个线程读相同的共享资源不会引发线程安全问题,所以允许共享资源被多个线程同时访问。但是如果有一个线程在写某个资源,需要保证没有线程在读取这个共享资源的数据。为了提高系统的性能,我们可以使用读写锁,在读取资源的时候允许多个线程访问共享资源,在写资源的时候保证多个线程是同步访问共享资源。

Java 5 的 java.util.concurrent 提供了读写锁,我们可以使用它,也需要了解它的一些原理。


1. Java 实现 Read/Write Lock

首先我们总结下读写访问共享数据的条件:

  • Read : 没有线程在写共享资源的数据,也没有线程在请求访问写共享资源的数据。
  • Write : 没有线程在读或者写共享资源。

一个线程需要读共享资源,只要保证没有线程在写共享资源的数据,也没有线程在请求写共享资源的数据。 通过优先写共享资源数据的请求我们可以得到写的请求比读的请求更重要。此外,如果读经常发生,并且我们没有优先写操作,饥饿(饥饿是由于线程间不公平竞争共享资源产生某些线程被无限期阻塞的现象)经常发生。如果一个读的线程不断的获取到访问资源的锁,那么写的线程将无限期的等待锁资源被释放,产生饥饿。因此一个线程需要读共享资源,需要保证没有线程在写共享资源,并且没有线程在请求写共享资源。

一个线程需要写共享资源,只要保证没有线程在读或写共享资源。

记住这些简单的规则,我们来实现一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class ReadWriteLock {

private int readers = 0;
private int writes = 0;
private int requestWrites = 0;

public synchronized void lockRead() throws InterruptedException {
//读需要保证没有线程在写共享资源并且没有线程在请求写共享资源
while (writes > 0 || requestWrites > 0) {
wait();
}
//读资源线程记录数自增
readers++;
}

public synchronized void unlockRead() {
readers--;
//唤醒所有等待的线程
notifyAll();
}

public synchronized void lockWrite() throws InterruptedException {
//自增请求写资源的线程记录数
requestWrites++;
//写需要保证没有线程在读或写资源
while (readers > 0 || writes > 0) {
wait();
}
//请求写资源的线程获取到了锁,请求数自减
requestWrites--;
//自增正在写资源的线程记录数
writes++;
}

public synchronized void unlockWrite() {
writes--;
//唤醒所有等待的线程
notifyAll();
}

}

这里是我们自定义的一个读写锁,通过synchronized实现,如果你需要实现线程间公平的写资源,可以使用JUC提供的ReadWriteLock,JUC的内容我们将在JUC专题总结。同一时间允许多个线程读共享资源,但是在调用lockRead()方法和unlockRead()方法时使得读线程同步。同一时间只允许一个线程写共享资源,同样在调用lockWrite()方法和unlockWrite()方法时会使得写线程同步。

这里再次强调下我们的规则:

  • 一个线程需要读共享资源,只要保证没有线程在写共享资源的数据,也没有线程在请求写共享资源的数据。
  • 一个线程需要写共享资源,只要保证没有线程在读或写共享资源。

2. 可重入 Read / Write Lock

上面示例中的ReadWriteLock不是可重入锁。使得ReadWriteLock可重入需要做一些改变。

我们可以先列出锁重入的情况,如下所示:

  • 读 -> 读
  • 读 -> 写
  • 写 -> 读
  • 写 -> 写

下面我们按照上面列出的情况,逐一分析,这里做一个简化,因为“读 -> 读”、“写 -> 写”的情况比“读 -> 写”、“写 -> 读”简单,所以我们按照“读 -> 读”、“写 -> 写”、“读 -> 写”、“写 -> 读”的顺序来分析,最后我们将实现一个覆盖所有场景的可重入读写锁。

2.1 读可重入(读 -> 读)

首先我们总结下读可重入的规则:如果一个线程要实现读可重入,那么需要保证没有线程正在写共享资源获取请求写共享资源,或者它已经拥有读共享资源的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class ReadWriteLock {

private Map<Thread, Integer> readerMap = new HashMap<Thread, Integer>();

private int writes = 0;
private int requestWrites = 0;

public synchronized void lockRead() throws InterruptedException {
Thread currentThread = Thread.currentThread();
//自旋
while (!canGrantReadAccess(currentThread)) {
wait();
}
//读资源线程记录数自增
if (!readerMap.containsKey(currentThread)) {
readerMap.put(currentThread, 0);
}
readerMap.put(currentThread, readerMap.get(currentThread) + 1);
}

/**
* 是否可以获取读访问
*
* <p>读需要保证没有线程在写共享资源并且没有线程在请求写共享资源,或者当前线程已经具有读的锁</p>
*
* @param thread 线程
* @return boolean 结果
*/
private boolean canGrantReadAccess(Thread thread) {
//如果当前线程已经具有读的锁,就排除了有线程在写的可能性
if (readerMap.containsKey(thread) && readerMap.get(thread) > 0) {
return true;
}
if (writes > 0) {
return false;
}
return requestWrites == 0;
}

public synchronized void unlockRead() {
Thread currentThread = Thread.currentThread();
readerMap.put(currentThread, readerMap.get(currentThread) - 1);
if (readerMap.get(currentThread) == 0) {
//清空引用
readerMap.remove(currentThread);
}
//唤醒所有等待的线程
notifyAll();
}

public synchronized void lockWrite() throws InterruptedException {
//自增请求写资源的线程记录数
requestWrites++;
//自旋
while (!canGrateWriteAccess()) {
wait();
}
//请求写资源的线程获取到了锁,请求数自减
requestWrites--;
//自增正在写资源的线程记录数
writes++;
}

/**
* 是否可以获取写访问
*
* <p>写需要保证没有线程在读或写资源<p/>
*
* @return 结果
*/
private boolean canGrateWriteAccess() {
return readerMap.values().stream().mapToInt(i -> i).sum() > 0 || writes > 0;
}

public synchronized void unlockWrite() {
writes--;
//唤醒所有等待的线程
notifyAll();
}

}

当你充分的了解了读重入锁的规则,你也可以轻易的写出类似的代码。所以:重点是规则。

2.2 写可重入(写 -> 写)

写可重入的规则:当前线程拥有写资源的锁,不需要再次获取锁可再次进入同步代码块。 让我们在上面示例的基础上做一些修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class ReadWriteLock {

private Map<Thread, Integer> readerMap = new HashMap<Thread, Integer>();

private int writes = 0;
private int requestWrites = 0;

/**
* 正在写的线程
*/
private Thread writingThread = null;

//...省略读的 lockRead() 和 unlockRead() 方法

public synchronized void lockWrite() throws InterruptedException {
//自增请求写资源的线程记录数
requestWrites++;
//自旋
while (!canGrateWriteAccess()) {
wait();
}
//请求写资源的线程获取到了锁,请求数自减
requestWrites--;
//自增正在写资源的线程记录数
writes++;
}

/**
* 是否可以获取写访问
*
* <p>写需要保证没有线程在读或写资源<p/>
*
* @return 结果
*/
private boolean canGrateWriteAccess() {
return writingThread == Thread.currentThread()
|| readerMap.values().stream().mapToInt(i -> i).sum() > 0
|| writes > 0;
}

public synchronized void unlockWrite() {
writes--;
//没有写的线程,清空writingThread
if (writes == 0) {
writingThread = null;
}
//唤醒所有等待的线程
notifyAll();
}

}

我只增加了一个writingThread变量记录当前写的线程,并修改了lockWrite()方法的自旋条件canGrateWriteAccess()方法和unlockWrite()方法。

2.3 读-写可重入(读 -> 写)

当一个线程拥有读共享资源的锁时,获取写的锁需要满足以下条件:没有其他线程正在读取数据,并且没有任何线程在写共享资源数据或者请求写共享资源数据。

因此,我们只需要改动canGrateWriteAccess()方法的代码:

1
2
3
4
5
6
7
private boolean canGrateWriteAccess() {
return writingThread == Thread.currentThread()
|| writes > 0
|| readerMap.entrySet().stream().filter(thread -> thread != Thread.currentThread())
.map(Map.Entry::getValue)
.mapToInt(i -> i).sum() > 0;
}
2.4 写-读可重入(写 -> 读)

当一个线程拥有写共享资源的锁时,需要确保没有线程在读取或写共享资源,因此它可以直接读共享资源的数据。

因此,我们只要改动canGrantReadAccess()方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private boolean canGrantReadAccess(Thread thread) {
//如果当前线程已经获取写资源的锁,则不需要再次获取锁
if (thread == writingThread) {
return true;
}
//如果当前线程已经具有读的锁,就排除了有线程在写的可能性
if (readerMap.containsKey(thread) && readerMap.get(thread) > 0) {
return true;
}
if (writes > 0) {
return false;
}
return requestWrites == 0;
}
2.5 全功能读写可重入锁

经过前面四小结的完善,ReadWriteLock就具备了读-读、读-写、写-读、写-写各个功能的锁重入。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class ReadWriteLock {

private Map<Thread, Integer> readerMap = new HashMap<Thread, Integer>();

private int writes = 0;
private int requestWrites = 0;
private Thread writingThread = null;

public synchronized void lockRead() throws InterruptedException {
Thread currentThread = Thread.currentThread();
//自旋
while (!canGrantReadAccess(currentThread)) {
wait();
}
//读资源线程记录数自增
if (!readerMap.containsKey(currentThread)) {
readerMap.put(currentThread, 0);
}
readerMap.put(currentThread, readerMap.get(currentThread) + 1);
}

/**
* 是否可以获取读访问
*
* <p>读需要保证没有线程在写共享资源并且没有线程在请求写共享资源,或者当前线程已经具有读的锁</p>
*
* @param thread 线程
* @return boolean 结果
*/
private boolean canGrantReadAccess(Thread thread) {
//如果当前线程已经获取写资源的锁,则不需要再次获取锁
if (thread == writingThread) {
return true;
}
//如果当前线程已经具有读的锁,就排除了有线程在写的可能性
if (readerMap.containsKey(thread) && readerMap.get(thread) > 0) {
return true;
}
if (writes > 0) {
return false;
}
return requestWrites == 0;
}

public synchronized void unlockRead() {
Thread currentThread = Thread.currentThread();
readerMap.put(currentThread, readerMap.get(currentThread) - 1);
if (readerMap.get(currentThread) == 0) {
//清空引用
readerMap.remove(currentThread);
}
//唤醒所有等待的线程
notifyAll();
}

public synchronized void lockWrite() throws InterruptedException {
//自增请求写资源的线程记录数
requestWrites++;
//自旋
while (!canGrateWriteAccess()) {
wait();
}
//请求写资源的线程获取到了锁,请求数自减
requestWrites--;
//自增正在写资源的线程记录数
writes++;
}

/**
* 是否可以获取写访问
*
* <p>写需要保证没有线程在读或写资源<p/>
*
* @return 结果
*/
private boolean canGrateWriteAccess() {
return writingThread == Thread.currentThread()
|| writes > 0
|| readerMap.entrySet().stream().filter(thread -> thread != Thread.currentThread())
.map(Map.Entry::getValue)
.mapToInt(i -> i).sum() > 0;
}

public synchronized void unlockWrite() {
writes--;
//没有写的线程,清空writingThread
if (writes == 0) {
writingThread = null;
}
//唤醒所有等待的线程
notifyAll();
}

}
2.6 在finally代码块中调用unlock()方法

为了产生死锁,请在finally代码块中调用unlock()方法。

1
2
3
4
5
6
lock.lockWrite();
try{
//do critical section code, which may throw exception
} finally {
lock.unlockWrite();
}