Table of Contents
wait and notify Methods in Java
了解 Java 中最基本的機制之一執行緒同步,目前的執行緒必須擁有物件的監視器,也就是一個 lock 鎖,可以使用 synchronized 鎖定方法、區塊、或 static 靜態方法來取得鎖, Java wait notify Methods ,提供生產與消費模式 Produce and Consume ,說明執行緒間的同步機制,本篇增加了範例,並透過單元測試來驗證產出結果。
檔案目錄
./
+- src
+- test
| +- org
| +- ruoxue
| +- java_147
| +- multithreading
| +- thread
| +- WaitNotifyTest.java
單元測試
Java notify wait Methods 提供生產與消費模式,暫停執行緒、喚醒執行緒等操作。
consumeAndProduce
Java notify wait Methods 建立 1 個阻塞佇列,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒,採用先消費再生產的方式,分別執行 2 個消費執行緒跟 3 個生產執行緒。
/**
 * A minimal bounded FIFO queue that blocks callers using the intrinsic monitor of
 * this object ({@code synchronized} / {@code wait} / {@code notifyAll}).
 *
 * <p>Producers and consumers wait on the same monitor, so every state change wakes
 * all waiters with {@code notifyAll()}; a single {@code notify()} could wake a
 * thread of the wrong kind and leave every thread waiting. All accessors take the
 * monitor so they never observe the backing list while it is being modified.
 *
 * @param <E> element type
 */
protected class BlockQueue<E> {
    /** Maximum number of elements the queue may hold; always positive. */
    private final int maxSize;
    /** Backing storage; index 0 is the head. Guarded by the monitor of this object. */
    private final List<E> list = new ArrayList<E>();

    /**
     * Creates an empty queue.
     *
     * @param maxSize capacity of the queue, must be greater than zero
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     */
    public BlockQueue(int maxSize) {
        if (maxSize <= 0) {
            // A capacity of 0 would block every put() forever; a negative one would never block.
            throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
        }
        this.maxSize = maxSize;
    }

    /**
     * Appends an element to the tail, waiting while the queue is full.
     *
     * @param e element to add
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void put(E e) throws InterruptedException {
        // Re-check in a loop: a thread may wake up without the condition being true.
        while (list.size() >= maxSize) {
            System.out.println("T[" + Thread.currentThread().getId() + "] producer waiting");
            wait();
        }
        list.add(e);
        // Wake every waiter; the loops above/below let the wrong kind go back to sleep.
        notifyAll();
    }

    /**
     * Removes and returns the head element, waiting while the queue is empty.
     *
     * @return the removed element (may be {@code null} if {@code null} was put)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized E take() throws InterruptedException {
        while (list.isEmpty()) {
            System.out.println("T[" + Thread.currentThread().getId() + "] comsumer waiting");
            wait();
        }
        E result = list.remove(0);
        // A slot was freed regardless of the element's value, so always notify.
        notifyAll();
        return result;
    }

    /**
     * Returns the current number of queued elements.
     *
     * @return number of elements in the queue
     */
    public synchronized int size() {
        return list.size();
    }

    /**
     * Renders the capacity and contents in JSON style. Holds the monitor so the
     * list is not modified while it is being iterated.
     */
    @Override
    public synchronized String toString() {
        ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
        builder.appendSuper(super.toString());
        builder.append("maxSize", maxSize);
        builder.append("list", list);
        return builder.toString();
    }
}
/**
 * Starts two consumers first and then three producers against a queue of
 * capacity 2, waits for all of them to finish, and verifies that exactly one
 * element is left in the queue.
 */
