📜  Java中的BlockingQueue接口

📅  最后修改于: 2022-05-13 01:54:33.920000             🧑  作者: Mango

Java中的BlockingQueue接口

Java 1.5 中添加了Java中的BlockingQueue 接口以及各种其他并发实用程序类,如 ConcurrentHashMap、 Counting Semaphore 、CopyOnWriteArrrayList 等。BlockingQueue 接口通过在 BlockingQueue 为满或为空时引入阻塞来支持流控制(除了队列)。试图将一个元素加入一个完整队列的线程被阻塞,直到某个其他线程通过使一个或多个元素出队或完全清除队列来在队列中腾出空间。类似地,它会阻止一个线程试图从一个空队列中删除,直到其他一些线程插入一个项目。 BlockingQueue 不接受空值。如果我们尝试将 null 项排入队列,则会抛出NullPointerException
Java提供了几种 BlockingQueue 实现,如 LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等Java BlockingQueue 接口实现是线程安全的。 BlockingQueue 的所有方法本质上都是原子的,并使用内部锁或其他形式的并发控制。 Java 5 在Java.util.concurrent 包中附带了 BlockingQueue 实现。

阻塞队列的使用

阻塞队列的使用

由生产者(放置)线程和消费者(获取)线程访问的 BlockingQueue

BlockingQueue 的层次结构

BlockingQueue 的层次结构

宣言

public interface BlockingQueue extends Queue

这里, E是存储在 Collection 中的元素的类型。

实现 BlockingQueue 的类

我们不能直接提供 BlockingQueue 的实例,因为它是一个接口,所以要利用 BlockingQueue 的功能,我们需要利用实现它的类。此外,要在代码中使用 BlockingQueue,请使用此 import 语句。

import java.util.concurrent.BlockingQueue;
                    (or)
import java.util.concurrent.*;
  • 链接阻塞队列
  • 数组阻塞队列

BlockingDeque 的实现类是 LinkedBlockingDeque。这个类是BlockingDeque和链表数据结构的实现。 LinkedBlockingDeque 可以选择使用构造函数来限制,但是,如果未指定容量,则默认为 Integer.MAX_VALUE。节点在插入时动态添加,遵守容量限制。

创建对象的语法:

BlockingQueue objectName = new LinkedBlockingDeque();   
                         (or)
LinkedBlockingDeque objectName = new LinkedBlockingDeque();

示例:在下面给出的代码中,我们对 LinkedBlockingDeque 执行一些基本操作,例如创建对象、添加元素、删除元素以及使用迭代器遍历 LinkedBlockingDeque。

阻塞队列类型

BlockingQueue 有两种类型

1. 无界队列:阻塞队列的容量将设置为 Integer.MAX_VALUE。在无界阻塞队列的情况下,队列永远不会阻塞,因为它可能会增长到非常大的大小。当您添加元素时,它的大小会增长。

句法:

BlockingQueue blockingQueue = new LinkedBlockingDeque();

2.有界队列:第二种队列是有界队列。在有界队列的情况下,您可以在队列构造函数中创建一个传递队列容量的队列:

句法:

// Creates a Blocking Queue with capacity 5
BlockingQueue blockingQueue = new LinkedBlockingDeque(5);

使用 BlockingQueue 实现有界信号量

Java
// Java program that explains the internal
// implementation of BlockingQueue
 
import java.io.*;
import java.util.*;
 
class BlockingQueue {
 
    // BlockingQueue using LinkedList structure
    // with a constraint on capacity
    private List queue = new LinkedList();
 
    // limit variable to define capacity
    private int limit = 10;
 
    // constructor of BlockingQueue
    public BlockingQueue(int limit) { this.limit = limit; }
 
    // enqueue method that throws Exception
    // when you try to insert after the limit
    public synchronized void enqueue(E item)
        throws InterruptedException
    {
        while (this.queue.size() == this.limit) {
            wait();
        }
        if (this.queue.size() == 0) {
            notifyAll();
        }
        this.queue.add(item);
    }
 
    // dequeue methods that throws Exception
    // when you try to remove element from an
    // empty queue
    public synchronized E dequeue()
        throws InterruptedException
    {
        while (this.queue.size() == 0) {
            wait();
        }
        if (this.queue.size() == this.limit) {
            notifyAll();
        }
 
        return this.queue.remove(0);
    }
   
    public static void main(String []args)
    {
    }
}


Java
// Java Program to demonstrate usuage of BlockingQueue
 
