ExecutorService in Java with Examples - Java 147

ExecutorService in Java with Examples – Java 147

ExecutorService in Java with Examples

繼承 Executor,擴充了一些方法,協助管理和控制執行緒,定義了執行傳回結果的執行緒、一組執行緒和確定關閉狀態的方法,為了執行異步任務,實現了 Runnable 接口, Java ExecutorService Examples 包含 execute 方法,此外還有另一個可用的接口,submit 方法,實現了 Callable 接口,本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- executorservice
       |                       +- ExecutorServiceWithExamplesTest.java  

單元測試

ExecutorService Java Examples 提供執行、提交等操作執行緒池。

linkedBlockingQueue

ExecutorService Java Examples 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒, Java ExecutorService 使用 LinkedBlockingQueue ,執行 6 個任務,每個任務耗時 1 秒完成。

	@Test
	public void linkedBlockingQueue() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[12] worker: 4 ready
T[11] worker: 3 ready
T[12] worker: 4 finished
T[12] worker: 5 ready
T[11] worker: 3 finished
T[11] worker: 6 ready
T[11] worker: 6 finished
T[12] worker: 5 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 2

arrayBlockingQueue

ExecutorService Java Examples 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒, Java ExecutorService 使用 ArrayBlockingQueue 容量為 2 個 ,執行 6 個任務,每個任務耗時 1 秒完成。

	@Test
	public void arrayBlockingQueue() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new ArrayBlockingQueue<Runnable>(2));
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[12] worker: 2 ready
T[13] worker: 3 ready
T[11] worker: 1 ready
T[14] worker: 4 ready
T[13] worker: 3 finished
T[11] worker: 1 finished
T[12] worker: 2 finished
T[14] worker: 4 finished
T[11] worker: 5 ready
T[13] worker: 6 ready
T[13] worker: 6 finished
T[11] worker: 5 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 4

abortPolicy

ExecutorService Java Examples 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒, Java ExecutorService 使用 SynchronousQueue ,當達到執行緒最大數量時,採用中止策略,執行 6 個任務,每個任務耗時 1 秒完成。

	@Test
	public void abortPolicy() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>());
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[11] worker: 1 ready
T[13] worker: 3 ready
T[12] worker: 2 ready
T[14] worker: 4 ready
T[15] worker: 5 ready

java.util.concurrent.RejectedExecutionException: Task org.ruoxue.java_147.multithreading.ExecutorServiceWithExamplesTest$$Lambda$1/971848845@6f75e721 rejected from java.util.concurrent.ThreadPoolExecutor@69222c14[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.ruoxue.java_147.multithreading.ExecutorServiceWithExamplesTest.abortPolicy(ExecutorServiceWithExamplesTest.java:87)

discardPolicy

Java ExecutorService 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒, Executor Service Java 使用 SynchronousQueue ,當達到執行緒最大數量時,採用放棄策略,執行 6 個任務,每個任務耗時 1 秒完成。

	@Test
	public void discardPolicy() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
		int taskSize = 6;
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			});
		}

		try {
			executorService.awaitTermination(3, TimeUnit.SECONDS);
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[11] worker: 2 ready
T[14] worker: 4 ready
T[13] worker: 3 ready
T[12] worker: 1 ready
T[15] worker: 5 ready
T[14] worker: 4 finished
T[11] worker: 2 finished
T[12] worker: 1 finished
T[15] worker: 5 finished
T[13] worker: 3 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 5

callerRunsPolicy

Java ExecutorService 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒, Executor Service Java 使用 SynchronousQueue ,當達到執行緒最大數量時,採用調用者執行緒執行策略,執行 6 個任務,每個任務耗時 1 秒完成。

	@Test
	public void callerRunsPolicy() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
T[11] worker: 1 ready
T[1] worker: 3 ready
T[13] worker: 4 ready
T[15] worker: 6 ready
T[12] worker: 2 ready
T[14] worker: 5 ready
T[14] worker: 5 finished
T[13] worker: 4 finished
T[1] worker: 3 finished
T[11] worker: 1 finished
T[12] worker: 2 finished
T[15] worker: 6 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 5

ExecutorServiceWithExamplesTest.java

Java ExecutorService 新增單元測試,驗證 Executor Service Java 是否符合預期。

package org.ruoxue.java_147.multithreading.executorservice;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

public class ExecutorServiceWithExamplesTest {

	@Test
	public void linkedBlockingQueue() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void arrayBlockingQueue() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new ArrayBlockingQueue<Runnable>(2));
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void abortPolicy() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>());
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void discardPolicy() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
		int taskSize = 6;
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			});
		}

		try {
			executorService.awaitTermination(3, TimeUnit.SECONDS);
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void callerRunsPolicy() {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
		int taskSize = 6;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

		try {
			latch.await();
			System.out.println("corePoolSize: " + executorService.getCorePoolSize());
			System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
			System.out.println("poolSize: " + executorService.getPoolSize());
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
}

心得分享

Executor Service Java 執行緒池提供了一個在未來的某個時間執行任務,調度管理執行緒,提升了系統穩定度,大量減少每次建立新執行緒的開銷,重用執行緒,避免資源耗盡的狀況, Java ExecutorService 提供了幾種 ExecutorService 常見方法的操作範例。

發佈留言