Java ScheduledThreadPoolExecutor Class - Java 147

Java ScheduledThreadPoolExecutor Class – Java 147

Java ScheduledThreadPoolExecutor Class

固定週期或固定延遲執行任務,提交到執行緒池中執行,當有空閑的執行緒時,將會從佇列中取出任務執行,任務在不同週期內執行它的執行緒可能是不同的,傳回 ScheduledFuture 物件,用來判斷任務是否完成、取消任務、取得結果等, ScheduledThreadPoolExecutor Class 本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- scheduledthreadpoolexecutor
       |                       +- ScheduledThreadPoolExecutorClassTest.java   

單元測試

ScheduledThreadPoolExecutor Class Java 提供週期與延遲執行任務等,交由執行緒池中的執行緒執行。

cancelFalse

ScheduledThreadPoolExecutor Class Java 建立一個執行緒池,固定數量 1 條執行緒,執行 1 個任務,每個任務耗時 1 秒完成, Class ScheduledThreadPoolExecutor Java 初始延遲 1 秒後才會開始執行,當計數器到 3 時,主執行緒發送 cancel 取消任務執行。

	protected class CancelFalseWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public CancelFalseWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				counter.getAndIncrement();
				System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void cancelFalseWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelFalseWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		while (false == future.isDone()) {
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
			if (counter.get() == 3) {
				boolean cancel = future.cancel(false);
				System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
			}
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/18 19:06:41 T[1] init
2023/03/18 19:06:42 T[11] 1004 worker ready
2023/03/18 19:06:42 T[11] 1004 worker finished, counter: 1
2023/03/18 19:06:43 T[11] 2002 worker ready
2023/03/18 19:06:43 T[11] 2002 worker finished, counter: 2
2023/03/18 19:06:44 T[11] 3002 worker ready
2023/03/18 19:06:44 T[11] 3002 worker finished, counter: 3
2023/03/18 19:06:44 T[1] 3003 worker cancel: true, done: true

cancelTrue

ScheduledThreadPoolExecutor Class Java 建立一個執行緒池,固定數量 1 條執行緒,執行 1 個任務,每個任務耗時 1 秒完成, Class ScheduledThreadPoolExecutor Java 初始延遲 1 秒後才會開始執行,當計數器到 3 時,主執行緒發送 cancel 取消任務執行,任務內必須檢查執行緒是否已被中斷 isInterrupted ,若為真則拋出 InterruptedException 例外,中斷任務。

	protected class CancelTrueWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public CancelTrueWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				counter.getAndIncrement();
				TimeUnit.SECONDS.sleep(1);
				System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
				Thread.currentThread().interrupt();
			}
		}
	}

	@Test
	public void cancelTrueWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelTrueWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == future.isDone()) {
			boolean cancel = future.cancel(true);
			System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/18 19:10:59 T[1] init
2023/03/18 19:11:00 T[14] 1001 worker ready
2023/03/18 19:11:01 T[14] 2002 worker finished, counter: 1
2023/03/18 19:11:01 T[14] 2002 worker ready
2023/03/18 19:11:02 T[14] 3002 worker finished, counter: 2
2023/03/18 19:11:02 T[14] 3002 worker ready
2023/03/18 19:11:03 T[1] 4001 worker cancel: true, done: true
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at org.ruoxue.java_147.multithreading.ScheduledThreadPoolExecutorClassTest$CancelTrueWorker.run(ScheduledThreadPoolExecutorClassTest.java:88)

noCatchUp

ScheduledThreadPoolExecutor Class Java 建立一個執行緒池,固定數量 1 條執行緒,執行 1 個任務,當計數器為 1 時,任務耗時 3 秒完成, Class ScheduledThreadPoolExecutor Java 初始延遲 1 秒後才會開始執行,固定延遲執行任務,當任務完成時,延遲 1 秒後才會進入下一個週期,下個任務會立即執行,延遲的任務不會執行。

	protected class NoCatchUpWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public NoCatchUpWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				if (counter.get() == 0) {
					TimeUnit.SECONDS.sleep(3);
				}
				counter.getAndIncrement();
				System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void noCatchUpWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new NoCatchUpWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/18 19:14:19 T[1] init
2023/03/18 19:14:20 T[11] 1002 worker ready
2023/03/18 19:14:23 T[11] 4003 worker finished
2023/03/18 19:14:24 T[11] 5005 worker ready
2023/03/18 19:14:24 T[11] 5005 worker finished
2023/03/18 19:14:25 T[11] 6006 worker ready
2023/03/18 19:14:25 T[11] 6006 worker finished
2023/03/18 19:14:26 T[11] 7007 worker ready
2023/03/18 19:14:26 T[11] 7007 worker finished
2023/03/18 19:14:27 T[11] 8008 worker ready
2023/03/18 19:14:27 T[11] 8008 worker finished

catchUp

ScheduledThreadPoolExecutor Class Java 建立一個執行緒池,固定數量 1 條執行緒,執行 1 個任務,當計數器為 1 時,任務耗時 3 秒完成, Class ScheduledThreadPoolExecutor Java 初始延遲 1 秒後才會開始執行,每個週期 1 秒,當任務完成時,下個任務會立即執行,延遲的任務會補上執行。

	protected class CatchUpWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public CatchUpWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				if (counter.get() == 0) {
					TimeUnit.SECONDS.sleep(3);
				}
				counter.getAndIncrement();
				System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void catchUpWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CatchUpWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
2023/03/18 19:21:01 T[1] init
2023/03/18 19:21:02 T[11] 1002 worker ready
2023/03/18 19:21:05 T[11] 4002 worker finished
2023/03/18 19:21:05 T[11] 4002 worker ready
2023/03/18 19:21:05 T[11] 4002 worker finished
2023/03/18 19:21:05 T[11] 4002 worker ready
2023/03/18 19:21:05 T[11] 4003 worker finished
2023/03/18 19:21:05 T[11] 4003 worker ready
2023/03/18 19:21:05 T[11] 4003 worker finished
2023/03/18 19:21:06 T[11] 5003 worker ready
2023/03/18 19:21:06 T[11] 5003 worker finished
2023/03/18 19:21:07 T[11] 6002 worker ready
2023/03/18 19:21:07 T[11] 6002 worker finished
2023/03/18 19:21:08 T[11] 7002 worker ready
2023/03/18 19:21:08 T[11] 7002 worker finished
2023/03/18 19:21:09 T[11] 8001 worker ready
2023/03/18 19:21:09 T[11] 8001 worker finished

ScheduledThreadPoolExecutorClassTest.java

Class ScheduledThreadPoolExecutor Java 新增單元測試,驗證 Java ScheduledThreadPoolExecutor Example 是否符合預期。

package org.ruoxue.java_147.multithreading.scheduledthreadpoolexecutor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

public class ScheduledThreadPoolExecutorClassTest {

	protected class CancelFalseWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public CancelFalseWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				counter.getAndIncrement();
				System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void cancelFalseWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelFalseWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		while (false == future.isDone()) {
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
			if (counter.get() == 3) {
				boolean cancel = future.cancel(false);
				System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
			}
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	protected class CancelTrueWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public CancelTrueWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				counter.getAndIncrement();
				TimeUnit.SECONDS.sleep(1);
				System.out.println(String.format("%s T[%d] %d worker finished, counter: %d", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start, counter.get()));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
				Thread.currentThread().interrupt();
			}
		}
	}

	@Test
	public void cancelTrueWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CancelTrueWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		try {
			TimeUnit.SECONDS.sleep(4);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		if (false == future.isDone()) {
			boolean cancel = future.cancel(true);
			System.out.println(String.format("%s T[%d] %d worker cancel: %b, done: %b", df.format(new Date()),
					Thread.currentThread().getId(), System.currentTimeMillis() - start, cancel, future.isDone()));
		}

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	protected class NoCatchUpWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public NoCatchUpWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				if (counter.get() == 0) {
					TimeUnit.SECONDS.sleep(3);
				}
				counter.getAndIncrement();
				System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void noCatchUpWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(new NoCatchUpWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}

	protected class CatchUpWorker implements Runnable {

		private AtomicInteger counter;
		private long start;

		public CatchUpWorker(AtomicInteger counter, long start) {
			this.counter = counter;
			this.start = start;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(String.format("%s T[%d] %d worker ready", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
				if (counter.get() == 0) {
					TimeUnit.SECONDS.sleep(3);
				}
				counter.getAndIncrement();
				System.out.println(String.format("%s T[%d] %d worker finished", sdf.format(new Date()),
						Thread.currentThread().getId(), System.currentTimeMillis() - start));
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void catchUpWorker() {
		int poolSize = 1;
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(poolSize);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(String.format("%s T[%d] init", df.format(new Date()), Thread.currentThread().getId()));
		long start = System.currentTimeMillis();
		AtomicInteger counter = new AtomicInteger();
		ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new CatchUpWorker(counter, start), 1, 1,
				TimeUnit.SECONDS);

		try {
			executorService.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}
	}
}

心得分享

Java ScheduledThreadPoolExecutor Example 繼承 ThreadPoolExecutor 實作 ExecutorService 的功能實現週期與延遲執行任務,定期排程的工作,交由執行緒池中的執行緒執行,由池中空閒的執行緒從任務佇列取出任務執行, Class ScheduledThreadPoolExecutor Java 提供了幾種常見方法的操作範例。

發佈留言