Semaphore in Java with Examples - Java 147

Semaphore in Java with Examples – Java 147

Semaphore in Java with Examples

信號量通過使用計數器來控制對共享資源的訪問,也就是所謂的限流器,如果計數器大於零,則允許訪問,如果為零,則拒絕訪問,計數器計數的是允許訪問共享資源的許可,因此,要訪問資源,執行緒必須取得信號量的許可, Semaphore Java 本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- semaphore
       |                       +- SemaphoreClassTest.java   

單元測試

Java Semaphore 提供取得許可、釋放許可等操作。

acquire

Java Semaphore 建立 5 個執行緒,設定 2 個許可,對計數器加 1 , Java Semaphore Examples 等待 1 秒後結束任務。

	protected class AcquireWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(2);
		private AtomicInteger count = new AtomicInteger();

		public AcquireWorker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				semaphore.acquire();
				try {
					System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
					TimeUnit.SECONDS.sleep(1);
					count.incrementAndGet();
					System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				} finally {
					semaphore.release();
					System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void acquire() {
		int expected = 5;
		int taskSize = 5;
		AcquireWorker worker = new AcquireWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
T[11] waiting
T[12] waiting
T[13] waiting
T[12] acquire
T[14] waiting
T[15] waiting
T[11] acquire
T[12] count: 1
T[11] count: 2
T[12] release
T[13] acquire
T[11] release
T[14] acquire
T[14] count: 4
T[14] release
T[15] acquire
T[13] count: 4
T[13] release
T[15] count: 5
T[15] release
5

acquireInterruptibly

Java Semaphore 建立 3 個執行緒,設定 2 個許可,對計數器加 1 , Java Semaphore Examples 等待 1 秒後結束任務,中斷第 2 條執行緒,會拋出例外。

	@Test
	public void acquireInterruptibly() {
		int expected = 2;
		int taskSize = 3;
		AcquireWorker worker = new AcquireWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		Thread thread = threads.get(1);
		thread.interrupt();

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
T[12] waiting
T[13] waiting
T[11] waiting
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
	at org.ruoxue.java_147.synchronization.SemaphoreWithExamplesTest$AcquireWorker.run(SemaphoreWithExamplesTest.java:28)
	at java.lang.Thread.run(Thread.java:750)
T[13] acquire
T[11] acquire
T[13] count: 2
T[11] count: 2
T[13] release
T[11] release
2

acquireUninterruptibly

Java Semaphore 建立 3 個執行緒,設定 2 個許可,對計數器加 1 ,等待 1 秒後結束任務,中斷第 2 條執行緒,會發生不可中斷的狀況,任務會繼續執行。

	protected class AcquireUninterruptiblyWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(2);
		private AtomicInteger count = new AtomicInteger();

		public AcquireUninterruptiblyWorker() {
		}

		@Override
		public void run() {
			System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
			semaphore.acquireUninterruptibly();
			try {
				System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
				count.incrementAndGet();
				System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
			} catch (Exception ex) {
				ex.printStackTrace();
			} finally {
				semaphore.release();
				System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void acquireUninterruptibly() {
		int expected = 3;
		int taskSize = 3;
		AcquireUninterruptiblyWorker worker = new AcquireUninterruptiblyWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		Thread thread = threads.get(1);
		thread.interrupt();

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
T[11] waiting
T[12] waiting
T[13] waiting
T[12] acquire
T[11] acquire
T[12] count: 1
T[11] count: 2
T[11] release
T[12] release
T[13] acquire
T[13] count: 3
T[13] release
3

SemaphoreWithExamplesTest.java

Semaphore Example in Java 新增單元測試,驗證 Semaphore Java 是否符合預期。

package org.ruoxue.java_147.synchronization.semaphore;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;

public class SemaphoreWithExamplesTest {

	protected class AcquireWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(2);
		private AtomicInteger count = new AtomicInteger();

		public AcquireWorker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				semaphore.acquire();
				try {
					System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
					TimeUnit.SECONDS.sleep(1);
					count.incrementAndGet();
					System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				} finally {
					semaphore.release();
					System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void acquire() {
		int expected = 5;
		int taskSize = 5;
		AcquireWorker worker = new AcquireWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}

	@Test
	public void acquireInterruptibly() {
		int expected = 2;
		int taskSize = 3;
		AcquireWorker worker = new AcquireWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		Thread thread = threads.get(1);
		thread.interrupt();

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}

	protected class AcquireUninterruptiblyWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(2);
		private AtomicInteger count = new AtomicInteger();

		public AcquireUninterruptiblyWorker() {
		}

		@Override
		public void run() {
			System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
			semaphore.acquireUninterruptibly();
			try {
				System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
				count.incrementAndGet();
				System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
			} catch (Exception ex) {
				ex.printStackTrace();
			} finally {
				semaphore.release();
				System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void acquireUninterruptibly() {
		int expected = 3;
		int taskSize = 3;
		AcquireUninterruptiblyWorker worker = new AcquireUninterruptiblyWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		Thread thread = threads.get(1);
		thread.interrupt();

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
}

心得分享

Java Semaphore Examples 通常使用信號量,想要訪問共享資源的執行緒會嘗試取得許可,如果信號量的計數大於零,則執行緒取得許可,信號量的計數遞減,否則,執行緒將被阻塞,直到取得許可,當執行緒不再需要訪問共享資源時,會釋放許可,信號量的計數增加,如果有另一個執行緒在等待許可,那麼該執行緒將在此時取得許可, Semaphore Example in Java 提供限流器功能,避免系統短時間處理大量執行緒的任務,造成壅塞卡住服務。

發佈留言