Java CountDownLatch Class - Java 147

Java CountDownLatch Class – Java 147

Java CountDownLatch Class

CountDownLatch 可以設定需要倒數的計數值,各執行緒在完成或準備好工作後,調用 countDown 來進行倒數計數,當計數器歸零時,等待中的主要任務才會開始執行。 CountDownLatch Class Java 用於讓任務在開始之前等待其他執行緒完成工作,例如 Server 的主要任務只有在所有必需的服務都已啟動時才能啟動。 Class CountDownLatch Java 本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- multithreading
       |                   +- countdownlatch
       |                       +- CountDownLatchClassTest.java 

單元測試

Class CountDownLatch Java 提供倒數計數器、阻塞等操作。

getCount

CountDownLatch Class Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 1 秒完成,主執行緒等待 3 條執行緒全部完成,取得開始時以及結束時的計數器。

	/**
	 * Task that prints a "ready" line, sleeps for one second, prints a
	 * "finished" line, and finally counts the given latch down once.
	 */
	protected class Worker implements Runnable {

		private final CountDownLatch latch;
		private final int id;

		/**
		 * @param latch latch that is counted down once when {@link #run()} ends
		 * @param id    identifier printed in the log lines
		 */
		public Worker(CountDownLatch latch, int id) {
			this.latch = latch;
			this.id = id;
		}

		@Override
		public void run() {
			try {
				// "HH" is the 24-hour field; "hh" prints 12-hour values without an AM/PM marker.
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
			} catch (InterruptedException ex) {
				// Restore the interrupt status instead of silently dropping it.
				Thread.currentThread().interrupt();
				ex.printStackTrace();
			} finally {
				// Count down on every exit path so a thread blocked in await() is always released.
				latch.countDown();
			}
		}
	}

	/**
	 * Checks that {@code getCount()} returns the initial count before any worker
	 * has started and zero once {@code await()} has returned.
	 *
	 * @throws InterruptedException if the test thread is interrupted while waiting
	 */
	@Test
	public void getCount() throws InterruptedException {
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();

		List<Thread> workers = Stream.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement())))
				.limit(taskSize).collect(Collectors.toList());

		// Read the initial count before starting the workers so the assertion
		// does not depend on how quickly they reach countDown().
		System.out.println(latch.getCount());
		assertThat(latch.getCount()).isEqualTo(taskSize);

		workers.forEach(Thread::start);
		latch.await();
		System.out.println(latch.getCount());
		assertThat(latch.getCount()).isEqualTo(0);
	}
3
2023/02/06 05:26:52 T[13] worker: 2 ready
2023/02/06 05:26:52 T[11] worker: 0 ready
2023/02/06 05:26:52 T[12] worker: 1 ready
2023/02/06 05:26:53 T[13] worker: 2 finished
2023/02/06 05:26:53 T[12] worker: 1 finished
2023/02/06 05:26:53 T[11] worker: 0 finished
0

waitingWorker

