Java SynchronousQueue Methods - Java 147

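SynchronousQueue is an unusual queue. Strictly speaking it is not a real queue at all, because it maintains no storage for elements. Unlike other queues, it maintains a set of threads that are waiting to hand an element in or take one out. This article covers the common SynchronousQueue methods such as add, remove, take, and put, with examples, and uses unit tests to verify the results.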
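
To make the handoff idea concrete, here is a minimal sketch, not part of the original tests. The class name HandoffTiming and the 300 ms delay are assumptions chosen for illustration. It measures how long put stays blocked while the only consumer arrives late.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class HandoffTiming {

	// Returns how long put() was blocked, in milliseconds (-1 if interrupted).
	static long putElapsedMillis() {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		Thread consumer = new Thread(() -> {
			try {
				// The consumer arrives late on purpose (hypothetical 300 ms delay).
				TimeUnit.MILLISECONDS.sleep(300);
				queue.take();
			} catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
		});
		long start = System.nanoTime();
		consumer.start();
		long elapsed = -1;
		try {
			queue.put("Papaya"); // blocks until the consumer calls take()
			elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
			consumer.join();
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		return elapsed;
	}

	public static void main(String[] args) {
		System.out.println("put blocked for about " + putElapsedMillis() + " ms");
	}
}
```

The measured time should be roughly the consumer's delay, since put cannot return before a taker shows up.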

File Structure

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- queue
       |                   +- synchronousqueue
       |                       +- SynchronousQueueMethodsTest.java   

Unit Tests

SynchronousQueue provides operations for adding and retrieving elements.

add

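Create a SynchronousQueue and add one element with add. The call throws IllegalStateException because no thread is waiting to take the element, so nothing is ready to take part in the handoff. The two lines after add in the test are therefore never reached.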

	@Test(expected = IllegalStateException.class)
	public void add() {
		int expectedSize = 0;
		BlockingQueue<String> queue = new SynchronousQueue<String>();
		queue.add("Papaya");
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at org.ruoxue.java_147.queue.SynchronousQueueMethodsTest.add(SynchronousQueueMethodsTest.java:25)
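
When an exception is not wanted, offer reports the same failed handoff through its return value. The following is a small sketch for comparison. The class and method names (OfferVersusAdd, addThrows, offerAccepted) are made up for this example.

```java
import java.util.concurrent.SynchronousQueue;

class OfferVersusAdd {

	// True when add() throws IllegalStateException because no consumer is waiting.
	static boolean addThrows() {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		try {
			queue.add("Papaya");
			return false;
		} catch (IllegalStateException ex) {
			return true;
		}
	}

	// offer() returns false instead of throwing when no consumer is waiting.
	static boolean offerAccepted() {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		return queue.offer("Papaya");
	}

	public static void main(String[] args) {
		System.out.println("add throws: " + addThrows());
		System.out.println("offer accepted: " + offerAccepted());
	}
}
```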

remove

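Create a SynchronousQueue and try to remove an element with remove. Because the queue never holds any elements, remove returns false and the queue still prints as empty.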

	@Test
	public void remove() {
		int expectedSize = 0;
		BlockingQueue<String> queue = new SynchronousQueue<String>();
		queue.remove("Papaya");
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
[]
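
The other collection-style methods behave the same way: from the outside, a SynchronousQueue always looks empty. This short sketch (the class name EmptyCollectionView is hypothetical) gathers the results of several such calls into one string.

```java
import java.util.concurrent.SynchronousQueue;

class EmptyCollectionView {

	// Collects the results of the inspection methods on a fresh queue.
	static String describe() {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		return "remove=" + queue.remove("Papaya")
				+ " peek=" + queue.peek()
				+ " poll=" + queue.poll()
				+ " isEmpty=" + queue.isEmpty()
				+ " size=" + queue.size()
				+ " remainingCapacity=" + queue.remainingCapacity();
	}

	public static void main(String[] args) {
		// prints: remove=false peek=null poll=null isEmpty=true size=0 remainingCapacity=0
		System.out.println(describe());
	}
}
```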

put

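Create a SynchronousQueue, start a thread that takes one element with take, then add two elements with put. The first put is handed to the consumer thread. The second put blocks the calling thread because no other consumer is waiting, so the lines after it are never reached and the test does not finish on its own.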

	@Test
	public void put() {
		try {
			int expectedSize = 0;
			BlockingQueue<String> queue = new SynchronousQueue<String>();

			Thread thread = new Thread(() -> {
				try {
					String value = queue.take();
					System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			queue.put("Papaya");
			queue.put("Strawberry");
			System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[11] take: Papaya
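
If blocking forever is not acceptable, the timed offer gives up after a deadline. The sketch below is one possible variant of the test above, not the author's code. The class name TimedOffer and the timeout values are assumptions.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedOffer {

	// Offers two elements while only one consumer exists.
	// results[0] is the first handoff, results[1] the second.
	static boolean[] run() {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		Thread consumer = new Thread(() -> {
			try {
				queue.take(); // takes exactly one element
			} catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
		});
		consumer.start();

		boolean[] results = new boolean[2];
		try {
			// Waits up to 5 seconds for the consumer to arrive.
			results[0] = queue.offer("Papaya", 5, TimeUnit.SECONDS);
			// Nobody is left to take this one, so it times out.
			results[1] = queue.offer("Strawberry", 200, TimeUnit.MILLISECONDS);
			consumer.join();
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		return results;
	}

	public static void main(String[] args) {
		boolean[] results = run();
		System.out.println("first offer: " + results[0] + ", second offer: " + results[1]);
	}
}
```

Unlike put, the second call returns false after the timeout, so the calling thread can continue.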

take

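Create a SynchronousQueue, start a thread that adds two elements with put, then take one element with take on the main thread. The first put completes once take receives the element. The second put blocks the producer thread because nobody calls take again, which is why only one "put" line appears in the output. The main thread waits 3 seconds and then ends the test.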

	@Test
	public void take() {
		try {
			int expectedSize = 0;
			BlockingQueue<String> queue = new SynchronousQueue<String>();

			Thread thread = new Thread(() -> {
				try {
					queue.put("Papaya");
					System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
					queue.put("Strawberry");
					System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			String value = queue.take();
			System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
			assertEquals(expectedSize, queue.size());
			
			try {
				TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[1] take: Papaya
T[11] put: []
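
In the test above the producer thread stays blocked in its second put when the test ends. One way to release it, sketched here under the assumption that interrupting the producer is acceptable, is Thread.interrupt, which makes the blocked put throw InterruptedException. The class name InterruptBlockedPut is hypothetical.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptBlockedPut {

	// Returns true when the producer was released by an interrupt.
	static boolean run() {
		SynchronousQueue<String> queue = new SynchronousQueue<>();
		AtomicBoolean interrupted = new AtomicBoolean(false);
		Thread producer = new Thread(() -> {
			try {
				queue.put("Papaya");
				queue.put("Strawberry"); // no second take(): blocks here
			} catch (InterruptedException ex) {
				interrupted.set(true);
			}
		});
		producer.start();
		try {
			queue.take();            // receives "Papaya"
			producer.interrupt();    // releases the blocked put()
			producer.join(5000);     // wait for the producer to finish
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		return interrupted.get();
	}

	public static void main(String[] args) {
		System.out.println("producer interrupted: " + run());
	}
}
```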

produceAndConsume

Create a SynchronousQueue, start two threads that each add one element with put and two threads that each take one element with take. The main thread waits 3 seconds and then ends the test.

	@Test
	public void produceAndConsume() {
		int expectedSize = 0;
		BlockingQueue<Integer> queue = new SynchronousQueue<Integer>();
		AtomicInteger ids = new AtomicInteger();
		List<Thread> putThreads = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println("T[" + Thread.currentThread().getId() + "] put: " + value);
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		putThreads.forEach(e -> e.start());

		List<Thread> takeThreads = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		takeThreads.forEach(e -> e.start());

		try {
			TimeUnit.SECONDS.sleep(3);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}
T[13] take: 0
T[14] take: 1
T[11] put: 0
T[12] put: 1
T[1] []
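
Waiting a fixed 3 seconds works here, but joining the threads is a more deterministic way to know that every handoff has finished. The following sketch is an alternative, not the original test. The class name JoinInsteadOfSleep and the use of a ConcurrentSkipListSet to record results are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

class JoinInsteadOfSleep {

	// Runs two producers and two consumers and returns the values taken.
	static Set<Integer> run() {
		SynchronousQueue<Integer> queue = new SynchronousQueue<>();
		AtomicInteger ids = new AtomicInteger();
		Set<Integer> taken = new ConcurrentSkipListSet<>();
		List<Thread> threads = new ArrayList<>();
		for (int i = 0; i < 2; i++) {
			threads.add(new Thread(() -> {
				try {
					queue.put(ids.getAndIncrement());
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
				}
			}));
			threads.add(new Thread(() -> {
				try {
					taken.add(queue.take());
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
				}
			}));
		}
		threads.forEach(Thread::start);
		try {
			// Every put is paired with a take, so all four threads terminate.
			for (Thread thread : threads) {
				thread.join();
			}
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		return taken;
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints: [0, 1]
	}
}
```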

newCachedThreadPool

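Create a thread pool with Executors.newCachedThreadPool, which uses a SynchronousQueue as its work queue and places no practical limit on the number of threads. Run 3 tasks, each taking 3 seconds to complete.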

	@Test
	public void newCachedThreadPool() {
		ExecutorService executorService = Executors.newCachedThreadPool();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					try {
						System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
T[11] task: 0 ready
T[13] task: 2 ready
T[12] task: 1 ready
T[11] task: 0 finished
T[13] task: 2 finished
T[12] task: 1 finished
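
To show where the SynchronousQueue sits in such a pool, the sketch below builds a ThreadPoolExecutor by hand with the same kind of settings that newCachedThreadPool uses (no core threads, an effectively unbounded maximum, a 60 second keep-alive). The class name DirectHandoffPool and the latch-based structure are assumptions for illustration. Because no idle worker is waiting, each submitted task cannot be queued and gets its own thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandoffPool {

	// Submits taskSize blocking tasks and returns the resulting pool size.
	static int run(int taskSize) {
		ThreadPoolExecutor executor = new ThreadPoolExecutor(
				0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>());
		CountDownLatch started = new CountDownLatch(taskSize);
		CountDownLatch release = new CountDownLatch(1);
		for (int i = 0; i < taskSize; i++) {
			executor.execute(() -> {
				started.countDown();
				try {
					release.await(); // keep the worker busy until released
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
				}
			});
		}
		try {
			started.await(); // all tasks are running at this point
			return executor.getPoolSize();
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return -1;
		} finally {
			release.countDown();
			executor.shutdown(); // let the worker threads exit
		}
	}

	public static void main(String[] args) {
		System.out.println("pool size: " + run(3)); // prints: pool size: 3
	}
}
```

Calling shutdown at the end lets the worker threads exit instead of lingering until the keep-alive time expires.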

SynchronousQueueMethodsTest.java

The complete unit test class, which verifies that the results match expectations.

package org.ruoxue.java_147.queue.synchronousqueue;

import static org.junit.Assert.*;

import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

public class SynchronousQueueMethodsTest {

	@Test(expected = IllegalStateException.class)
	public void add() {
		int expectedSize = 0;
		BlockingQueue<String> queue = new SynchronousQueue<String>();
		queue.add("Papaya");
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void remove() {
		int expectedSize = 0;
		BlockingQueue<String> queue = new SynchronousQueue<String>();
		queue.remove("Papaya");
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void put() {
		try {
			int expectedSize = 0;
			BlockingQueue<String> queue = new SynchronousQueue<String>();

			Thread thread = new Thread(() -> {
				try {
					String value = queue.take();
					System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			queue.put("Papaya");
			queue.put("Strawberry");
			System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void take() {
		try {
			int expectedSize = 0;
			BlockingQueue<String> queue = new SynchronousQueue<String>();

			Thread thread = new Thread(() -> {
				try {
					queue.put("Papaya");
					System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
					queue.put("Strawberry");
					System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			String value = queue.take();
			System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
			assertEquals(expectedSize, queue.size());
			
			try {
				TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void produceAndConsume() {
		int expectedSize = 0;
		BlockingQueue<Integer> queue = new SynchronousQueue<Integer>();
		AtomicInteger ids = new AtomicInteger();
		List<Thread> putThreads = Stream.generate(() -> new Thread(() -> {
			try {
				int value = ids.getAndIncrement();
				queue.put(value);
				System.out.println("T[" + Thread.currentThread().getId() + "] put: " + value);
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		putThreads.forEach(e -> e.start());

		List<Thread> takeThreads = Stream.generate(() -> new Thread(() -> {
			try {
				Integer value = queue.take();
				System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		})).limit(2).collect(Collectors.toList());
		takeThreads.forEach(e -> e.start());

		try {
			TimeUnit.SECONDS.sleep(3);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
		System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void newCachedThreadPool() {
		ExecutorService executorService = Executors.newCachedThreadPool();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					try {
						System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
}

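Takeaways

SynchronousQueue hands work off directly, which reduces the latency of moving data from producer to consumer. It has no storage, so put and take block until another thread is ready to take part in the handoff. A synchronous queue is appropriate only when there are enough consumers and one is always ready to accept the handed-off work. This article provided examples of several common SynchronousQueue methods.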
