1 package us.codecraft.webmagic.thread;
2
3 import java.util.concurrent.ExecutorService;
4 import java.util.concurrent.Executors;
5 import java.util.concurrent.atomic.AtomicInteger;
6 import java.util.concurrent.locks.Condition;
7 import java.util.concurrent.locks.ReentrantLock;
8
9
10
11
12
13
14
15
16
17
18
19 public class CountableThreadPool {
20
21 private int threadNum;
22
23 private AtomicInteger threadAlive = new AtomicInteger();
24
25 private ReentrantLock reentrantLock = new ReentrantLock();
26
27 private Condition condition = reentrantLock.newCondition();
28
29 public CountableThreadPool(int threadNum) {
30 this.threadNum = threadNum;
31 this.executorService = Executors.newFixedThreadPool(threadNum);
32 }
33
34 public CountableThreadPool(int threadNum, ExecutorService executorService) {
35 this.threadNum = threadNum;
36 this.executorService = executorService;
37 }
38
39 public void setExecutorService(ExecutorService executorService) {
40 this.executorService = executorService;
41 }
42
43 public int getThreadAlive() {
44 return threadAlive.get();
45 }
46
47 public int getThreadNum() {
48 return threadNum;
49 }
50
51 private ExecutorService executorService;
52
53 public void execute(final Runnable runnable) {
54
55
56 if (threadAlive.get() >= threadNum) {
57 try {
58 reentrantLock.lock();
59 while (threadAlive.get() >= threadNum) {
60 try {
61 condition.await();
62 } catch (InterruptedException e) {
63 }
64 }
65 } finally {
66 reentrantLock.unlock();
67 }
68 }
69 threadAlive.incrementAndGet();
70 executorService.execute(new Runnable() {
71 @Override
72 public void run() {
73 try {
74 runnable.run();
75 } finally {
76 try {
77 reentrantLock.lock();
78 threadAlive.decrementAndGet();
79 condition.signal();
80 } finally {
81 reentrantLock.unlock();
82 }
83 }
84 }
85 });
86 }
87
88 public boolean isShutdown() {
89 return executorService.isShutdown();
90 }
91
92 public void shutdown() {
93 executorService.shutdown();
94 }
95
96
97 }