This example implements the producer-consumer pattern with two thread pools: one runs the producer threads and the other runs the consumer threads. Data passes between the two pools through a BlockingQueue holding the Message objects to process.
    final int NB_THREAD = 3;
    final int QUEUE_SIZE = 5;

    // Create one executor service per role, each with NB_THREAD workers
    ExecutorService producerExecutors = Executors.newFixedThreadPool(NB_THREAD);
    ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);

    // The bounded queue holding the messages
    BlockingQueue<Message> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
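The queue capacity is what throttles the producers: once QUEUE_SIZE messages are waiting, put() blocks until a consumer makes room. The following is a minimal sketch of that bound, not part of the original example. The class name BoundedQueueSketch and the method fill are hypothetical, String elements stand in for Message, and the non-blocking offer() is used instead of put() so the effect can be observed from a single thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper class, for illustration only.
class BoundedQueueSketch {

    // Tries to add `attempts` elements to a queue of the given capacity
    // and returns how many of them were accepted.
    static int fill(int capacity, int attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false when the queue is full;
            // put() would block the calling thread at that point instead.
            if (queue.offer("message " + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(fill(5, 8)); // prints 5
    }
}
```

With a capacity of 5, the last three offers are rejected, which is the point at which a producer calling put() would start waiting.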
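The threads coordinate through three events, each represented by a CountDownLatch: the start of the process (startSignal), the end of production (doneProducing), and the end of consumption (doneConsuming).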
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneProducing = new CountDownLatch(NB_THREAD);
    CountDownLatch doneConsuming = new CountDownLatch(NB_THREAD);

    // Create the producers
    for (int i = 0; i < NB_THREAD; i++) {
        producerExecutors.execute(new Producer("" + i, queue, startSignal, doneProducing));
    }

    // Create the consumers
    for (int i = 0; i < NB_THREAD; i++) {
        consumerExecutors.execute(new Consumer("" + i, queue, startSignal, doneProducing, doneConsuming));
    }
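The two ways the latches are used above, a start gate with a count of 1 and a completion counter with a count of NB_THREAD, can be sketched in isolation. This is a simplified illustration under assumptions: the class LatchSketch and the method runWorkers are hypothetical names, and plain threads replace the executor services to keep the sketch short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, for illustration only.
class LatchSketch {

    // Starts `workers` threads behind a start gate and returns how many
    // of them ran their work once the gate was opened.
    static int runWorkers(int workers) {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    start.await();              // block until the gate opens
                    finished.incrementAndGet(); // the "work" of this sketch
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();           // always report completion
                }
            }).start();
        }

        start.countDown();                      // open the gate for all workers at once
        try {
            done.await();                       // wait until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3)); // prints 3
    }
}
```

Counting down in a finally block is a design choice of this sketch: it keeps the waiting thread from hanging if a worker is interrupted.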
The main thread starts the process by counting down the startSignal latch. It then waits for the doneProducing event before shutting down the producer thread pool, and finally waits for the doneConsuming event before shutting down the consumer thread pool.
    // Start the process
    startSignal.countDown();

    try {
        doneProducing.await();
    } catch (InterruptedException e1) {
        e1.printStackTrace();
    }

    // Shut down the producers so no new producer tasks are accepted
    producerExecutors.shutdown();

    try {
        doneConsuming.await();
    } catch (InterruptedException e1) {
        e1.printStackTrace();
    }

    System.out.println("shutting down the consumerExecutors");
    consumerExecutors.shutdown();
    System.out.println("Done");
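Note that shutdown() only stops the pool from accepting new tasks; it does not wait for running tasks. The code above does not need to wait because the latches already guarantee that the work is finished. Where no latch is available, one possible approach is to follow shutdown() with awaitTermination(). The sketch below illustrates this under assumptions: the class ShutdownSketch, the method runAndShutdown, the pool size of 3, and the 5-second timeout are all hypothetical choices.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, for illustration only.
class ShutdownSketch {

    // Runs `tasks` trivial jobs, shuts the pool down, waits for it to drain,
    // and returns how many jobs completed.
    static int runAndShutdown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown(); // no new tasks accepted; already submitted tasks still run
        try {
            // Wait for the submitted tasks; the timeout is an arbitrary choice.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up and interrupt the remaining tasks
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdown(10)); // prints 10
    }
}
```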
Similarly, each Producer waits for the start signal, then produces Message objects and puts them on the queue. When it has no more objects to create, it counts down the doneProducing latch.
    class Producer implements Runnable {

        private String id;
        private int localID = 0;
        private BlockingQueue<Message> queue;
        private CountDownLatch startSignal;
        private CountDownLatch doneSignal;

        Producer(String id, BlockingQueue<Message> queue,
                 CountDownLatch startSignal, CountDownLatch doneProducing) {
            this.id = id;
            this.queue = queue;
            this.startSignal = startSignal;
            this.doneSignal = doneProducing;
        }

        @Override
        public void run() {
            // Wait for the start signal
            try {
                startSignal.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            for (int i = 0; i < 5; i++) {
                try {
                    Message m = produce();
                    // Simulate the time needed to produce a message
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // Blocks while the queue is full
                    queue.put(m);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // Update the done signal
            doneSignal.countDown();
            System.out.println("Producer " + id + " Done");
        }

        Message produce() {
            Message m = new Message(id + " " + localID);
            System.out.println("Producer " + id + " producing message " + m.getId());
            localID++;
            return m;
        }
    }
The Consumer is implemented the same way as the Producer, except that it keeps running until production is done and the queue is empty, and only then counts down the doneConsuming latch.
    class Consumer implements Runnable {

        private String id;
        private BlockingQueue<Message> queue;
        private CountDownLatch startSignal;
        private CountDownLatch doneProducing;
        private CountDownLatch doneConsuming;

        Consumer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal,
                 CountDownLatch doneProducing, CountDownLatch doneConsuming) {
            this.id = id;
            this.queue = queue;
            this.startSignal = startSignal;
            this.doneProducing = doneProducing;
            this.doneConsuming = doneConsuming;
        }

        @Override
        public void run() {
            // Wait for the start signal
            try {
                startSignal.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            // Keep going while producers are still running or messages remain
            while (doneProducing.getCount() != 0 || !queue.isEmpty()) {
                Message m = null;
                try {
                    // Check and take under one lock so that another consumer
                    // cannot empty the queue between the two calls
                    synchronized (queue) {
                        if (!queue.isEmpty())
                            m = queue.take();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (m != null)
                    consume(m);
            }

            doneConsuming.countDown();
            System.out.println("Consumer " + id + " Done");
        }

        public void consume(Message m) {
            System.out.println("Consumer " + id + " consuming message " + m.getId());
        }
    }
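The consumer loop above spins without pausing while the queue is empty and producers are still running. One possible alternative, which is not what the original example does, is a timed poll(): it waits briefly for a message and returns null on timeout, so the loop can recheck the latch without a synchronized block. The sketch below assumes String messages instead of Message, and the names PollingConsumerSketch, drain and demo, as well as the 50 ms timeout, are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class PollingConsumerSketch {

    // Consumes until production is done and the queue is empty;
    // returns the number of messages consumed.
    static int drain(BlockingQueue<String> queue, CountDownLatch doneProducing) {
        int consumed = 0;
        try {
            while (doneProducing.getCount() != 0 || !queue.isEmpty()) {
                // Waits up to 50 ms; returns null if no message arrived in time
                String m = queue.poll(50, TimeUnit.MILLISECONDS);
                if (m != null) {
                    consumed++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    // One producer puts four messages, then signals that it is done.
    static int demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        CountDownLatch doneProducing = new CountDownLatch(1);
        new Thread(() -> {
            try {
                for (int i = 0; i < 4; i++) {
                    queue.put("message " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                doneProducing.countDown();
            }
        }).start();
        return drain(queue, doneProducing);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 4
    }
}
```

Because poll() removes an element atomically, several consumers can share the queue this way without external locking.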
The full example, ready to run:
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ProducerConsumerPattern {

        public static void main(String[] argv) {
            final int NB_THREAD = 3;
            final int QUEUE_SIZE = 5;

            // Create one executor service per role, each with NB_THREAD workers
            ExecutorService producerExecutors = Executors.newFixedThreadPool(NB_THREAD);
            ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);

            // The bounded queue holding the messages
            BlockingQueue<Message> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

            CountDownLatch startSignal = new CountDownLatch(1);
            CountDownLatch doneProducing = new CountDownLatch(NB_THREAD);
            CountDownLatch doneConsuming = new CountDownLatch(NB_THREAD);

            // Create the producers
            for (int i = 0; i < NB_THREAD; i++) {
                producerExecutors.execute(new Producer("" + i, queue, startSignal, doneProducing));
            }

            // Create the consumers
            for (int i = 0; i < NB_THREAD; i++) {
                consumerExecutors.execute(new Consumer("" + i, queue, startSignal, doneProducing, doneConsuming));
            }

            // Start the process
            startSignal.countDown();

            try {
                doneProducing.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            // Shut down the producers so no new producer tasks are accepted
            producerExecutors.shutdown();

            try {
                doneConsuming.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            System.out.println("shutting down the consumerExecutors");
            consumerExecutors.shutdown();
            System.out.println("Done");
        }
    }

    class Producer implements Runnable {

        private String id;
        private int localID = 0;
        private BlockingQueue<Message> queue;
        private CountDownLatch startSignal;
        private CountDownLatch doneSignal;

        Producer(String id, BlockingQueue<Message> queue,
                 CountDownLatch startSignal, CountDownLatch doneProducing) {
            this.id = id;
            this.queue = queue;
            this.startSignal = startSignal;
            this.doneSignal = doneProducing;
        }

        @Override
        public void run() {
            // Wait for the start signal
            try {
                startSignal.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            for (int i = 0; i < 5; i++) {
                try {
                    Message m = produce();
                    // Simulate the time needed to produce a message
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // Blocks while the queue is full
                    queue.put(m);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // Update the done signal
            doneSignal.countDown();
            System.out.println("Producer " + id + " Done");
        }

        Message produce() {
            Message m = new Message(id + " " + localID);
            System.out.println("Producer " + id + " producing message " + m.getId());
            localID++;
            return m;
        }
    }

    class Consumer implements Runnable {

        private String id;
        private BlockingQueue<Message> queue;
        private CountDownLatch startSignal;
        private CountDownLatch doneProducing;
        private CountDownLatch doneConsuming;

        Consumer(String id, BlockingQueue<Message> queue, CountDownLatch startSignal,
                 CountDownLatch doneProducing, CountDownLatch doneConsuming) {
            this.id = id;
            this.queue = queue;
            this.startSignal = startSignal;
            this.doneProducing = doneProducing;
            this.doneConsuming = doneConsuming;
        }

        @Override
        public void run() {
            // Wait for the start signal
            try {
                startSignal.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            // Keep going while producers are still running or messages remain
            while (doneProducing.getCount() != 0 || !queue.isEmpty()) {
                Message m = null;
                try {
                    // Check and take under one lock so that another consumer
                    // cannot empty the queue between the two calls
                    synchronized (queue) {
                        if (!queue.isEmpty())
                            m = queue.take();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (m != null)
                    consume(m);
            }

            doneConsuming.countDown();
            System.out.println("Consumer " + id + " Done");
        }

        public void consume(Message m) {
            System.out.println("Consumer " + id + " consuming message " + m.getId());
        }
    }

    class Message {

        private String id;

        Message(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }
A sample run prints output like the following; the exact interleaving varies from run to run:

    Producer 2 producing message 2 0
    Producer 0 producing message 0 0
    Producer 1 producing message 1 0
    Producer 2 producing message 2 1
    Producer 0 producing message 0 1
    Producer 1 producing message 1 1
    Consumer 0 consuming message 0 0
    Consumer 2 consuming message 2 0
    Consumer 1 consuming message 1 0
    Producer 1 producing message 1 2
    Consumer 0 consuming message 1 1
    Producer 2 producing message 2 2
    Producer 0 producing message 0 2
    Consumer 1 consuming message 2 1
    Consumer 2 consuming message 0 1
    Producer 2 producing message 2 3
    Consumer 0 consuming message 1 2
    Producer 1 producing message 1 3
    Consumer 2 consuming message 0 2
    Producer 0 producing message 0 3
    Consumer 1 consuming message 2 2
    Producer 2 producing message 2 4
    Consumer 1 consuming message 1 3
    Producer 1 producing message 1 4
    Producer 0 producing message 0 4
    Consumer 0 consuming message 2 3
    Consumer 2 consuming message 0 3
    Consumer 0 consuming message 0 4
    Producer 2 Done
    Consumer 1 consuming message 2 4
    Producer 1 Done
    Producer 0 Done
    Consumer 2 consuming message 1 4
    Consumer 1 Done
    Consumer 0 Done
    Consumer 2 Done
    shutting down the consumerExecutors
    Done