ScheduledThreadPoolExecutor in Java with Examples
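ScheduledThreadPoolExecutor extends ThreadPoolExecutor and implements ScheduledExecutorService, adding delayed and periodic task execution on top of a reusable thread pool. Scheduled work is handed to the threads in the pool: whichever thread is idle takes the next due task from the task queue, so successive runs of the same periodic task may be executed by different threads. This article adds examples and checks their output with unit tests.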
File Directory
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- scheduledthreadpoolexecutor
       |                       +- ScheduledThreadPoolExecutorWithExamplesTest.java
Unit Tests
ScheduledThreadPoolExecutor provides delayed and periodic task execution; the scheduled tasks are run by the threads in its pool.
scheduleRunnable
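Creates a thread pool with a fixed size of 1 thread and schedules 3 tasks, each taking 1 second to complete and returning no value. The tasks start only after an initial delay of 1 second, and because the pool has a single thread they run one after another.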
// A task with no return value: logs when it starts and finishes, and takes 1 second to run.
protected class RunWorker implements Runnable {

    private long start;
    private int id;

    public RunWorker(long start, int id) {
        this.start = start;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            // Columns: wall-clock time, thread id, milliseconds since the test started, worker id.
            System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
            TimeUnit.SECONDS.sleep(1);
            System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

@Test
public void scheduleRunnable() {
    int poolSize = 1;
    ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
    int taskSize = 3;
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    List<ScheduledFuture<?>> futures = new ArrayList<>();
    // Schedule each task to run once, 1 second from now.
    IntStream.range(0, taskSize).forEach(e -> {
        ScheduledFuture<?> future = executorService.schedule(new RunWorker(start, e), 1, TimeUnit.SECONDS);
        futures.add(future);
    });
    try {
        // shutdown() is never called, so this simply blocks for the full 10 seconds while the tasks run.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 17:48:29 T[1] init
2023/03/17 17:48:30 T[11] 1116 worker: 0 ready
2023/03/17 17:48:31 T[11] 2121 worker: 0 finished
2023/03/17 17:48:31 T[11] 2122 worker: 1 ready
2023/03/17 17:48:32 T[11] 3124 worker: 1 finished
2023/03/17 17:48:32 T[11] 3125 worker: 2 ready
2023/03/17 17:48:33 T[11] 4128 worker: 2 finished
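Because the test above never calls shutdown(), awaitTermination() always waits out its full 10 second timeout. The following is a minimal sketch of one way to let the wait end as soon as the one-shot tasks are done, relying on the default policy that already-scheduled delayed tasks still run after shutdown(). The class name OneShotShutdownSketch, the method runOnce, and the 100 ms delay are assumptions made for illustration and are not part of the original test class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original test class.
class OneShotShutdownSketch {

    // Schedules taskCount one-shot tasks on a single thread and
    // returns their ids in the order in which they completed.
    static List<Integer> runOnce(int taskCount, long delayMillis) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        List<Integer> completed = new CopyOnWriteArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            // Block-bodied lambda with no return value, so it is treated as a Runnable.
            executor.schedule(() -> {
                completed.add(id);
            }, delayMillis, TimeUnit.MILLISECONDS);
        }
        // Stop accepting new tasks; by default the delayed tasks already queued still run.
        executor.shutdown();
        // Returns as soon as the queued tasks have finished, or after 10 seconds at the latest.
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce(3, 100)); // prints [0, 1, 2]
    }
}
```

With a single thread, tasks that become due in submission order also complete in that order, which matches the worker 0, 1, 2 sequence in the log above. This pattern suits one-shot tasks only: by default shutdown() cancels periodic tasks.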
scheduleCallable
Creates a thread pool with a fixed size of 1 thread and schedules 3 tasks, each taking 1 second to complete and returning a value. As before, the tasks start only after an initial delay of 1 second.
// A task that returns a value: logs when it starts and finishes, takes 1 second, and returns "<id> OK".
protected class CallWorker implements Callable<String> {

    private long start;
    private int id;
    private String result;

    public CallWorker(long start, int id) {
        this.start = start;
        this.id = id;
    }

    public int getId() {
        return id;
    }

    @Override
    public String call() throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
                Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
        TimeUnit.SECONDS.sleep(1);
        System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
                Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
        result = id + " OK";
        return result;
    }
}

@Test
public void scheduleCallable() {
    int poolSize = 1;
    ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
    int taskSize = 3;
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    List<ScheduledFuture<String>> futures = new ArrayList<>();
    // Schedule each task to run once, 1 second from now; each future will hold the task's result.
    IntStream.range(0, taskSize).forEach(e -> {
        ScheduledFuture<String> future = executorService.schedule(new CallWorker(start, e), 1, TimeUnit.SECONDS);
        futures.add(future);
    });
    try {
        // shutdown() is never called, so this simply blocks for the full 10 seconds while the tasks run.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 17:49:45 T[1] init
2023/03/17 17:49:46 T[11] 1103 worker: 0 ready
2023/03/17 17:49:47 T[11] 2113 worker: 0 finished
2023/03/17 17:49:47 T[11] 2115 worker: 1 ready
2023/03/17 17:49:48 T[11] 3119 worker: 1 finished
2023/03/17 17:49:48 T[11] 3120 worker: 2 ready
2023/03/17 17:49:49 T[11] 4124 worker: 2 finished
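The test above collects the ScheduledFuture objects but never reads the values returned by the tasks. As a minimal sketch of how the results could be retrieved, the example below calls get() on each future, which blocks until that task has finished. The class name CallableResultSketch, the method collect, and the 100 ms delay are hypothetical and chosen only for this illustration; the lambda stands in for CallWorker and returns the same "<id> OK" string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original test class.
class CallableResultSketch {

    // Schedules taskCount value-returning tasks and collects their results in submission order.
    static List<String> collect(int taskCount, long delayMillis)
            throws InterruptedException, ExecutionException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            List<ScheduledFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                // The lambda returns a String, so it is scheduled as a Callable<String>.
                futures.add(executor.schedule(() -> id + " OK", delayMillis, TimeUnit.MILLISECONDS));
            }
            List<String> results = new ArrayList<>();
            for (ScheduledFuture<String> future : futures) {
                // get() blocks until the task has run and then returns its value.
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collect(3, 100)); // prints [0 OK, 1 OK, 2 OK]
    }
}
```

If a task throws, get() reports the failure as an ExecutionException whose cause is the original exception, so callers usually handle that case explicitly.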
scheduleAtFixedRate
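Creates a thread pool with a fixed size of 1 thread and schedules 3 periodic tasks, each taking 1 second per run. After an initial delay of 1 second, scheduleAtFixedRate runs each task at a fixed period of 1 second. When a run cannot start on time, here because the single thread is still busy with the other tasks, or more generally because a run takes longer than the period, the next run starts as soon as the previous work finishes, and late runs are made up rather than skipped.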
@Test
public void scheduleAtFixedRate() {
    int poolSize = 1;
    ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
    int taskSize = 3;
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    List<ScheduledFuture<?>> futures = new ArrayList<>();
    // Initial delay 1 second, then a target period of 1 second between the start times of each task's runs.
    IntStream.range(0, taskSize).forEach(e -> {
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new RunWorker(start, e), 1, 1,
                TimeUnit.SECONDS);
        futures.add(future);
    });
    try {
        // shutdown() is never called, so this simply blocks for 10 seconds while the periodic tasks run.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 17:51:03 T[1] init
2023/03/17 17:51:04 T[11] 1096 worker: 0 ready
2023/03/17 17:51:05 T[11] 2099 worker: 0 finished
2023/03/17 17:51:05 T[11] 2101 worker: 1 ready
2023/03/17 17:51:06 T[11] 3107 worker: 1 finished
2023/03/17 17:51:06 T[11] 3108 worker: 2 ready
2023/03/17 17:51:07 T[11] 4112 worker: 2 finished
2023/03/17 17:51:07 T[11] 4114 worker: 0 ready
2023/03/17 17:51:08 T[11] 5115 worker: 0 finished
2023/03/17 17:51:08 T[11] 5116 worker: 1 ready
2023/03/17 17:51:09 T[11] 6121 worker: 1 finished
2023/03/17 17:51:09 T[11] 6124 worker: 2 ready
2023/03/17 17:51:10 T[11] 7132 worker: 2 finished
2023/03/17 17:51:10 T[11] 7136 worker: 0 ready
2023/03/17 17:51:11 T[11] 8141 worker: 0 finished
2023/03/17 17:51:11 T[11] 8143 worker: 1 ready
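The periodic tasks in the test above are never stopped; the log simply ends when the 10 second wait is over. The sketch below shows one possible way to stop a fixed-rate task after a given number of runs by cancelling its ScheduledFuture, and it also illustrates that the n-th run never starts earlier than n periods after scheduling when the initial delay equals the period. FixedRateCancelSketch, runFixedRate, and the 100 ms period are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original test class.
class FixedRateCancelSketch {

    // Runs a fixed-rate task until it has executed `runs` times, then cancels it.
    // Returns the elapsed time in milliseconds from scheduling to the last counted run.
    static long runFixedRate(int runs, long periodMillis) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        CountDownLatch latch = new CountDownLatch(runs);
        long startNanos = System.nanoTime();
        // Initial delay and period are both periodMillis, so run n is due at n * periodMillis.
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(
                latch::countDown, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            if (!latch.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run often enough in time");
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        } finally {
            // Cancel further runs without interrupting a run in progress, then release the thread.
            future.cancel(false);
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long elapsed = runFixedRate(5, 100);
        System.out.println("5 runs took at least 500 ms: " + (elapsed >= 500));
    }
}
```

The task here is far shorter than its period, so the runs stay on schedule; in the test above the single thread is shared by three 1 second tasks, which is why each task there effectively repeats every 3 seconds.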
scheduleWithFixedDelay
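Creates a thread pool with a fixed size of 1 thread and schedules 3 periodic tasks, each taking 1 second per run. After an initial delay of 1 second, scheduleWithFixedDelay runs each task with a fixed delay: regardless of how long a run takes, the next run of that task becomes due 3 seconds after the previous run finishes. Late runs are not made up, because each due time is computed from the previous completion rather than from a fixed timetable.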
@Test
public void scheduleWithFixedDelay() {
    int poolSize = 1;
    ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
    int taskSize = 3;
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    List<ScheduledFuture<?>> futures = new ArrayList<>();
    // Initial delay 1 second; each task's next run becomes due 3 seconds after its previous run finishes.
    IntStream.range(0, taskSize).forEach(e -> {
        ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new RunWorker(start, e), 1, 3,
                TimeUnit.SECONDS);
        futures.add(future);
    });
    try {
        // shutdown() is never called, so this simply blocks for 10 seconds while the periodic tasks run.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 17:52:01 T[1] init
2023/03/17 17:52:02 T[11] 1093 worker: 0 ready
2023/03/17 17:52:03 T[11] 2104 worker: 0 finished
2023/03/17 17:52:03 T[11] 2106 worker: 1 ready
2023/03/17 17:52:04 T[11] 3109 worker: 1 finished
2023/03/17 17:52:04 T[11] 3111 worker: 2 ready
2023/03/17 17:52:05 T[11] 4117 worker: 2 finished
2023/03/17 17:52:06 T[11] 5111 worker: 0 ready
2023/03/17 17:52:07 T[11] 6117 worker: 0 finished
2023/03/17 17:52:07 T[11] 6118 worker: 1 ready
2023/03/17 17:52:08 T[11] 7124 worker: 1 finished
2023/03/17 17:52:08 T[11] 7128 worker: 2 ready
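In the log above, worker 0 finishes at about 2104 ms and is ready again at about 5111 ms, roughly 3 seconds later. As a rough sketch of how that gap could be measured directly, the example below records the time between the end of one run and the start of the next for a single fixed-delay task. The names FixedDelayGapSketch and smallestGapMillis, and the 50 ms and 100 ms values, are hypothetical and used only for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, not part of the original test class.
class FixedDelayGapSketch {

    // Runs one fixed-delay task at least `runs` times (runs must be 2 or more) and returns
    // the smallest gap, in milliseconds, between the end of one run and the start of the next.
    static long smallestGapMillis(int runs, long workMillis, long delayMillis)
            throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        List<Long> gapsNanos = new CopyOnWriteArrayList<>();
        AtomicReference<Long> previousEnd = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(runs);
        executor.scheduleWithFixedDelay(() -> {
            long begin = System.nanoTime();
            Long end = previousEnd.get();
            if (end != null) {
                gapsNanos.add(begin - end); // time since the previous run finished
            }
            try {
                TimeUnit.MILLISECONDS.sleep(workMillis); // simulated work
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return;
            }
            previousEnd.set(System.nanoTime());
            latch.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            if (!latch.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run often enough in time");
            }
        } finally {
            executor.shutdownNow(); // stop the periodic task
        }
        return TimeUnit.NANOSECONDS.toMillis(Collections.min(gapsNanos));
    }

    public static void main(String[] args) throws InterruptedException {
        long gap = smallestGapMillis(3, 50, 100);
        System.out.println("smallest gap is at least 100 ms: " + (gap >= 100));
    }
}
```

The gap is never shorter than the configured delay, however long the work itself takes; it can be longer when, as in the test above, the only pool thread is still busy with another task at the moment a run becomes due.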
ScheduledThreadPoolExecutorWithExamplesTest.java
The complete unit test class for ScheduledThreadPoolExecutor, used to check that the executor behaves as expected.
package org.ruoxue.java_147.multithreading.scheduledthreadpoolexecutor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Test;

public class ScheduledThreadPoolExecutorWithExamplesTest {

    protected class RunWorker implements Runnable {

        private long start;
        private int id;

        public RunWorker(long start, int id) {
            this.start = start;
            this.id = id;
        }

        @Override
        public void run() {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
                TimeUnit.SECONDS.sleep(1);
                System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    @Test
    public void scheduleRunnable() {
        int poolSize = 1;
        ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
        int taskSize = 3;
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        IntStream.range(0, taskSize).forEach(e -> {
            ScheduledFuture<?> future = executorService.schedule(new RunWorker(start, e), 1, TimeUnit.SECONDS);
            futures.add(future);
        });
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    protected class CallWorker implements Callable<String> {

        private long start;
        private int id;
        private String result;

        public CallWorker(long start, int id) {
            this.start = start;
            this.id = id;
        }

        public int getId() {
            return id;
        }

        @Override
        public String call() throws Exception {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
            TimeUnit.SECONDS.sleep(1);
            System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
            result = id + " OK";
            return result;
        }
    }

    @Test
    public void scheduleCallable() {
        int poolSize = 1;
        ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
        int taskSize = 3;
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        List<ScheduledFuture<String>> futures = new ArrayList<>();
        IntStream.range(0, taskSize).forEach(e -> {
            ScheduledFuture<String> future = executorService.schedule(new CallWorker(start, e), 1, TimeUnit.SECONDS);
            futures.add(future);
        });
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void scheduleAtFixedRate() {
        int poolSize = 1;
        ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
        int taskSize = 3;
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        IntStream.range(0, taskSize).forEach(e -> {
            ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new RunWorker(start, e), 1, 1,
                    TimeUnit.SECONDS);
            futures.add(future);
        });
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void scheduleWithFixedDelay() {
        int poolSize = 1;
        ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
        int taskSize = 3;
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        IntStream.range(0, taskSize).forEach(e -> {
            ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new RunWorker(start, e), 1, 3,
                    TimeUnit.SECONDS);
            futures.add(future);
        });
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
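Takeaways
ScheduledThreadPoolExecutor runs tasks after a delay, at a fixed rate, or with a fixed delay. Submitted tasks wait in a queue until they are due and a pool thread is idle to take them. Each scheduling call returns a ScheduledFuture, which can be used to check whether the task has completed, to cancel it, or to retrieve its result. The examples above cover the most commonly used methods.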
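To make the ScheduledFuture operations mentioned above concrete, here is a small sketch that schedules two one-shot tasks, cancels one before it is due, and reads the result and status of the other. The class name ScheduledFutureStatusSketch, the method describe, and the delays are assumptions for this illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original test class.
class ScheduledFutureStatusSketch {

    // Schedules two tasks, cancels the second one, and summarises what the futures report.
    static String describe() throws InterruptedException, ExecutionException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            ScheduledFuture<String> done = executor.schedule(() -> "result", 50, TimeUnit.MILLISECONDS);
            ScheduledFuture<String> cancelled = executor.schedule(() -> "never", 10, TimeUnit.SECONDS);

            // Cancel the second task long before it is due; it has not started, so this succeeds.
            boolean cancelSucceeded = cancelled.cancel(false);

            // get() blocks until the first task has run and then returns its value.
            String value = done.get();

            return value + " done=" + done.isDone()
                    + " cancelSucceeded=" + cancelSucceeded
                    + " cancelled=" + cancelled.isCancelled();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe());
        // prints result done=true cancelSucceeded=true cancelled=true
    }
}
```

Calling get() on the cancelled future would throw a CancellationException, so code that may cancel tasks typically checks isCancelled() first.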