12、Semaphore原理/源码解析

一、概述

1、作用?

限制并发量,用来控制同时访问特定资源的线程数量。

2、使用场景?

适用于限制使用共享资源线程数量的场景。

3、常用类方法?

  • availablePermits():返回此信号量中当前可用的许可证数。
  • getQueueLength():返回正在等待获取许可证的线程数。
  • hashQueuedThreads():是否有线程正在等待获取许可证。

4、案例

public class SemaphoreTest {

    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(5);//定义5个许可证,也就是说服务器只允许5个人在里面玩
        for(int i = 1 ; i <= 20 ; i++) {

            final int index = i;
            threadPool.execute(new Runnable() {

                @Override
                public void run() {

                    try {

                        semaphore.acquire();//获取一个许可证
                        play();
                        semaphore.release();//执行完成后释放这个许可证
                    } catch (InterruptedException e) {

                        e.printStackTrace();
                    }
​
                }
            });
        }
        threadPool.shutdown();
    }
​
    public static void play(){

​
        try {

            System.out.println(new Date() + " " + Thread.currentThread().getName() + ":获得服务器进入资格");
            Thread.sleep(2000);
            System.out.println(new Date() + " " + Thread.currentThread().getName() + ":退出服务器");
        } catch (InterruptedException e) {

            e.printStackTrace();
        }
    }
}

二、原理

Semaphore用于做并发控制,通过设置初始permit(即AQS的state变量)许可量,每获取一个permit,就减1,直到为0时开始限制并发;线程在AQS队列中阻塞等待。

  • 使用AQS做为模板类,内部集成一个AQS的子类Sync,采用state变量表示permit信号许可量;
  • 使用CAS用来控制多线程并发操作;
  • 使用AQS提供的共享锁机制实现线程的排队,所以其本质上是一个共享锁;
  • sync也分为:公平的FairSync和非公平的NonfairSync,两者的区别主要体现在抢锁上,公平的要看是不是有人在排队,非公平的直接就去CAS获取锁。

线程通过acquire()操作获取permit:

  • 当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。

线程通过release()来释放它所持有的信号量许可。

acquire()和release()操作均由Sync类完成。

三、源码解析

1、acquire()获取锁流程

acquire()方法中调用AQS的实现方法acquireSharedInterruptibly(),我们可以看到模板方法acquireSharedInterruptibly(),会响应线程中断式的获取锁。

public void acquire() throws InterruptedException {

    // 调用Sync的acquireSharedInterruptibly()方法
    sync.acquireSharedInterruptibly(1);
}

// AQS类的模板方法
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {

    // 响应线程的中断
    if (Thread.interrupted())
        throw new InterruptedException();

    // 调用子类tryAcquireShared()方法的实现,让子类实现自己获取信号量的机制。
    // tryAcquireShared()方法返回-1表示获取锁失败,阻塞当前线程节点
    if (tryAcquireShared(arg) < 0)
        // 线程获取锁失败,加入到AQS的同步队列中阻塞等待。
        doAcquireSharedInterruptibly(arg);
}

公平与非公平主要体现在抢锁上,公平的要看是不是有人在排队,非公平的直接就去CAS获取锁。

// Sync类中
// 非公平锁
static final class NonfairSync extends Sync {

    protected int tryAcquireShared(int acquires) {

        return nonfairTryAcquireShared(acquires);
    }
}

// Sync中的非公平锁获取逻辑
abstract static class Sync extends AbstractQueuedSynchronizer {

    final int nonfairTryAcquireShared(int acquires) {

        for (;;) {

            // 给公平锁直接CAS抢即可,直到可用的资源数小于0
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

// 公平锁
static final class FairSync extends Sync {

    protected int tryAcquireShared(int acquires) {

        for (;;) {

            // 如果有线程在AQS队列中排队,则返回-1,由AQS完成阻塞操作
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            // 此时有多余的信号量时,进行CAS操作;如果失败,返回剩下的资源数
            // 如果CAS成功,则返回的资源数就是当前值:为0 或者 大于0
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

2、release()释放锁流程

release()方法中调用AQS的实现方法releaseShared(),我们可以看到模板方法releaseShared(),由子类完成释放后,将会调用doReleaseShared()方法唤醒后面等待的线程。

public void release() 
    sync.releaseShared(1);
}

// 调用AQS类的模板方法
public final boolean releaseShared(int arg) {

    // 子类自己实现自己的释放锁逻辑
    if (tryReleaseShared(arg)) {

        doReleaseShared();
        return true;
    }
    return false;
}

// Sync类中
protected final boolean tryReleaseShared(int releases) {

    for (;;) {

        // 纯CAS操作
        int current = getState();
        int next = current + releases;
        if (next < current)
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

3、减少许可量

这里表示permit许可的个数时可以动态调整的。

// 减少几个许可量
final void reducePermits(int reductions) {

    for (;;) {

        int current = getState();
        int next = current - reductions;
        // 整型溢出
        if (next > current)
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

四、总结

1)Semaphore,也叫信号量,通常用于控制多线程场景下对共享资源的访问,即限制并发量;
2)Semaphore的内部实现是基于AQS的共享锁来实现的;
3)Semaphore初始化的时候需要指定许可的次数,许可的次数是存储在AQS的state变量中;
4)可以动态的调整permit许可量。