1 package us.codecraft.webmagic.recover;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.commons.lang3.StringUtils;
5 import org.mapdb.DB;
6 import org.mapdb.DBMaker;
7 import org.mapdb.IndexTreeList;
8 import org.mapdb.Serializer;
9 import us.codecraft.webmagic.Request;
10 import us.codecraft.webmagic.Task;
11 import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;
12 import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
13
14 import java.io.IOException;
15
16
17
18
19 public class MmapQueueScheduler extends DuplicateRemovedScheduler {
20
21 private DB db;
22
23 private static String DATABASE_NAME = "queue";
24
25 private IndexTreeList<String> queue;
26
27 private static ObjectMapper mapper;
28
29 public MmapQueueScheduler(DuplicateRemover duplicateRemover, String path) {
30 super.setDuplicateRemover(duplicateRemover);
31
32 String queuePath = path;
33
34 DB db = DBMaker.fileDB(queuePath)
35 .fileMmapEnableIfSupported()
36 .fileMmapPreclearDisable()
37 .cleanerHackEnable()
38 .closeOnJvmShutdown()
39 .transactionEnable()
40 .concurrencyScale(128)
41 .make();
42 this.db = db;
43 this.mapper = new ObjectMapper();
44 this.queue = db.indexTreeList(MmapQueueScheduler.DATABASE_NAME, Serializer.STRING).createOrOpen();
45 }
46
47 @Override
48 public Request poll(Task task) {
49 if (this.queue.size() > 0){
50 String s = queue.remove(0);
51 return fromJson(s, Request.class);
52 }else{
53 return null;
54 }
55
56 }
57
58 @Override
59 public void pushWhenNoDuplicate(Request request, Task task) {
60 queue.add(toJson(request));
61 this.db.commit();
62 }
63
64 public String toJson(Object object) {
65 try {
66 return mapper.writeValueAsString(object);
67 } catch (IOException e) {
68 logger.warn("write to json string error:" + object, e);
69 return null;
70 }
71 }
72
73 public <T> T fromJson(String jsonString, Class<T> clazz) {
74 if (StringUtils.isEmpty(jsonString)) {
75 return null;
76 }
77 try {
78 return mapper.readValue(jsonString, clazz);
79 } catch (IOException e) {
80 logger.warn("parse json string error:" + jsonString, e);
81 return null;
82 }
83 }
84
85 }