CountDownLatch in Java with Examples - Java 147

CountDownLatch in Java with Examples – Java 147

CountDownLatch in Java with Examples

用於確保任務在開始之前等待其他執行緒,例如一個服務器,主要任務只有在所有必需的服務都已啟動時才能啟動,Java CountDownLatch 需要設定應該等待的執行緒數量,這些執行緒在完成或準備好工作後,通過調用 countDown 來進行倒數計數,一旦計數達到零,主要任務就會開始執行, CountDownLatch in Java 本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- countdownlatch
       |                       +- CountDownLatchWithExamplesTest.java   

單元測試

CountDownLatch Java 提供倒數計數、等待、等待超時等操作。

await

CountDownLatch Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 1 秒完成,主執行緒等待 3 條執行緒全部完成才結束任務。

	protected class Worker implements Runnable {

		private CountDownLatch latch;
		private int id;
		private Map<Integer, String> output;

		public Worker(CountDownLatch latch, int id, Map<Integer, String> output) {
			this.latch = latch;
			this.id = id;
			this.output = output;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
				output.put(id, "finished");
			} catch (Exception ex) {
				ex.printStackTrace();
			} finally {
				latch.countDown();
			}
		}
	}

	@Test
	public void await() {
		try {
			int taskSize = 3;
			CountDownLatch latch = new CountDownLatch(taskSize);
			Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
			AtomicInteger ids = new AtomicInteger();

			List<Thread> workers = Stream
					.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement(), output))).limit(taskSize)
					.collect(Collectors.toList());
			workers.forEach(e -> e.start());
			latch.await();

			assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
					entry(2, "finished"));
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
2023/02/04 03:21:00 T[12] worker: 1 ready
2023/02/04 03:21:00 T[11] worker: 0 ready
2023/02/04 03:21:00 T[13] worker: 2 ready
2023/02/04 03:21:01 T[11] worker: 0 finished
2023/02/04 03:21:01 T[13] worker: 2 finished
2023/02/04 03:21:01 T[12] worker: 1 finished

awaitTimeout

CountDownLatch Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 1 秒完成,其中 1 條執行緒拋出例外,沒有調用 countDown,主執行緒等待超時之後結束任務。

	protected class BrokenWorker implements Runnable {

		private CountDownLatch latch;
		private int id;
		private Map<Integer, String> output;

		public BrokenWorker(CountDownLatch latch, int id, Map<Integer, String> output) {
			this.latch = latch;
			this.id = id;
			this.output = output;
		}

		@Override
		public void run() {
			try {
				if (id == 1) {
					throw new RuntimeException("BrokenWorker " + id + " throw exception");
				}
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
				output.put(id, "finished");
				latch.countDown();
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void awaitTimeout() {
		try {
			int taskSize = 3;
			CountDownLatch latch = new CountDownLatch(taskSize);
			Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
			AtomicInteger ids = new AtomicInteger();

			List<Thread> workers = Stream
					.generate(() -> new Thread(new BrokenWorker(latch, ids.getAndIncrement(), output)))
					.limit(taskSize).collect(Collectors.toList());
			workers.forEach(e -> e.start());
			boolean completed = latch.await(5, TimeUnit.SECONDS);

			assertThat(completed).isFalse();
			assertThat(output).hasSize(2).containsOnly(entry(0, "finished"), entry(2, "finished"));
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
java.lang.RuntimeException: BrokenWorker 1 throw exception
	at org.ruoxue.java_147.multithreading.CountDownLatchTest$BrokenWorker.run(CountDownLatchTest.java:89)
	at java.lang.Thread.run(Thread.java:750)
2023/02/04 03:22:56 T[11] worker: 0 ready
2023/02/04 03:22:56 T[13] worker: 2 ready
2023/02/04 03:22:57 T[11] worker: 0 finished
2023/02/04 03:22:57 T[13] worker: 2 finished

CountDownLatchWithExamplesTest.java

Java CountDownLatch 新增單元測試,驗證是否符合預期。

package org.ruoxue.java_147.multithreading.countdownlatch;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;

import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;

public class CountDownLatchWithExamplesTest {

	protected class Worker implements Runnable {

		private CountDownLatch latch;
		private int id;
		private Map<Integer, String> output;

		public Worker(CountDownLatch latch, int id, Map<Integer, String> output) {
			this.latch = latch;
			this.id = id;
			this.output = output;
		}

		@Override
		public void run() {
			try {
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
				output.put(id, "finished");
			} catch (Exception ex) {
				ex.printStackTrace();
			} finally {
				latch.countDown();
			}
		}
	}

	@Test
	public void await() {
		try {
			int taskSize = 3;
			CountDownLatch latch = new CountDownLatch(taskSize);
			Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
			AtomicInteger ids = new AtomicInteger();

			List<Thread> workers = Stream
					.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement(), output))).limit(taskSize)
					.collect(Collectors.toList());
			workers.forEach(e -> e.start());
			latch.await();

			assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
					entry(2, "finished"));
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}

	protected class BrokenWorker implements Runnable {

		private CountDownLatch latch;
		private int id;
		private Map<Integer, String> output;

		public BrokenWorker(CountDownLatch latch, int id, Map<Integer, String> output) {
			this.latch = latch;
			this.id = id;
			this.output = output;
		}

		@Override
		public void run() {
			try {
				if (id == 1) {
					throw new RuntimeException("BrokenWorker " + id + " throw exception");
				}
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
				output.put(id, "finished");
				latch.countDown();
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}
	}

	@Test
	public void awaitTimeout() {
		try {
			int taskSize = 3;
			CountDownLatch latch = new CountDownLatch(taskSize);
			Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
			AtomicInteger ids = new AtomicInteger();

			List<Thread> workers = Stream
					.generate(() -> new Thread(new BrokenWorker(latch, ids.getAndIncrement(), output)))
					.limit(taskSize).collect(Collectors.toList());
			workers.forEach(e -> e.start());
			boolean completed = latch.await(5, TimeUnit.SECONDS);

			assertThat(completed).isFalse();
			assertThat(output).hasSize(2).containsOnly(entry(0, "finished"), entry(2, "finished"));
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
}

心得分享

CountDownLatch Example 可以使執行緒阻塞,直到其他執行緒完成指定任務,也可以在並行程式設計中使用 Java CountDownLatch 阻塞調用執行緒,直到倒數計時為零,如果正在做一些並行處理,可以想要處理的執行緒數相同的計數器值來實例化, CountDownLatch in Java 每個執行緒完成後調用 countDown ,保證調用 await 的依賴執行緒將阻塞,直到工作執行緒完成。

發佈留言