Java ScheduledExecutorService Interface - Java 147

Java ScheduledExecutorService Interface – Java 147

Java ScheduledExecutorService Interface

固定週期或固定延遲執行任務,提交到執行緒池中執行,當有空閑的執行緒時,將會從佇列中取出任務執行,任務在不同週期內執行它的執行緒可能是不同的,傳回 ScheduledFuture 物件,用來判斷任務是否完成、取消任務、取得結果等, ScheduledExecutorService Interface 本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- scheduledexecutorservice
       |                       +- ScheduledExecutorServiceInterfaceTest.java   

單元測試

ScheduledExecutorService Interface Java 提供週期與延遲執行任務等,交由執行緒池中的執行緒執行。

cancelFalse

ScheduledExecutorService Interface Java 建立一個執行緒池,固定數量 1 條執行緒,執行 1 個任務,每個任務耗時約 6 秒完成, Interface ScheduledExecutorService Java 初始延遲 1 秒後才會開始執行,主執行緒 4 秒後發送 cancel 取消任務執行。

	@Test
	public void cancelFalse() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				for (int i = 0; i < 100000; i++) {
					byte[] bytes = new byte[1024 * 1000];
				}
				System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == future.isDone()) {
			boolean cancel = future.cancel(false);
			System.out.println(String.format("%s T[%d] %d task cancel: %b, done: %b", df.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
		}

		try {
			executorService.awaitTermination(20, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 16:38:03 T[1] init
2023/03/17 16:38:04 T[11] 1098 task ready
2023/03/17 16:38:07 T[1] 4096 task cancel: true, done: true
2023/03/17 16:38:11 T[11] 7592 task finished

cancelTrue

ScheduledExecutorService Interface Java 建立一個執行緒池,固定數量 1 條執行緒,執行 1 個任務,每個任務耗時約 6 秒完成, Interface ScheduledExecutorService Java 初始延遲 1 秒後才會開始執行,主執行緒 4 秒後發送 cancel 取消任務執行,任務內必須檢查執行緒是否已被中斷 isInterrupted ,若為真則拋出 InterruptedException 例外,中斷任務。

	@Test
	public void cancelTrue() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				for (int i = 0; i < 100000; i++) {
					byte[] bytes = new byte[1024 * 1000];
					if (Thread.interrupted()) {
						throw new InterruptedException();
					}
				}
				System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
				Thread.currentThread().interrupt();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == future.isDone()) {
			boolean cancel = future.cancel(true);
			System.out.println(String.format("%s T[%d] %d task cancel: %b", df.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel));
		}

		try {
			executorService.awaitTermination(20, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 16:43:07 T[1] init
2023/03/17 16:43:08 T[11] 1109 task ready
java.lang.InterruptedException
	at org.ruoxue.java_147.multithreading.ScheduledExecutorServiceInterfaceTest.lambda$1(ScheduledExecutorServiceInterfaceTest.java:70)
2023/03/17 16:43:11 T[1] 4108 task cancel: true

noCatchUp

ScheduledExecutorService Interface Java 建立一個執行緒池,固定數量 1 條執行緒,執行 2 個任務, A 任務耗時 3 秒完成, Interface ScheduledExecutorService Java 初始延遲 1 秒後才會開始執行,固定延遲執行任務,當任務完成時,延遲 1 秒後才會進入下一個週期,主執行緒 4 秒後發送 cancel 取消 A 任務執行,此時 B 任務會在 A 任務取消後立即執行。

	@Test
	public void noCatchUp() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> futureA = executorService.scheduleWithFixedDelay(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				TimeUnit.SECONDS.sleep(3);
				System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		ScheduledFuture<?> futureB = executorService.scheduleWithFixedDelay(() -> {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
			try {
				System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == futureA.isDone()) {
			futureA.cancel(true);
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 16:45:48 T[1] init
2023/03/17 16:45:49 T[11] 1103 task: A ready
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:342)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at org.ruoxue.java_147.multithreading.ScheduledExecutorServiceInterfaceTest.lambda$2(ScheduledExecutorServiceInterfaceTest.java:112)

2023/03/17 16:45:52 T[11] 4105 task: B ready
2023/03/17 16:45:52 T[11] 4106 task: B finished
2023/03/17 16:45:53 T[11] 5112 task: B ready
2023/03/17 16:45:53 T[11] 5113 task: B finished
2023/03/17 16:45:54 T[11] 6119 task: B ready
2023/03/17 16:45:54 T[11] 6119 task: B finished

catchUp

ScheduledExecutorService Interface Java 建立一個執行緒池,固定數量 1 條執行緒,執行 2 個任務, A 任務耗時 3 秒完成, Interface ScheduledExecutorService Java 初始延遲 1 秒後才會開始執行,每個週期 1 秒,主執行緒 4 秒後發送 cancel 取消 A 任務執行,此時 B 任務會在 A 任務取消後立即執行,B 延遲的任務會補上執行。

	@Test
	public void catchUp() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> futureA = executorService.scheduleAtFixedRate(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				TimeUnit.SECONDS.sleep(3);
				System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		ScheduledFuture<?> futureB = executorService.scheduleAtFixedRate(() -> {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
			try {
				System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == futureA.isDone()) {
			futureA.cancel(true);
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 17:00:58 T[1] init
2023/03/17 17:00:59 T[11] 1101 task: A ready
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:342)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at org.ruoxue.java_147.multithreading.ScheduledExecutorServiceInterfaceTest.lambda$4(ScheduledExecutorServiceInterfaceTest.java:161)
	
2023/03/17 17:01:02 T[11] 4103 task: B ready
2023/03/17 17:01:02 T[11] 4103 task: B finished
2023/03/17 17:01:02 T[11] 4104 task: B ready
2023/03/17 17:01:02 T[11] 4104 task: B finished
2023/03/17 17:01:02 T[11] 4105 task: B ready
2023/03/17 17:01:02 T[11] 4105 task: B finished
2023/03/17 17:01:02 T[11] 4106 task: B ready
2023/03/17 17:01:02 T[11] 4107 task: B finished
2023/03/17 17:01:03 T[11] 5096 task: B ready
2023/03/17 17:01:03 T[11] 5096 task: B finished
2023/03/17 17:01:04 T[11] 6097 task: B ready
2023/03/17 17:01:04 T[11] 6099 task: B finished

ScheduledExecutorServiceInterfaceTest.java

Interface ScheduledExecutorService Java 新增單元測試,驗證 Java ScheduledExecutorService Example 是否符合預期。

package org.ruoxue.java_147.multithreading.scheduledexecutorservice;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class ScheduledExecutorServiceInterfaceTest {

	@Test
	public void cancelFalse() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				for (int i = 0; i < 100000; i++) {
					byte[] bytes = new byte[1024 * 1000];
				}
				System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == future.isDone()) {
			boolean cancel = future.cancel(false);
			System.out.println(String.format("%s T[%d] %d task cancel: %b, done: %b", df.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
		}

		try {
			executorService.awaitTermination(20, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void cancelTrue() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				for (int i = 0; i < 100000; i++) {
					byte[] bytes = new byte[1024 * 1000];
					if (Thread.interrupted()) {
						throw new InterruptedException();
					}
				}
				System.out.println(String.format("%s T[%d] %d task finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
				Thread.currentThread().interrupt();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == future.isDone()) {
			boolean cancel = future.cancel(true);
			System.out.println(String.format("%s T[%d] %d task cancel: %b", df.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel));
		}

		try {
			executorService.awaitTermination(20, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void noCatchUp() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> futureA = executorService.scheduleWithFixedDelay(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				TimeUnit.SECONDS.sleep(3);
				System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		ScheduledFuture<?> futureB = executorService.scheduleWithFixedDelay(() -> {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
			try {
				System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == futureA.isDone()) {
			futureA.cancel(true);
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void catchUp() {
		int poolSize = 1;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		ScheduledFuture<?> futureA = executorService.scheduleAtFixedRate(() -> {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d task: A ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				TimeUnit.SECONDS.sleep(3);
				System.out.println(String.format("%s T[%d] %d task: A finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		ScheduledFuture<?> futureB = executorService.scheduleAtFixedRate(() -> {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
			try {
				System.out.println(String.format("%s T[%d] %d task: B ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				System.out.println(String.format("%s T[%d] %d task: B finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}, 1_000, 1_000, TimeUnit.MILLISECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == futureA.isDone()) {
			futureA.cancel(true);
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
}

心得分享

Java ScheduledExecutorService Example 基於 ExecutorService 的功能實現週期與延遲執行任務,定期排程的工作,交由執行緒池中的執行緒執行,由池中空閒的執行緒從任務佇列取出任務執行, Interface ScheduledExecutorService Java 提供了幾種常見方法的操作範例。

發佈留言