CountDownLatchは、カウントがゼロになるまですべてのスレッドをブロックします。 カウントがゼロになると、すべての待機スレッドが再び有効になります。
CountDownLatch doneSignal = new CountDownLatch(1); // Start Thread with Latch // Wait 10 seconds for thread to complete doneSignal.await(10, TimeUnit.SECONDS);
次の例では、リストからタスクを消費するスレッドを持つExecutorServiceを作成します。 各タスクにはCountDownLatchがあり、メインスレッドは処理の完了またはタイムアウトを待つことができます。 この例では、タイムアウトは10秒です。
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CountDownLatchExample {
public static void main(String[] argv){
final int NB_THREAD = 1;
// Create the executor service with 5 Workers
ExecutorService consumerExecutors = Executors.newFixedThreadPool(NB_THREAD);
// The queue holding the messages
List<Message> queue = Collections.synchronizedList(new LinkedList<Message>());
CountDownLatch doneSignal = new CountDownLatch(1);
CountDownLatch doneProducingSignal = new CountDownLatch(1);
CountDownLatch doneConsumingSignal = new CountDownLatch(NB_THREAD);
// Create the consumers
for( int i = 0; i < NB_THREAD ; i++ ){
consumerExecutors.execute( new Consumer( ""+i , queue, doneProducingSignal, doneConsumingSignal) );
}
queue.add( new Message( "1", 15000, doneSignal ) );
queue.add( new Message( "2", 15000, new CountDownLatch(1) ) ); // second latch is not used
// All the elements have been added to the list
doneProducingSignal.countDown();
// Wait done signal or 10 sec
boolean doneProcessing = false;
try {
doneProcessing = doneSignal.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
if ( doneProcessing ){
System.out.println( "Processing is done." );
} else {
System.out.println( "Processing is still running." );
}
System.out.println( "Shutting down the consumerExecutors" );
// Stop the Consumer threads.
doneProducingSignal.countDown();
// Wait for all the consuming Threads to complete
try {
doneConsumingSignal.await();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// Stop the Executor
consumerExecutors.shutdown();
System.out.println( "Done" );
}
}
class Consumer implements Runnable {
private String id;
private List<Message> queue;
private CountDownLatch doneProducing;
private CountDownLatch doneConsuming;
Consumer(String id, List<Message> queue, CountDownLatch doneProducing, CountDownLatch doneConsuming){
this.id = id;
this.queue = queue;
this.doneProducing = doneProducing;
this.doneConsuming = doneConsuming;
}
@Override
public void run() {
while( doneProducing.getCount() != 0 || !queue.isEmpty() ){
Message m = null;
synchronized( queue ){
if( !queue.isEmpty() )
m = queue.remove(0);
}
if( m != null )
consume(m);
}
System.out.println( "Consumer " + id + " Done" );
doneConsuming.countDown();
}
public void consume( Message m ){
System.out.println( "Consumer " + id + " consuming message " + m.getId() );
// Call the sleep time
try {
Thread.sleep( m.getTime() );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println( "Consumer " + id + " done consuming message " + m.getId() );
// Decrement the Latch as the process is done.
m.getLatch().countDown();
}
}
class Message {
private String id;
private int time;
private CountDownLatch latch;
Message(String id, int time, CountDownLatch latch){
this.id = id;
this.time = time;
this.latch = latch;
}
public String getId(){
return id;
}
public int getTime(){
return time;
}
public CountDownLatch getLatch(){
return latch;
}
}
Consumer 0 consuming message 1 Processing is still running. Shutting down the consumerExecutors Consumer 0 done consuming message 1 Consumer 0 consuming message 2 Consumer 0 done consuming message 2 Consumer 0 Done Done
java.util.concurrent.Executors
java.util.concurrent.CountDownLatch