Table of Contents
Java CountDownLatch Class
CountDownLatch 可以設定需要等待的執行緒數量,這些執行緒在完成或準備好工作後,調用 countDown 進行倒數計數;當計數器歸零時,主要任務就會開始執行。 CountDownLatch Class Java 用於讓任務在開始之前等待其他執行緒完成工作,例如 Server 的主要任務只有在所有必需的服務都已啟動後才能啟動。 Class CountDownLatch Java 本篇增加了範例,並透過單元測試來驗證產出結果。
檔案目錄
./
+- src
+- test
| +- org
| +- ruoxue
| +- java_147
| +- multithreading
| +- countdownlatch
| +- CountDownLatchClassTest.java
單元測試
Class CountDownLatch Java 提供倒數計數器、阻塞等操作。
getCount
CountDownLatch Class Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 1 秒完成,主執行緒等待 3 條執行緒全部完成,取得開始時以及結束時的計數器。
/**
 * Task that prints a "ready" line, sleeps for one second, prints a
 * "finished" line and finally counts the shared latch down by one.
 */
protected class Worker implements Runnable {
    /** Latch that is counted down once when {@link #run()} ends. */
    private final CountDownLatch latch;
    /** Identifier included in the printed lines. */
    private final int id;

    /**
     * @param latch latch to count down when this worker is done
     * @param id    identifier included in the printed lines
     */
    public Worker(CountDownLatch latch, int id) {
        this.latch = latch;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            // "HH" is hour-of-day (0-23). The previous "hh" printed a 12-hour
            // value without an AM/PM marker, so afternoon times were ambiguous.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(
                    sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
            TimeUnit.SECONDS.sleep(1);
            System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
                    + " finished");
        } catch (InterruptedException ex) {
            // Restore the interrupt flag that sleep() cleared when it threw.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        } finally {
            // Count down even when the work failed so that a thread blocked
            // in await() on this latch is always released.
            latch.countDown();
        }
    }
}
/**
 * Creates three {@code Worker} threads sharing one latch and verifies the
 * latch count before the workers start (3) and after all of them have
 * finished (0).
 *
 * @throws InterruptedException if the test thread is interrupted while
 *                              waiting on the latch; propagated so the test
 *                              fails instead of passing silently
 */
@Test
public void getCount() throws InterruptedException {
    int taskSize = 3;
    CountDownLatch latch = new CountDownLatch(taskSize);
    AtomicInteger ids = new AtomicInteger();
    List<Thread> workers = Stream.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement())))
            .limit(taskSize).collect(Collectors.toList());

    // Read the initial count before any worker is started, so the assertion
    // cannot race with a worker's countDown().
    System.out.println(latch.getCount());
    assertThat(latch.getCount()).isEqualTo(taskSize);

    workers.forEach(Thread::start);

    // Blocks until every worker has counted down.
    latch.await();
    System.out.println(latch.getCount());
    assertThat(latch.getCount()).isEqualTo(0);
}
3
2023/02/06 05:26:52 T[13] worker: 2 ready
2023/02/06 05:26:52 T[11] worker: 0 ready
2023/02/06 05:26:52 T[12] worker: 1 ready
2023/02/06 05:26:53 T[13] worker: 2 finished
2023/02/06 05:26:53 T[12] worker: 1 finished
2023/02/06 05:26:53 T[11] worker: 0 finished
0
waitingWorker
CountDownLatch Class Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 3 秒完成,3 個任務都進入準備才開始執行,當 3 個任務都全部完成時,主執行緒才結束任務。
/**
 * Task that reports itself ready, waits for a shared start signal, then
 * prints a "ready" line, sleeps for three seconds, prints a "finished" line
 * and records {@code "finished"} under its id in the shared output map.
 */
protected class WaitingWorker implements Runnable {
    /** Counted down once as soon as this worker starts running. */
    private final CountDownLatch readyLatch;
    /** Start signal; the worker blocks on it before doing its work. */
    private final CountDownLatch callingLatch;
    /** Counted down once when {@link #run()} ends. */
    private final CountDownLatch finishedLatch;
    /** Identifier used in the printed lines and as the key in {@link #output}. */
    private final int id;
    /** Map that receives {@code id -> "finished"} after the work completes. */
    private final Map<Integer, String> output;

