1 package org.oxerr.webmagic.proxy;
2
import java.io.EOFException;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.ProxyProvider;
28
29 public class DelayedProxyProvider implements ProxyProvider, Externalizable {
30
31 private final transient Logger log = LogManager.getLogger();
32
33 private final transient DelayQueue<DelayedProxy> proxies;
34
35 private final transient Map<Proxy, DelayedProxy> allProxies;
36
37 private long minSuccessDelay;
38
39 private long maxSuccessDelay;
40
41 private long minFailureDelay;
42
43 private long maxFailureDelay;
44
45 private long waitTimeout;
46
47 public DelayedProxyProvider() {
48 this(Duration.ZERO, Duration.ZERO, Duration.ZERO, Duration.ZERO);
49 }
50
51 public DelayedProxyProvider(
52 Duration minSuccessDelay,
53 Duration maxSuccessDelay,
54 Duration minFailureDelay,
55 Duration maxFailureDelay
56 ) {
57 this(
58 minSuccessDelay,
59 maxSuccessDelay,
60 minFailureDelay,
61 maxFailureDelay,
62 Duration.ZERO
63 );
64 }
65
66 public DelayedProxyProvider(
67 Duration minSuccessDelay,
68 Duration maxSuccessDelay,
69 Duration minFailureDelay,
70 Duration maxFailureDelay,
71 Duration waitTimeout
72 ) {
73 this.proxies = new DelayQueue<>();
74 this.allProxies = new HashMap<>();
75
76 final TimeUnit unit = TimeUnit.MILLISECONDS;
77
78 this.minSuccessDelay = unit.convert(minSuccessDelay);
79 this.maxSuccessDelay = unit.convert(maxSuccessDelay);
80
81 this.minFailureDelay = unit.convert(minFailureDelay);
82 this.maxFailureDelay = unit.convert(maxFailureDelay);
83
84 this.waitTimeout = unit.convert(waitTimeout);
85 }
86
87 @Override
88 public void returnProxy(Proxy proxy, Page page, Task task) {
89 final DelayedProxy delayedProxy = this.allProxies.get(proxy);
90
91 if (delayedProxy == null) {
92 throw new NoSuchElementException(String.format("No %s found.", proxy));
93 }
94
95 final boolean success = this.isSuccess(proxy, page, task);
96
97 if (success) {
98 delayedProxy.incrementAndGetSuccessCount();
99 } else {
100 delayedProxy.incrementAndGetFailureCount();
101 }
102
103 final Duration delay = this.getDelay(delayedProxy, page, task, success);
104 delayedProxy.setAvailableTime(Instant.now().plus(delay));
105
106 this.proxies.put(delayedProxy);
107
108 this.printInfo();
109 }
110
111 @Override
112 public Proxy getProxy(Request request, Task task) {
113 final Proxy proxy;
114
115 this.printInfo();
116
117 try {
118 if (this.waitTimeout > 0) {
119 DelayedProxy dp = this.proxies.poll(this.waitTimeout, TimeUnit.MILLISECONDS);
120
121 if (dp != null) {
122 proxy = dp.getProxy();
123 } else {
124 log.warn("Wait for proxy timed out.");
125 proxy = null;
126 }
127 } else {
128 proxy = this.proxies.take().getProxy();
129 }
130 } catch (InterruptedException e) {
131 Thread.currentThread().interrupt();
132 throw new IllegalStateException(e);
133 }
134
135 return proxy;
136 }
137
138 public synchronized void put(Proxy proxy) {
139 if (!this.allProxies.containsKey(proxy)) {
140 log.trace("Put proxy: {}.", proxy);
141
142 final DelayedProxy delayedProxy = new DelayedProxy(proxy);
143 this.allProxies.put(proxy, delayedProxy);
144 this.proxies.put(delayedProxy);
145
146 this.printInfo();
147 } else {
148 log.trace("Skipping put proxy: {}.", proxy);
149 }
150 }
151
152 public DelayQueue<DelayedProxy> getProxies() {
153 return proxies;
154 }
155
156 public Map<Proxy, DelayedProxy> getAllProxies() {
157 return Collections.unmodifiableMap(allProxies);
158 }
159
160 @Override
161 public void writeExternal(ObjectOutput out) throws IOException {
162 List<DelayedProxy> delayedProxies = new ArrayList<>(this.allProxies.values());
163 out.writeObject(delayedProxies);
164 out.writeLong(this.minSuccessDelay);
165 out.writeLong(this.maxSuccessDelay);
166 out.writeLong(this.minFailureDelay);
167 out.writeLong(this.maxFailureDelay);
168 }
169
170 @Override
171 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
172 @SuppressWarnings("unchecked")
173 List<DelayedProxy> delayedProxies = (List<DelayedProxy>) in.readObject();
174 delayedProxies.forEach(dp -> this.allProxies.put(dp.getProxy(), dp));
175 this.minSuccessDelay = in.readLong();
176 this.maxSuccessDelay = in.readLong();
177 this.minFailureDelay = in.readLong();
178 this.maxFailureDelay = in.readLong();
179 this.proxies.addAll(this.allProxies.values());
180 }
181
182 protected Duration getDelay(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
183 final long minDelay;
184 final long maxDelay;
185
186 if (success) {
187 minDelay = this.minSuccessDelay;
188 maxDelay = this.maxSuccessDelay;
189 } else {
190 minDelay = this.minFailureDelay;
191 maxDelay = this.maxFailureDelay;
192 }
193
194 final float delayFactor = this.getDelayFactor(delayedProxy, page, task, success);
195 final float amount = RandomUtils.nextLong(minDelay, maxDelay) * delayFactor;
196 final Duration delay = Duration.ofMillis((long) amount);
197
198 log.trace("Proxy: {}, success count: {}, failure count: {}, delayFactor: {} delay: {}",
199 delayedProxy::getProxy,
200 delayedProxy::getSuccessCount,
201 delayedProxy::getFailureCount,
202 () -> delayFactor,
203 () -> delay
204 );
205
206 return delay;
207 }
208
209 protected float getDelayFactor(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
210 final long totalCount = delayedProxy.getSuccessCount() + delayedProxy.getFailureCount();
211 final float failureRate = totalCount != 0 ? (float) delayedProxy.getFailureCount() / (float) totalCount : 0;
212 return 1 + failureRate * delayedProxy.getFailureCount();
213 }
214
215 protected boolean isSuccess(Proxy proxy, Page page, Task task) {
216 boolean success = page.isDownloadSuccess()
217 && page.getStatusCode() >= 100
218 && page.getStatusCode() < 500;
219 log.trace("{} is {}.", proxy, success ? "success" : "failure");
220 return success;
221 }
222
223 protected void printInfo() {
224 if (log.isTraceEnabled()) {
225 String prefix = String.format("%1$32s\t%2$32s\t%3$8s\t%4$8s\t%5$16s%6$s", "Proxy", "Available Time", "Success", "Failure", "Delayed(ms)", System.lineSeparator());
226 String stat = this.proxies.stream().sorted()
227 .map(p -> String.format("%1$32s\t%2$32s\t%3$8d\t%4$8d\t%5$16d", p.getProxy(), p.getAvailableTime(), p.getSuccessCount(), p.getFailureCount(), p.getDelay(TimeUnit.MILLISECONDS)))
228 .collect(Collectors.joining(System.lineSeparator(), prefix, ""));
229 log.trace("\nAll proxy count: {}, proxy queue size: {}, expired delay available count: {}.\n{}",
230 this.allProxies.size(),
231 this.proxies.size(),
232 this.proxies.stream().filter(p -> p.getDelay(TimeUnit.MILLISECONDS) <= 0).count(),
233 stat
234 );
235 }
236 }
237
238 }