View Javadoc
1   package us.codecraft.webmagic.recover;
2   
3   import com.google.common.base.Charsets;
4   import com.google.common.hash.BloomFilter;
5   import com.google.common.hash.Funnels;
6   import org.mapdb.DB;
7   import org.mapdb.DBMaker;
8   import org.mapdb.IndexTreeList;
9   import org.mapdb.Serializer;
10  import us.codecraft.webmagic.Request;
11  import us.codecraft.webmagic.Task;
12  import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
13  
14  import java.util.concurrent.atomic.AtomicInteger;
15  
16  /**
17   * @author :linweisen
18   */
19  public class DuplicateStorageRemover implements DuplicateRemover {
20  
21      private DB db;
22  
23      private static String DATABASE_NAME = "duplicate";
24  
25      private IndexTreeList<String> urlDuplicateQueue;
26  
27      private BloomFilter<CharSequence> bloomFilter;
28  
29      private AtomicInteger counter;
30  
31      public DuplicateStorageRemover(String path) {
32  
33          String duplicatStoragePath = path;
34  
35          DB db = DBMaker.fileDB(duplicatStoragePath)
36                  .fileMmapEnableIfSupported()
37                  .fileMmapPreclearDisable()
38                  .cleanerHackEnable()
39                  .closeOnJvmShutdown()
40                  .transactionEnable()
41                  .concurrencyScale(128)
42                  .make();
43          this.db = db;
44  
45          this.urlDuplicateQueue = db.indexTreeList(DATABASE_NAME, Serializer.STRING).createOrOpen();
46  
47          counter = new AtomicInteger(this.urlDuplicateQueue.size());
48          this.bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 200000, 1E-7);
49          for (String url : this.urlDuplicateQueue){
50              bloomFilter.put(url);
51          }
52  
53      }
54  
55      @Override
56      public boolean isDuplicate(Request request, Task task) {
57          String url = request.getUrl();
58          boolean isDuplicate = bloomFilter.mightContain(url);
59          if (!isDuplicate) {
60              bloomFilter.put(url);
61              urlDuplicateQueue.add(url);
62              this.db.commit();
63              counter.incrementAndGet();
64          }
65          return isDuplicate;
66      }
67  
68      @Override
69      public void resetDuplicateCheck(Task task) {
70          this.bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 200000, 1E-7);
71          this.urlDuplicateQueue.clear();
72      }
73  
74      @Override
75      public int getTotalRequestsCount(Task task) {
76          return counter.get();
77      }
78  }