How to use the CountDownLatch Class in java

The CountDownLatch blocks all threads until the count reaches zero. When the count reaches zero, all the waiting threads a re-enabled.

Syntax:

CountDownLatch doneSignal = new CountDownLatch(1);
		
// Start Thread with Latch
		
// Wait 10 seconds for thread to complete
doneSignal.await(10, TimeUnit.SECONDS);

The following example creates an ExecutorService that will have a thread consuming tasks from a List. Each task has a CountDownLatch, this way the main Thread can wait for the processing to complete or timeout. In this example the timeout is 10 seconds.

Create the following java file:

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
public class CountDownLatchExample {

	public static void main(String[] argv){

		final int NB_THREAD  = 1;
		
		// Create the executor service with 5 Workers
		ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);

		// The queue holding the messages
		List<Message> queue = Collections.synchronizedList(new LinkedList<Message>());

		CountDownLatch doneSignal = new CountDownLatch(1);

		CountDownLatch doneProducingSignal = new CountDownLatch(1);
		CountDownLatch doneConsumingSignal = new CountDownLatch(NB_THREAD);

		// Create the consumers
		for( int i = 0; i < NB_THREAD ; i++ ){
		    consumerExecutors.execute( new Consumer( ""+i , queue, doneProducingSignal, doneConsumingSignal) );
		}
		
		queue.add( new Message( "1", 15000, doneSignal ) );
		queue.add( new Message( "2", 15000, new CountDownLatch(1) ) ); // second latch is not used
		
		// All the elements have been added to the list
		doneProducingSignal.countDown();
		
		// Wait done signal or 10 sec
		boolean doneProcessing = false;
		try {
		    doneProcessing = doneSignal.await(10, TimeUnit.SECONDS);
		} catch (InterruptedException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		
		if ( doneProcessing ){
		    System.out.println( "Processing is done." ); 		
		} else {
		    System.out.println( "Processing is still running." ); 		
		}

		System.out.println( "Shutting down the consumerExecutors" ); 		

		// Stop the Consumer threads.
		doneProducingSignal.countDown();		

		// Wait for all the consuming Threads to complete
		try {
		    doneConsumingSignal.await();
		} catch (InterruptedException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		
		// Stop the Executor
		consumerExecutors.shutdown();
		System.out.println( "Done" ); 		
     }
}

class Consumer implements Runnable {
	private String id;
	private List<Message> queue;
	private CountDownLatch doneProducing;
	private CountDownLatch doneConsuming;
	
	Consumer(String id, List<Message> queue,  CountDownLatch doneProducing, CountDownLatch doneConsuming){
		this.id = id;
		this.queue = queue;
		this.doneProducing = doneProducing;
		this.doneConsuming = doneConsuming;
	}
	 
	@Override
	public void run() {
		
	    while( doneProducing.getCount() != 0 || !queue.isEmpty() ){ 

		Message m = null;

		synchronized( queue ){
		    if( !queue.isEmpty() )
			m = queue.remove(0);
		}

		if( m != null )
		    consume(m); 
		
	    }
		
	    System.out.println( "Consumer " + id + " Done" );
	    doneConsuming.countDown();
	}	
	
	public void consume( Message m ){
	    System.out.println( "Consumer " + id + " consuming message " + m.getId() );
	    
	    // Call the sleep time
	    try {
		Thread.sleep( m.getTime() );
	    } catch (InterruptedException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	    }
	    
	    System.out.println( "Consumer " + id + " done consuming message " + m.getId() );

	    // Decrement the Latch as the process is done.
	    m.getLatch().countDown();
	}
}

class Message {
	private String id;
	private int time;
	private CountDownLatch latch;
	
	Message(String id, int time, CountDownLatch latch){
		this.id = id;
		this.time = time;
		this.latch = latch;
	}
	
	public String getId(){
		return id;
	}

	public int getTime(){
		return time;
	}
	public CountDownLatch getLatch(){
		return latch;
	}
}

The output will be:

Consumer 0 consuming message 1
Processing is still running.
Shutting down the consumerExecutors
Consumer 0 done consuming message 1
Consumer 0 consuming message 2
Consumer 0 done consuming message 2
Consumer 0 Done
Done


References:

java.util.concurrent.Executors

java.util.concurrent.CountDownLatch

Share this article:

Recent Comments