Java Semaphore Class
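A thread that wants to access a shared resource can use a semaphore and try to acquire a permit. If the semaphore's count is greater than zero, the thread obtains a permit and the count is decremented; otherwise the thread blocks until a permit becomes available. When the thread no longer needs the shared resource, it releases the permit and the count is incremented; if another thread is waiting for a permit, that thread obtains it at this point. A Semaphore can also act as a throttle, preventing the system from taking on a large number of thread tasks in a short time and clogging the service. This article adds examples and verifies the results with unit tests.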
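To make the counting behaviour concrete, here is a minimal single-threaded sketch. The class name PermitCountDemo and the method trace() are illustrative and are not part of the article's test class; the sketch relies only on the standard Semaphore methods tryAcquire(), release() and availablePermits().

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of SemaphoreClassTest.
class PermitCountDemo {

    // Records the permit count observed after each step.
    static int[] trace() {
        Semaphore semaphore = new Semaphore(2);   // start with 2 permits
        int[] seen = new int[4];
        seen[0] = semaphore.availablePermits();   // 2
        semaphore.tryAcquire();                   // takes one permit without blocking
        seen[1] = semaphore.availablePermits();   // 1
        semaphore.tryAcquire();                   // takes the last permit
        seen[2] = semaphore.availablePermits();   // 0, a blocking acquire() would now wait
        semaphore.release();                      // hands one permit back
        seen[3] = semaphore.availablePermits();   // 1
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(trace())); // prints [2, 1, 0, 1]
    }
}
```

The sketch uses tryAcquire() rather than acquire() only so that it never blocks and needs no exception handling; the counting is the same.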
File Structure
./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- semaphore
       |                       +- SemaphoreClassTest.java
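Unit Tests
The Semaphore examples cover operations such as acquiring and releasing permits.
lock
This test creates 3 threads that share a semaphore with 1 permit. Each thread acquires the permit, sleeps for 1 second, increments the counter, and releases the permit. This works much like locking and unlocking a ReentrantLock, except that a semaphore is not reentrant and has no owning thread.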
protected class Worker implements Runnable {
    // A single permit makes the semaphore act as a mutual-exclusion lock.
    private Semaphore semaphore = new Semaphore(1);
    // A plain int is safe here because it is only touched while holding the permit.
    private int count;

    public Worker() {
    }

    @Override
    public void run() {
        try {
            System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
            // Blocks until the permit is available.
            semaphore.acquire();
            try {
                System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                TimeUnit.SECONDS.sleep(1);
                count++;
                System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                // Always return the permit, even if the sleep was interrupted.
                semaphore.release();
                System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public int getCount() {
        return count;
    }
}

@Test
public void lock() {
    int expected = 3;
    int taskSize = 3;
    Worker worker = new Worker();
    // All threads share the same Worker, and therefore the same semaphore.
    List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    // Wait for every thread to finish before checking the counter.
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
    int count = worker.getCount();
    System.out.println(count);
    assertEquals(expected, count);
}
T[13] waiting
T[12] waiting
T[11] waiting
T[13] acquire
T[13] count: 1
T[13] release
T[12] acquire
T[12] count: 2
T[12] release
T[11] acquire
T[11] count: 3
T[11] release
3
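The comparison with ReentrantLock holds for simple lock and unlock usage, but the two differ once the same thread tries to acquire twice. The following sketch illustrates that difference under the assumption of a single thread; the class and method names are made up for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of SemaphoreClassTest.
class BinarySemaphoreVsLockDemo {

    // A thread holding the only permit cannot take it a second time.
    static boolean semaphoreAcquiredTwice() {
        Semaphore semaphore = new Semaphore(1);
        boolean first = semaphore.tryAcquire();   // takes the single permit
        boolean second = semaphore.tryAcquire();  // no permit left, returns false
        if (second) {
            semaphore.release();
        }
        if (first) {
            semaphore.release();
        }
        return first && second;
    }

    // A ReentrantLock lets the owning thread lock again.
    static boolean lockAcquiredTwice() {
        ReentrantLock lock = new ReentrantLock();
        boolean first = lock.tryLock();           // hold count becomes 1
        boolean second = lock.tryLock();          // same thread, hold count becomes 2
        if (second) {
            lock.unlock();
        }
        if (first) {
            lock.unlock();
        }
        return first && second;
    }

    public static void main(String[] args) {
        System.out.println(semaphoreAcquiredTwice()); // prints false
        System.out.println(lockAcquiredTwice());      // prints true
    }
}
```

In practice this means that code which re-enters a critical section while already holding the permit would block forever on a one-permit semaphore, whereas a ReentrantLock would let it through.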
tryAcquire
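This test creates 3 threads that share a semaphore with 2 permits. Each thread waits up to 100 milliseconds for a permit; a thread that obtains one sleeps for 1 second and increments the counter, while a thread whose wait times out skips the task.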
protected class TryAcquireWorker implements Runnable {
    // Two permits: at most two threads can run the task at the same time.
    private Semaphore semaphore = new Semaphore(2);
    private AtomicInteger count = new AtomicInteger();

    public TryAcquireWorker() {
    }

    @Override
    public void run() {
        try {
            System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
            // Waits at most 100 ms; returns false if no permit became available in time.
            boolean isLockAcquired = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
            System.out.println(
                    String.format("T[%d] isLockAcquired: %b", Thread.currentThread().getId(), isLockAcquired));
            if (isLockAcquired) {
                try {
                    System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                    TimeUnit.SECONDS.sleep(1);
                    count.incrementAndGet();
                    System.out
                            .println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                } finally {
                    // Release only when a permit was actually obtained.
                    semaphore.release();
                    System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public int getCount() {
        return count.get();
    }
}

@Test
public void tryAcquire() {
    // Only two of the three threads get a permit within the timeout.
    int expected = 2;
    int taskSize = 3;
    TryAcquireWorker worker = new TryAcquireWorker();
    List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
    int count = worker.getCount();
    System.out.println(count);
    assertEquals(expected, count);
}
T[11] waiting
T[12] waiting
T[13] waiting
T[11] isLockAcquired: true
T[13] isLockAcquired: true
T[11] acquire
T[13] acquire
T[12] isLockAcquired: false
T[11] count: 2
T[13] count: 2
T[11] release
T[13] release
2
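The timed tryAcquire above waits up to 100 milliseconds. The no-argument tryAcquire() does not wait at all, which fits the throttling use case mentioned in the introduction. Below is a minimal sketch of such a limiter; ConcurrencyLimiter, tryRun and available are hypothetical names chosen for this illustration, and the demo is single-threaded so that its result is deterministic.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper, not part of SemaphoreClassTest.
class ConcurrencyLimiter {
    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task if a permit is free; otherwise rejects it immediately.
    boolean tryRun(Runnable task) {
        if (!permits.tryAcquire()) {
            return false;              // saturated: do not queue, do not block
        }
        try {
            task.run();
            return true;
        } finally {
            permits.release();         // return the permit even if the task throws
        }
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ConcurrencyLimiter limiter = new ConcurrencyLimiter(1);
        boolean[] nested = new boolean[1];
        // The inner call arrives while the only permit is taken, so it is rejected.
        boolean outer = limiter.tryRun(() -> nested[0] = limiter.tryRun(() -> { }));
        System.out.println(outer);               // prints true
        System.out.println(nested[0]);           // prints false
        System.out.println(limiter.available()); // prints 1
    }
}
```

Rejecting immediately keeps callers from piling up behind a slow resource; whether to reject, wait with a timeout, or block is a design choice that depends on the service.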
availablePermits
This test creates 3 threads that share a semaphore with 5 permits. Each thread acquires 2 permits, prints the number of permits still available, sleeps for 1 second, and increments the counter.
protected class AvailablePermitsWorker implements Runnable {
    // Five permits, two per task: only two tasks can hold permits at once.
    private Semaphore semaphore = new Semaphore(5);
    private AtomicInteger count = new AtomicInteger();

    public AvailablePermitsWorker() {
    }

    @Override
    public void run() {
        try {
            System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
            // Blocks until two permits can be taken together.
            semaphore.acquire(2);
            System.out.println(String.format("T[%d] availablePermits: %d", Thread.currentThread().getId(),
                    semaphore.availablePermits()));
            try {
                System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                TimeUnit.SECONDS.sleep(1);
                count.incrementAndGet();
                System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                // Note: this returns only one of the two acquired permits;
                // release(2) would return both.
                semaphore.release();
                System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public int getCount() {
        return count.get();
    }
}

@Test
public void availablePermits() {
    int expected = 3;
    int taskSize = 3;
    AvailablePermitsWorker worker = new AvailablePermitsWorker();
    List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
    threads.forEach(e -> e.start());
    threads.forEach(e -> {
        try {
            e.join();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    });
    int count = worker.getCount();
    System.out.println(count);
    assertEquals(expected, count);
}
T[11] waiting
T[13] waiting
T[11] availablePermits: 3
T[12] waiting
T[13] availablePermits: 1
T[11] acquire
T[13] acquire
T[13] count: 2
T[11] count: 2
T[13] release
T[11] release
T[12] availablePermits: 1
T[12] acquire
T[12] count: 3
T[12] release
3
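The worker above acquires two permits but calls release() once, so each task hands back only one of them. The sketch below shows the effect on the permit count and the balanced alternative. The class and method names are hypothetical, and acquireUninterruptibly(2) stands in for acquire(2) only to keep the example free of checked exceptions.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of SemaphoreClassTest.
class PermitPairingDemo {

    // Acquire 2, release 1: one permit is lost per round.
    static int afterUnbalancedRound() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquireUninterruptibly(2);  // 3 permits left
        semaphore.release();                  // back to 4, not 5
        return semaphore.availablePermits();
    }

    // Acquire 2, release 2: the count returns to its starting value.
    static int afterBalancedRound() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquireUninterruptibly(2);  // 3 permits left
        semaphore.release(2);                 // back to 5
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(afterUnbalancedRound()); // prints 4
        System.out.println(afterBalancedRound());   // prints 5
    }
}
```

For a short test the lost permits go unnoticed, but in a long-running service an unbalanced release gradually shrinks the number of tasks that can run concurrently.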
SemaphoreClassTest.java
The complete unit test class below verifies that the Semaphore examples behave as expected.
package org.ruoxue.java_147.synchronization.semaphore;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;

public class SemaphoreClassTest {

    protected class Worker implements Runnable {
        private Semaphore semaphore = new Semaphore(1);
        private int count;

        public Worker() {
        }

        @Override
        public void run() {
            try {
                System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                semaphore.acquire();
                try {
                    System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                    TimeUnit.SECONDS.sleep(1);
                    count++;
                    System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count));
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                } finally {
                    semaphore.release();
                    System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }

        public int getCount() {
            return count;
        }
    }

    @Test
    public void lock() {
        int expected = 3;
        int taskSize = 3;
        Worker worker = new Worker();
        List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        int count = worker.getCount();
        System.out.println(count);
        assertEquals(expected, count);
    }

    protected class TryAcquireWorker implements Runnable {
        private Semaphore semaphore = new Semaphore(2);
        private AtomicInteger count = new AtomicInteger();

        public TryAcquireWorker() {
        }

        @Override
        public void run() {
            try {
                System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                boolean isLockAcquired = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
                System.out.println(
                        String.format("T[%d] isLockAcquired: %b", Thread.currentThread().getId(), isLockAcquired));
                if (isLockAcquired) {
                    try {
                        System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                        TimeUnit.SECONDS.sleep(1);
                        count.incrementAndGet();
                        System.out
                                .println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    } finally {
                        semaphore.release();
                        System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
                    }
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }

        public int getCount() {
            return count.get();
        }
    }

    @Test
    public void tryAcquire() {
        int expected = 2;
        int taskSize = 3;
        TryAcquireWorker worker = new TryAcquireWorker();
        List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        int count = worker.getCount();
        System.out.println(count);
        assertEquals(expected, count);
    }

    protected class AvailablePermitsWorker implements Runnable {
        private Semaphore semaphore = new Semaphore(5);
        private AtomicInteger count = new AtomicInteger();

        public AvailablePermitsWorker() {
        }

        @Override
        public void run() {
            try {
                System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
                semaphore.acquire(2);
                System.out.println(String.format("T[%d] availablePermits: %d", Thread.currentThread().getId(),
                        semaphore.availablePermits()));
                try {
                    System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
                    TimeUnit.SECONDS.sleep(1);
                    count.incrementAndGet();
                    System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                } finally {
                    // Note: this returns only one of the two acquired permits;
                    // release(2) would return both.
                    semaphore.release();
                    System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }

        public int getCount() {
            return count.get();
        }
    }

    @Test
    public void availablePermits() {
        int expected = 3;
        int taskSize = 3;
        AvailablePermitsWorker worker = new AvailablePermitsWorker();
        List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
        threads.forEach(e -> e.start());
        threads.forEach(e -> {
            try {
                e.join();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        int count = worker.getCount();
        System.out.println(count);
        assertEquals(expected, count);
    }
}
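Takeaways
A semaphore controls access to a shared resource with a counter. If the counter is greater than zero, access is allowed; if it is zero, the caller has to wait for a permit (or, with tryAcquire, is turned away). The counter tracks the permits that allow access to the shared resource, so a thread must obtain a permit from the semaphore before using the resource. This article walked through examples of several commonly used Semaphore methods.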