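A CountDownLatch blocks every thread that calls await() on it until its count reaches zero. Once the count reaches zero, all waiting threads are released.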
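To make the release behaviour concrete, here is a minimal sketch in which several threads wait on one latch and a single countDown() lets all of them through. The class name LatchGateDemo, the method releaseAll and the 5-second safety timeout are assumptions made for this illustration; they are not part of the original example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example.
public class LatchGateDemo {

    // Starts `waiters` threads that all call await() on one latch, then counts
    // the latch down once and returns how many threads got past await().
    public static int releaseAll(int waiters) {
        CountDownLatch gate = new CountDownLatch(1);            // count starts at 1
        CountDownLatch finished = new CountDownLatch(waiters);  // one count per waiter
        AtomicInteger released = new AtomicInteger();

        for (int i = 0; i < waiters; i++) {
            new Thread(() -> {
                try {
                    gate.await();                // blocks while the count is above zero
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            }).start();
        }

        gate.countDown();                        // count reaches zero: waiters may proceed

        try {
            // Bounded wait (assumed 5 s) so the demo cannot hang forever.
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println("Released threads: " + releaseAll(3));
    }
}
```

A thread that reaches await() after the count is already zero does not block at all, so the result does not depend on whether the waiters or the countDown() call run first.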
    CountDownLatch doneSignal = new CountDownLatch(1);

    // Start a thread that counts the latch down when it finishes

    // Wait up to 10 seconds for the thread to complete
    doneSignal.await(10, TimeUnit.SECONDS);
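The fragment above leaves out the worker thread and ignores the value returned by await. The sketch below fills in both, under stated assumptions: the class TimedAwaitDemo, the method finishedInTime and the millisecond values are hypothetical and chosen only to keep the demo short. The timed await returns true if the count reached zero before the timeout and false otherwise.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
public class TimedAwaitDemo {

    // Runs a worker that sleeps for workMillis and then counts the latch down.
    // Returns the result of the timed await: true if the count reached zero
    // within timeoutMillis, false if the timeout elapsed first.
    public static boolean finishedInTime(long workMillis, long timeoutMillis) {
        CountDownLatch doneSignal = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMillis);    // stand-in for real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                doneSignal.countDown();      // signal completion
            }
        });
        worker.setDaemon(true);              // a slow worker must not keep the JVM alive
        worker.start();

        try {
            return doneSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast worker finished in time: " + finishedInTime(50, 2000));
        System.out.println("slow worker finished in time: " + finishedInTime(2000, 50));
    }
}
```

Checking the boolean matters because a timeout does not throw: the waiting thread simply resumes, and the return value is the only way to tell the two outcomes apart.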
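The example below creates an ExecutorService with a single thread that consumes tasks from a List. Each task carries its own CountDownLatch, so the main thread can wait until processing completes or a timeout expires. In this example the timeout is 10 seconds.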
    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class CountDownLatchExample {

        public static void main(String[] argv) {
            final int NB_THREAD = 1;

            // Create the executor service with NB_THREAD worker(s)
            ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);

            // The queue holding the messages
            List<Message> queue = Collections.synchronizedList(new LinkedList<Message>());

            CountDownLatch doneSignal = new CountDownLatch(1);
            CountDownLatch doneProducingSignal = new CountDownLatch(1);
            CountDownLatch doneConsumingSignal = new CountDownLatch(NB_THREAD);

            // Create the consumers
            for (int i = 0; i < NB_THREAD; i++) {
                consumerExecutors.execute(
                        new Consumer("" + i, queue, doneProducingSignal, doneConsumingSignal));
            }

            queue.add(new Message("1", 15000, doneSignal));
            queue.add(new Message("2", 15000, new CountDownLatch(1))); // second latch is not used

            // All the elements have been added to the list
            doneProducingSignal.countDown();

            // Wait for the done signal, or give up after 10 seconds
            boolean doneProcessing = false;
            try {
                doneProcessing = doneSignal.await(10, TimeUnit.SECONDS);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            if (doneProcessing) {
                System.out.println("Processing is done.");
            } else {
                System.out.println("Processing is still running.");
            }

            System.out.println("Shutting down the consumerExecutors");

            // Stop the Consumer threads. The latch is already at zero, so this
            // second call is a no-op; the consumers exit once the queue is empty.
            doneProducingSignal.countDown();

            // Wait for all the consuming threads to complete
            try {
                doneConsumingSignal.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            // Stop the executor
            consumerExecutors.shutdown();

            System.out.println("Done");
        }
    }

    class Consumer implements Runnable {
        private String id;
        private List<Message> queue;
        private CountDownLatch doneProducing;
        private CountDownLatch doneConsuming;

        Consumer(String id, List<Message> queue,
                 CountDownLatch doneProducing, CountDownLatch doneConsuming) {
            this.id = id;
            this.queue = queue;
            this.doneProducing = doneProducing;
            this.doneConsuming = doneConsuming;
        }

        @Override
        public void run() {
            // Keep polling until the producer has finished and the queue is drained
            while (doneProducing.getCount() != 0 || !queue.isEmpty()) {
                Message m = null;
                // Lock the list so the emptiness check and the removal are atomic
                synchronized (queue) {
                    if (!queue.isEmpty()) {
                        m = queue.remove(0);
                    }
                }
                if (m != null) {
                    consume(m);
                }
            }
            System.out.println("Consumer " + id + " Done");
            doneConsuming.countDown();
        }

        public void consume(Message m) {
            System.out.println("Consumer " + id + " consuming message " + m.getId());

            // Simulate the processing time by sleeping
            try {
                Thread.sleep(m.getTime());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Consumer " + id + " done consuming message " + m.getId());

            // Decrement the latch now that processing is done
            m.getLatch().countDown();
        }
    }

    class Message {
        private String id;
        private int time;
        private CountDownLatch latch;

        Message(String id, int time, CountDownLatch latch) {
            this.id = id;
            this.time = time;
            this.latch = latch;
        }

        public String getId() {
            return id;
        }

        public int getTime() {
            return time;
        }

        public CountDownLatch getLatch() {
            return latch;
        }
    }
Output:

    Consumer 0 consuming message 1
    Processing is still running.
    Shutting down the consumerExecutors
    Consumer 0 done consuming message 1
    Consumer 0 consuming message 2
    Consumer 0 done consuming message 2
    Consumer 0 Done
    Done