Java LinkedBlockingQueue Class
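LinkedBlockingQueue is a thread-safe, linked-node blocking queue. Its thread safety is guaranteed by pessimistic locking, and it is typically used in multithreaded applications. If no fixed capacity is given at construction, the queue grows dynamically as elements are added (the capacity defaults to Integer.MAX_VALUE). This article introduces the commonly used methods offer, poll, put, take, and contains, adds examples, and verifies the results with unit tests.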
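The difference between a bounded and an unbounded queue can be observed through remainingCapacity(). The following is a minimal sketch; the class name CapacityDemo and its helper methods are made up for illustration and are not part of the article's test class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CapacityDemo {

    // Remaining capacity of an empty queue created without an explicit capacity.
    static int unboundedRemaining() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        return queue.remainingCapacity();
    }

    // Remaining capacity of a bounded queue after a single element is added.
    static int boundedRemainingAfterOneOffer(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        queue.offer("Papaya");
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(unboundedRemaining() == Integer.MAX_VALUE); // true
        System.out.println(boundedRemainingAfterOneOffer(2));          // 1
    }
}
```

An unbounded queue never rejects an offer, so a producer that outpaces its consumers can exhaust memory; passing an explicit capacity is usually the safer choice.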
File Structure
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- queue
       |                   +- linkedblockingqueue
       |                       +- LinkedBlockingQueueClassTest.java
Unit Tests
LinkedBlockingQueue provides operations for adding, retrieving, checking containment of, and streaming over the elements in the queue.
offer
Create a LinkedBlockingQueue and add three elements.
@Test
public void offer() {
    int expectedSize = 3;
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    queue.offer("Papaya");
    queue.offer("Strawberry");
    queue.offer("Watermelon");
    System.out.println(queue);
    assertEquals(expectedSize, queue.size());
}
[Papaya, Strawberry, Watermelon]
offerWhenFull
Create a LinkedBlockingQueue with a capacity of 2. Once the queue is full, offering a third element fails immediately and returns false.
@Test
public void offerWhenFull() {
    int expectedSize = 2;
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
    queue.offer("Papaya");
    queue.offer("Strawberry");
    boolean offered = queue.offer("Watermelon");
    System.out.println(offered);
    assertFalse(offered);
    System.out.println(queue);
    assertEquals(expectedSize, queue.size());
}
false
[Papaya, Strawberry]
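For comparison, add() on a full bounded queue does not return false; it throws IllegalStateException. The sketch below contrasts the two calls. The class AddVersusOfferDemo and its method names are illustrative assumptions, not code from the article.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AddVersusOfferDemo {

    // Returns true if add() on a full queue throws IllegalStateException.
    static boolean addThrowsWhenFull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("Papaya");
        try {
            queue.add("Strawberry");
            return false;
        } catch (IllegalStateException ex) {
            return true;
        }
    }

    // Returns the result of offer() on a full queue.
    static boolean offerResultWhenFull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("Papaya");
        return queue.offer("Strawberry");
    }

    public static void main(String[] args) {
        System.out.println(addThrowsWhenFull());   // true
        System.out.println(offerResultWhenFull()); // false
    }
}
```

offer is generally the better fit when a full queue is an expected condition, while add suits cases where a full queue indicates a programming error.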
offerTimeout
Create a LinkedBlockingQueue with a capacity of 2. Once the queue is full, the timed offer waits up to 3 seconds for space; if the third element still cannot be added, it returns false.
@Test
public void offerTimeout() {
    try {
        int expectedSize = 2;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        queue.offer("Papaya");
        queue.offer("Strawberry");
        boolean offered = queue.offer("Watermelon", 3, TimeUnit.SECONDS);
        System.out.println(offered);
        assertFalse(offered);
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
false
[Papaya, Strawberry]
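The timed offer returns true as soon as space frees up within the timeout. The sketch below assumes a hypothetical consumer thread that removes one element after roughly 200 ms; the class name TimedOfferDemo and the delay are illustrative choices.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedOfferDemo {

    // Offers to a full queue while a consumer frees one slot during the wait.
    static boolean offerWhileConsumerDrains() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("Papaya");
        queue.offer("Strawberry");

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(200); // assumed delay before space is freed
                queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Waits up to 3 seconds; returns once the consumer has taken an element.
            boolean offered = queue.offer("Watermelon", 3, TimeUnit.SECONDS);
            consumer.join();
            return offered;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWhileConsumerDrains());
    }
}
```

Under normal scheduling the call returns true well before the 3-second limit, because the wait ends when the slot becomes available rather than when the timeout expires.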
poll
Create a LinkedBlockingQueue holding three elements and retrieve the element at the head; poll removes that element from the queue.
@Test
public void poll() {
    int expectedSize = 2;
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    queue.offer("Papaya");
    queue.offer("Strawberry");
    queue.offer("Watermelon");
    String value = queue.poll();
    System.out.println(value);
    assertNotNull(value);
    System.out.println(queue);
    assertEquals(expectedSize, queue.size());
}
Papaya
[Strawberry, Watermelon]
pollWhenEmpty
Create an empty LinkedBlockingQueue. With no element to retrieve, poll returns null immediately.
@Test
public void pollWhenEmpty() {
    int expectedSize = 0;
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    String value = queue.poll();
    System.out.println(value);
    assertNull(value);
    System.out.println(queue);
    assertEquals(expectedSize, queue.size());
}
null
[]
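The other head-access methods behave differently on an empty queue: peek also returns null, while remove() and element() throw NoSuchElementException. A small sketch follows; the class EmptyQueueDemo and its describe helper are hypothetical names used only for this illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EmptyQueueDemo {

    // Calls the named method on a fresh empty queue and reports the outcome.
    static String describe(String method) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            switch (method) {
                case "poll":
                    return String.valueOf(queue.poll());
                case "peek":
                    return String.valueOf(queue.peek());
                case "remove":
                    return queue.remove();
                case "element":
                    return queue.element();
                default:
                    throw new IllegalArgumentException(method);
            }
        } catch (NoSuchElementException ex) {
            return "NoSuchElementException";
        }
    }

    public static void main(String[] args) {
        for (String method : new String[] {"poll", "peek", "remove", "element"}) {
            System.out.println(method + " -> " + describe(method));
        }
    }
}
```

The program prints null for poll and peek, and NoSuchElementException for remove and element.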
pollTimeout
Create an empty LinkedBlockingQueue. The timed poll waits up to 3 seconds for an element to arrive; if none does, it returns null.
@Test
public void pollTimeout() {
    try {
        int expectedSize = 0;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        String value = queue.poll(3, TimeUnit.SECONDS);
        System.out.println(value);
        assertNull(value);
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
null
[]
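One common use of the timed poll is a consumer loop that stops once the queue has been idle for the timeout period. The sketch below is one possible shape for such a loop; the class IdlePollDemo, the drainUntilIdle helper, and the 50 ms and 1000 ms timings are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class IdlePollDemo {

    // Takes elements until no new element arrives within timeoutMillis.
    static List<String> drainUntilIdle(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            String value;
            while ((value = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                received.add(value);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    // Starts a slow producer and drains the queue on the calling thread.
    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (String fruit : new String[] {"Papaya", "Strawberry", "Watermelon"}) {
                    Thread.sleep(50); // assumed gap between elements
                    queue.put(fruit);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        return drainUntilIdle(queue, 1000);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the gap between elements is much shorter than the timeout, the loop is expected to collect all three elements in insertion order before the final poll times out.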
put
Create a LinkedBlockingQueue with a capacity of 2. When the queue is full, put blocks the calling thread until space becomes available, at which point the thread is woken and the element is inserted.
@Test
public void put() {
    try {
        int expectedSize = 2;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        // The consumer thread takes one element, freeing a slot for the third put.
        Thread thread = new Thread(() -> {
            try {
                String value = queue.take();
                System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        thread.start();
        queue.put("Papaya");
        queue.put("Strawberry");
        // Blocks if the queue is still full, until the consumer has taken an element.
        queue.put("Watermelon");
        System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
        assertEquals(expectedSize, queue.size());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[11] take: Papaya
T[1] put: [Strawberry, Watermelon]
take
Create a LinkedBlockingQueue with a capacity of 2. When the queue is empty, take blocks the calling thread until an element becomes available, at which point the thread is woken and the element is removed.
@Test
public void take() {
    try {
        int expectedSize = 2;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        // The producer thread puts three elements; the third put needs the take below.
        Thread thread = new Thread(() -> {
            try {
                queue.put("Papaya");
                queue.put("Strawberry");
                queue.put("Watermelon");
                System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        thread.start();
        String value = queue.take();
        System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
        // Wait for the producer to finish so the size check is not racy.
        thread.join();
        assertEquals(expectedSize, queue.size());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[1] take: Papaya
T[11] put: [Strawberry, Watermelon]
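put and take are usually combined in a producer-consumer arrangement. The sketch below shows one common way to end the consumer, a sentinel "poison pill" element; the sentinel value "__END__" and the class ProducerConsumerDemo are assumptions made for this example, not part of the LinkedBlockingQueue API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerDemo {

    // Hypothetical sentinel that tells the consumer to stop.
    private static final String POISON_PILL = "__END__";

    // Sends items through a capacity-2 queue and returns what the consumer received.
    static List<String> run(List<String> items) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String value = queue.take(); // blocks while the queue is empty
                    if (POISON_PILL.equals(value)) {
                        break;
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String item : items) {
                queue.put(item); // blocks while the queue is full
            }
            queue.put(POISON_PILL);
            consumer.join(); // makes the consumer's writes visible to this thread
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("Papaya", "Strawberry", "Watermelon")));
    }
}
```

With a single producer and a single consumer, the FIFO ordering of the queue means the consumer receives the items in the order they were put.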
contains
Create a LinkedBlockingQueue holding three elements and check whether it contains a given element.
@Test
public void contains() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    queue.add("Papaya");
    queue.add("Strawberry");
    queue.add("Watermelon");
    boolean contains = queue.contains("Papaya");
    System.out.println(contains);
    assertTrue(contains);
    contains = queue.contains("Grape");
    System.out.println(contains);
    assertFalse(contains);
}
true
false
containsAll
Create a LinkedBlockingQueue holding three elements and check whether it contains every element of another collection.
@Test
public void containsAll() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    queue.add("Papaya");
    queue.add("Strawberry");
    queue.add("Watermelon");
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<String>();
    queue2.add("Papaya");
    queue2.add("Strawberry");
    boolean contains = queue.containsAll(queue2);
    System.out.println(contains);
    assertTrue(contains);
    contains = queue2.containsAll(queue);
    System.out.println(contains);
    assertFalse(contains);
}
true
false
stream
Create a LinkedBlockingQueue holding three elements, stream over it, and collect the elements whose length is greater than 6.
@Test
public void stream() {
    int expectedSize = 2;
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    queue.add("Papaya");
    queue.add("Strawberry");
    queue.add("Watermelon");
    List<String> list = queue.stream().filter(e -> e.length() > 6).collect(Collectors.toList());
    System.out.println(list);
    assertEquals(expectedSize, list.size());
}
[Strawberry, Watermelon]
parallelStream
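Create a LinkedBlockingQueue holding three elements and process it with a parallel stream. forEach makes no ordering guarantee, so the first three lines of output may differ between runs; forEachOrdered preserves the queue's order.
@Test
public void parallelStream() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    queue.add("Papaya");
    queue.add("Strawberry");
    queue.add("Watermelon");
    queue.parallelStream().forEach(System.out::println);
    System.out.println("----------");
    queue.parallelStream().forEachOrdered(System.out::println);
}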
Strawberry
Papaya
Watermelon
----------
Papaya
Strawberry
Watermelon
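When the results of a parallel stream are needed as a collection, collecting them keeps the queue's encounter order even though the work may run on several threads. A minimal sketch, with ParallelCollectDemo and upperCaseInOrder as illustrative names:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

public class ParallelCollectDemo {

    // Maps each element in parallel and collects the results in queue order.
    static List<String> upperCaseInOrder() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Papaya");
        queue.add("Strawberry");
        queue.add("Watermelon");
        return queue.parallelStream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(upperCaseInOrder()); // [PAPAYA, STRAWBERRY, WATERMELON]
    }
}
```

Streaming does not remove elements; the queue still holds all three fruits afterwards.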
retainAll
Create two LinkedBlockingQueue instances, each holding three elements, and retain only the elements that appear in both.
@Test
public void retainAll() {
    int expectedSize = 1;
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    queue.add("Papaya");
    queue.add("Strawberry");
    queue.add("Watermelon");
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<String>();
    queue2.add("Papaya");
    queue2.add("Lemon");
    queue2.add("Mango");
    queue.retainAll(queue2);
    System.out.println(queue);
    assertEquals(expectedSize, queue.size());
}
[Papaya]
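retainAll also returns a boolean that reports whether the queue was modified, which the test above ignores. The sketch below checks that return value in two cases; RetainAllDemo and its helper methods are hypothetical names for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RetainAllDemo {

    // Builds a fresh queue holding the three fruits used throughout the article.
    static BlockingQueue<String> fruits() {
        return new LinkedBlockingQueue<>(Arrays.asList("Papaya", "Strawberry", "Watermelon"));
    }

    // Two elements are dropped, so the queue changes.
    static boolean retainPartialOverlap() {
        BlockingQueue<String> queue = fruits();
        return queue.retainAll(Arrays.asList("Papaya", "Lemon", "Mango"));
    }

    // Every element is kept, so the queue does not change.
    static boolean retainSuperset() {
        BlockingQueue<String> queue = fruits();
        return queue.retainAll(Arrays.asList("Papaya", "Strawberry", "Watermelon", "Grape"));
    }

    public static void main(String[] args) {
        System.out.println(retainPartialOverlap()); // true
        System.out.println(retainSuperset());       // false
    }
}
```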
LinkedBlockingQueueClassTest.java
The complete unit test class, which verifies that the results match expectations.
package org.ruoxue.java_147.queue.linkedblockingqueue;

import static org.junit.Assert.*;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.junit.Test;

public class LinkedBlockingQueueClassTest {

    @Test
    public void offer() {
        int expectedSize = 3;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.offer("Papaya");
        queue.offer("Strawberry");
        queue.offer("Watermelon");
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void offerWhenFull() {
        int expectedSize = 2;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        queue.offer("Papaya");
        queue.offer("Strawberry");
        boolean offered = queue.offer("Watermelon");
        System.out.println(offered);
        assertFalse(offered);
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void offerTimeout() {
        try {
            int expectedSize = 2;
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
            queue.offer("Papaya");
            queue.offer("Strawberry");
            boolean offered = queue.offer("Watermelon", 3, TimeUnit.SECONDS);
            System.out.println(offered);
            assertFalse(offered);
            System.out.println(queue);
            assertEquals(expectedSize, queue.size());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void poll() {
        int expectedSize = 2;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.offer("Papaya");
        queue.offer("Strawberry");
        queue.offer("Watermelon");
        String value = queue.poll();
        System.out.println(value);
        assertNotNull(value);
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void pollWhenEmpty() {
        int expectedSize = 0;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        String value = queue.poll();
        System.out.println(value);
        assertNull(value);
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    }

    @Test
    public void pollTimeout() {
        try {
            int expectedSize = 0;
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
            String value = queue.poll(3, TimeUnit.SECONDS);
            System.out.println(value);
            assertNull(value);
            System.out.println(queue);
            assertEquals(expectedSize, queue.size());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void put() {
        try {
            int expectedSize = 2;
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
            // The consumer thread takes one element, freeing a slot for the third put.
            Thread thread = new Thread(() -> {
                try {
                    String value = queue.take();
                    System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
            thread.start();
            queue.put("Papaya");
            queue.put("Strawberry");
            queue.put("Watermelon");
            System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
            assertEquals(expectedSize, queue.size());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void take() {
        try {
            int expectedSize = 2;
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
            // The producer thread puts three elements; the third put needs the take below.
            Thread thread = new Thread(() -> {
                try {
                    queue.put("Papaya");
                    queue.put("Strawberry");
                    queue.put("Watermelon");
                    System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
            thread.start();
            String value = queue.take();
            System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
            // Wait for the producer to finish so the size check is not racy.
            thread.join();
            assertEquals(expectedSize, queue.size());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void contains() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("Papaya");
        queue.add("Strawberry");
        queue.add("Watermelon");
        boolean contains = queue.contains("Papaya");
        System.out.println(contains);
        assertTrue(contains);
        contains = queue.contains("Grape");
        System.out.println(contains);
        assertFalse(contains);
    }

    @Test
    public void containsAll() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("Papaya");
        queue.add("Strawberry");
        queue.add("Watermelon");
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<String>();
        queue2.add("Papaya");
        queue2.add("Strawberry");
        boolean contains = queue.containsAll(queue2);
        System.out.println(contains);
        assertTrue(contains);
        contains = queue2.containsAll(queue);
        System.out.println(contains);
        assertFalse(contains);
    }

    @Test
    public void stream() {
        int expectedSize = 2;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("Papaya");
        queue.add("Strawberry");
        queue.add("Watermelon");
        List<String> list = queue.stream().filter(e -> e.length() > 6).collect(Collectors.toList());
        System.out.println(list);
        assertEquals(expectedSize, list.size());
    }

    @Test
    public void parallelStream() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("Papaya");
        queue.add("Strawberry");
        queue.add("Watermelon");
        queue.parallelStream().forEach(System.out::println);
        System.out.println("----------");
        queue.parallelStream().forEachOrdered(System.out::println);
    }

    @Test
    public void retainAll() {
        int expectedSize = 1;
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("Papaya");
        queue.add("Strawberry");
        queue.add("Watermelon");
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<String>();
        queue2.add("Papaya");
        queue2.add("Lemon");
        queue2.add("Mango");
        queue.retainAll(queue2);
        System.out.println(queue);
        assertEquals(expectedSize, queue.size());
    }
}
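Takeaways
To reduce lock contention between producer and consumer threads, LinkedBlockingQueue uses two separate ReentrantLocks, one for take (read) operations and one for put (write) operations. It can be created as a bounded or an effectively unbounded queue, and it orders elements FIFO (first in, first out). This article walked through examples of several commonly used LinkedBlockingQueue methods.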
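As a rough illustration of producers and consumers working on the same queue at once, the sketch below runs several producer threads against one consumer and counts the elements received. It does not measure lock contention; it only shows that no extra synchronization is needed around put and take. The class ConcurrentTransferDemo, the capacity of 10, and the thread counts are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentTransferDemo {

    // Moves producers * itemsPerProducer elements through a bounded queue
    // and returns how many the consumer received.
    static int transfer(int producers, int itemsPerProducer) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        int total = producers * itemsPerProducer;
        AtomicInteger taken = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int p = 0; p < producers; p++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        queue.put(i); // blocks while the queue is full
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.add(new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.take(); // blocks while the queue is empty
                    taken.incrementAndGet();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }));

        threads.forEach(Thread::start);
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return taken.get();
    }

    public static void main(String[] args) {
        System.out.println(transfer(4, 250)); // 1000
    }
}
```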