20、JUC源码分析-队列-DelayQueue

画个JUC阻塞队列的类关系图,之前都没在意,画一下感觉会清楚很多

 

DelayQueue是无界的阻塞队列,其特点是实现队列元素的延迟出队,通俗点说就是队列元素可以设置延迟时间,时间不到,就待在队列中,很有意思的东西,感觉跟redis设置过期时间一样。队列元素不容许添加null元素。DelayQueue可以用来实现调度的定时任务或者缓存的过期。

添加的队列元素必须实现Delayed接口:

//实现Delayed接口的类也必须实现Comparable接口
public interface Delayed extends Comparable<Delayed> {

    /**
     * 获取剩余延迟时间,注意这里的传入的时间单位,getDelay返回的时间要做转换
     */
    long getDelay(TimeUnit unit);
}

实现类必须同时实现Delayed和Comparable接口。

看下内部结构和构造:

private transient final ReentrantLock lock = new ReentrantLock();
//内部使用PriorityQueue存储数据
private final PriorityQueue<E> q = new PriorityQueue<E>();

/**
 * Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
private final Condition available = lock.newCondition();

/**
 * 默认空构造
 */
public DelayQueue() {}

使用PriorityQueue存储元素,leader的用法有点ReetrantLock的独占锁的意思。

添加元素的方法:

public boolean add(E e) {
    return offer(e);
}

/**
 * 添加元素
 */
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) { //添加元素后peek还是e,重置leader,通知条件队列
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public void put(E e) {
    offer(e);
}

/**
 * 超时时间没有,无界,肯定成功
 */
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

最后都是调动offer,带超时的offer,超时时间不起作用,因为是无界的,不会产生阻塞,所以超时没有意思。

看下获取poll\take\peek:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //队列为空或者延迟时间未过期
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

/**
 * take元素,元素未过期需要阻塞
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await(); //队列空,加入条件队列
            else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS); //获取剩余延迟时间
                if (delay <= 0) //小于0,那就poll元素
                    return q.poll();
                else if (leader != null) //有延迟,检查leader,不为空说明有其他线程在等待,那就加入条件队列
                    available.await();
                else { 
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; //设置当前为leader等待
                    try {
                        available.awaitNanos(delay); //条件队列等待指定时间
                    } finally {
                        if (leader == thisThread) //检查是否被其他线程改变,没有就重置,再次循环
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null) //leader为空并且队列不空,说明没有其他线程在等待,那就通知条件队列
            available.signal();
        lock.unlock();
    }
}

/**
 * 响应超时的poll
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

/**
 * 获取queue[0],peek是不移除的
 */
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.peek();
    } finally {
        lock.unlock();
    }
}

不带阻塞的poll,直接peek元素,判断非空并且延迟时间未到,那就return null,到了那就poll。take()方法考虑的就比较多:1.队列为空,阻塞;2.不空就看延迟时间,到了就poll,没到就看有没有其他线程已经占用等待了,有就阻塞,没有就自己占用leader,然后wait。

DelayedQueue其他方法不看了,理解offer和take就OK了。