View Javadoc
1   package us.codecraft.webmagic.scheduler;
2   
3   import java.util.Set;
4   
5   import org.apache.commons.codec.digest.DigestUtils;
6   import org.apache.commons.lang3.StringUtils;
7   
8   import com.alibaba.fastjson.JSON;
9   
10  import redis.clients.jedis.Jedis;
11  import redis.clients.jedis.JedisPool;
12  import us.codecraft.webmagic.Request;
13  import us.codecraft.webmagic.Task;
14  
15  /**
16   * the redis scheduler with priority
17   * @author sai
18   * Created by sai on 16-5-27.
19   */
20  public class RedisPriorityScheduler extends RedisScheduler {
21  
22      private static final String ZSET_PREFIX = "zset_";
23  
24      private static final String QUEUE_PREFIX = "queue_";
25  
26      private static final String NO_PRIORITY_SUFFIX = "_zore";
27  
28      private static final String PLUS_PRIORITY_SUFFIX    = "_plus";
29  
30      private static final String MINUS_PRIORITY_SUFFIX   = "_minus";
31  
32      public RedisPriorityScheduler(String host) {
33          super(host);
34      }
35  
36      public RedisPriorityScheduler(JedisPool pool) {
37          super(pool);
38      }
39  
40      @Override
41      protected void pushWhenNoDuplicate(Request request, Task task) {
42          try (Jedis jedis = pool.getResource()) {
43              if (request.getPriority() > 0) {
44                  jedis.zadd(getZsetPlusPriorityKey(task), request.getPriority(), request.getUrl());
45              } else if (request.getPriority() < 0) {
46                  jedis.zadd(getZsetMinusPriorityKey(task), request.getPriority(), request.getUrl());
47              } else {
48                  jedis.lpush(getQueueNoPriorityKey(task), request.getUrl());
49              }
50  
51              setExtrasInItem(jedis, request, task);
52          }
53      }
54  
55      @Override
56      public synchronized Request poll(Task task) {
57          try (Jedis jedis = pool.getResource()) {
58              String url = getRequest(jedis, task);
59              if (StringUtils.isBlank(url)) {
60                  return null;
61              }
62              return getExtrasInItem(jedis, url, task);
63          }
64      }
65  
66      private String getRequest(Jedis jedis, Task task) {
67          String url;
68          Set<String> urls = jedis.zrevrange(getZsetPlusPriorityKey(task), 0, 0);
69          if (urls.isEmpty()) {
70              url = jedis.lpop(getQueueNoPriorityKey(task));
71              if (StringUtils.isBlank(url)) {
72                  urls = jedis.zrevrange(getZsetMinusPriorityKey(task), 0, 0);
73                  if (!urls.isEmpty()) {
74                      url = urls.toArray(new String[0])[0];
75                      jedis.zrem(getZsetMinusPriorityKey(task), url);
76                  }
77              }
78          } else {
79              url = urls.toArray(new String[0])[0];
80              jedis.zrem(getZsetPlusPriorityKey(task), url);
81          }
82          return url;
83      }
84  
85      @Override
86      public void resetDuplicateCheck(Task task) {
87          try (Jedis jedis = pool.getResource()) {
88              jedis.del(getSetKey(task));
89          }
90      }
91  
92      private String getZsetPlusPriorityKey(Task task) {
93          return ZSET_PREFIX + task.getUUID() + PLUS_PRIORITY_SUFFIX;
94      }
95  
96      private String getQueueNoPriorityKey(Task task) {
97          return QUEUE_PREFIX + task.getUUID() + NO_PRIORITY_SUFFIX;
98      }
99  
100     private String getZsetMinusPriorityKey(Task task) {
101         return ZSET_PREFIX + task.getUUID() + MINUS_PRIORITY_SUFFIX;
102     }
103 
104     private void setExtrasInItem(Jedis jedis,Request request, Task task) {
105         if (!request.getExtras().isEmpty()) {
106             String field = DigestUtils.sha1Hex(request.getUrl());
107             String value = JSON.toJSONString(request);
108             jedis.hset(getItemKey(task), field, value);
109         }
110     }
111 
112     private Request getExtrasInItem(Jedis jedis, String url, Task task) {
113         String key      = getItemKey(task);
114         String field    = DigestUtils.sha1Hex(url);
115         byte[] bytes    = jedis.hget(key.getBytes(), field.getBytes());
116         if (bytes != null) {
117             return JSON.parseObject(new String(bytes), Request.class);
118         }
119         return new Request(url);
120     }
121 }