📜  Java Java

📅  最后修改于: 2022-05-13 01:55:44.231000             🧑  作者: Mango

Java Java

CyclicBarrier 用于使线程相互等待。用于不同线程处理一部分计算,当所有线程都完成执行后,需要在父线程中合并结果。换句话说,当多个线程执行不同的子任务并且这些子任务的输出需要组合形成最终输出时,使用了CyclicBarrier。完成执行后,线程调用 await() 方法并等待其他线程到达屏障。一旦所有线程都到达,障碍就会让线程继续前进。

CyclicBarrier的工作

CyclicBarriers 在Java.util.concurrent 包中定义。首先创建一个 CyclicBarriers 的新实例,指定障碍应等待的线程数。

CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads);

每个线程都进行一些计算,并在完成执行后调用 await() 方法,如下所示:

public void run()
{
    // thread does the computation
    newBarrier.await();
}

CyclicBarrier的工作:


一旦调用 await() 的线程数等于numberOfThreads ,屏障就会为等待的线程提供方法。 CyclicBarrier 也可以通过所有线程到达屏障后执行的某些操作来初始化。该动作可以组合/利用在屏障中等待的各个线程的计算结果。

Runnable action = ... 
//action to be performed when all threads reach the barrier;
CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads, action);

CyclicBarrier 的重要方法:

  1. getParties:返回触发此障碍所需的参与方数量。
    句法:
    public int getParties()

    回报:
    打破这一障碍所需的参与方数量

  2. 重置:将屏障重置为其初始状态。
    句法:
    public void reset()

    回报:
    void 但将屏障重置为其初始状态。如果任何一方当前在屏障处等待,他们将返回一个 BrokenBarrierException。

  3. isBroken:查询此屏障是否处于损坏状态。
    句法:
    public boolean isBroken()

    回报:
    如果一个或多个参与方由于自构造或上次重置后的中断或超时而突破此屏障,或者由于异常而导致屏障动作失败,则为 true;否则为假。

  4. getNumberWaiting:返回当前在屏障处等待的参与方数量。
    句法:
    public int getNumberWaiting()

    回报:
    当前在 await() 中被阻止的参与方数量

  5. await:等到所有各方都在此屏障上调用了 await。
    句法:
    public int await() throws InterruptedException, BrokenBarrierException

    回报:
    当前线程的到达索引,其中 index getParties() – 1 表示第一个到达,0 表示最后一个到达。

  6. await:等待直到所有各方都在此屏障上调用了 await,或者指定的等待时间过去。
    句法:
    public int await(long timeout, TimeUnit unit) 
    throws InterruptedException,
    BrokenBarrierException, TimeoutException

    回报:
    当前线程的到达索引,其中 index getParties() – 1 表示第一个到达,0 表示最后一个到达

//JAVA program to demonstrate execution on Cyclic Barrier
  
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
  
class Computation1 implements Runnable
{
    public static int product = 0;
    public void run()
    {
        product = 2 * 3;
        try
        {
            Tester.newBarrier.await();
        } 
        catch (InterruptedException | BrokenBarrierException e) 
        {
            e.printStackTrace();
        }
    }
}
  
class Computation2 implements Runnable
{
    public static int sum = 0;
    public void run()
    {
        // check if newBarrier is broken or not
        System.out.println("Is the barrier broken? - " + Tester.newBarrier.isBroken());
        sum = 10 + 20;
        try
        {
            Tester.newBarrier.await(3000, TimeUnit.MILLISECONDS);
          
            // number of parties waiting at the barrier
            System.out.println("Number of parties waiting at the barrier "+
            "at this point = " + Tester.newBarrier.getNumberWaiting());
        } 
        catch (InterruptedException | BrokenBarrierException e) 
        {
            e.printStackTrace();
        } 
        catch (TimeoutException e) 
        {
            e.printStackTrace();
        }
    }
}
  
  
public class Tester implements Runnable
{
    public static CyclicBarrier newBarrier = new CyclicBarrier(3);
      
    public static void main(String[] args)
    {
        // parent thread
        Tester test = new Tester();
          
        Thread t1 = new Thread(test);
        t1.start();
    }
    public void run()
    {
        System.out.println("Number of parties required to trip the barrier = "+
        newBarrier.getParties());
        System.out.println("Sum of product and sum = " + (Computation1.product + 
        Computation2.sum));
          
        // objects on which the child thread has to run
        Computation1 comp1 = new Computation1();
        Computation2 comp2 = new Computation2();
          
        // creation of child thread
        Thread t1 = new Thread(comp1);
        Thread t2 = new Thread(comp2);
          
        // moving child thread to runnable state
        t1.start();
        t2.start();
  
        try
        {
            Tester.newBarrier.await();
        } 
        catch (InterruptedException | BrokenBarrierException e) 
        {
            e.printStackTrace();
        }
          
        // barrier breaks as the number of thread waiting for the barrier
        // at this point = 3
        System.out.println("Sum of product and sum = " + (Computation1.product + 
        Computation2.sum));
                  
        // Resetting the newBarrier
        newBarrier.reset();
        System.out.println("Barrier reset successful");
    }
}

输出:

解释: (sum + product) = 0 的值打印在控制台上,因为子线程还没有运行来设置 sum 和 product 变量的值。在此之后,(sum + product) = 36 打印在控制台上,因为子线程运行设置 sum 和 product 的值。此外,屏障上的等待线程数达到了 3 个,因此屏障允许所有线程通过,最后打印了 36 个。 “此时在屏障处等待的参与方数量”的值 = 0,因为所有三个线程都已经调用了 await() 方法,因此屏障不再处于活动状态。最后,newBarrier 被重置,可以再次使用。

破碎屏障异常

当任何等待线程离开屏障时,屏障就会中断。当一个或多个等待线程被中断或等待时间完成时会发生这种情况,因为线程调用了具有超时的 await() 方法,如下所示:

newBarrier.await(1000, TimeUnit.MILLISECONDS);
// thread calling this await() 
// methods waits for only 1000 milliseconds.

当屏障由于多个参与线程之一而中断时,所有其他线程的 await() 方法将引发 BrokenThreadException。然而,已经在屏障中等待的线程的 await() 调用终止。

CyclicBarrier 和 CountDownLatch 之间的区别

  • CountDownLatch 只能在程序中使用一次(直到它的计数达到 0)。
  • 一旦释放了屏障中的所有线程,就可以一次又一次地使用 CyclicBarrier。

参考:甲骨文