1 package us.codecraft.webmagic;
2
3
4 import java.io.Closeable;
5 import java.io.IOException;
6 import java.util.ArrayList;
7 import java.util.Collection;
8 import java.util.Date;
9 import java.util.List;
10 import java.util.UUID;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.atomic.AtomicInteger;
13 import java.util.concurrent.atomic.AtomicLong;
14 import org.apache.commons.collections4.CollectionUtils;
15 import org.apache.commons.lang3.SerializationUtils;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18 import us.codecraft.webmagic.downloader.Downloader;
19 import us.codecraft.webmagic.downloader.HttpClientDownloader;
20 import us.codecraft.webmagic.pipeline.CollectorPipeline;
21 import us.codecraft.webmagic.pipeline.ConsolePipeline;
22 import us.codecraft.webmagic.pipeline.Pipeline;
23 import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline;
24 import us.codecraft.webmagic.processor.PageProcessor;
25 import us.codecraft.webmagic.scheduler.QueueScheduler;
26 import us.codecraft.webmagic.scheduler.Scheduler;
27 import us.codecraft.webmagic.thread.CountableThreadPool;
28 import us.codecraft.webmagic.utils.UrlUtils;
29 import us.codecraft.webmagic.utils.WMCollections;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class Spider implements Runnable, Task {
63
64 protected Downloader downloader;
65
66 protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
67
68 protected PageProcessor pageProcessor;
69
70 protected List<Request> startRequests;
71
72 protected Site site;
73
74 protected String uuid;
75
76 protected SpiderScheduler scheduler;
77
78 protected Logger logger = LoggerFactory.getLogger(getClass());
79
80 protected CountableThreadPool threadPool;
81
82 protected ExecutorService executorService;
83
84 protected int threadNum = 1;
85
86 protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
87
88 protected boolean exitWhenComplete = true;
89
90 protected final static int STAT_INIT = 0;
91
92 protected final static int STAT_RUNNING = 1;
93
94 protected final static int STAT_STOPPED = 2;
95
96 protected boolean spawnUrl = true;
97
98 protected boolean destroyWhenExit = true;
99
100 private List<SpiderListener> spiderListeners;
101
102 private final AtomicLong pageCount = new AtomicLong(0);
103
104 private Date startTime;
105
106 private long emptySleepTime = 30000;
107
108
109
110
111
112
113
114
115 public static Spider create(PageProcessor pageProcessor) {
116 return new Spider(pageProcessor);
117 }
118
119
120
121
122
123
124 public Spider(PageProcessor pageProcessor) {
125 this.pageProcessor = pageProcessor;
126 this.site = pageProcessor.getSite();
127 this.scheduler = new SpiderScheduler(new QueueScheduler());
128 }
129
130
131
132
133
134
135
136
137 public Spider startUrls(List<String> startUrls) {
138 checkIfRunning();
139 this.startRequests = UrlUtils.convertToRequests(startUrls);
140 return this;
141 }
142
143
144
145
146
147
148
149
150 public Spider startRequest(List<Request> startRequests) {
151 checkIfRunning();
152 this.startRequests = startRequests;
153 return this;
154 }
155
156
157
158
159
160
161
162
163 public Spider setUUID(String uuid) {
164 this.uuid = uuid;
165 return this;
166 }
167
168
169
170
171
172
173
174
175 @Deprecated
176 public Spider scheduler(Scheduler scheduler) {
177 return setScheduler(scheduler);
178 }
179
180
181
182
183
184
185
186
187
188 public Spider setScheduler(Scheduler updateScheduler) {
189 checkIfRunning();
190 SpiderScheduler oldScheduler = this.scheduler;
191 scheduler.setScheduler(updateScheduler);
192 if (oldScheduler != null) {
193 Request request;
194 while ((request = oldScheduler.poll(this)) != null) {
195 this.scheduler.push(request, this);
196 }
197 }
198 return this;
199 }
200
201
202
203
204
205
206
207
208
209 @Deprecated
210 public Spider pipeline(Pipeline pipeline) {
211 return addPipeline(pipeline);
212 }
213
214
215
216
217
218
219
220
221
222 public Spider addPipeline(Pipeline pipeline) {
223 checkIfRunning();
224 this.pipelines.add(pipeline);
225 return this;
226 }
227
228
229
230
231
232
233
234
235
236 public Spider setPipelines(List<Pipeline> pipelines) {
237 checkIfRunning();
238 this.pipelines = pipelines;
239 return this;
240 }
241
242
243
244
245
246
247 public Spider clearPipeline() {
248 pipelines = new ArrayList<Pipeline>();
249 return this;
250 }
251
252
253
254
255
256
257
258
259
260 @Deprecated
261 public Spider downloader(Downloader downloader) {
262 return setDownloader(downloader);
263 }
264
265
266
267
268
269
270
271
272 public Spider setDownloader(Downloader downloader) {
273 checkIfRunning();
274 this.downloader = downloader;
275 return this;
276 }
277
278 protected void initComponent() {
279 if (downloader == null) {
280 this.downloader = new HttpClientDownloader();
281 }
282 if (pipelines.isEmpty()) {
283 pipelines.add(new ConsolePipeline());
284 }
285 downloader.setThread(threadNum);
286 if (threadPool == null || threadPool.isShutdown()) {
287 if (executorService != null && !executorService.isShutdown()) {
288 threadPool = new CountableThreadPool(threadNum, executorService);
289 } else {
290 threadPool = new CountableThreadPool(threadNum);
291 }
292 }
293 if (startRequests != null) {
294 for (Request request : startRequests) {
295 addRequest(request);
296 }
297 startRequests.clear();
298 }
299 startTime = new Date();
300 }
301
302 @Override
303 public void run() {
304 checkRunningStat();
305 initComponent();
306 logger.info("Spider {} started!", getUUID());
307
308 while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
309 Request poll = scheduler.poll(this);
310 if (poll == null) {
311 if (threadPool.getThreadAlive() == 0) {
312
313 poll = scheduler.poll(this);
314 if (poll == null) {
315 if (exitWhenComplete) {
316 break;
317 } else {
318
319 try {
320 Thread.sleep(emptySleepTime);
321 continue;
322 } catch (InterruptedException e) {
323 Thread.currentThread().interrupt();
324 break;
325 }
326 }
327 }
328 } else {
329
330 if (scheduler.waitNewUrl(threadPool, emptySleepTime)) {
331
332 break;
333 }
334 continue;
335 }
336 }
337 final Request request = poll;
338
339 threadPool.execute(new Runnable() {
340 @Override
341 public void run() {
342 try {
343 processRequest(request);
344 onSuccess(request);
345 } catch (Exception e) {
346 onError(request, e);
347 logger.error("process request " + request + " error", e);
348 } finally {
349 pageCount.incrementAndGet();
350 scheduler.signalNewUrl();
351 }
352 }
353 });
354 }
355 stat.set(STAT_STOPPED);
356
357 if (destroyWhenExit) {
358 close();
359 }
360 logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
361 }
362
363
364
365
366 @Deprecated
367 protected void onError(Request request) {
368 }
369
370 protected void onError(Request request, Exception e) {
371 this.onError(request);
372
373 if (CollectionUtils.isNotEmpty(spiderListeners)) {
374 for (SpiderListener spiderListener : spiderListeners) {
375 spiderListener.onError(request, e);
376 }
377 }
378 }
379
380 protected void onSuccess(Request request) {
381 if (CollectionUtils.isNotEmpty(spiderListeners)) {
382 for (SpiderListener spiderListener : spiderListeners) {
383 spiderListener.onSuccess(request);
384 }
385 }
386 }
387
388 private void checkRunningStat() {
389 while (true) {
390 int statNow = stat.get();
391 if (statNow == STAT_RUNNING) {
392 throw new IllegalStateException("Spider is already running!");
393 }
394 if (stat.compareAndSet(statNow, STAT_RUNNING)) {
395 break;
396 }
397 }
398 }
399
400 public void close() {
401 destroyEach(downloader);
402 destroyEach(pageProcessor);
403 destroyEach(scheduler);
404 for (Pipeline pipeline : pipelines) {
405 destroyEach(pipeline);
406 }
407 threadPool.shutdown();
408 }
409
410 private void destroyEach(Object object) {
411 if (object instanceof Closeable) {
412 try {
413 ((Closeable) object).close();
414 } catch (IOException e) {
415 e.printStackTrace();
416 }
417 }
418 }
419
420
421
422
423
424
425 public void test(String... urls) {
426 initComponent();
427 if (urls.length > 0) {
428 for (String url : urls) {
429 processRequest(new Request(url));
430 }
431 }
432 }
433
434 private void processRequest(Request request) {
435 Page page;
436 if (null != request.getDownloader()){
437 page = request.getDownloader().download(request,this);
438 }else {
439 page = downloader.download(request, this);
440 }
441 if (page.isDownloadSuccess()){
442 onDownloadSuccess(request, page);
443 } else {
444 onDownloaderFail(request);
445 }
446 }
447
448 private void onDownloadSuccess(Request request, Page page) {
449 if (site.getAcceptStatCode().contains(page.getStatusCode())){
450 pageProcessor.process(page);
451 extractAndAddRequests(page, spawnUrl);
452 if (!page.getResultItems().isSkip()) {
453 for (Pipeline pipeline : pipelines) {
454 pipeline.process(page.getResultItems(), this);
455 }
456 }
457 } else {
458 logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
459 }
460 sleep(site.getSleepTime());
461 return;
462 }
463
464 private void onDownloaderFail(Request request) {
465 if (site.getCycleRetryTimes() == 0) {
466 sleep(site.getSleepTime());
467 } else {
468
469 doCycleRetry(request);
470 }
471 }
472
473 private void doCycleRetry(Request request) {
474 Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);
475 if (cycleTriedTimesObject == null) {
476 addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));
477 } else {
478 int cycleTriedTimes = (Integer) cycleTriedTimesObject;
479 cycleTriedTimes++;
480 if (cycleTriedTimes < site.getCycleRetryTimes()) {
481 addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));
482 }
483 }
484 sleep(site.getRetrySleepTime());
485 }
486
487 protected void sleep(int time) {
488 try {
489 Thread.sleep(time);
490 } catch (InterruptedException e) {
491 logger.error("Thread interrupted when sleep",e);
492 Thread.currentThread().interrupt();
493 }
494 }
495
496 protected void extractAndAddRequests(Page page, boolean spawnUrl) {
497 if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
498 for (Request request : page.getTargetRequests()) {
499 addRequest(request);
500 }
501 }
502 }
503
504 private void addRequest(Request request) {
505 if (site.getDomain() == null && request != null && request.getUrl() != null) {
506 site.setDomain(UrlUtils.getDomain(request.getUrl()));
507 }
508 scheduler.push(request, this);
509 }
510
511 protected void checkIfRunning() {
512 if (stat.get() == STAT_RUNNING) {
513 throw new IllegalStateException("Spider is already running!");
514 }
515 }
516
517 public void runAsync() {
518 Thread thread = new Thread(this);
519 thread.setDaemon(false);
520 thread.start();
521 }
522
523
524
525
526
527
528
529 public Spider addUrl(String... urls) {
530 for (String url : urls) {
531 addRequest(new Request(url));
532 }
533 scheduler.signalNewUrl();
534 return this;
535 }
536
537
538
539
540
541
542
543
544 public <T> List<T> getAll(Collection<String> urls) {
545 destroyWhenExit = false;
546 spawnUrl = false;
547 if (startRequests!=null){
548 startRequests.clear();
549 }
550 for (Request request : UrlUtils.convertToRequests(urls)) {
551 addRequest(request);
552 }
553 CollectorPipeline collectorPipeline = getCollectorPipeline();
554 pipelines.add(collectorPipeline);
555 run();
556 spawnUrl = true;
557 destroyWhenExit = true;
558 return collectorPipeline.getCollected();
559 }
560
561 protected CollectorPipeline getCollectorPipeline() {
562 return new ResultItemsCollectorPipeline();
563 }
564
565 public <T> T get(String url) {
566 List<String> urls = WMCollections.newArrayList(url);
567 List<T> resultItemses = getAll(urls);
568 if (resultItemses != null && resultItemses.size() > 0) {
569 return resultItemses.get(0);
570 } else {
571 return null;
572 }
573 }
574
575
576
577
578
579
580
581 public Spider addRequest(Request... requests) {
582 for (Request request : requests) {
583 addRequest(request);
584 }
585 scheduler.signalNewUrl();
586 return this;
587 }
588
589 public void start() {
590 runAsync();
591 }
592
593 public void stop() {
594 if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
595 logger.info("Spider " + getUUID() + " stop success!");
596 } else {
597 logger.info("Spider " + getUUID() + " stop fail!");
598 }
599 }
600
601
602
603
604
605
606
607 public Spider thread(int threadNum) {
608 checkIfRunning();
609 this.threadNum = threadNum;
610 if (threadNum <= 0) {
611 throw new IllegalArgumentException("threadNum should be more than one!");
612 }
613 return this;
614 }
615
616
617
618
619
620
621
622
623 public Spider thread(ExecutorService executorService, int threadNum) {
624 checkIfRunning();
625 this.threadNum = threadNum;
626 if (threadNum <= 0) {
627 throw new IllegalArgumentException("threadNum should be more than one!");
628 }
629 this.executorService = executorService;
630 return this;
631 }
632
633 public boolean isExitWhenComplete() {
634 return exitWhenComplete;
635 }
636
637
638
639
640
641
642
643
644
645 public Spider setExitWhenComplete(boolean exitWhenComplete) {
646 this.exitWhenComplete = exitWhenComplete;
647 return this;
648 }
649
650 public boolean isSpawnUrl() {
651 return spawnUrl;
652 }
653
654
655
656
657
658
659
660 public long getPageCount() {
661 return pageCount.get();
662 }
663
664
665
666
667
668
669
670
671 public Status getStatus() {
672 return Status.fromValue(stat.get());
673 }
674
675
676 public enum Status {
677 Init(0), Running(1), Stopped(2);
678
679 private Status(int value) {
680 this.value = value;
681 }
682
683 private int value;
684
685 int getValue() {
686 return value;
687 }
688
689 public static Status fromValue(int value) {
690 for (Status status : Status.values()) {
691 if (status.getValue() == value) {
692 return status;
693 }
694 }
695
696 return Init;
697 }
698 }
699
700
701
702
703
704
705
706 public int getThreadAlive() {
707 if (threadPool == null) {
708 return 0;
709 }
710 return threadPool.getThreadAlive();
711 }
712
713
714
715
716
717
718
719
720
721
722 public Spider setSpawnUrl(boolean spawnUrl) {
723 this.spawnUrl = spawnUrl;
724 return this;
725 }
726
727 @Override
728 public String getUUID() {
729 if (uuid != null) {
730 return uuid;
731 }
732 if (site != null) {
733 return site.getDomain();
734 }
735 uuid = UUID.randomUUID().toString();
736 return uuid;
737 }
738
739 public Spider setExecutorService(ExecutorService executorService) {
740 checkIfRunning();
741 this.executorService = executorService;
742 return this;
743 }
744
745 @Override
746 public Site getSite() {
747 return site;
748 }
749
750 public List<SpiderListener> getSpiderListeners() {
751 return spiderListeners;
752 }
753
754 public Spider setSpiderListeners(List<SpiderListener> spiderListeners) {
755 this.spiderListeners = spiderListeners;
756 return this;
757 }
758
759 public Date getStartTime() {
760 return startTime;
761 }
762
763 public Scheduler getScheduler() {
764 return scheduler.getScheduler();
765 }
766
767
768
769
770
771
772
773 public Spider setEmptySleepTime(long emptySleepTime) {
774 if(emptySleepTime<=0){
775 throw new IllegalArgumentException("emptySleepTime should be more than zero!");
776 }
777 this.emptySleepTime = emptySleepTime;
778 return this;
779 }
780 }