1 package us.codecraft.webmagic.scheduler;
2
3 import org.apache.commons.codec.digest.DigestUtils;
4 import org.apache.commons.lang3.StringUtils;
5
6 import com.alibaba.fastjson.JSON;
7
8 import redis.clients.jedis.Jedis;
9 import redis.clients.jedis.JedisPool;
10 import redis.clients.jedis.JedisPoolConfig;
11 import us.codecraft.webmagic.Request;
12 import us.codecraft.webmagic.Task;
13 import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
14
15
16
17
18
19
20
21 public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {
22
23 protected JedisPool pool;
24
25 private static final String QUEUE_PREFIX = "queue_";
26
27 private static final String SET_PREFIX = "set_";
28
29 private static final String ITEM_PREFIX = "item_";
30
31 public RedisScheduler(String host) {
32 this(new JedisPool(new JedisPoolConfig(), host));
33 }
34
35 public RedisScheduler(JedisPool pool) {
36 this.pool = pool;
37 setDuplicateRemover(this);
38 }
39
40 @Override
41 public void resetDuplicateCheck(Task task) {
42 try (Jedis jedis = pool.getResource()) {
43 jedis.del(getSetKey(task));
44 }
45 }
46
47 @Override
48 public boolean isDuplicate(Request request, Task task) {
49 try (Jedis jedis = pool.getResource()) {
50 return jedis.sadd(getSetKey(task), request.getUrl()) == 0;
51 }
52
53 }
54
55 @Override
56 protected void pushWhenNoDuplicate(Request request, Task task) {
57 Jedis jedis = pool.getResource();
58 try {
59 jedis.rpush(getQueueKey(task), request.getUrl());
60 if (checkForAdditionalInfo(request)) {
61 String field = DigestUtils.sha1Hex(request.getUrl());
62 String value = JSON.toJSONString(request);
63 jedis.hset((ITEM_PREFIX + task.getUUID()), field, value);
64 }
65 } finally {
66 jedis.close();
67 }
68 }
69
70 private boolean checkForAdditionalInfo(Request request) {
71 if (request == null) {
72 return false;
73 }
74
75 if (!request.getHeaders().isEmpty() || !request.getCookies().isEmpty()) {
76 return true;
77 }
78
79 if (StringUtils.isNotBlank(request.getCharset()) || StringUtils.isNotBlank(request.getMethod())) {
80 return true;
81 }
82
83 if (request.isBinaryContent() || request.getRequestBody() != null) {
84 return true;
85 }
86
87 if (!request.getExtras().isEmpty()) {
88 return true;
89 }
90 if (request.getPriority() != 0L) {
91 return true;
92 }
93
94 return false;
95 }
96
97 @Override
98 public synchronized Request poll(Task task) {
99 try (Jedis jedis = pool.getResource()) {
100 String url = jedis.lpop(getQueueKey(task));
101 if (url == null) {
102 return null;
103 }
104 String key = ITEM_PREFIX + task.getUUID();
105 String field = DigestUtils.sha1Hex(url);
106 byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());
107 if (bytes != null) {
108 Request o = JSON.parseObject(new String(bytes), Request.class);
109 return o;
110 }
111 Request request = new Request(url);
112 return request;
113 }
114 }
115
116 protected String getSetKey(Task task) {
117 return SET_PREFIX + task.getUUID();
118 }
119
120 protected String getQueueKey(Task task) {
121 return QUEUE_PREFIX + task.getUUID();
122 }
123
124 protected String getItemKey(Task task) {
125 return ITEM_PREFIX + task.getUUID();
126 }
127
128 @Override
129 public int getLeftRequestsCount(Task task) {
130 try (Jedis jedis = pool.getResource()) {
131 Long size = jedis.llen(getQueueKey(task));
132 return size.intValue();
133 }
134 }
135
136 @Override
137 public int getTotalRequestsCount(Task task) {
138 try (Jedis jedis = pool.getResource()) {
139 Long size = jedis.scard(getSetKey(task));
140 return size.intValue();
141 }
142 }
143 }