Table of Contents
ToggleJava ExecutorService Methods
多執行緒異步執行任務的接口,管理維護執行緒池、分配任務執行等,如果任務數多於可用執行緒數, ExecutorService in Java 可以將任務暫時放置佇列,並提供中斷,放棄等不同策略供選擇,以進行任務的下一步操作, ExecutorService Java Methods 介紹常見的 execute 、 submit 、 awaitTermination 等方法,本篇增加了範例,並透過單元測試來驗證產出結果。
檔案目錄
./
+- src
+- test
| +- org
| +- ruoxue
| +- java_147
| +- multithreading
| +- executorservice
| +- ExecutorServiceMethodsTest.java
單元測試
ExecutorService Methods Java 提供執行、提交、取消等操作執行緒池。
execute
ExecutorService Methods Java 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒,使用 LinkedBlockingQueue ,執行 3 個任務,每個任務耗時 1 秒完成。
@Test
public void execute() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
CountDownLatch latch = new CountDownLatch(taskSize);
AtomicInteger ids = new AtomicInteger();
for (int i = 0; i < taskSize; i++) {
executorService.execute(() -> {
try {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[11] worker: 3 ready
T[11] worker: 3 finished
awaitTermination
ExecutorService Methods Java 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒,使用 LinkedBlockingQueue ,執行 3 個任務,每個任務耗時 1 秒完成,執行緒池等待 3 秒後結束。
@Test
public void awaitTermination() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
for (int i = 0; i < taskSize; i++) {
executorService.execute(() -> {
try {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
T[12] worker: 2 ready
T[11] worker: 1 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[12] worker: 3 ready
T[12] worker: 3 finished
submitRunnable
ExecutorService Methods Java 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒,使用 LinkedBlockingQueue ,執行 3 個任務,每個任務耗時 1 秒完成。
@Test
public void submitRunnable() {
int poolSize = 2;
int maxPoolSize = 5;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < taskSize; i++) {
int id = ids.incrementAndGet();
Future<String> future = executorService.submit(() -> {
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
}, id + " OK");
futures.add(future);
}
futures.forEach(e -> {
try {
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[11] worker: 3 ready
T[1] worker: 1 OK
T[1] worker: 2 OK
T[11] worker: 3 finished
T[1] worker: 3 OK
submitCallable
ExecutorService Methods in Java 建立一個執行緒池,核心數量 2 條執行緒,最大數量 5 條執行緒,使用 LinkedBlockingQueue ,執行 3 個任務,每個任務耗時 1 秒完成。
@Test
public void submitCallable() {
int poolSize = 2;
int maxPoolSize = 5;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < taskSize; i++) {
Future<String> future = executorService.submit(() -> {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
});
futures.add(future);
}
futures.forEach(e -> {
try {
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[12] worker: 2 finished
T[11] worker: 1 finished
T[12] worker: 3 ready
T[1] worker: 1 OK
T[1] worker: 2 OK
T[12] worker: 3 finished
T[1] worker: 3 OK
ExecutorServiceMethodsTest.java
ExecutorService Methods in Java 新增單元測試,驗證 ExecutorService in Java 是否符合預期。
package org.ruoxue.java_147.multithreading.executorservice;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class ExecutorServiceMethodsTest {
@Test
public void execute() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
CountDownLatch latch = new CountDownLatch(taskSize);
AtomicInteger ids = new AtomicInteger();
for (int i = 0; i < taskSize; i++) {
executorService.execute(() -> {
try {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void awaitTermination() {
int poolSize = 2;
int maxPoolSize = 5;
ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
for (int i = 0; i < taskSize; i++) {
executorService.execute(() -> {
try {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
@Test
public void submitRunnable() {
int poolSize = 2;
int maxPoolSize = 5;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < taskSize; i++) {
int id = ids.incrementAndGet();
Future<String> future = executorService.submit(() -> {
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
}, id + " OK");
futures.add(future);
}
futures.forEach(e -> {
try {
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
@Test
public void submitCallable() {
int poolSize = 2;
int maxPoolSize = 5;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
int taskSize = 3;
AtomicInteger ids = new AtomicInteger();
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < taskSize; i++) {
Future<String> future = executorService.submit(() -> {
int id = ids.incrementAndGet();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
return id + " OK";
});
futures.add(future);
}
futures.forEach(e -> {
try {
String result = e.get();
System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
}
心得分享
ExecutorService in Java 提供一個執行緒池和一個用於為其分配任務的 API ,統一管理有限數量的執行緒和任務分配等問題, ExecutorService Methods in Java 提供了幾種 ExecutorService 常見方法的操作範例。