Table of Contents
ToggleJava ArrayBlockingQueue Class
通常用於多執行緒的應用程式, Queue 的大小是固定的,一旦建立後,便無法再更改其容量,當佇列已滿時,將元素放入佇列,將會導致操作阻塞,同樣地,當佇列無元素時,取得元素也會被阻塞, ArrayBlockingQueue Class in Java 介紹常見的 offer 、 poll 、 put 、 take 、 contains 等方法,本篇增加了範例,並透過單元測試來驗證產出結果。
檔案目錄
./
+- src
+- test
| +- org
| +- ruoxue
| +- java_147
| +- queue
| +- arrayblockingqueue
| +- ArrayBlockingQueueClassTest.java
單元測試
ArrayBlockingQueue Java 提供新增、取得、檢查包含、串流操作佇列中的元素。
offer
建立一個 ArrayBlockingQueue ,增加三個元素。
@Test
public void offer() {
int expectedSize = 3;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.offer("Papaya");
queue.offer("Strawberry");
queue.offer("Watermelon");
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
[Papaya, Strawberry, Watermelon]
offerWhenFull
建立一個 ArrayBlockingQueue ,大小容量為 2,當佇列已滿時,加入第三個元素,無法加入,傳回 false。
@Test
public void offerWhenFull() {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
queue.offer("Papaya");
queue.offer("Strawberry");
boolean offered = queue.offer("Watermelon");
System.out.println(offered);
assertFalse(offered);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
false
[Papaya, Strawberry]
offerTimeout
ArrayBlockingQueue Java 建立一個 ArrayBlockingQueue ,大小容量為 2,當佇列已滿時,加入第三個元素,等待 3 秒後,若無法加入,傳回 false。
@Test
public void offerTimeout() {
try {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
queue.offer("Papaya");
queue.offer("Strawberry");
boolean offered = queue.offer("Watermelon", 3, TimeUnit.SECONDS);
System.out.println(offered);
assertFalse(offered);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
false
[Papaya, Strawberry]
poll
ArrayBlockingQueue Java 建立一個 ArrayBlockingQueue ,內有三個元素,取得第一個位置元素,元素會移出佇列。
@Test
public void poll() {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.offer("Papaya");
queue.offer("Strawberry");
queue.offer("Watermelon");
String value = queue.poll();
System.out.println(value);
assertNotNull(value);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
Papaya
[Strawberry, Watermelon]
pollWhenEmpty
ArrayBlockingQueue Java 建立一個 ArrayBlockingQueue ,當佇列為空,沒有任何元素時,取得第一個位置元素,會傳回 null 。
@Test
public void pollWhenEmpty() {
int expectedSize = 0;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
String value = queue.poll();
System.out.println(value);
assertNull(value);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
null
[]
pollTimeout
Blocking Queue Java 建立一個 ArrayBlockingQueue ,當佇列為空,沒有任何元素時,取得第一個位置元素,等待 3 秒後,若無法取得,會傳回 null 。
@Test
public void pollTimeout() {
try {
int expectedSize = 0;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
String value = queue.poll(3, TimeUnit.SECONDS);
System.out.println(value);
assertNull(value);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
null
[]
put
Blocking Queue Java 建立一個 ArrayBlockingQueue ,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒。
@Test
public void put() {
try {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
Thread thread = new Thread(() -> {
try {
String value = queue.take();
System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
thread.start();
queue.put("Papaya");
queue.put("Strawberry");
queue.put("Watermelon");
System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
T[11] take: Papaya
T[1] put: [Strawberry, Watermelon]
take
Blocking Queue Java 建立一個 ArrayBlockingQueue ,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒。
@Test
public void take() {
try {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
Thread thread = new Thread(() -> {
try {
queue.put("Papaya");
queue.put("Strawberry");
queue.put("Watermelon");
System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
thread.start();
String value = queue.take();
System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
T[1] take: Papaya
T[11] put: [Strawberry, Watermelon]
contains
ArrayBlockingQueue Class in Java 建立一個 ArrayBlockingQueue ,內有三個元素,檢查包含指定的元素。
@Test
public void contains() {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
boolean contains = queue.contains("Papaya");
System.out.println(contains);
assertTrue(contains);
contains = queue.contains("Grape");
System.out.println(contains);
assertFalse(contains);
}
true
false
containsAll
ArrayBlockingQueue Class in Java 建立一個 ArrayBlockingQueue ,內有三個元素,查包含所有指定的元素。
@Test
public void containsAll() {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
queue2.add("Papaya");
queue2.add("Strawberry");
boolean contains = queue.containsAll(queue2);
System.out.println(contains);
assertTrue(contains);
contains = queue2.containsAll(queue);
System.out.println(contains);
assertFalse(contains);
}
true
false
stream
ArrayBlockingQueue Class in Java 建立一個 ArrayBlockingQueue ,內有三個元素,執行串流,取得長度大於 6 的元素。
@Test
public void stream() {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
List<String> list = queue.stream().filter(e -> e.length() > 6).collect(Collectors.toList());
System.out.println(list);
assertEquals(expectedSize, list.size());
}
[Strawberry, Watermelon]
parallelStream
建立一個 ArrayBlockingQueue ,內有三個元素,並行執行串流。
@Test
public void parallelStream() {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
queue.parallelStream().forEach(System.out::println);
System.out.println("----------");
queue.parallelStream().forEachOrdered(System.out::println);
}
Strawberry
Papaya
Watermelon
----------
Papaya
Strawberry
Watermelon
retainAll
建立一個 ArrayBlockingQueue ,內各有三個元素,只保留相同元素。
@Test
public void retainAll() {
int expectedSize = 1;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
queue2.add("Papaya");
queue2.add("Lemon");
queue2.add("Mango");
queue.retainAll(queue2);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
[Papaya]
ArrayBlockingQueueClassTest.java
Blocking Queue Java 新增單元測試,驗證是否符合預期。
package org.ruoxue.java_147.queue.arrayblockingqueue;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.Test;
public class ArrayBlockingQueueClassTest {
@Test
public void offer() {
int expectedSize = 3;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.offer("Papaya");
queue.offer("Strawberry");
queue.offer("Watermelon");
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
@Test
public void offerWhenFull() {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
queue.offer("Papaya");
queue.offer("Strawberry");
boolean offered = queue.offer("Watermelon");
System.out.println(offered);
assertFalse(offered);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
@Test
public void offerTimeout() {
try {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
queue.offer("Papaya");
queue.offer("Strawberry");
boolean offered = queue.offer("Watermelon", 3, TimeUnit.SECONDS);
System.out.println(offered);
assertFalse(offered);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void poll() {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.offer("Papaya");
queue.offer("Strawberry");
queue.offer("Watermelon");
String value = queue.poll();
System.out.println(value);
assertNotNull(value);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
@Test
public void pollWhenEmpty() {
int expectedSize = 0;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
String value = queue.poll();
System.out.println(value);
assertNull(value);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
@Test
public void pollTimeout() {
try {
int expectedSize = 0;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
String value = queue.poll(3, TimeUnit.SECONDS);
System.out.println(value);
assertNull(value);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void put() {
try {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
Thread thread = new Thread(() -> {
try {
String value = queue.take();
System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
thread.start();
queue.put("Papaya");
queue.put("Strawberry");
queue.put("Watermelon");
System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void take() {
try {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
Thread thread = new Thread(() -> {
try {
queue.put("Papaya");
queue.put("Strawberry");
queue.put("Watermelon");
System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
thread.start();
String value = queue.take();
System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
assertEquals(expectedSize, queue.size());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void contains() {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
boolean contains = queue.contains("Papaya");
System.out.println(contains);
assertTrue(contains);
contains = queue.contains("Grape");
System.out.println(contains);
assertFalse(contains);
}
@Test
public void containsAll() {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
queue2.add("Papaya");
queue2.add("Strawberry");
boolean contains = queue.containsAll(queue2);
System.out.println(contains);
assertTrue(contains);
contains = queue2.containsAll(queue);
System.out.println(contains);
assertFalse(contains);
}
@Test
public void stream() {
int expectedSize = 2;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
List<String> list = queue.stream().filter(e -> e.length() > 6).collect(Collectors.toList());
System.out.println(list);
assertEquals(expectedSize, list.size());
}
@Test
public void parallelStream() {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
queue.parallelStream().forEach(System.out::println);
System.out.println("----------");
queue.parallelStream().forEachOrdered(System.out::println);
}
@Test
public void retainAll() {
int expectedSize = 1;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.add("Papaya");
queue.add("Strawberry");
queue.add("Watermelon");
BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
queue2.add("Papaya");
queue2.add("Lemon");
queue2.add("Mango");
queue.retainAll(queue2);
System.out.println(queue);
assertEquals(expectedSize, queue.size());
}
}
心得分享
ArrayBlockingQueue in Java 使用 FIFO 先進先出的方式,阻塞兩個或多個執行緒對單個資源的操作,使用容量限制因子來阻塞線程, Blocking Queue Java 提供了幾種 ArrayBlockingQueue 常見方法的操作範例。