import java.util.concurrent.*;
import java.util.*;
 
public class GFG {
 
    public static void main(String[] args)
        throws InterruptedException
    {
 
        // define capacity of ArrayBlockingQueue
        int capacity = 5;
 
        // create object of ArrayBlockingQueue
        BlockingQueue queue
            = new ArrayBlockingQueue(capacity);
 
        // Add elements to ArrayBlockingQueue using put
        // method
        queue.put("StarWars");
        queue.put("SuperMan");
        queue.put("Flash");
        queue.put("BatMan");
        queue.put("Avengers");
 
        // print Queue
        System.out.println("queue contains " + queue);
 
        // remove some elements
        queue.remove();
        queue.remove();
 
        // Add elements to ArrayBlockingQueue
        // using put method
        queue.put("CaptainAmerica");
        queue.put("Thor");
 
        System.out.println("queue contains " + queue);
    }
}


Java
// Java Program Demonstrate add()
// method of BlockingQueue
 
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.*;
 
public class GFG {
   
    public static void main(String[] args)
        throws IllegalStateException
    {
 
        // create object of BlockingQueue
        BlockingQueue BQ
            = new LinkedBlockingDeque();
 
        // Add numbers to the BlockingQueue
        BQ.add(7855642);
        BQ.add(35658786);
        BQ.add(5278367);
        BQ.add(74381793);
 
        // before removing print BlockingQueue
        System.out.println("Blocking Queue: " + BQ);
    }
}


Java
// Java Program for Accessing the elements of a
// LinkedBlockingDeque
 
import java.util.concurrent.*;
 
public class AccessingElements {
 
    public static void main(String[] args)
    {
 
        // Instantiate an object of LinkedBlockingDeque
        // named lbdq
        BlockingQueue lbdq
            = new LinkedBlockingDeque();
 
        // Add elements using add()
        lbdq.add(22);
        lbdq.add(125);
        lbdq.add(723);
        lbdq.add(172);
        lbdq.add(100);
 
        // Print the elements of lbdq on the console
        System.out.println(
            "The LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
 
        // To check if the deque contains 22
        if (lbdq.contains(22))
            System.out.println(
                "The LinkedBlockingDeque, lbdq contains 22");
        else
            System.out.println("No such element exists");
 
        // Using element() to retrieve the head of the deque
        int head = lbdq.element();
        System.out.println("The head of lbdq: " + head);
    }
}


Java
// Java Program for removing elements from a
// LinkedBlockingDeque
 
import java.util.concurrent.*;
 
public class RemovingElements {
 
