View Javadoc
1   package org.oxerr.webmagic.proxy;
2   
3   import java.io.Externalizable;
4   import java.io.IOException;
5   import java.io.ObjectInput;
6   import java.io.ObjectOutput;
7   import java.time.Duration;
8   import java.time.Instant;
9   import java.util.ArrayList;
10  import java.util.Collections;
11  import java.util.HashMap;
12  import java.util.List;
13  import java.util.Map;
14  import java.util.concurrent.DelayQueue;
15  import java.util.concurrent.TimeUnit;
16  import java.util.stream.Collectors;
17  
18  import org.apache.commons.lang3.RandomUtils;
19  import org.apache.logging.log4j.LogManager;
20  import org.apache.logging.log4j.Logger;
21  
22  import us.codecraft.webmagic.Page;
23  import us.codecraft.webmagic.Task;
24  import us.codecraft.webmagic.proxy.Proxy;
25  import us.codecraft.webmagic.proxy.ProxyProvider;
26  
27  public class DelayedProxyProvider implements ProxyProvider, Externalizable {
28  
29  	private final transient Logger log = LogManager.getLogger();
30  
31  	private final transient DelayQueue<DelayedProxy> proxies;
32  
33  	private final transient Map<Proxy, DelayedProxy> allProxies;
34  
35  	private long minSuccessDelay;
36  
37  	private long maxSuccessDelay;
38  
39  	private long minFailureDelay;
40  
41  	private long maxFailureDelay;
42  
43  	public DelayedProxyProvider() {
44  		this(Duration.ZERO, Duration.ZERO, Duration.ZERO, Duration.ZERO);
45  	}
46  
47  	public DelayedProxyProvider(
48  		Duration minSuccessDelay,
49  		Duration maxSuccessDelay,
50  		Duration minFailureDelay,
51  		Duration maxFailureDelay
52  	) {
53  		this.proxies = new DelayQueue<>();
54  		this.allProxies = new HashMap<>();
55  
56  		final TimeUnit unit = TimeUnit.MILLISECONDS;
57  
58  		this.minSuccessDelay = unit.convert(minSuccessDelay);
59  		this.maxSuccessDelay = unit.convert(maxSuccessDelay);
60  
61  		this.minFailureDelay = unit.convert(minFailureDelay);
62  		this.maxFailureDelay = unit.convert(maxFailureDelay);
63  	}
64  
65  	@Override
66  	public void returnProxy(Proxy proxy, Page page, Task task) {
67  		final DelayedProxy delayedProxy = this.allProxies.get(proxy);
68  		final boolean success = this.isSuccess(proxy, page, task);
69  
70  		if (success) {
71  			delayedProxy.incrementAndGetSuccessCount();
72  		} else {
73  			delayedProxy.incrementAndGetFailureCount();
74  		}
75  
76  		final Duration delay = this.getDelay(delayedProxy, page, task, success);
77  		delayedProxy.setAvailableTime(Instant.now().plus(delay));
78  
79  		this.proxies.put(delayedProxy);
80  
81  		this.printInfo();
82  	}
83  
84  	@Override
85  	public Proxy getProxy(Task task) {
86  		this.printInfo();
87  
88  		try {
89  			return this.proxies.take().getProxy();
90  		} catch (InterruptedException e) {
91  			Thread.currentThread().interrupt();
92  			throw new IllegalStateException(e);
93  		}
94  	}
95  
96  	public synchronized void put(Proxy proxy) {
97  		if (!this.allProxies.containsKey(proxy)) {
98  			log.trace("Put proxy: {}.", proxy);
99  
100 			final DelayedProxy delayedProxy = new DelayedProxy(proxy);
101 			this.allProxies.put(proxy, delayedProxy);
102 			this.proxies.put(delayedProxy);
103 
104 			this.printInfo();
105 		} else {
106 			log.trace("Skipping put proxy: {}.", proxy);
107 		}
108 	}
109 
110 	public DelayQueue<DelayedProxy> getProxies() {
111 		return proxies;
112 	}
113 
114 	public Map<Proxy, DelayedProxy> getAllProxies() {
115 		return Collections.unmodifiableMap(allProxies);
116 	}
117 
118 	@Override
119 	public void writeExternal(ObjectOutput out) throws IOException {
120 		List<DelayedProxy> delayedProxies = new ArrayList<>(this.allProxies.values());
121 		out.writeObject(delayedProxies);
122 		out.writeLong(this.minSuccessDelay);
123 		out.writeLong(this.maxSuccessDelay);
124 		out.writeLong(this.minFailureDelay);
125 		out.writeLong(this.maxFailureDelay);
126 	}
127 
128 	@Override
129 	public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
130 		@SuppressWarnings("unchecked")
131 		List<DelayedProxy> delayedProxies = (List<DelayedProxy>) in.readObject();
132 		delayedProxies.forEach(dp -> this.allProxies.put(dp.getProxy(), dp));
133 		this.minSuccessDelay = in.readLong();
134 		this.maxSuccessDelay = in.readLong();
135 		this.minFailureDelay = in.readLong();
136 		this.maxFailureDelay = in.readLong();
137 		this.proxies.addAll(this.allProxies.values());
138 	}
139 
140 	protected Duration getDelay(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
141 		final long minDelay;
142 		final long maxDelay;
143 
144 		if (success) {
145 			minDelay = this.minSuccessDelay;
146 			maxDelay = this.maxSuccessDelay;
147 		} else {
148 			minDelay = this.minFailureDelay;
149 			maxDelay = this.maxFailureDelay;
150 		}
151 
152 		final float delayFactor = this.getDelayFactor(delayedProxy, page, task, success);
153 		final float amount = RandomUtils.nextLong(minDelay, maxDelay) * delayFactor;
154 		final Duration delay = Duration.ofMillis((long) amount);
155 
156 		log.trace("Proxy: {}, success count: {}, failure count: {}, delayFactor: {} delay: {}",
157 			delayedProxy::getProxy,
158 			delayedProxy::getSuccessCount,
159 			delayedProxy::getFailureCount,
160 			() -> delayFactor,
161 			() -> delay
162 		);
163 
164 		return delay;
165 	}
166 
167 	protected float getDelayFactor(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
168 		final long totalCount = delayedProxy.getSuccessCount() + delayedProxy.getFailureCount();
169 		final float failureRate = totalCount != 0 ? (float) delayedProxy.getFailureCount() / (float) totalCount : 0;
170 		return 1 + failureRate * delayedProxy.getFailureCount();
171 	}
172 
173 	protected boolean isSuccess(Proxy proxy, Page page, Task task) {
174 		boolean success = page.isDownloadSuccess()
175 			&& page.getStatusCode() >= 100
176 			&& page.getStatusCode() < 500;
177 		log.trace("{} is {}.", proxy, success ? "success" : "failure");
178 		return success;
179 	}
180 
181 	protected void printInfo() {
182 		if (log.isTraceEnabled()) {
183 			String prefix = String.format("%1$32s\t%2$32s\t%3$8s\t%4$8s\t%5$16s%6$s", "Proxy", "Available Time", "Success", "Failure", "Delayed(ms)", System.lineSeparator());
184 			String stat = this.proxies.stream().sorted()
185 				.map(p -> String.format("%1$32s\t%2$32s\t%3$8d\t%4$8d\t%5$16d", p.getProxy(), p.getAvailableTime(), p.getSuccessCount(), p.getFailureCount(), p.getDelay(TimeUnit.MILLISECONDS)))
186 				.collect(Collectors.joining(System.lineSeparator(), prefix, ""));
187 			log.trace("\nAll proxy count: {}, proxy queue size: {}, expired delay available count: {}.\n{}",
188 				this.allProxies.size(),
189 				this.proxies.size(),
190 				this.proxies.stream().filter(p -> p.getDelay(TimeUnit.MILLISECONDS) <= 0).count(),
191 				stat
192 			);
193 		}
194 	}
195 
196 }