View Javadoc
1   package org.oxerr.viagogo.client.cached.redisson.inventory;
2   
3   import java.io.IOException;
4   import java.time.Duration;
5   import java.util.ArrayList;
6   import java.util.Collections;
7   import java.util.List;
8   import java.util.Objects;
9   import java.util.Optional;
10  import java.util.Random;
11  import java.util.Set;
12  import java.util.concurrent.CompletableFuture;
13  import java.util.concurrent.Executor;
14  import java.util.function.Function;
15  import java.util.function.Supplier;
16  import java.util.stream.Collectors;
17  
18  import javax.annotation.Nonnull;
19  import javax.annotation.Nullable;
20  
21  import org.apache.commons.lang3.ThreadUtils;
22  import org.apache.commons.lang3.time.StopWatch;
23  import org.apache.logging.log4j.LogManager;
24  import org.apache.logging.log4j.Logger;
25  import org.oxerr.ticket.inventory.support.cached.redisson.ListingConfiguration;
26  import org.oxerr.ticket.inventory.support.cached.redisson.RedissonCachedListingServiceSupport;
27  import org.oxerr.ticket.inventory.support.cached.redisson.Status;
28  import org.oxerr.viagogo.client.cached.inventory.CachedSellerListingsService;
29  import org.oxerr.viagogo.client.cached.inventory.ViagogoEvent;
30  import org.oxerr.viagogo.client.cached.inventory.ViagogoListing;
31  import org.oxerr.viagogo.client.inventory.SellerListingService;
32  import org.oxerr.viagogo.model.request.inventory.CreateSellerListingRequest;
33  import org.oxerr.viagogo.model.request.inventory.SellerListingRequest;
34  import org.oxerr.viagogo.model.response.PagedResource;
35  import org.oxerr.viagogo.model.response.inventory.SellerListing;
36  import org.redisson.api.RedissonClient;
37  
38  public class RedissonCachedSellerListingsService
39  	extends RedissonCachedListingServiceSupport<String, String, CreateSellerListingRequest, ViagogoListing, ViagogoEvent, ViagogoCachedListing>
40  	implements CachedSellerListingsService {
41  
42  	private final Logger log = LogManager.getLogger();
43  
44  	private final SellerListingService sellerListingsService;
45  
46  	private final int pageSize;
47  
48  	private final RetryConfiguration retryConfig;
49  
50  	@Deprecated(since = "5.0.0", forRemoval = true)
51  	public RedissonCachedSellerListingsService(
52  		SellerListingService sellerListingsService,
53  		RedissonClient redissonClient,
54  		String keyPrefix,
55  		Executor executor,
56  		boolean create
57  	) {
58  		this(sellerListingsService, redissonClient, keyPrefix, executor, new ListingConfiguration(create, true, true), 10_000, new RetryConfiguration());
59  	}
60  
61  	/**
62  	 * Constructs with default {@link ListingConfiguration} and default {@link RetryConfiguration}.
63  	 *
64  	 * @param sellerListingsService the seller listings service.
65  	 * @param redissonClient the redisson client.
66  	 * @param keyPrefix the key prefix for the cache.
67  	 * @param executor the executor.
68  	 *
69  	 * @since 5.0.0
70  	 */
71  	public RedissonCachedSellerListingsService(
72  		SellerListingService sellerListingsService,
73  		RedissonClient redissonClient,
74  		String keyPrefix,
75  		Executor executor
76  	) {
77  		this(sellerListingsService, redissonClient, keyPrefix, executor, new ListingConfiguration(), 10_000, new RetryConfiguration());
78  	}
79  
80  	/**
81  	 * Constructs with specified {@link ListingConfiguration} and specified {@link RetryConfiguration}.
82  	 *
83  	 * @param sellerListingsService the seller listings service.
84  	 * @param redissonClient the redisson client.
85  	 * @param keyPrefix the key prefix for the cache.
86  	 * @param executor the executor.
87  	 * @param listingConfiguration the listing configuration.
88  	 * @param pageSize the page size when do check.
89  	 * @param retryConfiguration the retry configuration.
90  	 *
91  	 * @since 5.0.0
92  	 */
93  	public RedissonCachedSellerListingsService(
94  		SellerListingService sellerListingsService,
95  		RedissonClient redissonClient,
96  		String keyPrefix,
97  		Executor executor,
98  		ListingConfiguration listingConfiguration,
99  		int pageSize,
100 		RetryConfiguration retryConfiguration
101 	) {
102 		super(redissonClient, keyPrefix, executor, listingConfiguration);
103 		this.sellerListingsService = sellerListingsService;
104 		this.pageSize = pageSize;
105 		this.retryConfig = retryConfiguration;
106 	}
107 
108 	@Override
109 	protected boolean shouldCreate(
110 		@Nonnull ViagogoEvent event,
111 		@Nonnull ViagogoListing listing,
112 		@Nullable ViagogoCachedListing cachedListing
113 	) {
114 		boolean shouldCreate = super.shouldCreate(event, listing, cachedListing);
115 		return shouldCreate || (cachedListing != null && !listing.getViagogoEventId().equals(cachedListing.getViagogoEventId()));
116 	}
117 
118 	@Override
119 	protected boolean shouldUpdate(
120 		@Nonnull ViagogoEvent event,
121 		@Nonnull ViagogoListing listing,
122 		@Nullable ViagogoCachedListing cachedListing
123 	) {
124 		boolean shouldUpdate = super.shouldUpdate(event, listing, cachedListing);
125 		return shouldUpdate || (cachedListing != null && !listing.getViagogoEventId().equals(cachedListing.getViagogoEventId()));
126 	}
127 
128 	@Override
129 	protected int getPriority(
130 		@Nonnull ViagogoEvent event,
131 		@Nullable ViagogoListing listing,
132 		@Nullable ViagogoCachedListing cachedListing
133 	) {
134 		if (listing == null || cachedListing == null) {
135 			return 0;
136 		}
137 
138 		int priority = 0;
139 
140 		var r = listing.getRequest();
141 		var cr = cachedListing.getRequest();
142 
143 		priority += Objects.equals(r.getNumberOfTickets(), cr.getNumberOfTickets()) ? 0 : 1;
144 		priority += Objects.equals(r.getSeating(), cr.getSeating()) ? 0 : 1;
145 		priority += Objects.equals(r.getNotes(), cr.getNotes()) ? 0 : 1;
146 
147 		return priority;
148 	}
149 
150 	@Override
151 	protected boolean shouldDelete(
152 		@Nonnull ViagogoEvent event,
153 		@Nonnull Set<String> inventoryListingIds,
154 		@Nonnull String listingId,
155 		@Nonnull ViagogoCachedListing cachedListing
156 	) {
157 		return super.shouldDelete(event, inventoryListingIds, listingId, cachedListing)
158 			|| !event.getViagogoEventId().equals(cachedListing.getViagogoEventId());
159 	}
160 
161 	@Override
162 	protected void createListing(ViagogoEvent event, ViagogoListing listing) throws IOException {
163 		this.sellerListingsService.createListing(listing.getViagogoEventId(), listing.getRequest());
164 	}
165 
166 	@Override
167 	protected void deleteListing(ViagogoEvent event, String listingId) throws IOException {
168 		this.sellerListingsService.deleteListingByExternalListingId(listingId);
169 	}
170 
171 	@Override
172 	protected ViagogoCachedListing toCached(ViagogoEvent event, ViagogoListing listing, Status status) {
173 		return new ViagogoCachedListing(listing, status);
174 	}
175 
176 	@Override
177 	public void check() {
178 		log.info("[check] begin.");
179 
180 		StopWatch stopWatch = StopWatch.createStarted();
181 
182 		// The external IDs in cache.
183 		var externalIds = this.getExternalIds();
184 
185 		var deleting = Collections.synchronizedList(new ArrayList<CompletableFuture<Void>>());
186 
187 		// Check the first page.
188 		var listings = this.check(request(1), externalIds, deleting).join();
189 
190 		// Check the next page to the last page.
191 		log.debug("[check] total items: {}, next link: {}, last link: {}",
192 			listings.getTotalItems(), listings.getNextLink(), listings.getLastLink());
193 
194 		// When only 1 page left, the next link and last link is null.
195 		var next = Optional.ofNullable(listings.getNextLink()).map(SellerListingRequest::from);
196 		var last = Optional.ofNullable(listings.getLastLink()).map(SellerListingRequest::from);
197 
198 		var checking = new ArrayList<CompletableFuture<PagedResource<SellerListing>>>();
199 
200 		if (next.isPresent() && last.isPresent()) {
201 			for(int i = next.get().getPage(); i <= last.get().getPage(); i++) {
202 				checking.add(this.check(request(i), externalIds, deleting));
203 			}
204 		}
205 
206 		// Wait all checking to complete.
207 		log.debug("[check] checking size: {}", checking.size());
208 		CompletableFuture.allOf(checking.toArray(CompletableFuture[]::new)).join();
209 
210 		// Wait all deleting to complete.
211 		log.debug("[check] deleting size: {}", deleting.size());
212 		CompletableFuture.allOf(deleting.toArray(CompletableFuture[]::new)).join();
213 
214 		stopWatch.stop();
215 		log.info("[check] end. Checked {} items in {}", listings.getTotalItems(), stopWatch);
216 	}
217 
218 	private Set<String> getExternalIds() {
219 		var externalIds = this.getCacheNamesStream()
220 			.map(name -> this.getCache(name).keySet().stream())
221 			.flatMap(Function.identity())
222 			.collect(Collectors.toUnmodifiableSet());
223 		log.debug("[check] externalIds size: {}", externalIds.size());
224 		return externalIds;
225 	}
226 
227 	private CompletableFuture<PagedResource<SellerListing>> check(
228 		SellerListingRequest request,
229 		Set<String> externalIds,
230 		List<CompletableFuture<Void>> deleting
231 	) {
232 		return this.<PagedResource<SellerListing>>callAsync(() -> {
233 			var page = this.getSellerListings(request);
234 			deleting.addAll(this.check(page, externalIds));
235 			log.debug("[check] page: {}, deleting size: {}", request.getPage(), deleting.size());
236 			return page;
237 		});
238 	}
239 
240 	private List<CompletableFuture<Void>> check(
241 		PagedResource<SellerListing> page,
242 		Set<String> externalIds
243 	) {
244 		return page.getItems().stream()
245 			.filter(listing -> !externalIds.contains(listing.getExternalId()))
246 			.map(listing -> this.<Void>callAsync(() -> {
247 				this.sellerListingsService.deleteListingByExternalListingId(listing.getExternalId());
248 				return null;
249 			})).collect(Collectors.toUnmodifiableList());
250 	}
251 
252 	private PagedResource<SellerListing> getSellerListings(SellerListingRequest request) {
253 		return this.retry(() -> {
254 			try {
255 				return this.sellerListingsService.getSellerListings(request);
256 			} catch (IOException e) {
257 				throw new RetryableException(e);
258 			}
259 		});
260 	}
261 
262 	private SellerListingRequest request(int page) {
263 		var r = new SellerListingRequest();
264 		r.setSort(SellerListingRequest.Sort.EVENT_DATE);
265 		r.setPage(page);
266 		r.setPageSize(pageSize);
267 		return r;
268 	}
269 
270 	private final Random random = new Random();
271 
272 	private <T> T retry(Supplier<T> supplier) {
273 		int attempts = 0;
274 
275 		T t = null;
276 
277 		try {
278 			t = supplier.get();
279 		} catch (RetryableException e) {
280 			if (++attempts < retryConfig.getMaxAttempts()) {
281 				long delay = random.nextInt(retryConfig.getMaxDelay());
282 				sleep(delay);
283 			} else {
284 				log.debug("attempts: {}", attempts);
285 				throw e;
286 			}
287 		}
288 
289 		return t;
290 	}
291 
292 	private void sleep(long millis) {
293 		if (millis < 0) {
294 			return;
295 		}
296 
297 		log.debug("sleeping {}", millis);
298 		ThreadUtils.sleepQuietly(Duration.ofMillis(millis));
299 	}
300 
301 	private static class RetryableException extends RuntimeException {
302 
303 		private static final long serialVersionUID = 2023120801L;
304 
305 		public RetryableException(Throwable cause) {
306 			super(cause);
307 		}
308 
309 	}
310 
311 }