package org.oxerr.webmagic.proxy;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.ProxyProvider;

/**
 * A {@link ProxyProvider} that hands proxies out of a {@link DelayQueue}.
 *
 * <p>Every registered proxy is wrapped in a {@link DelayedProxy}. When a proxy
 * is returned via {@link #returnProxy(Proxy, Page, Task)} its success or
 * failure counter is incremented and it is queued again with a new available
 * time: a random base delay between the configured minimum and maximum
 * (success or failure bounds), multiplied by the factor computed by
 * {@link #getDelayFactor(DelayedProxy, Page, Task, boolean)}.
 *
 * <p>The registry of known proxies is a {@link ConcurrentHashMap} and the
 * hand-out queue is a {@link DelayQueue}, so proxies may be registered, taken
 * and returned from different threads.
 *
 * <p>The provider is {@link Externalizable}: the known proxies and the four
 * delay bounds are written to / restored from the object stream.
 */
public class DelayedProxyProvider implements ProxyProvider, Externalizable {

    private static final Logger log = LogManager.getLogger();

    /** Proxies waiting to be handed out, ordered by their remaining delay. */
    private final transient DelayQueue<DelayedProxy> proxies;

    /** Every proxy ever registered, including those currently handed out. */
    private final transient Map<Proxy, DelayedProxy> allProxies;

    /** Lower bound, in milliseconds, of the base delay after a success. */
    private long minSuccessDelay;

    /** Upper bound, in milliseconds, of the base delay after a success. */
    private long maxSuccessDelay;

    /** Lower bound, in milliseconds, of the base delay after a failure. */
    private long minFailureDelay;

    /** Upper bound, in milliseconds, of the base delay after a failure. */
    private long maxFailureDelay;

    /**
     * Creates a provider whose delay bounds are all zero.
     *
     * <p>A public no-argument constructor is also what the
     * {@link Externalizable} contract requires for deserialization.
     */
    public DelayedProxyProvider() {
        this(Duration.ZERO, Duration.ZERO, Duration.ZERO, Duration.ZERO);
    }

    /**
     * Creates a provider with the given delay bounds. The durations are
     * stored with millisecond resolution.
     *
     * <p>The bounds are passed unchanged to
     * {@code RandomUtils.nextLong(min, max)} when a proxy is returned, so each
     * pair should satisfy that method's preconditions (non-negative,
     * {@code min <= max}).
     *
     * @param minSuccessDelay the minimum base delay after a successful download.
     * @param maxSuccessDelay the maximum base delay after a successful download.
     * @param minFailureDelay the minimum base delay after a failed download.
     * @param maxFailureDelay the maximum base delay after a failed download.
     */
    public DelayedProxyProvider(
        Duration minSuccessDelay,
        Duration maxSuccessDelay,
        Duration minFailureDelay,
        Duration maxFailureDelay
    ) {
        this.proxies = new DelayQueue<>();
        // Read by downloader threads while other threads register proxies,
        // hence a concurrent map rather than a plain HashMap.
        this.allProxies = new ConcurrentHashMap<>();

        final TimeUnit unit = TimeUnit.MILLISECONDS;

        this.minSuccessDelay = unit.convert(minSuccessDelay);
        this.maxSuccessDelay = unit.convert(maxSuccessDelay);

        this.minFailureDelay = unit.convert(minFailureDelay);
        this.maxFailureDelay = unit.convert(maxFailureDelay);
    }

