📜  Java中的 DelayQueue offer() 方法及示例(1)

📅  最后修改于: 2023-12-03 15:16:22.715000             🧑  作者: Mango

Java中的 DelayQueue offer() 方法及示例

DelayQueue简介

DelayQueue是Java中的一个带有延迟时间的队列,延迟时间指的是在指定的时间后才能够获取队列中的元素。可以用来实现定时任务、消息发送等。

该队列的元素必须实现Delayed接口,该接口只有一个方法:getDelay(TimeUnit unit),表示队列到期的剩余时间。

DelayQueue内部采用PriorityQueue进行存储,每个元素的延迟时间会影响其在队列中的排序。

offer()方法

offer()方法为DelayQueue提供了添加元素的功能。它的源码实现如下:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (e == null)
            throw new NullPointerException();
        if (!shutdown) {
            final long delay = e.getDelay(NANOSECONDS);
            if (delay < 0)
                delayQueue.offer(e);
            else {
                final long now = System.nanoTime();
                if (delay == Long.MAX_VALUE)
                    delay = now;
                else
                    delay += now;
                if (delay < 0) // overflow
                    delay = Long.MAX_VALUE;
                q.offer(new DelayedTask(e, delay));
                if (q.peek() == e)
                    available.signalAll();
            }
            return true;
        }
        throw new RejectedExecutionException("Shutdown");
    } finally {
        lock.unlock();
    }
}

可以看出,offer()方法是通过ReentrantLock加锁实现的,该锁保证了对队列操作的原子性。

当添加的元素是null时,会抛出NullPointerException异常;当队列已经被shutdown时,会抛出RejectedExecutionException异常。

当元素的延迟时间小于0时,将直接添加到队列delayQueue中,否则会将元素封装为一个DelayedTask并加入到PriorityQueue中。

示例
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {
    static class Message implements Delayed {
        private String content;
        private long delayTime;

        public Message(String content, long delayTime) {
            this.content = content;
            this.delayTime = delayTime;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return content.compareTo(((Message) o).content);
        }

        public String getContent() {
            return content;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Message> queue = new DelayQueue<>();
        queue.offer(new Message("message 1", System.currentTimeMillis() + 10000));
        queue.offer(new Message("message 2", System.currentTimeMillis() + 5000));
        queue.offer(new Message("message 3", System.currentTimeMillis() + 15000));
        queue.offer(new Message("message 4", System.currentTimeMillis() + 7000));
        queue.offer(new Message("message 5", System.currentTimeMillis() + 2000));

        while (!queue.isEmpty()) {
            Message message = queue.take();
            System.out.println("got message: " + message.getContent());
        }
    }
}

该示例模拟了消息发送,将需要发送的消息添加进DelayQueue中,每个消息有不同的延迟时间。

使用queue.take()获取队列中的元素时,若元素的延迟时间还未到,则会被阻塞。

输出结果:

got message: message 5
got message: message 2
got message: message 4
got message: message 1
got message: message 3

可以看到,消息被按照延迟时间依次取出。