ExecutorService in Java with Examples
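ExecutorService extends Executor and adds methods that help manage and control threads: it defines methods for running tasks that return a result, for running a group of tasks, and for checking shutdown state. An asynchronous task that implements Runnable is run with the execute method; a task that implements Callable, and therefore returns a result, is run with the submit method. This article adds examples and verifies their output with unit tests.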
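As a rough illustration of the execute and submit methods described above, the sketch below runs a Runnable with execute, a Callable with submit, and a group of Callables with invokeAll. The class name SubmitSketch, its method names, and the use of Executors.newFixedThreadPool(2) are assumptions made for this illustration; they are not part of the article's test class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class, used only for this illustration.
final class SubmitSketch {

    // Runs a Runnable via execute() and a Callable via submit(),
    // then returns the Callable's result.
    static int submitCallable() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // execute() takes a Runnable and returns nothing.
            pool.execute(() -> System.out.println("Runnable ran"));
            // submit() takes a Callable and returns a Future for its result.
            Callable<Integer> answer = () -> 6 * 7;
            Future<Integer> future = pool.submit(answer);
            return future.get(); // blocks until the result is ready
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Runs a group of Callables with invokeAll() and sums the squares 1..n.
    static int invokeAllSum(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value * value);
            }
            int sum = 0;
            // invokeAll() returns once every task has completed.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("submit result: " + submitCallable());
        System.out.println("invokeAll sum: " + invokeAllSum(3));
    }
}
```

The Callable is assigned to a typed variable before it is submitted so that it is obvious which submit overload is being called.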
File Directory
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- executorservice
       |                       +- ExecutorServiceWithExamplesTest.java
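Unit Tests
ExecutorService offers operations such as execute and submit for working with a thread pool. The tests below construct a ThreadPoolExecutor directly so that the work queue and the rejection policy can be varied.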
linkedBlockingQueue
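Create a thread pool with a core size of 2 threads and a maximum of 5, backed by a LinkedBlockingQueue, then run 6 tasks that each take 1 second. Because the queue is unbounded, tasks beyond the first two wait in the queue instead of triggering new threads, so the pool never grows past its core size and poolSize stays at 2.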
@Test
public void linkedBlockingQueue() {
    // Core 2, max 5, 60 s keep-alive, unbounded work queue.
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
    int taskSize = 6;
    CountDownLatch latch = new CountDownLatch(taskSize);
    AtomicInteger ids = new AtomicInteger();
    for (int i = 0; i < taskSize; i++) {
        executorService.execute(() -> {
            try {
                // The id is assigned when the task starts running, not when it is submitted.
                int id = ids.incrementAndGet();
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        // Wait for all six tasks before reading the pool statistics.
        latch.await();
        System.out.println("corePoolSize: " + executorService.getCorePoolSize());
        System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
        System.out.println("poolSize: " + executorService.getPoolSize());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[12] worker: 4 ready
T[11] worker: 3 ready
T[12] worker: 4 finished
T[12] worker: 5 ready
T[11] worker: 3 finished
T[11] worker: 6 ready
T[11] worker: 6 finished
T[12] worker: 5 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 2
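To see why poolSize stays at 2 in the output above, one option is to hold every task back on a latch and inspect the pool while all six submissions are still pending. The sketch below does that; the class name UnboundedQueueSketch, the snapshot method, and the release latch are assumptions introduced for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class, used only for this illustration.
final class UnboundedQueueSketch {

    // Submits `tasks` blocked tasks and returns {poolSize, queuedTasks}
    // observed while every task is still held back.
    static int[] snapshot(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();     // queued tasks still run, then the threads exit
        }
    }

    public static void main(String[] args) {
        int[] s = snapshot(6);
        System.out.println("poolSize: " + s[0] + ", queued: " + s[1]);
    }
}
```

With six tasks, two occupy the core threads and the other four sit in the queue, so the maximum of 5 is never used.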
arrayBlockingQueue
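Create a thread pool with a core size of 2 threads and a maximum of 5, backed by an ArrayBlockingQueue with a capacity of 2, then run 6 tasks that each take 1 second. Two tasks go to the core threads and two fill the queue; the remaining two find the queue full, so the pool starts two extra threads and poolSize ends at 4.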
@Test
public void arrayBlockingQueue() {
    // Core 2, max 5, 60 s keep-alive, bounded work queue holding at most 2 tasks.
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2));
    int taskSize = 6;
    CountDownLatch latch = new CountDownLatch(taskSize);
    AtomicInteger ids = new AtomicInteger();
    for (int i = 0; i < taskSize; i++) {
        executorService.execute(() -> {
            try {
                // The id is assigned when the task starts running, not when it is submitted.
                int id = ids.incrementAndGet();
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        // Wait for all six tasks before reading the pool statistics.
        latch.await();
        System.out.println("corePoolSize: " + executorService.getCorePoolSize());
        System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
        System.out.println("poolSize: " + executorService.getPoolSize());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[12] worker: 2 ready
T[13] worker: 3 ready
T[11] worker: 1 ready
T[14] worker: 4 ready
T[13] worker: 3 finished
T[11] worker: 1 finished
T[12] worker: 2 finished
T[14] worker: 4 finished
T[11] worker: 5 ready
T[13] worker: 6 ready
T[13] worker: 6 finished
T[11] worker: 5 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 4
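The poolSize of 4 above follows from simple arithmetic: core threads are used first, then the queue, and only then extra threads up to the maximum. The sketch below writes that arithmetic as a small helper and compares it with a real pool. The class BoundedQueueSketch and both method names are assumptions for illustration, and the helper assumes all tasks arrive at once and stay busy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class, used only for this illustration.
final class BoundedQueueSketch {

    // Threads in use when `tasks` long-running tasks arrive at once:
    // core threads first, then the queue, then extra threads up to max.
    static int expectedPoolSize(int core, int max, int capacity, int tasks) {
        int overflow = Math.max(0, tasks - core - capacity);
        return Math.min(max, Math.min(core, tasks) + overflow);
    }

    // Measures the pool size of a real ThreadPoolExecutor under the same load.
    // Assumes tasks <= max + capacity, so that no submission is rejected.
    static int observedPoolSize(int core, int max, int capacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(capacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("expected: " + expectedPoolSize(2, 5, 2, 6));
        System.out.println("observed: " + observedPoolSize(2, 5, 2, 6));
    }
}
```

For the configuration in this test (core 2, max 5, capacity 2, 6 tasks) both methods give 4; with only 3 tasks the queue absorbs the extra one and the size stays at 2.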
abortPolicy
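Create a thread pool with a core size of 2 threads and a maximum of 5, backed by a SynchronousQueue and using the abort policy (the default handler) once the maximum number of threads is reached, then run 6 tasks that each take 1 second. A SynchronousQueue holds no tasks, so every submission needs an idle or newly created thread; with all 5 threads busy, the sixth execute call throws RejectedExecutionException. The exception propagates out of the test method, so the pool statistics are never printed.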
@Test
public void abortPolicy() {
    // Core 2, max 5, 60 s keep-alive, no queue capacity; no handler given, so AbortPolicy applies.
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    int taskSize = 6;
    CountDownLatch latch = new CountDownLatch(taskSize);
    AtomicInteger ids = new AtomicInteger();
    for (int i = 0; i < taskSize; i++) {
        // The sixth call throws RejectedExecutionException because all 5 threads are busy.
        executorService.execute(() -> {
            try {
                int id = ids.incrementAndGet();
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        // Not reached in this test: the rejection above escapes the loop first.
        latch.await();
        System.out.println("corePoolSize: " + executorService.getCorePoolSize());
        System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
        System.out.println("poolSize: " + executorService.getPoolSize());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[11] worker: 1 ready
T[13] worker: 3 ready
T[12] worker: 2 ready
T[14] worker: 4 ready
T[15] worker: 5 ready
java.util.concurrent.RejectedExecutionException: Task org.ruoxue.java_147.multithreading.ExecutorServiceWithExamplesTest$$Lambda$1/971848845@6f75e721 rejected from java.util.concurrent.ThreadPoolExecutor@69222c14[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.ruoxue.java_147.multithreading.ExecutorServiceWithExamplesTest.abortPolicy(ExecutorServiceWithExamplesTest.java:87)
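In the run above the rejection escapes the test method. A caller that wants to keep going can catch RejectedExecutionException around each execute call instead. The sketch below counts accepted and rejected submissions that way; the class AbortPolicySketch, the submitAll method, and the release latch are assumptions for illustration, not part of the article's test class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class, used only for this illustration.
final class AbortPolicySketch {

    // Submits `tasks` blocked tasks and returns {accepted, rejected}.
    static int[] submitAll(int tasks) {
        // No handler argument, so the default AbortPolicy is used.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException ex) {
                    // All 5 threads are busy and the queue cannot hold tasks.
                    rejected++;
                }
            }
            return new int[] { accepted, rejected };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = submitAll(6);
        System.out.println("accepted: " + r[0] + ", rejected: " + r[1]);
    }
}
```

Catching the exception per submission lets the five accepted tasks finish normally and still gives the caller a chance to shut the pool down.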
discardPolicy
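Create a thread pool with a core size of 2 threads and a maximum of 5, backed by a SynchronousQueue and using the discard policy once the maximum number of threads is reached, then run 6 tasks that each take 1 second. The sixth task is dropped silently, so only five workers appear in the output. No CountDownLatch is used here because the discarded task would never count it down; since shutdown is never called, awaitTermination simply waits out its 3-second timeout before the pool statistics are printed.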
@Test
public void discardPolicy() {
    // Core 2, max 5, 60 s keep-alive, no queue capacity; rejected tasks are dropped silently.
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
    int taskSize = 6;
    AtomicInteger ids = new AtomicInteger();
    for (int i = 0; i < taskSize; i++) {
        executorService.execute(() -> {
            try {
                int id = ids.incrementAndGet();
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        });
    }
    try {
        // shutdown() is never called, so this acts as a 3-second wait for the running tasks.
        executorService.awaitTermination(3, TimeUnit.SECONDS);
        System.out.println("corePoolSize: " + executorService.getCorePoolSize());
        System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
        System.out.println("poolSize: " + executorService.getPoolSize());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[11] worker: 2 ready
T[14] worker: 4 ready
T[13] worker: 3 ready
T[12] worker: 1 ready
T[15] worker: 5 ready
T[14] worker: 4 finished
T[11] worker: 2 finished
T[12] worker: 1 finished
T[15] worker: 5 finished
T[13] worker: 3 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 5
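Because DiscardPolicy drops the rejected task without any signal, the only way to notice it is to count what actually ran. The sketch below does so and, unlike the test above, calls shutdown before awaitTermination so that the wait ends as soon as the tasks complete. The class DiscardPolicySketch, the countExecuted method, and the 5-second timeout are assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, used only for this illustration.
final class DiscardPolicySketch {

    // Returns how many of `tasks` submitted tasks actually ran.
    static int countExecuted(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                executed.incrementAndGet();
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown(); // stop accepting work; running tasks finish
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up waiting and interrupt the workers
            }
        } catch (InterruptedException ex) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("executed: " + countExecuted(6) + " of 6");
    }
}
```

With six submissions the count is 5: the sixth task is discarded without an exception and never runs.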
callerRunsPolicy
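Create a thread pool with a core size of 2 threads and a maximum of 5, backed by a SynchronousQueue and using the caller-runs policy once the maximum number of threads is reached, then run 6 tasks that each take 1 second. The rejected task is not dropped: it runs on the thread that called execute, which appears in the output as T[1], the thread running the test.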
@Test
public void callerRunsPolicy() {
    // Core 2, max 5, 60 s keep-alive, no queue capacity; rejected tasks run on the submitting thread.
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
    int taskSize = 6;
    CountDownLatch latch = new CountDownLatch(taskSize);
    AtomicInteger ids = new AtomicInteger();
    for (int i = 0; i < taskSize; i++) {
        // The sixth call blocks for about 1 second while the caller runs the task itself.
        executorService.execute(() -> {
            try {
                int id = ids.incrementAndGet();
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        // Wait for all six tasks before reading the pool statistics.
        latch.await();
        System.out.println("corePoolSize: " + executorService.getCorePoolSize());
        System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
        System.out.println("poolSize: " + executorService.getPoolSize());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
T[11] worker: 1 ready
T[1] worker: 3 ready
T[13] worker: 4 ready
T[15] worker: 6 ready
T[12] worker: 2 ready
T[14] worker: 5 ready
T[14] worker: 5 finished
T[13] worker: 4 finished
T[1] worker: 3 finished
T[11] worker: 1 finished
T[12] worker: 2 finished
T[15] worker: 6 finished
corePoolSize: 2
maximumPoolSize: 5
poolSize: 5
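The T[1] lines in the output above come from the submitting thread. A small sketch can confirm this by comparing each task's thread with the caller. The class CallerRunsSketch, the countByThread method, and the release latch are assumptions for illustration; pool threads are held on the latch so that all five stay busy, while the task run by the caller returns immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, used only for this illustration.
final class CallerRunsSketch {

    // Returns {tasks run on pool threads, tasks run on the calling thread}.
    static int[] countByThread(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger onPool = new AtomicInteger();
        AtomicInteger onCaller = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                if (Thread.currentThread() == caller) {
                    // Rejected by the pool, so the submitter runs it directly.
                    onCaller.incrementAndGet();
                    return;
                }
                onPool.incrementAndGet();
                try {
                    release.await(); // keep the pool thread busy until released
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException ex) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new int[] { onPool.get(), onCaller.get() };
    }

    public static void main(String[] args) {
        int[] r = countByThread(6);
        System.out.println("pool threads: " + r[0] + ", caller: " + r[1]);
    }
}
```

One consequence of this policy is that the submitting thread is occupied for as long as the rejected task runs, which slows down further submissions.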
ExecutorServiceWithExamplesTest.java
The complete unit test class, which checks that the ExecutorService behaves as expected.
package org.ruoxue.java_147.multithreading.executorservice;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

public class ExecutorServiceWithExamplesTest {

    @Test
    public void linkedBlockingQueue() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        int taskSize = 6;
        CountDownLatch latch = new CountDownLatch(taskSize);
        AtomicInteger ids = new AtomicInteger();
        for (int i = 0; i < taskSize; i++) {
            executorService.execute(() -> {
                try {
                    int id = ids.incrementAndGet();
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
            System.out.println("corePoolSize: " + executorService.getCorePoolSize());
            System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
            System.out.println("poolSize: " + executorService.getPoolSize());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void arrayBlockingQueue() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2));
        int taskSize = 6;
        CountDownLatch latch = new CountDownLatch(taskSize);
        AtomicInteger ids = new AtomicInteger();
        for (int i = 0; i < taskSize; i++) {
            executorService.execute(() -> {
                try {
                    int id = ids.incrementAndGet();
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
            System.out.println("corePoolSize: " + executorService.getCorePoolSize());
            System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
            System.out.println("poolSize: " + executorService.getPoolSize());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void abortPolicy() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        int taskSize = 6;
        CountDownLatch latch = new CountDownLatch(taskSize);
        AtomicInteger ids = new AtomicInteger();
        for (int i = 0; i < taskSize; i++) {
            executorService.execute(() -> {
                try {
                    int id = ids.incrementAndGet();
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
            System.out.println("corePoolSize: " + executorService.getCorePoolSize());
            System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
            System.out.println("poolSize: " + executorService.getPoolSize());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void discardPolicy() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
        int taskSize = 6;
        AtomicInteger ids = new AtomicInteger();
        for (int i = 0; i < taskSize; i++) {
            executorService.execute(() -> {
                try {
                    int id = ids.incrementAndGet();
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            });
        }
        try {
            executorService.awaitTermination(3, TimeUnit.SECONDS);
            System.out.println("corePoolSize: " + executorService.getCorePoolSize());
            System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
            System.out.println("poolSize: " + executorService.getPoolSize());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void callerRunsPolicy() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        int taskSize = 6;
        CountDownLatch latch = new CountDownLatch(taskSize);
        AtomicInteger ids = new AtomicInteger();
        for (int i = 0; i < taskSize; i++) {
            executorService.execute(() -> {
                try {
                    int id = ids.incrementAndGet();
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
            System.out.println("corePoolSize: " + executorService.getCorePoolSize());
            System.out.println("maximumPoolSize: " + executorService.getMaximumPoolSize());
            System.out.println("poolSize: " + executorService.getPoolSize());
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
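Takeaways
An ExecutorService thread pool runs tasks at some point in the future and takes care of scheduling and managing the threads. Reusing threads greatly reduces the overhead of creating a new thread for every task, helps avoid resource exhaustion, and improves system stability. This article has walked through examples of several common ExecutorService operations.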