    /**
     * Records the outcome of a download and queues the proxy again with a
     * new available time.
     *
     * <p>A proxy that was never registered through {@link #put(Proxy)}
     * (including {@code null}) is ignored with a warning instead of failing
     * with a {@link NullPointerException}.
     *
     * @param proxy the proxy being returned.
     * @param page the page produced by the download.
     * @param task the task the download belongs to.
     */
    @Override
    public void returnProxy(Proxy proxy, Page page, Task task) {
        final DelayedProxy delayedProxy = proxy != null ? this.allProxies.get(proxy) : null;

        if (delayedProxy == null) {
            log.warn("Ignoring returned proxy not managed by this provider: {}.", proxy);
            return;
        }

        final boolean success = this.isSuccess(proxy, page, task);

        if (success) {
            delayedProxy.incrementAndGetSuccessCount();
        } else {
            delayedProxy.incrementAndGetFailureCount();
        }

        final Duration delay = this.getDelay(delayedProxy, page, task, success);
        delayedProxy.setAvailableTime(Instant.now().plus(delay));

        this.proxies.put(delayedProxy);

        this.printInfo();
    }

    /**
     * Takes the next proxy from the queue, blocking until one whose delay
     * has expired is available.
     *
     * @param task the task requesting a proxy.
     * @return the next available proxy.
     * @throws IllegalStateException if the calling thread is interrupted
     *     while waiting; the interrupt flag is restored before throwing.
     */
    @Override
    public Proxy getProxy(Task task) {
        this.printInfo();

        try {
            return this.proxies.take().getProxy();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Registers a proxy and queues it for hand-out. A proxy that is already
     * registered is skipped.
     *
     * @param proxy the proxy to register.
     * @throws NullPointerException if {@code proxy} is {@code null}.
     */
    public void put(Proxy proxy) {
        Objects.requireNonNull(proxy, "proxy");

        final DelayedProxy delayedProxy = new DelayedProxy(proxy);

        // putIfAbsent makes the "check then register" step atomic.
        if (this.allProxies.putIfAbsent(proxy, delayedProxy) == null) {
            log.trace("Put proxy: {}.", proxy);

            this.proxies.put(delayedProxy);

            this.printInfo();
        } else {
            log.trace("Skipping put proxy: {}.", proxy);
        }
    }

    /**
     * Returns the live queue of proxies waiting to be handed out.
     *
     * @return the internal queue itself, not a copy.
     */
    public DelayQueue<DelayedProxy> getProxies() {
        return proxies;
    }

    /**
     * Returns a read-only view of every registered proxy.
     *
     * @return an unmodifiable view keyed by proxy.
     */
    public Map<Proxy, DelayedProxy> getAllProxies() {
        return Collections.unmodifiableMap(allProxies);
    }

    /**
     * Writes the registered proxies (as a list) followed by the four delay
     * bounds in the order min/max success, min/max failure.
     *
     * @param out the stream to write to.
     * @throws IOException if writing fails.
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        List<DelayedProxy> delayedProxies = new ArrayList<>(this.allProxies.values());
        out.writeObject(delayedProxies);
        out.writeLong(this.minSuccessDelay);
        out.writeLong(this.maxSuccessDelay);
        out.writeLong(this.minFailureDelay);
        out.writeLong(this.maxFailureDelay);
    }

    /**
     * Restores the state written by {@link #writeExternal(ObjectOutput)}.
     *
     * <p>Only proxies that are not yet registered are added to the queue,
     * so entries already present in this instance are not queued twice.
     *
     * @param in the stream to read from.
     * @throws IOException if reading fails.
     * @throws ClassNotFoundException if a serialized class cannot be found.
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        @SuppressWarnings("unchecked")
        List<DelayedProxy> delayedProxies = (List<DelayedProxy>) in.readObject();
        this.minSuccessDelay = in.readLong();
        this.maxSuccessDelay = in.readLong();
        this.minFailureDelay = in.readLong();
        this.maxFailureDelay = in.readLong();

        for (DelayedProxy delayedProxy : delayedProxies) {
            if (this.allProxies.putIfAbsent(delayedProxy.getProxy(), delayedProxy) == null) {
                this.proxies.put(delayedProxy);
            }
        }
    }

    /**
     * Computes how long the proxy must wait before it is handed out again:
     * a random base delay within the success or failure bounds, multiplied
     * by {@link #getDelayFactor(DelayedProxy, Page, Task, boolean)}.
     *
     * @param delayedProxy the proxy being returned.
     * @param page the page produced by the download.
     * @param task the task the download belongs to.
     * @param success whether the download was classified as successful.
     * @return the delay to apply, truncated to whole milliseconds.
     */
    protected Duration getDelay(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
        final long minDelay;
        final long maxDelay;

        if (success) {
            minDelay = this.minSuccessDelay;
            maxDelay = this.maxSuccessDelay;
        } else {
            minDelay = this.minFailureDelay;
            maxDelay = this.maxFailureDelay;
        }

        final float delayFactor = this.getDelayFactor(delayedProxy, page, task, success);
        final long baseDelay = RandomUtils.nextLong(minDelay, maxDelay);
        // Multiply in double: a float product cannot represent every
        // millisecond value once the delay exceeds 2^24 ms.
        final double amount = (double) baseDelay * delayFactor;
        final Duration delay = Duration.ofMillis((long) amount);

        log.trace("Proxy: {}, success count: {}, failure count: {}, delayFactor: {} delay: {}",
            delayedProxy::getProxy,
            delayedProxy::getSuccessCount,
            delayedProxy::getFailureCount,
            () -> delayFactor,
            () -> delay
        );

        return delay;
    }

    /**
     * Computes the multiplier applied to the random base delay:
     * {@code 1 + failureRate * failureCount}, where {@code failureRate} is
     * the share of failures among all recorded outcomes (0 when nothing has
     * been recorded yet).
     *
     * @param delayedProxy the proxy being returned.
     * @param page the page produced by the download (unused here).
     * @param task the task the download belongs to (unused here).
     * @param success whether the download was successful (unused here).
     * @return the delay multiplier.
     */
    protected float getDelayFactor(DelayedProxy delayedProxy, Page page, Task task, boolean success) {
        final long totalCount = delayedProxy.getSuccessCount() + delayedProxy.getFailureCount();
        final float failureRate = totalCount != 0 ? (float) delayedProxy.getFailureCount() / (float) totalCount : 0;
        return 1 + failureRate * delayedProxy.getFailureCount();
    }

    /**
     * Classifies a download as success when the page reports a successful
     * download and its status code is in the range {@code [100, 500)}.
     * A {@code null} page is treated as a failure.
     *
     * @param proxy the proxy used for the download.
     * @param page the page produced by the download, may be {@code null}.
     * @param task the task the download belongs to.
     * @return {@code true} if the download counts as a success.
     */
    protected boolean isSuccess(Proxy proxy, Page page, Task task) {
        boolean success = page != null
            && page.isDownloadSuccess()
            && page.getStatusCode() >= 100
            && page.getStatusCode() < 500;
        log.trace("{} is {}.", proxy, success ? "success" : "failure");
        return success;
    }

    /**
     * Logs, at trace level only, a table of the queued proxies together
     * with the registry size, the queue size and the number of queued
     * proxies whose delay has already expired.
     */
    protected void printInfo() {
        if (log.isTraceEnabled()) {
            String prefix = String.format("%1$32s\t%2$32s\t%3$8s\t%4$8s\t%5$16s%6$s", "Proxy", "Available Time", "Success", "Failure", "Delayed(ms)", System.lineSeparator());
            String stat = this.proxies.stream().sorted()
                .map(p -> String.format("%1$32s\t%2$32s\t%3$8d\t%4$8d\t%5$16d", p.getProxy(), p.getAvailableTime(), p.getSuccessCount(), p.getFailureCount(), p.getDelay(TimeUnit.MILLISECONDS)))
                .collect(Collectors.joining(System.lineSeparator(), prefix, ""));
            log.trace("\nAll proxy count: {}, proxy queue size: {}, expired delay available count: {}.\n{}",
                this.allProxies.size(),
                this.proxies.size(),
                this.proxies.stream().filter(p -> p.getDelay(TimeUnit.MILLISECONDS) <= 0).count(),
                stat
            );
        }
    }

}