RedissonCachedSellerListingService.java
package org.oxerr.viagogo.client.cached.redisson.inventory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ThreadUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.oxerr.ticket.inventory.support.cached.redisson.ListingConfiguration;
import org.oxerr.ticket.inventory.support.cached.redisson.RedissonCachedListingServiceSupport;
import org.oxerr.ticket.inventory.support.cached.redisson.Status;
import org.oxerr.viagogo.client.cached.inventory.CachedSellerListingService;
import org.oxerr.viagogo.client.cached.inventory.CachedSellerListingsService;
import org.oxerr.viagogo.client.cached.inventory.ViagogoEvent;
import org.oxerr.viagogo.client.cached.inventory.ViagogoListing;
import org.oxerr.viagogo.client.inventory.SellerListingService;
import org.oxerr.viagogo.model.request.inventory.CreateSellerListingRequest;
import org.oxerr.viagogo.model.request.inventory.SellerListingRequest;
import org.oxerr.viagogo.model.response.PagedResource;
import org.oxerr.viagogo.model.response.inventory.SellerListing;
import org.redisson.api.RedissonClient;
public class RedissonCachedSellerListingService
extends RedissonCachedListingServiceSupport<String, String, CreateSellerListingRequest, ViagogoListing, ViagogoEvent, ViagogoCachedListing>
implements CachedSellerListingsService, CachedSellerListingService {
private final Logger log = LogManager.getLogger();
private final SellerListingService sellerListingService;
private final int pageSize;
private final RetryConfiguration retryConfig;
@Deprecated(since = "5.0.0", forRemoval = true)
public RedissonCachedSellerListingService(
SellerListingService sellerListingService,
RedissonClient redissonClient,
String keyPrefix,
Executor executor,
boolean create
) {
this(sellerListingService, redissonClient, keyPrefix, executor, new ListingConfiguration(create, true, true), 10_000, new RetryConfiguration());
}
/**
* Constructs with default {@link ListingConfiguration} and default {@link RetryConfiguration}.
*
* @param sellerListingService the seller listing service.
* @param redissonClient the redisson client.
* @param keyPrefix the key prefix for the cache.
* @param executor the executor.
*
* @since 5.0.0
*/
public RedissonCachedSellerListingService(
SellerListingService sellerListingService,
RedissonClient redissonClient,
String keyPrefix,
Executor executor
) {
this(sellerListingService, redissonClient, keyPrefix, executor, new ListingConfiguration(), 10_000, new RetryConfiguration());
}
/**
* Constructs with specified {@link ListingConfiguration} and specified {@link RetryConfiguration}.
*
* @param sellerListingService the seller listing service.
* @param redissonClient the redisson client.
* @param keyPrefix the key prefix for the cache.
* @param executor the executor.
* @param listingConfiguration the listing configuration.
* @param pageSize the page size when do check.
* @param retryConfiguration the retry configuration.
*
* @since 5.0.0
*/
public RedissonCachedSellerListingService(
SellerListingService sellerListingService,
RedissonClient redissonClient,
String keyPrefix,
Executor executor,
ListingConfiguration listingConfiguration,
int pageSize,
RetryConfiguration retryConfiguration
) {
super(redissonClient, keyPrefix, executor, listingConfiguration);
this.sellerListingService = sellerListingService;
this.pageSize = pageSize;
this.retryConfig = retryConfiguration;
}
@Override
protected boolean shouldCreate(
@Nonnull ViagogoEvent event,
@Nonnull ViagogoListing listing,
@Nullable ViagogoCachedListing cachedListing
) {
boolean shouldCreate = super.shouldCreate(event, listing, cachedListing);
return shouldCreate || (cachedListing != null && !listing.getViagogoEventId().equals(cachedListing.getViagogoEventId()));
}
@Override
protected boolean shouldUpdate(
@Nonnull ViagogoEvent event,
@Nonnull ViagogoListing listing,
@Nullable ViagogoCachedListing cachedListing
) {
boolean shouldUpdate = super.shouldUpdate(event, listing, cachedListing);
return shouldUpdate || (cachedListing != null && !listing.getViagogoEventId().equals(cachedListing.getViagogoEventId()));
}
@Override
protected int getPriority(
@Nonnull ViagogoEvent event,
@Nullable ViagogoListing listing,
@Nullable ViagogoCachedListing cachedListing
) {
if (listing == null || cachedListing == null) {
return 0;
}
int priority = 0;
var r = listing.getRequest();
var cr = cachedListing.getRequest();
priority += Objects.equals(r.getNumberOfTickets(), cr.getNumberOfTickets()) ? 0 : 1;
priority += Objects.equals(r.getSeating(), cr.getSeating()) ? 0 : 1;
priority += Objects.equals(r.getNotes(), cr.getNotes()) ? 0 : 1;
log.trace("[getPriority] listing request: {}, cached listing request: {} priority: {}", r, cr, priority);
return priority;
}
@Override
protected boolean shouldDelete(
@Nonnull ViagogoEvent event,
@Nonnull Set<String> inventoryListingIds,
@Nonnull String listingId,
@Nonnull ViagogoCachedListing cachedListing
) {
return super.shouldDelete(event, inventoryListingIds, listingId, cachedListing)
|| !event.getViagogoEventId().equals(cachedListing.getViagogoEventId());
}
@Override
protected void createListing(ViagogoEvent event, ViagogoListing listing) throws IOException {
this.sellerListingService.createListing(listing.getViagogoEventId(), listing.getRequest());
}
@Override
protected void deleteListing(ViagogoEvent event, String listingId) throws IOException {
this.sellerListingService.deleteListingByExternalListingId(listingId);
}
@Override
protected ViagogoCachedListing toCached(ViagogoEvent event, ViagogoListing listing, Status status) {
return new ViagogoCachedListing(new ViagogoCachedEvent(event), listing, status);
}
private class CheckContext {
private final Map<String, String> externalIdToCacheName;
/**
* The external IDs listed on viagogo.
*/
private final Set<String> listedExternalIds;
private final List<CompletableFuture<Void>> tasks;
private final List<CompletableFuture<PagedResource<SellerListing>>> checkings;
public CheckContext(
Map<String, String> externalIdToCacheName,
List<CompletableFuture<Void>> tasks,
List<CompletableFuture<PagedResource<SellerListing>>> checkings
) {
this.externalIdToCacheName = externalIdToCacheName;
this.listedExternalIds = new HashSet<>();
this.tasks = tasks;
this.checkings = checkings;
}
public Map<String, String> getExternalIdToCacheName() {
return externalIdToCacheName;
}
public List<CompletableFuture<Void>> getTasks() {
return tasks;
}
public List<CompletableFuture<PagedResource<SellerListing>>> getCheckings() {
return checkings;
}
/**
* Adds external IDs which is listed on viagogo.
*
* @param externalId the external ID.
*/
public void addListedExternalId(String externalId) {
listedExternalIds.add(externalId);
}
/**
* Returns the missing external IDs on viagogo.
*
* @return the missing external IDs.
*/
public Set<String> getMissingExternalIds() {
var missingExternalIds = new HashSet<>(externalIdToCacheName.keySet());
missingExternalIds.removeAll(listedExternalIds);
log.debug("missingExternalIds count: {}", missingExternalIds::size);
return missingExternalIds;
}
}
@Override
public void check() {
log.info("[check] begin.");
// Create a stop watch to measure the time taken to check the listings.
StopWatch stopWatch = StopWatch.createStarted();
// Create a new check context.
CheckContext context = newCheckContext();
// Check the first page.
PagedResource<SellerListing> listings = this.check(request(1), context).join();
// Check the next page to the last page.
log.debug("[check] total items: {}, next link: {}, last link: {}",
listings.getTotalItems(), listings.getNextLink(), listings.getLastLink());
// Check subsequent pages if available
// When only 1 page left, the next link and last link is null.
Optional.ofNullable(listings.getNextLink()).map(SellerListingRequest::from)
.ifPresent(next -> Optional.ofNullable(listings.getLastLink()).map(SellerListingRequest::from)
.ifPresent(last -> IntStream.rangeClosed(next.getPage(), last.getPage())
.mapToObj(this::request)
.map(request -> this.check(request, context)).forEach(context.getCheckings()::add)
)
);
// Wait all checking to complete.
log.debug("[check] checking size: {}", context.getCheckings().size());
CompletableFuture.allOf(context.getCheckings().toArray(CompletableFuture[]::new)).join();
// Wait all tasks to complete.
log.debug("[check] tasks size: {}", context.getTasks().size());
CompletableFuture.allOf(context.getTasks().toArray(CompletableFuture[]::new)).join();
// Create the listings which in cache but not on viagogo.
context.getMissingExternalIds().forEach(t -> {
var cacheName = context.getExternalIdToCacheName().get(t);
var cache = this.getCache(cacheName);
var viagogoCachedListing = cache.get(t);
if (viagogoCachedListing != null) {
// Double check if the cached listing still exists.
var viagogoEvent = viagogoCachedListing.getEvent().toViagogoEvent();
var viagogoListing = viagogoCachedListing.toViagogoListing();
try {
this.createListing(viagogoEvent, viagogoListing);
} catch (IOException e) {
log.warn("Create listing failed, external ID: {}.", viagogoListing.getId(), e);
}
}
});
// Log the time taken to check the listings.
stopWatch.stop();
log.info("[check] end. Checked {} items in {}", listings.getTotalItems(), stopWatch);
}
/**
* Creates a new check context.
*
* @return a new check context.
*/
private CheckContext newCheckContext() {
// The mapping of external IDs to their corresponding cache names.
var externalIdToCacheName = this.getExternalIdToCacheName();
// The tasks to delete or update the listings.
var tasks = Collections.synchronizedList(new ArrayList<CompletableFuture<Void>>());
// The checking tasks.
var checkings = new ArrayList<CompletableFuture<PagedResource<SellerListing>>>();
// The context for checking.
return new CheckContext(externalIdToCacheName, tasks, checkings);
}
/**
* Retrieves a mapping of external IDs to their corresponding cache names.
*
* This method iterates over all available cache names, retrieves each cache,
* and then creates a map entry for each external ID pointing to its cache name.
*
* @return a map where the keys are external IDs and the values are cache names.
*/
private Map<String, String> getExternalIdToCacheName() {
// Create a map to hold the external ID to cache name mapping
Map<String, String> externalIdToCacheName =
this.getCacheNamesStream() // Stream of cache names
.flatMap(cacheName ->
// Retrieve the cache and create a stream of externalId-to-cacheName entries
this.getCache(cacheName).keySet().stream()
.map(externalId -> Map.entry(externalId, cacheName))
)
// Collect the entries into a map
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
// Log the size of the resulting map for debugging purposes
log.debug("[check] externalIdToCacheName size: {}", externalIdToCacheName.size());
// Return the map of external IDs to cache names
return externalIdToCacheName;
}
/**
* Checks the listings of the request.
*
* @param request the request.
* @param context the context.
* @return the page in checking.
*/
private CompletableFuture<PagedResource<SellerListing>> check(SellerListingRequest request, CheckContext context) {
return this.<PagedResource<SellerListing>>callAsync(() -> {
var page = this.getSellerListings(request);
this.check(page, context);
log.debug("[check] page: {}, tasks size: {}", request.getPage(), context.getTasks().size());
return page;
});
}
/**
* Checks the listings in the page.
*
* @param page the page.
* @param context the context.
*/
private void check(PagedResource<SellerListing> page, CheckContext context) {
// Delete the listings not in the page
var deleteTasks = page.getItems().stream()
.filter(listing -> !context.getExternalIdToCacheName().keySet().contains(listing.getExternalId()))
.map(listing -> this.<Void>callAsync(() -> {
this.sellerListingService.deleteListingByExternalListingId(listing.getExternalId());
return null;
})).collect(Collectors.toUnmodifiableList());
context.getTasks().addAll(deleteTasks);
// Check the listings in the page.
page.getItems().stream()
.filter(listing -> context.getExternalIdToCacheName().keySet().contains(listing.getExternalId()))
.forEach((SellerListing listing) -> check(listing, context));
}
/**
* Checks the listing.
*
* If the listing is not cached, delete the listing from viagogo.
* If the listing is not same as the cached listing, update the listing.
*
* @param listing the listing.
* @param context the context.
*/
private void check(SellerListing listing, CheckContext context) {
log.trace("Checking {}", listing.getExternalId());
context.addListedExternalId(listing.getExternalId());
String cacheName = context.getExternalIdToCacheName().get(listing.getExternalId());
ViagogoCachedListing cachedListing = this.getCache(cacheName).get(listing.getExternalId());
if (cachedListing == null) {
// Double check the listing if it is not cached.
// If the listing is not cached, delete the listing from viagogo.
context.getTasks().add(this.<Void>callAsync(() -> {
log.trace("Deleting {}", listing.getExternalId());
this.sellerListingService.deleteListingByExternalListingId(listing.getExternalId());
return null;
}));
} else if (!isSame(listing, cachedListing.getRequest())) {
// If the listing is not same as the cached listing, update the listing.
context.getTasks().add(this.<Void>callAsync(() -> {
log.trace("Updating {}", listing.getExternalId());
var e = cachedListing.getEvent().toViagogoEvent();
var l = cachedListing.toViagogoListing();
var p = getPriority(e, l, cachedListing);
if (e.getViagogoEventId().equals(listing.getEvent().getId())) {
this.updateListing(e, l, p);
} else {
log.warn("Viagogo Event ID mismatch: {} != {}, event ID = {}",
e.getViagogoEventId(), listing.getEvent().getId(), e.getId());
this.deleteListing(e, listing.getExternalId(), p);
}
return null;
}));
}
}
private boolean isSame(SellerListing l, CreateSellerListingRequest r) {
var same = Listings.isSame(l, r);
log.trace("[isSame] externalId: {}, numberOfTickets: {} -> {}, seating: {} -> {}, ticketPrice: {} {} -> {} {}, isSame: {}",
l::getExternalId,
l::getNumberOfTickets,
r::getNumberOfTickets,
l::getSeating,
r::getSeating,
() -> l.getTicketPrice().getCurrencyCode(),
() -> l.getTicketPrice().getAmount(),
() -> r.getTicketPrice().getCurrencyCode(),
() -> r.getTicketPrice().getAmount(),
() -> same
);
return same;
}
/**
* Gets the seller listings.
*
* @param request the request.
* @return the seller listings.
*/
private PagedResource<SellerListing> getSellerListings(SellerListingRequest request) {
return this.retry(() -> {
try {
return this.sellerListingService.getSellerListings(request);
} catch (IOException e) {
throw new RetryableException(e);
}
});
}
/**
* Creates a seller listing request.
*
* @param page the page.
* @return a seller listing request.
*/
private SellerListingRequest request(int page) {
var r = new SellerListingRequest();
r.setSort(SellerListingRequest.Sort.EVENT_DATE);
r.setPage(page);
r.setPageSize(pageSize);
return r;
}
private final Random random = new Random();
private <T> T retry(Supplier<T> supplier) {
int attempts = 0;
T t = null;
try {
t = supplier.get();
} catch (RetryableException e) {
if (++attempts < retryConfig.getMaxAttempts()) {
long delay = random.nextInt(retryConfig.getMaxDelay());
sleep(delay);
} else {
log.debug("attempts: {}", attempts);
throw e;
}
}
return t;
}
private void sleep(long millis) {
if (millis < 0) {
return;
}
log.debug("sleeping {}", millis);
ThreadUtils.sleepQuietly(Duration.ofMillis(millis));
}
private static class RetryableException extends RuntimeException {
private static final long serialVersionUID = 2023120801L;
public RetryableException(Throwable cause) {
super(cause);
}
}
}