Condition await and signal Methods in Java
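To pause a thread inside a critical section and put it into a waiting state, the thread must already hold the lock that guards that section. When the thread calls await, it releases that lock and suspends. Once the lock is released, other threads can acquire it, change the shared state, and signal the waiting thread, which then reacquires the lock and continues once its condition holds. This article adds examples of the Condition await and signal methods and verifies the results with unit tests.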
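The two rules above (hold the lock before waiting, and await releases it) can be illustrated with a minimal sketch. The class AwaitSignalSketch and its methods awaitReady, markReady, and rejectsAwaitWithoutLock are hypothetical names chosen for this illustration and do not appear in the article's test class.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not part of the article's test class.
public class AwaitSignalSketch {

    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    // Blocks until markReady() has been called.
    public void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!flag) {
                // Releases the lock while waiting; reacquires it before returning.
                ready.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Can acquire the lock even while another thread is inside awaitReady(),
    // because await() released it.
    public void markReady() {
        lock.lock();
        try {
            flag = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    // Calls await() without holding the lock and reports whether
    // the ReentrantLock condition rejected the call.
    public boolean rejectsAwaitWithoutLock() {
        try {
            ready.await();
            return false;
        } catch (IllegalMonitorStateException ex) {
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AwaitSignalSketch sketch = new AwaitSignalSketch();
        Thread waiter = new Thread(() -> {
            try {
                sketch.awaitReady();
                System.out.println("waiter resumed");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        sketch.markReady();
        waiter.join();
        System.out.println("await without lock rejected: " + sketch.rejectsAwaitWithoutLock());
    }
}
```

The flag is checked in a loop so that the waiter proceeds only when the state it needs is really there, whichever thread runs first.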
File Directory
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- reentrantlock
       |                       +- ConditionAwaitSignalTest.java
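Unit Tests
The Condition signal and await methods support the producer-consumer pattern through two operations: waiting on a condition and waking a waiting thread.
consumeAndProduce
This example builds a blocking queue with a capacity of 2. When the queue is full, put suspends the calling thread until there is room for an element. When the queue is empty, take suspends the calling thread until an element is available. The test consumes first and then produces: it starts 2 consumer threads, followed by 3 producer threads.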
protected class BlockQueue<E> {

    private int maxSize;
    private List<E> list = new ArrayList<E>();
    private final Lock lock = new ReentrantLock();
    // A single Condition is shared by producers and consumers.
    private Condition condition = lock.newCondition();

    public BlockQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop: the queue may still be full after waking up.
            while (list.size() == maxSize) {
                System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
                // Releases the lock while waiting and reacquires it before returning.
                condition.await();
            }
            boolean added = list.add(e);
            if (added) {
                // Wake one thread waiting on the condition.
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        E result = null;
        lock.lock();
        try {
            // Re-check in a loop: the queue may still be empty after waking up.
            while (list.size() == 0) {
                System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
                condition.await();
            }
            result = list.remove(0);
            if (result != null) {
                // Wake one thread waiting on the condition.
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
        return result;
    }

    public int size() {
        return list.size();
    }

    @Override
    public String toString() {
        ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
        builder.appendSuper(super.toString());
        builder.append("maxSize", maxSize);
        builder.append("list", list);
        return builder.toString();
    }
}
@Test
public void consumeAndProduce() throws InterruptedException {
    int expectedSize = 1;
    BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
    // Start the 2 consumer threads first.
    List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
        try {
            Integer value = queue.take();
            System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(2).collect(Collectors.toList());
    consumers.forEach(e -> e.start());

    // Then start the 3 producer threads.
    AtomicInteger ids = new AtomicInteger();
    List<Thread> producers = Stream.generate(() -> new Thread(() -> {
        try {
            int value = ids.getAndIncrement();
            queue.put(value);
            System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(3).collect(Collectors.toList());
    producers.forEach(e -> e.start());

    // Wait for every thread to finish so the assertion sees the final state.
    for (Thread consumer : consumers) {
        consumer.join();
    }
    for (Thread producer : producers) {
        producer.join();
    }
    System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
    assertEquals(expectedSize, queue.size());
}
T[11] consumer waiting
T[12] consumer waiting
T[13] producer put: 0
T[12] consumer waiting
T[12] consumer take: 1
T[15] producer put: 1
T[14] producer put: 2
T[11] consumer take: 0
T[1] {"maxSize":2,"list":[2]}
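BlockQueue shares one Condition between producers and consumers, so a signal sent by a consumer can wake another consumer, which finds the queue still empty and waits again. That is one plausible reading of the repeated "T[12] consumer waiting" line in the log. A common alternative, sketched here and not taken from the original test, gives each kind of waiter its own Condition. The class name TwoConditionQueue and the fields notFull and notEmpty are assumptions made for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of the article's BlockQueue with two conditions.
public class TwoConditionQueue<E> {

    private final int maxSize;
    private final Deque<E> items = new ArrayDeque<>(); // rejects null elements
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    public TwoConditionQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == maxSize) {
                notFull.await();
            }
            items.addLast(e);
            // Only a consumer can be waiting on notEmpty.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E result = items.removeFirst();
            // Only a producer can be waiting on notFull.
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TwoConditionQueue<Integer> queue = new TwoConditionQueue<>(2);
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took " + queue.take());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put(42);
        consumer.join();
    }
}
```

With separate conditions, a signal from put can only reach a thread waiting in take, and the reverse, so a wake-up is never spent on a thread that cannot make progress because of its role. The waiting loops remain, since another thread may still change the queue before the woken thread reacquires the lock.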
produceAndConsume
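This example builds the same blocking queue with a capacity of 2. When the queue is full, put suspends the calling thread until there is room for an element. When the queue is empty, take suspends the calling thread until an element is available. The test produces first and then consumes: it starts 3 producer threads, followed by 2 consumer threads.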
@Test
public void produceAndConsume() throws InterruptedException {
    int expectedSize = 1;
    BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
    // Start the 3 producer threads first.
    AtomicInteger ids = new AtomicInteger();
    List<Thread> producers = Stream.generate(() -> new Thread(() -> {
        try {
            int value = ids.getAndIncrement();
            queue.put(value);
            System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(3).collect(Collectors.toList());
    producers.forEach(e -> e.start());

    // Then start the 2 consumer threads.
    List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
        try {
            Integer value = queue.take();
            System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(2).collect(Collectors.toList());
    consumers.forEach(e -> e.start());

    // Wait for every thread to finish so the assertion sees the final state.
    for (Thread producer : producers) {
        producer.join();
    }
    for (Thread consumer : consumers) {
        consumer.join();
    }
    System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
    assertEquals(expectedSize, queue.size());
}
T[12] producer waiting
T[13] producer put: 2
T[11] producer put: 0
T[14] consumer take: 0
T[15] consumer take: 2
T[12] producer put: 1
T[1] {"maxSize":2,"list":[1]}
ConditionAwaitSignalTest.java
This file contains the unit tests that verify the ReentrantLock Condition await and signal methods behave as expected.
package org.ruoxue.java_147.synchronization.reentrantlock;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.junit.Test;

public class ConditionAwaitSignalTest {

    protected class BlockQueue<E> {

        private int maxSize;
        private List<E> list = new ArrayList<E>();
        private final Lock lock = new ReentrantLock();
        // A single Condition is shared by producers and consumers.
        private Condition condition = lock.newCondition();

        public BlockQueue(int maxSize) {
            this.maxSize = maxSize;
        }

        public void put(E e) throws InterruptedException {
            lock.lock();
            try {
                // Re-check in a loop: the queue may still be full after waking up.
                while (list.size() == maxSize) {
                    System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
                    condition.await();
                }
                boolean added = list.add(e);
                if (added) {
                    condition.signal();
                }
            } finally {
                lock.unlock();
            }
        }

        public E take() throws InterruptedException {
            E result = null;
            lock.lock();
            try {
                // Re-check in a loop: the queue may still be empty after waking up.
                while (list.size() == 0) {
                    System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
                    condition.await();
                }
                result = list.remove(0);
                if (result != null) {
                    condition.signal();
                }
            } finally {
                lock.unlock();
            }
            return result;
        }

        public int size() {
            return list.size();
        }

        @Override
        public String toString() {
            ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
            builder.appendSuper(super.toString());
            builder.append("maxSize", maxSize);
            builder.append("list", list);
            return builder.toString();
        }
    }

    @Test
    public void consumeAndProduce() throws InterruptedException {
        int expectedSize = 1;
        BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
        List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
            try {
                Integer value = queue.take();
                System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(2).collect(Collectors.toList());
        consumers.forEach(e -> e.start());

        AtomicInteger ids = new AtomicInteger();
        List<Thread> producers = Stream.generate(() -> new Thread(() -> {
            try {
                int value = ids.getAndIncrement();
                queue.put(value);
                System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(3).collect(Collectors.toList());
        producers.forEach(e -> e.start());

        // Wait for every thread to finish so the assertion sees the final state.
        for (Thread consumer : consumers) {
            consumer.join();
        }
        for (Thread producer : producers) {
            producer.join();
        }
        System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void produceAndConsume() throws InterruptedException {
        int expectedSize = 1;
        BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
        AtomicInteger ids = new AtomicInteger();
        List<Thread> producers = Stream.generate(() -> new Thread(() -> {
            try {
                int value = ids.getAndIncrement();
                queue.put(value);
                System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(3).collect(Collectors.toList());
        producers.forEach(e -> e.start());

        List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
            try {
                Integer value = queue.take();
                System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(2).collect(Collectors.toList());
        consumers.forEach(e -> e.start());

        // Wait for every thread to finish so the assertion sees the final state.
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
        System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
        assertEquals(expectedSize, queue.size());
    }
}
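Takeaways
A lock condition lets a thread wait, at a critical point in its execution, for some arbitrary condition to become true. The pattern applies to code protected by a lock. With a ReentrantLock Condition, a thread acquires the exclusive lock on the critical section and, on finding that the condition it needs does not yet hold, calls await, which releases the lock and puts the thread into a waiting state. Later, another thread signals the waiting thread, which reacquires the lock and checks again whether the condition it needs is now satisfied.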
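The wait, signal, and re-check cycle described above can be sketched with a condition other than queue size. In this hypothetical example, the class ThresholdCounter and its methods increment and awaitAtLeast are names invented for illustration: a waiter blocks until a counter reaches a target, and every increment signals all waiters so that each one re-checks its own target.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of waiting for an arbitrary condition.
public class ThresholdCounter {

    private final Lock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private int count = 0; // guarded by lock

    // Changes the shared state, then wakes every waiter to re-check it.
    public void increment() {
        lock.lock();
        try {
            count++;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until count >= target and returns the count observed at that moment.
    public int awaitAtLeast(int target) throws InterruptedException {
        lock.lock();
        try {
            // A wake-up only means the state changed, not that the target was reached.
            while (count < target) {
                changed.await();
            }
            return count;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThresholdCounter counter = new ThresholdCounter();
        Thread waiter = new Thread(() -> {
            try {
                int seen = counter.awaitAtLeast(3);
                System.out.println("waiter saw count " + seen);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        for (int i = 0; i < 3; i++) {
            counter.increment();
        }
        waiter.join();
    }
}
```

signalAll is used here because different waiters may be waiting for different targets, so waking just one of them could pick a thread whose target has not been reached yet.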