Java SynchronousQueue Methods
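SynchronousQueue is an unusual queue. Strictly speaking it is not a queue at all, because it maintains no storage for elements. Unlike other queues, it maintains a set of threads that are waiting to add an element to the queue or remove one from it. This article covers the common methods add, remove, take, and put, with examples whose results are verified by unit tests.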
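To make the "no storage" point concrete, here is a minimal sketch of what an idle SynchronousQueue reports about itself. The class name ZeroCapacityDemo and the helper describe are hypothetical names used only for illustration; the queue methods are the standard java.util.concurrent ones.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's test file.
public class ZeroCapacityDemo {

    // Summarizes what the queue reports about its own contents.
    static String describe(BlockingQueue<String> queue) {
        return "size=" + queue.size()
                + ", remainingCapacity=" + queue.remainingCapacity()
                + ", isEmpty=" + queue.isEmpty()
                + ", peek=" + queue.peek()
                + ", hasNext=" + queue.iterator().hasNext();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        // prints: size=0, remainingCapacity=0, isEmpty=true, peek=null, hasNext=false
        System.out.println(describe(queue));
    }
}
```

This is why every test in this article asserts an expected size of 0: the queue never holds an element, it only pairs a producer with a consumer.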
File layout
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- queue
       |                   +- synchronousqueue
       |                       +- SynchronousQueueMethodsTest.java
Unit tests
SynchronousQueue provides operations for adding elements to the queue and retrieving them from it.
add
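Create a SynchronousQueue and add one element. The call throws an exception because nothing is taking from the queue: no thread is ready to take part in the handoff.
@Test(expected = IllegalStateException.class)
public void add() {
    int expectedSize = 0;
    BlockingQueue<String> queue = new SynchronousQueue<String>();
    // No consumer is waiting, so add throws IllegalStateException here.
    queue.add("Papaya");
    // Not reached: the test passes because of the expected exception.
    System.out.println(queue);
    assertEquals(expectedSize, queue.size());
}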
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at org.ruoxue.java_147.queue.SynchronousQueueMethodsTest.add(SynchronousQueueMethodsTest.java:25)
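When an exception is not wanted, the non-blocking offer method reports the same condition with a return value. The sketch below compares the two calls on a queue with no waiting consumer. OfferVersusAddDemo and addThrows are hypothetical names chosen for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's test file.
public class OfferVersusAddDemo {

    // Returns true if add threw IllegalStateException because nobody could take the element.
    static boolean addThrows(BlockingQueue<String> queue, String value) {
        try {
            queue.add(value);
            return false;
        } catch (IllegalStateException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        // No thread is blocked in take(), so neither call can hand the element over.
        System.out.println("offer returned: " + queue.offer("Papaya"));
        System.out.println("add threw: " + addThrows(queue, "Papaya"));
    }
}
```

Both calls fail for the same reason; offer simply returns false where add throws.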
remove
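Create a SynchronousQueue, which contains no elements, and try to remove the element "Papaya". Nothing is removed and the size stays 0.
@Test
public void remove() {
    int expectedSize = 0;
    BlockingQueue<String> queue = new SynchronousQueue<String>();
    // Always returns false: a SynchronousQueue never holds elements.
    queue.remove("Papaya");
    System.out.println(queue);
    assertEquals(expectedSize, queue.size());
}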
[]
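The test above calls remove(Object). The no-argument remove() and poll() behave differently from each other on a queue with no waiting producer, which the following sketch illustrates. RemoveDemo and removeHeadThrows are hypothetical names used only here.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's test file.
public class RemoveDemo {

    // Returns true if the no-argument remove() threw NoSuchElementException.
    static boolean removeHeadThrows(BlockingQueue<String> queue) {
        try {
            queue.remove();
            return false;
        } catch (NoSuchElementException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        // remove(Object) never finds anything to remove.
        System.out.println("remove(\"Papaya\"): " + queue.remove("Papaya"));
        // poll() returns null because no producer is waiting to hand over an element.
        System.out.println("poll(): " + queue.poll());
        // remove() throws in the same situation where poll() returns null.
        System.out.println("remove() threw: " + removeHeadThrows(queue));
    }
}
```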
put
Create a SynchronousQueue. A consumer thread calls take once while the main thread calls put twice. The first element is handed over. The second put blocks the calling thread indefinitely because no consumer is left, so the test never reaches its final assertion.
@Test
public void put() {
    try {
        int expectedSize = 0;
        BlockingQueue<String> queue = new SynchronousQueue<String>();
        Thread thread = new Thread(() -> {
            try {
                String value = queue.take();
                System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        thread.start();
        queue.put("Papaya");
        // Blocks indefinitely: the only consumer has already taken "Papaya".
        queue.put("Strawberry");
        System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
        assertEquals(expectedSize, queue.size());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[11] take: Papaya
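Because the second put in the test above has no matching take, the test hangs. One way to bound the wait, sketched here as an alternative rather than as the article's approach, is the timed offer(e, timeout, unit) method, which gives up and returns false when no consumer arrives in time. TimedOfferDemo, offerTwice, and the timeout values are assumptions made for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's test file.
public class TimedOfferDemo {

    // Offers two elements to a single consumer and returns the result of each offer.
    static boolean[] offerTwice() {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("take: " + queue.take());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Generous timeout so the consumer has time to reach take().
            boolean first = queue.offer("Papaya", 5, TimeUnit.SECONDS);
            // Nobody is left to take, so this offer gives up after the timeout.
            boolean second = queue.offer("Strawberry", 200, TimeUnit.MILLISECONDS);
            consumer.join();
            return new boolean[] { first, second };
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        boolean[] results = offerTwice();
        System.out.println("first offer: " + results[0] + ", second offer: " + results[1]);
    }
}
```

The trade-off is that the caller must handle a false result, whereas put guarantees delivery at the cost of waiting without limit.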
take
Create a SynchronousQueue. A producer thread calls put twice while the main thread calls take once. The first element is handed over. The second put blocks the producer thread because no consumer is left to take it, so its second message is never printed.
@Test
public void take() {
    try {
        int expectedSize = 0;
        BlockingQueue<String> queue = new SynchronousQueue<String>();
        Thread thread = new Thread(() -> {
            try {
                queue.put("Papaya");
                System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
                // Blocks: the main thread takes only one element.
                queue.put("Strawberry");
                System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        thread.start();
        String value = queue.take();
        System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
        assertEquals(expectedSize, queue.size());
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[1] take: Papaya
T[11] put: []
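In the test above the producer thread is still blocked in its second put when the test ends. A blocked put responds to interruption, so one possible cleanup, shown here only as a sketch, is to interrupt the producer once the consumer is done. InterruptBlockedPutDemo and interruptSecondPut are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the article's test file.
public class InterruptBlockedPutDemo {

    // A producer puts two elements, the caller takes one and then interrupts the producer.
    // Returns true if the producer ended with InterruptedException.
    static boolean interruptSecondPut() {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                queue.put("Papaya");
                queue.put("Strawberry"); // no consumer left, so this call cannot complete
            } catch (InterruptedException ex) {
                interrupted.set(true);
            }
        });
        producer.start();
        try {
            System.out.println("take: " + queue.take());
            // Release the producer instead of leaving it blocked forever.
            producer.interrupt();
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("producer interrupted: " + interruptSecondPut());
    }
}
```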
produceAndConsume
Create a SynchronousQueue. Two producer threads each put one element and two consumer threads each take one. The main thread waits 3 seconds and then finishes.
@Test
public void produceAndConsume() {
    int expectedSize = 0;
    BlockingQueue<Integer> queue = new SynchronousQueue<Integer>();
    AtomicInteger ids = new AtomicInteger();
    // Two producers, each handing over one unique value.
    List<Thread> putThreads = Stream.generate(() -> new Thread(() -> {
        try {
            int value = ids.getAndIncrement();
            queue.put(value);
            System.out.println("T[" + Thread.currentThread().getId() + "] put: " + value);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(2).collect(Collectors.toList());
    putThreads.forEach(e -> e.start());
    // Two consumers, each receiving one value.
    List<Thread> takeThreads = Stream.generate(() -> new Thread(() -> {
        try {
            Integer value = queue.take();
            System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    })).limit(2).collect(Collectors.toList());
    takeThreads.forEach(e -> e.start());
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
    System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
    assertEquals(expectedSize, queue.size());
}
T[13] take: 0
T[14] take: 1
T[11] put: 0
T[12] put: 1
T[1] []
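The test above relies on a fixed 3-second sleep and checks only the queue size. A variant that waits with join and checks which values actually arrived could look like the sketch below. JoinedHandoffDemo and exchange are hypothetical names, and collecting results in a ConcurrentSkipListSet is a choice made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's test file.
public class JoinedHandoffDemo {

    // Runs `pairs` producers and `pairs` consumers; returns the values the consumers received.
    static Set<Integer> exchange(int pairs) {
        BlockingQueue<Integer> queue = new SynchronousQueue<>();
        Set<Integer> received = new ConcurrentSkipListSet<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            final int value = i;
            threads.add(new Thread(() -> {
                try {
                    queue.put(value);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
            threads.add(new Thread(() -> {
                try {
                    received.add(queue.take());
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            try {
                thread.join(); // wait for every handoff instead of sleeping
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(exchange(2)); // prints [0, 1]
    }
}
```

Since the number of put calls equals the number of take calls, every thread finishes and join returns without a timeout.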
newCachedThreadPool
Create a thread pool with Executors.newCachedThreadPool(), which uses a SynchronousQueue as its work queue and places no practical limit on the number of threads. Run 3 tasks, each taking 3 seconds to complete.
@Test
public void newCachedThreadPool() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    int taskSize = 3;
    CountDownLatch latch = new CountDownLatch(taskSize);
    try {
        IntStream.range(0, taskSize).forEach(e -> {
            executorService.execute(() -> {
                try {
                    System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " ready");
                    // Each task stays busy for 3 seconds, so the pool starts one thread per task.
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " finished");
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        });
        latch.await();
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
T[11] task: 0 ready
T[13] task: 2 ready
T[12] task: 1 ready
T[11] task: 0 finished
T[13] task: 2 finished
T[12] task: 1 finished
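The output above shows three different thread ids because a task cannot wait in a SynchronousQueue: if no idle worker is polling, the pool starts a new thread. The sketch below builds a pool with the same parameters that OpenJDK's Executors.newCachedThreadPool() passes to ThreadPoolExecutor, and counts the threads it creates. DirectHandoffPoolDemo and largestPoolSize are hypothetical names, and the latches are an assumption used to keep all tasks busy at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's test file.
public class DirectHandoffPoolDemo {

    // Submits `tasks` tasks that are all busy at the same time and
    // returns the largest number of threads the pool ever held.
    static int largestPoolSize(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep this worker busy until all tasks are submitted
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            release.countDown();
            done.await();
            return pool.getLargestPoolSize();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads created: " + largestPoolSize(3));
    }
}
```

With three tasks that overlap in time, the pool ends up with three threads, matching the three thread ids in the test output.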
SynchronousQueueMethodsTest.java
The complete unit test class, which verifies that the results match expectations.
package org.ruoxue.java_147.queue.synchronousqueue;

import static org.junit.Assert.*;

import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

public class SynchronousQueueMethodsTest {

    @Test(expected = IllegalStateException.class)
    public void add() {
        int expectedSize = 0;
        BlockingQueue<String> queue = new SynchronousQueue<String>();
        queue.add("Papaya");
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void remove() {
        int expectedSize = 0;
        BlockingQueue<String> queue = new SynchronousQueue<String>();
        queue.remove("Papaya");
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void put() {
        try {
            int expectedSize = 0;
            BlockingQueue<String> queue = new SynchronousQueue<String>();
            Thread thread = new Thread(() -> {
                try {
                    String value = queue.take();
                    System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
            thread.start();
            queue.put("Papaya");
            queue.put("Strawberry");
            System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
            assertEquals(expectedSize, queue.size());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void take() {
        try {
            int expectedSize = 0;
            BlockingQueue<String> queue = new SynchronousQueue<String>();
            Thread thread = new Thread(() -> {
                try {
                    queue.put("Papaya");
                    System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
                    queue.put("Strawberry");
                    System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
            thread.start();
            String value = queue.take();
            System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
            assertEquals(expectedSize, queue.size());
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void produceAndConsume() {
        int expectedSize = 0;
        BlockingQueue<Integer> queue = new SynchronousQueue<Integer>();
        AtomicInteger ids = new AtomicInteger();
        List<Thread> putThreads = Stream.generate(() -> new Thread(() -> {
            try {
                int value = ids.getAndIncrement();
                queue.put(value);
                System.out.println("T[" + Thread.currentThread().getId() + "] put: " + value);
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(2).collect(Collectors.toList());
        putThreads.forEach(e -> e.start());
        List<Thread> takeThreads = Stream.generate(() -> new Thread(() -> {
            try {
                Integer value = queue.take();
                System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })).limit(2).collect(Collectors.toList());
        takeThreads.forEach(e -> e.start());
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println("T[" + Thread.currentThread().getId() + "] " + queue);
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void newCachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        int taskSize = 3;
        CountDownLatch latch = new CountDownLatch(taskSize);
        try {
            IntStream.range(0, taskSize).forEach(e -> {
                executorService.execute(() -> {
                    try {
                        System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " ready");
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println("T[" + Thread.currentThread().getId() + "] task: " + e + " finished");
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                });
            });
            latch.await();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
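Takeaways
Implementing a queue this way allows work to be handed off directly, which reduces the latency of moving data from producer to consumer. Because there is no storage, put and take block until another thread is ready to take part in the handoff. A synchronous queue is appropriate only when there are enough consumers that one is always ready to accept the handed-off work. This article has shown examples of several common SynchronousQueue methods.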