CountDownLatch Class Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 3 秒完成,3 個任務都準備就緒後才同時開始執行,當 3 個任務全部完成時,主執行緒才結束任務。

	/**
	 * Task that signals readiness, waits for the calling latch to open, then
	 * prints a "ready" line, sleeps for three seconds, prints a "finished"
	 * line, and records "finished" under its id in the shared output map.
	 */
	protected class WaitingWorker implements Runnable {

		private final CountDownLatch readyLatch;
		private final CountDownLatch callingLatch;
		private final CountDownLatch finishedLatch;
		private final int id;
		private final Map<Integer, String> output;

		/**
		 * @param readyLatch    counted down once at the start of {@link #run()}
		 * @param callingLatch  awaited before the task body runs
		 * @param finishedLatch counted down once when {@link #run()} ends
		 * @param id            identifier printed in the log lines and used as map key
		 * @param output        map that receives {@code id -> "finished"} on success
		 */
		public WaitingWorker(CountDownLatch readyLatch, CountDownLatch callingLatch, CountDownLatch finishedLatch,
				int id, Map<Integer, String> output) {
			this.readyLatch = readyLatch;
			this.callingLatch = callingLatch;
			this.finishedLatch = finishedLatch;
			this.id = id;
			this.output = output;
		}

		@Override
		public void run() {
			try {
				readyLatch.countDown();
				callingLatch.await();
				// "HH" is the 24-hour field; "hh" prints 12-hour values without an AM/PM marker.
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(3);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
				output.put(id, "finished");
			} catch (InterruptedException ex) {
				// Restore the interrupt status instead of silently dropping it.
				Thread.currentThread().interrupt();
				ex.printStackTrace();
			} finally {
				// Count down on every exit path so a thread blocked in await() is always released.
				finishedLatch.countDown();
			}
		}
	}

	/**
	 * Starts three workers, waits until all have signalled readiness, opens the
	 * calling latch so they proceed, and checks that every worker recorded
	 * "finished" in the output map.
	 *
	 * @throws InterruptedException if the test thread is interrupted while waiting
	 */
	@Test
	public void waitingWorker() throws InterruptedException {
		int taskSize = 3;
		CountDownLatch readyLatch = new CountDownLatch(taskSize);
		CountDownLatch callingLatch = new CountDownLatch(1);
		CountDownLatch finishedLatch = new CountDownLatch(taskSize);
		Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
		AtomicInteger ids = new AtomicInteger();

		List<Thread> workers = Stream
				.generate(() -> new Thread(
						new WaitingWorker(readyLatch, callingLatch, finishedLatch, ids.getAndIncrement(), output)))
				.limit(taskSize).collect(Collectors.toList());
		workers.forEach(Thread::start);

		// An interruption now propagates and fails the test instead of being
		// printed and ignored, which would skip the assertion below.
		readyLatch.await();
		callingLatch.countDown();
		finishedLatch.await();

		assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
				entry(2, "finished"));
	}
2023/02/04 03:24:53 T[13] worker: 2 ready
2023/02/04 03:24:53 T[11] worker: 0 ready
2023/02/04 03:24:53 T[12] worker: 1 ready
2023/02/04 03:24:56 T[12] worker: 1 finished
2023/02/04 03:24:56 T[11] worker: 0 finished
2023/02/04 03:24:56 T[13] worker: 2 finished

CountDownLatchClassTest.java

Coordinating Threads with CountDownLatch 新增單元測試,驗證是否符合預期。

package org.ruoxue.java_147.multithreading.countdownlatch;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;

import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;

/**
 * Unit tests demonstrating {@link CountDownLatch}: reading the count before
 * and after the workers run, and using several latches to make workers start
 * together and to wait for all of them to finish.
 */
public class CountDownLatchClassTest {

	/**
	 * Task that prints a "ready" line, sleeps for one second, prints a
	 * "finished" line, and finally counts the given latch down once.
	 */
	protected class Worker implements Runnable {

		private final CountDownLatch latch;
		private final int id;

		/**
		 * @param latch latch that is counted down once when {@link #run()} ends
		 * @param id    identifier printed in the log lines
		 */
		public Worker(CountDownLatch latch, int id) {
			this.latch = latch;
			this.id = id;
		}

		@Override
		public void run() {
			try {
				// "HH" is the 24-hour field; "hh" prints 12-hour values without an AM/PM marker.
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(1);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
			} catch (InterruptedException ex) {
				// Restore the interrupt status instead of silently dropping it.
				Thread.currentThread().interrupt();
				ex.printStackTrace();
			} finally {
				// Count down on every exit path so a thread blocked in await() is always released.
				latch.countDown();
			}
		}
	}

