View Javadoc
1   package us.codecraft.webmagic.thread;
2   
3   import java.util.concurrent.ExecutorService;
4   import java.util.concurrent.Executors;
5   import java.util.concurrent.atomic.AtomicInteger;
6   import java.util.concurrent.locks.Condition;
7   import java.util.concurrent.locks.ReentrantLock;
8   
9   /**
10   * Thread pool for workers.<br><br>
11   * Use {@link java.util.concurrent.ExecutorService} as inner implement. <br><br>
12   * New feature: <br><br>
13   * 1. Block when thread pool is full to avoid poll many urls without process. <br><br>
14   * 2. Count of thread alive for monitor.
15   *
16   * @author code4crafer@gmail.com
17   * @since 0.5.0
18   */
19  public class CountableThreadPool {
20  
21      private int threadNum;
22  
23      private AtomicInteger threadAlive = new AtomicInteger();
24  
25      private ReentrantLock reentrantLock = new ReentrantLock();
26  
27      private Condition condition = reentrantLock.newCondition();
28  
29      public CountableThreadPool(int threadNum) {
30          this.threadNum = threadNum;
31          this.executorService = Executors.newFixedThreadPool(threadNum);
32      }
33  
34      public CountableThreadPool(int threadNum, ExecutorService executorService) {
35          this.threadNum = threadNum;
36          this.executorService = executorService;
37      }
38  
39      public void setExecutorService(ExecutorService executorService) {
40          this.executorService = executorService;
41      }
42  
43      public int getThreadAlive() {
44          return threadAlive.get();
45      }
46  
47      public int getThreadNum() {
48          return threadNum;
49      }
50  
51      private ExecutorService executorService;
52  
53      public void execute(final Runnable runnable) {
54  
55  
56          if (threadAlive.get() >= threadNum) {
57              try {
58                  reentrantLock.lock();
59                  while (threadAlive.get() >= threadNum) {
60                      try {
61                          condition.await();
62                      } catch (InterruptedException e) {
63                      }
64                  }
65              } finally {
66                  reentrantLock.unlock();
67              }
68          }
69          threadAlive.incrementAndGet();
70          executorService.execute(new Runnable() {
71              @Override
72              public void run() {
73                  try {
74                      runnable.run();
75                  } finally {
76                      try {
77                          reentrantLock.lock();
78                          threadAlive.decrementAndGet();
79                          condition.signal();
80                      } finally {
81                          reentrantLock.unlock();
82                      }
83                  }
84              }
85          });
86      }
87  
88      public boolean isShutdown() {
89          return executorService.isShutdown();
90      }
91  
92      public void shutdown() {
93          executorService.shutdown();
94      }
95  
96  
97  }