package us.codecraft.webmagic;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import us.codecraft.webmagic.downloader.Downloader;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.pipeline.CollectorPipeline;
import us.codecraft.webmagic.pipeline.ConsolePipeline;
import us.codecraft.webmagic.pipeline.Pipeline;
import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.scheduler.QueueScheduler;
import us.codecraft.webmagic.scheduler.Scheduler;
import us.codecraft.webmagic.thread.CountableThreadPool;
import us.codecraft.webmagic.utils.UrlUtils;
import us.codecraft.webmagic.utils.WMCollections;

/**
 * Entrance of a crawler.<br>
 * A spider contains four modules: Downloader, Scheduler, PageProcessor and
 * Pipeline.<br>
 * Every module is a field of Spider.<br>
 * Each module is defined by an interface, so you can customize a spider with
 * your own implementations of them.<br>
 * Examples:
 * <p>
 * A simple crawler:
 * <pre>{@code
 * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
 *         "http://my.oschina.net/*blog/*")).run();
 * }</pre>
 * Store results to files by FilePipeline:
 * <pre>{@code
 * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
 *         "http://my.oschina.net/*blog/*"))
 *         .pipeline(new FilePipeline("/data/temp/webmagic/")).run();
 * }</pre>
 * Use FileCacheQueueScheduler to store urls and cursor in files, so that a
 * Spider can resume from where it stopped after a shutdown:
 * <pre>{@code
 * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
 *         "http://my.oschina.net/*blog/*"))
 *         .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run();
 * }</pre>
 *
 * @author code4crafter@gmail.com
 * @see Downloader
 * @see Scheduler
 * @see PageProcessor
 * @see Pipeline
 * @since 0.1.0
 */
public class Spider implements Runnable, Task {

    protected Downloader downloader;

    protected List<Pipeline> pipelines = new ArrayList<Pipeline>();

    protected PageProcessor pageProcessor;

    protected List<Request> startRequests;

    protected Site site;

    protected String uuid;

    protected SpiderScheduler scheduler;

    protected Logger logger = LoggerFactory.getLogger(getClass());

    protected CountableThreadPool threadPool;

    protected ExecutorService executorService;

    protected int threadNum = 1;

    protected AtomicInteger stat = new AtomicInteger(STAT_INIT);

    protected boolean exitWhenComplete = true;

    protected final static int STAT_INIT = 0;

    protected final static int STAT_RUNNING = 1;

    protected final static int STAT_STOPPED = 2;

    protected boolean spawnUrl = true;

    protected boolean destroyWhenExit = true;

    private List<SpiderListener> spiderListeners;

    private final AtomicLong pageCount = new AtomicLong(0);

    private Date startTime;

    private long emptySleepTime = 30000;

    /**
     * Create a spider with a pageProcessor.
     *
     * @param pageProcessor pageProcessor
     * @return new spider
     * @see PageProcessor
     */
    public static Spider create(PageProcessor pageProcessor) {
        return new Spider(pageProcessor);
    }

    /**
     * Create a spider with a pageProcessor.
     *
     * @param pageProcessor pageProcessor
     */
    public Spider(PageProcessor pageProcessor) {
        this.pageProcessor = pageProcessor;
        this.site = pageProcessor.getSite();
        this.scheduler = new SpiderScheduler(new QueueScheduler());
    }

    /**
     * Set startUrls of Spider.<br>
     * Takes priority over the startUrls of Site.
     *
     * @param startUrls startUrls
     * @return this
     */
    public Spider startUrls(List<String> startUrls) {
        checkIfRunning();
        this.startRequests = UrlUtils.convertToRequests(startUrls);
        return this;
    }

    /**
     * Set startRequests of Spider.<br>
     * Takes priority over the startUrls of Site.
     *
     * @param startRequests startRequests
     * @return this
     */
    public Spider startRequest(List<Request> startRequests) {
        checkIfRunning();
        this.startRequests = startRequests;
        return this;
    }

    /**
     * Set a uuid for the spider.<br>
     * The default uuid is the domain of the site.<br>
     *
     * @param uuid uuid
     * @return this
     */
    public Spider setUUID(String uuid) {
        this.uuid = uuid;
        return this;
    }

    /**
     * set scheduler for Spider
     *
     * @param scheduler scheduler
     * @return this
     * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler)
     */
    @Deprecated
    public Spider scheduler(Scheduler scheduler) {
        return setScheduler(scheduler);
    }

    /**
     * set scheduler for Spider
     *
     * @param updateScheduler scheduler
     * @return this
     * @see Scheduler
     * @since 0.2.1
     */
    public Spider setScheduler(Scheduler updateScheduler) {
        checkIfRunning();
        // keep a handle to the scheduler being replaced so its pending requests can be migrated
        Scheduler oldScheduler = this.scheduler.getScheduler();
        this.scheduler.setScheduler(updateScheduler);
        if (oldScheduler != null) {
            Request request;
            while ((request = oldScheduler.poll(this)) != null) {
                this.scheduler.push(request, this);
            }
        }
        return this;
    }
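
    // Example: a sketch of swapping in the FileCacheQueueScheduler mentioned in the class Javadoc,
    // so that urls and the crawl cursor are stored on disk and a crawl can resume after a restart
    // (assumes FileCacheQueueScheduler is on the classpath; the path is only a placeholder):
    //
    //     Spider.create(new SimplePageProcessor("http://my.oschina.net/",
    //             "http://my.oschina.net/*blog/*"))
    //             .setScheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/"))
    //             .run();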

    /**
     * add a pipeline for Spider
     *
     * @param pipeline pipeline
     * @return this
     * @see #addPipeline(us.codecraft.webmagic.pipeline.Pipeline)
     * @deprecated
     */
    @Deprecated
    public Spider pipeline(Pipeline pipeline) {
        return addPipeline(pipeline);
    }

    /**
     * add a pipeline for Spider
     *
     * @param pipeline pipeline
     * @return this
     * @see Pipeline
     * @since 0.2.1
     */
    public Spider addPipeline(Pipeline pipeline) {
        checkIfRunning();
        this.pipelines.add(pipeline);
        return this;
    }
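
    // Example: a sketch of chaining pipelines, following the FilePipeline usage shown in the
    // class Javadoc (assumes FilePipeline is available; results then go both to disk and to
    // the console):
    //
    //     Spider.create(new SimplePageProcessor("http://my.oschina.net/",
    //             "http://my.oschina.net/*blog/*"))
    //             .addPipeline(new FilePipeline("/data/temp/webmagic/"))
    //             .addPipeline(new ConsolePipeline())
    //             .run();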

    /**
     * set pipelines for Spider
     *
     * @param pipelines pipelines
     * @return this
     * @see Pipeline
     * @since 0.4.1
     */
    public Spider setPipelines(List<Pipeline> pipelines) {
        checkIfRunning();
        this.pipelines = pipelines;
        return this;
    }

    /**
     * clear the pipelines set
     *
     * @return this
     */
    public Spider clearPipeline() {
        pipelines = new ArrayList<Pipeline>();
        return this;
    }

    /**
     * set the downloader of spider
     *
     * @param downloader downloader
     * @return this
     * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader)
     * @deprecated
     */
    @Deprecated
    public Spider downloader(Downloader downloader) {
        return setDownloader(downloader);
    }

    /**
     * set the downloader of spider
     *
     * @param downloader downloader
     * @return this
     * @see Downloader
     */
    public Spider setDownloader(Downloader downloader) {
        checkIfRunning();
        this.downloader = downloader;
        return this;
    }

    protected void initComponent() {
        // fall back to the default components when none have been set
        if (downloader == null) {
            this.downloader = new HttpClientDownloader();
        }
        if (pipelines.isEmpty()) {
            pipelines.add(new ConsolePipeline());
        }
        downloader.setThread(threadNum);
        if (threadPool == null || threadPool.isShutdown()) {
            if (executorService != null && !executorService.isShutdown()) {
                threadPool = new CountableThreadPool(threadNum, executorService);
            } else {
                threadPool = new CountableThreadPool(threadNum);
            }
        }
        // push the seed requests into the scheduler and release the list
        if (startRequests != null) {
            for (Request request : startRequests) {
                addRequest(request);
            }
            startRequests.clear();
        }
        startTime = new Date();
    }

    @Override
    public void run() {
        checkRunningStat();
        initComponent();
        logger.info("Spider {} started!", getUUID());
        // an interrupt is not necessarily detected immediately
        while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
            Request poll = scheduler.poll(this);
            if (poll == null) {
                if (threadPool.getThreadAlive() == 0) {
                    // no worker threads are alive any more: poll once again before deciding to exit
                    poll = scheduler.poll(this);
                    if (poll == null) {
                        if (exitWhenComplete) {
                            break;
                        } else {
                            // nothing to do but exiting is disabled: sleep and poll again
                            try {
                                Thread.sleep(emptySleepTime);
                                continue;
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                } else {
                    // workers are still running: wait until a new url is signalled or the timeout elapses
                    if (scheduler.waitNewUrl(threadPool, emptySleepTime)) {
                        // interrupted while waiting
                        break;
                    }
                    continue;
                }
            }
            final Request request = poll;
            // note: the worker may swallow an interruption raised while processing
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        processRequest(request);
                        onSuccess(request);
                    } catch (Exception e) {
                        onError(request, e);
                        logger.error("process request " + request + " error", e);
                    } finally {
                        pageCount.incrementAndGet();
                        scheduler.signalNewUrl();
                    }
                }
            });
        }
        stat.set(STAT_STOPPED);
        // release resources
        if (destroyWhenExit) {
            close();
        }
        logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
    }

    /**
     * @deprecated Use {@link #onError(Request, Exception)} instead.
     */
    @Deprecated
    protected void onError(Request request) {
    }

    protected void onError(Request request, Exception e) {
        this.onError(request);

        if (CollectionUtils.isNotEmpty(spiderListeners)) {
            for (SpiderListener spiderListener : spiderListeners) {
                spiderListener.onError(request, e);
            }
        }
    }

    protected void onSuccess(Request request) {
        if (CollectionUtils.isNotEmpty(spiderListeners)) {
            for (SpiderListener spiderListener : spiderListeners) {
                spiderListener.onSuccess(request);
            }
        }
    }

    private void checkRunningStat() {
        while (true) {
            int statNow = stat.get();
            if (statNow == STAT_RUNNING) {
                throw new IllegalStateException("Spider is already running!");
            }
            if (stat.compareAndSet(statNow, STAT_RUNNING)) {
                break;
            }
        }
    }

    public void close() {
        destroyEach(downloader);
        destroyEach(pageProcessor);
        destroyEach(scheduler);
        for (Pipeline pipeline : pipelines) {
            destroyEach(pipeline);
        }
        threadPool.shutdown();
    }

    private void destroyEach(Object object) {
        if (object instanceof Closeable) {
            try {
                ((Closeable) object).close();
            } catch (IOException e) {
                logger.warn("close resource error", e);
            }
        }
    }

    /**
     * Process specific urls without url discovery.
     *
     * @param urls urls to process
     */
    public void test(String... urls) {
        initComponent();
        if (urls.length > 0) {
            for (String url : urls) {
                processRequest(new Request(url));
            }
        }
    }
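
    // Example: a sketch of using test() to debug a PageProcessor against a handful of pages,
    // without scheduling or url discovery (MyPageProcessor and the url are placeholders):
    //
    //     Spider.create(new MyPageProcessor())
    //             .test("http://my.oschina.net/");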

    private void processRequest(Request request) {
        Page page;
        if (null != request.getDownloader()) {
            page = request.getDownloader().download(request, this);
        } else {
            page = downloader.download(request, this);
        }
        if (page.isDownloadSuccess()) {
            onDownloadSuccess(request, page);
        } else {
            onDownloaderFail(request);
        }
    }

    private void onDownloadSuccess(Request request, Page page) {
        if (site.getAcceptStatCode().contains(page.getStatusCode())) {
            pageProcessor.process(page);
            extractAndAddRequests(page, spawnUrl);
            if (!page.getResultItems().isSkip()) {
                for (Pipeline pipeline : pipelines) {
                    pipeline.process(page.getResultItems(), this);
                }
            }
        } else {
            logger.info("page status code error, page {}, code: {}", request.getUrl(), page.getStatusCode());
        }
        sleep(site.getSleepTime());
    }

    private void onDownloaderFail(Request request) {
        if (site.getCycleRetryTimes() == 0) {
            sleep(site.getSleepTime());
        } else {
            // for cycle retry
            doCycleRetry(request);
        }
    }

    private void doCycleRetry(Request request) {
        Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);
        if (cycleTriedTimesObject == null) {
            addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));
        } else {
            int cycleTriedTimes = (Integer) cycleTriedTimesObject;
            cycleTriedTimes++;
            if (cycleTriedTimes < site.getCycleRetryTimes()) {
                addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));
            }
        }
        sleep(site.getRetrySleepTime());
    }

    protected void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            logger.error("Thread interrupted when sleep", e);
            Thread.currentThread().interrupt();
        }
    }

    protected void extractAndAddRequests(Page page, boolean spawnUrl) {
        if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
            for (Request request : page.getTargetRequests()) {
                addRequest(request);
            }
        }
    }

    private void addRequest(Request request) {
        if (site.getDomain() == null && request != null && request.getUrl() != null) {
            site.setDomain(UrlUtils.getDomain(request.getUrl()));
        }
        scheduler.push(request, this);
    }

    protected void checkIfRunning() {
        if (stat.get() == STAT_RUNNING) {
            throw new IllegalStateException("Spider is already running!");
        }
    }

    public void runAsync() {
        Thread thread = new Thread(this);
        thread.setDaemon(false);
        thread.start();
    }

    /**
     * Add urls to crawl.
     *
     * @param urls urls
     * @return this
     */
    public Spider addUrl(String... urls) {
        for (String url : urls) {
            addRequest(new Request(url));
        }
        scheduler.signalNewUrl();
        return this;
    }
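
    // Example: a sketch of feeding urls to a spider that is already running in the background.
    // setExitWhenComplete(false) keeps the loop alive while the queue is empty; urls pushed via
    // addUrl(...) wake it up through signalNewUrl(). The urls are placeholders.
    //
    //     Spider spider = Spider.create(new SimplePageProcessor("http://my.oschina.net/",
    //             "http://my.oschina.net/*blog/*"))
    //             .setExitWhenComplete(false);
    //     spider.start();
    //     spider.addUrl("http://my.oschina.net/");
    //     // ... later, when no more urls will be added:
    //     spider.stop();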

    /**
     * Download urls synchronously and return the processed results.
     *
     * @param urls urls
     * @param <T> type of process result
     * @return list of downloaded results
     */
    public <T> List<T> getAll(Collection<String> urls) {
        destroyWhenExit = false;
        spawnUrl = false;
        if (startRequests != null) {
            startRequests.clear();
        }
        for (Request request : UrlUtils.convertToRequests(urls)) {
            addRequest(request);
        }
        CollectorPipeline collectorPipeline = getCollectorPipeline();
        pipelines.add(collectorPipeline);
        run();
        spawnUrl = true;
        destroyWhenExit = true;
        return collectorPipeline.getCollected();
    }

    protected CollectorPipeline getCollectorPipeline() {
        return new ResultItemsCollectorPipeline();
    }

    public <T> T get(String url) {
        List<String> urls = WMCollections.newArrayList(url);
        List<T> results = getAll(urls);
        if (results != null && !results.isEmpty()) {
            return results.get(0);
        } else {
            return null;
        }
    }
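
    // Example: a sketch of the synchronous api. With the default ResultItemsCollectorPipeline,
    // getAll()/get() return the ResultItems collected from each processed page (SimplePageProcessor
    // and the urls are taken from the class Javadoc as placeholders):
    //
    //     ResultItems result = Spider.create(new SimplePageProcessor("http://my.oschina.net/",
    //             "http://my.oschina.net/*blog/*"))
    //             .get("http://my.oschina.net/");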

    /**
     * Add urls with information to crawl.
     *
     * @param requests requests
     * @return this
     */
    public Spider addRequest(Request... requests) {
        for (Request request : requests) {
            addRequest(request);
        }
        scheduler.signalNewUrl();
        return this;
    }

    public void start() {
        runAsync();
    }

    public void stop() {
        if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
            logger.info("Spider {} stop success!", getUUID());
        } else {
            logger.info("Spider {} stop failed!", getUUID());
        }
    }

    /**
     * start with more than one thread
     *
     * @param threadNum threadNum
     * @return this
     */
    public Spider thread(int threadNum) {
        checkIfRunning();
        if (threadNum <= 0) {
            throw new IllegalArgumentException("threadNum should be more than zero!");
        }
        this.threadNum = threadNum;
        return this;
    }

    /**
     * start with more than one thread
     *
     * @param executorService executorService to run the spider
     * @param threadNum threadNum
     * @return this
     */
    public Spider thread(ExecutorService executorService, int threadNum) {
        checkIfRunning();
        if (threadNum <= 0) {
            throw new IllegalArgumentException("threadNum should be more than zero!");
        }
        this.threadNum = threadNum;
        this.executorService = executorService;
        return this;
    }
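
    // Example: a sketch of running the spider on a caller-supplied thread pool. Executors is
    // java.util.concurrent.Executors; as initComponent() shows, the executor is wrapped in a
    // CountableThreadPool together with the given threadNum:
    //
    //     ExecutorService executor = Executors.newFixedThreadPool(4);
    //     Spider.create(new SimplePageProcessor("http://my.oschina.net/",
    //             "http://my.oschina.net/*blog/*"))
    //             .thread(executor, 4)
    //             .run();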

    public boolean isExitWhenComplete() {
        return exitWhenComplete;
    }

    /**
     * Exit when complete.<br>
     * True: exit when all urls of the site are downloaded.<br>
     * False: do not exit until stop() is called manually.<br>
     *
     * @param exitWhenComplete exitWhenComplete
     * @return this
     */
    public Spider setExitWhenComplete(boolean exitWhenComplete) {
        this.exitWhenComplete = exitWhenComplete;
        return this;
    }

    public boolean isSpawnUrl() {
        return spawnUrl;
    }

    /**
     * Get the count of pages downloaded by the spider.
     *
     * @return total downloaded page count
     * @since 0.4.1
     */
    public long getPageCount() {
        return pageCount.get();
    }

    /**
     * Get the running status of the spider.
     *
     * @return running status
     * @see Status
     * @since 0.4.1
     */
    public Status getStatus() {
        return Status.fromValue(stat.get());
    }
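
    // Example: a sketch of monitoring an asynchronously started spider from the caller's thread,
    // using only accessors defined in this class (exception handling omitted; the one-second
    // polling interval is arbitrary):
    //
    //     spider.start();
    //     while (spider.getStatus() == Spider.Status.Running) {
    //         System.out.println(spider.getPageCount() + " pages, "
    //                 + spider.getThreadAlive() + " threads alive");
    //         Thread.sleep(1000);
    //     }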

    public enum Status {
        Init(0), Running(1), Stopped(2);

        private final int value;

        Status(int value) {
            this.value = value;
        }

        int getValue() {
            return value;
        }

        public static Status fromValue(int value) {
            for (Status status : Status.values()) {
                if (status.getValue() == value) {
                    return status;
                }
            }
            // default value
            return Init;
        }
    }

    /**
     * Get the count of threads currently running.
     *
     * @return count of running threads
     * @since 0.4.1
     */
    public int getThreadAlive() {
        if (threadPool == null) {
            return 0;
        }
        return threadPool.getThreadAlive();
    }

    /**
     * Whether to add the urls extracted from a page to the download queue.<br>
     * When true, extracted urls are added for download; when false, only the seed urls are downloaded.<br>
     * DO NOT set it unless you know what it means!
     *
     * @param spawnUrl spawnUrl
     * @return this
     * @since 0.4.0
     */
    public Spider setSpawnUrl(boolean spawnUrl) {
        this.spawnUrl = spawnUrl;
        return this;
    }

    @Override
    public String getUUID() {
        if (uuid != null) {
            return uuid;
        }
        if (site != null) {
            return site.getDomain();
        }
        uuid = UUID.randomUUID().toString();
        return uuid;
    }

    public Spider setExecutorService(ExecutorService executorService) {
        checkIfRunning();
        this.executorService = executorService;
        return this;
    }

    @Override
    public Site getSite() {
        return site;
    }

    public List<SpiderListener> getSpiderListeners() {
        return spiderListeners;
    }

    public Spider setSpiderListeners(List<SpiderListener> spiderListeners) {
        this.spiderListeners = spiderListeners;
        return this;
    }
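
    // Example: a sketch of registering a listener for per-request callbacks; onSuccess(Request)
    // and onError(Request, Exception) are the two calls made from this class. The exact set of
    // SpiderListener methods to implement may differ between webmagic versions, so treat this as
    // an outline rather than a compile-ready snippet. WMCollections.newArrayList is the same
    // helper used in get(String).
    //
    //     spider.setSpiderListeners(WMCollections.newArrayList(new SpiderListener() {
    //         @Override
    //         public void onSuccess(Request request) {
    //             // e.g. record the url as done
    //         }
    //
    //         @Override
    //         public void onError(Request request, Exception e) {
    //             // e.g. save the url for a later retry
    //         }
    //     }));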

    public Date getStartTime() {
        return startTime;
    }

    public Scheduler getScheduler() {
        return scheduler.getScheduler();
    }

    /**
     * Set the wait time when no url is polled.
     *
     * @param emptySleepTime In MILLISECONDS.
     * @return this
     */
    public Spider setEmptySleepTime(long emptySleepTime) {
        if (emptySleepTime <= 0) {
            throw new IllegalArgumentException("emptySleepTime should be more than zero!");
        }
        this.emptySleepTime = emptySleepTime;
        return this;
    }
}