Java ReentrantLock with Conditions - Java 147

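Lock conditions let a thread wait, at a critical point in its execution, for some arbitrary condition to occur. The pattern usually involves a section of code protected by a lock. A thread acquires the exclusive lock on the critical section, and if it finds that the condition required to proceed does not hold, it releases the lock and enters a waiting state until the condition is met. Another thread later signals the waiting thread, which then re-acquires the lock and checks whether the required condition now holds. This article adds examples and verifies their output with unit tests.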
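The wait-and-signal cycle described above can be sketched in a few lines. This is a minimal illustration, not code from the test class below: `ReadyFlagSketch`, `ReadyFlag`, `awaitReady`, `markReady`, and `demo` are hypothetical names chosen for the example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ReadyFlagSketch {

	// Hypothetical helper: one thread waits until another thread marks the flag ready.
	static class ReadyFlag {
		private final Lock lock = new ReentrantLock();
		private final Condition ready = lock.newCondition();
		private boolean isReady = false;

		void awaitReady() throws InterruptedException {
			lock.lock();
			try {
				// Re-check the condition in a loop because await() may return spuriously.
				while (!isReady) {
					// await() releases the lock while waiting and re-acquires it before returning.
					ready.await();
				}
			} finally {
				lock.unlock();
			}
		}

		void markReady() {
			lock.lock();
			try {
				isReady = true;
				ready.signalAll();
			} finally {
				lock.unlock();
			}
		}
	}

	// Starts a waiting thread, marks the flag ready, and reports whether the waiter got through.
	static boolean demo() {
		ReadyFlag flag = new ReadyFlag();
		boolean[] passed = new boolean[1];
		Thread waiter = new Thread(() -> {
			try {
				flag.awaitReady();
				passed[0] = true;
			} catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
		});
		waiter.start();
		flag.markReady();
		try {
			waiter.join();
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		return passed[0];
	}

	public static void main(String[] args) {
		System.out.println("waiter passed: " + demo());
	}
}
```

The sketch works whether the waiter or the signaller runs first, because the waiter checks the `isReady` flag under the lock before it ever calls `await()`.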

File Structure

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- reentrantlock
       |                       +- ReentrantLockWithConditionsTest.java   

Unit Tests

A Condition, obtained from a Lock, provides operations for waiting on a condition and for waking up waiting threads.

consumeAndProduce

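This example builds a blocking queue with a capacity of 2. When the queue is full, put suspends the calling thread until an element can be added. When the queue is empty, take suspends the calling thread until an element can be removed. The test consumes first and then produces, running 2 consumer threads and 3 producer threads.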

	protected class BlockQueue<E> {
		private int maxSize;
		private List<E> list = new ArrayList<E>();
		private final Lock lock = new ReentrantLock();
		private Condition notEmpty = lock.newCondition();
		private Condition notFull = lock.newCondition();

		public BlockQueue(int maxSize) {
			this.maxSize = maxSize;
		}

		public void put(E e) throws InterruptedException {
			lock.lock();
			try {
				// Wait while the queue is full; await() releases the lock while waiting.
				while (list.size() == maxSize) {
					System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
					notFull.await();
				}
				list.add(e);
				// An element is now available, so wake one waiting consumer.
				notEmpty.signal();
			} finally {
				lock.unlock();
			}
		}

		public E take() throws InterruptedException {
			lock.lock();
			try {
				// Wait while the queue is empty.
				while (list.isEmpty()) {
					System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
					notEmpty.await();
				}
				E result = list.remove(0);
				// A slot is now free, so wake one waiting producer.
				notFull.signal();
				return result;
			} finally {
				lock.unlock();
			}
		}

		public int size() {
			// Read the size under the lock so the value is consistent with put and take.
			lock.lock();
			try {
				return list.size();
			} finally {
				lock.unlock();
			}
		}

		@Override
		public String toString() {
			ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
			builder.appendSuper(super.toString());
			builder.append("maxSize", maxSize);
			builder.append("list", list);
			return builder.toString();
		}
	}

	@Test
	public void consumeAndProduce() throws InterruptedException {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		consumers.forEach(e -> e.start());

		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		// Wait for every thread to finish so the assertion does not race with the workers.
		for (Thread t : consumers) {
			t.join();
		}
		for (Thread t : producers) {
			t.join();
		}

		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}

Output:

T[11] consumer waiting
T[12] consumer waiting
T[14] producer put: 0
T[13] producer put: 1
T[15] producer waiting
T[11] consumer take: 0
T[12] consumer take: 1
T[15] producer put: 2
T[1] {"maxSize":2,"list":[2]}
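For comparison, the JDK already ships a bounded blocking queue, `java.util.concurrent.ArrayBlockingQueue`, whose `put` and `take` block in the same way as the hand-written queue above. In OpenJDK it is likewise built on a single `ReentrantLock` with `notEmpty` and `notFull` conditions. The sketch below is not part of the article's test class; `ArrayBlockingQueueSketch` and `demo` are hypothetical names, and a timed `offer` stands in for a blocked producer so the example stays single-threaded.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class ArrayBlockingQueueSketch {

	// Fills a capacity-2 queue, shows that a third element does not fit, then frees a slot.
	static int demo() {
		BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
		try {
			queue.put(0);
			queue.put(1);
			// The queue is full: a timed offer gives up and returns false instead of blocking forever.
			boolean accepted = queue.offer(2, 100, TimeUnit.MILLISECONDS);
			System.out.println("offer while full accepted: " + accepted);
			// Taking the head frees one slot, so the next put succeeds immediately.
			System.out.println("take: " + queue.take());
			queue.put(2);
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		return queue.size();
	}

	public static void main(String[] args) {
		System.out.println("size: " + demo());
	}
}
```

Writing the queue by hand, as the article does, is mainly useful for seeing how the two conditions cooperate. For production code the library class is usually the safer choice.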

doWorker

This example starts 3 threads but limits the number that can run concurrently to 2. A thread that exceeds the limit waits until a slot becomes available, then continues with its task.

	protected class DoWorker {

		private final Lock lock = new ReentrantLock();
		private Condition condition = lock.newCondition();
		private AtomicInteger count = new AtomicInteger();
		private static final int MAX_COUNT = 2;

		public DoWorker() {
		}

		public void doWork() throws Exception {
			await();
			try {
				System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
				TimeUnit.SECONDS.sleep(3);
			} finally {
				signal();
			}
		}

		public void await() throws InterruptedException {
			lock.lock();
			try {
				while (count.get() >= MAX_COUNT) {
					System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
					condition.await();
				}
				count.incrementAndGet();
			} finally {
				lock.unlock();
			}
		}

		public void signal() throws InterruptedException {
			lock.lock();
			try {
				count.decrementAndGet();
				System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
				condition.signal();
			} finally {
				lock.unlock();
			}
		}
	}

	@Test
	public void doWorker() {
		DoWorker worker = new DoWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(() -> {
			try {
				worker.doWork();
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
	}

Output:

T[11] ready
T[12] ready
T[13] waiting
T[12] finished
T[11] finished
T[13] ready
T[13] finished
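The same limit of 2 concurrent tasks can also be expressed with `java.util.concurrent.Semaphore`. This is a different technique from the Lock and Condition approach the article demonstrates, and is shown only as a point of comparison. `SemaphoreLimitSketch` and `demo` are hypothetical names, and the 100 ms sleep is a stand-in for real work.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitSketch {

	// Runs 3 short tasks with 2 permits and returns the highest concurrency observed.
	static int demo() {
		Semaphore permits = new Semaphore(2);
		AtomicInteger running = new AtomicInteger();
		AtomicInteger maxObserved = new AtomicInteger();
		Thread[] threads = new Thread[3];
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new Thread(() -> {
				try {
					permits.acquire(); // blocks while 2 tasks already hold permits
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
					return;
				}
				try {
					// Record how many tasks are inside the guarded region right now.
					maxObserved.accumulateAndGet(running.incrementAndGet(), Math::max);
					Thread.sleep(100); // stand-in for real work
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
				} finally {
					running.decrementAndGet();
					permits.release();
				}
			});
			threads[i].start();
		}
		for (Thread t : threads) {
			try {
				t.join();
			} catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
		}
		return maxObserved.get();
	}

	public static void main(String[] args) {
		System.out.println("max concurrent tasks: " + demo());
	}
}
```

The observed maximum never exceeds the number of permits. The Condition-based DoWorker achieves the same effect by hand with a counter, a lock, and a signal on release.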

timeoutWorker

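This example starts 3 threads and limits the number that can run concurrently to 2. A thread that exceeds the limit waits. If no slot becomes available within 2 seconds, the wait times out, a TimeoutException is thrown, and the task ends.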

	protected class TimeoutWorker {

		private final Lock lock = new ReentrantLock();
		private Condition condition = lock.newCondition();
		private AtomicInteger count = new AtomicInteger();
		private static final int MAX_COUNT = 2;

		public TimeoutWorker() {
		}

		public void doWork() throws Exception {
			await();
			try {
				System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
				TimeUnit.SECONDS.sleep(3);
			} finally {
				signal();
			}
		}

		public void await() throws InterruptedException, TimeoutException {
			lock.lock();
			try {
				// awaitNanos returns the time left, so the 2-second budget
				// stays correct across repeated or spurious wakeups.
				long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(2000);
				while (count.get() >= MAX_COUNT) {
					if (nanosRemaining <= 0L) {
						throw new TimeoutException("T[" + Thread.currentThread().getId() + "] ");
					}
					System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
					nanosRemaining = condition.awaitNanos(nanosRemaining);
				}
				count.incrementAndGet();
			} finally {
				lock.unlock();
			}
		}

		public void signal() throws InterruptedException {
			lock.lock();
			try {
				count.decrementAndGet();
				System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
				condition.signal();
			} finally {
				lock.unlock();
			}
		}
	}

	@Test
	public void timeoutWorker() {
		TimeoutWorker worker = new TimeoutWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(() -> {
			try {
				worker.doWork();
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
	}

Output:

T[11] ready
T[13] waiting
T[12] ready
java.util.concurrent.TimeoutException: T[13] 
	at org.ruoxue.java_147.synchronization.ReentrantLockWithConditionsTest$TimeoutWorker.await(ReentrantLockWithConditionsTest.java:209)
	at org.ruoxue.java_147.synchronization.ReentrantLockWithConditionsTest$TimeoutWorker.doWork(ReentrantLockWithConditionsTest.java:188)
	at org.ruoxue.java_147.synchronization.ReentrantLockWithConditionsTest.lambda$11(ReentrantLockWithConditionsTest.java:234)
	at java.lang.Thread.run(Thread.java:750)
T[11] finished
T[12] finished
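A timed wait has to keep track of how much of the timeout is left, because a thread can wake up before the condition it needs actually holds. One common way to do this, following the usage pattern in the `Condition` Javadoc, is to loop on `awaitNanos`, which returns an estimate of the remaining time. The sketch below isolates that loop. `TimedWaitSketch`, `awaitOpen`, `openGate`, and the `isOpen` flag are hypothetical names for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TimedWaitSketch {
	private final Lock lock = new ReentrantLock();
	private final Condition changed = lock.newCondition();
	private boolean isOpen = false; // hypothetical guarded state

	// Waits up to the given timeout for the gate to open; returns false on timeout.
	boolean awaitOpen(long timeout, TimeUnit unit) throws InterruptedException {
		long nanos = unit.toNanos(timeout);
		lock.lock();
		try {
			while (!isOpen) {
				if (nanos <= 0L) {
					return false; // time budget used up and the gate is still closed
				}
				// awaitNanos returns the remaining time, so early wakeups do not reset the clock.
				nanos = changed.awaitNanos(nanos);
			}
			return true;
		} finally {
			lock.unlock();
		}
	}

	void openGate() {
		lock.lock();
		try {
			isOpen = true;
			changed.signalAll();
		} finally {
			lock.unlock();
		}
	}

	// First wait times out because nobody opens the gate; second wait returns at once.
	static boolean[] demo() {
		TimedWaitSketch gate = new TimedWaitSketch();
		boolean[] results = new boolean[2];
		try {
			results[0] = gate.awaitOpen(50, TimeUnit.MILLISECONDS);
			gate.openGate();
			results[1] = gate.awaitOpen(50, TimeUnit.MILLISECONDS);
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		return results;
	}

	public static void main(String[] args) {
		boolean[] results = demo();
		System.out.println("first wait: " + results[0]);
		System.out.println("second wait: " + results[1]);
	}
}
```

Returning a boolean is a design choice for this sketch. The TimeoutWorker above throws a TimeoutException at the same point instead.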

ReentrantLockWithConditionsTest.java

The complete test class, which verifies that the lock conditions behave as expected.

package org.ruoxue.java_147.synchronization.reentrantlock;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.junit.Test;

public class ReentrantLockWithConditionsTest {

	protected class BlockQueue<E> {
		private int maxSize;
		private List<E> list = new ArrayList<E>();
		private final Lock lock = new ReentrantLock();
		private Condition notEmpty = lock.newCondition();
		private Condition notFull = lock.newCondition();

		public BlockQueue(int maxSize) {
			this.maxSize = maxSize;
		}

		public void put(E e) throws InterruptedException {
			lock.lock();
			try {
				// Wait while the queue is full; await() releases the lock while waiting.
				while (list.size() == maxSize) {
					System.out.println(String.format("T[%d] producer waiting", Thread.currentThread().getId()));
					notFull.await();
				}
				list.add(e);
				// An element is now available, so wake one waiting consumer.
				notEmpty.signal();
			} finally {
				lock.unlock();
			}
		}

		public E take() throws InterruptedException {
			lock.lock();
			try {
				// Wait while the queue is empty.
				while (list.isEmpty()) {
					System.out.println(String.format("T[%d] consumer waiting", Thread.currentThread().getId()));
					notEmpty.await();
				}
				E result = list.remove(0);
				// A slot is now free, so wake one waiting producer.
				notFull.signal();
				return result;
			} finally {
				lock.unlock();
			}
		}

		public int size() {
			// Read the size under the lock so the value is consistent with put and take.
			lock.lock();
			try {
				return list.size();
			} finally {
				lock.unlock();
			}
		}

		@Override
		public String toString() {
			ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.JSON_STYLE);
			builder.appendSuper(super.toString());
			builder.append("maxSize", maxSize);
			builder.append("list", list);
			return builder.toString();
		}
	}

	@Test
	public void consumeAndProduce() throws InterruptedException {
		int expectedSize = 1;
		BlockQueue<Integer> queue = new BlockQueue<Integer>(2);
		List<Thread> consumers = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println(String.format("T[%d] consumer take: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		consumers.forEach(e -> e.start());

		AtomicInteger ids = new AtomicInteger();
		List<Thread> producers = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println(String.format("T[%d] producer put: %d", Thread.currentThread().getId(), value));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		producers.forEach(e -> e.start());

		// Wait for every thread to finish so the assertion does not race with the workers.
		for (Thread t : consumers) {
			t.join();
		}
		for (Thread t : producers) {
			t.join();
		}

		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}

	protected class DoWorker {

		private final Lock lock = new ReentrantLock();
		private Condition condition = lock.newCondition();
		private AtomicInteger count = new AtomicInteger();
		private static final int MAX_COUNT = 2;

		public DoWorker() {
		}

		public void doWork() throws Exception {
			await();
			try {
				System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
				TimeUnit.SECONDS.sleep(3);
			} finally {
				signal();
			}
		}

		public void await() throws InterruptedException {
			lock.lock();
			try {
				while (count.get() >= MAX_COUNT) {
					System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
					condition.await();
				}
				count.incrementAndGet();
			} finally {
				lock.unlock();
			}
		}

		public void signal() throws InterruptedException {
			lock.lock();
			try {
				count.decrementAndGet();
				System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
				condition.signal();
			} finally {
				lock.unlock();
			}
		}
	}

	@Test
	public void doWorker() {
		DoWorker worker = new DoWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(() -> {
			try {
				worker.doWork();
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
	}

	protected class TimeoutWorker {

		private final Lock lock = new ReentrantLock();
		private Condition condition = lock.newCondition();
		private AtomicInteger count = new AtomicInteger();
		private static final int MAX_COUNT = 2;

		public TimeoutWorker() {
		}

		public void doWork() throws Exception {
			await();
			try {
				System.out.println(String.format("T[%d] ready", Thread.currentThread().getId()));
				TimeUnit.SECONDS.sleep(3);
			} finally {
				signal();
			}
		}

		public void await() throws InterruptedException, TimeoutException {
			lock.lock();
			try {
				// awaitNanos returns the time left, so the 2-second budget
				// stays correct across repeated or spurious wakeups.
				long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(2000);
				while (count.get() >= MAX_COUNT) {
					if (nanosRemaining <= 0L) {
						throw new TimeoutException("T[" + Thread.currentThread().getId() + "] ");
					}
					System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
					nanosRemaining = condition.awaitNanos(nanosRemaining);
				}
				count.incrementAndGet();
			} finally {
				lock.unlock();
			}
		}

		public void signal() throws InterruptedException {
			lock.lock();
			try {
				count.decrementAndGet();
				System.out.println(String.format("T[%d] finished", Thread.currentThread().getId()));
				condition.signal();
			} finally {
				lock.unlock();
			}
		}
	}

	@Test
	public void timeoutWorker() {
		TimeoutWorker worker = new TimeoutWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(() -> {
			try {
				worker.doWork();
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		})).limit(3).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
	}
}

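Takeaways

To suspend execution of a critical section and enter the waiting state on a Condition, a thread must already hold the lock that guards that section. When the thread starts waiting, it automatically releases the lock and suspends. Once the lock is released, other threads can acquire it, make the condition true, and signal the waiting thread, which then re-acquires the lock and continues with its task.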
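The requirement that the caller already holds the lock is enforced at runtime. With a Condition created by a `ReentrantLock`, calling `await()` or `signal()` without holding the lock throws `IllegalMonitorStateException`. The short sketch below demonstrates this. `LockOwnershipSketch` and its two methods are hypothetical names for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class LockOwnershipSketch {

	// Returns true if await() is rejected because the current thread does not hold the lock.
	static boolean awaitWithoutLockFails() {
		ReentrantLock lock = new ReentrantLock();
		Condition condition = lock.newCondition();
		try {
			condition.await(); // the lock is deliberately not held here
			return false;
		} catch (IllegalMonitorStateException ex) {
			return true;
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return false;
		}
	}

	// Returns true if signal() is rejected for the same reason.
	static boolean signalWithoutLockFails() {
		ReentrantLock lock = new ReentrantLock();
		Condition condition = lock.newCondition();
		try {
			condition.signal(); // the lock is deliberately not held here
			return false;
		} catch (IllegalMonitorStateException ex) {
			return true;
		}
	}

	public static void main(String[] args) {
		System.out.println("await without lock fails: " + awaitWithoutLockFails());
		System.out.println("signal without lock fails: " + signalWithoutLockFails());
	}
}
```

This is why every example in the article wraps `await()` and `signal()` in `lock.lock()` and a `finally` block that calls `lock.unlock()`.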
