📜  Java BlockingQueue(1)

📅  最后修改于: 2023-12-03 14:42:13.394000             🧑  作者: Mango

Java BlockingQueue

简介

Java BlockingQueue 是一个线程安全的队列,在多线程并发访问时,可以操作队列进行并发处理。

BlockingQueue 有以下特点:

  • 队列支持两个附加操作,即 put 和 take ,而不只是常规的 add 和 remove 操作。
  • BlockingQueue 在队列为空时,取元素的操作将会被阻塞,直到队列中有元素。
  • 在队列满时,存元素的操作将会被阻塞,直到队列中出现空位。
  • BlockingQueue 可以用于线程间的数据交换和传递。

Java 5 提供了 7 个不同的 BlockingQueue 实现,每个实现都有不同的特性和用途。

BlockingQueue 的实现类

下面是 7 个 BlockingQueue 的实现类:

  1. ArrayBlockingQueue
  2. LinkedBlockingQueue
  3. PriorityBlockingQueue
  4. DelayQueue
  5. SynchronousQueue
  6. LinkedTransferQueue
  7. LinkedBlockingDeque
ArrayBlockingQueue

ArrayBlockingQueue 是一个有界队列,其底层实现是一个数组,必须指定容量。如果队列已满,就无法再添加元素,直到队列中的某个元素被移除为止。ArrayBlockingQueue 内部通过 ReentrantLock 来保证线程安全。

示例代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
        //添加元素
        blockingQueue.add(1);
        //获取元素
        Integer element = blockingQueue.poll();
    }
}
LinkedBlockingQueue

LinkedBlockingQueue 是一个无界队列,其底层实现是链表。当队列为空时,取元素的操作将会被阻塞,直到队列中有元素;当队列已满时,存元素的操作将会被阻塞,直到队列中出现空位。LinkedBlockingQueue 内部通过 ReentrantLock 来保证线程安全。

示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        //添加元素
        blockingQueue.add(1);
        //获取元素
        Integer element = blockingQueue.poll();
    }
}
PriorityBlockingQueue

PriorityBlockingQueue 是一个无界队列,其底层实现是二叉堆。PriorityBlockingQueue 内部使用 ReentrantLock 来保证线程安全。根据元素的自然顺序或 Comparator 顺序,元素会被加入队列中的适当位置。如果队列为空,那么调用 remove() 和 poll() 时会抛出异常。

示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
        //添加元素
        blockingQueue.add(2);
        blockingQueue.add(1);
        //获取元素
        Integer element = blockingQueue.poll();
    }
}
DelayQueue

DelayQueue 是一个无界队列,其底层实现是 PriorityBlockingQueue 。DelayQueue 内部通过 ReentrantLock 来保证线程安全。只有在延迟期满时才能从中提取元素。DelayQueue 实现了 Delayed 接口,需要实现 getDelay() 方法来指定元素延迟的时间,以及 compareTo() 方法来进行元素的排序。

示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<DelayedElement> blockingQueue = new DelayQueue<>();
        //添加元素
        blockingQueue.add(new DelayedElement("1", 1000));
        blockingQueue.add(new DelayedElement("2", 2000));
        //获取元素
        DelayedElement element = blockingQueue.take();
    }
}

class DelayedElement implements Delayed {
    private String name;
    private long startTime;

    public DelayedElement(String name, long delayTime) {
        this.name = name;
        this.startTime = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.startTime - ((DelayedElement) o).startTime);
    }
}
SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作。如果队列中已有元素,则插入操作将会阻塞,直到另一个线程取走该元素;如果队列为空,则取元素的操作将会阻塞,直到另一个线程插入该元素。

示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                //添加元素
                blockingQueue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                //获取元素
                Integer element = blockingQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
LinkedTransferQueue

LinkedTransferQueue 是一个无界队列,继承自 TransferQueue 接口。LinkedTransferQueue 内部通过 ReentrantLock 和 Condition 来保证线程安全。LinkedTransferQueue 提供了两个方法:transfer() 和 tryTransfer() 。transfer() 方法在元素被取走之前会一直阻塞;tryTransfer() 方法可以设置超时时间,如果在超时时间内元素无法被取走,则返回 false 。

示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

public class LinkedTransferQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedTransferQueue<>();
        new Thread(() -> {
            try {
                //添加元素
                blockingQueue.transfer(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                //获取元素
                Integer element = blockingQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
LinkedBlockingDeque

LinkedBlockingDeque 是一个双向队列,其底层实现是链表。LinkedBlockingDeque 内部通过 ReentrantLock 来保证线程安全。除了实现 BlockingQueue 接口外,LinkedBlockingDeque 还提供了双端队列相关的方法,例如 addFirst() 、 addLast() 、 peekFirst() 和 peekLast() 。

示例代码:

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class LinkedBlockingDequeDemo {
    public static void main(String[] args) {
        BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();
        //添加元素
        blockingDeque.addFirst(2);
        blockingDeque.addLast(1);
        //获取元素
        Integer first = blockingDeque.pollFirst();
        Integer last = blockingDeque.pollLast();
    }
}
总结

Java BlockingQueue 是一个非常实用的工具类,可以在多线程并发访问时进行并发处理。Java 5 提供了 7 个不同的 BlockingQueue 实现类,每个实现都有不同的特点和用途,程序员需要根据实际需求选择合适的实现类。