@Test
public void consumeAndProduce() {
    int expectedSize = 1;
    // Upper bound for waiting on each worker, so a stuck thread fails the test instead of hanging it.
    final long joinTimeoutMillis = 5000L;
    BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
    List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
        try {
            Integer value = queue.take();
            System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
        } catch (InterruptedException ex) {
            // Restore the interrupt status instead of silently dropping it.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        }
    })).limit(2).collect(Collectors.toList());
    comsumers.forEach(Thread::start);
    AtomicInteger ids = new AtomicInteger();
    List<Thread> producers = Stream.generate(() -> new Thread(() -> {
        try {
            int value = ids.getAndIncrement();
            queue.put(value);
            System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        }
    })).limit(3).collect(Collectors.toList());
    producers.forEach(Thread::start);
    // Without joining, the assertion below races with the worker threads.
    try {
        for (Thread worker : comsumers) {
            worker.join(joinTimeoutMillis);
        }
        for (Thread worker : producers) {
            worker.join(joinTimeoutMillis);
        }
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting for worker threads", ex);
    }
    for (Thread worker : producers) {
        if (worker.isAlive()) {
            throw new AssertionError("Producer T[" + worker.getId() + "] did not finish in time");
        }
    }
    for (Thread worker : comsumers) {
        if (worker.isAlive()) {
            throw new AssertionError("Consumer T[" + worker.getId() + "] did not finish in time");
        }
    }
    System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
    assertEquals(expectedSize, queue.size());
}
T[11] comsumer waiting
T[12] comsumer waiting
T[13] producer put: 0
T[11] comsumer take: 0
T[12] comsumer waiting
T[15] producer put: 2
T[12] comsumer take: 2
T[14] producer put: 1
T[1] {"maxSize":2,"list":[1]}
produceAndConsume
Java notify wait Methods 建立 1 個阻塞佇列,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒,採用先生產再消費的方式,分別執行 3 個生產執行緒跟 2 個消費執行緒。
/**
 * Starts three producers first and then two consumers against a queue of
 * capacity 2, waits for all of them to finish, and verifies that exactly one
 * element is left in the queue.
 */
@Test
public void produceAndConsume() {
    int expectedSize = 1;
    // Upper bound for waiting on each worker, so a stuck thread fails the test instead of hanging it.
    final long joinTimeoutMillis = 5000L;
    BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
    AtomicInteger ids = new AtomicInteger();
    List<Thread> producers = Stream.generate(() -> new Thread(() -> {
        try {
            int value = ids.getAndIncrement();
            queue.put(value);
            System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
        } catch (InterruptedException ex) {
            // Restore the interrupt status instead of silently dropping it.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        }
    })).limit(3).collect(Collectors.toList());
    producers.forEach(Thread::start);
    List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
        try {
            Integer value = queue.take();
            System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        }
    })).limit(2).collect(Collectors.toList());
    comsumers.forEach(Thread::start);
    // Without joining, the assertion below races with the worker threads.
    try {
        for (Thread worker : producers) {
            worker.join(joinTimeoutMillis);
        }
        for (Thread worker : comsumers) {
            worker.join(joinTimeoutMillis);
        }
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting for worker threads", ex);
    }
    for (Thread worker : producers) {
        if (worker.isAlive()) {
            throw new AssertionError("Producer T[" + worker.getId() + "] did not finish in time");
        }
    }
    for (Thread worker : comsumers) {
        if (worker.isAlive()) {
            throw new AssertionError("Consumer T[" + worker.getId() + "] did not finish in time");
        }
    }
    System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
    assertEquals(expectedSize, queue.size());
}
T[11] producer put: 0
T[13] producer waiting
T[12] producer put: 1
T[13] producer put: 2
T[15] comsumer take: 1
T[14] comsumer take: 0
T[1] {"maxSize":2,"list":[2]}
WaitNotifyTest.java
wait notify Methods 新增單元測試,驗證 Java notify wait Methods 是否符合預期。
package org.ruoxue.java_147.synchronization.thread;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.junit.Test;
/**
 * Demonstrates thread coordination with {@code wait} / {@code notifyAll} through a
 * small bounded blocking queue exercised by producer and consumer threads.
 */
public class WaitNotifyTest {

    /** Upper bound for waiting on each worker thread, so a stuck worker fails the test instead of hanging it. */
    private static final long JOIN_TIMEOUT_MILLIS = 5000L;

    /**
     * A minimal bounded FIFO queue that blocks callers using the intrinsic monitor
     * of this object.
     *
     * <p>Producers and consumers wait on the same monitor, so every state change
     * wakes all waiters with {@code notifyAll()}; a single {@code notify()} could
     * wake a thread of the wrong kind and leave every thread waiting. All accessors
     * take the monitor so they never observe the backing list mid-modification.
     *
     * @param <E> element type
     */
    protected class BlockQueue<E> {
        /** Maximum number of elements the queue may hold; always positive. */
        private final int maxSize;
        /** Backing storage; index 0 is the head. Guarded by the monitor of this object. */
        private final List<E> list = new ArrayList<E>();

