· BlockingQueue was added in Java 1.5 along with various other concurrent utility
classes like ConcurrentHashMap, counting Semaphore, CopyOnWriteArrayList etc.
· BlockingQueue is a unique collection type which not only stores elements but also
supports flow control by blocking when the queue is full or empty. The take()
method of BlockingQueue blocks if the queue is empty, and the put() method
blocks if the queue is full.
· This property makes BlockingQueue an ideal choice for implementing the
Producer-Consumer design pattern, where one thread inserts elements into the
BlockingQueue and another thread consumes them.

· BlockingQueue in Java doesn't allow null elements. Implementations of BlockingQueue such as ArrayBlockingQueue and LinkedBlockingQueue throw NullPointerException when you try to add null to the queue.
BlockingQueue<String> bQueue = new ArrayBlockingQueue<String>(10);
//bQueue.put(null); // NullPointerException - BlockingQueue in Java doesn't allow null

bQueue = new LinkedBlockingQueue<String>();
bQueue.put(null);

Exception in thread "main" java.lang.NullPointerException
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:288)
· BlockingQueue can be bounded or unbounded. A bounded BlockingQueue is one which is
initialized with a capacity; a call to put() blocks when the queue is full, that
is, when its size equals its capacity. This bounded nature makes it ideal as a
shared queue between multiple threads, as in most common Producer-Consumer
solutions in Java. An unbounded queue is one which is initialized without a
capacity; a LinkedBlockingQueue created this way actually defaults to a capacity
of Integer.MAX_VALUE. Most common examples of BlockingQueue use a bounded
BlockingQueue, as in the examples later in this post.
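
To make the bounded versus unbounded distinction concrete, here is a small sketch that asks each kind of queue how much room it has. The class name CapacityDemo and its two helper methods are made up for this illustration; only the java.util.concurrent classes come from the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original post
class CapacityDemo {

    // Bounded: the capacity is fixed when the queue is created
    static BlockingQueue<String> bounded(int capacity) {
        return new ArrayBlockingQueue<>(capacity);
    }

    // "Unbounded": no capacity is given, so LinkedBlockingQueue
    // falls back to Integer.MAX_VALUE
    static BlockingQueue<String> unbounded() {
        return new LinkedBlockingQueue<>();
    }

    public static void main(String[] args) {
        System.out.println(bounded(10).remainingCapacity());
        System.out.println(unbounded().remainingCapacity() == Integer.MAX_VALUE);
    }
}
```

Running main() prints 10 and then true, which matches the description above: the default LinkedBlockingQueue is not truly limitless, it just has a very large capacity.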
· BlockingQueue implementations like ArrayBlockingQueue, LinkedBlockingQueue and
PriorityBlockingQueue are thread-safe. All queuing methods use internal locks or
other forms of concurrency control to perform their operations atomically.
However, since BlockingQueue also extends Collection, bulk Collection operations
like addAll() and containsAll() are not necessarily performed atomically unless
a BlockingQueue implementation specifically supports it. So a call to addAll()
may fail with an exception after inserting only some of the elements.
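
The non-atomic addAll() behaviour can be reproduced with a tiny queue. This is only a sketch: the class AddAllDemo and the method partialAddAll() are names invented here, and it relies on ArrayBlockingQueue inheriting addAll() from AbstractQueue, which adds elements one at a time.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original post
class AddAllDemo {

    // Tries to add three items to a queue with room for two and
    // returns how many items were inserted before addAll() failed.
    static int partialAddAll() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList("Java", "JDK", "J2SE"));
        } catch (IllegalStateException e) {
            // The third add() found the queue full; the first two stay in the queue
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("Elements inserted before failure: " + partialAddAll());
    }
}
```

The method returns 2: the first two elements remain in the queue even though the bulk operation as a whole threw an exception, so callers cannot treat addAll() as all-or-nothing.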
// A bounded queue with room for two elements
BlockingQueue<String> bQueue = new ArrayBlockingQueue<String>(2);
bQueue.put("Java");
System.out.println("Item 1 inserted into BlockingQueue");
bQueue.put("JDK");
System.out.println("Item 2 is inserted on BlockingQueue");
bQueue.put("J2SE"); // blocks here: the queue is full and no thread is consuming
System.out.println("Done");

Output:
Item 1 inserted into BlockingQueue
Item 2 is inserted on BlockingQueue
· The most common methods of BlockingQueue are put() and take(), which are blocking
methods used to insert and retrieve elements. put() blocks if the BlockingQueue
is full and take() blocks if the BlockingQueue is empty. A call to take()
removes the element at the head of the queue.
· The BlockingQueue interface extends Queue, which in turn extends Collection and
Iterable, so it provides all Collection and Queue related methods such as poll()
and peek(). Unlike take(), peek() returns the head of the queue without removing
it, or null if the queue is empty. poll() also retrieves and removes the head of
the queue; the no-argument version returns null immediately if the queue is
empty, while poll(timeout, unit) waits up to the specified time for an element.
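
The differences between peek(), take() and the two poll() variants are easier to see side by side. The following is a minimal sketch; PeekPollDemo and observe() are illustrative names, and the 100 ms timeout is an arbitrary value chosen for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post
class PeekPollDemo {

    // Records what each call observed, in order
    static List<String> observe() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        List<String> seen = new ArrayList<>();
        try {
            queue.put("Java");
            seen.add("peek=" + queue.peek()); // head is returned but stays in the queue
            seen.add("size=" + queue.size()); // still one element
            seen.add("take=" + queue.take()); // head is removed; would block if empty
            seen.add("poll=" + queue.poll()); // queue is empty: returns null at once
            // Timed poll: waits up to 100 ms for an element, then gives up with null
            seen.add("timedPoll=" + queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Running main() prints [peek=Java, size=1, take=Java, poll=null, timedPoll=null]. Because poll() signals an empty queue with null, this is also one reason BlockingQueue cannot accept null elements.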
· Other important methods of BlockingQueue are remainingCapacity() and offer(). The
former returns the number of additional elements the BlockingQueue can accept
without blocking, while the latter inserts an object into the queue if possible
and returns true on success and false on failure, unlike the add() method, which
throws IllegalStateException if it fails to insert the object into the
BlockingQueue. Use offer() over add() wherever possible.
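
As a quick illustration of why offer() is usually preferred on a capacity-restricted queue, the sketch below fills a one-element queue and then tries both methods. OfferVsAddDemo and observe() are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original post
class OfferVsAddDemo {

    // Records the result of each call on a queue with capacity 1
    static List<String> observe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        List<String> seen = new ArrayList<>();
        seen.add("free=" + queue.remainingCapacity()); // one slot available
        seen.add("offer=" + queue.offer("Java"));      // succeeds, returns true
        seen.add("free=" + queue.remainingCapacity()); // no slots left
        seen.add("offer=" + queue.offer("JDK"));       // queue full: false, no exception
        try {
            queue.add("J2SE");                          // queue full: throws
        } catch (IllegalStateException e) {
            seen.add("add=IllegalStateException");
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Running main() prints [free=1, offer=true, free=0, offer=false, add=IllegalStateException]. With offer() a full queue is an ordinary return value to check, whereas add() forces the caller into exception handling.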
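
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue<Object> queue;

    Producer(BlockingQueue<Object> theQueue) {
        this.queue = theQueue;
    }

    @Override
    public void run() {
        // Keep producing until this thread is interrupted
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Object obj = getResource();
                queue.put(obj); // blocks while the queue is full
                System.out.println("Produced Resource - Queue size now : " + queue.size());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop condition ends the thread
                Thread.currentThread().interrupt();
            }
        }
    }

    // Simulates the time it takes to create a resource
    public Object getResource() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println("Thread Interrupted");
            Thread.currentThread().interrupt();
        }
        return new Object();
    }
}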

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue<Object> queue;

    public Consumer(BlockingQueue<Object> theQueue) {
        this.queue = theQueue;
    }

    // Note: each consumer takes a single element and then finishes
    @Override
    public void run() {
        try {
            Object obj = queue.take(); // blocks while the queue is empty
            System.out.println("Consumed Object - Queue size now : " + queue.size());
            take(obj);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Simulates the time it takes to process a resource
    public static void take(Object obj) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println("Thread Interrupted");
            Thread.currentThread().interrupt();
        }
        System.out.println("Consumed Resource : " + obj);
    }
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {

    public static void main(String[] args) throws InterruptedException {
        // Bounded queue shared by all producers and consumers
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(20);
        int numProducer = 4;
        int numConsumer = 4;

        for (int i = 0; i < numProducer; i++) {
            new Thread(new Producer(queue)).start();
        }
        for (int j = 0; j < numConsumer; j++) {
            new Thread(new Consumer(queue)).start();
        }

        // Let the threads run for one second, then stop the JVM
        Thread.sleep(10 * 100);
        System.exit(0);
    }
}