	/**
	 * Checks that {@code getCount()} returns the initial count before any worker
	 * has started and zero once {@code await()} has returned.
	 *
	 * @throws InterruptedException if the test thread is interrupted while waiting
	 */
	@Test
	public void getCount() throws InterruptedException {
		int taskSize = 3;
		CountDownLatch latch = new CountDownLatch(taskSize);
		AtomicInteger ids = new AtomicInteger();

		List<Thread> workers = Stream.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement())))
				.limit(taskSize).collect(Collectors.toList());

		// Read the initial count before starting the workers so the assertion
		// does not depend on how quickly they reach countDown().
		System.out.println(latch.getCount());
		assertThat(latch.getCount()).isEqualTo(taskSize);

		workers.forEach(Thread::start);
		latch.await();
		System.out.println(latch.getCount());
		assertThat(latch.getCount()).isEqualTo(0);
	}

	/**
	 * Task that signals readiness, waits for the calling latch to open, then
	 * prints a "ready" line, sleeps for three seconds, prints a "finished"
	 * line, and records "finished" under its id in the shared output map.
	 */
	protected class WaitingWorker implements Runnable {

		private final CountDownLatch readyLatch;
		private final CountDownLatch callingLatch;
		private final CountDownLatch finishedLatch;
		private final int id;
		private final Map<Integer, String> output;

		/**
		 * @param readyLatch    counted down once at the start of {@link #run()}
		 * @param callingLatch  awaited before the task body runs
		 * @param finishedLatch counted down once when {@link #run()} ends
		 * @param id            identifier printed in the log lines and used as map key
		 * @param output        map that receives {@code id -> "finished"} on success
		 */
		public WaitingWorker(CountDownLatch readyLatch, CountDownLatch callingLatch, CountDownLatch finishedLatch,
				int id, Map<Integer, String> output) {
			this.readyLatch = readyLatch;
			this.callingLatch = callingLatch;
			this.finishedLatch = finishedLatch;
			this.id = id;
			this.output = output;
		}

		@Override
		public void run() {
			try {
				readyLatch.countDown();
				callingLatch.await();
				// "HH" is the 24-hour field; "hh" prints 12-hour values without an AM/PM marker.
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
				System.out.println(
						sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
				TimeUnit.SECONDS.sleep(3);
				System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
						+ " finished");
				output.put(id, "finished");
			} catch (InterruptedException ex) {
				// Restore the interrupt status instead of silently dropping it.
				Thread.currentThread().interrupt();
				ex.printStackTrace();
			} finally {
				// Count down on every exit path so a thread blocked in await() is always released.
				finishedLatch.countDown();
			}
		}
	}

	/**
	 * Starts three workers, waits until all have signalled readiness, opens the
	 * calling latch so they proceed, and checks that every worker recorded
	 * "finished" in the output map.
	 *
	 * @throws InterruptedException if the test thread is interrupted while waiting
	 */
	@Test
	public void waitingWorker() throws InterruptedException {
		int taskSize = 3;
		CountDownLatch readyLatch = new CountDownLatch(taskSize);
		CountDownLatch callingLatch = new CountDownLatch(1);
		CountDownLatch finishedLatch = new CountDownLatch(taskSize);
		Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
		AtomicInteger ids = new AtomicInteger();

		List<Thread> workers = Stream
				.generate(() -> new Thread(
						new WaitingWorker(readyLatch, callingLatch, finishedLatch, ids.getAndIncrement(), output)))
				.limit(taskSize).collect(Collectors.toList());
		workers.forEach(Thread::start);

		// An interruption now propagates and fails the test instead of being
		// printed and ignored, which would skip the assertion below.
		readyLatch.await();
		callingLatch.countDown();
		finishedLatch.await();

		assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
				entry(2, "finished"));
	}
}

心得分享

Coordinating Threads with CountDownLatch 在設計並行處理程式時,透過阻塞調用 await 的執行緒來保持同步,當倒數計數歸零時,才進行下一個步驟;如果正在執行一些並行處理任務,可以建立與計數器初始值相同數量的執行緒, Java CountDownLatch with Example 讓每個執行緒在完成後調用 countDown ,保證調用 await 的依賴執行緒會持續阻塞, Class CountDownLatch Java 藉此控制程式流程,直到所有工作執行緒完成。

發佈留言