Java Semaphore Class - Java 147

Java Semaphore Class – Java 147

Java Semaphore Class

想要訪問共享資源的執行緒,通常可以使用信號量,嘗試取得許可,如果信號量的計數大於零,則執行緒取得許可,信號量的計數遞減,否則,執行緒將被阻塞,直到取得許可,當執行緒不再需要訪問共享資源時,會釋放許可,信號量的計數增加,如果有另一個執行緒在等待許可,那麼該執行緒將在此時取得許可, Semaphore Example Java 提供限流器功能,避免系統短時間處理大量執行緒的任務,造成壅塞卡住服務,本篇增加了範例,並透過單元測試來驗證產出結果。

檔案目錄

./
   +- src
       +- test
       |   +- org
       |       +- ruoxue
       |           +- java_147
       |               +- synchronization
       |                   +- semaphore
       |                       +- SemaphoreClassTest.java   

單元測試

Semaphore Java Example 提供取得許可、釋放許可等操作。

lock

Semaphore Java Example 建立 3 個執行緒,設定 1 個許可,對計數器加 1 ,等待 1 秒後結束任務,等同於 ReentrantLock 加鎖解鎖功能。

	protected class Worker implements Runnable {

		private Semaphore semaphore = new Semaphore(1);
		private int count;

		public Worker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				semaphore.acquire();
				try {
					System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
					TimeUnit.SECONDS.sleep(1);
					count++;
					System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count));
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				} finally {
					semaphore.release();
					System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count;
		}
	}

	@Test
	public void lock() {
		int expected = 3;
		int taskSize = 3;
		Worker worker = new Worker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
T[13] waiting
T[12] waiting
T[11] waiting
T[13] acquire
T[13] count: 1
T[13] release
T[12] acquire
T[12] count: 2
T[12] release
T[11] acquire
T[11] count: 3
T[11] release
3

tryAcquire

Semaphore Java Example 建立 3 個執行緒,取得等候許可對計數器加 1 ,等待 1 秒後結束任務, Semaphore Example Java 等候逾時,無執行任務。

	protected class TryAcquireWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(2);
		private AtomicInteger count = new AtomicInteger();

		public TryAcquireWorker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				boolean isLockAcquired = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
				System.out.println(
						String.format("T[%d] isLockAcquired: %b", Thread.currentThread().getId(), isLockAcquired));
				if (isLockAcquired) {
					try {
						System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
						TimeUnit.SECONDS.sleep(1);
						count.incrementAndGet();
						System.out
								.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						semaphore.release();
						System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
					}
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void tryAcquire() {
		int expected = 2;
		int taskSize = 3;
		TryAcquireWorker worker = new TryAcquireWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
T[11] waiting
T[12] waiting
T[13] waiting
T[11] isLockAcquired: true
T[13] isLockAcquired: true
T[11] acquire
T[13] acquire
T[12] isLockAcquired: false
T[11] count: 2
T[13] count: 2
T[11] release
T[13] release
2

availablePermits

Semaphore in Java 建立 3 個執行緒,每個執行緒取得 2 個許可,設定 5 個許可,對計數器加 1 , Semaphore Example Java  等待 1 秒後結束任務。

	protected class AvailablePermitsWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(5);
		private AtomicInteger count = new AtomicInteger();

		public AvailablePermitsWorker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				semaphore.acquire(2);
				System.out.println(String.format("T[%d] availablePermits: %d", Thread.currentThread().getId(),
						semaphore.availablePermits()));
				try {
					System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
					TimeUnit.SECONDS.sleep(1);
					count.incrementAndGet();
					System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				} finally {
					semaphore.release();
					System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void availablePermits() {
		int expected = 3;
		int taskSize = 3;
		AvailablePermitsWorker worker = new AvailablePermitsWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
T[11] waiting
T[13] waiting
T[11] availablePermits: 3
T[12] waiting
T[13] availablePermits: 1
T[11] acquire
T[13] acquire
T[13] count: 2
T[11] count: 2
T[13] release
T[11] release
T[12] availablePermits: 1
T[12] acquire
T[12] count: 3
T[12] release
3

SemaphoreClassTest.java

Semaphore in Java 新增單元測試,驗證 Semaphore Java Examples 是否符合預期。

package org.ruoxue.java_147.synchronization.semaphore;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;

public class SemaphoreClassTest {

	protected class Worker implements Runnable {

		private Semaphore semaphore = new Semaphore(1);
		private int count;

		public Worker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				semaphore.acquire();
				try {
					System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
					TimeUnit.SECONDS.sleep(1);
					count++;
					System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count));
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				} finally {
					semaphore.release();
					System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count;
		}
	}

	@Test
	public void lock() {
		int expected = 3;
		int taskSize = 3;
		Worker worker = new Worker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}

	protected class TryAcquireWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(2);
		private AtomicInteger count = new AtomicInteger();

		public TryAcquireWorker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				boolean isLockAcquired = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
				System.out.println(
						String.format("T[%d] isLockAcquired: %b", Thread.currentThread().getId(), isLockAcquired));
				if (isLockAcquired) {
					try {
						System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
						TimeUnit.SECONDS.sleep(1);
						count.incrementAndGet();
						System.out
								.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
					} catch (InterruptedException ex) {
						ex.printStackTrace();
					} finally {
						semaphore.release();
						System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
					}
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void tryAcquire() {
		int expected = 2;
		int taskSize = 3;
		TryAcquireWorker worker = new TryAcquireWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}

	protected class AvailablePermitsWorker implements Runnable {

		private Semaphore semaphore = new Semaphore(5);
		private AtomicInteger count = new AtomicInteger();

		public AvailablePermitsWorker() {
		}

		@Override
		public void run() {
			try {
				System.out.println(String.format("T[%d] waiting", Thread.currentThread().getId()));
				semaphore.acquire(2);
				System.out.println(String.format("T[%d] availablePermits: %d", Thread.currentThread().getId(),
						semaphore.availablePermits()));
				try {
					System.out.println(String.format("T[%d] acquire", Thread.currentThread().getId()));
					TimeUnit.SECONDS.sleep(1);
					count.incrementAndGet();
					System.out.println(String.format("T[%d] count: %d", Thread.currentThread().getId(), count.get()));
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				} finally {
					semaphore.release();
					System.out.println(String.format("T[%d] release", Thread.currentThread().getId()));
				}
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		public int getCount() {
			return count.get();
		}
	}

	@Test
	public void availablePermits() {
		int expected = 3;
		int taskSize = 3;
		AvailablePermitsWorker worker = new AvailablePermitsWorker();
		List<Thread> threads = Stream.generate(() -> new Thread(worker)).limit(taskSize).collect(Collectors.toList());
		threads.forEach(e -> e.start());

		threads.forEach(e -> {
			try {
				e.join();
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		});
		int count = worker.getCount();
		System.out.println(count);
		assertEquals(expected, count);
	}
}

心得分享

Semaphore Java Examples 信號量通過使用計數器來控制對共享資源的訪問,如果計數器大於零,則允許訪問,如果為零,則拒絕訪問,計數器計數的是允許訪問共享資源的許可,因此,要訪問資源,執行緒必須取得信號量的許可, Semaphore in Java 提供了幾種 Semaphore 常見方法的操作範例。

發佈留言