        /**
         * Creates an empty queue.
         *
         * @param maxSize capacity of the queue, must be greater than zero
         * @throws IllegalArgumentException if {@code maxSize} is not positive
         */
        public BlockQueue(int maxSize) {
            if (maxSize <= 0) {
                // A capacity of 0 would block every put() forever; a negative one would never block.
                throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
            }
            this.maxSize = maxSize;
        }

        /**
         * Appends an element to the tail, waiting while the queue is full.
         *
         * @param e element to add
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public synchronized void put(E e) throws InterruptedException {
            // Re-check in a loop: a thread may wake up without the condition being true.
            while (list.size() >= maxSize) {
                System.out.println("T[" + Thread.currentThread().getId() + "] producer waiting");
                wait();
            }
            list.add(e);
            // Wake every waiter; the guarding loops let the wrong kind go back to sleep.
            notifyAll();
        }

        /**
         * Removes and returns the head element, waiting while the queue is empty.
         *
         * @return the removed element (may be {@code null} if {@code null} was put)
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public synchronized E take() throws InterruptedException {
            while (list.isEmpty()) {
                System.out.println("T[" + Thread.currentThread().getId() + "] comsumer waiting");
                wait();
            }
            E result = list.remove(0);
            // A slot was freed regardless of the element's value, so always notify.
            notifyAll();
            return result;
        }

        /**
         * Returns the current number of queued elements.
         *
         * @return number of elements in the queue
         */
        public synchronized int size() {
            return list.size();
        }

        /**
         * Renders the capacity and contents in JSON style. Holds the monitor so the
         * list is not modified while it is being iterated.
         */
        @Override
        public synchronized String toString() {
            ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
            builder.appendSuper(super.toString());
            builder.append("maxSize", maxSize);
            builder.append("list", list);
            return builder.toString();
        }
    }

    /**
     * Starts two consumers first and then three producers against a queue of
     * capacity 2, waits for all of them, and verifies one element is left.
     */
    @Test
    public void consumeAndProduce() {
        int expectedSize = 1;
        BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
        List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
            try {
                Integer value = queue.take();
                System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
            } catch (InterruptedException ex) {
                // Restore the interrupt status instead of silently dropping it.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        })).limit(2).collect(Collectors.toList());
        comsumers.forEach(Thread::start);
        AtomicInteger ids = new AtomicInteger();
        List<Thread> producers = Stream.generate(() -> new Thread(() -> {
            try {
                int value = ids.getAndIncrement();
                queue.put(value);
                System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        })).limit(3).collect(Collectors.toList());
        producers.forEach(Thread::start);
        // Without joining, the assertion below races with the worker threads.
        joinAll(comsumers);
        joinAll(producers);
        System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
        assertEquals(expectedSize, queue.size());
    }

    /**
     * Starts three producers first and then two consumers against a queue of
     * capacity 2, waits for all of them, and verifies one element is left.
     */
    @Test
    public void produceAndConsume() {
        int expectedSize = 1;
        BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
        AtomicInteger ids = new AtomicInteger();
        List<Thread> producers = Stream.generate(() -> new Thread(() -> {
            try {
                int value = ids.getAndIncrement();
                queue.put(value);
                System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
            } catch (InterruptedException ex) {
                // Restore the interrupt status instead of silently dropping it.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        })).limit(3).collect(Collectors.toList());
        producers.forEach(Thread::start);
        List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
            try {
                Integer value = queue.take();
                System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            }
        })).limit(2).collect(Collectors.toList());
        comsumers.forEach(Thread::start);
        // Without joining, the assertion below races with the worker threads.
        joinAll(producers);
        joinAll(comsumers);
        System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
        assertEquals(expectedSize, queue.size());
    }

    /**
     * Waits for every given thread to terminate, failing the test when a thread
     * is still alive after the timeout or when the waiting thread is interrupted.
     */
    private static void joinAll(List<Thread> workers) {
        try {
            for (Thread worker : workers) {
                worker.join(JOIN_TIMEOUT_MILLIS);
                if (worker.isAlive()) {
                    throw new AssertionError("Worker T[" + worker.getId() + "] did not finish in time");
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for worker threads", ex);
        }
    }
}
心得分享
Java wait and notify with Examples 使用生產與消費模式,協調不同執行緒的同步機制,要注意的是, wait 必須放在 while 迴圈中呼叫,每次被喚醒後都要重新檢查執行緒繼續執行所需的條件,這是因為在某些情況下,執行緒可能會在沒有收到通知的情況下被喚醒 ( spurious wakeup ), Java wait notify Methods 提供執行緒間的同步機制應用。