package us.codecraft.webmagic;


import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.downloader.Downloader;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.pipeline.CollectorPipeline;
import us.codecraft.webmagic.pipeline.ConsolePipeline;
import us.codecraft.webmagic.pipeline.Pipeline;
import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.scheduler.QueueScheduler;
import us.codecraft.webmagic.scheduler.Scheduler;
import us.codecraft.webmagic.thread.CountableThreadPool;
import us.codecraft.webmagic.utils.UrlUtils;
import us.codecraft.webmagic.utils.WMCollections;

/**
 * Entry point of a crawler.<br>
 * A spider contains four modules: Downloader, Scheduler, PageProcessor and
 * Pipeline.<br>
 * Every module is a field of Spider. <br>
 * Each module is defined as an interface. <br>
 * You can customize a spider with various implementations of them. <br>
 * Examples: <br>
 * <br>
 * A simple crawler: <br>
 * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
 * "http://my.oschina.net/*blog/*")).run();<br>
 * <br>
 * Store results to files by FilePipeline: <br>
 * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
 * "http://my.oschina.net/*blog/*")) <br>
 * .pipeline(new FilePipeline("/data/temp/webmagic/")).run(); <br>
 * <br>
 * Use FileCacheQueueScheduler to store urls and cursor in files, so that a
 * Spider can resume from where it stopped after a shutdown. <br>
 * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
 * "http://my.oschina.net/*blog/*")) <br>
 * .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run(); <br>
 *
 * @author code4crafter@gmail.com <br>
 * @see Downloader
 * @see Scheduler
 * @see PageProcessor
 * @see Pipeline
 * @since 0.1.0
 */
public class Spider implements Runnable, Task {

    protected Downloader downloader;

    protected List<Pipeline> pipelines = new ArrayList<Pipeline>();

    protected PageProcessor pageProcessor;

    protected List<Request> startRequests;

    protected Site site;

    protected String uuid;

    protected SpiderScheduler scheduler;

    protected Logger logger = LoggerFactory.getLogger(getClass());

    protected CountableThreadPool threadPool;

    protected ExecutorService executorService;

    protected int threadNum = 1;

    // current status of the spider, one of the STAT_* constants below
    protected AtomicInteger stat = new AtomicInteger(STAT_INIT);

    // whether the spider exits when no more urls are left to crawl
    protected volatile boolean exitWhenComplete = true;

    protected final static int STAT_INIT = 0;

    protected final static int STAT_RUNNING = 1;

    protected final static int STAT_STOPPED = 2;

    // whether urls extracted by the PageProcessor are added for download
    protected boolean spawnUrl = true;

    // whether resources are released when the spider exits
    protected boolean destroyWhenExit = true;

    private List<SpiderListener> spiderListeners;

    // count of pages processed so far
    private final AtomicLong pageCount = new AtomicLong(0);

    private Date startTime;

    // wait time in milliseconds when no url is polled from the scheduler
    private long emptySleepTime = 30000;

    /**
     * create a spider with pageProcessor.
     *
     * @param pageProcessor pageProcessor
     * @return new spider
     * @see PageProcessor
     */
    public static Spider create(PageProcessor pageProcessor) {
        return new Spider(pageProcessor);
    }

    /**
     * create a spider with pageProcessor.
     *
     * @param pageProcessor pageProcessor
     */
    public Spider(PageProcessor pageProcessor) {
        this.pageProcessor = pageProcessor;
        this.site = pageProcessor.getSite();
        this.scheduler = new SpiderScheduler(new QueueScheduler());
    }

    /**
     * Set startUrls of Spider.<br>
     * These take priority over the startUrls of Site.
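     * <br>
     * Example (an illustrative sketch in the style of the class-level examples above): <br>
     * Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*")) <br>
     * .startUrls(WMCollections.newArrayList("http://my.oschina.net/")).run(); <br>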
     *
     * @param startUrls startUrls
     * @return this
     */
    public Spider startUrls(List<String> startUrls) {
        checkIfRunning();
        this.startRequests = UrlUtils.convertToRequests(startUrls);
        return this;
    }

    /**
     * Set startRequests of Spider.<br>
     * These take priority over the startUrls of Site.
     *
     * @param startRequests startRequests
     * @return this
     */
    public Spider startRequest(List<Request> startRequests) {
        checkIfRunning();
        this.startRequests = startRequests;
        return this;
    }

    /**
     * Set a UUID for the spider.<br>
     * The default UUID is the domain of the site.<br>
     *
     * @param uuid uuid
     * @return this
     */
    public Spider setUUID(String uuid) {
        this.uuid = uuid;
        return this;
    }

    /**
     * set scheduler for Spider
     *
     * @param scheduler scheduler
     * @return this
     * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler)
     * @deprecated Use {@link #setScheduler(us.codecraft.webmagic.scheduler.Scheduler)} instead.
     */
    @Deprecated
    public Spider scheduler(Scheduler scheduler) {
        return setScheduler(scheduler);
    }

    /**
     * set scheduler for Spider.<br>
     * Requests remaining in the old scheduler are moved to the new one.
     *
     * @param updateScheduler scheduler
     * @return this
     * @see Scheduler
     * @since 0.2.1
     */
    public Spider setScheduler(Scheduler updateScheduler) {
        checkIfRunning();
        Scheduler oldScheduler = scheduler.getScheduler();
        scheduler.setScheduler(updateScheduler);
        if (oldScheduler != null) {
            Request request;
            while ((request = oldScheduler.poll(this)) != null) {
                this.scheduler.push(request, this);
            }
        }
        return this;
    }

    /**
     * add a pipeline for Spider
     *
     * @param pipeline pipeline
     * @return this
     * @see #addPipeline(us.codecraft.webmagic.pipeline.Pipeline)
     * @deprecated Use {@link #addPipeline(us.codecraft.webmagic.pipeline.Pipeline)} instead.
     */
    @Deprecated
    public Spider pipeline(Pipeline pipeline) {
        return addPipeline(pipeline);
    }

    /**
     * add a pipeline for Spider
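     * <br>
     * Example (an illustrative sketch; {@code spider} is an already-created Spider, and every added pipeline
     * receives the extracted results): <br>
     * spider.addPipeline(new ConsolePipeline()).addPipeline(new FilePipeline("/data/temp/webmagic/")); <br>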
     *
     * @param pipeline pipeline
     * @return this
     * @see Pipeline
     * @since 0.2.1
     */
    public Spider addPipeline(Pipeline pipeline) {
        checkIfRunning();
        this.pipelines.add(pipeline);
        return this;
    }

    /**
     * set pipelines for Spider
     *
     * @param pipelines pipelines
     * @return this
     * @see Pipeline
     * @since 0.4.1
     */
    public Spider setPipelines(List<Pipeline> pipelines) {
        checkIfRunning();
        this.pipelines = pipelines;
        return this;
    }

    /**
     * clear the pipelines set
     *
     * @return this
     */
    public Spider clearPipeline() {
        pipelines = new ArrayList<Pipeline>();
        return this;
    }

    /**
     * set the downloader of spider
     *
     * @param downloader downloader
     * @return this
     * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader)
     * @deprecated Use {@link #setDownloader(us.codecraft.webmagic.downloader.Downloader)} instead.
     */
    @Deprecated
    public Spider downloader(Downloader downloader) {
        return setDownloader(downloader);
    }

    /**
     * set the downloader of spider
     *
     * @param downloader downloader
     * @return this
     * @see Downloader
     */
    public Spider setDownloader(Downloader downloader) {
        checkIfRunning();
        this.downloader = downloader;
        return this;
    }

    protected void initComponent() {
        if (downloader == null) {
            this.downloader = new HttpClientDownloader();
        }
        if (pipelines.isEmpty()) {
            pipelines.add(new ConsolePipeline());
        }
        downloader.setThread(threadNum);
        if (threadPool == null || threadPool.isShutdown()) {
            if (executorService != null && !executorService.isShutdown()) {
                threadPool = new CountableThreadPool(threadNum, executorService);
            } else {
                threadPool = new CountableThreadPool(threadNum);
            }
        }
        if (startRequests != null) {
            for (Request request : startRequests) {
                addRequest(request);
            }
            startRequests.clear();
        }
        startTime = new Date();
    }

    @Override
    public void run() {
        checkRunningStat();
        initComponent();
        logger.info("Spider {} started!", getUUID());
        // the interrupt flag will not necessarily be detected immediately
        while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
            Request poll = scheduler.poll(this);
            if (poll == null) {
                if (threadPool.getThreadAlive() == 0) {
                    // no worker thread is alive any more, poll once again
                    poll = scheduler.poll(this);
                    if (poll == null) {
                        if (exitWhenComplete) {
                            break;
                        } else {
                            // wait
                            try {
                                Thread.sleep(emptySleepTime);
                                continue;
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                } else {
                    // wait until a new url is added
                    if (scheduler.waitNewUrl(threadPool, emptySleepTime)) {
                        // interrupted while waiting
                        break;
                    }
                    continue;
                }
            }
            final Request request = poll;
            // this may swallow the interruption
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        processRequest(request);
                        onSuccess(request);
                    } catch (Exception e) {
                        onError(request, e);
                        logger.error("process request " + request + " error", e);
                    } finally {
                        pageCount.incrementAndGet();
                        scheduler.signalNewUrl();
                    }
                }
            });
        }
        stat.set(STAT_STOPPED);
        // release some resources
        if (destroyWhenExit) {
            close();
        }
        logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
    }

    /**
     * @deprecated Use {@link #onError(Request, Exception)} instead.
     */
    @Deprecated
    protected void onError(Request request) {
    }

    protected void onError(Request request, Exception e) {
        this.onError(request);

        if (CollectionUtils.isNotEmpty(spiderListeners)) {
            for (SpiderListener spiderListener : spiderListeners) {
                spiderListener.onError(request, e);
            }
        }
    }

    protected void onSuccess(Request request) {
        if (CollectionUtils.isNotEmpty(spiderListeners)) {
            for (SpiderListener spiderListener : spiderListeners) {
                spiderListener.onSuccess(request);
            }
        }
    }

    private void checkRunningStat() {
        while (true) {
            int statNow = stat.get();
            if (statNow == STAT_RUNNING) {
                throw new IllegalStateException("Spider is already running!");
            }
            if (stat.compareAndSet(statNow, STAT_RUNNING)) {
                break;
            }
        }
    }

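    /**
     * Close the spider and release its resources: the downloader, the page processor, the scheduler and the
     * pipelines are closed if they implement {@link Closeable}, and the thread pool is shut down.
     */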
    public void close() {
        destroyEach(downloader);
        destroyEach(pageProcessor);
        destroyEach(scheduler);
        for (Pipeline pipeline : pipelines) {
            destroyEach(pipeline);
        }
        threadPool.shutdown();
    }

    private void destroyEach(Object object) {
        if (object instanceof Closeable) {
            try {
                ((Closeable) object).close();
            } catch (IOException e) {
                logger.error("close resource error", e);
            }
        }
    }

    /**
     * Process specific urls without url discovery.
     *
     * @param urls urls to process
     */
    public void test(String... urls) {
        initComponent();
        if (urls.length > 0) {
            for (String url : urls) {
                processRequest(new Request(url));
            }
        }
    }

    private void processRequest(Request request) {
        Page page;
        if (null != request.getDownloader()) {
            // a request may carry its own downloader, which then takes precedence
            page = request.getDownloader().download(request, this);
        } else {
            page = downloader.download(request, this);
        }
        if (page.isDownloadSuccess()) {
            onDownloadSuccess(request, page);
        } else {
            onDownloaderFail(request);
        }
    }

    private void onDownloadSuccess(Request request, Page page) {
        if (site.getAcceptStatCode().contains(page.getStatusCode())) {
            pageProcessor.process(page);
            extractAndAddRequests(page, spawnUrl);
            if (!page.getResultItems().isSkip()) {
                for (Pipeline pipeline : pipelines) {
                    pipeline.process(page.getResultItems(), this);
                }
            }
        } else {
            logger.info("page status code error, page {}, code: {}", request.getUrl(), page.getStatusCode());
        }
        sleep(site.getSleepTime());
    }

    private void onDownloaderFail(Request request) {
        if (site.getCycleRetryTimes() == 0) {
            sleep(site.getSleepTime());
        } else {
            // for cycle retry
            doCycleRetry(request);
        }
    }

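    /**
     * Re-schedule a request that failed to download: the retry count is kept in the request extras under
     * {@link Request#CYCLE_TRIED_TIMES}, and the request is re-added (as a clone with priority 0) until the
     * count reaches {@link Site#getCycleRetryTimes()}.
     */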
    private void doCycleRetry(Request request) {
        Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);
        if (cycleTriedTimesObject == null) {
            addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));
        } else {
            int cycleTriedTimes = (Integer) cycleTriedTimesObject;
            cycleTriedTimes++;
            if (cycleTriedTimes < site.getCycleRetryTimes()) {
                addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));
            }
        }
        sleep(site.getRetrySleepTime());
    }

    protected void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            logger.error("Thread interrupted when sleep", e);
            Thread.currentThread().interrupt();
        }
    }

    protected void extractAndAddRequests(Page page, boolean spawnUrl) {
        if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
            for (Request request : page.getTargetRequests()) {
                addRequest(request);
            }
        }
    }

    private void addRequest(Request request) {
        if (site.getDomain() == null && request != null && request.getUrl() != null) {
            site.setDomain(UrlUtils.getDomain(request.getUrl()));
        }
        scheduler.push(request, this);
    }

    protected void checkIfRunning() {
        if (stat.get() == STAT_RUNNING) {
            throw new IllegalStateException("Spider is already running!");
        }
    }

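    /**
     * Start the spider asynchronously in a new, non-daemon thread.
     */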
    public void runAsync() {
        Thread thread = new Thread(this);
        thread.setDaemon(false);
        thread.start();
    }

    /**
     * Add urls to crawl. <br>
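     * <br>
     * Example (an illustrative sketch; {@code spider} is an already-created Spider and the urls are placeholders): <br>
     * spider.addUrl("http://my.oschina.net/", "http://my.oschina.net/blog"); <br>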
     *
     * @param urls urls
     * @return this
     */
    public Spider addUrl(String... urls) {
        for (String url : urls) {
            addRequest(new Request(url));
        }
        scheduler.signalNewUrl();
        return this;
    }

    /**
     * Download urls synchronously and collect the results.
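     * <br>
     * Example (an illustrative sketch; with the default collector pipeline the results are {@link ResultItems}): <br>
     * List&lt;ResultItems&gt; results = Spider.create(new SimplePageProcessor("http://my.oschina.net/",
     * "http://my.oschina.net/*blog/*")).getAll(WMCollections.newArrayList("http://my.oschina.net/")); <br>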
     *
     * @param urls urls
     * @param <T> type of process result
     * @return list of collected results
     */
    public <T> List<T> getAll(Collection<String> urls) {
        destroyWhenExit = false;
        spawnUrl = false;
        if (startRequests != null) {
            startRequests.clear();
        }
        for (Request request : UrlUtils.convertToRequests(urls)) {
            addRequest(request);
        }
        CollectorPipeline collectorPipeline = getCollectorPipeline();
        pipelines.add(collectorPipeline);
        run();
        spawnUrl = true;
        destroyWhenExit = true;
        return collectorPipeline.getCollected();
    }

    protected CollectorPipeline getCollectorPipeline() {
        return new ResultItemsCollectorPipeline();
    }

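    /**
     * Download a single url synchronously and return the collected result, or {@code null} if nothing was collected.
     *
     * @param url url to download
     * @param <T> type of process result
     * @return the collected result, or null
     */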
    public <T> T get(String url) {
        List<String> urls = WMCollections.newArrayList(url);
        List<T> results = getAll(urls);
        if (results != null && results.size() > 0) {
            return results.get(0);
        } else {
            return null;
        }
    }

    /**
     * Add requests (urls with extra information) to crawl.<br>
     *
     * @param requests requests
     * @return this
     */
    public Spider addRequest(Request... requests) {
        for (Request request : requests) {
            addRequest(request);
        }
        scheduler.signalNewUrl();
        return this;
    }

    public void start() {
        runAsync();
    }

    public void stop() {
        if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
            logger.info("Spider " + getUUID() + " stop success!");
        } else {
            logger.info("Spider " + getUUID() + " stop fail!");
        }
    }

    /**
     * Stop when all tasks in the queue are completed and all worker threads have finished.
     */
    public void stopWhenComplete() {
        this.exitWhenComplete = true;
    }

    /**
     * Start the spider with more than one thread.
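     * <br>
     * Example (an illustrative sketch in the style of the class-level examples above): <br>
     * Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*")) <br>
     * .thread(5).run(); <br>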
     *
     * @param threadNum threadNum
     * @return this
     */
    public Spider thread(int threadNum) {
        checkIfRunning();
        if (threadNum <= 0) {
            throw new IllegalArgumentException("threadNum should be more than zero!");
        }
        this.threadNum = threadNum;
        return this;
    }

    /**
     * Start the spider with more than one thread, running on the given executorService.
     *
     * @param executorService executorService to run the spider
     * @param threadNum threadNum
     * @return this
     */
    public Spider thread(ExecutorService executorService, int threadNum) {
        checkIfRunning();
        if (threadNum <= 0) {
            throw new IllegalArgumentException("threadNum should be more than zero!");
        }
        this.threadNum = threadNum;
        this.executorService = executorService;
        return this;
    }

    public boolean isExitWhenComplete() {
        return exitWhenComplete;
    }

    /**
     * Exit when complete. <br>
     * True: exit when all urls of the site are downloaded. <br>
     * False: do not exit until stop() is called manually.<br>
     *
     * @param exitWhenComplete exitWhenComplete
     * @return this
     */
    public Spider setExitWhenComplete(boolean exitWhenComplete) {
        this.exitWhenComplete = exitWhenComplete;
        return this;
    }

    public boolean isSpawnUrl() {
        return spawnUrl;
    }

    /**
     * Get the count of pages downloaded by the spider.
     *
     * @return total downloaded page count
     * @since 0.4.1
     */
    public long getPageCount() {
        return pageCount.get();
    }

    /**
     * Get the running status of the spider.
     *
     * @return running status
     * @see Status
     * @since 0.4.1
     */
    public Status getStatus() {
        return Status.fromValue(stat.get());
    }

    public enum Status {
        Init(0), Running(1), Stopped(2);

        private final int value;

        private Status(int value) {
            this.value = value;
        }

        int getValue() {
            return value;
        }

        public static Status fromValue(int value) {
            for (Status status : Status.values()) {
                if (status.getValue() == value) {
                    return status;
                }
            }
            // default value
            return Init;
        }
    }

    /**
     * Get the count of currently running worker threads.
     *
     * @return thread count which is running
     * @since 0.4.1
     */
    public int getThreadAlive() {
        if (threadPool == null) {
            return 0;
        }
        return threadPool.getThreadAlive();
    }

    /**
     * Whether to add the urls extracted from pages for download.<br>
     * When true, extracted urls are added for download; when false, only the seed urls are downloaded. <br>
     * DO NOT set it unless you know what it means!
     *
     * @param spawnUrl spawnUrl
     * @return this
     * @since 0.4.0
     */
    public Spider setSpawnUrl(boolean spawnUrl) {
        this.spawnUrl = spawnUrl;
        return this;
    }

    @Override
    public String getUUID() {
        if (uuid != null) {
            return uuid;
        }
        if (site != null) {
            return site.getDomain();
        }
        uuid = UUID.randomUUID().toString();
        return uuid;
    }

    public Spider setExecutorService(ExecutorService executorService) {
        checkIfRunning();
        this.executorService = executorService;
        return this;
    }

    @Override
    public Site getSite() {
        return site;
    }

    public List<SpiderListener> getSpiderListeners() {
        return spiderListeners;
    }

    public Spider setSpiderListeners(List<SpiderListener> spiderListeners) {
        this.spiderListeners = spiderListeners;
        return this;
    }

    public Date getStartTime() {
        return startTime;
    }

    public Scheduler getScheduler() {
        return scheduler.getScheduler();
    }

    /**
     * Set the wait time when no url is polled.<br>
     *
     * @param emptySleepTime In MILLISECONDS.
     * @return this
     */
    public Spider setEmptySleepTime(long emptySleepTime) {
        if (emptySleepTime <= 0) {
            throw new IllegalArgumentException("emptySleepTime should be more than zero!");
        }
        this.emptySleepTime = emptySleepTime;
        return this;
    }
}