RedissonCachedSellerListingsService.java

package org.oxerr.viagogo.client.cached.redisson.inventory;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.commons.lang3.ThreadUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.oxerr.ticket.inventory.support.cached.redisson.ListingConfiguration;
import org.oxerr.ticket.inventory.support.cached.redisson.RedissonCachedListingServiceSupport;
import org.oxerr.ticket.inventory.support.cached.redisson.Status;
import org.oxerr.viagogo.client.cached.inventory.CachedSellerListingsService;
import org.oxerr.viagogo.client.cached.inventory.ViagogoEvent;
import org.oxerr.viagogo.client.cached.inventory.ViagogoListing;
import org.oxerr.viagogo.client.inventory.SellerListingService;
import org.oxerr.viagogo.model.request.inventory.CreateSellerListingRequest;
import org.oxerr.viagogo.model.request.inventory.SellerListingRequest;
import org.oxerr.viagogo.model.response.PagedResource;
import org.oxerr.viagogo.model.response.inventory.SellerListing;
import org.redisson.api.RedissonClient;

public class RedissonCachedSellerListingsService
	extends RedissonCachedListingServiceSupport<String, String, CreateSellerListingRequest, ViagogoListing, ViagogoEvent, ViagogoCachedListing>
	implements CachedSellerListingsService {

	private final Logger log = LogManager.getLogger();

	private final SellerListingService sellerListingsService;

	private final int pageSize;

	private final RetryConfiguration retryConfig;

	@Deprecated(since = "5.0.0", forRemoval = true)
	public RedissonCachedSellerListingsService(
		SellerListingService sellerListingsService,
		RedissonClient redissonClient,
		String keyPrefix,
		Executor executor,
		boolean create
	) {
		this(sellerListingsService, redissonClient, keyPrefix, executor, new ListingConfiguration(create, true, true), 10_000, new RetryConfiguration());
	}

	/**
	 * Constructs with default {@link ListingConfiguration} and default {@link RetryConfiguration}.
	 *
	 * @param sellerListingsService the seller listings service.
	 * @param redissonClient the redisson client.
	 * @param keyPrefix the key prefix for the cache.
	 * @param executor the executor.
	 *
	 * @since 5.0.0
	 */
	public RedissonCachedSellerListingsService(
		SellerListingService sellerListingsService,
		RedissonClient redissonClient,
		String keyPrefix,
		Executor executor
	) {
		this(sellerListingsService, redissonClient, keyPrefix, executor, new ListingConfiguration(), 10_000, new RetryConfiguration());
	}

	/**
	 * Constructs with specified {@link ListingConfiguration} and specified {@link RetryConfiguration}.
	 *
	 * @param sellerListingsService the seller listings service.
	 * @param redissonClient the redisson client.
	 * @param keyPrefix the key prefix for the cache.
	 * @param executor the executor.
	 * @param listingConfiguration the listing configuration.
	 * @param pageSize the page size when do check.
	 * @param retryConfiguration the retry configuration.
	 *
	 * @since 5.0.0
	 */
	public RedissonCachedSellerListingsService(
		SellerListingService sellerListingsService,
		RedissonClient redissonClient,
		String keyPrefix,
		Executor executor,
		ListingConfiguration listingConfiguration,
		int pageSize,
		RetryConfiguration retryConfiguration
	) {
		super(redissonClient, keyPrefix, executor, listingConfiguration);
		this.sellerListingsService = sellerListingsService;
		this.pageSize = pageSize;
		this.retryConfig = retryConfiguration;
	}

	@Override
	protected boolean shouldCreate(
		@Nonnull ViagogoEvent event,
		@Nonnull ViagogoListing listing,
		@Nullable ViagogoCachedListing cachedListing
	) {
		boolean shouldCreate = super.shouldCreate(event, listing, cachedListing);
		return shouldCreate || (cachedListing != null && !listing.getViagogoEventId().equals(cachedListing.getViagogoEventId()));
	}

	@Override
	protected boolean shouldUpdate(
		@Nonnull ViagogoEvent event,
		@Nonnull ViagogoListing listing,
		@Nullable ViagogoCachedListing cachedListing
	) {
		boolean shouldUpdate = super.shouldUpdate(event, listing, cachedListing);
		return shouldUpdate || (cachedListing != null && !listing.getViagogoEventId().equals(cachedListing.getViagogoEventId()));
	}

	@Override
	protected int getPriority(
		@Nonnull ViagogoEvent event,
		@Nullable ViagogoListing listing,
		@Nullable ViagogoCachedListing cachedListing
	) {
		if (listing == null || cachedListing == null) {
			return 0;
		}

		int priority = 0;

		var r = listing.getRequest();
		var cr = cachedListing.getRequest();

		priority += Objects.equals(r.getNumberOfTickets(), cr.getNumberOfTickets()) ? 0 : 1;
		priority += Objects.equals(r.getSeating(), cr.getSeating()) ? 0 : 1;
		priority += Objects.equals(r.getNotes(), cr.getNotes()) ? 0 : 1;

		return priority;
	}

	@Override
	protected boolean shouldDelete(
		@Nonnull ViagogoEvent event,
		@Nonnull Set<String> inventoryListingIds,
		@Nonnull String listingId,
		@Nonnull ViagogoCachedListing cachedListing
	) {
		return super.shouldDelete(event, inventoryListingIds, listingId, cachedListing)
			|| !event.getViagogoEventId().equals(cachedListing.getViagogoEventId());
	}

	@Override
	protected void createListing(ViagogoEvent event, ViagogoListing listing) throws IOException {
		this.sellerListingsService.createListing(listing.getViagogoEventId(), listing.getRequest());
	}

	@Override
	protected void deleteListing(ViagogoEvent event, String listingId) throws IOException {
		this.sellerListingsService.deleteListingByExternalListingId(listingId);
	}

	@Override
	protected ViagogoCachedListing toCached(ViagogoEvent event, ViagogoListing listing, Status status) {
		return new ViagogoCachedListing(listing, status);
	}

	@Override
	public void check() {
		log.info("[check] begin.");

		StopWatch stopWatch = StopWatch.createStarted();

		// The external IDs in cache.
		var externalIds = this.getExternalIds();

		var deleting = Collections.synchronizedList(new ArrayList<CompletableFuture<Void>>());

		// Check the first page.
		var listings = this.check(request(1), externalIds, deleting).join();

		// Check the next page to the last page.
		log.debug("[check] total items: {}, next link: {}, last link: {}",
			listings.getTotalItems(), listings.getNextLink(), listings.getLastLink());

		// When only 1 page left, the next link and last link is null.
		var next = Optional.ofNullable(listings.getNextLink()).map(SellerListingRequest::from);
		var last = Optional.ofNullable(listings.getLastLink()).map(SellerListingRequest::from);

		var checking = new ArrayList<CompletableFuture<PagedResource<SellerListing>>>();

		if (next.isPresent() && last.isPresent()) {
			for(int i = next.get().getPage(); i <= last.get().getPage(); i++) {
				checking.add(this.check(request(i), externalIds, deleting));
			}
		}

		// Wait all checking to complete.
		log.debug("[check] checking size: {}", checking.size());
		CompletableFuture.allOf(checking.toArray(CompletableFuture[]::new)).join();

		// Wait all deleting to complete.
		log.debug("[check] deleting size: {}", deleting.size());
		CompletableFuture.allOf(deleting.toArray(CompletableFuture[]::new)).join();

		stopWatch.stop();
		log.info("[check] end. Checked {} items in {}", listings.getTotalItems(), stopWatch);
	}

	private Set<String> getExternalIds() {
		var externalIds = this.getCacheNamesStream()
			.map(name -> this.getCache(name).keySet().stream())
			.flatMap(Function.identity())
			.collect(Collectors.toUnmodifiableSet());
		log.debug("[check] externalIds size: {}", externalIds.size());
		return externalIds;
	}

	private CompletableFuture<PagedResource<SellerListing>> check(
		SellerListingRequest request,
		Set<String> externalIds,
		List<CompletableFuture<Void>> deleting
	) {
		return this.<PagedResource<SellerListing>>callAsync(() -> {
			var page = this.getSellerListings(request);
			deleting.addAll(this.check(page, externalIds));
			log.debug("[check] page: {}, deleting size: {}", request.getPage(), deleting.size());
			return page;
		});
	}

	private List<CompletableFuture<Void>> check(
		PagedResource<SellerListing> page,
		Set<String> externalIds
	) {
		return page.getItems().stream()
			.filter(listing -> !externalIds.contains(listing.getExternalId()))
			.map(listing -> this.<Void>callAsync(() -> {
				this.sellerListingsService.deleteListingByExternalListingId(listing.getExternalId());
				return null;
			})).collect(Collectors.toUnmodifiableList());
	}

	private PagedResource<SellerListing> getSellerListings(SellerListingRequest request) {
		return this.retry(() -> {
			try {
				return this.sellerListingsService.getSellerListings(request);
			} catch (IOException e) {
				throw new RetryableException(e);
			}
		});
	}

	private SellerListingRequest request(int page) {
		var r = new SellerListingRequest();
		r.setSort(SellerListingRequest.Sort.EVENT_DATE);
		r.setPage(page);
		r.setPageSize(pageSize);
		return r;
	}

	private final Random random = new Random();

	private <T> T retry(Supplier<T> supplier) {
		int attempts = 0;

		T t = null;

		try {
			t = supplier.get();
		} catch (RetryableException e) {
			if (++attempts < retryConfig.getMaxAttempts()) {
				long delay = random.nextInt(retryConfig.getMaxDelay());
				sleep(delay);
			} else {
				log.debug("attempts: {}", attempts);
				throw e;
			}
		}

		return t;
	}

	private void sleep(long millis) {
		if (millis < 0) {
			return;
		}

		log.debug("sleeping {}", millis);
		ThreadUtils.sleepQuietly(Duration.ofMillis(millis));
	}

	private static class RetryableException extends RuntimeException {

		private static final long serialVersionUID = 2023120801L;

		public RetryableException(Throwable cause) {
			super(cause);
		}

	}

}