package org.oxerr.webmagic.proxy;

import java.io.EOFException;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.ProxyProvider;

/**
 * A {@link ProxyProvider} that hands out proxies from a {@link DelayQueue}.
 *
 * <p>Every proxy returned through {@link #returnProxy(Proxy, Page, Task)} is
 * re-queued with an availability time of "now + delay". The delay is a random
 * value between the configured minimum and maximum (separate bounds for
 * successful and failed downloads) multiplied by a factor that grows with the
 * proxy's failure history.
 *
 * <p>All delay bounds and the wait timeout are stored in milliseconds.
 *
 * <p>Access to the registry of known proxies is guarded by this instance's
 * monitor. The map returned by {@link #getAllProxies()} is an unsynchronized
 * live view; callers reading it while proxies are being added must provide
 * their own coordination.
 */
public class DelayedProxyProvider implements ProxyProvider, Externalizable {

    // Deliberately an instance field: the class declares no serialVersionUID,
    // and introducing a static initializer would change the computed default.
    private final transient Logger log = LogManager.getLogger();

    /** Proxies waiting to become available again. */
    private final transient DelayQueue<DelayedProxy> proxies;

    /** Every proxy ever registered through {@link #put(Proxy)}; guarded by {@code this}. */
    private final transient Map<Proxy, DelayedProxy> allProxies;

    private long minSuccessDelay;

    private long maxSuccessDelay;

    private long minFailureDelay;

    private long maxFailureDelay;

    private long waitTimeout;

    /**
     * Creates a provider with all delays and the wait timeout set to zero.
     * Also used by the serialization runtime before {@link #readExternal(ObjectInput)}.
     */
    public DelayedProxyProvider() {
        this(Duration.ZERO, Duration.ZERO, Duration.ZERO, Duration.ZERO);
    }

    /**
     * Creates a provider with the given delay bounds and no wait timeout,
     * i.e. {@link #getProxy(Request, Task)} blocks until a proxy is available.
     *
     * @param minSuccessDelay lower bound of the delay after a successful download
     * @param maxSuccessDelay upper bound of the delay after a successful download
     * @param minFailureDelay lower bound of the delay after a failed download
     * @param maxFailureDelay upper bound of the delay after a failed download
     */
    public DelayedProxyProvider(
        Duration minSuccessDelay,
        Duration maxSuccessDelay,
        Duration minFailureDelay,
        Duration maxFailureDelay
    ) {
        this(
            minSuccessDelay,
            maxSuccessDelay,
            minFailureDelay,
            maxFailureDelay,
            Duration.ZERO
        );
    }

    /**
     * Creates a provider with the given delay bounds and wait timeout.
     *
     * @param minSuccessDelay lower bound of the delay after a successful download
     * @param maxSuccessDelay upper bound of the delay after a successful download
     * @param minFailureDelay lower bound of the delay after a failed download
     * @param maxFailureDelay upper bound of the delay after a failed download
     * @param waitTimeout how long {@link #getProxy(Request, Task)} waits for a
     *     proxy; a value that is not positive means "wait indefinitely"
     */
    public DelayedProxyProvider(
        Duration minSuccessDelay,
        Duration maxSuccessDelay,
        Duration minFailureDelay,
        Duration maxFailureDelay,
        Duration waitTimeout
    ) {
        this.proxies = new DelayQueue<>();
        this.allProxies = new HashMap<>();

        final TimeUnit unit = TimeUnit.MILLISECONDS;

        this.minSuccessDelay = unit.convert(minSuccessDelay);
        this.maxSuccessDelay = unit.convert(maxSuccessDelay);

        this.minFailureDelay = unit.convert(minFailureDelay);
        this.maxFailureDelay = unit.convert(maxFailureDelay);

        this.waitTimeout = unit.convert(waitTimeout);
    }

