Java ArrayBlockingQueue Class - Java 147

Java ArrayBlockingQueue Class – Java 147

Java ArrayBlockingQueue Class

通常用於多執行緒的應用程式, Queue 的大小是固定的,一旦建立後,便無法再更改其容量,當佇列已滿時,將元素放入佇列,將會導致操作阻塞,同樣地,當佇列無元素時,取得元素也會被阻塞, ArrayBlockingQueue Class in Java 介紹常見的 offer 、 poll 、 put 、 take 、 contains 等方法,本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- queue
       |                   +- arrayblockingqueue
       |                       +- ArrayBlockingQueueClassTest.java  

單元測試

ArrayBlockingQueue Java 提供新增、取得、檢查包含、串流操作佇列中的元素。

offer

建立一個 ArrayBlockingQueue ,增加三個元素。

	@Test
	public void offer() {
		int expectedSize = 3;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.offer("Papaya");
		queue.offer("Strawberry");
		queue.offer("Watermelon");
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
[Papaya, Strawberry, Watermelon]

offerWhenFull

建立一個 ArrayBlockingQueue ,大小容量為 2,當佇列已滿時,加入第三個元素,無法加入,傳回 false。

	@Test
	public void offerWhenFull() {
		int expectedSize = 2;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
		queue.offer("Papaya");
		queue.offer("Strawberry");
		boolean offered = queue.offer("Watermelon");
		System.out.println(offered);
		assertFalse(offered);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
false
[Papaya, Strawberry]

offerTimeout

ArrayBlockingQueue Java 建立一個 ArrayBlockingQueue ,大小容量為 2,當佇列已滿時,加入第三個元素,等待 3 秒後,若無法加入,傳回 false。

	@Test
	public void offerTimeout() {
		try {
			int expectedSize = 2;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
			queue.offer("Papaya");
			queue.offer("Strawberry");
			boolean offered = queue.offer("Watermelon", 3, TimeUnit.SECONDS);
			System.out.println(offered);
			assertFalse(offered);
			System.out.println(queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
false
[Papaya, Strawberry]

poll

ArrayBlockingQueue Java 建立一個 ArrayBlockingQueue ,內有三個元素,取得第一個位置元素,元素會移出佇列。

	@Test
	public void poll() {
		int expectedSize = 2;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.offer("Papaya");
		queue.offer("Strawberry");
		queue.offer("Watermelon");
		String value = queue.poll();
		System.out.println(value);
		assertNotNull(value);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
Papaya
[Strawberry, Watermelon]

pollWhenEmpty

ArrayBlockingQueue Java 建立一個 ArrayBlockingQueue ,當佇列為空,沒有任何元素時,取得第一個位置元素,會傳回 null 。

	@Test
	public void pollWhenEmpty() {
		int expectedSize = 0;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
		String value = queue.poll();
		System.out.println(value);
		assertNull(value);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
null
[]

pollTimeout

Blocking Queue Java 建立一個 ArrayBlockingQueue ,當佇列為空,沒有任何元素時,取得第一個位置元素,等待 3 秒後,若無法取得,會傳回 null 。

	@Test
	public void pollTimeout() {
		try {
			int expectedSize = 0;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
			String value = queue.poll(3, TimeUnit.SECONDS);
			System.out.println(value);
			assertNull(value);
			System.out.println(queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
null
[]

put

Blocking Queue Java 建立一個 ArrayBlockingQueue ,容量大小為 2 ,當佇列已滿時, put 方法會暫停調用執行緒,直到能放入物件時,才會喚醒執行緒。

	@Test
	public void put() {
		try {
			int expectedSize = 2;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

			Thread thread = new Thread(() -> {
				try {
					String value = queue.take();
					System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			queue.put("Papaya");
			queue.put("Strawberry");
			queue.put("Watermelon");
			System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[11] take: Papaya
T[1] put: [Strawberry, Watermelon]

take

Blocking Queue Java 建立一個 ArrayBlockingQueue ,當佇列為空時,take 方法會暫停調用執行緒,直到能取出物件時,才會喚醒執行緒。

	@Test
	public void take() {
		try {
			int expectedSize = 2;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

			Thread thread = new Thread(() -> {
				try {
					queue.put("Papaya");
					queue.put("Strawberry");
					queue.put("Watermelon");
					System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			String value = queue.take();
			System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[1] take: Papaya
T[11] put: [Strawberry, Watermelon]

contains

ArrayBlockingQueue Class in Java 建立一個 ArrayBlockingQueue ,內有三個元素,檢查包含指定的元素。

	@Test
	public void contains() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");

		boolean contains = queue.contains("Papaya");
		System.out.println(contains);
		assertTrue(contains);

		contains = queue.contains("Grape");
		System.out.println(contains);
		assertFalse(contains);
	}
true
false

containsAll

ArrayBlockingQueue Class in Java 建立一個 ArrayBlockingQueue ,內有三個元素,查包含所有指定的元素。

	@Test
	public void containsAll() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");

		BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
		queue2.add("Papaya");
		queue2.add("Strawberry");

		boolean contains = queue.containsAll(queue2);
		System.out.println(contains);
		assertTrue(contains);

		contains = queue2.containsAll(queue);
		System.out.println(contains);
		assertFalse(contains);
	}
true
false

stream

ArrayBlockingQueue Class in Java 建立一個 ArrayBlockingQueue ,內有三個元素,執行串流,取得長度大於 6 的元素。

	@Test
	public void stream() {
		int expectedSize = 2;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");
		List<String> list = queue.stream().filter(e -> e.length() > 6).collect(Collectors.toList());
		System.out.println(list);
		assertEquals(expectedSize, list.size());
	}
[Strawberry, Watermelon]

parallelStream

建立一個 ArrayBlockingQueue ,內有三個元素,並行執行串流。

	@Test
	public void parallelStream() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");
		queue.parallelStream().forEach(System.out::println);
		System.out.println("----------");
		queue.parallelStream().forEachOrdered(System.out::println);
	}
Strawberry
Papaya
Watermelon
----------
Papaya
Strawberry
Watermelon

retainAll

建立一個 ArrayBlockingQueue ,內各有三個元素,只保留相同元素。

	@Test
	public void retainAll() {
		int expectedSize = 1;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");

		BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
		queue2.add("Papaya");
		queue2.add("Lemon");
		queue2.add("Mango");

		queue.retainAll(queue2);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
[Papaya]

ArrayBlockingQueueClassTest.java

Blocking Queue Java 新增單元測試,驗證是否符合預期。

package org.ruoxue.java_147.queue.arrayblockingqueue;

import static org.junit.Assert.*;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.junit.Test;

public class ArrayBlockingQueueClassTest {

	@Test
	public void offer() {
		int expectedSize = 3;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.offer("Papaya");
		queue.offer("Strawberry");
		queue.offer("Watermelon");
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void offerWhenFull() {
		int expectedSize = 2;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
		queue.offer("Papaya");
		queue.offer("Strawberry");
		boolean offered = queue.offer("Watermelon");
		System.out.println(offered);
		assertFalse(offered);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void offerTimeout() {
		try {
			int expectedSize = 2;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
			queue.offer("Papaya");
			queue.offer("Strawberry");
			boolean offered = queue.offer("Watermelon", 3, TimeUnit.SECONDS);
			System.out.println(offered);
			assertFalse(offered);
			System.out.println(queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void poll() {
		int expectedSize = 2;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.offer("Papaya");
		queue.offer("Strawberry");
		queue.offer("Watermelon");
		String value = queue.poll();
		System.out.println(value);
		assertNotNull(value);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void pollWhenEmpty() {
		int expectedSize = 0;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
		String value = queue.poll();
		System.out.println(value);
		assertNull(value);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}

	@Test
	public void pollTimeout() {
		try {
			int expectedSize = 0;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
			String value = queue.poll(3, TimeUnit.SECONDS);
			System.out.println(value);
			assertNull(value);
			System.out.println(queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void put() {
		try {
			int expectedSize = 2;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

			Thread thread = new Thread(() -> {
				try {
					String value = queue.take();
					System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			queue.put("Papaya");
			queue.put("Strawberry");
			queue.put("Watermelon");
			System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void take() {
		try {
			int expectedSize = 2;
			BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

			Thread thread = new Thread(() -> {
				try {
					queue.put("Papaya");
					queue.put("Strawberry");
					queue.put("Watermelon");
					System.out.println("T[" + Thread.currentThread().getId() + "] put: " + queue);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			});
			thread.start();

			String value = queue.take();
			System.out.println("T[" + Thread.currentThread().getId() + "] take: " + value);
			assertEquals(expectedSize, queue.size());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void contains() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");

		boolean contains = queue.contains("Papaya");
		System.out.println(contains);
		assertTrue(contains);

		contains = queue.contains("Grape");
		System.out.println(contains);
		assertFalse(contains);
	}

	@Test
	public void containsAll() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");

		BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
		queue2.add("Papaya");
		queue2.add("Strawberry");

		boolean contains = queue.containsAll(queue2);
		System.out.println(contains);
		assertTrue(contains);

		contains = queue2.containsAll(queue);
		System.out.println(contains);
		assertFalse(contains);
	}

	@Test
	public void stream() {
		int expectedSize = 2;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");
		List<String> list = queue.stream().filter(e -> e.length() > 6).collect(Collectors.toList());
		System.out.println(list);
		assertEquals(expectedSize, list.size());
	}

	@Test
	public void parallelStream() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");
		queue.parallelStream().forEach(System.out::println);
		System.out.println("----------");
		queue.parallelStream().forEachOrdered(System.out::println);
	}

	@Test
	public void retainAll() {
		int expectedSize = 1;
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		queue.add("Papaya");
		queue.add("Strawberry");
		queue.add("Watermelon");

		BlockingQueue<String> queue2 = new ArrayBlockingQueue<String>(10);
		queue2.add("Papaya");
		queue2.add("Lemon");
		queue2.add("Mango");

		queue.retainAll(queue2);
		System.out.println(queue);
		assertEquals(expectedSize, queue.size());
	}
}

心得分享

ArrayBlockingQueue in Java 使用 FIFO 先進先出的方式,阻塞兩個或多個執行緒對單個資源的操作,使用容量限制因子來阻塞線程, Blocking Queue Java 提供了幾種 ArrayBlockingQueue 常見方法的操作範例。

發佈留言