View Javadoc
1   package org.oxerr.webmagic.proxy;
2   
import java.io.EOFException;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.ProxyProvider;
28  
29  public class DelayedProxyProvider implements ProxyProvider, Externalizable {
30  
31  	private final transient Logger log = LogManager.getLogger();
32  
33  	private final transient DelayQueue<DelayedProxy> proxies;
34  
35  	private final transient Map<Proxy, DelayedProxy> allProxies;
36  
37  	private long minSuccessDelay;
38  
39  	private long maxSuccessDelay;
40  
41  	private long minFailureDelay;
42  
43  	private long maxFailureDelay;
44  
45  	private long waitTimeout;
46  
47  	public DelayedProxyProvider() {
48  		this(Duration.ZERO, Duration.ZERO, Duration.ZERO, Duration.ZERO);
49  	}
50  
51  	public DelayedProxyProvider(
52  		Duration minSuccessDelay,
53  		Duration maxSuccessDelay,
54  		Duration minFailureDelay,
55  		Duration maxFailureDelay
56  	) {
57  		this(
58  			minSuccessDelay,
59  			maxSuccessDelay,
60  			minFailureDelay,
61  			maxFailureDelay,
62  			Duration.ZERO
63  		);
64  	}
65  
66  	public DelayedProxyProvider(
67  		Duration minSuccessDelay,
68  		Duration maxSuccessDelay,
69  		Duration minFailureDelay,
70  		Duration maxFailureDelay,
71  		Duration waitTimeout
72  	) {
73  		this.proxies = new DelayQueue<>();
74  		this.allProxies = new HashMap<>();
75  
76  		final TimeUnit unit = TimeUnit.MILLISECONDS;
77  
78  		this.minSuccessDelay = unit.convert(minSuccessDelay);
79  		this.maxSuccessDelay = unit.convert(maxSuccessDelay);
80  
81  		this.minFailureDelay = unit.convert(minFailureDelay);
82  		this.maxFailureDelay = unit.convert(maxFailureDelay);
83  
84  		this.waitTimeout = unit.convert(waitTimeout);
85  	}
86  
87  	@Override
88  	public void returnProxy(Proxy proxy, Page page, Task task) {
89  		final DelayedProxy delayedProxy = this.allProxies.get(proxy);
90  
91  		if (delayedProxy == null) {
92  			throw new NoSuchElementException(String.format("No %s found.", proxy));
93  		}
94  
95  		final boolean success = this.isSuccess(proxy, page, task);
96  
97  		if (success) {
98  			delayedProxy.incrementAndGetSuccessCount();
99  		} else {
100 			delayedProxy.incrementAndGetFailureCount();
101 		}
102 
103 		final Duration delay = this.getDelay(delayedProxy, page, task, success);
104 		delayedProxy.setAvailableTime(Instant.now().plus(delay));
105 
106 		this.proxies.put(delayedProxy);
107 
108 		this.printInfo();
109 	}
110 
111 	@Override
112 	public Proxy getProxy(Request request, Task task) {
113 		final Proxy proxy;
114 
115 		this.printInfo();
116 
117 		try {
118 			if (this.waitTimeout > 0) {
119 				DelayedProxy dp = this.proxies.poll(this.waitTimeout, TimeUnit.MILLISECONDS);
120 
121 				if (dp != null) {
122 					proxy = dp.getProxy();
123 				} else {
124 					log.warn("Wait for proxy timed out.");
125 					proxy = null;
126 				}
127 			} else {
128 				proxy = this.proxies.take().getProxy();
129 			}
130 		} catch (InterruptedException e) {
131 			Thread.currentThread().interrupt();
132 			throw new IllegalStateException(e);
133 		}
134 
135 		return proxy;
136 	}
137 
138 	public synchronized void put(Proxy proxy) {
139 		if (!this.allProxies.containsKey(proxy)) {
140 			log.trace("Put proxy: {}.", proxy);
141 
142 			final DelayedProxy delayedProxy = new DelayedProxy(proxy);
143 			this.allProxies.put(proxy, delayedProxy);
144 			this.proxies.put(delayedProxy);
145 
146 			this.printInfo();
147 		} else {
148 			log.trace("Skipping put proxy: {}.", proxy);
149 		}
150 	}
151 
152 	public DelayQueue<DelayedProxy> getProxies() {
153 		return proxies;
154 	}
155 
156 	public Map<Proxy, DelayedProxy> getAllProxies() {
157 		return Collections.unmodifiableMap(allProxies);
158 	}
159 
160 	@Override
161 	public void writeExternal(ObjectOutput out) throws IOException {
162 		List<DelayedProxy> delayedProxies = new ArrayList<>(this.allProxies.values());
163 		out.writeObject(delayedProxies);
164 		out.writeLong(this.minSuccessDelay);
165 		out.writeLong(this.maxSuccessDelay);
166 		out.writeLong(this.minFailureDelay);
167 		out.writeLong(this.maxFailureDelay);
168 	}
169 
170 	@Override
171 	public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
172 		@SuppressWarnings("unchecked")
173 		List<DelayedProxy> delayedProxies = (List<DelayedProxy>) in.readObject();
174 		delayedProxies.forEach(dp -> this.allProxies.put(dp.getProxy(), dp));
175 		this.minSuccessDelay = in.readLong();
176 		this.maxSuccessDelay = in.readLong();
177 		this.minFailureDelay = in.readLong();
178 		this.maxFailureDelay = in.readLong();
179 		this.proxies.addAll(this.allProxies.values());
180 	}
181 
182 	protected Duration getDelay(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
183 		final long minDelay;
184 		final long maxDelay;
185 
186 		if (success) {
187 			minDelay = this.minSuccessDelay;
188 			maxDelay = this.maxSuccessDelay;
189 		} else {
190 			minDelay = this.minFailureDelay;
191 			maxDelay = this.maxFailureDelay;
192 		}
193 
194 		final float delayFactor = this.getDelayFactor(delayedProxy, page, task, success);
195 		final float amount = RandomUtils.nextLong(minDelay, maxDelay) * delayFactor;
196 		final Duration delay = Duration.ofMillis((long) amount);
197 
198 		log.trace("Proxy: {}, success count: {}, failure count: {}, delayFactor: {} delay: {}",
199 			delayedProxy::getProxy,
200 			delayedProxy::getSuccessCount,
201 			delayedProxy::getFailureCount,
202 			() -> delayFactor,
203 			() -> delay
204 		);
205 
206 		return delay;
207 	}
208 
209 	protected float getDelayFactor(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
210 		final long totalCount = delayedProxy.getSuccessCount() + delayedProxy.getFailureCount();
211 		final float failureRate = totalCount != 0 ? (float) delayedProxy.getFailureCount() / (float) totalCount : 0;
212 		return 1 + failureRate * delayedProxy.getFailureCount();
213 	}
214 
215 	protected boolean isSuccess(Proxy proxy, Page page, Task task) {
216 		boolean success = page.isDownloadSuccess()
217 			&& page.getStatusCode() >= 100
218 			&& page.getStatusCode() < 500;
219 		log.trace("{} is {}.", proxy, success ? "success" : "failure");
220 		return success;
221 	}
222 
223 	protected void printInfo() {
224 		if (log.isTraceEnabled()) {
225 			String prefix = String.format("%1$32s\t%2$32s\t%3$8s\t%4$8s\t%5$16s%6$s", "Proxy", "Available Time", "Success", "Failure", "Delayed(ms)", System.lineSeparator());
226 			String stat = this.proxies.stream().sorted()
227 				.map(p -> String.format("%1$32s\t%2$32s\t%3$8d\t%4$8d\t%5$16d", p.getProxy(), p.getAvailableTime(), p.getSuccessCount(), p.getFailureCount(), p.getDelay(TimeUnit.MILLISECONDS)))
228 				.collect(Collectors.joining(System.lineSeparator(), prefix, ""));
229 			log.trace("\nAll proxy count: {}, proxy queue size: {}, expired delay available count: {}.\n{}",
230 				this.allProxies.size(),
231 				this.proxies.size(),
232 				this.proxies.stream().filter(p -> p.getDelay(TimeUnit.MILLISECONDS) <= 0).count(),
233 				stat
234 			);
235 		}
236 	}
237 
238 }