This example implements the producer-consumer pattern with two thread pools: one runs the producer threads and the other runs the consumer threads. The data to process is handed from one pool to the other through a BlockingQueue that holds the Message objects to process.
// Number of worker threads in each pool, and capacity of the shared queue
final int NB_THREAD = 3;
final int QUEUE_SIZE = 5;

// Create one fixed-size executor service per role, each with NB_THREAD workers
ExecutorService producerExecutors = Executors.newFixedThreadPool(NB_THREAD);
ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);

// The bounded queue holding the messages; put() blocks while it is full
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
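The threads coordinate through three events, each represented by a CountDownLatch: the start signal, the end of producing, and the end of consuming.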
// Gate opened once by the main thread; every worker awaits it before starting
CountDownLatch startSignal = new CountDownLatch(1);
// Counted down once per producer when it has finished producing
CountDownLatch doneProducing = new CountDownLatch(NB_THREAD);
// Counted down once per consumer when it has finished consuming
CountDownLatch doneConsuming = new CountDownLatch(NB_THREAD);

// Submit one Producer task per worker of the producer pool
for (int p = 0; p < NB_THREAD; p++) {
    Producer producer = new Producer(String.valueOf(p), queue, startSignal, doneProducing);
    producerExecutors.execute(producer);
}

// Submit one Consumer task per worker of the consumer pool
for (int c = 0; c < NB_THREAD; c++) {
    Consumer consumer = new Consumer(String.valueOf(c), queue, startSignal, doneProducing, doneConsuming);
    consumerExecutors.execute(consumer);
}
The main thread starts the process by counting down the startSignal CountDownLatch. It then waits for the doneProducing event before shutting down the producer thread pool. Finally, it waits for the doneConsuming event and shuts down the consumer thread pool.
// Start the process: releases every worker blocked in startSignal.await()
startSignal.countDown();
try {
    // Block until every producer has counted down doneProducing
    doneProducing.await();
    // Shutdown the producers so no new producers are accepted
    producerExecutors.shutdown();

    // Block until every consumer has counted down doneConsuming
    doneConsuming.await();
    System.out.println( "shutting down the consumerExecutors" );
    consumerExecutors.shutdown();
    System.out.println( "Done" );
} catch (InterruptedException e) {
    // An interrupt is a cancellation request: keep the flag set for the
    // caller and ask both pools to stop instead of pretending the wait
    // completed normally.
    Thread.currentThread().interrupt();
    producerExecutors.shutdownNow();
    consumerExecutors.shutdownNow();
}
In a similar way, a Producer first waits for the start signal. It then produces the Message objects and puts them into the queue. When no more objects need to be created, it counts down the done signal.
/**
 * Task that waits for the start signal, produces a fixed number of
 * {@link Message} objects, puts them on the shared queue and finally counts
 * down the "done producing" latch.
 */
class Producer implements Runnable {
    /** Number of messages each producer creates. */
    private static final int MESSAGE_COUNT = 5;
    /** Pause, in milliseconds, between creating a message and queueing it. */
    private static final long PRODUCE_DELAY_MS = 100;

    private final String id;
    /** Sequence number of the next message; only touched by the thread running this task. */
    private int localID = 0;
    private final BlockingQueue<Message> queue;
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    /**
     * @param id            label used in the log output and in the message ids
     * @param queue         queue the produced messages are put on
     * @param startSignal   latch awaited before producing starts
     * @param doneProducing latch counted down once when this producer stops
     */
    Producer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal, CountDownLatch doneProducing){
        this.id = id;
        this.queue = queue;
        this.startSignal = startSignal;
        this.doneSignal = doneProducing;
    }

    /**
     * Waits for the start signal, then produces and queues the messages.
     * An interrupt stops production early. The done latch is counted down in
     * every case so that threads waiting on it cannot hang.
     */
    @Override
    public void run() {
        try {
            // Wait for the start signal
            startSignal.await();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                Message m = produce();
                Thread.sleep( PRODUCE_DELAY_MS );
                // Blocks while the queue is full
                queue.put( m );
            }
        } catch (InterruptedException e) {
            // Treat the interrupt as a request to stop; keep the flag set
            // so the executor's worker thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            // update the done signal, even on early exit or failure
            doneSignal.countDown();
            System.out.println( "Producer " + id + " Done" );
        }
    }

    /**
     * Creates the next message, whose id is "&lt;producer id&gt; &lt;sequence number&gt;",
     * and logs it.
     *
     * @return the newly created message
     */
    Message produce(){
        Message m = new Message( id + " " + localID );
        System.out.println( "Producer " + id + " producing message " + m.getId() );
        localID++;
        return m;
    }
}
The Consumer is implemented the same way as the Producer, except that it keeps taking messages until producing is done and the queue is empty; only then does it count down the doneConsuming latch.
/**
 * Task that waits for the start signal, then consumes {@link Message}
 * objects from the shared queue until every producer is done and the queue
 * has been drained, and finally counts down the "done consuming" latch.
 */
class Consumer implements Runnable {
    /** How long, in milliseconds, one poll waits for a message before re-checking for termination. */
    private static final long POLL_TIMEOUT_MS = 100;

    private final String id;
    private final BlockingQueue<Message> queue;
    private final CountDownLatch startSignal;
    private final CountDownLatch doneProducing;
    private final CountDownLatch doneConsuming;

    /**
     * @param id            label used in the log output
     * @param queue         queue the messages are taken from
     * @param startSignal   latch awaited before consuming starts
     * @param doneProducing latch that reaches zero once every producer has stopped
     * @param doneConsuming latch counted down once when this consumer stops
     */
    Consumer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal, CountDownLatch doneProducing, CountDownLatch doneConsuming){
        this.id = id;
        this.queue = queue;
        this.startSignal = startSignal;
        this.doneProducing = doneProducing;
        this.doneConsuming = doneConsuming;
    }

    /**
     * Waits for the start signal, then consumes messages until production is
     * over and the queue is empty. An interrupt stops consumption early. The
     * done latch is counted down in every case so that the thread waiting on
     * it cannot hang.
     */
    @Override
    public void run() {
        try {
            startSignal.await();
            while (true) {
                // Read the latch BEFORE polling: producers put their last
                // message before counting down, so if production was already
                // over and the poll still finds nothing, the queue is drained
                // for good.
                boolean producersDone = doneProducing.getCount() == 0;
                // A timed poll parks the thread while the queue is empty
                // instead of spinning on isEmpty(), and removes the message
                // atomically, so no external lock is needed.
                Message m = queue.poll( POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS );
                if (m != null) {
                    consume(m);
                } else if (producersDone) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            // Treat the interrupt as a request to stop; keep the flag set
            // so the executor's worker thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            doneConsuming.countDown();
            System.out.println( "Consumer " + id + " Done" );
        }
    }

    /**
     * Processes one message; here it only logs the message id.
     *
     * @param m the message to consume
     */
    public void consume( Message m ){
        System.out.println( "Consumer " + id + " consuming message " + m.getId() );
    }
}
Here is the full example, ready to run:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Entry point of the example: wires NB_THREAD producers and NB_THREAD
 * consumers together through a bounded queue and coordinates them with three
 * latches (start, done producing, done consuming).
 */
public class ProducerConsumerPattern {
    /**
     * Starts the producers and consumers, waits for both groups to finish and
     * shuts their thread pools down.
     *
     * @param argv command-line arguments (unused)
     */
    public static void main(String[] argv){
        final int NB_THREAD = 3;
        final int QUEUE_SIZE = 5;
        // Create one executor service per role, each with NB_THREAD workers
        ExecutorService producerExecutors = Executors.newFixedThreadPool(NB_THREAD);
        ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);
        // The queue holding the messages
        BlockingQueue<Message> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
        // Gate opened once by this thread; every worker awaits it
        CountDownLatch startSignal = new CountDownLatch(1);
        // Counted down once per producer / consumer when it has finished
        CountDownLatch doneProducing = new CountDownLatch(NB_THREAD);
        CountDownLatch doneConsuming = new CountDownLatch(NB_THREAD);
        // Create the producers
        for( int i = 0; i < NB_THREAD ; i++ ){
            producerExecutors.execute( new Producer( ""+i , queue, startSignal, doneProducing) );
        }
        // Create the consumers
        for( int i = 0; i < NB_THREAD ; i++ ){
            consumerExecutors.execute( new Consumer( ""+i , queue, startSignal, doneProducing, doneConsuming) );
        }
        // Start the process
        startSignal.countDown();
        try {
            doneProducing.await();
            // Shutdown the producers so no new producers are accepted
            producerExecutors.shutdown();
            doneConsuming.await();
            System.out.println( "shutting down the consumerExecutors" );
            consumerExecutors.shutdown();
            System.out.println( "Done" );
        } catch (InterruptedException e) {
            // An interrupt is a cancellation request: keep the flag set and
            // ask both pools to stop instead of carrying on as if the wait
            // had completed. Without a shutdown the pool threads would also
            // keep the JVM alive.
            Thread.currentThread().interrupt();
            producerExecutors.shutdownNow();
            consumerExecutors.shutdownNow();
        }
    }
}
/**
 * Task that waits for the start signal, produces a fixed number of
 * {@link Message} objects, puts them on the shared queue and finally counts
 * down the "done producing" latch.
 */
class Producer implements Runnable {
    /** Number of messages each producer creates. */
    private static final int MESSAGE_COUNT = 5;
    /** Pause, in milliseconds, between creating a message and queueing it. */
    private static final long PRODUCE_DELAY_MS = 100;

    private final String id;
    /** Sequence number of the next message; only touched by the thread running this task. */
    private int localID = 0;
    private final BlockingQueue<Message> queue;
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    /**
     * @param id            label used in the log output and in the message ids
     * @param queue         queue the produced messages are put on
     * @param startSignal   latch awaited before producing starts
     * @param doneProducing latch counted down once when this producer stops
     */
    Producer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal, CountDownLatch doneProducing){
        this.id = id;
        this.queue = queue;
        this.startSignal = startSignal;
        this.doneSignal = doneProducing;
    }

    /**
     * Waits for the start signal, then produces and queues the messages.
     * An interrupt stops production early. The done latch is counted down in
     * every case so that threads waiting on it cannot hang.
     */
    @Override
    public void run() {
        try {
            // Wait for the start signal
            startSignal.await();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                Message m = produce();
                Thread.sleep( PRODUCE_DELAY_MS );
                // Blocks while the queue is full
                queue.put( m );
            }
        } catch (InterruptedException e) {
            // Treat the interrupt as a request to stop; keep the flag set
            // so the executor's worker thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            // update the done signal, even on early exit or failure
            doneSignal.countDown();
            System.out.println( "Producer " + id + " Done" );
        }
    }

    /**
     * Creates the next message, whose id is "&lt;producer id&gt; &lt;sequence number&gt;",
     * and logs it.
     *
     * @return the newly created message
     */
    Message produce(){
        Message m = new Message( id + " " + localID );
        System.out.println( "Producer " + id + " producing message " + m.getId() );
        localID++;
        return m;
    }
}
/**
 * Task that waits for the start signal, then consumes {@link Message}
 * objects from the shared queue until every producer is done and the queue
 * has been drained, and finally counts down the "done consuming" latch.
 */
class Consumer implements Runnable {
    /** How long, in milliseconds, one poll waits for a message before re-checking for termination. */
    private static final long POLL_TIMEOUT_MS = 100;

    private final String id;
    private final BlockingQueue<Message> queue;
    private final CountDownLatch startSignal;
    private final CountDownLatch doneProducing;
    private final CountDownLatch doneConsuming;

    /**
     * @param id            label used in the log output
     * @param queue         queue the messages are taken from
     * @param startSignal   latch awaited before consuming starts
     * @param doneProducing latch that reaches zero once every producer has stopped
     * @param doneConsuming latch counted down once when this consumer stops
     */
    Consumer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal, CountDownLatch doneProducing, CountDownLatch doneConsuming){
        this.id = id;
        this.queue = queue;
        this.startSignal = startSignal;
        this.doneProducing = doneProducing;
        this.doneConsuming = doneConsuming;
    }

    /**
     * Waits for the start signal, then consumes messages until production is
     * over and the queue is empty. An interrupt stops consumption early. The
     * done latch is counted down in every case so that the thread waiting on
     * it cannot hang.
     */
    @Override
    public void run() {
        try {
            startSignal.await();
            while (true) {
                // Read the latch BEFORE polling: producers put their last
                // message before counting down, so if production was already
                // over and the poll still finds nothing, the queue is drained
                // for good.
                boolean producersDone = doneProducing.getCount() == 0;
                // A timed poll parks the thread while the queue is empty
                // instead of spinning on isEmpty(), and removes the message
                // atomically, so no external lock is needed.
                Message m = queue.poll( POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS );
                if (m != null) {
                    consume(m);
                } else if (producersDone) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            // Treat the interrupt as a request to stop; keep the flag set
            // so the executor's worker thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            doneConsuming.countDown();
            System.out.println( "Consumer " + id + " Done" );
        }
    }

    /**
     * Processes one message; here it only logs the message id.
     *
     * @param m the message to consume
     */
    public void consume( Message m ){
        System.out.println( "Consumer " + id + " consuming message " + m.getId() );
    }
}
/**
 * Unit of work handed from the producers to the consumers. It carries only an
 * identifier, which never changes after construction.
 */
class Message {
    // final: the id is set once in the constructor and never reassigned
    private final String id;

    /**
     * @param id identifier of this message
     */
    Message(String id){
        this.id = id;
    }

    /**
     * @return the identifier given at construction
     */
    public String getId(){
        return id;
    }
}
Sample output (the exact interleaving depends on thread scheduling):
Producer 2 producing message 2 0
Producer 0 producing message 0 0
Producer 1 producing message 1 0
Producer 2 producing message 2 1
Producer 0 producing message 0 1
Producer 1 producing message 1 1
Consumer 0 consuming message 0 0
Consumer 2 consuming message 2 0
Consumer 1 consuming message 1 0
Producer 1 producing message 1 2
Consumer 0 consuming message 1 1
Producer 2 producing message 2 2
Producer 0 producing message 0 2
Consumer 1 consuming message 2 1
Consumer 2 consuming message 0 1
Producer 2 producing message 2 3
Consumer 0 consuming message 1 2
Producer 1 producing message 1 3
Consumer 2 consuming message 0 2
Producer 0 producing message 0 3
Consumer 1 consuming message 2 2
Producer 2 producing message 2 4
Consumer 1 consuming message 1 3
Producer 1 producing message 1 4
Producer 0 producing message 0 4
Consumer 0 consuming message 2 3
Consumer 2 consuming message 0 3
Consumer 0 consuming message 0 4
Producer 2 Done
Consumer 1 consuming message 2 4
Producer 1 Done
Producer 0 Done
Consumer 2 consuming message 1 4
Consumer 1 Done
Consumer 0 Done
Consumer 2 Done
shutting down the consumerExecutors
Done
java.util.concurrent.Executors