Java ExecutorService Methods - Java 147

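ExecutorService is an interface for running tasks asynchronously on multiple threads. It manages and maintains a thread pool and assigns tasks to its threads. When there are more tasks than available threads, the pool can hold the extra tasks in a queue, and it offers a choice of policies, such as interrupting or discarding tasks, that decide what happens to a task next. This article introduces commonly used methods such as execute, submit, and awaitTermination, adds examples, and verifies the output with unit tests.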
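
The queueing and policy behavior described above can be sketched with a deliberately tiny pool. This is a minimal sketch and not part of the article's tests: the class name RejectionPolicySketch and the method countRejected are hypothetical, and the sizes (one thread, a queue of capacity 1) are chosen only to make rejection easy to trigger. It passes ThreadPoolExecutor.AbortPolicy explicitly, which is also the default handler, so a task is rejected once both the thread and the queue are occupied.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class RejectionPolicySketch {

	// Submits taskCount blocking tasks to a pool with 1 thread and a queue of
	// capacity 1, and returns how many submissions were rejected.
	static int countRejected(int taskCount) {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
				new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.AbortPolicy());
		CountDownLatch release = new CountDownLatch(1);
		int rejected = 0;
		try {
			for (int i = 0; i < taskCount; i++) {
				try {
					pool.execute(() -> {
						try {
							release.await(); // keep the worker busy until released
						} catch (InterruptedException ex) {
							Thread.currentThread().interrupt();
						}
					});
				} catch (RejectedExecutionException ex) {
					rejected++; // AbortPolicy signals rejection with this exception
				}
			}
		} finally {
			release.countDown(); // let the accepted tasks finish
			pool.shutdown();
		}
		return rejected;
	}

	public static void main(String[] args) {
		// One task runs, one waits in the queue, the remaining three are rejected.
		System.out.println("rejected: " + countRejected(5)); // prints "rejected: 3"
	}
}
```

Swapping in ThreadPoolExecutor.CallerRunsPolicy or ThreadPoolExecutor.DiscardPolicy changes what happens to the overflow tasks without changing the submitting code.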

File Structure

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- executorservice
       |                       +- ExecutorServiceMethodsTest.java   

Unit Tests

ExecutorService provides methods for operating a thread pool, such as executing, submitting, and cancelling tasks.
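
Cancelling is the one operation in that list that the tests below do not exercise, so here is a minimal sketch of it. It assumes a single-thread pool from Executors.newSingleThreadExecutor and a 30-second sleep standing in for slow work. The class name CancelSketch and the method cancelLongTask are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class CancelSketch {

	// Submits a long-running task, cancels it, and reports what the Future says.
	static String cancelLongTask() {
		ExecutorService pool = Executors.newSingleThreadExecutor();
		try {
			Future<String> future = pool.submit(() -> {
				TimeUnit.SECONDS.sleep(30); // stands in for slow work
				return "done";
			});
			// true: interrupt the worker thread if the task is already running
			boolean cancelled = future.cancel(true);
			try {
				future.get();
				return "completed";
			} catch (CancellationException ex) {
				// get() on a cancelled Future throws CancellationException
				return "cancelled=" + cancelled + ", isCancelled=" + future.isCancelled();
			} catch (Exception ex) {
				return "failed: " + ex;
			}
		} finally {
			pool.shutdownNow();
		}
	}

	public static void main(String[] args) {
		System.out.println(cancelLongTask()); // prints "cancelled=true, isCancelled=true"
	}
}
```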

execute

Create a thread pool with a core size of 2 threads and a maximum size of 5 threads, backed by a LinkedBlockingQueue, then run 3 tasks that each take 1 second to complete.

	@Test
	public void execute() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

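		try {
			latch.await();
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		} finally {
			// Shut the pool down so its worker threads do not outlive the test.
			executorService.shutdown();
		}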
	}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[11] worker: 3 ready
T[11] worker: 3 finished
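
The output above uses only threads T[11] and T[12], even though the maximum size is 5. A ThreadPoolExecutor starts threads beyond the core size only when its queue refuses a task, and an unbounded LinkedBlockingQueue never refuses one. The following sketch compares the two cases. The class name PoolGrowthSketch, the method largestPoolSize, and the bounded ArrayBlockingQueue of capacity 1 are assumptions made for illustration and do not come from the article's tests.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class PoolGrowthSketch {

	// Runs taskCount blocking tasks on a pool with core size 2 and maximum size 5,
	// and returns the largest number of threads the pool ever held.
	static int largestPoolSize(BlockingQueue<Runnable> queue, int taskCount) {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, queue);
		CountDownLatch release = new CountDownLatch(1);
		try {
			for (int i = 0; i < taskCount; i++) {
				pool.execute(() -> {
					try {
						release.await(); // hold every worker until all tasks are submitted
					} catch (InterruptedException ex) {
						Thread.currentThread().interrupt();
					}
				});
			}
			return pool.getLargestPoolSize();
		} finally {
			release.countDown();
			pool.shutdown();
		}
	}

	public static void main(String[] args) {
		// Unbounded queue: extra tasks wait in the queue, the pool stays at core size.
		System.out.println(largestPoolSize(new LinkedBlockingQueue<Runnable>(), 6)); // prints 2
		// Queue of capacity 1: once it is full, the pool grows up to the maximum.
		System.out.println(largestPoolSize(new ArrayBlockingQueue<Runnable>(1), 6)); // prints 5
	}
}
```

With the bounded queue, a seventh task would be rejected, because all 5 threads and the single queue slot are already taken.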

awaitTermination

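Create a thread pool with a core size of 2 threads and a maximum size of 5 threads, backed by a LinkedBlockingQueue, then run 3 tasks that each take 1 second to complete. After calling shutdown, the test waits up to 3 seconds for the pool to terminate and forces a shutdown if the tasks have not finished by then.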

	@Test
	public void awaitTermination() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			});
		}

		executorService.shutdown();
		try {
			if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
				executorService.shutdownNow();
			}
		} catch (InterruptedException e) {
			executorService.shutdownNow();
		}
	}
T[12] worker: 2 ready
T[11] worker: 1 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[12] worker: 3 ready
T[12] worker: 3 finished
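
In the run above all tasks finish within the 3-second limit, so the shutdownNow branch is never taken. The sketch below forces that branch with a timeout that is deliberately too short. It is an illustration under assumed values (one thread, 5-second tasks, a 100 ms timeout), and the names ShutdownTimeoutSketch and pendingAfterShortTimeout are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class ShutdownTimeoutSketch {

	// Queues 3 slow tasks on a single thread, waits only briefly, then forces
	// shutdown. Returns how many queued tasks never started.
	static int pendingAfterShortTimeout() {
		ExecutorService pool = Executors.newFixedThreadPool(1);
		for (int i = 0; i < 3; i++) {
			pool.execute(() -> {
				try {
					TimeUnit.SECONDS.sleep(5);
				} catch (InterruptedException ex) {
					// shutdownNow() interrupts the running task; stop quietly.
					Thread.currentThread().interrupt();
				}
			});
		}
		pool.shutdown(); // no new tasks are accepted; queued tasks may still run
		try {
			if (!pool.awaitTermination(100, TimeUnit.MILLISECONDS)) {
				// Timed out: interrupt the running task and drain the queue.
				List<Runnable> neverStarted = pool.shutdownNow();
				return neverStarted.size();
			}
		} catch (InterruptedException ex) {
			pool.shutdownNow();
			Thread.currentThread().interrupt();
		}
		return 0;
	}

	public static void main(String[] args) {
		// The first task is running; the other two are still in the queue.
		System.out.println(pendingAfterShortTimeout()); // prints 2
	}
}
```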

submitRunnable

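Create a thread pool with a core size of 2 threads and a maximum size of 5 threads, backed by a LinkedBlockingQueue, then run 3 tasks that each take 1 second to complete. Each task is submitted as a Runnable together with a fixed result, which the returned Future yields once the task has completed.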

	@Test
	public void submitRunnable() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Future<String>> futures = new ArrayList<Future<String>>();
		for (int i = 0; i < taskSize; i++) {
			int id = ids.incrementAndGet();
			Future<String> future = executorService.submit(() -> {
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				try {
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
			}, id + " OK");
			futures.add(future);
		}

		futures.forEach(e -> {
			try {
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});

		// Shut the pool down so its worker threads do not outlive the test.
		executorService.shutdown();
	}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[11] worker: 3 ready
T[1] worker: 1 OK
T[1] worker: 2 OK
T[11] worker: 3 finished
T[1] worker: 3 OK
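
A Runnable has no return value, so the "OK" strings above come from the second argument of submit, not from the task itself. The sketch below contrasts the two Runnable overloads of submit. The names SubmitRunnableResultSketch and describeResults are hypothetical and used only for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, for illustration only.
class SubmitRunnableResultSketch {

	// Submits the same no-op Runnable with and without a preset result.
	static String describeResults() {
		ExecutorService pool = Executors.newSingleThreadExecutor();
		try {
			Runnable noop = () -> { };
			Future<?> plain = pool.submit(noop);             // get() yields null
			Future<String> preset = pool.submit(noop, "OK"); // get() yields "OK"
			return "plain=" + plain.get() + ", preset=" + preset.get();
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return "interrupted";
		} catch (ExecutionException ex) {
			return "failed: " + ex.getCause();
		} finally {
			pool.shutdown();
		}
	}

	public static void main(String[] args) {
		System.out.println(describeResults()); // prints "plain=null, preset=OK"
	}
}
```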

submitCallable

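Create a thread pool with a core size of 2 threads and a maximum size of 5 threads, backed by a LinkedBlockingQueue, then run 3 tasks that each take 1 second to complete. Each task is submitted as a Callable, which returns its own result and may throw checked exceptions.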

	@Test
	public void submitCallable() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Future<String>> futures = new ArrayList<Future<String>>();
		for (int i = 0; i < taskSize; i++) {
			Future<String> future = executorService.submit(() -> {
				int id = ids.incrementAndGet();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				return id + " OK";
			});
			futures.add(future);
		}

		futures.forEach(e -> {
			try {
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});

		// Shut the pool down so its worker threads do not outlive the test.
		executorService.shutdown();
	}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[12] worker: 2 finished
T[11] worker: 1 finished
T[12] worker: 3 ready
T[1] worker: 1 OK
T[1] worker: 2 OK
T[12] worker: 3 finished
T[1] worker: 3 OK
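
The test above catches a generic Exception around get(). When a Callable throws, the exception does not escape on the worker thread. Instead, get() rethrows it wrapped in an ExecutionException. The following is a minimal sketch of that path. The names CallableFailureSketch and describeFailure, and the "boom" message, are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, for illustration only.
class CallableFailureSketch {

	// Submits a Callable that always fails and describes what get() reports.
	static String describeFailure() {
		ExecutorService pool = Executors.newSingleThreadExecutor();
		try {
			Callable<String> failing = () -> {
				throw new IllegalStateException("boom");
			};
			Future<String> future = pool.submit(failing);
			try {
				return future.get();
			} catch (ExecutionException ex) {
				// The original exception is available as the cause.
				Throwable cause = ex.getCause();
				return cause.getClass().getSimpleName() + ": " + cause.getMessage();
			} catch (InterruptedException ex) {
				Thread.currentThread().interrupt();
				return "interrupted";
			}
		} finally {
			pool.shutdown();
		}
	}

	public static void main(String[] args) {
		System.out.println(describeFailure()); // prints "IllegalStateException: boom"
	}
}
```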

ExecutorServiceMethodsTest.java

The complete unit test class below verifies that ExecutorService behaves as expected.

package org.ruoxue.java_147.multithreading.executorservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

public class ExecutorServiceMethodsTest {

	@Test
	public void execute() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				} finally {
					latch.countDown();
				}
			});
		}

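		try {
			latch.await();
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		} finally {
			// Shut the pool down so its worker threads do not outlive the test.
			executorService.shutdown();
		}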
	}

	@Test
	public void awaitTermination() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		for (int i = 0; i < taskSize; i++) {
			executorService.execute(() -> {
				try {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			});
		}

		executorService.shutdown();
		try {
			if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
				executorService.shutdownNow();
			}
		} catch (InterruptedException e) {
			executorService.shutdownNow();
		}
	}

	@Test
	public void submitRunnable() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Future<String>> futures = new ArrayList<Future<String>>();
		for (int i = 0; i < taskSize; i++) {
			int id = ids.incrementAndGet();
			Future<String> future = executorService.submit(() -> {
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				try {
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
			}, id + " OK");
			futures.add(future);
		}

		futures.forEach(e -> {
			try {
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});

		// Shut the pool down so its worker threads do not outlive the test.
		executorService.shutdown();
	}

	@Test
	public void submitCallable() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Future<String>> futures = new ArrayList<Future<String>>();
		for (int i = 0; i < taskSize; i++) {
			Future<String> future = executorService.submit(() -> {
				int id = ids.incrementAndGet();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				return id + " OK";
			});
			futures.add(future);
		}

		futures.forEach(e -> {
			try {
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});

		// Shut the pool down so its worker threads do not outlive the test.
		executorService.shutdown();
	}
}

Takeaways

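ExecutorService provides a thread pool together with an API for assigning tasks to it, so that a limited number of threads and the distribution of tasks are managed in one place. This article showed examples of several commonly used ExecutorService methods.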
