DelayQueueScheduler.java
package us.codecraft.webmagic.samples.scheduler;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.PriorityScheduler;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author code4crafter@gmail.com
*/
public class DelayQueueScheduler extends PriorityScheduler {
private DelayQueue<RequestWrapper> queue = new DelayQueue<RequestWrapper>();
private Set<String> urls = new HashSet<String>();
private long time;
private TimeUnit timeUnit;
private class RequestWrapper implements Delayed {
private long startTime = System.currentTimeMillis();
private Request request;
private RequestWrapper(Request request) {
this.request = request;
}
private long getStartTime() {
return startTime;
}
private Request getRequest() {
return request;
}
@Override
public long getDelay(TimeUnit unit) {
long convert = unit.convert(TimeUnit.MILLISECONDS.convert(time, timeUnit) - System.currentTimeMillis() + startTime, TimeUnit.MILLISECONDS);
return convert;
}
@Override
public int compareTo(Delayed o) {
return new Long(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
}
}
public DelayQueueScheduler(long time, TimeUnit timeUnit) {
this.time = time;
this.timeUnit = timeUnit;
}
@Override
public synchronized void push(Request request, Task task) {
if (urls.add(request.getUrl())) {
queue.add(new RequestWrapper(request));
}
}
@Override
public synchronized Request poll(Task task) {
RequestWrapper take = null;
while (take == null) {
try {
take = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(new RequestWrapper(take.getRequest()));
return take.getRequest();
}
}