View Javadoc
1   package us.codecraft.webmagic.scheduler;
2   
3   import org.apache.commons.codec.digest.DigestUtils;
4   import org.apache.commons.lang3.StringUtils;
5   
6   import com.alibaba.fastjson.JSON;
7   
8   import redis.clients.jedis.Jedis;
9   import redis.clients.jedis.JedisPool;
10  import redis.clients.jedis.JedisPoolConfig;
11  import us.codecraft.webmagic.Request;
12  import us.codecraft.webmagic.Task;
13  import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
14  
15  /**
16   * Use Redis as url scheduler for distributed crawlers.<br>
17   *
18   * @author code4crafter@gmail.com <br>
19   * @since 0.2.0
20   */
21  public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {
22  
23      protected JedisPool pool;
24  
25      private static final String QUEUE_PREFIX = "queue_";
26  
27      private static final String SET_PREFIX = "set_";
28  
29      private static final String ITEM_PREFIX = "item_";
30  
31      public RedisScheduler(String host) {
32          this(new JedisPool(new JedisPoolConfig(), host));
33      }
34  
35      public RedisScheduler(JedisPool pool) {
36          this.pool = pool;
37          setDuplicateRemover(this);
38      }
39  
40      @Override
41      public void resetDuplicateCheck(Task task) {
42          try (Jedis jedis = pool.getResource()) {
43              jedis.del(getSetKey(task));
44          }
45      }
46  
47      @Override
48      public boolean isDuplicate(Request request, Task task) {
49  		try (Jedis jedis = pool.getResource()) {
50              return jedis.sadd(getSetKey(task), request.getUrl()) == 0;
51          }
52  
53      }
54  
55      @Override
56      protected void pushWhenNoDuplicate(Request request, Task task) {
57          Jedis jedis = pool.getResource();
58          try {
59              jedis.rpush(getQueueKey(task), request.getUrl());
60              if (checkForAdditionalInfo(request)) {
61                  String field = DigestUtils.sha1Hex(request.getUrl());
62                  String value = JSON.toJSONString(request);
63                  jedis.hset((ITEM_PREFIX + task.getUUID()), field, value);
64              }
65          } finally {
66              jedis.close();
67          }
68      }
69  
70      private boolean checkForAdditionalInfo(Request request) {
71          if (request == null) {
72              return false;
73          }
74  
75          if (!request.getHeaders().isEmpty() || !request.getCookies().isEmpty()) {
76              return true;
77          }
78  
79          if (StringUtils.isNotBlank(request.getCharset()) || StringUtils.isNotBlank(request.getMethod())) {
80              return true;
81          }
82  
83          if (request.isBinaryContent() || request.getRequestBody() != null) {
84              return true;
85          }
86  
87          if (!request.getExtras().isEmpty()) {
88              return true;
89          }
90          if (request.getPriority() != 0L) {
91              return true;
92          }
93  
94          return false;
95      }
96  
97      @Override
98      public synchronized Request poll(Task task) {
99  		try (Jedis jedis = pool.getResource()) {
100             String url = jedis.lpop(getQueueKey(task));
101             if (url == null) {
102                 return null;
103             }
104             String key = ITEM_PREFIX + task.getUUID();
105             String field = DigestUtils.sha1Hex(url);
106             byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());
107             if (bytes != null) {
108                 Request o = JSON.parseObject(new String(bytes), Request.class);
109                 return o;
110             }
111             Request request = new Request(url);
112             return request;
113         }
114     }
115 
116     protected String getSetKey(Task task) {
117         return SET_PREFIX + task.getUUID();
118     }
119 
120     protected String getQueueKey(Task task) {
121         return QUEUE_PREFIX + task.getUUID();
122     }
123 
124     protected String getItemKey(Task task) {
125         return ITEM_PREFIX + task.getUUID();
126     }
127 
128     @Override
129     public int getLeftRequestsCount(Task task) {
130         try (Jedis jedis = pool.getResource()) {
131             Long size = jedis.llen(getQueueKey(task));
132             return size.intValue();
133         }
134     }
135 
136     @Override
137     public int getTotalRequestsCount(Task task) {
138         try (Jedis jedis = pool.getResource()) {
139             Long size = jedis.scard(getSetKey(task));
140             return size.intValue();
141         }
142     }
143 }