ThreadPool in Java with Examples - Java 147

ThreadPool in Java with Examples – Java 147

ThreadPool in Java with Examples

執行緒池表示 1 組正在等待任務並被多次重用的工作執行緒,對於執行緒池,通常會建立一組固定大小的執行緒,ThreadPoolExecutor 從執行緒池中取出 1 條執行緒,並由服務提供者分配一項任務,當任務完成後,執行緒又會放回執行緒池中, Java Thread Pools 本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- executorservice
       |                       +- ThreadPoolWithExamplesTest.java   

單元測試

ThreadPoolExecutor 提供單執行緒池、固定數量執行緒池、無限制數量執行緒池、排程執行緒池等,規劃不同類型的任務,按照實際需求 ThreadPoolExecutor 可以使用不同的執行緒池。

newSingleThreadExecutor

ThreadPool Java 建立一個單執行緒池,只有 1 條執行緒,執行 3 個任務,每個任務耗時 3 秒完成。

	@Test
	public void newSingleThreadExecutor() {
		ExecutorService executorService = Executors.newSingleThreadExecutor();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
2023/02/04 03:07:51 T[11] task: 0 ready
2023/02/04 03:07:54 T[11] task: 0 finished
2023/02/04 03:07:54 T[11] task: 1 ready
2023/02/04 03:07:57 T[11] task: 1 finished
2023/02/04 03:07:57 T[11] task: 2 ready
2023/02/04 03:08:00 T[11] task: 2 finished

newFixedThreadPool

ThreadPool Java 建立一個執行緒池,固定數量 2 條執行緒,執行 3 個任務,每個任務耗時 3 秒完成。

	@Test
	public void newFixedThreadPool() {
		int poolSize = 2;
		ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
2023/02/04 03:11:53 T[12] task: 1 ready
2023/02/04 03:11:53 T[11] task: 0 ready
2023/02/04 03:11:56 T[11] task: 0 finished
2023/02/04 03:11:56 T[12] task: 1 finished
2023/02/04 03:11:56 T[12] task: 2 ready
2023/02/04 03:11:59 T[12] task: 2 finished

newCachedThreadPool

ThreadPool Java 建立一個執行緒池,無限制數量執行緒,執行 3 個任務,每個任務耗時 3 秒完成。

	@Test
	public void newCachedThreadPool() {
		ExecutorService executorService = Executors.newCachedThreadPool();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
2023/02/04 03:12:41 T[11] task: 0 ready
2023/02/04 03:12:41 T[13] task: 2 ready
2023/02/04 03:12:41 T[12] task: 1 ready
2023/02/04 03:12:44 T[11] task: 0 finished
2023/02/04 03:12:44 T[13] task: 2 finished
2023/02/04 03:12:44 T[12] task: 1 finished

newScheduledThreadPool

Thread Pools in Java 建立一個定期排程執行緒池,固定數量為 2 條執行緒,初始後延遲 5 秒執行,然後每隔 5 秒執行 1 個任務,每個任務耗時 3 秒完成,總共 3 個任務。

	@Test
	public void newScheduledThreadPool() {
		int poolSize = 2;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(5);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		System.out.println(df.format(new Date()) + " T[" + Thread.currentThread().getId() + "] init");
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.scheduleAtFixedRate(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				}, 5_000, 5_000, TimeUnit.MILLISECONDS);
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
2023/02/04 03:13:11 T[1] init
2023/02/04 03:13:16 T[12] task: 1 ready
2023/02/04 03:13:16 T[11] task: 0 ready
2023/02/04 03:13:19 T[12] task: 1 finished
2023/02/04 03:13:19 T[11] task: 0 finished
2023/02/04 03:13:19 T[12] task: 2 ready
2023/02/04 03:13:21 T[11] task: 0 ready
2023/02/04 03:13:22 T[12] task: 2 finished
2023/02/04 03:13:22 T[12] task: 1 ready
2023/02/04 03:13:24 T[11] task: 0 finished
2023/02/04 03:13:24 T[11] task: 2 ready
2023/02/04 03:13:25 T[12] task: 1 finished

newSingleThreadScheduledExecutor

Thread Pools in Java 建立一個定期排程單執行緒池,只有 1 條執行緒,延遲 5 秒執行,然後每隔 5 秒執行 1 個任務,每個任務耗時 3 秒完成,總共 3 個任務。

	@Test
	public void newSingleThreadScheduledExecutor() {
		ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(5);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
		System.out.println(df.format(new Date()) + " T[" + Thread.currentThread().getId() + "] init");
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.scheduleAtFixedRate(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				}, 5_000, 5_000, TimeUnit.MILLISECONDS);
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
2023/02/04 03:13:45 T[1] init
2023/02/04 03:13:50 T[11] task: 0 ready
2023/02/04 03:13:53 T[11] task: 0 finished
2023/02/04 03:13:53 T[11] task: 1 ready
2023/02/04 03:13:56 T[11] task: 1 finished
2023/02/04 03:13:56 T[11] task: 2 ready
2023/02/04 03:13:59 T[11] task: 2 finished
2023/02/04 03:13:59 T[11] task: 0 ready

ThreadPoolTest.java

Thread Pools in Java 新增單元測試,驗證是否符合預期。

package org.ruoxue.java_147.multithreading.executorservice;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Test;

public class ThreadPoolTest {

	@Test
	public void newSingleThreadExecutor() {
		ExecutorService executorService = Executors.newSingleThreadExecutor();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void newFixedThreadPool() {
		int poolSize = 2;
		ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void newCachedThreadPool() {
		ExecutorService executorService = Executors.newCachedThreadPool();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.execute(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				});
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void newScheduledThreadPool() {
		int poolSize = 2;
		ScheduledExecutorService executorService = Executors.newScheduledThreadPool(poolSize);
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(5);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
		System.out.println(df.format(new Date()) + " T[" + Thread.currentThread().getId() + "] init");
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.scheduleAtFixedRate(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				}, 5_000, 5_000, TimeUnit.MILLISECONDS);
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}

	@Test
	public void newSingleThreadScheduledExecutor() {
		ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(5);
		SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
		System.out.println(df.format(new Date()) + " T[" + Thread.currentThread().getId() + "] init");
		try {
			IntStream.range(0, taskSize).forEach(e -> {
				executorService.scheduleAtFixedRate(() -> {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
					try {
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " ready");
						TimeUnit.SECONDS.sleep(3);
						System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] task: "
								+ e + " finished");
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						latch.countDown();
					}
				}, 5_000, 5_000, TimeUnit.MILLISECONDS);
			});
			latch.await();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
}

心得分享

Java Thread Pools 可擴展的執行緒池實現,建立執行緒池,具有許多參數可調整設定,由固定數量的核心執行緒組成,這些執行緒始終保持在內部,Thread Pools in Java 提供了幾種 ThreadPoolExecutor 常見方法的操作範例,當有任務需要執行時,提交到執行緒池執行,重用執行緒,可以大幅減少資源的消耗。

發佈留言