MmapQueueScheduler.java
package us.codecraft.webmagic.recover;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.IndexTreeList;
import org.mapdb.Serializer;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
import java.io.IOException;
/**
 * A {@link DuplicateRemovedScheduler} whose pending requests are stored as JSON strings
 * in a file-backed MapDB list, so the queue lives on disk rather than only in memory.
 *
 * <p>The database is opened with transactions enabled; every mutation of the queue
 * (both adding and removing an entry) is followed by a commit.
 *
 * @author :linweisen
 */
public class MmapQueueScheduler extends DuplicateRemovedScheduler {

    /** Name of the MapDB collection that holds the serialized requests. */
    private static final String DATABASE_NAME = "queue";

    /** Shared JSON mapper; created once instead of being reassigned by every instance. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final DB db;

    private final IndexTreeList<String> queue;

    /**
     * Opens (or creates) the queue database at the given location.
     *
     * @param duplicateRemover strategy used by the parent scheduler to filter duplicates
     * @param path             file path of the MapDB database file
     */
    public MmapQueueScheduler(DuplicateRemover duplicateRemover, String path) {
        super.setDuplicateRemover(duplicateRemover);
        this.db = DBMaker.fileDB(path)
                .fileMmapEnableIfSupported()
                .fileMmapPreclearDisable()
                .cleanerHackEnable()
                .closeOnJvmShutdown()
                .transactionEnable()
                .concurrencyScale(128)
                .make();
        this.queue = this.db.indexTreeList(DATABASE_NAME, Serializer.STRING).createOrOpen();
    }

    /**
     * Removes and returns the request at the head of the queue.
     *
     * <p>Synchronized so that the emptiness check and the removal of the head are atomic
     * with respect to other {@code poll} calls; otherwise two pollers racing on the last
     * element could make {@code remove(0)} fail.
     *
     * @param task the task requesting work (not used by this implementation)
     * @return the next request, or {@code null} if the queue is empty or the stored
     *         entry could not be parsed
     */
    @Override
    public synchronized Request poll(Task task) {
        if (queue.isEmpty()) {
            return null;
        }
        String json = queue.remove(0);
        // Transactions are enabled, so the removal must be committed explicitly;
        // without this the entry would still be in the file after a restart.
        db.commit();
        return fromJson(json, Request.class);
    }

    /**
     * Appends a request that has already passed duplicate filtering and commits the change.
     *
     * @param request the request to enqueue
     * @param task    the task the request belongs to (not used by this implementation)
     */
    @Override
    public void pushWhenNoDuplicate(Request request, Task task) {
        String json = toJson(request);
        if (json == null) {
            // Serialization failed (already logged by toJson); do not store a null entry.
            return;
        }
        queue.add(json);
        db.commit();
    }

    /**
     * Serializes an object to a JSON string.
     *
     * @param object the object to serialize
     * @return the JSON text, or {@code null} if serialization failed (a warning is logged)
     */
    public String toJson(Object object) {
        try {
            return MAPPER.writeValueAsString(object);
        } catch (IOException e) {
            logger.warn("write to json string error:" + object, e);
            return null;
        }
    }

    /**
     * Parses a JSON string into an instance of the given class.
     *
     * @param jsonString the JSON text; may be {@code null} or empty
     * @param clazz      the target type
     * @param <T>        the target type
     * @return the parsed object, or {@code null} if the input is empty or parsing failed
     *         (a warning is logged on failure)
     */
    public <T> T fromJson(String jsonString, Class<T> clazz) {
        if (StringUtils.isEmpty(jsonString)) {
            return null;
        }
        try {
            return MAPPER.readValue(jsonString, clazz);
        } catch (IOException e) {
            logger.warn("parse json string error:" + jsonString, e);
            return null;
        }
    }
}