View Javadoc
1   package us.codecraft.webmagic.recover;
2   
3   import com.fasterxml.jackson.databind.ObjectMapper;
4   import org.apache.commons.lang3.StringUtils;
5   import org.mapdb.DB;
6   import org.mapdb.DBMaker;
7   import org.mapdb.IndexTreeList;
8   import org.mapdb.Serializer;
9   import us.codecraft.webmagic.Request;
10  import us.codecraft.webmagic.Task;
11  import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;
12  import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
13  
14  import java.io.IOException;
15  
16  /**
17   * @author :linweisen
18   */
19  public class MmapQueueScheduler extends DuplicateRemovedScheduler {
20  
21      private DB db;
22  
23      private static String DATABASE_NAME = "queue";
24  
25      private IndexTreeList<String> queue;
26  
27      private static ObjectMapper mapper;
28  
29      public MmapQueueScheduler(DuplicateRemover duplicateRemover, String path) {
30          super.setDuplicateRemover(duplicateRemover);
31  
32          String queuePath = path;
33  
34          DB db = DBMaker.fileDB(queuePath)
35                  .fileMmapEnableIfSupported()
36                  .fileMmapPreclearDisable()
37                  .cleanerHackEnable()
38                  .closeOnJvmShutdown()
39                  .transactionEnable()
40                  .concurrencyScale(128)
41                  .make();
42          this.db = db;
43          this.mapper = new ObjectMapper();
44          this.queue = db.indexTreeList(MmapQueueScheduler.DATABASE_NAME, Serializer.STRING).createOrOpen();
45      }
46  
47      @Override
48      public Request poll(Task task) {
49          if (this.queue.size() > 0){
50              String s = queue.remove(0);
51              return fromJson(s, Request.class);
52          }else{
53              return null;
54          }
55  
56      }
57  
58      @Override
59      public void pushWhenNoDuplicate(Request request, Task task) {
60          queue.add(toJson(request));
61          this.db.commit();
62      }
63  
64      public String toJson(Object object) {
65          try {
66              return mapper.writeValueAsString(object);
67          } catch (IOException e) {
68              logger.warn("write to json string error:" + object, e);
69              return null;
70          }
71      }
72  
73      public <T> T fromJson(String jsonString, Class<T> clazz) {
74          if (StringUtils.isEmpty(jsonString)) {
75              return null;
76          }
77          try {
78              return mapper.readValue(jsonString, clazz);
79          } catch (IOException e) {
80              logger.warn("parse json string error:" + jsonString, e);
81              return null;
82          }
83      }
84  
85  }