一、概述
1、作用?
限制并发量,用来控制同时访问特定资源的线程数量。
2、使用场景?
适用于限制使用共享资源线程数量的场景。
3、常用类方法?
- availablePermits():返回此信号量中当前可用的许可证数。
- getQueueLength():返回正在等待获取许可证的线程数。
- hashQueuedThreads():是否有线程正在等待获取许可证。
4、案例
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(5);//定义5个许可证,也就是说服务器只允许5个人在里面玩
for(int i = 1 ; i <= 20 ; i++) {
final int index = i;
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取一个许可证
play();
semaphore.release();//执行完成后释放这个许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
public static void play(){
try {
System.out.println(new Date() + " " + Thread.currentThread().getName() + ":获得服务器进入资格");
Thread.sleep(2000);
System.out.println(new Date() + " " + Thread.currentThread().getName() + ":退出服务器");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
二、原理
Semaphore用于做并发控制,通过设置初始permit(即AQS的state变量)许可量,每获取一个permit,就减1,直到为0时开始限制并发;线程在AQS队列中阻塞等待。
- 使用AQS做为模板类,内部集成一个AQS的子类Sync,采用state变量表示permit信号许可量;
- 使用CAS用来控制多线程并发操作;
- 使用AQS提供的共享锁机制实现线程的排队,所以其本质上是一个共享锁;
- sync也分为:公平的FairSync和非公平的NonfairSync,两者的区别主要体现在抢锁上,公平的要看是不是有人在排队,非公平的直接就去CAS获取锁。
线程通过acquire()操作获取permit:
- 当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。
线程通过release()来释放它所持有的信号量许可。
acquire()和release()操作均由Sync类完成。
三、源码解析
1、acquire()获取锁流程
acquire()
方法中调用AQS的实现方法acquireSharedInterruptibly()
,我们可以看到模板方法acquireSharedInterruptibly()
,会响应线程中断式的获取锁。
public void acquire() throws InterruptedException {
// 调用Sync的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
// AQS类的模板方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 响应线程的中断
if (Thread.interrupted())
throw new InterruptedException();
// 调用子类tryAcquireShared()方法的实现,让子类实现自己获取信号量的机制。
// tryAcquireShared()方法返回-1表示获取锁失败,阻塞当前线程节点
if (tryAcquireShared(arg) < 0)
// 线程获取锁失败,加入到AQS的同步队列中阻塞等待。
doAcquireSharedInterruptibly(arg);
}
公平与非公平主要体现在抢锁上,公平的要看是不是有人在排队,非公平的直接就去CAS获取锁。
// Sync类中
// 非公平锁
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// Sync中的非公平锁获取逻辑
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 给公平锁直接CAS抢即可,直到可用的资源数小于0
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
// 公平锁
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果有线程在AQS队列中排队,则返回-1,由AQS完成阻塞操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
// 此时有多余的信号量时,进行CAS操作;如果失败,返回剩下的资源数
// 如果CAS成功,则返回的资源数就是当前值:为0 或者 大于0
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
2、release()释放锁流程
release()
方法中调用AQS的实现方法releaseShared()
,我们可以看到模板方法releaseShared()
,由子类完成释放后,将会调用doReleaseShared()
方法唤醒后面等待的线程。
public void release()
sync.releaseShared(1);
}
// 调用AQS类的模板方法
public final boolean releaseShared(int arg) {
// 子类自己实现自己的释放锁逻辑
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync类中
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 纯CAS操作
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
3、减少许可量
这里表示permit许可的个数时可以动态调整的。
// 减少几个许可量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
// 整型溢出
if (next > current)
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
四、总结
1)Semaphore,也叫信号量,通常用于控制多线程场景下对共享资源的访问,即限制并发量;
2)Semaphore的内部实现是基于AQS的共享锁来实现的;
3)Semaphore初始化的时候需要指定许可的次数,许可的次数是存储在AQS的state变量中;
4)可以动态的调整permit许可量。