View Javadoc
1   package us.codecraft.webmagic.samples.scheduler;
2   
3   import us.codecraft.webmagic.Request;
4   import us.codecraft.webmagic.Task;
5   import us.codecraft.webmagic.scheduler.PriorityScheduler;
6   
7   import java.util.HashSet;
8   import java.util.Set;
9   import java.util.concurrent.DelayQueue;
10  import java.util.concurrent.Delayed;
11  import java.util.concurrent.TimeUnit;
12  
13  /**
14   * @author code4crafter@gmail.com
15   */
16  public class DelayQueueScheduler extends PriorityScheduler {
17  
18      private DelayQueue<RequestWrapper> queue = new DelayQueue<RequestWrapper>();
19  
20      private Set<String> urls = new HashSet<String>();
21  
22      private long time;
23  
24      private TimeUnit timeUnit;
25  
26      private class RequestWrapper implements Delayed {
27  
28          private long startTime = System.currentTimeMillis();
29  
30          private Request request;
31  
32          private RequestWrapper(Request request) {
33              this.request = request;
34          }
35  
36          private long getStartTime() {
37              return startTime;
38          }
39  
40          private Request getRequest() {
41              return request;
42          }
43  
44          @Override
45          public long getDelay(TimeUnit unit) {
46              long convert = unit.convert(TimeUnit.MILLISECONDS.convert(time, timeUnit) - System.currentTimeMillis() + startTime, TimeUnit.MILLISECONDS);
47              return convert;
48          }
49  
50          @Override
51          public int compareTo(Delayed o) {
52              return new Long(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
53          }
54      }
55  
56      public DelayQueueScheduler(long time, TimeUnit timeUnit) {
57          this.time = time;
58          this.timeUnit = timeUnit;
59      }
60  
61      @Override
62      public synchronized void push(Request request, Task task) {
63          if (urls.add(request.getUrl())) {
64              queue.add(new RequestWrapper(request));
65          }
66  
67      }
68  
69      @Override
70      public synchronized Request poll(Task task) {
71          RequestWrapper take = null;
72          while (take == null) {
73              try {
74                  take = queue.take();
75              } catch (InterruptedException e) {
76                  e.printStackTrace();
77              }
78          }
79          queue.add(new RequestWrapper(take.getRequest()));
80          return take.getRequest();
81      }
82  }