BlockingQueue Usage
A BlockingQueue
is typically used to have on thread produce objects, which another thread consumes.
The producing thread will keep producing new objects and insert them into the queue, until the queue reaches some upper bound on what it can contain. If the blocking queue reaches its upper limit, the producing thread is blocked while trying to insert the new object. It remains blocked until a consuming thread takes an object out of the queue.
The consuming thread keeps taking objects out of the blocking queue, and processes them. If the consuming thread tries to take an object out of an empty queue, the consuming thread is blocked until a producing thread puts an object into the queue.
It is not possible to insert null
into a BlockingQueue
. If you try to insert null
, the BlockingQueue
will throw a NullPointerException
.
It is also possible to access all the elements inside a BlockingQueue
, and not just the elements at the start and end. For instance, say you have queued an object for processing, but your application decides to cancel it. You can then call remove(o)
to remove a specific object in the queue. However, this is not done very efficiently.
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ArrayBlockingQueue
ArrayBlockingQueue
is a bounded, blocking queue that stores the elements internally in an array. That it is bounded means that it cannot store unlimited amounts of elements. There is an upper bound on the number of elements it can store at the same time. You set the upper bound at instantiation time, and after that it cannot be changed.
The ArrayBlockingQueue
stores the elements internally in FIFO order. The head
of the queue is the element which has been in queue the longest time, and the tail
of the queue is the element which has been in the queue the shortest time.
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();
DelayQueue
The DelayQueue
blocks the elements internally until a certain delay has expired. The elements must implement the interface java.util.concurrent.Delayed
public interface Delayed extends Comparable<Delayed< {
public long getDelay(TimeUnit timeUnit);
}
The value returned by the getDelay()
method should be the delay remaining before this element can be released. If 0 or a negative value is returned, the delay will be considered expired, and the element released at the next take()
etc. call on the DelayQueue.
LinkedBlockingQueue
The LinkedBlockingQueue
keeps the elements internally in a linked structure (linked nodes). This linked structure can optionally have an upper bound if desired. If no upper bound is specified, Integer.MAX_VALUE
is used as the upper bound.
The LinkedBlockingQueue
stores the elements internally in FIFO order.
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();
PriorityBlockingQueue
The PriorityBlockingQueue
is an unbounded concurrent queue. It uses the same ordering rules as the java.util.PriorityQueue
class. You cannot insert null into this queue.
All elements inserted into the PriorityBlockingQueue
must implement the java.lang.Comparable
interface. The elements thus order themselves according to whatever priority you decide in your Comparable
implementation.
In case you obtain an Iterator
from a PriorityBlockingQueue
, the Iterator
does not guarantee to iterate the elements in priority order.
BlockingQueue queue = new PriorityBlockingQueue();
//String implements java.lang.Comparable
queue.put("Value");
String value = queue.take();
SynchronousQueue
The SynchronousQueue is a queue that can only contain a single element internally. A thread inserting an element into the queueis blocked until another thread takes that element from the queue.
Likewise, if a thread tries to take an element and no element is currently present, that thread is blocked until a thread insert an element into the queue.