Table of Contents
ToggleJava ExecutorService Interface
執行異步任務,實現了 Runnable 接口 及 Callable 接口,提供 execute 方法與 submit 方法,當任務數多於可用執行緒數,可以將任務暫時放置佇列,多元的佇列類別,可視實際需求選擇, ExecutorService Interface 定義了執行、取消、觸發單一或所有任務及關閉等執行緒池操作,介紹常見的 cancel 、 invokeAny 、 invokeAll 等方法,本篇增加了範例,並透過單元測試來驗證產出結果。
檔案目錄
./
+- src
+- test
| +- org
| +- ruoxue
| +- java_147
| +- multithreading
| +- executorservice
| +- ExecutorServiceInterfaceTest.java
單元測試
ExecutorService Interface Java 提供執行、提交等操作執行緒池。
cancel
ExecutorService Interface Java 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒, Java ExecutorService Example 使用 LinkedBlockingQueue ,執行 3 個任務,每個任務耗時 1 秒完成,當尚未完成時,取消任務。
@Test
public void cancel() {
int poolSize = 2;
int maxPoolSize = 5;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < taskSize; i++) {
Future<String> future = executorService.submit(() -> {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(5);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
});
futures.add(future);
}
futures.forEach(e -> {
try {
if (!e.isDone()) {
e.cancel(true);
}
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
try {
executorService.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
T[11] worker: 2 ready
T[12] worker: 1 ready
T[11] worker: 3 ready
java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.ruoxue.java_147.multithreading.ExecutorServiceMethodsTest.lambda$7(ExecutorServiceMethodsTest.java:169)
invokeAny
ExecutorService Interface Java 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒,使用 LinkedBlockingQueue , Java ExecutorService Example 執行 3 個任務,每個任務耗時 1 秒完成,只要任何一個任務完成或拋出例外,就結束工作。
@Test
public void invokeAny() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Callable<String>> callables = new ArrayList<Callable<String>>();
for (int i = 0; i < taskSize; i++) {
callables.add(new Callable<String>() {
public String call() throws Exception {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
}
});
}
String result = null;
try {
result = executorService.invokeAny(callables);
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
}
System.out.println("worker: " + result);
}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[12] worker: 2 finished
T[11] worker: 1 finished
T[11] worker: 3 ready
worker: 2 OK
invokeAll
ExecutorService Interface Java 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒, Java ExecutorService Example 使用 LinkedBlockingQueue ,執行 3 個任務,每個任務耗時 1 秒完成。
@Test
public void invokeAll() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Callable<String>> callables = new ArrayList<Callable<String>>();
for (int i = 0; i < taskSize; i++) {
callables.add(() -> {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
});
}
List<Future<String>> futures = null;
try {
futures = executorService.invokeAll(callables);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
futures.forEach(e -> {
try {
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
T[12] worker: 2 ready
T[11] worker: 1 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[11] worker: 3 ready
T[11] worker: 3 finished
T[1] worker: 1 OK
T[1] worker: 2 OK
T[1] worker: 3 OK
ExecutorServiceInterfaceTest.java
ExecutorService Methods in Java 新增單元測試,驗證 ExecutorService in Java 是否符合預期。
package org.ruoxue.java_147.multithreading.executorservice;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class ExecutorServiceInterfaceTest {
@Test
public void cancel() {
int poolSize = 2;
int maxPoolSize = 5;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < taskSize; i++) {
Future<String> future = executorService.submit(() -> {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(5);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
});
futures.add(future);
}
futures.forEach(e -> {
try {
if (!e.isDone()) {
e.cancel(true);
}
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
try {
executorService.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void invokeAny() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Callable<String>> callables = new ArrayList<Callable<String>>();
for (int i = 0; i < taskSize; i++) {
callables.add(new Callable<String>() {
public String call() throws Exception {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
}
});
}
String result = null;
try {
result = executorService.invokeAny(callables);
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
}
System.out.println("worker: " + result);
}
@Test
public void invokeAll() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Callable<String>> callables = new ArrayList<Callable<String>>();
for (int i = 0; i < taskSize; i++) {
callables.add(() -> {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
});
}
List<Future<String>> futures = null;
try {
futures = executorService.invokeAll(callables);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
futures.forEach(e -> {
try {
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
}
心得分享
Java ExecutorService Example 為了重用執行緒,不要在每次使用時建立,用完執行緒後回收,下次再重新建立新的執行緒,因此,為了減少系統負擔,提供一個能夠穩定的調度管理執行緒的工具,是非常重要的課題, Interface ExecutorService Java 提供一個執行緒池及用於為其分配任務的 API ,高效率地管理有限數量的執行緒和任務分配等狀況,提供了幾種 ExecutorService 常見方法的操作範例。