    /**
     * @param readyLatch    latch counted down when this worker is ready
     * @param callingLatch  latch this worker awaits before working
     * @param finishedLatch latch counted down when this worker ends
     * @param id            identifier of this worker
     * @param output        map that receives this worker's result
     */
    public WaitingWorker(CountDownLatch readyLatch, CountDownLatch callingLatch, CountDownLatch finishedLatch,
            int id, Map<Integer, String> output) {
        this.readyLatch = readyLatch;
        this.callingLatch = callingLatch;
        this.finishedLatch = finishedLatch;
        this.id = id;
        this.output = output;
    }

    @Override
    public void run() {
        try {
            // Announce readiness, then block until the start signal is given.
            readyLatch.countDown();
            callingLatch.await();
            // "HH" is hour-of-day (0-23). The previous "hh" printed a 12-hour
            // value without an AM/PM marker, so afternoon times were ambiguous.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            System.out.println(
                    sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
            TimeUnit.SECONDS.sleep(3);
            System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
                    + " finished");
            output.put(id, "finished");
        } catch (InterruptedException ex) {
            // Restore the interrupt flag that await()/sleep() cleared when throwing.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        } finally {
            // Count down even when the work failed so that a thread blocked
            // in await() on this latch is always released.
            finishedLatch.countDown();
        }
    }
}
/**
 * Starts three {@code WaitingWorker} threads, waits until all of them have
 * reported ready, releases them together, waits for all of them to finish
 * and verifies that each one recorded {@code "finished"} in the output map.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *                              waiting on a latch; propagated so the test
 *                              fails instead of passing silently
 */
@Test
public void waitingWorker() throws InterruptedException {
    int taskSize = 3;
    CountDownLatch readyLatch = new CountDownLatch(taskSize);
    CountDownLatch callingLatch = new CountDownLatch(1);
    CountDownLatch finishedLatch = new CountDownLatch(taskSize);
    Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
    AtomicInteger ids = new AtomicInteger();
    List<Thread> workers = Stream
            .generate(() -> new Thread(
                    new WaitingWorker(readyLatch, callingLatch, finishedLatch, ids.getAndIncrement(), output)))
            .limit(taskSize).collect(Collectors.toList());
    workers.forEach(Thread::start);

    // Wait for every worker to report ready, then release them all at once.
    readyLatch.await();
    callingLatch.countDown();

    // Wait for every worker to finish before inspecting the results.
    finishedLatch.await();
    assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
            entry(2, "finished"));
}
2023/02/04 03:24:53 T[13] worker: 2 ready
2023/02/04 03:24:53 T[11] worker: 0 ready
2023/02/04 03:24:53 T[12] worker: 1 ready
2023/02/04 03:24:56 T[12] worker: 1 finished
2023/02/04 03:24:56 T[11] worker: 0 finished
2023/02/04 03:24:56 T[13] worker: 2 finished
CountDownLatchClassTest.java
Coordinating Threads with CountDownLatch 新增單元測試,驗證是否符合預期。
package org.ruoxue.java_147.multithreading.countdownlatch;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Test;
/**
 * Unit tests demonstrating {@link CountDownLatch}: reading the count while
 * waiting for workers, and coordinating a simultaneous start and a joint
 * finish with several latches.
 */
public class CountDownLatchClassTest {

    /**
     * Task that prints a "ready" line, sleeps for one second, prints a
     * "finished" line and finally counts the shared latch down by one.
     */
    protected class Worker implements Runnable {
        /** Latch that is counted down once when {@link #run()} ends. */
        private final CountDownLatch latch;
        /** Identifier included in the printed lines. */
        private final int id;

        /**
         * @param latch latch to count down when this worker is done
         * @param id    identifier included in the printed lines
         */
        public Worker(CountDownLatch latch, int id) {
            this.latch = latch;
            this.id = id;
        }

        @Override
        public void run() {
            try {
                // "HH" is hour-of-day (0-23). The previous "hh" printed a 12-hour
                // value without an AM/PM marker, so afternoon times were ambiguous.
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(
                        sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                TimeUnit.SECONDS.sleep(1);
                System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
                        + " finished");
            } catch (InterruptedException ex) {
                // Restore the interrupt flag that sleep() cleared when it threw.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            } finally {
                // Count down even when the work failed so that a thread blocked
                // in await() on this latch is always released.
                latch.countDown();
            }
        }
    }

