Condition await and signal Methods in Java - Java 147

Condition await and signal Methods in Java – Java 147

Condition await and signal Methods in Java

讓執行緒暫停關鍵程式碼的執行,並因此進入等待狀態,必須已經持有同一關鍵程式碼的鎖,當執行緒進入等待狀態時,會自動釋放相應的鎖並暫停執行, Condition await signal Methods 這表示一旦讓該執行緒暫停執行並且釋放鎖,其他執行緒就可以取得該鎖,喚醒該執行緒,繼續執行符合條件的任務,本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- reentrantlock
       |                          +- ConditionAwaitSignalTest.java   

單元測試

Condition signal await Methods 提供生產與消費模式,鎖定條件、喚醒條件等操作。

consumeAndProduce

Condition signal await Methods 建立 1 個阻塞佇列,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒, Condition await and signal with Examples 採用先消費再生產的方式,分別執行 2 個消費執行緒跟 3 個生產執行緒。

	protected class BlockQueue<E> {
		private int maxSize;
		private List<E> list = new ArrayList<E>();
		private final Lock lock = new ReentrantLock();
		private Condition condition = lock.newCondition();

		public BlockQueue(int maxSize) {
			this.maxSize = maxSize;
		}

		public void put(E e) throws InterruptedException {
			lock.lock();
			try {
				while (list.size() == maxSize) {
					System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
					condition.await();
				}
				boolean added = list.add(e);
				if (added) {
					condition.signal();
				}
			} finally {
				lock.unlock();
			}
		}

		public E take() throws InterruptedException {
			E result = null;
			lock.lock();
			try {
				while (list.size() == 0) {
					System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
					condition.await();
				}
				result = list.remove(0);
				if (result != null) {
					condition.signal();
				}
			} finally {
				lock.unlock();
			}
			return result;
		}

		public int size() {
			return list.size();
		}

		@Override
		public String toString() {
			ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
			builder.appendSuper(super.toString());
			builder.append("maxSize", maxSize);
			builder.append("list", list);
			return builder.toString();
		}
	}

	@Test
	public void consumeAndProduce() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		consumers.forEach(e -> e.start());

		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
		assertEquals(expectedSize, queue.size());
	}
T[11] consumer waiting
T[12] consumer waiting
T[13] producer put: 0
T[12] consumer waiting
T[12] consumer take: 1
T[15] producer put: 1
T[14] producer put: 2
T[11] consumer take: 0
T[1] {"maxSize":2,"list":[2]}

produceAndConsume

Condition signal await Methods 建立 1 個阻塞佇列,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒, Condition await and signal with Examples 採用先生產再消費的方式,分別執行 3 個生產執行緒跟 2 個消費執行緒。

	@Test
	public void produceAndConsume() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		consumers.forEach(e -> e.start());

		System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
		assertEquals(expectedSize, queue.size());
	}
T[12] producer waiting
T[13] producer put: 2
T[11] producer put: 0
T[14] consumer take: 0
T[15] consumer take: 2
T[12] producer put: 1
T[1] {"maxSize":2,"list":[1]}

ConditionAwaitSignalTest.java

ReentrantLock Condition await signal 新增單元測試,驗證 Condition await signal Methods 是否符合預期。

package org.ruoxue.java_147.synchronization.reentrantlock;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.junit.Test;

public class ConditionAwaitSignalTest {

	protected class BlockQueue<E> {
		private int maxSize;
		private List<E> list = new ArrayList<E>();
		private final Lock lock = new ReentrantLock();
		private Condition condition = lock.newCondition();

		public BlockQueue(int maxSize) {
			this.maxSize = maxSize;
		}

		public void put(E e) throws InterruptedException {
			lock.lock();
			try {
				while (list.size() == maxSize) {
					System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
					condition.await();
				}
				boolean added = list.add(e);
				if (added) {
					condition.signal();
				}
			} finally {
				lock.unlock();
			}
		}

		public E take() throws InterruptedException {
			E result = null;
			lock.lock();
			try {
				while (list.size() == 0) {
					System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
					condition.await();
				}
				result = list.remove(0);
				if (result != null) {
					condition.signal();
				}
			} finally {
				lock.unlock();
			}
			return result;
		}

		public int size() {
			return list.size();
		}

		@Override
		public String toString() {
			ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
			builder.appendSuper(super.toString());
			builder.append("maxSize", maxSize);
			builder.append("list", list);
			return builder.toString();
		}
	}

	@Test
	public void consumeAndProduce() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		consumers.forEach(e -> e.start());

		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void produceAndConsume() {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		consumers.forEach(e -> e.start());

		System.out.println(String.format("T[%d] %s", Thread.currentThread().getId(), queue));
		assertEquals(expectedSize, queue.size());
	}
}

心得分享

Condition await and signal with Examples 鎖定條件為執行緒提供了在執行程式碼的關鍵時刻,等待某個任意條件發生的能力,通常受某種鎖定機制保護的一段程式碼,ReentrantLock Condition await signal 執行緒可能會取得程式碼關鍵部分的獨占鎖,當發現不具備繼續執行的所需條件時,該執行緒會釋放鎖,並將其狀態改變為等待狀態,直到滿足必要的條件為止, 這表示另一條執行緒,稍後將會向當前等待的執行緒發出信號,讓該執行緒重新取得鎖,並檢查是否已經滿足執行的必要條件。

發佈留言