DelayQueue介绍 - 高飞网
543 人阅读

DelayQueue介绍

2017-07-28 02:09:46

    本文将介绍DelayQueue类的使用方法。DelayQueue是一个空间大小不受限制、基于时间计时器的阻塞队列。其中的元素放在堆内存中。而这些元素只有等到它时间到期时才能够被取出(take)。如果多个元素同时到期,那么那个延时时间最长的元素将被率先取出。

DelayQueue特性

  1. DelayQueue是一个不限制空间大小的队列。
  2. DelayQueue队列中的元素只有在延时到期后才能被取出,如果没有到期的元素,将没有队头,poll操作将返回null。
  3. 在队列的头部,是过期最时间最长的元素。
  4. DelayQueue不接受Null值。
  5. 在DelayQueue队列中,其中保存的元素对象必须实现Delayed接口。这个接口强制你实现以下两个方法:
    1)getDelay:该方法返回延时过期前剩余的时间。该方法的重要性是因为Java程序需要根据该方法返回队列中值那些大于等于0元素。
    2)compareTo:Delayed接口继承了Comparable接口,因此还必须实现compareTo方法,去指定其中的一个元素与另外的过期元素之间如何排序。

    有许多场景都可以利用上DelayQueue,比如说传真机。之所以传真机在正在传输的情况下仍然不会丢失任何进入的请求,是因为它把进入的传真请求放置在队列中,然后立即返回给请求的客户端。另外,有一些社交网站让用户在指定的时间内可以修改评论,这时将把这些评论放到延时队列中。在时间到了以后,将会被移出队列,也就不能被编辑了。

    下面看一下DelayQueue的使用示例。

示例

Delayed接口的实现类,必须实现getDelay和compareTo接口

public class DelayObject implements Delayed {

    private String data;
    private long startTime;// 出队的时间

    public DelayObject(String data, long delay) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delay;
    }

    public String getData() {
        return data;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.startTime < ((DelayObject) o).startTime) {
            return -1;
        }
        if (this.startTime > ((DelayObject) o).startTime) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return "{" + "data='" + data + '\'' + ", startTime=" + startTime + '}';
    }
}

队列的生产者,创建DelayObject对象并放到DelayQueue中

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.DelayQueue;

public class DelayQueueProducer extends Thread{

    private DelayQueue<DelayObject> queue;

    private final Random random = new Random();

    public DelayQueueProducer(DelayQueue<DelayObject> queue) {
        this.queue = queue;
        Thread.currentThread().setName("Producer Thread");
    }

    @Override
    public void run() {
        while (true) {
            try {
                int delay = random.nextInt(10000);
                DelayObject object = new DelayObject(UUID.randomUUID().toString(), delay);

                System.out.printf("Put object = %s%n", object);
                queue.put(object);//放置延时对象
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}

在延时队列消息者类中,尝试去找到过期元素并从队列中取出,如果找不到则等待直到其过期。

import java.util.concurrent.BlockingQueue;

public class DelayQueueConsumer extends Thread {

    private BlockingQueue<DelayObject> queue;

    public DelayQueueConsumer(BlockingQueue<DelayObject> queue) {
        Thread.currentThread().setName("Consumer Thread-1");
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                DelayObject object = queue.take();// 取出延时对象
                System.out.printf("[%s] - Take object = %s%n", Thread.currentThread().getName(), object);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

最后运行一个延时队列测试类DelayQueueTest。在这个类中创建生产消费两个线程,分别让他们生产、消息延时对象。

import java.util.concurrent.DelayQueue;

public class DelayQueueTest {
    public static void main(String[] args) {
        // 创建一个公用的延时队列实现
        DelayQueue<DelayObject> queue = new DelayQueue<DelayObject>();
        // 启动延时队列的生产线程,把一些延时对象放置到队列中
        new DelayQueueProducer(queue).start();
        // 启动延时队列的消费线程,从队列中取出到期的对象
        new DelayQueueConsumer(queue).start();
    }
}

输出:

Put object = {data='85aeebd4-06be-4789-89fb-20aea85a5ece', startTime=1461683046771}
Put object = {data='6f755620-47d9-48b1-b4b5-9ace094eac75', startTime=1461683043462}
Put object = {data='1bb89fdb-07fe-477d-8fc0-edf20de5f501', startTime=1461683046998}
Put object = {data='58a576a6-023d-4a2d-b74d-68039edaf8f6', startTime=1461683043236}
Put object = {data='96e62b37-833f-4774-9249-49f40a451168', startTime=1461683044293}
Put object = {data='11f33b92-5f64-46b9-9012-c7ebb21dfdc3', startTime=1461683043135}
Put object = {data='079f8065-de46-4992-82be-bd7a3bdb244a', startTime=1461683050980}
Put object = {data='54be3970-47a1-4593-9b4c-e0fc8ccd9fff', startTime=1461683049942}
Put object = {data='62031a48-4add-4257-a63f-806fcaf73b42', startTime=1461683051172}
Put object = {data='94c89fad-e3ad-4b40-a16e-6a1a9481f9a7', startTime=1461683046254}
Put object = {data='676804c0-47a2-4979-aab0-c8b680bb2dc9', startTime=1461683045340}
[Thread-1] - Take object = {data='11f33b92-5f64-46b9-9012-c7ebb21dfdc3', startTime=1461683043135}
Put object = {data='55980b8c-1d7b-41ad-af47-48bc2b64cd4e', startTime=1461683046746}
Put object = {data='33ba9aca-d247-4358-8eab-09905167cc5c', startTime=1461683048577}
[Thread-1] - Take object = {data='58a576a6-023d-4a2d-b74d-68039edaf8f6', startTime=1461683043236}
Put object = {data='7fad58d1-378c-4b82-8e8d-0031c611b51f', startTime=1461683044580}
Put object = {data='b02595e8-4514-4b8e-a432-837325c06279', startTime=1461683054089}
[Thread-1] - Take object = {data='6f755620-47d9-48b1-b4b5-9ace094eac75', startTime=1461683043462}
Put object = {data='fc5e8e2d-7657-4332-82d5-6fec89d6458a', startTime=1461683052866}
Put object = {data='f41cd2b8-5dbe-4f20-9522-cf3fa0d501c2', startTime=1461683055726}






还没有评论!
23.20.162.200