1 package us.codecraft.webmagic;
2
3
4 import java.io.Closeable;
5 import java.io.IOException;
6 import java.util.ArrayList;
7 import java.util.Collection;
8 import java.util.Date;
9 import java.util.List;
10 import java.util.UUID;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.atomic.AtomicInteger;
13 import java.util.concurrent.atomic.AtomicLong;
14 import org.apache.commons.collections4.CollectionUtils;
15 import org.apache.commons.lang3.SerializationUtils;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18 import us.codecraft.webmagic.downloader.Downloader;
19 import us.codecraft.webmagic.downloader.HttpClientDownloader;
20 import us.codecraft.webmagic.pipeline.CollectorPipeline;
21 import us.codecraft.webmagic.pipeline.ConsolePipeline;
22 import us.codecraft.webmagic.pipeline.Pipeline;
23 import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline;
24 import us.codecraft.webmagic.processor.PageProcessor;
25 import us.codecraft.webmagic.scheduler.QueueScheduler;
26 import us.codecraft.webmagic.scheduler.Scheduler;
27 import us.codecraft.webmagic.thread.CountableThreadPool;
28 import us.codecraft.webmagic.utils.UrlUtils;
29 import us.codecraft.webmagic.utils.WMCollections;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class Spider implements Runnable, Task {
63
64 protected Downloader downloader;
65
66 protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
67
68 protected PageProcessor pageProcessor;
69
70 protected List<Request> startRequests;
71
72 protected Site site;
73
74 protected String uuid;
75
76 protected SpiderScheduler scheduler;
77
78 protected Logger logger = LoggerFactory.getLogger(getClass());
79
80 protected CountableThreadPool threadPool;
81
82 protected ExecutorService executorService;
83
84 protected int threadNum = 1;
85
86 protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
87
88 protected volatile boolean exitWhenComplete = true;
89
90 protected final static int STAT_INIT = 0;
91
92 protected final static int STAT_RUNNING = 1;
93
94 protected final static int STAT_STOPPED = 2;
95
96 protected boolean spawnUrl = true;
97
98 protected boolean destroyWhenExit = true;
99
100 private List<SpiderListener> spiderListeners;
101
102 private final AtomicLong pageCount = new AtomicLong(0);
103
104 private Date startTime;
105
106 private long emptySleepTime = 30000;
107
108
109
110
111
112
113
114
115 public static Spider create(PageProcessor pageProcessor) {
116 return new Spider(pageProcessor);
117 }
118
119
120
121
122
123
124 public Spider(PageProcessor pageProcessor) {
125 this.pageProcessor = pageProcessor;
126 this.site = pageProcessor.getSite();
127 this.scheduler = new SpiderScheduler(new QueueScheduler());
128 }
129
130
131
132
133
134
135
136
137 public Spider startUrls(List<String> startUrls) {
138 checkIfRunning();
139 this.startRequests = UrlUtils.convertToRequests(startUrls);
140 return this;
141 }
142
143
144
145
146
147
148
149
150 public Spider startRequest(List<Request> startRequests) {
151 checkIfRunning();
152 this.startRequests = startRequests;
153 return this;
154 }
155
156
157
158
159
160
161
162
163 public Spider setUUID(String uuid) {
164 this.uuid = uuid;
165 return this;
166 }
167
168
169
170
171
172
173
174
175 @Deprecated
176 public Spider scheduler(Scheduler scheduler) {
177 return setScheduler(scheduler);
178 }
179
180
181
182
183
184
185
186
187
188 public Spider setScheduler(Scheduler updateScheduler) {
189 checkIfRunning();
190 Scheduler oldScheduler = scheduler.getScheduler();
191 scheduler.setScheduler(updateScheduler);
192 if (oldScheduler != null) {
193 Request request;
194 while ((request = oldScheduler.poll(this)) != null) {
195 this.scheduler.push(request, this);
196 }
197 }
198 return this;
199 }
200
201
202
203
204
205
206
207
208
209 @Deprecated
210 public Spider pipeline(Pipeline pipeline) {
211 return addPipeline(pipeline);
212 }
213
214
215
216
217
218
219
220
221
222 public Spider addPipeline(Pipeline pipeline) {
223 checkIfRunning();
224 this.pipelines.add(pipeline);
225 return this;
226 }
227
228
229
230
231
232
233
234
235
236 public Spider setPipelines(List<Pipeline> pipelines) {
237 checkIfRunning();
238 this.pipelines = pipelines;
239 return this;
240 }
241
242
243
244
245
246
247 public Spider clearPipeline() {
248 pipelines = new ArrayList<Pipeline>();
249 return this;
250 }
251
252
253
254
255
256
257
258
259
260 @Deprecated
261 public Spider downloader(Downloader downloader) {
262 return setDownloader(downloader);
263 }
264
265
266
267
268
269
270
271
272 public Spider setDownloader(Downloader downloader) {
273 checkIfRunning();
274 this.downloader = downloader;
275 return this;
276 }
277
278 protected void initComponent() {
279 if (downloader == null) {
280 this.downloader = new HttpClientDownloader();
281 }
282 if (pipelines.isEmpty()) {
283 pipelines.add(new ConsolePipeline());
284 }
285 downloader.setThread(threadNum);
286 if (threadPool == null || threadPool.isShutdown()) {
287 if (executorService != null && !executorService.isShutdown()) {
288 threadPool = new CountableThreadPool(threadNum, executorService);
289 } else {
290 threadPool = new CountableThreadPool(threadNum);
291 }
292 }
293 if (startRequests != null) {
294 for (Request request : startRequests) {
295 addRequest(request);
296 }
297 startRequests.clear();
298 }
299 startTime = new Date();
300 }
301
302 @Override
303 public void run() {
304 checkRunningStat();
305 initComponent();
306 logger.info("Spider {} started!", getUUID());
307
308 while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
309 Request poll = scheduler.poll(this);
310 if (poll == null) {
311 if (threadPool.getThreadAlive() == 0) {
312
313 poll = scheduler.poll(this);
314 if (poll == null) {
315 if (exitWhenComplete) {
316 break;
317 } else {
318
319 try {
320 Thread.sleep(emptySleepTime);
321 continue;
322 } catch (InterruptedException e) {
323 Thread.currentThread().interrupt();
324 break;
325 }
326 }
327 }
328 } else {
329
330 if (scheduler.waitNewUrl(threadPool, emptySleepTime)) {
331
332 break;
333 }
334 continue;
335 }
336 }
337 final Request request = poll;
338
339 threadPool.execute(new Runnable() {
340 @Override
341 public void run() {
342 try {
343 processRequest(request);
344 onSuccess(request);
345 } catch (Exception e) {
346 onError(request, e);
347 logger.error("process request " + request + " error", e);
348 } finally {
349 pageCount.incrementAndGet();
350 scheduler.signalNewUrl();
351 }
352 }
353 });
354 }
355 stat.set(STAT_STOPPED);
356
357 if (destroyWhenExit) {
358 close();
359 }
360 logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
361 }
362
363
364
365
366 @Deprecated
367 protected void onError(Request request) {
368 }
369
370 protected void onError(Request request, Exception e) {
371 this.onError(request);
372
373 if (CollectionUtils.isNotEmpty(spiderListeners)) {
374 for (SpiderListener spiderListener : spiderListeners) {
375 spiderListener.onError(request, e);
376 }
377 }
378 }
379
380 protected void onSuccess(Request request) {
381 if (CollectionUtils.isNotEmpty(spiderListeners)) {
382 for (SpiderListener spiderListener : spiderListeners) {
383 spiderListener.onSuccess(request);
384 }
385 }
386 }
387
388 private void checkRunningStat() {
389 while (true) {
390 int statNow = stat.get();
391 if (statNow == STAT_RUNNING) {
392 throw new IllegalStateException("Spider is already running!");
393 }
394 if (stat.compareAndSet(statNow, STAT_RUNNING)) {
395 break;
396 }
397 }
398 }
399
400 public void close() {
401 destroyEach(downloader);
402 destroyEach(pageProcessor);
403 destroyEach(scheduler);
404 for (Pipeline pipeline : pipelines) {
405 destroyEach(pipeline);
406 }
407 threadPool.shutdown();
408 }
409
410 private void destroyEach(Object object) {
411 if (object instanceof Closeable) {
412 try {
413 ((Closeable) object).close();
414 } catch (IOException e) {
415 e.printStackTrace();
416 }
417 }
418 }
419
420
421
422
423
424
425 public void test(String... urls) {
426 initComponent();
427 if (urls.length > 0) {
428 for (String url : urls) {
429 processRequest(new Request(url));
430 }
431 }
432 }
433
434 private void processRequest(Request request) {
435 Page page;
436 if (null != request.getDownloader()){
437 page = request.getDownloader().download(request,this);
438 }else {
439 page = downloader.download(request, this);
440 }
441 if (page.isDownloadSuccess()){
442 onDownloadSuccess(request, page);
443 } else {
444 onDownloaderFail(request);
445 }
446 }
447
448 private void onDownloadSuccess(Request request, Page page) {
449 if (site.getAcceptStatCode().contains(page.getStatusCode())){
450 pageProcessor.process(page);
451 extractAndAddRequests(page, spawnUrl);
452 if (!page.getResultItems().isSkip()) {
453 for (Pipeline pipeline : pipelines) {
454 pipeline.process(page.getResultItems(), this);
455 }
456 }
457 } else {
458 logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
459 }
460 sleep(site.getSleepTime());
461 }
462
463 private void onDownloaderFail(Request request) {
464 if (site.getCycleRetryTimes() == 0) {
465 sleep(site.getSleepTime());
466 } else {
467
468 doCycleRetry(request);
469 }
470 }
471
472 private void doCycleRetry(Request request) {
473 Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);
474 if (cycleTriedTimesObject == null) {
475 addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));
476 } else {
477 int cycleTriedTimes = (Integer) cycleTriedTimesObject;
478 cycleTriedTimes++;
479 if (cycleTriedTimes < site.getCycleRetryTimes()) {
480 addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));
481 }
482 }
483 sleep(site.getRetrySleepTime());
484 }
485
486 protected void sleep(int time) {
487 try {
488 Thread.sleep(time);
489 } catch (InterruptedException e) {
490 logger.error("Thread interrupted when sleep",e);
491 Thread.currentThread().interrupt();
492 }
493 }
494
495 protected void extractAndAddRequests(Page page, boolean spawnUrl) {
496 if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
497 for (Request request : page.getTargetRequests()) {
498 addRequest(request);
499 }
500 }
501 }
502
503 private void addRequest(Request request) {
504 if (site.getDomain() == null && request != null && request.getUrl() != null) {
505 site.setDomain(UrlUtils.getDomain(request.getUrl()));
506 }
507 scheduler.push(request, this);
508 }
509
510 protected void checkIfRunning() {
511 if (stat.get() == STAT_RUNNING) {
512 throw new IllegalStateException("Spider is already running!");
513 }
514 }
515
516 public void runAsync() {
517 Thread thread = new Thread(this);
518 thread.setDaemon(false);
519 thread.start();
520 }
521
522
523
524
525
526
527
528 public Spider addUrl(String... urls) {
529 for (String url : urls) {
530 addRequest(new Request(url));
531 }
532 scheduler.signalNewUrl();
533 return this;
534 }
535
536
537
538
539
540
541
542
543 public <T> List<T> getAll(Collection<String> urls) {
544 destroyWhenExit = false;
545 spawnUrl = false;
546 if (startRequests!=null){
547 startRequests.clear();
548 }
549 for (Request request : UrlUtils.convertToRequests(urls)) {
550 addRequest(request);
551 }
552 CollectorPipeline collectorPipeline = getCollectorPipeline();
553 pipelines.add(collectorPipeline);
554 run();
555 spawnUrl = true;
556 destroyWhenExit = true;
557 return collectorPipeline.getCollected();
558 }
559
560 protected CollectorPipeline getCollectorPipeline() {
561 return new ResultItemsCollectorPipeline();
562 }
563
564 public <T> T get(String url) {
565 List<String> urls = WMCollections.newArrayList(url);
566 List<T> resultItemses = getAll(urls);
567 if (resultItemses != null && resultItemses.size() > 0) {
568 return resultItemses.get(0);
569 } else {
570 return null;
571 }
572 }
573
574
575
576
577
578
579
580 public Spider addRequest(Request... requests) {
581 for (Request request : requests) {
582 addRequest(request);
583 }
584 scheduler.signalNewUrl();
585 return this;
586 }
587
588 public void start() {
589 runAsync();
590 }
591
592 public void stop() {
593 if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
594 logger.info("Spider " + getUUID() + " stop success!");
595 } else {
596 logger.info("Spider " + getUUID() + " stop fail!");
597 }
598 }
599
600
601
602
603 public void stopWhenComplete(){
604 this.exitWhenComplete = true;
605 }
606
607
608
609
610
611
612
613 public Spider thread(int threadNum) {
614 checkIfRunning();
615 this.threadNum = threadNum;
616 if (threadNum <= 0) {
617 throw new IllegalArgumentException("threadNum should be more than one!");
618 }
619 return this;
620 }
621
622
623
624
625
626
627
628
629 public Spider thread(ExecutorService executorService, int threadNum) {
630 checkIfRunning();
631 this.threadNum = threadNum;
632 if (threadNum <= 0) {
633 throw new IllegalArgumentException("threadNum should be more than one!");
634 }
635 this.executorService = executorService;
636 return this;
637 }
638
639 public boolean isExitWhenComplete() {
640 return exitWhenComplete;
641 }
642
643
644
645
646
647
648
649
650
651 public Spider setExitWhenComplete(boolean exitWhenComplete) {
652 this.exitWhenComplete = exitWhenComplete;
653 return this;
654 }
655
656 public boolean isSpawnUrl() {
657 return spawnUrl;
658 }
659
660
661
662
663
664
665
666 public long getPageCount() {
667 return pageCount.get();
668 }
669
670
671
672
673
674
675
676
677 public Status getStatus() {
678 return Status.fromValue(stat.get());
679 }
680
681
/**
 * Lifecycle states of a spider, each tied to an integer code.
 */
