Java ReentrantLock with Conditions
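A lock condition lets a thread wait, at a critical point in its execution, for some arbitrary condition to become true. Conditions are normally used inside a section of code protected by a lock: a thread acquires the exclusive lock for the critical section and, on finding that the condition it needs in order to proceed does not hold, releases the lock and enters a waiting state until the condition is satisfied. Another thread later signals the waiting thread, which then reacquires the lock and rechecks whether the condition now holds. This article adds examples and verifies their results with unit tests.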
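The wait/signal protocol described above can be sketched with a small one-shot gate. This is only an illustration: the class `ReadyGate` and its methods are hypothetical names that do not appear in the test class of this article.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot gate, used only to illustrate the wait/signal protocol.
class ReadyGate {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean opened = false; // the condition being waited for; guarded by lock

    // Blocks the caller until open() has been called.
    void awaitOpen() throws InterruptedException {
        lock.lock();
        try {
            // Recheck in a loop because await() may return without a matching signal.
            while (!opened) {
                ready.await(); // releases the lock while waiting, reacquires it before returning
            }
        } finally {
            lock.unlock();
        }
    }

    // Makes the condition true and wakes every waiting thread.
    void open() {
        lock.lock();
        try {
            opened = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Starts a waiter, opens the gate, and reports whether the waiter got through.
    static boolean demo() {
        ReadyGate gate = new ReadyGate();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
                passed.set(true);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();
        try {
            waiter.join(5000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }
}
```

The sketch works whether `open()` runs before or after the waiter reaches `await()`, because the waiter checks the `opened` flag under the lock before it decides to wait.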
File Structure
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- reentrantlock
       |                       +- ReentrantLockWithConditionsTest.java
Unit Tests
A Condition provides operations for waiting on a condition and for signaling threads that are waiting on it.
consumeAndProduce
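Create a blocking queue with a capacity of 2. When the queue is full, put suspends the calling thread until an element can be added. When the queue is empty, take suspends the calling thread until an element can be removed. The test consumes first and then produces: it starts 2 consumer threads, followed by 3 producer threads.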
protected class BlockQueue<E> {
    private final int maxSize;
    private final List<E> list = new ArrayList<E>();
    private final Lock lock = new ReentrantLock();
    // Signaled after an element is added, so a waiting consumer can proceed.
    private final Condition notEmpty = lock.newCondition();
    // Signaled after an element is removed, so a waiting producer can proceed.
    private final Condition notFull = lock.newCondition();

    public BlockQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // Wait in a loop: the queue may be full again by the time this thread wakes up.
            while (list.size() == maxSize) {
                System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
                notFull.await();
            }
            list.add(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            // Wait in a loop: the queue may be empty again by the time this thread wakes up.
            while (list.isEmpty()) {
                System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
                notEmpty.await();
            }
            E result = list.remove(0);
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        // Read under the lock so the caller sees the latest state of the list.
        lock.lock();
        try {
            return list.size();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
        builder.appendSuper(super.toString());
        builder.append("maxSize", maxSize);
        builder.append("list", list);
        return builder.toString();
    }
}
@Test
public void consumeAndProduce() throws InterruptedException {
    int expectedSize = 1;
    BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
    List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
        try {
            Integer value = queue.take();
            System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(2).collect(Collectors.toList());
    consumers.forEach(e -> e.start());
    AtomicInteger ids = new AtomicInteger();
    List<Thread> producers = Stream.generate(() -> new Thread(() -> {
        try {
            int value = ids.getAndIncrement();
            queue.put(value);
            System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(3).collect(Collectors.toList());
    producers.forEach(e -> e.start());
    // Wait for every thread to finish so the assertion does not race with the workers.
    for (Thread thread : consumers) {
        thread.join();
    }
    for (Thread thread : producers) {
        thread.join();
    }
    System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
    assertEquals(expectedSize, queue.size());
}
T[11] consumer waiting
T[12] consumer waiting
T[14] producer put: 0
T[13] producer put: 1
T[15] producer waiting
T[11] consumer take: 0
T[12] consumer take: 1
T[15] producer put: 2
T[1] {"maxSize":2,"list":[2]}
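For comparison, the JDK ships `java.util.concurrent.ArrayBlockingQueue`, whose `put` and `take` block in the same way as the hand-written queue above. The sketch below reruns the scenario of 2 consumers and 3 producers against a queue of capacity 2. The class name `BlockingQueueComparison` is hypothetical and is not part of the article's test class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical comparison: the same scenario, using the JDK's bounded blocking queue.
class BlockingQueueComparison {
    // Runs 2 consumers and 3 producers, then returns the number of elements left.
    static int run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            threads.add(new Thread(() -> {
                try {
                    queue.take(); // blocks while the queue is empty
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int i = 0; i < 3; i++) {
            final int value = i;
            threads.add(new Thread(() -> {
                try {
                    queue.put(value); // blocks while the queue is full
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }
}
```

Three elements are produced and two are consumed, so one element remains once all threads have finished. Writing the queue by hand is still useful for learning how conditions work, which is the purpose of this article.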
doWorker
Create 3 threads and limit the number that may run at the same time to 2. When the limit is reached, the remaining threads wait until a slot becomes available and then continue with their task.
protected class DoWorker {
    private final Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private AtomicInteger count = new AtomicInteger();
    private static final int MAX_COUNT = 2;

    public DoWorker() {
    }

    public void doWork() throws Exception {
        await();
        try {
            System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
            TimeUnit.SECONDS.sleep(3);
        } finally {
            signal();
        }
    }

    public void await() throws InterruptedException {
        lock.lock();
        try {
            while (count.get() >= MAX_COUNT) {
                System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                condition.await();
            }
            count.incrementAndGet();
        } finally {
            lock.unlock();
        }
    }

    public void signal() throws InterruptedException {
        lock.lock();
        try {
            count.decrementAndGet();
            System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

@Test
public void doWorker() {
    DoWorker worker = new DoWorker();
    List<Thread> threads = Stream.generate(() -> new Thread(() -> {
        try {
            worker.doWork();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    })).limit(3).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
}
T[11] ready
T[12] ready
T[13] waiting
T[12] finished
T[11] finished
T[13] ready
T[13] finished
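Limiting how many threads run a section at once is also what `java.util.concurrent.Semaphore` is designed for. The following sketch swaps the lock and condition for a semaphore with 2 permits. It is offered as an alternative technique, not as the article's method, and the names `SemaphoreWorker`, `running` and `maxRunning` are hypothetical. The sleep is shortened to 100 ms to keep the demo quick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical alternative to DoWorker that uses a counting semaphore.
class SemaphoreWorker {
    private static final int MAX_COUNT = 2;
    private final Semaphore permits = new Semaphore(MAX_COUNT);
    private final AtomicInteger running = new AtomicInteger();    // threads currently working
    private final AtomicInteger maxRunning = new AtomicInteger(); // highest value of running seen

    void doWork() throws InterruptedException {
        permits.acquire(); // blocks while MAX_COUNT threads already hold a permit
        try {
            maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
            TimeUnit.MILLISECONDS.sleep(100);
        } finally {
            running.decrementAndGet();
            permits.release();
        }
    }

    // Runs 3 threads and returns the highest number that worked at the same time.
    static int demo() {
        SemaphoreWorker worker = new SemaphoreWorker();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            threads.add(new Thread(() -> {
                try {
                    worker.doWork();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return worker.maxRunning.get();
    }
}
```

The value returned by `demo()` never exceeds 2, because a third thread cannot pass `acquire()` until one of the first two calls `release()`.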
timeoutWorker
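Create 3 threads and limit the number that may run at the same time to 2. When the limit is reached, the remaining threads wait. If no slot becomes available within 2 seconds, the wait times out, a TimeoutException is thrown, and the task ends.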
protected class TimeoutWorker {
    private final Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private AtomicInteger count = new AtomicInteger();
    private static final int MAX_COUNT = 2;

    public TimeoutWorker() {
    }

    public void doWork() throws Exception {
        await();
        try {
            System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
            TimeUnit.SECONDS.sleep(3);
        } finally {
            signal();
        }
    }

    public void await() throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            // Total time budget for the wait; awaitNanos returns what is left of it.
            long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(2000);
            while (count.get() >= MAX_COUNT) {
                if (nanosRemaining <= 0L) {
                    throw new TimeoutException("T[" + Thread.currentThread().getId() + "] ");
                }
                System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                nanosRemaining = condition.awaitNanos(nanosRemaining);
            }
            count.incrementAndGet();
        } finally {
            lock.unlock();
        }
    }

    public void signal() throws InterruptedException {
        lock.lock();
        try {
            count.decrementAndGet();
            System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}
@Test
public void timeoutWorker() {
    TimeoutWorker worker = new TimeoutWorker();
    List<Thread> threads = Stream.generate(() -> new Thread(() -> {
        try {
            worker.doWork();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    })).limit(3).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
}
T[11] ready
T[13] waiting
T[12] ready
java.util.concurrent.TimeoutException: T[13]
at org.ruoxue.java_147.synchronization.ReentrantLockWithConditionsTest$TimeoutWorker.await(ReentrantLockWithConditionsTest.java:209)
at org.ruoxue.java_147.synchronization.ReentrantLockWithConditionsTest$TimeoutWorker.doWork(ReentrantLockWithConditionsTest.java:188)
at org.ruoxue.java_147.synchronization.ReentrantLockWithConditionsTest.lambda$11(ReentrantLockWithConditionsTest.java:234)
at java.lang.Thread.run(Thread.java:750)
T[11] finished
T[12] finished
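Keeping track of the remaining time across repeated wake-ups is the delicate part of a timed wait. `Condition.awaitNanos` returns an estimate of the time still left, which makes the bookkeeping simple. The sketch below shows that pattern on its own. The class `TimedWait` and its `flag` field are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper that shows a timed wait built on awaitNanos.
class TimedWait {
    private final Lock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean flag = false; // guarded by lock

    // Waits until flag is true or the timeout elapses; returns false on timeout.
    boolean awaitFlag(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // time budget used up and the condition still does not hold
                }
                nanos = changed.awaitNanos(nanos); // remaining time, zero or negative on timeout
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    void setFlag() {
        lock.lock();
        try {
            flag = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns {timedOutWhenNeverSet, succeededWhenSet}.
    static boolean[] demo() {
        try {
            TimedWait neverSet = new TimedWait();
            boolean timedOut = !neverSet.awaitFlag(50, TimeUnit.MILLISECONDS);
            TimedWait alreadySet = new TimedWait();
            alreadySet.setFlag();
            boolean succeeded = alreadySet.awaitFlag(50, TimeUnit.MILLISECONDS);
            return new boolean[] { timedOut, succeeded };
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return new boolean[] { false, false };
        }
    }
}
```

Because the loop only exits on a true condition or an exhausted budget, a wake-up that arrives while the condition is still false neither ends the wait early nor extends the total waiting time.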
ReentrantLockWithConditionsTest.java
The complete unit test class, which verifies that the lock conditions behave as expected.
package org.ruoxue.java_147.synchronization.reentrantlock;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.junit.Test;
public class ReentrantLockWithConditionsTest {

    protected class BlockQueue<E> {
        private final int maxSize;
        private final List<E> list = new ArrayList<E>();
        private final Lock lock = new ReentrantLock();
        // Signaled after an element is added, so a waiting consumer can proceed.
        private final Condition notEmpty = lock.newCondition();
        // Signaled after an element is removed, so a waiting producer can proceed.
        private final Condition notFull = lock.newCondition();

        public BlockQueue(int maxSize) {
            this.maxSize = maxSize;
        }

        public void put(E e) throws InterruptedException {
            lock.lock();
            try {
                // Wait in a loop: the queue may be full again by the time this thread wakes up.
                while (list.size() == maxSize) {
                    System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
                    notFull.await();
                }
                list.add(e);
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public E take() throws InterruptedException {
            lock.lock();
            try {
                // Wait in a loop: the queue may be empty again by the time this thread wakes up.
                while (list.isEmpty()) {
                    System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
                    notEmpty.await();
                }
                E result = list.remove(0);
                notFull.signal();
                return result;
            } finally {
                lock.unlock();
            }
        }

        public int size() {
            // Read under the lock so the caller sees the latest state of the list.
            lock.lock();
            try {
                return list.size();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public String toString() {
            ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
            builder.appendSuper(super.toString());
            builder.append("maxSize", maxSize);
            builder.append("list", list);
            return builder.toString();
        }
    }

    @Test
    public void consumeAndProduce() throws InterruptedException {
        int expectedSize = 1;
        BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
        List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
            try {
                Integer value = queue.take();
                System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(2).collect(Collectors.toList());
        consumers.forEach(e -> e.start());
        AtomicInteger ids = new AtomicInteger();
        List<Thread> producers = Stream.generate(() -> new Thread(() -> {
            try {
                int value = ids.getAndIncrement();
                queue.put(value);
                System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(3).collect(Collectors.toList());
        producers.forEach(e -> e.start());
        // Wait for every thread to finish so the assertion does not race with the workers.
        for (Thread thread : consumers) {
            thread.join();
        }
        for (Thread thread : producers) {
            thread.join();
        }
        System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
        assertEquals(expectedSize, queue.size());
    }

    protected class DoWorker {
        private final Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        private AtomicInteger count = new AtomicInteger();
        private static final int MAX_COUNT = 2;

        public DoWorker() {
        }

        public void doWork() throws Exception {
            await();
            try {
                System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
                TimeUnit.SECONDS.sleep(3);
            } finally {
                signal();
            }
        }

        public void await() throws InterruptedException {
            lock.lock();
            try {
                while (count.get() >= MAX_COUNT) {
                    System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                    condition.await();
                }
                count.incrementAndGet();
            } finally {
                lock.unlock();
            }
        }

        public void signal() throws InterruptedException {
            lock.lock();
            try {
                count.decrementAndGet();
                System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    }

    @Test
    public void doWorker() {
        DoWorker worker = new DoWorker();
        List<Thread> threads = Stream.generate(() -> new Thread(() -> {
            try {
                worker.doWork();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        })).limit(3).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
    }

    protected class TimeoutWorker {
        private final Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        private AtomicInteger count = new AtomicInteger();
        private static final int MAX_COUNT = 2;

        public TimeoutWorker() {
        }

        public void doWork() throws Exception {
            await();
            try {
                System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
                TimeUnit.SECONDS.sleep(3);
            } finally {
                signal();
            }
        }

        public void await() throws InterruptedException, TimeoutException {
            lock.lock();
            try {
                // Total time budget for the wait; awaitNanos returns what is left of it.
                long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(2000);
                while (count.get() >= MAX_COUNT) {
                    if (nanosRemaining <= 0L) {
                        throw new TimeoutException("T[" + Thread.currentThread().getId() + "] ");
                    }
                    System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                    nanosRemaining = condition.awaitNanos(nanosRemaining);
                }
                count.incrementAndGet();
            } finally {
                lock.unlock();
            }
        }

        public void signal() throws InterruptedException {
            lock.lock();
            try {
                count.decrementAndGet();
                System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    }

    @Test
    public void timeoutWorker() {
        TimeoutWorker worker = new TimeoutWorker();
        List<Thread> threads = Stream.generate(() -> new Thread(() -> {
            try {
                worker.doWork();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        })).limit(3).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
    }
}
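Takeaways
To suspend a critical section and wait on a condition, a thread must already hold the lock that guards that section. When the thread enters the waiting state, it automatically releases the lock and suspends execution. Once the lock has been released, other threads can acquire it, make the condition true, and signal the waiting thread, which then reacquires the lock and continues its task once the condition holds.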
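The requirement to hold the lock can be observed directly: for a Condition created by a ReentrantLock, calling await or signal without holding the lock throws IllegalMonitorStateException. The sketch below demonstrates this. The class name `LockOwnershipDemo` is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: waiting or signaling without holding the lock is rejected.
class LockOwnershipDemo {
    // Returns true if await() without the lock throws IllegalMonitorStateException.
    static boolean awaitWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.await(10, TimeUnit.MILLISECONDS); // the lock is not held here
            return false;
        } catch (IllegalMonitorStateException ex) {
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if signal() without the lock throws IllegalMonitorStateException.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the lock is not held here
            return false;
        } catch (IllegalMonitorStateException ex) {
            return true;
        }
    }
}
```

This is why every await and signal call in the examples of this article sits between lock.lock() and the unlock in the finally block.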