Java ExecutorService Interface - Java 147

Java ExecutorService Interface

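ExecutorService runs asynchronous tasks that implement the Runnable or Callable interface, through the execute and submit methods. When there are more tasks than available threads, the extra tasks wait in a queue; several queue classes are available, so you can pick the one that fits your needs. The ExecutorService interface defines the thread-pool operations for executing tasks, invoking one or all of a batch of tasks, and shutting the pool down, while cancellation goes through the Future that submit returns. This article introduces the commonly used cancel, invokeAny, and invokeAll methods, with examples whose output is verified through unit tests.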
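
The difference between execute and submit can be sketched as follows. This is a minimal illustration, not code from the test class: the class name `ExecuteVersusSubmitSketch`, the method `runBoth`, and the use of `Executors.newFixedThreadPool` are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration.
class ExecuteVersusSubmitSketch {

	// Runs one Runnable via execute and one Callable via submit,
	// then returns the Callable's result.
	static String runBoth(AtomicInteger counter) throws Exception {
		ExecutorService pool = Executors.newFixedThreadPool(2);
		try {
			// execute: takes a Runnable and returns nothing.
			pool.execute(() -> counter.incrementAndGet());
			// submit: takes a Callable and returns a Future holding its result.
			Future<String> future = pool.submit(() -> "done");
			return future.get(1, TimeUnit.SECONDS);
		} finally {
			// Stop accepting tasks and let the queued ones finish.
			pool.shutdown();
			pool.awaitTermination(1, TimeUnit.SECONDS);
		}
	}

	public static void main(String[] args) throws Exception {
		AtomicInteger counter = new AtomicInteger();
		System.out.println(runBoth(counter) + ", counter=" + counter.get());
	}
}
```

With execute, the only way to observe the task is through its side effects (here the counter); with submit, the returned Future gives access to the result, to an exception thrown by the task, and to cancellation.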

File Directory

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- executorservice
       |                       +- ExecutorServiceInterfaceTest.java   

Unit Tests

The ExecutorService interface provides the operations for executing and submitting tasks to a thread pool.

cancel

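Create a thread pool with a core size of 2 threads and a maximum of 5 threads, backed by a LinkedBlockingQueue. Submit 3 tasks, each of which takes 5 seconds to complete, and cancel every task that has not finished yet. Calling get on a cancelled Future throws a CancellationException.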

	@Test
	public void cancel() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Future<String>> futures = new ArrayList<Future<String>>();
		for (int i = 0; i < taskSize; i++) {
			Future<String> future = executorService.submit(() -> {
				int id = ids.incrementAndGet();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(5);
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				return id + " OK";
			});
			futures.add(future);
		}

		futures.forEach(e -> {
			try {
				if (!e.isDone()) {
					e.cancel(true);
				}
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});

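		// awaitTermination can only return early after shutdown has been requested
		executorService.shutdown();
		try {
			executorService.awaitTermination(3, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}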
	}
T[11] worker: 2 ready
T[12] worker: 1 ready
T[11] worker: 3 ready
java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.ruoxue.java_147.multithreading.ExecutorServiceMethodsTest.lambda$7(ExecutorServiceMethodsTest.java:169)
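
The stack trace above comes from calling get on a Future that was already cancelled. As a minimal sketch of handling that case explicitly, the example below cancels a running task and catches the CancellationException instead of printing it. The class name `CancelSketch`, the method `cancelRunningTask`, and the CountDownLatch used to make sure the task has started are assumptions added for the illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class CancelSketch {

	// Cancels a task while it is running and reports what the Future says afterwards.
	static String cancelRunningTask() throws InterruptedException {
		ExecutorService pool = Executors.newSingleThreadExecutor();
		CountDownLatch started = new CountDownLatch(1);
		try {
			Future<String> future = pool.submit(() -> {
				started.countDown();
				TimeUnit.SECONDS.sleep(5); // interrupted by cancel(true)
				return "OK";
			});
			started.await(); // wait until the task is actually running
			boolean cancelled = future.cancel(true); // true = interrupt the worker thread
			try {
				return future.get();
			} catch (CancellationException ex) {
				// get on a cancelled Future always ends up here
				return "cancelled=" + cancelled + ", isCancelled=" + future.isCancelled();
			} catch (ExecutionException ex) {
				return "failed: " + ex.getCause();
			}
		} finally {
			pool.shutdownNow();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(cancelRunningTask());
	}
}
```

Checking isCancelled before calling get, or catching CancellationException as shown, keeps an expected cancellation from looking like a failure in the test output.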

invokeAny

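Create a thread pool with a core size of 2 threads and a maximum of 5 threads, backed by a LinkedBlockingQueue. Run 3 tasks, each of which takes 1 second to complete. invokeAny returns the result of one task that completed successfully and cancels the tasks that have not finished; it throws an ExecutionException only when no task succeeds.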

	@Test
	public void invokeAny() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Callable<String>> callables = new ArrayList<Callable<String>>();
		for (int i = 0; i < taskSize; i++) {
			callables.add(new Callable<String>() {
				public String call() throws Exception {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");

					return id + " OK";
				}
			});
		}

		String result = null;
		try {
			result = executorService.invokeAny(callables);
		} catch (InterruptedException | ExecutionException ex) {
			ex.printStackTrace();
		}
		System.out.println("worker: " + result);
		// release the pool's threads once the result has been printed
		executorService.shutdown();
	}
T[11] worker: 1 ready
T[12] worker: 2 ready
T[12] worker: 2 finished
T[11] worker: 1 finished
T[11] worker: 3 ready
worker: 2 OK
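
To illustrate how invokeAny treats failing tasks, here is a minimal sketch in which one task throws immediately and another succeeds a little later. The class name `InvokeAnySketch`, both method names, and the task bodies are assumptions made for the example, not part of the test class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name, used only for this illustration.
class InvokeAnySketch {

	// One task fails at once, the other succeeds later: invokeAny skips the failure.
	static String firstSuccess() throws InterruptedException, ExecutionException {
		ExecutorService pool = Executors.newFixedThreadPool(2);
		try {
			List<Callable<String>> tasks = new ArrayList<>();
			tasks.add(() -> {
				throw new IllegalStateException("fails immediately");
			});
			tasks.add(() -> {
				Thread.sleep(200);
				return "slow OK";
			});
			return pool.invokeAny(tasks);
		} finally {
			pool.shutdownNow();
		}
	}

	// Every task fails: invokeAny reports that with an ExecutionException.
	static boolean allFailThrows() throws InterruptedException {
		ExecutorService pool = Executors.newFixedThreadPool(2);
		try {
			List<Callable<String>> tasks = new ArrayList<>();
			tasks.add(() -> {
				throw new IllegalStateException("a");
			});
			tasks.add(() -> {
				throw new IllegalStateException("b");
			});
			pool.invokeAny(tasks);
			return false;
		} catch (ExecutionException ex) {
			return true;
		} finally {
			pool.shutdownNow();
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println(firstSuccess());
		System.out.println(allFailThrows());
	}
}
```

The failing task finishes first, yet the call still returns the result of the slower task, because only a successful completion counts.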

invokeAll

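Create a thread pool with a core size of 2 threads and a maximum of 5 threads, backed by a LinkedBlockingQueue. Run 3 tasks, each of which takes 1 second to complete. invokeAll blocks until every task has completed and returns the Futures in the same order as the task list.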

	@Test
	public void invokeAll() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Callable<String>> callables = new ArrayList<Callable<String>>();
		for (int i = 0; i < taskSize; i++) {
			callables.add(() -> {
				int id = ids.incrementAndGet();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				return id + " OK";
			});
		}

		List<Future<String>> futures = null;
		try {
			futures = executorService.invokeAll(callables);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		futures.forEach(e -> {
			try {
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});
		// release the pool's threads once all results have been printed
		executorService.shutdown();
	}
T[12] worker: 2 ready
T[11] worker: 1 ready
T[11] worker: 1 finished
T[12] worker: 2 finished
T[11] worker: 3 ready
T[11] worker: 3 finished
T[1] worker: 1 OK
T[1] worker: 2 OK
T[1] worker: 3 OK
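
The ordering of the returned Futures can be shown with tasks of different durations. In this minimal sketch the class name `InvokeAllSketch`, the method name, and the delays are assumptions made for the example; unlike the test above, each id is fixed when the task is created rather than assigned when it starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this illustration.
class InvokeAllSketch {

	// The first task is the slowest, yet its result still comes first in the list.
	static List<String> resultsInTaskOrder() throws InterruptedException, ExecutionException {
		ExecutorService pool = Executors.newFixedThreadPool(3);
		try {
			long[] delaysMillis = { 300, 100, 200 };
			List<Callable<String>> tasks = new ArrayList<>();
			for (int i = 0; i < delaysMillis.length; i++) {
				int id = i + 1;
				long delay = delaysMillis[i];
				tasks.add(() -> {
					Thread.sleep(delay);
					return id + " OK";
				});
			}
			List<String> results = new ArrayList<>();
			// invokeAll has already waited for every task, so get does not block here.
			for (Future<String> future : pool.invokeAll(tasks)) {
				results.add(future.get());
			}
			return results;
		} finally {
			pool.shutdown();
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println(resultsInTaskOrder()); // prints [1 OK, 2 OK, 3 OK]
	}
}
```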

ExecutorServiceInterfaceTest.java

The complete unit test class, which verifies that ExecutorService behaves as expected.

package org.ruoxue.java_147.multithreading.executorservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

public class ExecutorServiceInterfaceTest {

	@Test
	public void cancel() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Future<String>> futures = new ArrayList<Future<String>>();
		for (int i = 0; i < taskSize; i++) {
			Future<String> future = executorService.submit(() -> {
				int id = ids.incrementAndGet();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(5);
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				return id + " OK";
			});
			futures.add(future);
		}

		futures.forEach(e -> {
			try {
				if (!e.isDone()) {
					e.cancel(true);
				}
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});

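		// awaitTermination can only return early after shutdown has been requested
		executorService.shutdown();
		try {
			executorService.awaitTermination(3, TimeUnit.SECONDS);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}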
	}

	@Test
	public void invokeAny() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Callable<String>> callables = new ArrayList<Callable<String>>();
		for (int i = 0; i < taskSize; i++) {
			callables.add(new Callable<String>() {
				public String call() throws Exception {
					int id = ids.incrementAndGet();
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
					TimeUnit.SECONDS.sleep(1);
					System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");

					return id + " OK";
				}
			});
		}

		String result = null;
		try {
			result = executorService.invokeAny(callables);
		} catch (InterruptedException | ExecutionException ex) {
			ex.printStackTrace();
		}
		System.out.println("worker: " + result);
		// release the pool's threads once the result has been printed
		executorService.shutdown();
	}

	@Test
	public void invokeAll() {
		int poolSize = 2;
		int maxPoolSize = 5;
		ExecutorService executorService = new ThreadPoolExecutor(poolSize, maxPoolSize, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>());
		int taskSize = 3;
		AtomicInteger ids = new AtomicInteger();
		List<Callable<String>> callables = new ArrayList<Callable<String>>();
		for (int i = 0; i < taskSize; i++) {
			callables.add(() -> {
				int id = ids.incrementAndGet();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + id + " finished");
				return id + " OK";
			});
		}

		List<Future<String>> futures = null;
		try {
			futures = executorService.invokeAll(callables);
		} catch (InterruptedException ex) {
			ex.printStackTrace();
		}

		futures.forEach(e -> {
			try {
				String result = e.get();
				System.out.println("T[" + Thread.currentThread().getId() + "] worker: " + result);
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		});
		// release the pool's threads once all results have been printed
		executorService.shutdown();
	}
}
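
All three tests build the pool with a core size of 2, a maximum of 5, and an unbounded LinkedBlockingQueue. One consequence worth knowing: a ThreadPoolExecutor only creates threads beyond the core size when the queue refuses a task, so with an unbounded queue the maximum of 5 is never reached. That matches the output above, where only threads 11 and 12 appear. The sketch below compares the unbounded queue with a bounded one; the class name `PoolGrowthSketch`, the method `poolSizeWith`, and the ArrayBlockingQueue of capacity 1 are assumptions chosen for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class PoolGrowthSketch {

	// Submits six tasks that block on a latch and reports how many threads the pool created.
	static int poolSizeWith(BlockingQueue<Runnable> queue) {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, queue);
		CountDownLatch release = new CountDownLatch(1);
		try {
			for (int i = 0; i < 6; i++) {
				pool.execute(() -> {
					try {
						release.await(); // keep the worker busy until released
					} catch (InterruptedException ex) {
						Thread.currentThread().interrupt();
					}
				});
			}
			return pool.getPoolSize();
		} finally {
			release.countDown(); // let all tasks finish
			pool.shutdown();
		}
	}

	public static void main(String[] args) {
		// Unbounded queue: tasks 3 to 6 are queued, the pool stays at the core size.
		System.out.println(poolSizeWith(new LinkedBlockingQueue<>())); // prints 2
		// Capacity 1: task 3 is queued, tasks 4 to 6 each force a new thread.
		System.out.println(poolSizeWith(new ArrayBlockingQueue<>(1))); // prints 5
	}
}
```

If the intent is to let the pool grow to 5 threads under load, a bounded queue (or a SynchronousQueue) is one way to get there, at the cost of having to handle rejected tasks once both the queue and the pool are full.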

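Takeaways

The point of a thread pool is to reuse threads, rather than creating a thread for every task, discarding it afterwards, and creating a fresh one the next time. A tool that schedules and manages threads reliably is therefore important for reducing system load. The ExecutorService interface provides a thread pool together with an API for assigning tasks to it, and it manages a limited number of threads and the distribution of tasks efficiently. This article walked through examples of several commonly used ExecutorService methods.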
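
Thread reuse is easy to observe directly. The following minimal sketch runs a batch of tasks on a two-thread pool and counts how many distinct threads did the work; the class name `ThreadReuseSketch`, the method name, and the use of `Executors.newFixedThreadPool` are assumptions made for the example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class ThreadReuseSketch {

	// Runs taskCount tasks on a two-thread pool and returns the number of distinct worker threads.
	static int distinctThreadsUsed(int taskCount) throws InterruptedException {
		ExecutorService pool = Executors.newFixedThreadPool(2);
		Set<String> threadNames = ConcurrentHashMap.newKeySet();
		for (int i = 0; i < taskCount; i++) {
			pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
		}
		// Stop accepting tasks, then wait for the queued ones to finish.
		pool.shutdown();
		if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
			pool.shutdownNow();
		}
		return threadNames.size();
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(distinctThreadsUsed(10)); // prints 2
	}
}
```

Ten tasks are served by the same two threads, which is the saving a pool offers over starting one thread per task.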
