Java ScheduledExecutorService Interface
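ScheduledExecutorService runs tasks at a fixed rate or with a fixed delay. Tasks are submitted to a thread pool; when a thread becomes idle, it takes the next task from the queue and runs it, so the same task may run on different threads in different periods. Each scheduling call returns a ScheduledFuture, which can be used to check whether the task has completed, cancel it, or retrieve its result. This article adds examples and verifies their output with unit tests.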
File Directory
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- scheduledexecutorservice
       |                       +- ScheduledExecutorServiceInterfaceTest.java
Unit Tests
ScheduledExecutorService provides periodic and delayed task execution, carried out by the threads of a thread pool.
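Before the periodic examples, a minimal sketch of the one-shot case may help: `schedule` accepts a `Callable` and a delay, and the returned `ScheduledFuture` delivers the result through `get()`. The class name `DelayedResultSketch` and the method `runOnce` are hypothetical and not part of the original test class.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for illustration.
class DelayedResultSketch {

    // Schedules a one-shot Callable and blocks until its result is available.
    static int runOnce(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<Integer> future =
                    scheduler.schedule(() -> 6 * 7, delayMillis, TimeUnit.MILLISECONDS);
            // get() waits for the delay to elapse and for the task to finish.
            return future.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + runOnce(200));
    }
}
```

Periodic tasks scheduled with `scheduleAtFixedRate` or `scheduleWithFixedDelay` never complete normally, so for them the future is mainly useful for `cancel` and `isDone`, as in the tests that follow.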
cancelFalse
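Create a thread pool with a fixed size of 1 thread and run 1 task; each run takes about 6 seconds. The task starts after an initial delay of 1 second, and the main thread calls cancel(false) after 4 seconds. Because cancel(false) does not interrupt a run that is already in progress, the current run still finishes, but no further runs are scheduled.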
@Test
public void cancelFalse() {
    int poolSize = 1;
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            // Busy work: repeated allocations keep this run going for several seconds.
            for (int i = 0; i < 100000; i++) {
                byte[] bytes = new byte[1024 * 1000];
            }
            System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }, 1_000, 1_000, TimeUnit.MILLISECONDS);
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
    if (!future.isDone()) {
        // cancel(false): the run in progress is not interrupted.
        boolean cancel = future.cancel(false);
        System.out.println(String.format("%s T[%d] %d task cancel: %b, done: %b", df.format(new Date()),
                Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
    }
    try {
        // No shutdown() was requested, so this keeps the test alive for the full timeout.
        executorService.awaitTermination(20, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 16:38:03 T[1] init
2023/03/17 16:38:04 T[11] 1098 task ready
2023/03/17 16:38:07 T[1] 4096 task cancel: true, done: true
2023/03/17 16:38:11 T[11] 7592 task finished
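The output above depends on wall-clock timing. As a rough, timing-independent sketch of the same behavior, the version below uses latches instead of sleeps: it cancels with `cancel(false)` while a run is in progress, then checks that the run still completes and that no second run happens. The class `CancelWithoutInterruptSketch` and its `demo` method are hypothetical names introduced here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, used only for illustration.
class CancelWithoutInterruptSketch {

    static String demo() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                started.countDown();
                try {
                    release.await();       // stands in for the long-running work
                    finished.countDown();  // reached only when the run is not interrupted
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }, 10, 10, TimeUnit.MILLISECONDS);

            started.await();                          // the first run is now in progress
            boolean cancelled = future.cancel(false); // no interrupt is sent
            boolean done = future.isDone();
            release.countDown();                      // let the in-progress run continue
            boolean completed = finished.await(2, TimeUnit.SECONDS);

            // Drain the pool so the run counter is final before it is read.
            scheduler.shutdown();
            scheduler.awaitTermination(2, TimeUnit.SECONDS);
            return "cancelled=" + cancelled + " done=" + done
                    + " completed=" + completed + " runs=" + runs.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that `isDone()` already reports true right after `cancel(false)` returns, even though the in-progress run has not finished yet; this matches the "done: true" line printed before "task finished" in the test output.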
cancelTrue
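Create a thread pool with a fixed size of 1 thread and run 1 task; each run takes about 6 seconds. The task starts after an initial delay of 1 second, and the main thread calls cancel(true) after 4 seconds. The task itself must check whether its thread has been interrupted (the code below uses Thread.interrupted()); if so, it throws InterruptedException to abort the run.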
@Test
public void cancelTrue() {
    int poolSize = 1;
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            for (int i = 0; i < 100000; i++) {
                byte[] bytes = new byte[1024 * 1000];
                // Poll the interrupt flag; Thread.interrupted() also clears it.
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
            }
            System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            // Restore the interrupt flag that Thread.interrupted() cleared.
            Thread.currentThread().interrupt();
        }
    }, 1_000, 1_000, TimeUnit.MILLISECONDS);
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
    if (!future.isDone()) {
        // cancel(true): the thread running the task is interrupted.
        boolean cancel = future.cancel(true);
        System.out.println(String.format("%s T[%d] %d task cancel: %b", df.format(new Date()),
                Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel));
    }
    try {
        executorService.awaitTermination(20, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 16:43:07 T[1] init
2023/03/17 16:43:08 T[11] 1109 task ready
java.lang.InterruptedException
at org.ruoxue.java_147.multithreading.ScheduledExecutorServiceInterfaceTest.lambda$1(ScheduledExecutorServiceInterfaceTest.java:70)
2023/03/17 16:43:11 T[1] 4108 task cancel: true
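The key point of this test is that `cancel(true)` only sets the interrupt flag; a CPU-bound task stops only if it polls that flag. The following sketch, under the assumption of a simple spin loop standing in for real work, shows the cooperative check in isolation. `CancelWithInterruptSketch` and `demo` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for illustration.
class CancelWithInterruptSketch {

    static String demo() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch sawInterrupt = new CountDownLatch(1);
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                started.countDown();
                long spins = 0;
                // CPU-bound loop: it ends only because it polls the interrupt flag.
                while (!Thread.currentThread().isInterrupted()) {
                    spins++;
                }
                sawInterrupt.countDown();
            }, 10, 10, TimeUnit.MILLISECONDS);

            started.await();                         // the run is now in progress
            boolean cancelled = future.cancel(true); // interrupts the worker thread
            boolean interruptSeen = sawInterrupt.await(2, TimeUnit.SECONDS);
            return "cancelled=" + cancelled + " interruptSeen=" + interruptSeen;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If the loop did not check `isInterrupted()`, the run would simply continue to its natural end, which is the behavior seen in the cancelFalse test.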
noCatchUp
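Create a thread pool with a fixed size of 1 thread and run 2 tasks; each run of task A takes 3 seconds. Both tasks are scheduled with a fixed delay: they start after an initial delay of 1 second, and after a run completes, the next run begins 1 second later. The main thread cancels task A after 4 seconds, and task B runs as soon as A has been cancelled. Because B uses a fixed delay, the runs it missed while A held the only thread are not made up.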
@Test
public void noCatchUp() {
    int poolSize = 1;
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    // Task A: holds the only worker thread for 3 seconds per run.
    ScheduledFuture<?> futureA = executorService.scheduleWithFixedDelay(() -> {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            TimeUnit.SECONDS.sleep(3);
            System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }, 1_000, 1_000, TimeUnit.MILLISECONDS);
    // Task B: returns immediately, but has to wait for the thread that A occupies.
    ScheduledFuture<?> futureB = executorService.scheduleWithFixedDelay(() -> {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        try {
            System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }, 1_000, 1_000, TimeUnit.MILLISECONDS);
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
    if (!futureA.isDone()) {
        // Interrupts A's sleep and frees the thread for B.
        futureA.cancel(true);
    }
    try {
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 16:45:48 T[1] init
2023/03/17 16:45:49 T[11] 1103 task: A ready
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:342)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at org.ruoxue.java_147.multithreading.ScheduledExecutorServiceInterfaceTest.lambda$2(ScheduledExecutorServiceInterfaceTest.java:112)
2023/03/17 16:45:52 T[11] 4105 task: B ready
2023/03/17 16:45:52 T[11] 4106 task: B finished
2023/03/17 16:45:53 T[11] 5112 task: B ready
2023/03/17 16:45:53 T[11] 5113 task: B finished
2023/03/17 16:45:54 T[11] 6119 task: B ready
2023/03/17 16:45:54 T[11] 6119 task: B finished
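With a fixed delay, the waiting time is counted from the end of one run to the start of the next, which is why B's runs above are spaced about one second apart with no burst. A small sketch that measures this gap directly is shown below; `FixedDelayGapSketch`, `minGapMillis`, and the chosen durations are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for illustration.
class FixedDelayGapSketch {

    // Returns the smallest gap between the end of one run and the start of the next.
    // Expects runs >= 2.
    static long minGapMillis(long workMillis, long delayMillis, int runs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        List<Long> starts = new CopyOnWriteArrayList<>();
        List<Long> ends = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                starts.add(System.nanoTime());
                try {
                    TimeUnit.MILLISECONDS.sleep(workMillis); // simulated work
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                ends.add(System.nanoTime());
                done.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);

            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("runs did not complete in time");
            }
            long minGap = Long.MAX_VALUE;
            for (int i = 1; i < runs; i++) {
                minGap = Math.min(minGap, starts.get(i) - ends.get(i - 1));
            }
            return TimeUnit.NANOSECONDS.toMillis(minGap);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("min gap (ms): " + minGapMillis(30, 50, 3));
    }
}
```

The measured gap should never fall below the configured delay, regardless of how long each run takes.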
catchUp
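Create a thread pool with a fixed size of 1 thread and run 2 tasks; each run of task A takes 3 seconds. Both tasks are scheduled at a fixed rate, with an initial delay of 1 second and a period of 1 second. The main thread cancels task A after 4 seconds, and task B runs as soon as A has been cancelled. Because B uses a fixed rate, the runs it missed while waiting are executed back to back to catch up.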
@Test
public void catchUp() {
    int poolSize = 1;
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    // Task A: holds the only worker thread for 3 seconds per run.
    ScheduledFuture<?> futureA = executorService.scheduleAtFixedRate(() -> {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            TimeUnit.SECONDS.sleep(3);
            System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }, 1_000, 1_000, TimeUnit.MILLISECONDS);
    // Task B: its runs keep falling due every second while A occupies the thread.
    ScheduledFuture<?> futureB = executorService.scheduleAtFixedRate(() -> {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        try {
            System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }, 1_000, 1_000, TimeUnit.MILLISECONDS);
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
    if (!futureA.isDone()) {
        // Interrupts A's sleep; B's overdue runs then execute back to back.
        futureA.cancel(true);
    }
    try {
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/17 17:00:58 T[1] init
2023/03/17 17:00:59 T[11] 1101 task: A ready
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:342)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at org.ruoxue.java_147.multithreading.ScheduledExecutorServiceInterfaceTest.lambda$4(ScheduledExecutorServiceInterfaceTest.java:161)
2023/03/17 17:01:02 T[11] 4103 task: B ready
2023/03/17 17:01:02 T[11] 4103 task: B finished
2023/03/17 17:01:02 T[11] 4104 task: B ready
2023/03/17 17:01:02 T[11] 4104 task: B finished
2023/03/17 17:01:02 T[11] 4105 task: B ready
2023/03/17 17:01:02 T[11] 4105 task: B finished
2023/03/17 17:01:02 T[11] 4106 task: B ready
2023/03/17 17:01:02 T[11] 4107 task: B finished
2023/03/17 17:01:03 T[11] 5096 task: B ready
2023/03/17 17:01:03 T[11] 5096 task: B finished
2023/03/17 17:01:04 T[11] 6097 task: B ready
2023/03/17 17:01:04 T[11] 6099 task: B finished
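The burst of four B runs at around 4.1 seconds is the fixed-rate catch-up: each run's due time is computed from the original schedule, not from when the previous run actually finished. The sketch below reproduces this on a smaller scale by blocking the only worker thread for several periods and then timing how long the overdue runs take once the thread is free. `FixedRateCatchUpSketch`, `millisToCatchUp`, and the 50 ms period are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for illustration.
class FixedRateCatchUpSketch {

    // Returns how long the overdue runs take once the worker thread is released.
    static long millisToCatchUp(long periodMillis, int missedRuns) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch caughtUp = new CountDownLatch(missedRuns);
        try {
            // Task A: occupies the only worker thread until released.
            scheduler.execute(() -> {
                blockerStarted.countDown();
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            blockerStarted.await();

            // Task B: its runs fall due every period while the thread is still busy.
            scheduler.scheduleAtFixedRate(caughtUp::countDown,
                    periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            TimeUnit.MILLISECONDS.sleep(periodMillis * missedRuns + periodMillis / 2);

            long releasedAt = System.nanoTime();
            release.countDown(); // free the thread; overdue runs of B start now
            if (!caughtUp.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("overdue runs did not complete in time");
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - releasedAt);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("catch-up took (ms): " + millisToCatchUp(50, 5));
    }
}
```

With a fixed delay, five runs would need at least four full delays (200 ms here) after the release; at a fixed rate, the overdue runs are expected to finish well within that time because they execute back to back.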
ScheduledExecutorServiceInterfaceTest.java
The complete unit test class, which verifies that the ScheduledExecutorService examples behave as expected.
package org.ruoxue.java_147.multithreading.scheduledexecutorservice;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class ScheduledExecutorServiceInterfaceTest {

    @Test
    public void cancelFalse() {
        int poolSize = 1;
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                for (int i = 0; i < 100000; i++) {
                    byte[] bytes = new byte[1024 * 1000];
                }
                System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        if (!future.isDone()) {
            boolean cancel = future.cancel(false);
            System.out.println(String.format("%s T[%d] %d task cancel: %b, done: %b", df.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
        }
        try {
            executorService.awaitTermination(20, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void cancelTrue() {
        int poolSize = 1;
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                for (int i = 0; i < 100000; i++) {
                    byte[] bytes = new byte[1024 * 1000];
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                }
                System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        if (!future.isDone()) {
            boolean cancel = future.cancel(true);
            System.out.println(String.format("%s T[%d] %d task cancel: %b", df.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel));
        }
        try {
            executorService.awaitTermination(20, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void noCatchUp() {
        int poolSize = 1;
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        ScheduledFuture<?> futureA = executorService.scheduleWithFixedDelay(() -> {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                TimeUnit.SECONDS.sleep(3);
                System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> futureB = executorService.scheduleWithFixedDelay(() -> {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            try {
                System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        if (!futureA.isDone()) {
            futureA.cancel(true);
        }
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Test
    public void catchUp() {
        int poolSize = 1;
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        ScheduledFuture<?> futureA = executorService.scheduleAtFixedRate(() -> {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                TimeUnit.SECONDS.sleep(3);
                System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> futureB = executorService.scheduleAtFixedRate(() -> {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            try {
                System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }, 1_000, 1_000, TimeUnit.MILLISECONDS);
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        if (!futureA.isDone()) {
            futureA.cancel(true);
        }
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
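Takeaways
ScheduledExecutorService builds on ExecutorService to add periodic and delayed task execution. Scheduled work is handed to a thread pool, where an idle thread takes the task from the task queue and runs it. This article has walked through examples of several commonly used operations.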
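The tests in this article call `awaitTermination` without a prior `shutdown()`, which simply keeps each test alive for the full timeout so the output can be observed. Outside of such demonstrations, a scheduler is usually shut down explicitly. The following is one possible sketch of that pattern; `SchedulerShutdownSketch` and `stopGracefully` are hypothetical names, and the timeout value is arbitrary.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for illustration.
class SchedulerShutdownSketch {

    // Requests an orderly shutdown, then forces it if the timeout elapses.
    static boolean stopGracefully(ScheduledExecutorService scheduler, long timeoutMillis) {
        scheduler.shutdown(); // no new tasks are accepted from here on
        try {
            if (!scheduler.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                scheduler.shutdownNow(); // interrupt whatever is still running
                return scheduler.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException ex) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean demo() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // A trivial periodic task, present only so the pool has scheduled work.
        scheduler.scheduleAtFixedRate(() -> { }, 0, 10, TimeUnit.MILLISECONDS);
        return stopGracefully(scheduler, 1_000) && scheduler.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + demo());
    }
}
```

For pools created by `Executors.newScheduledThreadPool`, periodic tasks are by default not continued after `shutdown()`, so the pool can terminate once any in-progress run has finished.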