📜  实现同步队列 API 的Java程序

📅  最后修改于: 2022-05-13 01:55:45.924000             🧑  作者: Mango

实现同步队列 API 的Java程序

SynchronousQueue 是一个特殊的阻塞队列,没有内部容量。它有助于以线程安全的方式在线程之间交换数据或信息。

SynchronousQueue 仅支持 2 个操作:

这两种方法都是阻塞方法,这意味着当我们想在队列中添加一条信息或数据时,我们调用put()方法,但该方法将保持阻塞状态或等待其他线程调用take()方法和允许线程获取数据或信息。

1.采取()

Java
try
{
  synchronousQueue.put("data or information goes here");
}
 
catch(InterruptedException iex)
{
  iex.printStackTrace();
}


Java
try
{
  // data type according to the data or information
  String info = synchronousQueue.take();
}
 
catch(InterruptedException iex)
{
  iex.printStackTrace();
}


Java
// Java program to implement SynchronousQueue API.
 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
 
public class SynchronousQAPI {
 
    public SynchronousQueue synchronousQ;
 
    // we create a SynchronousQueue with no fair policy
 
    public SynchronousQAPI()
    {
        synchronousQ = new SynchronousQueue();
    }
 
    // we create a SynchronousQueue with fair policy
 
    public SynchronousQAPI(boolean fair)
    {
        synchronousQ = new SynchronousQueue();
    }
 
    // As we discussed above in API overview that
    // SynchronousQueue has 2 supported operations put() and
    // take() So, we will implement this methods only
 
    // put() method: It insert element at tail of the queue
    // and used to wait until the queue is full.
 
    public void put(E e) throws InterruptedException
    {
        synchronousQ.put(e);
    }
 
    // take() method: return element at the head of the
    // queue
 
    public E take() throws InterruptedException
    {
        return synchronousQ.take();
    }
 
    // Implementation of Put Thread (producer)
    class Put implements Runnable {
 
        @SuppressWarnings("rawtypes")
        BlockingQueue SynchronousQueue;
 
        @SuppressWarnings("rawtypes")
        public Put(BlockingQueue q)
        {
            this.SynchronousQueue = q;
        }
 
        @SuppressWarnings("unchecked")
        @Override
        public void run()
        {
            try {
                // put the data
                SynchronousQueue.put(1);
                System.out.println(
                    "1 added to synchronous queue.");
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    class Take implements Runnable {
        @SuppressWarnings("rawtypes")
        BlockingQueue SynchronousQueue;
 
        @SuppressWarnings("rawtypes")
        public Take(BlockingQueue q)
        {
            this.SynchronousQueue = q;
        }
 
        @Override public void run()
        {
            try {
                // take out the previously inserted data
                this.SynchronousQueue.take();
 
                System.out.println(
                    "1 removed from synchronous queue.");
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args)
        throws InterruptedException
    {
        SynchronousQAPI synchronousQueue
            = new SynchronousQAPI();
 
        new Thread(new SynchronousQAPI<>().new Put(
                       synchronousQueue.synchronousQ))
                       .start();
 
        new Thread(new SynchronousQAPI<>().new Take(
                       synchronousQueue.synchronousQ))
                       .start();
    }
}


2.放()

Java

try
{
  // data type according to the data or information
  String info = synchronousQueue.take();
}
 
catch(InterruptedException iex)
{
  iex.printStackTrace();
}

SynchronousQueue 有两种类型的构造函数,它们基于两种不同的访问策略:

1. SynchronousQueue():在这种情况下,如果多个线程正在等待,则这些线程被授予随机或未指定的访问权限,这称为不公平策略。

2. SynchronousQueue(boolean fair):在这种情况下,如果有多个线程在等待,那么这些线程以先进先出的方式被授予访问权限。

执行:

Java

// Java program to implement SynchronousQueue API.
 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
 
public class SynchronousQAPI {
 
    public SynchronousQueue synchronousQ;
 
    // we create a SynchronousQueue with no fair policy
 
    public SynchronousQAPI()
    {
        synchronousQ = new SynchronousQueue();
    }
 
    // we create a SynchronousQueue with fair policy
 
    public SynchronousQAPI(boolean fair)
    {
        synchronousQ = new SynchronousQueue();
    }
 
    // As we discussed above in API overview that
    // SynchronousQueue has 2 supported operations put() and
    // take() So, we will implement this methods only
 
    // put() method: It insert element at tail of the queue
    // and used to wait until the queue is full.
 
    public void put(E e) throws InterruptedException
    {
        synchronousQ.put(e);
    }
 
    // take() method: return element at the head of the
    // queue
 
    public E take() throws InterruptedException
    {
        return synchronousQ.take();
    }
 
    // Implementation of Put Thread (producer)
    class Put implements Runnable {
 
        @SuppressWarnings("rawtypes")
        BlockingQueue SynchronousQueue;
 
        @SuppressWarnings("rawtypes")
        public Put(BlockingQueue q)
        {
            this.SynchronousQueue = q;
        }
 
        @SuppressWarnings("unchecked")
        @Override
        public void run()
        {
            try {
                // put the data
                SynchronousQueue.put(1);
                System.out.println(
                    "1 added to synchronous queue.");
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    class Take implements Runnable {
        @SuppressWarnings("rawtypes")
        BlockingQueue SynchronousQueue;
 
        @SuppressWarnings("rawtypes")
        public Take(BlockingQueue q)
        {
            this.SynchronousQueue = q;
        }
 
        @Override public void run()
        {
            try {
                // take out the previously inserted data
                this.SynchronousQueue.take();
 
                System.out.println(
                    "1 removed from synchronous queue.");
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args)
        throws InterruptedException
    {
        SynchronousQAPI synchronousQueue
            = new SynchronousQAPI();
 
        new Thread(new SynchronousQAPI<>().new Put(
                       synchronousQueue.synchronousQ))
                       .start();
 
        new Thread(new SynchronousQAPI<>().new Take(
                       synchronousQueue.synchronousQ))
                       .start();
    }
}
输出
1 added to synchronous queue.
1 removed from synchronous queue.