ScheduledThreadPoolExecutor in Java with Examples - Java 147

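ScheduledThreadPoolExecutor extends ThreadPoolExecutor and implements ScheduledExecutorService (and therefore ExecutorService), adding delayed and periodic task execution on top of a reusable thread pool. Scheduled work is handed to the threads in the pool: an idle thread takes the next due task from the task queue and runs it, so a periodic task may be run by a different thread in each period. This article adds examples and uses unit tests to check the output.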

File Structure

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- scheduledthreadpoolexecutor
       |                       +- ScheduledThreadPoolExecutorWithExamplesTest.java   

Unit Tests

ScheduledThreadPoolExecutor provides delayed and periodic task execution; the scheduled tasks are run by the threads in the pool.

scheduleRunnable

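Create a thread pool with a fixed size of 1 thread and schedule 3 tasks. Each task takes 1 second to complete and returns no value. Execution starts after an initial delay of 1 second, and because the pool has only one thread, the tasks run one after another.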

	protected class RunWorker implements Runnable {

		private long start;
		private int id;

		public RunWorker(long start, int id) {
			this.start = start;
			this.id = id;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
				TimeUnit.SECONDS.sleep(1);
				System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void scheduleRunnable() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<?>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<?> future = executorService.schedule(new RunWorker(start, e), 1, TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 17:48:29 T[1] init
2023/03/17 17:48:30 T[11] 1116 worker: 0 ready
2023/03/17 17:48:31 T[11] 2121 worker: 0 finished
2023/03/17 17:48:31 T[11] 2122 worker: 1 ready
2023/03/17 17:48:32 T[11] 3124 worker: 1 finished
2023/03/17 17:48:32 T[11] 3125 worker: 2 ready
2023/03/17 17:48:33 T[11] 4128 worker: 2 finished
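
The test above calls awaitTermination without calling shutdown() first, so the pool never terminates and the call blocks for the full 10-second timeout. The following is a minimal sketch of one way to return as soon as the scheduled tasks are done. The class name ShutdownSketch and the method runAndWait are hypothetical and not part of the original test; the sketch relies on the default policy under which delayed tasks that are already scheduled still run after shutdown().

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the original test.
final class ShutdownSketch {

	// Schedules taskSize one-shot tasks, shuts the pool down, and waits for them.
	// Returns how many tasks ran, or -1 if the pool did not terminate in time.
	static int runAndWait(int taskSize, long delayMillis) {
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
		AtomicInteger completed = new AtomicInteger();
		Runnable task = completed::incrementAndGet;
		for (int i = 0; i < taskSize; i++) {
			executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
		}
		// No new tasks are accepted from here on; by default, delayed tasks
		// that were already scheduled are still executed.
		executor.shutdown();
		try {
			boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
			return terminated ? completed.get() : -1;
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return -1;
		}
	}

	public static void main(String[] args) {
		System.out.println("completed: " + runAndWait(3, 200));
	}
}
```

With this ordering, awaitTermination should return shortly after the last task finishes instead of waiting out the timeout.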

scheduleCallable

Create a thread pool with a fixed size of 1 thread and schedule 3 tasks. Each task takes 1 second to complete and returns a value. Execution starts after an initial delay of 1 second.

	protected class CallWorker implements Callable<String> {

		private long start;
		private int id;
		private String result;

		public CallWorker(long start, int id) {
			this.start = start;
			this.id = id;
		}

		public int getId() {
			return id;
		}

		@Override
		public String call() throws Exception {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
			System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
			TimeUnit.SECONDS.sleep(1);
			System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, id));

			result = id + " OK";
			return result;
		}
	}

	@Test
	public void scheduleCallable() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<String>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<String> future = executorService.schedule(new CallWorker(start, e), 1, TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 17:49:45 T[1] init
2023/03/17 17:49:46 T[11] 1103 worker: 0 ready
2023/03/17 17:49:47 T[11] 2113 worker: 0 finished
2023/03/17 17:49:47 T[11] 2115 worker: 1 ready
2023/03/17 17:49:48 T[11] 3119 worker: 1 finished
2023/03/17 17:49:48 T[11] 3120 worker: 2 ready
2023/03/17 17:49:49 T[11] 4124 worker: 2 finished
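
The test above stores the ScheduledFuture objects but never reads the values returned by the tasks. As a hedged sketch of how the results might be collected, the example below calls get with a timeout on each future. The class name CallableResultSketch and the method collect are illustrative assumptions, and the task body is reduced to returning the same "id OK" string that CallWorker produces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of the original test.
final class CallableResultSketch {

	// Schedules taskSize Callable tasks and returns their results in submission order.
	static List<String> collect(int taskSize, long delayMillis) {
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
		List<ScheduledFuture<String>> futures = new ArrayList<>();
		for (int i = 0; i < taskSize; i++) {
			final int id = i;
			Callable<String> task = () -> id + " OK";
			futures.add(executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
		}
		List<String> results = new ArrayList<>();
		try {
			for (ScheduledFuture<String> future : futures) {
				// Blocks until the task has run, or fails after 5 seconds.
				results.add(future.get(5, TimeUnit.SECONDS));
			}
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		} catch (ExecutionException | TimeoutException ex) {
			throw new IllegalStateException(ex);
		} finally {
			executor.shutdownNow();
		}
		return results;
	}

	public static void main(String[] args) {
		System.out.println(collect(3, 200));
	}
}
```

Iterating over the futures in submission order keeps the results aligned with the task ids, regardless of which pool thread ran each task.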

scheduleAtFixedRate

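Create a thread pool with a fixed size of 1 thread and schedule 3 periodic tasks, each taking 1 second per run. Execution starts after an initial delay of 1 second, and each task then repeats at a fixed rate with a period of 1 second. When a run takes longer than the period, the next run starts immediately after the previous one finishes, and late runs are caught up rather than skipped. Here the single thread is shared by all three tasks, so every task falls behind its 1-second period and runs back to back.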

	@Test
	public void scheduleAtFixedRate() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<?>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new RunWorker(start, e), 1, 1,
					TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 17:51:03 T[1] init
2023/03/17 17:51:04 T[11] 1096 worker: 0 ready
2023/03/17 17:51:05 T[11] 2099 worker: 0 finished
2023/03/17 17:51:05 T[11] 2101 worker: 1 ready
2023/03/17 17:51:06 T[11] 3107 worker: 1 finished
2023/03/17 17:51:06 T[11] 3108 worker: 2 ready
2023/03/17 17:51:07 T[11] 4112 worker: 2 finished
2023/03/17 17:51:07 T[11] 4114 worker: 0 ready
2023/03/17 17:51:08 T[11] 5115 worker: 0 finished
2023/03/17 17:51:08 T[11] 5116 worker: 1 ready
2023/03/17 17:51:09 T[11] 6121 worker: 1 finished
2023/03/17 17:51:09 T[11] 6124 worker: 2 ready
2023/03/17 17:51:10 T[11] 7132 worker: 2 finished
2023/03/17 17:51:10 T[11] 7136 worker: 0 ready
2023/03/17 17:51:11 T[11] 8141 worker: 0 finished
2023/03/17 17:51:11 T[11] 8143 worker: 1 ready
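
A periodic task never completes on its own; in the test above it keeps running until the 10-second wait ends. One possible way to stop it after a given number of runs is to count the runs with a CountDownLatch and then cancel the returned ScheduledFuture. This is only a sketch: the class name FixedRateCancelSketch and the method runUntil are hypothetical, and the task body is reduced to a counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the original test.
final class FixedRateCancelSketch {

	// Runs a fixed-rate task until it has executed at least `runs` times,
	// then cancels it. Returns the number of executions observed.
	static int runUntil(int runs, long periodMillis) {
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
		AtomicInteger counter = new AtomicInteger();
		CountDownLatch latch = new CountDownLatch(runs);
		Runnable task = () -> {
			counter.incrementAndGet();
			latch.countDown();
		};
		ScheduledFuture<?> future = executor.scheduleAtFixedRate(task, periodMillis, periodMillis,
				TimeUnit.MILLISECONDS);
		try {
			latch.await(5, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		// cancel(false) prevents further runs without interrupting a run in progress.
		future.cancel(false);
		executor.shutdown();
		return counter.get();
	}

	public static void main(String[] args) {
		System.out.println("runs: " + runUntil(3, 100));
	}
}
```

The returned count can be slightly higher than the requested number, because one more run may start between the latch reaching zero and the call to cancel.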

scheduleWithFixedDelay

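Create a thread pool with a fixed size of 1 thread and schedule 3 periodic tasks, each taking 1 second per run. Execution starts after an initial delay of 1 second, and each task then repeats with a fixed delay of 3 seconds. Regardless of how long a run takes, the next run of the same task becomes due 3 seconds after the previous run finishes, so late runs are not caught up.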

	@Test
	public void scheduleWithFixedDelay() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<?>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new RunWorker(start, e), 1, 3,
					TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/17 17:52:01 T[1] init
2023/03/17 17:52:02 T[11] 1093 worker: 0 ready
2023/03/17 17:52:03 T[11] 2104 worker: 0 finished
2023/03/17 17:52:03 T[11] 2106 worker: 1 ready
2023/03/17 17:52:04 T[11] 3109 worker: 1 finished
2023/03/17 17:52:04 T[11] 3111 worker: 2 ready
2023/03/17 17:52:05 T[11] 4117 worker: 2 finished
2023/03/17 17:52:06 T[11] 5111 worker: 0 ready
2023/03/17 17:52:07 T[11] 6117 worker: 0 finished
2023/03/17 17:52:07 T[11] 6118 worker: 1 ready
2023/03/17 17:52:08 T[11] 7124 worker: 1 finished
2023/03/17 17:52:08 T[11] 7128 worker: 2 ready
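
To make the fixed-delay timing visible in numbers, the sketch below records the start time of each run of a single task and returns the gaps between consecutive starts. With a work time of w and a delay of d, each gap is expected to be roughly w + d. The class name FixedDelayGapSketch, the method startGaps, and the chosen durations are assumptions made for illustration; exact values depend on timer precision and machine load.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original test.
final class FixedDelayGapSketch {

	// Returns the start-to-start gaps in milliseconds of the first `runs` executions.
	static List<Long> startGaps(int runs, long workMillis, long delayMillis) {
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
		List<Long> starts = new CopyOnWriteArrayList<>();
		CountDownLatch latch = new CountDownLatch(runs);
		Runnable task = () -> {
			starts.add(System.nanoTime());
			try {
				// Simulated work.
				TimeUnit.MILLISECONDS.sleep(workMillis);
			} catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
			}
			latch.countDown();
		};
		// The delay is measured from the end of one run to the start of the next.
		executor.scheduleWithFixedDelay(task, 0, delayMillis, TimeUnit.MILLISECONDS);
		try {
			latch.await(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
		executor.shutdownNow();
		List<Long> gaps = new ArrayList<>();
		for (int i = 1; i < runs && i < starts.size(); i++) {
			gaps.add(TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1)));
		}
		return gaps;
	}

	public static void main(String[] args) {
		System.out.println("gaps (ms): " + startGaps(3, 1000, 3000));
	}
}
```

Swapping scheduleWithFixedDelay for scheduleAtFixedRate in the same sketch would be one way to compare the two modes, since a fixed-rate task measures its period from start to start instead.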

ScheduledThreadPoolExecutorWithExamplesTest.java

The complete unit test class, used to check that ScheduledThreadPoolExecutor behaves as expected.

package org.ruoxue.java_147.multithreading.scheduledthreadpoolexecutor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Test;

public class ScheduledThreadPoolExecutorWithExamplesTest {

	protected class RunWorker implements Runnable {

		private long start;
		private int id;

		public RunWorker(long start, int id) {
			this.start = start;
			this.id = id;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
				TimeUnit.SECONDS.sleep(1);
				System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void scheduleRunnable() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<?>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<?> future = executorService.schedule(new RunWorker(start, e), 1, TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	protected class CallWorker implements Callable<String> {

		private long start;
		private int id;
		private String result;

		public CallWorker(long start, int id) {
			this.start = start;
			this.id = id;
		}

		public int getId() {
			return id;
		}

		@Override
		public String call() throws Exception {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
			System.out.println(String.format("%s T[%d] %d worker: %d ready", sdf.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, id));
			TimeUnit.SECONDS.sleep(1);
			System.out.println(String.format("%s T[%d] %d worker: %d finished", sdf.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, id));

			result = id + " OK";
			return result;
		}
	}

	@Test
	public void scheduleCallable() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<String>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<String> future = executorService.schedule(new CallWorker(start, e), 1, TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void scheduleAtFixedRate() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<?>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new RunWorker(start, e), 1, 1,
					TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void scheduleWithFixedDelay() {
		int poolSize = 1;
		ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(poolSize);
		int taskSize = 3;
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		List<ScheduledFuture<?>> futures = new ArrayList<>();
		IntStream.range(0, taskSize).forEach(e -> {
			ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new RunWorker(start, e), 1, 3,
					TimeUnit.SECONDS);
			futures.add(future);
		});

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
}

Conclusion

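ScheduledThreadPoolExecutor runs tasks after a delay, at a fixed rate, or with a fixed delay. A scheduled task is placed in the pool's queue, and an idle thread takes it from the queue and runs it once it is due. Each scheduling call returns a ScheduledFuture object right away, which can be used to check whether the task has completed, cancel it, or retrieve its result. This article covered examples of the most common scheduling methods.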
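
As a small illustration of the ScheduledFuture operations mentioned above, the sketch below schedules a task far in the future, inspects it, and cancels it before it runs. The class name FutureStateSketch and the method describe are hypothetical; setRemoveOnCancelPolicy(true) is used so that the cancelled task is removed from the queue immediately.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original test.
final class FutureStateSketch {

	// Returns "pending,cancelled,done,queued" for a task cancelled before it runs.
	static String describe() {
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
		// Remove cancelled tasks from the work queue at cancellation time.
		executor.setRemoveOnCancelPolicy(true);
		Callable<String> task = () -> "never runs";
		ScheduledFuture<String> future = executor.schedule(task, 1, TimeUnit.HOURS);

		// The task is still waiting for its delay to elapse.
		boolean pending = !future.isDone() && future.getDelay(TimeUnit.MINUTES) > 0;
		// cancel returns true when the task was cancelled before completing.
		boolean cancelled = future.cancel(false);
		boolean done = future.isDone();
		int queued = executor.getQueue().size();
		executor.shutdown();
		return pending + "," + cancelled + "," + done + "," + queued;
	}

	public static void main(String[] args) {
		System.out.println(describe()); // prints "true,true,true,0"
	}
}
```

Note that isDone also reports true for a cancelled task, so isCancelled is needed to tell cancellation apart from normal completion.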
