Java ScheduledThreadPoolExecutor Class
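ScheduledThreadPoolExecutor runs tasks at a fixed rate or with a fixed delay. A submitted task waits in the pool's queue until a thread is idle, and then that thread takes it from the queue and runs it, so a periodic task may be run by a different thread in each period. Scheduling a task returns a ScheduledFuture, which can be used to check whether the task is done, to cancel it, or to get its result. This article adds examples of the ScheduledThreadPoolExecutor class and verifies their output with unit tests.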
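As a minimal sketch of the ScheduledFuture described above, the following schedules a one-shot Callable and reads its result. The class name DelayedResultSketch, the method computeAfterDelay, and the 6 * 7 computation are illustrative assumptions, not part of the article's test class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for this illustration.
class DelayedResultSketch {

    // Runs a one-shot Callable after the given delay and returns its result.
    static int computeAfterDelay(long delayMillis) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            Callable<Integer> task = () -> 6 * 7;
            ScheduledFuture<Integer> future = pool.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            // Right after scheduling, the task is normally still waiting for its delay.
            System.out.println("done before get(): " + future.isDone());
            // get() blocks until the task has run, then returns the Callable's value.
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            // Let the pool thread exit once the task has completed.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + computeAfterDelay(200));
    }
}
```

The periodic methods used in the rest of the article return a ScheduledFuture<?> instead, because a repeating Runnable has no final result to hand back.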
File Directory
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- scheduledthreadpoolexecutor
       |                       +- ScheduledThreadPoolExecutorClassTest.java
Unit Tests
ScheduledThreadPoolExecutor provides periodic and delayed task execution, with the tasks run by the threads in the pool.
cancelFalse
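Creates a thread pool with a fixed size of 1 thread and schedules 1 task at a fixed rate of 1 second, starting after an initial delay of 1 second. Each run only increments the counter, so it finishes almost immediately. When the counter reaches 3, the main thread calls cancel(false) to stop further runs; a run already in progress would be allowed to finish.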
protected class CancelFalseWorker implements Runnable {

    private AtomicInteger counter;
    private long start;

    public CancelFalseWorker(AtomicInteger counter, long start) {
        this.counter = counter;
        this.start = start;
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            counter.getAndIncrement();
            System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

@Test
public void cancelFalseWorker() {
    int poolSize = 1;
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    AtomicInteger counter = new AtomicInteger();
    ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelFalseWorker(counter, start), 1, 1,
            TimeUnit.SECONDS);
    // Poll once per second until the periodic task has been cancelled.
    while (!future.isDone()) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        // ">=" rather than "==" so the loop still ends if a poll misses the exact value 3.
        if (counter.get() >= 3) {
            // cancel(false): no further runs are scheduled; a run in progress is not interrupted.
            boolean cancel = future.cancel(false);
            System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
        }
    }
    // Without shutdown(), awaitTermination() would simply block for the full timeout.
    executorService.shutdown();
    try {
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/18 19:06:41 T[1] init
2023/03/18 19:06:42 T[11] 1004 worker ready
2023/03/18 19:06:42 T[11] 1004 worker finished, counter: 1
2023/03/18 19:06:43 T[11] 2002 worker ready
2023/03/18 19:06:43 T[11] 2002 worker finished, counter: 2
2023/03/18 19:06:44 T[11] 3002 worker ready
2023/03/18 19:06:44 T[11] 3002 worker finished, counter: 3
2023/03/18 19:06:44 T[1] 3003 worker cancel: true, done: true
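The same cancel(false) behavior can be sketched without the polling loop. This variant is an assumption-laden illustration, not the article's test: it swaps the one-second poll for a CountDownLatch, shortens the period to 100 ms, and uses the hypothetical names CancelWithoutInterruptSketch and cancelAfterThreeRuns. It then checks that the run count stops growing once the future is cancelled.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, used only for this illustration.
class CancelWithoutInterruptSketch {

    // Returns {runs counted at cancel time, runs counted 300 ms later, 1 if isCancelled() else 0}.
    static int[] cancelAfterThreeRuns() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                counter.incrementAndGet();
                threeRuns.countDown();
            }, 100, 100, TimeUnit.MILLISECONDS);
            // Block until the third run has counted down, instead of polling.
            threeRuns.await(5, TimeUnit.SECONDS);
            // false: do not interrupt a run in progress, just stop scheduling new runs.
            future.cancel(false);
            int atCancel = counter.get();
            // Wait a few more periods; a cancelled task should not run again.
            TimeUnit.MILLISECONDS.sleep(300);
            return new int[] { atCancel, counter.get(), future.isCancelled() ? 1 : 0 };
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cancelAfterThreeRuns()));
    }
}
```

The first two values should normally be equal, which is the point of the example: after cancel(false) returns, the future reports itself cancelled and no later period triggers another run.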
cancelTrue
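Creates a thread pool with a fixed size of 1 thread and schedules 1 task at a fixed rate of 1 second, starting after an initial delay of 1 second. Each run takes 1 second to complete. After 4 seconds, during the third run (counter is 3), the main thread calls cancel(true), which interrupts the run in progress. The task must cooperate for the interrupt to take effect: it either checks isInterrupted itself or, as here, is blocked in sleep, which throws InterruptedException and ends the run.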
protected class CancelTrueWorker implements Runnable {

    private AtomicInteger counter;
    private long start;

    public CancelTrueWorker(AtomicInteger counter, long start) {
        this.counter = counter;
        this.start = start;
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            counter.getAndIncrement();
            // Simulated work; sleep() throws InterruptedException when the run is cancelled with cancel(true).
            TimeUnit.SECONDS.sleep(1);
            System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            // Restore the interrupt flag for any code further up the call stack.
            Thread.currentThread().interrupt();
        }
    }
}

@Test
public void cancelTrueWorker() {
    int poolSize = 1;
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    AtomicInteger counter = new AtomicInteger();
    ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelTrueWorker(counter, start), 1, 1,
            TimeUnit.SECONDS);
    // Let the task run for 4 seconds; the third run is then in the middle of its sleep.
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
    if (!future.isDone()) {
        // cancel(true): interrupt the thread running the task and schedule no further runs.
        boolean cancel = future.cancel(true);
        System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
                Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
    }
    // Without shutdown(), awaitTermination() would simply block for the full timeout.
    executorService.shutdown();
    try {
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2023/03/18 19:10:59 T[1] init
2023/03/18 19:11:00 T[14] 1001 worker ready
2023/03/18 19:11:01 T[14] 2002 worker finished, counter: 1
2023/03/18 19:11:01 T[14] 2002 worker ready
2023/03/18 19:11:02 T[14] 3002 worker finished, counter: 2
2023/03/18 19:11:02 T[14] 3002 worker ready
2023/03/18 19:11:03 T[1] 4001 worker cancel: true, done: true
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at org.ruoxue.java_147.multithreading.ScheduledThreadPoolExecutorClassTest$CancelTrueWorker.run(ScheduledThreadPoolExecutorClassTest.java:88)
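In the test above the interrupt surfaces as an InterruptedException because the worker is blocked in sleep. When a task does CPU-bound work with no blocking call, nothing throws, and the task has to poll the flag with isInterrupted itself. The following is a minimal sketch of that case under assumed names (InterruptFlagSketch, taskObservedInterrupt); the busy loop stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper class, used only for this illustration.
class InterruptFlagSketch {

    // Returns true if the running task noticed the interrupt sent by cancel(true).
    static boolean taskObservedInterrupt() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch stopped = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean();
        Runnable task = () -> {
            started.countDown();
            // No blocking call here, so nothing throws InterruptedException;
            // the loop has to poll the interrupt flag itself.
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield();
            }
            sawInterrupt.set(true);
            stopped.countDown();
        };
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS);
            // Make sure the task is actually running before cancelling it.
            started.await(5, TimeUnit.SECONDS);
            future.cancel(true);
            stopped.await(5, TimeUnit.SECONDS);
            return sawInterrupt.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            // Safety net: interrupts the worker if the task is somehow still looping.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task observed interrupt: " + taskObservedInterrupt());
    }
}
```

If the loop did not check isInterrupted, cancel(true) would still mark the future as cancelled, but the current run would keep spinning until it ended on its own.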
noCatchUp
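Creates a thread pool with a fixed size of 1 thread and schedules 1 task with a fixed delay, starting after an initial delay of 1 second. The first run (counter is 0) takes 3 seconds to complete. With a fixed delay, the next run starts 1 second after the previous run finishes, so the periods that elapsed during the long run are not made up.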
protected class NoCatchUpWorker implements Runnable {

    private AtomicInteger counter;
    private long start;

    public NoCatchUpWorker(AtomicInteger counter, long start) {
        this.counter = counter;
        this.start = start;
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            // Only the first run is slow.
            if (counter.get() == 0) {
                TimeUnit.SECONDS.sleep(3);
            }
            counter.getAndIncrement();
            System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

@Test
public void noCatchUpWorker() {
    int poolSize = 1;
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    AtomicInteger counter = new AtomicInteger();
    ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new NoCatchUpWorker(counter, start), 1, 1,
            TimeUnit.SECONDS);
    try {
        // The pool has not been shut down, so this blocks for the full 10 seconds while the task keeps running.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    } finally {
        // Stop the periodic task so the pool thread does not outlive the test.
        executorService.shutdownNow();
    }
}
2023/03/18 19:14:19 T[1] init
2023/03/18 19:14:20 T[11] 1002 worker ready
2023/03/18 19:14:23 T[11] 4003 worker finished
2023/03/18 19:14:24 T[11] 5005 worker ready
2023/03/18 19:14:24 T[11] 5005 worker finished
2023/03/18 19:14:25 T[11] 6006 worker ready
2023/03/18 19:14:25 T[11] 6006 worker finished
2023/03/18 19:14:26 T[11] 7007 worker ready
2023/03/18 19:14:26 T[11] 7007 worker finished
2023/03/18 19:14:27 T[11] 8008 worker ready
2023/03/18 19:14:27 T[11] 8008 worker finished
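The gaps in the log above can also be measured directly. The following is a rough sketch under scaled-down, assumed timings (a 300 ms first run and a 100 ms delay instead of 3 s and 1 s) and hypothetical names (FixedDelayGapSketch, startOffsetsMillis). It records when each run starts, relative to the first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for this illustration.
class FixedDelayGapSketch {

    // Returns the start times of the first `runs` executions, in ms relative to the first one.
    static long[] startOffsetsMillis(int runs) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        List<Long> starts = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch enoughRuns = new CountDownLatch(runs);
        Runnable task = () -> {
            boolean firstRun = starts.isEmpty();
            starts.add(System.nanoTime());
            if (firstRun) {
                sleepMillis(300); // only the first run is slow
            }
            enoughRuns.countDown();
        };
        try {
            // Fixed delay: the 100 ms are counted from the END of the previous run.
            pool.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS);
            enoughRuns.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        List<Long> snapshot = new ArrayList<>(starts);
        long[] offsets = new long[Math.min(runs, snapshot.size())];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = (snapshot.get(i) - snapshot.get(0)) / 1_000_000;
        }
        return offsets;
    }

    static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(startOffsetsMillis(4)));
    }
}
```

With these timings the second run should start roughly 400 ms after the first (300 ms of work plus the 100 ms delay), and later runs should follow about 100 ms apart, mirroring the 1002, 5005, 6006 pattern in the log.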
catchUp
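Creates a thread pool with a fixed size of 1 thread and schedules 1 task at a fixed rate with a period of 1 second, starting after an initial delay of 1 second. The first run (counter is 0) takes 3 seconds to complete. With a fixed rate, the runs that fell due during the long run start immediately, one after another, as soon as it finishes, and the schedule then returns to one run per second.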
protected class CatchUpWorker implements Runnable {

    private AtomicInteger counter;
    private long start;

    public CatchUpWorker(AtomicInteger counter, long start) {
        this.counter = counter;
        this.start = start;
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
            // Only the first run is slow.
            if (counter.get() == 0) {
                TimeUnit.SECONDS.sleep(3);
            }
            counter.getAndIncrement();
            System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start));
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

@Test
public void catchUpWorker() {
    int poolSize = 1;
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
    SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
    long start = System.currentTimeMillis();
    AtomicInteger counter = new AtomicInteger();
    ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CatchUpWorker(counter, start), 1, 1,
            TimeUnit.SECONDS);
    try {
        // The pool has not been shut down, so this blocks for the full 10 seconds while the task keeps running.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    } finally {
        // Stop the periodic task so the pool thread does not outlive the test.
        executorService.shutdownNow();
    }
}
2023/03/18 19:21:01 T[1] init
2023/03/18 19:21:02 T[11] 1002 worker ready
2023/03/18 19:21:05 T[11] 4002 worker finished
2023/03/18 19:21:05 T[11] 4002 worker ready
2023/03/18 19:21:05 T[11] 4002 worker finished
2023/03/18 19:21:05 T[11] 4002 worker ready
2023/03/18 19:21:05 T[11] 4003 worker finished
2023/03/18 19:21:05 T[11] 4003 worker ready
2023/03/18 19:21:05 T[11] 4003 worker finished
2023/03/18 19:21:06 T[11] 5003 worker ready
2023/03/18 19:21:06 T[11] 5003 worker finished
2023/03/18 19:21:07 T[11] 6002 worker ready
2023/03/18 19:21:07 T[11] 6002 worker finished
2023/03/18 19:21:08 T[11] 7002 worker ready
2023/03/18 19:21:08 T[11] 7002 worker finished
2023/03/18 19:21:09 T[11] 8001 worker ready
2023/03/18 19:21:09 T[11] 8001 worker finished
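The burst of back-to-back runs in the log above can be reproduced on a smaller scale. This is a rough sketch with assumed timings (a 450 ms first run and a 100 ms period) and hypothetical names (FixedRateCatchUpSketch, startOffsetsMillis); it records when each run starts, relative to the first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for this illustration.
class FixedRateCatchUpSketch {

    // Returns the start times of the first `runs` executions, in ms relative to the first one.
    static long[] startOffsetsMillis(int runs) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        List<Long> starts = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch enoughRuns = new CountDownLatch(runs);
        Runnable task = () -> {
            boolean firstRun = starts.isEmpty();
            starts.add(System.nanoTime());
            if (firstRun) {
                sleepMillis(450); // overruns the slots due at 100, 200, 300 and 400 ms
            }
            enoughRuns.countDown();
        };
        try {
            // Fixed rate: slots are due at 0, 100, 200, ... ms regardless of how long a run takes.
            pool.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS);
            enoughRuns.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        List<Long> snapshot = new ArrayList<>(starts);
        long[] offsets = new long[Math.min(runs, snapshot.size())];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = (snapshot.get(i) - snapshot.get(0)) / 1_000_000;
        }
        return offsets;
    }

    static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(startOffsetsMillis(6)));
    }
}
```

With these timings, the four overdue runs should start within a few milliseconds of each other right after the slow first run ends, and the sixth run should fall back onto the regular 100 ms grid. Runs of the same task never overlap, which is why the missed slots are executed one after another rather than concurrently.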
ScheduledThreadPoolExecutorClassTest.java
The complete unit test class, used to verify that the Java ScheduledThreadPoolExecutor examples behave as expected.
package org.ruoxue.java_147.multithreading.scheduledthreadpoolexecutor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

public class ScheduledThreadPoolExecutorClassTest {

    protected class CancelFalseWorker implements Runnable {

        private AtomicInteger counter;
        private long start;

        public CancelFalseWorker(AtomicInteger counter, long start) {
            this.counter = counter;
            this.start = start;
        }

        @Override
        public void run() {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                counter.getAndIncrement();
                System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    @Test
    public void cancelFalseWorker() {
        int poolSize = 1;
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        AtomicInteger counter = new AtomicInteger();
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelFalseWorker(counter, start), 1, 1,
                TimeUnit.SECONDS);
        // Poll once per second until the periodic task has been cancelled.
        while (!future.isDone()) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            // ">=" rather than "==" so the loop still ends if a poll misses the exact value 3.
            if (counter.get() >= 3) {
                // cancel(false): no further runs are scheduled; a run in progress is not interrupted.
                boolean cancel = future.cancel(false);
                System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
            }
        }
        // Without shutdown(), awaitTermination() would simply block for the full timeout.
        executorService.shutdown();
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    protected class CancelTrueWorker implements Runnable {

        private AtomicInteger counter;
        private long start;

        public CancelTrueWorker(AtomicInteger counter, long start) {
            this.counter = counter;
            this.start = start;
        }

        @Override
        public void run() {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                counter.getAndIncrement();
                // Simulated work; sleep() throws InterruptedException when the run is cancelled with cancel(true).
                TimeUnit.SECONDS.sleep(1);
                System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
                // Restore the interrupt flag for any code further up the call stack.
                Thread.currentThread().interrupt();
            }
        }
    }

    @Test
    public void cancelTrueWorker() {
        int poolSize = 1;
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        AtomicInteger counter = new AtomicInteger();
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelTrueWorker(counter, start), 1, 1,
                TimeUnit.SECONDS);
        // Let the task run for 4 seconds; the third run is then in the middle of its sleep.
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        if (!future.isDone()) {
            // cancel(true): interrupt the thread running the task and schedule no further runs.
            boolean cancel = future.cancel(true);
            System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
                    Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
        }
        // Without shutdown(), awaitTermination() would simply block for the full timeout.
        executorService.shutdown();
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    protected class NoCatchUpWorker implements Runnable {

        private AtomicInteger counter;
        private long start;

        public NoCatchUpWorker(AtomicInteger counter, long start) {
            this.counter = counter;
            this.start = start;
        }

        @Override
        public void run() {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                // Only the first run is slow.
                if (counter.get() == 0) {
                    TimeUnit.SECONDS.sleep(3);
                }
                counter.getAndIncrement();
                System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    @Test
    public void noCatchUpWorker() {
        int poolSize = 1;
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        AtomicInteger counter = new AtomicInteger();
        ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new NoCatchUpWorker(counter, start), 1, 1,
                TimeUnit.SECONDS);
        try {
            // The pool has not been shut down, so this blocks for the full 10 seconds while the task keeps running.
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        } finally {
            // Stop the periodic task so the pool thread does not outlive the test.
            executorService.shutdownNow();
        }
    }

    protected class CatchUpWorker implements Runnable {

        private AtomicInteger counter;
        private long start;

        public CatchUpWorker(AtomicInteger counter, long start) {
            this.counter = counter;
            this.start = start;
        }

        @Override
        public void run() {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
                // Only the first run is slow.
                if (counter.get() == 0) {
                    TimeUnit.SECONDS.sleep(3);
                }
                counter.getAndIncrement();
                System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
                        Thread.currentThread().getId(), System.currentTimeMillis() - start));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    @Test
    public void catchUpWorker() {
        int poolSize = 1;
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
        SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
        long start = System.currentTimeMillis();
        AtomicInteger counter = new AtomicInteger();
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CatchUpWorker(counter, start), 1, 1,
                TimeUnit.SECONDS);
        try {
            // The pool has not been shut down, so this blocks for the full 10 seconds while the task keeps running.
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        } finally {
            // Stop the periodic task so the pool thread does not outlive the test.
            executorService.shutdownNow();
        }
    }
}
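Takeaways
ScheduledThreadPoolExecutor extends ThreadPoolExecutor and implements ScheduledExecutorService (and therefore ExecutorService), adding periodic and delayed execution for regularly scheduled work. Tasks are run by the pool's threads: an idle thread takes the next due task from the task queue and runs it. This article walked through examples of several common operations.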