1 package us.codecraft.webmagic.scheduler;
2
3 import java.util.Set;
4
5 import org.apache.commons.codec.digest.DigestUtils;
6 import org.apache.commons.lang3.StringUtils;
7
8 import com.alibaba.fastjson.JSON;
9
10 import redis.clients.jedis.Jedis;
11 import redis.clients.jedis.JedisPool;
12 import us.codecraft.webmagic.Request;
13 import us.codecraft.webmagic.Task;
14
15
16
17
18
19
20 public class RedisPriorityScheduler extends RedisScheduler {
21
22 private static final String ZSET_PREFIX = "zset_";
23
24 private static final String QUEUE_PREFIX = "queue_";
25
26 private static final String NO_PRIORITY_SUFFIX = "_zore";
27
28 private static final String PLUS_PRIORITY_SUFFIX = "_plus";
29
30 private static final String MINUS_PRIORITY_SUFFIX = "_minus";
31
32 public RedisPriorityScheduler(String host) {
33 super(host);
34 }
35
36 public RedisPriorityScheduler(JedisPool pool) {
37 super(pool);
38 }
39
40 @Override
41 protected void pushWhenNoDuplicate(Request request, Task task) {
42 try (Jedis jedis = pool.getResource()) {
43 if (request.getPriority() > 0) {
44 jedis.zadd(getZsetPlusPriorityKey(task), request.getPriority(), request.getUrl());
45 } else if (request.getPriority() < 0) {
46 jedis.zadd(getZsetMinusPriorityKey(task), request.getPriority(), request.getUrl());
47 } else {
48 jedis.lpush(getQueueNoPriorityKey(task), request.getUrl());
49 }
50
51 setExtrasInItem(jedis, request, task);
52 }
53 }
54
55 @Override
56 public synchronized Request poll(Task task) {
57 try (Jedis jedis = pool.getResource()) {
58 String url = getRequest(jedis, task);
59 if (StringUtils.isBlank(url)) {
60 return null;
61 }
62 return getExtrasInItem(jedis, url, task);
63 }
64 }
65
66 private String getRequest(Jedis jedis, Task task) {
67 String url;
68 Set<String> urls = jedis.zrevrange(getZsetPlusPriorityKey(task), 0, 0);
69 if (urls.isEmpty()) {
70 url = jedis.lpop(getQueueNoPriorityKey(task));
71 if (StringUtils.isBlank(url)) {
72 urls = jedis.zrevrange(getZsetMinusPriorityKey(task), 0, 0);
73 if (!urls.isEmpty()) {
74 url = urls.toArray(new String[0])[0];
75 jedis.zrem(getZsetMinusPriorityKey(task), url);
76 }
77 }
78 } else {
79 url = urls.toArray(new String[0])[0];
80 jedis.zrem(getZsetPlusPriorityKey(task), url);
81 }
82 return url;
83 }
84
85 @Override
86 public void resetDuplicateCheck(Task task) {
87 try (Jedis jedis = pool.getResource()) {
88 jedis.del(getSetKey(task));
89 }
90 }
91
92 private String getZsetPlusPriorityKey(Task task) {
93 return ZSET_PREFIX + task.getUUID() + PLUS_PRIORITY_SUFFIX;
94 }
95
96 private String getQueueNoPriorityKey(Task task) {
97 return QUEUE_PREFIX + task.getUUID() + NO_PRIORITY_SUFFIX;
98 }
99
100 private String getZsetMinusPriorityKey(Task task) {
101 return ZSET_PREFIX + task.getUUID() + MINUS_PRIORITY_SUFFIX;
102 }
103
104 private void setExtrasInItem(Jedis jedis,Request request, Task task) {
105 if (!request.getExtras().isEmpty()) {
106 String field = DigestUtils.sha1Hex(request.getUrl());
107 String value = JSON.toJSONString(request);
108 jedis.hset(getItemKey(task), field, value);
109 }
110 }
111
112 private Request getExtrasInItem(Jedis jedis, String url, Task task) {
113 String key = getItemKey(task);
114 String field = DigestUtils.sha1Hex(url);
115 byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());
116 if (bytes != null) {
117 return JSON.parseObject(new String(bytes), Request.class);
118 }
119 return new Request(url);
120 }
121 }