Table of Contents
ToggleCountDownLatch in Java with Examples
用於確保任務在開始之前等待其他執行緒,例如一個服務器,主要任務只有在所有必需的服務都已啟動時才能啟動,Java CountDownLatch 需要設定應該等待的執行緒數量,這些執行緒在完成或準備好工作後,通過調用 countDown 來進行倒數計數,一旦計數達到零,主要任務就會開始執行, CountDownLatch in Java 本篇增加了範例,並透過單元測試來驗證產出結果。
檔案目錄
./
+- src
+- test
| +- org
| +- ruoxue
| +- java_147
| +- multithreading
| +- countdownlatch
| +- CountDownLatchWithExamplesTest.java
單元測試
CountDownLatch Java 提供倒數計數、等待、等待超時等操作。
await
CountDownLatch Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 1 秒完成,主執行緒等待 3 條執行緒全部完成才結束任務。
protected class Worker implements Runnable {
private CountDownLatch latch;
private int id;
private Map<Integer, String> output;
public Worker(CountDownLatch latch, int id, Map<Integer, String> output) {
this.latch = latch;
this.id = id;
this.output = output;
}
@Override
public void run() {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
System.out.println(
sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
+ " finished");
output.put(id, "finished");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
latch.countDown();
}
}
}
@Test
public void await() {
try {
int taskSize = 3;
CountDownLatch latch = new CountDownLatch(taskSize);
Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
AtomicInteger ids = new AtomicInteger();
List<Thread> workers = Stream
.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement(), output))).limit(taskSize)
.collect(Collectors.toList());
workers.forEach(e -> e.start());
latch.await();
assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
entry(2, "finished"));
} catch (Exception ex) {
ex.printStackTrace();
}
}
2023/02/04 03:21:00 T[12] worker: 1 ready
2023/02/04 03:21:00 T[11] worker: 0 ready
2023/02/04 03:21:00 T[13] worker: 2 ready
2023/02/04 03:21:01 T[11] worker: 0 finished
2023/02/04 03:21:01 T[13] worker: 2 finished
2023/02/04 03:21:01 T[12] worker: 1 finished
awaitTimeout
CountDownLatch Java 建立 3 條執行緒,執行 3 個任務,每個任務耗時 1 秒完成,其中 1 條執行緒拋出例外,沒有調用 countDown,主執行緒等待超時之後結束任務。
protected class BrokenWorker implements Runnable {
private CountDownLatch latch;
private int id;
private Map<Integer, String> output;
public BrokenWorker(CountDownLatch latch, int id, Map<Integer, String> output) {
this.latch = latch;
this.id = id;
this.output = output;
}
@Override
public void run() {
try {
if (id == 1) {
throw new RuntimeException("BrokenWorker " + id + " throw exception");
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
System.out.println(
sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
+ " finished");
output.put(id, "finished");
latch.countDown();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
@Test
public void awaitTimeout() {
try {
int taskSize = 3;
CountDownLatch latch = new CountDownLatch(taskSize);
Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
AtomicInteger ids = new AtomicInteger();
List<Thread> workers = Stream
.generate(() -> new Thread(new BrokenWorker(latch, ids.getAndIncrement(), output)))
.limit(taskSize).collect(Collectors.toList());
workers.forEach(e -> e.start());
boolean completed = latch.await(5, TimeUnit.SECONDS);
assertThat(completed).isFalse();
assertThat(output).hasSize(2).containsOnly(entry(0, "finished"), entry(2, "finished"));
} catch (Exception ex) {
ex.printStackTrace();
}
}
java.lang.RuntimeException: BrokenWorker 1 throw exception
at org.ruoxue.java_147.multithreading.CountDownLatchTest$BrokenWorker.run(CountDownLatchTest.java:89)
at java.lang.Thread.run(Thread.java:750)
2023/02/04 03:22:56 T[11] worker: 0 ready
2023/02/04 03:22:56 T[13] worker: 2 ready
2023/02/04 03:22:57 T[11] worker: 0 finished
2023/02/04 03:22:57 T[13] worker: 2 finished
CountDownLatchWithExamplesTest.java
Java CountDownLatch 新增單元測試,驗證是否符合預期。
package org.ruoxue.java_147.multithreading.countdownlatch;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Test;
public class CountDownLatchWithExamplesTest {
protected class Worker implements Runnable {
private CountDownLatch latch;
private int id;
private Map<Integer, String> output;
public Worker(CountDownLatch latch, int id, Map<Integer, String> output) {
this.latch = latch;
this.id = id;
this.output = output;
}
@Override
public void run() {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
System.out.println(
sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
+ " finished");
output.put(id, "finished");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
latch.countDown();
}
}
}
@Test
public void await() {
try {
int taskSize = 3;
CountDownLatch latch = new CountDownLatch(taskSize);
Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
AtomicInteger ids = new AtomicInteger();
List<Thread> workers = Stream
.generate(() -> new Thread(new Worker(latch, ids.getAndIncrement(), output))).limit(taskSize)
.collect(Collectors.toList());
workers.forEach(e -> e.start());
latch.await();
assertThat(output).hasSize(taskSize).containsOnly(entry(0, "finished"), entry(1, "finished"),
entry(2, "finished"));
} catch (Exception ex) {
ex.printStackTrace();
}
}
protected class BrokenWorker implements Runnable {
private CountDownLatch latch;
private int id;
private Map<Integer, String> output;
public BrokenWorker(CountDownLatch latch, int id, Map<Integer, String> output) {
this.latch = latch;
this.id = id;
this.output = output;
}
@Override
public void run() {
try {
if (id == 1) {
throw new RuntimeException("BrokenWorker " + id + " throw exception");
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
System.out.println(
sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id + " ready");
TimeUnit.SECONDS.sleep(1);
System.out.println(sdf.format(new Date()) + " T[" + Thread.currentThread().getId() + "] worker: " + id
+ " finished");
output.put(id, "finished");
latch.countDown();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
@Test
public void awaitTimeout() {
try {
int taskSize = 3;
CountDownLatch latch = new CountDownLatch(taskSize);
Map<Integer, String> output = Collections.synchronizedMap(new LinkedHashMap<Integer, String>());
AtomicInteger ids = new AtomicInteger();
List<Thread> workers = Stream
.generate(() -> new Thread(new BrokenWorker(latch, ids.getAndIncrement(), output)))
.limit(taskSize).collect(Collectors.toList());
workers.forEach(e -> e.start());
boolean completed = latch.await(5, TimeUnit.SECONDS);
assertThat(completed).isFalse();
assertThat(output).hasSize(2).containsOnly(entry(0, "finished"), entry(2, "finished"));
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
心得分享
CountDownLatch Example 可以使執行緒阻塞,直到其他執行緒完成指定任務,也可以在並行程式設計中使用 Java CountDownLatch 阻塞調用執行緒,直到倒數計時為零,如果正在做一些並行處理,可以想要處理的執行緒數相同的計數器值來實例化, CountDownLatch in Java 每個執行緒完成後調用 countDown ,保證調用 await 的依賴執行緒將阻塞,直到工作執行緒完成。