How to implement the producer consumer pattern

This example implements the producer-consumer pattern with 2 thread pools, one holding the producer threads and the other holding the consumer threads. The data to process is passed between the 2 pools through a BlockingQueue that holds the Message objects to process.

// Number of worker threads in each of the two pools
final int NB_THREAD  = 3;
// Capacity of the bounded queue shared by producers and consumers
final int QUEUE_SIZE = 5;
		
// Create two fixed-size thread pools with NB_THREAD workers each:
// one pool runs the producers, the other runs the consumers
ExecutorService producerExecutors = Executors.newFixedThreadPool(NB_THREAD);
ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);

// The queue holding the messages; it holds at most QUEUE_SIZE messages at a time
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

The Threads monitor 3 events:

  1. The start of the process or start signal.
  2. When all Messages are generated, with the doneProducing CountDownLatch
  3. When all Messages are consumed, with the doneConsuming CountDownLatch

		// Single-count latch: one countDown() releases every task waiting on it
		CountDownLatch startSignal = new CountDownLatch(1);
		// One count per producer task; reaches zero once every producer has counted down
		CountDownLatch doneProducing = new CountDownLatch(NB_THREAD);
		// One count per consumer task; reaches zero once every consumer has counted down
		CountDownLatch doneConsuming = new CountDownLatch(NB_THREAD);

		// Create the producers, using the loop index as the producer id
		for( int i = 0; i < NB_THREAD ; i++ ){
			producerExecutors.execute( new Producer( ""+i , queue, startSignal, doneProducing) );
		}
		
		// Create the consumers; they also receive doneProducing so they can tell when production is over
		for( int i = 0; i < NB_THREAD ; i++ ){
			consumerExecutors.execute( new Consumer( ""+i , queue, startSignal, doneProducing, doneConsuming) );
		}

The main thread starts the process by counting down the startSignal CountDownLatch. It then waits for the doneProducing event before shutting down the producer thread pool. Finally, it waits for the doneConsuming event and shuts down the consumer thread pool.

		// Start the process: a single countDown() releases every task waiting on the latch
		startSignal.countDown();
		
		try {
			// Block until every producer has counted down doneProducing
			doneProducing.await();
		} catch (InterruptedException e1) {
			// Restore the interrupt status instead of swallowing it; the next
			// blocking call will then return immediately rather than wait
			Thread.currentThread().interrupt();
		}
		
		// Shutdown the producers so no new producers are accepted
		// (tasks that were already submitted are still allowed to finish)
		producerExecutors.shutdown();
		
		try {
			// Block until every consumer has counted down doneConsuming
			doneConsuming.await();
		} catch (InterruptedException e1) {
			// Keep the interrupt visible to whoever runs this code
			Thread.currentThread().interrupt();
		}
		System.out.println( "shutting down the consumerExecutors" );
		consumerExecutors.shutdown();

		System.out.println( "Done" );

In a similar way, a Producer waits for the start signal. It then produces the Message objects and puts them on the queue. When no more objects need to be created, it counts down the done signal.

/**
 * Task that waits for a shared start signal, then creates a fixed number of
 * {@link Message} objects and puts them on a shared queue.
 *
 * <p>The done latch is always counted down exactly once when {@link #run()}
 * ends, including when the thread is interrupted, so threads waiting on that
 * latch are never left blocked.
 */
class Producer implements Runnable {
	/** Number of messages each producer creates before it finishes. */
	private static final int MESSAGES_PER_PRODUCER = 5;
	/** Pause between creating a message and queueing it, in milliseconds. */
	private static final long PRODUCE_DELAY_MS = 100;

	private final String id;
	/** Sequence number of the next message; only touched by the thread running this task. */
	private int localID = 0;

	private final BlockingQueue<Message> queue;
	private final CountDownLatch startSignal;
	private final CountDownLatch doneSignal;
	
	/**
	 * @param id            label used in the log output and as prefix of the message ids
	 * @param queue         queue the produced messages are put on
	 * @param startSignal   latch this producer waits on before producing
	 * @param doneProducing latch counted down once when this producer has finished
	 */
	Producer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal, CountDownLatch doneProducing){
		this.id = id;
		this.queue = queue;
		
		this.startSignal = startSignal;
		this.doneSignal  = doneProducing;
	}
	
	/**
	 * Waits for the start signal, produces the messages, then counts down the
	 * done latch. An interrupt stops production early and is re-asserted on the
	 * current thread.
	 */
	@Override
	public void run() {
		try {
			// Wait for the start signal
			startSignal.await();

			for(int i = 0; i < MESSAGES_PER_PRODUCER; i++){
				Message m = produce();
				Thread.sleep( PRODUCE_DELAY_MS );
				// put() blocks while the queue is full
				queue.put( m );
			}
		} catch (InterruptedException e) {
			// Stop producing and keep the interrupt visible to the owning executor
			Thread.currentThread().interrupt();
		} finally {
			// update the done signal, even on interruption, so waiters are released
			doneSignal.countDown();
		}
		
		System.out.println( "Producer " + id + " Done" );
	}
	
	/**
	 * Creates the next message, logs it and advances the local sequence number.
	 *
	 * @return a new message whose id is this producer's id, a space, and the sequence number
	 */
	Message produce(){
		Message m = new Message( id + " " + localID );
		System.out.println( "Producer " + id + " producing message " + m.getId() );
		
		localID++;
		
		return m;
	}
}

The Consumer is implemented the same way as the Producer, except that it keeps running until producing is done and the queue is empty; only then does it count down the doneConsuming latch.

/**
 * Task that waits for a shared start signal, then takes {@link Message}
 * objects from a shared queue until every producer has finished and the queue
 * is empty.
 *
 * <p>The doneConsuming latch is always counted down exactly once when
 * {@link #run()} ends, including when the thread is interrupted.
 */
class Consumer implements Runnable {
	/** How long one poll waits for a message before the exit condition is re-checked, in milliseconds. */
	private static final long POLL_TIMEOUT_MS = 50;

	private final String id;
	private final BlockingQueue<Message> queue;
	private final CountDownLatch startSignal;
	private final CountDownLatch doneProducing;
	private final CountDownLatch doneConsuming;
	
	/**
	 * @param id            label used in the log output
	 * @param queue         queue the messages are taken from
	 * @param startSignal   latch this consumer waits on before consuming
	 * @param doneProducing latch whose count reaches zero once all producers have finished
	 * @param doneConsuming latch counted down once when this consumer has finished
	 */
	Consumer(String id, BlockingQueue<Message> queue,  CountDownLatch startSignal, CountDownLatch doneProducing, CountDownLatch doneConsuming){
		this.id = id;
		this.queue = queue;
		this.startSignal = startSignal;
		this.doneProducing = doneProducing;
		this.doneConsuming = doneConsuming;
	}
	 
	/**
	 * Waits for the start signal and consumes messages until production is over
	 * and the queue has been drained. An interrupt stops consumption early and
	 * is re-asserted on the current thread.
	 */
	@Override
	public void run() {
		try {
			startSignal.await();

			// Keep going while producers may still add messages or messages remain queued
			while( doneProducing.getCount() != 0 || !queue.isEmpty() ){
				// A timed poll waits without spinning and without holding an external
				// lock; it returns null on timeout so the loop condition is re-checked
				Message m = queue.poll( POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS );

				if( m != null ){
					consume(m);
				}
			}
		} catch (InterruptedException e) {
			// Stop consuming and keep the interrupt visible to the owning executor
			Thread.currentThread().interrupt();
		} finally {
			// Always release threads waiting on doneConsuming
			doneConsuming.countDown();
		}
		
		System.out.println( "Consumer " + id + " Done" );
	}	
	
	/**
	 * Processes one message; here it only logs the message id.
	 *
	 * @param m the message to consume
	 */
	public void consume( Message m ){
		System.out.println( "Consumer " + id + " consuming message " + m.getId() );
	}
}

To run the full example, create the following Java file, named ProducerConsumerPattern.java after its public class:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Entry point of the producer-consumer example: wires producers and consumers
 * to a shared bounded queue and coordinates them with three latches.
 */
public class ProducerConsumerPattern {

	/**
	 * Starts the producers and consumers, waits for both groups to finish and
	 * shuts down their thread pools.
	 *
	 * @param argv command-line arguments (unused)
	 */
	public static void main(String[] argv){

		final int NB_THREAD  = 3;
		final int QUEUE_SIZE = 5;
		
		// Create two fixed-size thread pools with NB_THREAD workers each:
		// one pool runs the producers, the other runs the consumers
		ExecutorService producerExecutors = Executors.newFixedThreadPool(NB_THREAD);
		ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);

		// The queue holding the messages; it holds at most QUEUE_SIZE messages at a time
		BlockingQueue<Message> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

		// startSignal releases all tasks at once; the two "done" latches have one count per task
		CountDownLatch startSignal = new CountDownLatch(1);
		CountDownLatch doneProducing = new CountDownLatch(NB_THREAD);
		CountDownLatch doneConsuming = new CountDownLatch(NB_THREAD);

		// Create the producers
		for( int i = 0; i < NB_THREAD ; i++ ){
			producerExecutors.execute( new Producer( ""+i , queue, startSignal, doneProducing) );
		}
		
		// Create the consumers
		for( int i = 0; i < NB_THREAD ; i++ ){
			consumerExecutors.execute( new Consumer( ""+i , queue, startSignal, doneProducing, doneConsuming) );
		}
		
		// Start the process
		startSignal.countDown();
		
		try {
			// Block until every producer has counted down doneProducing
			doneProducing.await();
		} catch (InterruptedException e1) {
			// Restore the interrupt status instead of swallowing it; the next
			// blocking call will then return immediately rather than wait
			Thread.currentThread().interrupt();
		}
		
		// Shutdown the producers so no new producers are accepted
		// (tasks that were already submitted are still allowed to finish)
		producerExecutors.shutdown();
		
		try {
			// Block until every consumer has counted down doneConsuming
			doneConsuming.await();
		} catch (InterruptedException e1) {
			// Keep the interrupt visible to whoever launched main
			Thread.currentThread().interrupt();
		}
		System.out.println( "shutting down the consumerExecutors" );
		consumerExecutors.shutdown();

		System.out.println( "Done" );
	}
}

/**
 * Task that waits for a shared start signal, then creates a fixed number of
 * {@link Message} objects and puts them on a shared queue.
 *
 * <p>The done latch is always counted down exactly once when {@link #run()}
 * ends, including when the thread is interrupted, so threads waiting on that
 * latch are never left blocked.
 */
class Producer implements Runnable {
	/** Number of messages each producer creates before it finishes. */
	private static final int MESSAGES_PER_PRODUCER = 5;
	/** Pause between creating a message and queueing it, in milliseconds. */
	private static final long PRODUCE_DELAY_MS = 100;

	private final String id;
	/** Sequence number of the next message; only touched by the thread running this task. */
	private int localID = 0;

	private final BlockingQueue<Message> queue;
	private final CountDownLatch startSignal;
	private final CountDownLatch doneSignal;
	
	/**
	 * @param id            label used in the log output and as prefix of the message ids
	 * @param queue         queue the produced messages are put on
	 * @param startSignal   latch this producer waits on before producing
	 * @param doneProducing latch counted down once when this producer has finished
	 */
	Producer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal, CountDownLatch doneProducing){
		this.id = id;
		this.queue = queue;
		
		this.startSignal = startSignal;
		this.doneSignal  = doneProducing;
	}
	
	/**
	 * Waits for the start signal, produces the messages, then counts down the
	 * done latch. An interrupt stops production early and is re-asserted on the
	 * current thread.
	 */
	@Override
	public void run() {
		try {
			// Wait for the start signal
			startSignal.await();

			for(int i = 0; i < MESSAGES_PER_PRODUCER; i++){
				Message m = produce();
				Thread.sleep( PRODUCE_DELAY_MS );
				// put() blocks while the queue is full
				queue.put( m );
			}
		} catch (InterruptedException e) {
			// Stop producing and keep the interrupt visible to the owning executor
			Thread.currentThread().interrupt();
		} finally {
			// update the done signal, even on interruption, so waiters are released
			doneSignal.countDown();
		}
		
		System.out.println( "Producer " + id + " Done" );
	}
	
	/**
	 * Creates the next message, logs it and advances the local sequence number.
	 *
	 * @return a new message whose id is this producer's id, a space, and the sequence number
	 */
	Message produce(){
		Message m = new Message( id + " " + localID );
		System.out.println( "Producer " + id + " producing message " + m.getId() );
		
		localID++;
		
		return m;
	}
}

/**
 * Task that waits for a shared start signal, then takes {@link Message}
 * objects from a shared queue until every producer has finished and the queue
 * is empty.
 *
 * <p>The doneConsuming latch is always counted down exactly once when
 * {@link #run()} ends, including when the thread is interrupted.
 */
class Consumer implements Runnable {
	/** How long one poll waits for a message before the exit condition is re-checked, in milliseconds. */
	private static final long POLL_TIMEOUT_MS = 50;

	private final String id;
	private final BlockingQueue<Message> queue;
	private final CountDownLatch startSignal;
	private final CountDownLatch doneProducing;
	private final CountDownLatch doneConsuming;
	
	/**
	 * @param id            label used in the log output
	 * @param queue         queue the messages are taken from
	 * @param startSignal   latch this consumer waits on before consuming
	 * @param doneProducing latch whose count reaches zero once all producers have finished
	 * @param doneConsuming latch counted down once when this consumer has finished
	 */
	Consumer(String id, BlockingQueue<Message> queue,  CountDownLatch startSignal, CountDownLatch doneProducing, CountDownLatch doneConsuming){
		this.id = id;
		this.queue = queue;
		this.startSignal = startSignal;
		this.doneProducing = doneProducing;
		this.doneConsuming = doneConsuming;
	}
	 
	/**
	 * Waits for the start signal and consumes messages until production is over
	 * and the queue has been drained. An interrupt stops consumption early and
	 * is re-asserted on the current thread.
	 */
	@Override
	public void run() {
		try {
			startSignal.await();

			// Keep going while producers may still add messages or messages remain queued
			while( doneProducing.getCount() != 0 || !queue.isEmpty() ){
				// A timed poll waits without spinning and without holding an external
				// lock; it returns null on timeout so the loop condition is re-checked
				Message m = queue.poll( POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS );

				if( m != null ){
					consume(m);
				}
			}
		} catch (InterruptedException e) {
			// Stop consuming and keep the interrupt visible to the owning executor
			Thread.currentThread().interrupt();
		} finally {
			// Always release threads waiting on doneConsuming
			doneConsuming.countDown();
		}
		
		System.out.println( "Consumer " + id + " Done" );
	}	
	
	/**
	 * Processes one message; here it only logs the message id.
	 *
	 * @param m the message to consume
	 */
	public void consume( Message m ){
		System.out.println( "Consumer " + id + " consuming message " + m.getId() );
	}
}

/**
 * Immutable unit of work handed from a producer to a consumer.
 */
class Message {
	/** Identifier given at construction; final so the message cannot change after it is queued. */
	private final String id;

	/**
	 * @param id identifier of this message
	 */
	Message(String id){
		this.id = id;
	}
	
	/**
	 * @return the identifier passed to the constructor
	 */
	public String getId(){
		return id;
	}
}

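The output will look similar to the following; the exact interleaving of the lines varies from run to run because the threads are scheduled independently: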

Producer 2 producing message 2 0
Producer 0 producing message 0 0
Producer 1 producing message 1 0
Producer 2 producing message 2 1
Producer 0 producing message 0 1
Producer 1 producing message 1 1
Consumer 0 consuming message 0 0
Consumer 2 consuming message 2 0
Consumer 1 consuming message 1 0
Producer 1 producing message 1 2
Consumer 0 consuming message 1 1
Producer 2 producing message 2 2
Producer 0 producing message 0 2
Consumer 1 consuming message 2 1
Consumer 2 consuming message 0 1
Producer 2 producing message 2 3
Consumer 0 consuming message 1 2
Producer 1 producing message 1 3
Consumer 2 consuming message 0 2
Producer 0 producing message 0 3
Consumer 1 consuming message 2 2
Producer 2 producing message 2 4
Consumer 1 consuming message 1 3
Producer 1 producing message 1 4
Producer 0 producing message 0 4
Consumer 0 consuming message 2 3
Consumer 2 consuming message 0 3
Consumer 0 consuming message 0 4
Producer 2 Done
Consumer 1 consuming message 2 4
Producer 1 Done
Producer 0 Done
Consumer 2 consuming message 1 4
Consumer 1 Done
Consumer 0 Done
Consumer 2 Done
shutting down the consumerExecutors
Done


References:

java.util.concurrent.Executors

Recent Comments