    /**
     * Records the outcome of a download and re-queues the proxy with a new
     * availability time.
     *
     * @param proxy the proxy being handed back
     * @param page the page produced by the download, used to judge success
     * @param task the task the download belonged to
     * @throws NoSuchElementException if the proxy was never registered via {@link #put(Proxy)}
     */
    @Override
    public void returnProxy(Proxy proxy, Page page, Task task) {
        final DelayedProxy delayedProxy;

        // put() mutates the map under this monitor, so the lookup must hold it too.
        synchronized (this) {
            delayedProxy = this.allProxies.get(proxy);
        }

        if (delayedProxy == null) {
            throw new NoSuchElementException(String.format("No %s found.", proxy));
        }

        final boolean success = this.isSuccess(proxy, page, task);

        if (success) {
            delayedProxy.incrementAndGetSuccessCount();
        } else {
            delayedProxy.incrementAndGetFailureCount();
        }

        final Duration delay = this.getDelay(delayedProxy, page, task, success);
        delayedProxy.setAvailableTime(Instant.now().plus(delay));

        this.proxies.put(delayedProxy);

        this.printInfo();
    }

    /**
     * Takes the next available proxy from the queue.
     *
     * <p>With a positive wait timeout the call waits at most that long and
     * returns {@code null} (after logging a warning) when no proxy became
     * available; otherwise it blocks until one does.
     *
     * @param request the request about to be downloaded (not used here)
     * @param task the task the request belongs to (not used here)
     * @return the proxy to use, or {@code null} if the wait timed out
     * @throws IllegalStateException if the thread is interrupted while waiting;
     *     the interrupt flag is restored before throwing
     */
    @Override
    public Proxy getProxy(Request request, Task task) {
        final Proxy proxy;

        this.printInfo();

        try {
            if (this.waitTimeout > 0) {
                DelayedProxy dp = this.proxies.poll(this.waitTimeout, TimeUnit.MILLISECONDS);

                if (dp != null) {
                    proxy = dp.getProxy();
                } else {
                    log.warn("Wait for proxy timed out.");
                    proxy = null;
                }
            } else {
                proxy = this.proxies.take().getProxy();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        return proxy;
    }

    /**
     * Registers a proxy and queues it. A proxy that is already registered is
     * skipped.
     *
     * @param proxy the proxy to add
     */
    public synchronized void put(Proxy proxy) {
        if (!this.allProxies.containsKey(proxy)) {
            log.trace("Put proxy: {}.", proxy);

            final DelayedProxy delayedProxy = new DelayedProxy(proxy);
            this.allProxies.put(proxy, delayedProxy);
            this.proxies.put(delayedProxy);

            this.printInfo();
        } else {
            log.trace("Skipping put proxy: {}.", proxy);
        }
    }

    /**
     * Returns the live queue of proxies waiting to be handed out.
     *
     * @return the internal queue itself, not a copy
     */
    public DelayQueue<DelayedProxy> getProxies() {
        return proxies;
    }

    /**
     * Returns a read-only view of every registered proxy.
     *
     * @return an unmodifiable view backed by the internal map
     */
    public Map<Proxy, DelayedProxy> getAllProxies() {
        return Collections.unmodifiableMap(allProxies);
    }

    /**
     * Writes the registered proxies, the four delay bounds and the wait
     * timeout, in that order.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        final List<DelayedProxy> delayedProxies;

        // Snapshot under the monitor so a concurrent put() cannot disturb the copy.
        synchronized (this) {
            delayedProxies = new ArrayList<>(this.allProxies.values());
        }

        out.writeObject(delayedProxies);
        out.writeLong(this.minSuccessDelay);
        out.writeLong(this.maxSuccessDelay);
        out.writeLong(this.minFailureDelay);
        out.writeLong(this.maxFailureDelay);
        // Appended after the original fields so the earlier layout remains a
        // valid prefix of the new one.
        out.writeLong(this.waitTimeout);
    }

    /**
     * Restores the state written by {@link #writeExternal(ObjectOutput)} and
     * re-queues every restored proxy.
     *
     * <p>Streams produced before the wait timeout was serialized end after the
     * four delay bounds; for those the current wait timeout is kept.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        @SuppressWarnings("unchecked")
        List<DelayedProxy> delayedProxies = (List<DelayedProxy>) in.readObject();
        delayedProxies.forEach(dp -> this.allProxies.put(dp.getProxy(), dp));
        this.minSuccessDelay = in.readLong();
        this.maxSuccessDelay = in.readLong();
        this.minFailureDelay = in.readLong();
        this.maxFailureDelay = in.readLong();

        try {
            this.waitTimeout = in.readLong();
        } catch (EOFException ignored) {
            // Older layout without the wait timeout: keep the value already set.
        }

        this.proxies.addAll(this.allProxies.values());
    }

    /**
     * Computes how long a proxy must rest before it may be handed out again.
     *
     * <p>A random base value is drawn between the success or failure bounds
     * (depending on {@code success}) and multiplied by
     * {@link #getDelayFactor(DelayedProxy, Page, Task, boolean)}.
     *
     * @param delayedProxy the proxy being returned
     * @param page the downloaded page
     * @param task the task the download belonged to
     * @param success whether the download counted as successful
     * @return the delay to apply
     */
    protected Duration getDelay(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
        final long minDelay;
        final long maxDelay;

        if (success) {
            minDelay = this.minSuccessDelay;
            maxDelay = this.maxSuccessDelay;
        } else {
            minDelay = this.minFailureDelay;
            maxDelay = this.maxFailureDelay;
        }

        final float delayFactor = this.getDelayFactor(delayedProxy, page, task, success);
        final float amount = RandomUtils.nextLong(minDelay, maxDelay) * delayFactor;
        final Duration delay = Duration.ofMillis((long) amount);

        log.trace("Proxy: {}, success count: {}, failure count: {}, delayFactor: {} delay: {}",
            delayedProxy::getProxy,
            delayedProxy::getSuccessCount,
            delayedProxy::getFailureCount,
            () -> delayFactor,
            () -> delay
        );

        return delay;
    }

    /**
     * Computes the multiplier applied to the random base delay:
     * {@code 1 + failureRate * failureCount}, where the failure rate is
     * failures divided by total attempts (zero when there were no attempts).
     *
     * @param delayedProxy the proxy being returned
     * @param page the downloaded page (not used here)
     * @param task the task the download belonged to (not used here)
     * @param success whether the download counted as successful (not used here)
     * @return the delay multiplier
     */
    protected float getDelayFactor(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
        final long totalCount = delayedProxy.getSuccessCount() + delayedProxy.getFailureCount();
        final float failureRate = totalCount != 0 ? (float) delayedProxy.getFailureCount() / (float) totalCount : 0;
        return 1 + failureRate * delayedProxy.getFailureCount();
    }

    /**
     * Decides whether a download counts as a success for the proxy: the page
     * must report a successful download and a status code in {@code [100, 500)}.
     *
     * @param proxy the proxy that was used
     * @param page the downloaded page
     * @param task the task the download belonged to (not used here)
     * @return {@code true} if the download counts as successful
     */
    protected boolean isSuccess(Proxy proxy, Page page, Task task) {
        boolean success = page.isDownloadSuccess()
            && page.getStatusCode() >= 100
            && page.getStatusCode() < 500;
        log.trace("{} is {}.", proxy, success ? "success" : "failure");
        return success;
    }

    /**
     * Logs a table of the queued proxies plus summary counts. Does nothing
     * unless trace logging is enabled.
     */
    protected void printInfo() {
        if (log.isTraceEnabled()) {
            String prefix = String.format("%1$32s\t%2$32s\t%3$8s\t%4$8s\t%5$16s%6$s", "Proxy", "Available Time", "Success", "Failure", "Delayed(ms)", System.lineSeparator());
            String stat = this.proxies.stream().sorted()
                .map(p -> String.format("%1$32s\t%2$32s\t%3$8d\t%4$8d\t%5$16d", p.getProxy(), p.getAvailableTime(), p.getSuccessCount(), p.getFailureCount(), p.getDelay(TimeUnit.MILLISECONDS)))
                .collect(Collectors.joining(System.lineSeparator(), prefix, ""));

            final int allProxyCount;

            // Same monitor as put(); reentrant when called from within put().
            synchronized (this) {
                allProxyCount = this.allProxies.size();
            }

            log.trace("\nAll proxy count: {}, proxy queue size: {}, expired delay available count: {}.\n{}",
                allProxyCount,
                this.proxies.size(),
                this.proxies.stream().filter(p -> p.getDelay(TimeUnit.MILLISECONDS) <= 0).count(),
                stat
            );
        }
    }

}