1 package us.codecraft.webmagic.samples.scheduler;
2
3 import us.codecraft.webmagic.Request;
4 import us.codecraft.webmagic.Task;
5 import us.codecraft.webmagic.scheduler.PriorityScheduler;
6
7 import java.util.HashSet;
8 import java.util.Set;
9 import java.util.concurrent.DelayQueue;
10 import java.util.concurrent.Delayed;
11 import java.util.concurrent.TimeUnit;
12
13
14
15
16 public class DelayQueueScheduler extends PriorityScheduler {
17
18 private DelayQueue<RequestWrapper> queue = new DelayQueue<RequestWrapper>();
19
20 private Set<String> urls = new HashSet<String>();
21
22 private long time;
23
24 private TimeUnit timeUnit;
25
26 private class RequestWrapper implements Delayed {
27
28 private long startTime = System.currentTimeMillis();
29
30 private Request request;
31
32 private RequestWrapper(Request request) {
33 this.request = request;
34 }
35
36 private long getStartTime() {
37 return startTime;
38 }
39
40 private Request getRequest() {
41 return request;
42 }
43
44 @Override
45 public long getDelay(TimeUnit unit) {
46 long convert = unit.convert(TimeUnit.MILLISECONDS.convert(time, timeUnit) - System.currentTimeMillis() + startTime, TimeUnit.MILLISECONDS);
47 return convert;
48 }
49
50 @Override
51 public int compareTo(Delayed o) {
52 return new Long(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
53 }
54 }
55
56 public DelayQueueScheduler(long time, TimeUnit timeUnit) {
57 this.time = time;
58 this.timeUnit = timeUnit;
59 }
60
61 @Override
62 public synchronized void push(Request request, Task task) {
63 if (urls.add(request.getUrl())) {
64 queue.add(new RequestWrapper(request));
65 }
66
67 }
68
69 @Override
70 public synchronized Request poll(Task task) {
71 RequestWrapper take = null;
72 while (take == null) {
73 try {
74 take = queue.take();
75 } catch (InterruptedException e) {
76 e.printStackTrace();
77 }
78 }
79 queue.add(new RequestWrapper(take.getRequest()));
80 return take.getRequest();
81 }
82 }