Semaphore in Java with Examples
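A semaphore controls access to a shared resource with a counter, which makes it usable as a rate limiter. The counter tracks the permits available for the resource: when it is greater than zero, access is granted; when it is zero, access is denied. A thread must therefore acquire a permit from the semaphore before it can use the resource. This article walks through examples of the Java Semaphore and verifies the results with unit tests.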
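To make the counter behaviour concrete, here is a minimal single-threaded sketch. The class name PermitCounterDemo and its run() helper are illustrative and not part of the article's test class; the sketch uses the non-blocking tryAcquire() so that a denied request is visible as a false return value instead of a blocked thread.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class; the name and the run() helper are assumptions.
class PermitCounterDemo {

    // Returns {initial permits, 1st acquired, 2nd acquired, 3rd acquired,
    //          permits when exhausted, permits after one release}.
    static int[] run() {
        Semaphore semaphore = new Semaphore(2);       // counter starts at 2
        int initial = semaphore.availablePermits();

        boolean first = semaphore.tryAcquire();       // counter 2 -> 1, granted
        boolean second = semaphore.tryAcquire();      // counter 1 -> 0, granted
        boolean third = semaphore.tryAcquire();       // counter is 0, denied
        int exhausted = semaphore.availablePermits();

        semaphore.release();                          // counter 0 -> 1
        int afterRelease = semaphore.availablePermits();

        return new int[] { initial, first ? 1 : 0, second ? 1 : 0, third ? 1 : 0, exhausted, afterRelease };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));   // prints [2, 1, 1, 0, 0, 1]
    }
}
```

The blocking acquire() used in the tests below differs only in that the third caller would wait for a release instead of getting false back.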
File Structure
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- semaphore
       |                       +- SemaphoreWithExamplesTest.java
Unit Tests
The Java Semaphore class provides operations such as acquiring and releasing permits.
acquire
This test creates 5 threads that share a semaphore with 2 permits. Each thread acquires a permit, sleeps for 1 second, increments the counter by 1, and then releases the permit.
protected class AcquireWorker implements Runnable {
    // Two permits: at most two threads are inside the guarded section at once.
    private Semaphore semaphore = new Semaphore(2);
    private AtomicInteger count = new AtomicInteger();

    public AcquireWorker() {
    }

    @Override
    public void run() {
        try {
            System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
            // Blocks until a permit is available; throws if the thread is interrupted.
            semaphore.acquire();
            try {
                System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                TimeUnit.SECONDS.sleep(1);
                count.incrementAndGet();
                System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                // Always return the permit, even if the work failed.
                semaphore.release();
                System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public int getCount() {
        return count.get();
    }
}

@Test
public void acquire() {
    int expected = 5;
    int taskSize = 5;
    AcquireWorker worker = new AcquireWorker();
    // All threads share one worker, and therefore one semaphore and one counter.
    List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
    int count = worker.getCount();
    System.out.println(count);
    assertEquals(expected, count);
}
T[11] waiting
T[12] waiting
T[13] waiting
T[12] acquire
T[14] waiting
T[15] waiting
T[11] acquire
T[12] count: 1
T[11] count: 2
T[12] release
T[13] acquire
T[11] release
T[14] acquire
T[14] count: 4
T[14] release
T[15] acquire
T[13] count: 4
T[13] release
T[15] count: 5
T[15] release
5
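In the sample output above, two threads both print count: 4. That is consistent with the worker code: incrementAndGet() and the later count.get() are separate calls, and with 2 permits another thread can increment the counter in between. If each thread should log the value it produced itself, one option is to keep the return value of incrementAndGet(). The sketch below is a hypothetical variant of AcquireWorker; the class name UniqueCountWorker, the seen set, and the runWith helper are illustrative additions, and the 1-second sleep is left out to keep it short.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the article's AcquireWorker (names are assumptions).
class UniqueCountWorker implements Runnable {
    private final Semaphore semaphore = new Semaphore(2);
    private final AtomicInteger count = new AtomicInteger();
    // Records every value a thread observed from its own increment.
    private final Set<Integer> seen = new ConcurrentSkipListSet<>();

    @Override
    public void run() {
        try {
            semaphore.acquire();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            // Use the value returned by the increment itself, not a later get().
            int mine = count.incrementAndGet();
            seen.add(mine);
            System.out.println(String.format("%s count: %d", Thread.currentThread().getName(), mine));
        } finally {
            semaphore.release();
        }
    }

    // Starts taskSize threads on one shared worker and returns the observed values.
    static Set<Integer> runWith(int taskSize) {
        UniqueCountWorker worker = new UniqueCountWorker();
        Thread[] threads = new Thread[taskSize];
        for (int i = 0; i < taskSize; i++) {
            threads[i] = new Thread(worker);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                throw new IllegalStateException(ex);
            }
        }
        return worker.seen;
    }

    public static void main(String[] args) {
        System.out.println(runWith(5)); // the set holds 1 through 5, each exactly once
    }
}
```

The order of the per-thread lines still varies between runs; only the uniqueness of the logged values is guaranteed.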
acquireInterruptibly
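This test creates 3 threads that share a semaphore with 2 permits. Each thread acquires a permit with acquire(), which is interruptible, sleeps for 1 second, and increments the counter by 1. The second thread is interrupted right after it starts, so it throws InterruptedException and never increments the counter, leaving an expected count of 2.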
@Test
public void acquireInterruptibly() {
    int expected = 2;
    int taskSize = 3;
    AcquireWorker worker = new AcquireWorker();
    List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    // Interrupt the second thread; its acquire() or sleep() throws InterruptedException.
    Thread thread = threads.get(1);
    thread.interrupt();
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
    int count = worker.getCount();
    System.out.println(count);
    assertEquals(expected, count);
}
T[12] waiting
T[13] waiting
T[11] waiting
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at org.ruoxue.java_147.synchronization.SemaphoreWithExamplesTest$AcquireWorker.run(SemaphoreWithExamplesTest.java:28)
at java.lang.Thread.run(Thread.java:750)
T[13] acquire
T[11] acquire
T[13] count: 2
T[11] count: 2
T[13] release
T[11] release
2
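The worker above handles the interruption by printing the stack trace. A common alternative, shown here only as a sketch and not as the article's approach, is to restore the interrupt status so that code further up the call stack can still see that the thread was interrupted. The class name InterruptAwareDemo and its run() helper are hypothetical; the semaphore is created with 0 permits so that acquire() can never succeed and the interruption is the only way out.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the name and the run() helper are assumptions.
class InterruptAwareDemo {

    // Returns {InterruptedException was caught, interrupt flag was restored}.
    static boolean[] run() {
        Semaphore semaphore = new Semaphore(0);   // no permits: acquire() has to wait
        AtomicBoolean caught = new AtomicBoolean();
        AtomicBoolean flagRestored = new AtomicBoolean();

        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
                try {
                    // guarded work would go here
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException ex) {
                caught.set(true);
                // acquire() cleared the flag when it threw; set it again.
                Thread.currentThread().interrupt();
                flagRestored.set(Thread.currentThread().isInterrupted());
            }
        });
        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return new boolean[] { caught.get(), flagRestored.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, true]
    }
}
```

Note that the permit is released only inside the inner try/finally, so a thread that never acquired a permit does not release one.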
acquireUninterruptibly
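This test creates 3 threads that share a semaphore with 2 permits. Each thread acquires a permit with acquireUninterruptibly() and increments the counter by 1. The second thread is interrupted, but the acquisition cannot be interrupted, so the task continues to run and the expected count is 3.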
protected class AcquireUninterruptiblyWorker implements Runnable {
    private Semaphore semaphore = new Semaphore(2);
    private AtomicInteger count = new AtomicInteger();

    public AcquireUninterruptiblyWorker() {
    }

    @Override
    public void run() {
        System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
        // Keeps waiting for a permit even if the thread is interrupted.
        semaphore.acquireUninterruptibly();
        try {
            System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
            count.incrementAndGet();
            System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            semaphore.release();
            System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
        }
    }

    public int getCount() {
        return count.get();
    }
}

@Test
public void acquireUninterruptibly() {
    int expected = 3;
    int taskSize = 3;
    AcquireUninterruptiblyWorker worker = new AcquireUninterruptiblyWorker();
    List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    // The interrupt does not stop the second thread from acquiring a permit.
    Thread thread = threads.get(1);
    thread.interrupt();
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
    int count = worker.getCount();
    System.out.println(count);
    assertEquals(expected, count);
}
T[11] waiting
T[12] waiting
T[13] waiting
T[12] acquire
T[11] acquire
T[12] count: 1
T[11] count: 2
T[11] release
T[12] release
T[13] acquire
T[13] count: 3
T[13] release
3
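The output above shows that the interrupted thread still completes its task. What it does not show is that the interruption is not lost: according to the Semaphore documentation, the thread's interrupt status is set when acquireUninterruptibly() returns. The sketch below checks this under the assumption of a semaphore that starts with 0 permits, so the waiter cannot proceed until the main thread releases one after the interrupt. The class name UninterruptibleDemo and its run() helper are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the name and the run() helper are assumptions.
class UninterruptibleDemo {

    // Returns {permit was acquired, interrupt flag was set afterwards}.
    static boolean[] run() {
        Semaphore semaphore = new Semaphore(0);   // no permits until main releases one
        AtomicBoolean acquired = new AtomicBoolean();
        AtomicBoolean interruptedAfter = new AtomicBoolean();

        Thread waiter = new Thread(() -> {
            // Does not throw InterruptedException; keeps waiting for a permit.
            semaphore.acquireUninterruptibly();
            try {
                acquired.set(true);
                // The interruption is remembered in the thread's interrupt status.
                interruptedAfter.set(Thread.currentThread().isInterrupted());
            } finally {
                semaphore.release();
            }
        });
        waiter.start();
        waiter.interrupt();    // does not abort the wait
        semaphore.release();   // hand out the permit the waiter needs
        try {
            waiter.join();
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return new boolean[] { acquired.get(), interruptedAfter.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, true]
    }
}
```

Code that runs after acquireUninterruptibly() can therefore still check isInterrupted() and decide to stop early.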
SemaphoreWithExamplesTest.java
The complete unit test class below verifies that the Java Semaphore behaves as expected.
package org.ruoxue.java_147.synchronization.semaphore;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;

public class SemaphoreWithExamplesTest {

    protected class AcquireWorker implements Runnable {
        private Semaphore semaphore = new Semaphore(2);
        private AtomicInteger count = new AtomicInteger();

        public AcquireWorker() {
        }

        @Override
        public void run() {
            try {
                System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                semaphore.acquire();
                try {
                    System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                    TimeUnit.SECONDS.sleep(1);
                    count.incrementAndGet();
                    System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                } finally {
                    semaphore.release();
                    System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }

        public int getCount() {
            return count.get();
        }
    }

    @Test
    public void acquire() {
        int expected = 5;
        int taskSize = 5;
        AcquireWorker worker = new AcquireWorker();
        List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        int count = worker.getCount();
        System.out.println(count);
        assertEquals(expected, count);
    }

    @Test
    public void acquireInterruptibly() {
        int expected = 2;
        int taskSize = 3;
        AcquireWorker worker = new AcquireWorker();
        List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        Thread thread = threads.get(1);
        thread.interrupt();
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        int count = worker.getCount();
        System.out.println(count);
        assertEquals(expected, count);
    }

    protected class AcquireUninterruptiblyWorker implements Runnable {
        private Semaphore semaphore = new Semaphore(2);
        private AtomicInteger count = new AtomicInteger();

        public AcquireUninterruptiblyWorker() {
        }

        @Override
        public void run() {
            System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
            semaphore.acquireUninterruptibly();
            try {
                System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                count.incrementAndGet();
                System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                semaphore.release();
                System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
            }
        }

        public int getCount() {
            return count.get();
        }
    }

    @Test
    public void acquireUninterruptibly() {
        int expected = 3;
        int taskSize = 3;
        AcquireUninterruptiblyWorker worker = new AcquireUninterruptiblyWorker();
        List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        Thread thread = threads.get(1);
        thread.interrupt();
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        int count = worker.getCount();
        System.out.println(count);
        assertEquals(expected, count);
    }
}
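Final Thoughts
A semaphore is typically used as follows. A thread that wants to access a shared resource tries to acquire a permit. If the semaphore's count is greater than zero, the thread gets the permit and the count is decremented; otherwise the thread blocks until a permit becomes available. When the thread no longer needs the resource, it releases the permit and the count is incremented, and if another thread is waiting for a permit, that thread can acquire it at that point. Used this way, a semaphore acts as a rate limiter: it keeps the system from handling too many thread tasks in a short period, which could otherwise congest and stall the service.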
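As a rough illustration of the limiter idea, the sketch below wraps a semaphore in a small helper that rejects work instead of blocking when no permit is free. This is one possible design, not something the tests above implement: the class name ConcurrencyLimiter and the methods callOrReject and available are hypothetical, and the non-blocking tryAcquire() is swapped in for the blocking acquire() used earlier.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper; the class and method names are assumptions.
class ConcurrencyLimiter {
    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task if a permit is free; otherwise returns the fallback at once.
    <T> T callOrReject(Supplier<T> task, T fallback) {
        if (!permits.tryAcquire()) {
            return fallback;          // limit reached: reject instead of blocking
        }
        try {
            return task.get();
        } finally {
            permits.release();        // always hand the permit back
        }
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ConcurrencyLimiter limiter = new ConcurrencyLimiter(1);
        // The outer call holds the only permit, so the nested call is rejected.
        String result = limiter.callOrReject(
                () -> limiter.callOrReject(() -> "inner ran", "rejected"),
                "outer rejected");
        System.out.println(result);   // prints rejected
    }
}
```

Whether to reject, block with acquire(), or wait for a bounded time with tryAcquire(timeout, unit) depends on how the service should behave under load.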