wait and notify Methods in Java - Java 147

wait and notify Methods in Java – Java 147

wait and notify Methods in Java

了解 Java 中最基本的機制之一執行緒同步,目前的執行緒必須擁有物件的監視器,也就是一個 lock 鎖,可以使用 synchronized 鎖定方法、區塊、或 static 靜態方法來取得鎖, Java wait notify Methods ,提供生產與消費模式 Produce and Consume ,說明執行緒間的同步機制,本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- thread
       |                       +- WaitNotifyTest.java   

單元測試

Java notify wait Methods 提供生產與消費模式,暫停執行緒、喚醒執行緒等操作。

consumeAndProduce

Java notify wait Methods 建立 1 個阻塞佇列,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒,採用先消費再生產的方式,分別執行 2 個消費執行緒跟 3 個生產執行緒。

	protected class BlockQueue<E> {
		private int maxSize;
		private List<E> list = new ArrayList<E>();

		public BlockQueue(int maxSize) {
			this.maxSize = maxSize;
		}

		public synchronized void put(E e) throws InterruptedException {
			while (list.size() == maxSize) {
				System.out.println("T[" + Thread.currentThread().getId() + "] producer waiting");
				wait();
			}
			boolean added = list.add(e);
			if (added) {
				notify();
			}
		}

		public synchronized E take() throws InterruptedException {
			E result = null;
			while (list.size() == 0) {
				System.out.println("T[" + Thread.currentThread().getId() + "] comsumer waiting");
				wait();
			}
			result = list.remove(0);
			if (result != null) {
				notify();
			}
			return result;
		}

		public int size() {
			return list.size();
		}

		@Override
		public String toString() {
			ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
			builder.appendSuper(super.toString());
			builder.append("maxSize", maxSize);
			builder.append("list", list);
			return builder.toString();
		}
	}

	@Test
	public void consumeAndProduce() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		comsumers.forEach(e -> e.start());

		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}
T[11] comsumer waiting
T[12] comsumer waiting
T[13] producer put: 0
T[11] comsumer take: 0
T[12] comsumer waiting
T[15] producer put: 2
T[12] comsumer take: 2
T[14] producer put: 1
T[1] {"maxSize":2,"list":[1]}

produceAndConsume

Java notify wait Methods 建立 1 個阻塞佇列,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒,採用先生產再消費的方式,分別執行 3 個生產執行緒跟 2 個消費執行緒。

	@Test
	public void produceAndConsume() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		comsumers.forEach(e -> e.start());

		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}
T[11] producer put: 0
T[13] producer waiting
T[12] producer put: 1
T[13] producer put: 2
T[15] comsumer take: 1
T[14] comsumer take: 0
T[1] {"maxSize":2,"list":[2]}

WaitNotifyTest.java

wait notify Methods 新增單元測試,驗證 Java notify wait Methods 是否符合預期。

package org.ruoxue.java_147.synchronization.thread;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.junit.Test;

public class WaitNotifyTest {

	protected class BlockQueue<E> {
		private int maxSize;
		private List<E> list = new ArrayList<E>();

		public BlockQueue(int maxSize) {
			this.maxSize = maxSize;
		}

		public synchronized void put(E e) throws InterruptedException {
			while (list.size() == maxSize) {
				System.out.println("T[" + Thread.currentThread().getId() + "] producer waiting");
				wait();
			}
			boolean added = list.add(e);
			if (added) {
				notify();
			}
		}

		public synchronized E take() throws InterruptedException {
			E result = null;
			while (list.size() == 0) {
				System.out.println("T[" + Thread.currentThread().getId() + "] comsumer waiting");
				wait();
			}
			result = list.remove(0);
			if (result != null) {
				notify();
			}
			return result;
		}

		public int size() {
			return list.size();
		}

		@Override
		public String toString() {
			ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
			builder.appendSuper(super.toString());
			builder.append("maxSize", maxSize);
			builder.append("list", list);
			return builder.toString();
		}
	}

	@Test
	public void consumeAndProduce() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		comsumers.forEach(e -> e.start());

		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void produceAndConsume() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println("T[" + Thread.currentThread().getId() + "] producer put: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		List<Thread> comsumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println("T[" + Thread.currentThread().getId() + "] comsumer take: " + value);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		comsumers.forEach(e -> e.start());

		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}
}

心得分享

Java wait and notify with Examples 使用生產與消費模式,協調不同執行緒的同步機制,要注意的是,需要定義快速檢查,用來檢查執行緒繼續執行的所需條件,這是因為在某些情況下,Java wait notify Methods 執行緒可能會在沒有收到通知的情況下被喚醒, wait notify Methods 提供執行緒間的同步機制應用。

發佈留言