public enum Status {
    Init(0), Running(1), Stopped(2);

    /** Integer code of this state. */
    private final int value;

    private Status(int value) {
        this.value = value;
    }

    int getValue() {
        return value;
    }

    /**
     * Looks up the state for an integer code.
     *
     * @param value integer code to look up
     * @return the matching state, or {@link #Init} when no state has that code
     */
    public static Status fromValue(int value) {
        final Status[] candidates = Status.values();
        for (int i = 0; i < candidates.length; i++) {
            if (candidates[i].getValue() == value) {
                return candidates[i];
            }
        }
        // Unknown codes fall back to the initial state.
        return Init;
    }
}
705
706
707
708
709
710
711
712 public int getThreadAlive() {
713 if (threadPool == null) {
714 return 0;
715 }
716 return threadPool.getThreadAlive();
717 }
718
719
720
721
722
723
724
725
726
727
728 public Spider setSpawnUrl(boolean spawnUrl) {
729 this.spawnUrl = spawnUrl;
730 return this;
731 }
732
733 @Override
734 public String getUUID() {
735 if (uuid != null) {
736 return uuid;
737 }
738 if (site != null) {
739 return site.getDomain();
740 }
741 uuid = UUID.randomUUID().toString();
742 return uuid;
743 }
744
745 public Spider setExecutorService(ExecutorService executorService) {
746 checkIfRunning();
747 this.executorService = executorService;
748 return this;
749 }
750
751 @Override
752 public Site getSite() {
753 return site;
754 }
755
756 public List<SpiderListener> getSpiderListeners() {
757 return spiderListeners;
758 }
759
760 public Spider setSpiderListeners(List<SpiderListener> spiderListeners) {
761 this.spiderListeners = spiderListeners;
762 return this;
763 }
764
765 public Date getStartTime() {
766 return startTime;
767 }
768
769 public Scheduler getScheduler() {
770 return scheduler.getScheduler();
771 }
772
773
774
775
776
777
778
779 public Spider setEmptySleepTime(long emptySleepTime) {
780 if(emptySleepTime<=0){
781 throw new IllegalArgumentException("emptySleepTime should be more than zero!");
782 }
783 this.emptySleepTime = emptySleepTime;
784 return this;
785 }
786 }