    public static void main(String[] args)
    {
 
        // Instantiate an object of LinkedBlockingDeque
        // named lbdq
        BlockingQueue lbdq
            = new LinkedBlockingDeque();
 
        // Add elements using add()
        lbdq.add(75);
        lbdq.add(86);
        lbdq.add(13);
        lbdq.add(44);
        lbdq.add(10);
 
        // Print the elements of lbdq on the console
        System.out.println(
            "The LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
 
        // Remove elements using remove();
        lbdq.remove(86);
        lbdq.remove(44);
 
        // Trying to remove an element
        // that doesn't exist
        // in the LinkedBlockingDeque
        lbdq.remove(1);
 
        // Print the elements of lbdq on the console
        System.out.println(
            "\nThe LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
    }
}


Java
// Java Program to iterate
// through the LinkedBlockingDeque
import java.util.Iterator;
import java.util.concurrent.*;
 
public class IteratingThroughElements {
 
    public static void main(String[] args) {
         
        // Instantiate an object of LinkedBlockingDeque named lbdq
        BlockingQueue lbdq = new LinkedBlockingDeque();
         
        // Add elements using add()
        lbdq.add(166);
        lbdq.add(246);
        lbdq.add(66);
        lbdq.add(292);
        lbdq.add(98);
         
        // Create an iterator to traverse lbdq
        Iterator lbdqIter = lbdq.iterator();
         
        // Print the elements of lbdq on to the console
        System.out.println("The LinkedBlockingDeque, lbdq contains:");
         
        for(int i = 0; i



例子:

Java

// Java Program to demonstrate usuage of BlockingQueue
 
import java.util.concurrent.*;
import java.util.*;
 
public class GFG {
 
    public static void main(String[] args)
        throws InterruptedException
    {
 
        // define capacity of ArrayBlockingQueue
        int capacity = 5;
 
        // create object of ArrayBlockingQueue
        BlockingQueue queue
            = new ArrayBlockingQueue(capacity);
 
        // Add elements to ArrayBlockingQueue using put
        // method
        queue.put("StarWars");
        queue.put("SuperMan");
        queue.put("Flash");
        queue.put("BatMan");
        queue.put("Avengers");
 
        // print Queue
        System.out.println("queue contains " + queue);
 
        // remove some elements
        queue.remove();
        queue.remove();
 
        // Add elements to ArrayBlockingQueue
        // using put method
        queue.put("CaptainAmerica");
        queue.put("Thor");
 
        System.out.println("queue contains " + queue);
    }
}
输出:
queue contains [StarWars, SuperMan, Flash, BatMan, Avengers]
queue contains [Flash, BatMan, Avengers, CaptainAmerica, Thor]

基本操作

1.添加元素

根据我们尝试使用的结构类型,可以以不同的方式将元素添加到 LinkedBlockedDeque 中。最常用的方法是 add() 方法,我们可以使用它在双端队列的末尾添加元素。我们还可以使用 addAll() 方法(这是 Collection 接口的一个方法)将整个集合添加到 LinkedBlockingDeque。如果我们希望将双端队列用作队列,我们可以使用 add() 和 put()。

Java

// Java Program Demonstrate add()
// method of BlockingQueue
 
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.*;
 
public class GFG {
   
    public static void main(String[] args)
        throws IllegalStateException
    {
 
        // create object of BlockingQueue
        BlockingQueue BQ
            = new LinkedBlockingDeque();
 
        // Add numbers to the BlockingQueue
        BQ.add(7855642);
        BQ.add(35658786);
        BQ.add(5278367);
        BQ.add(74381793);
 
        // before removing print BlockingQueue
        System.out.println("Blocking Queue: " + BQ);
    }
}
输出
Blocking Queue: [7855642, 35658786, 5278367, 74381793]

2. 访问元素

可以使用 contains()、element()、peek()、poll() 访问 LinkedBlockingDeque 的元素。这些方法也有不同的变体,上表中给出了这些方法及其描述。

Java

// Java Program for Accessing the elements of a
// LinkedBlockingDeque
 
import java.util.concurrent.*;
 
public class AccessingElements {
 
    public static void main(String[] args)
    {
 
        // Instantiate an object of LinkedBlockingDeque
        // named lbdq
        BlockingQueue lbdq
            = new LinkedBlockingDeque();
 
        // Add elements using add()
        lbdq.add(22);
        lbdq.add(125);
        lbdq.add(723);
        lbdq.add(172);
        lbdq.add(100);
 
        // Print the elements of lbdq on the console
        System.out.println(
            "The LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
 
        // To check if the deque contains 22
        if (lbdq.contains(22))
            System.out.println(
                "The LinkedBlockingDeque, lbdq contains 22");
        else
            System.out.println("No such element exists");
 
        // Using element() to retrieve the head of the deque
        int head = lbdq.element();
        System.out.println("The head of lbdq: " + head);
    }
}


输出
The LinkedBlockingDeque, lbdq contains:
[22, 125, 723, 172, 100]
The LinkedBlockingDeque, lbdq contains 22
The head of lbdq: 22

3. 删除元素

可以使用 remove() 从 LinkedBlockingDeque 中删除元素。其他方法,如 take() 和 poll() 也可以用于删除第一个和最后一个元素。

Java

// Java Program for removing elements from a
// LinkedBlockingDeque
 
import java.util.concurrent.*;
 
public class RemovingElements {
 
    public static void main(String[] args)
    {
 
        // Instantiate an object of LinkedBlockingDeque
        // named lbdq
        BlockingQueue lbdq
            = new LinkedBlockingDeque();
 
        // Add elements using add()
        lbdq.add(75);
        lbdq.add(86);
        lbdq.add(13);
        lbdq.add(44);
        lbdq.add(10);
 
        // Print the elements of lbdq on the console
        System.out.println(
            "The LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
 
        // Remove elements using remove();
        lbdq.remove(86);
        lbdq.remove(44);
 
        // Trying to remove an element
        // that doesn't exist
        // in the LinkedBlockingDeque
        lbdq.remove(1);
 
        // Print the elements of lbdq on the console
        System.out.println(
            "\nThe LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
    }
}
输出
The LinkedBlockingDeque, lbdq contains:
[75, 86, 13, 44, 10]

The LinkedBlockingDeque, lbdq contains:
[75, 13, 10]

4. 遍历元素

要遍历 LinkedBlockingDeque 的元素,我们可以创建一个迭代器并使用 Iterable 接口的方法来访问元素,该接口是Java集合框架的根。 Iterable 的 next() 方法返回任何集合的元素。

Java

// Java Program to iterate
// through the LinkedBlockingDeque
import java.util.Iterator;
import java.util.concurrent.*;
 
public class IteratingThroughElements {
 
    public static void main(String[] args) {
         
        // Instantiate an object of LinkedBlockingDeque named lbdq
        BlockingQueue lbdq = new LinkedBlockingDeque();
         
        // Add elements using add()
        lbdq.add(166);
        lbdq.add(246);
        lbdq.add(66);
        lbdq.add(292);
        lbdq.add(98);
         
        // Create an iterator to traverse lbdq
        Iterator lbdqIter = lbdq.iterator();
         
        // Print the elements of lbdq on to the console
        System.out.println("The LinkedBlockingDeque, lbdq contains:");
         
        for(int i = 0; i
输出
The LinkedBlockingDeque, lbdq contains:
166 246 66 292 98 

BlockingQueue的方法

METHOD

DESCRIPTION

add​(E e)Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success, and throwing an IllegalStateException if no space is currently available.
contains​(Object o)Returns true if this queue contains the specified element.
drainTo​(Collection c)Removes all available elements from this queue and adds them to the given collection.
drainTo​(Collection c, int maxElements)Removes at most the given number of available elements from this queue and adds them to the given collection.
offer​(E e)Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently available.
offer​(E e, long timeout, TimeUnit unit)Inserts the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.
poll​(long timeout, TimeUnit unit)Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.
put​(E e)Inserts the specified element into this queue, waiting if necessary for space to become available.
remainingCapacity()Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.
remove​(Object o)Removes a single instance of the specified element from this queue, if it is present.
take()Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

在接口Java.util.Collection 中声明的方法

METHOD

DESCRIPTION

addAll​(Collection c)Adds all of the elements in the specified collection to this collection (optional operation).
clear()Removes all of the elements from this collection (optional operation).
containsAll​(Collection c)Returns true if this collection contains all of the elements in the specified collection.
equals​(Object o)Compares the specified object with this collection for equality.
hashCode()Returns the hash code value for this collection.
isEmpty()Returns true if this collection contains no elements.
iterator()Returns an iterator over the elements in this collection.
parallelStream()Returns a possibly parallel Stream with this collection as its source.
removeAll​(Collection c)Removes all of this collection’s elements that are also contained in the specified collection (optional operation).
removeIf​(Predicate filter)Removes all of the elements of this collection that satisfy the given predicate.
retainAll​(Collection c)Retains only the elements in this collection that are contained in the specified collection (optional operation).
size()Returns the number of elements in this collection.
spliterator()Creates a Spliterator over the elements in this collection.
stream()Returns a sequential Stream with this collection as its source.
toArray()Returns an array containing all of the elements in this collection.
toArray​(IntFunction generator)Returns an array containing all of the elements in this collection, using the provided generator function to allocate the returned array.
toArray​(T[] a)Returns an array containing all of the elements in this collection; the runtime type of the returned array is that of the specified array.

在接口Java.lang.Iterable 中声明的方法

METHOD

DESCRIPTION

forEach​(Consumer action)Performs the given action for each element of the Iterable until all elements have been processed or the action throws an exception.

在接口Java .util.Queue 中声明的方法

METHOD

DESCRIPTION

element()Retrieves, but does not remove, the head of this queue.
peek()Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty.
poll()Retrieves and removes the head of this queue, or returns null if this queue is empty.
remove()Retrieves and removes the head of this queue.

BlockingQueue 方法的行为

以下是 BlockingQueue 提供的对队列进行插入、移除和检查操作的方法。如果请求的操作没有立即得到满足,这四组方法中的每一个都会表现不同。

  • 抛出异常:如果请求的操作没有立即得到满足,将抛出异常。
  • 特殊值:如果没有立即满足操作,则返回一个特殊值。
  • 阻塞:如果尝试的操作没有立即满足,则方法调用被阻塞,并等待直到它被执行。
  • 超时:返回一个特殊值,告诉操作是否成功。如果请求的操作无法立即执行,则方法调用会阻塞,直到可以执行,但等待时间不会超过给定的超时时间。
OperationThrows ExceptionSpecial ValueBlocksTimes Out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Removeremove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

参考: Java : Java