    /**
     * Creates three {@link Worker} threads sharing one latch and verifies the
     * latch count before the workers start (3) and after all of them have
     * finished (0).
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting on the latch; propagated so the
     *                              test fails instead of passing silently
     */
    @Test
    public void getCount() throws InterruptedException {
        int taskSize = 3;
        CountDownLatch latch = new CountDownLatch(taskSize);
        AtomicInteger ids = new AtomicInteger();
        List<Thread> workers = Stream.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement())))
                .limit(taskSize).collect(Collectors.toList());

        // Read the initial count before any worker is started, so the
        // assertion cannot race with a worker's countDown().
        System.out.println(latch.getCount());
        assertThat(latch.getCount()).isEqualTo(taskSize);

        workers.forEach(Thread::start);

        // Blocks until every worker has counted down.
        latch.await();
        System.out.println(latch.getCount());
        assertThat(latch.getCount()).isEqualTo(0);
    }

    /**
     * Task that reports itself ready, waits for a shared start signal, then
     * prints a "ready" line, sleeps for three seconds, prints a "finished"
     * line and records {@code "finished"} under its id in the shared output map.
     */
    protected class WaitingWorker implements Runnable {
        /** Counted down once as soon as this worker starts running. */
        private final CountDownLatch readyLatch;
        /** Start signal; the worker blocks on it before doing its work. */
        private final CountDownLatch callingLatch;
        /** Counted down once when {@link #run()} ends. */
        private final CountDownLatch finishedLatch;
        /** Identifier used in the printed lines and as the key in {@link #output}. */
        private final int id;
        /** Map that receives {@code id -> "finished"} after the work completes. */
        private final Map<Integer, String> output;

        /**
         * @param readyLatch    latch counted down when this worker is ready
         * @param callingLatch  latch this worker awaits before working
         * @param finishedLatch latch counted down when this worker ends
         * @param id            identifier of this worker
         * @param output        map that receives this worker's result
         */
        public WaitingWorker(CountDownLatch readyLatch, CountDownLatch callingLatch, CountDownLatch finishedLatch,
                int id, Map<Integer, String> output) {
            this.readyLatch = readyLatch;
            this.callingLatch = callingLatch;
            this.finishedLatch = finishedLatch;
            this.id = id;
            this.output = output;
        }

        @Override
        public void run() {
            try {
                // Announce readiness, then block until the start signal is given.
                readyLatch.countDown();
                callingLatch.await();
                // "HH" is hour-of-day (0-23). The previous "hh" printed a 12-hour
                // value without an AM/PM marker, so afternoon times were ambiguous.
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                System.out.println(
                        sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
                TimeUnit.SECONDS.sleep(3);
                System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
                        + " finished");
                output.put(id, "finished");
            } catch (InterruptedException ex) {
                // Restore the interrupt flag that await()/sleep() cleared when throwing.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
            } finally {
                // Count down even when the work failed so that a thread blocked
                // in await() on this latch is always released.
                finishedLatch.countDown();
            }
        }
    }

    /**
     * Starts three {@link WaitingWorker} threads, waits until all of them
     * have reported ready, releases them together, waits for all of them to
     * finish and verifies that each one recorded {@code "finished"} in the
     * output map.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting on a latch; propagated so the
     *                              test fails instead of passing silently
     */
    @Test
    public void waitingWorker() throws InterruptedException {
        int taskSize = 3;
        CountDownLatch readyLatch = new CountDownLatch(taskSize);
        CountDownLatch callingLatch = new CountDownLatch(1);
        CountDownLatch finishedLatch = new CountDownLatch(taskSize);
        Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
        AtomicInteger ids = new AtomicInteger();
        List<Thread> workers = Stream
                .generate(() -> new Thread(
                        new WaitingWorker(readyLatch, callingLatch, finishedLatch, ids.getAndIncrement(), output)))
                .limit(taskSize).collect(Collectors.toList());
        workers.forEach(Thread::start);

        // Wait for every worker to report ready, then release them all at once.
        readyLatch.await();
        callingLatch.countDown();

        // Wait for every worker to finish before inspecting the results.
        finishedLatch.await();
        assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
                entry(2, "finished"));
    }
}
心得分享
Coordinating Threads with CountDownLatch 在並行處理的程式設計中,透過阻塞調用 await 的執行緒來保持同步,當倒數計數為零時,才進行下一個步驟。如果正在做一些並行處理任務,可以用與執行緒數量相同的計數器值來實例化 CountDownLatch, Java CountDownLatch with Example 讓每個執行緒完成後調用 countDown ,保證調用 await 的依賴執行緒會一直阻塞, Class CountDownLatch Java 藉此控制程式流程,直到工作執行緒全部完成。