// PageIterator.java
package org.oxerr.stubhub.client.cxf.impl.util;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import jakarta.ws.rs.InternalServerErrorException;
import jakarta.ws.rs.client.ResponseProcessingException;
/**
 * Iterator over a token-paginated result set that fetches pages lazily.
 *
 * <p>Subclasses implement {@link #fetchPage(Long)}; this class calls it with
 * the pagination token returned by the previously loaded page ({@code null}
 * for the first page) whenever the items of the current page are exhausted.
 * Iteration is considered finished as soon as a page reports fewer items than
 * the configured page size.
 *
 * <p>Every fetch goes through a {@link Retry}. The default retry makes at most
 * three attempts and retries on {@link ResponseProcessingException} and
 * {@link InternalServerErrorException}.
 *
 * <p>Instances hold mutable iteration state and use no synchronization.
 *
 * @param <T> the element type
 */
public abstract class PageIterator<T> implements Iterator<T> {

    /**
     * Formatter (printf-style) logger: message patterns must use
     * {@code %}-conversions, not {@code {}} placeholders.
     */
    private static final Logger log = LogManager.getFormatterLogger(PageIterator.class);

    /** Expected number of items in a full page; a shorter page ends the iteration. */
    private final int pageSize;

    /** Retry policy applied to every call of {@link #fetchPage(Long)}. */
    private final Retry retry;

    /** Measures the wall-clock time since the first page load started. */
    private final StopWatch stopWatch = new StopWatch();

    /** Token returned by the most recently loaded page; {@code null} before the first load. */
    private Long paginationToken;

    /** Iterator over the items of the most recently loaded page. */
    private Iterator<T> current = Collections.emptyIterator();

    /** Set once a page with fewer than {@link #pageSize} items has been loaded. */
    private boolean finished;

    /** Number of pages loaded so far; used for log statistics only. */
    private int loadedPageCount;

    /**
     * Creates an iterator that uses the default retry policy.
     *
     * @param pageSize the number of items a full page contains
     */
    protected PageIterator(int pageSize) {
        this(pageSize, defaultRetry());
    }

    /**
     * Creates an iterator with an explicit retry policy.
     *
     * @param pageSize the number of items a full page contains
     * @param retry the retry policy for page fetches; when {@code null} the
     *     default retry policy is used
     */
    protected PageIterator(int pageSize, Retry retry) {
        this.pageSize = pageSize;
        this.retry = retry == null ? defaultRetry() : retry;
    }

    /**
     * Builds the default retry policy: at most three attempts, retrying on
     * {@link ResponseProcessingException} and {@link InternalServerErrorException}.
     *
     * @return a new {@link Retry} named {@code pageIteratorFetchPage}
     */
    private static Retry defaultRetry() {
        RetryConfig config = RetryConfig.custom()
            .maxAttempts(3)
            .retryExceptions(
                ResponseProcessingException.class,
                InternalServerErrorException.class
            )
            .failAfterMaxAttempts(true)
            .build();
        return Retry.of("pageIteratorFetchPage", config);
    }

    /**
     * Reports whether another item is available, loading the next page first
     * when the current page is exhausted and the iteration is not finished.
     *
     * @return {@code true} if {@link #next()} would return an item
     */
    @Override
    public boolean hasNext() {
        loadIfNeeded();
        return current.hasNext();
    }

    /**
     * Returns the next item, loading the next page first when required.
     *
     * @return the next item
     * @throws java.util.NoSuchElementException if no item is left
     */
    @Override
    public T next() {
        loadIfNeeded();
        return current.next();
    }

    /**
     * Loads the next page when the current one is exhausted and the iteration
     * has not finished yet, then updates the iteration state and logs timing
     * statistics at debug level.
     */
    private void loadIfNeeded() {
        if (finished || current.hasNext()) {
            return;
        }

        if (!stopWatch.isStarted()) {
            stopWatch.start();
        }

        long startTime = System.nanoTime();
        Page<T> page = fetchPageWithRetry(this.paginationToken);
        long elapsedNanos = System.nanoTime() - startTime;

        // Page implementations other than PageImpl may return null for the
        // boxed count or the item list; fall back instead of failing on
        // unboxing or dereferencing.
        List<T> items = page.getItems();
        Integer reportedCount = page.getNumberOfItems();
        int numberOfItems;
        if (reportedCount != null) {
            numberOfItems = reportedCount;
        } else {
            numberOfItems = items != null ? items.size() : 0;
        }

        paginationToken = page.getPaginationToken();
        current = items != null ? items.iterator() : Collections.<T>emptyIterator();
        // A page shorter than the requested size is taken to be the last one.
        finished = numberOfItems < pageSize;
        loadedPageCount++;

        log.debug(
            "loaded page: cursor=%,d, numberOfItems=%,d, timeUsed=%,d ms,"
                + " loadedPageCount=%,d, elapsed=%s, avgTimePerPage=%,d ms",
            paginationToken, numberOfItems,
            elapsedNanos / 1_000_000, loadedPageCount, stopWatch,
            stopWatch.getTime() / loadedPageCount);
    }

    /**
     * Fetches one page through the configured {@link Retry}.
     *
     * @param paginationToken the token of the page to fetch, {@code null} for
     *     the first page
     * @return the fetched page
     * @throws ResponseProcessingException if the fetch still fails with this
     *     exception once the retry gives up
     * @throws InternalServerErrorException if the fetch still fails with this
     *     exception once the retry gives up
     */
    private Page<T> fetchPageWithRetry(Long paginationToken) {
        Supplier<Page<T>> supplier = () -> fetchPage(paginationToken);
        Supplier<Page<T>> decorated = Retry.decorateSupplier(retry, supplier);
        try {
            return decorated.get();
        } catch (ResponseProcessingException | InternalServerErrorException e) {
            // Both exception types are covered by the default retry policy, so
            // both are logged here. The logger is printf-style, hence "%s"; the
            // trailing throwable is attached to the log event, not formatted.
            log.warn("failed to fetch page after retries: cursor=%s", paginationToken, e);
            throw e;
        }
    }

    /**
     * Fetches a single page.
     *
     * @param paginationToken the token returned by the previous page, or
     *     {@code null} when the first page is requested
     * @return the page for the given token
     */
    protected abstract Page<T> fetchPage(Long paginationToken);

    /**
     * One page of results.
     *
     * @param <T> the element type
     */
    public interface Page<T> {

        /**
         * @return the token this iterator passes to the next
         *     {@link PageIterator#fetchPage(Long)} call
         */
        Long getPaginationToken();

        /** @return the number of items in this page */
        Integer getNumberOfItems();

        /** @return the items of this page */
        List<T> getItems();
    }

    /**
     * Simple immutable {@link Page} implementation.
     *
     * <p>Deliberately left as an inner (non-static) class bound to the
     * enclosing iterator's type parameter so that existing
     * {@code new PageImpl(...)} calls in subclasses keep compiling.
     */
    public class PageImpl implements Page<T> {

        private final Long paginationToken;
        private final Integer numberOfItems;
        private final List<T> items;

        /**
         * Creates a page.
         *
         * @param paginationToken the token for the following page, may be {@code null}
         * @param numberOfItems the number of items in the page; {@code null} is stored as 0
         * @param items the items of the page; {@code null} is stored as an empty list
         */
        public PageImpl(Long paginationToken, Integer numberOfItems, List<T> items) {
            this.paginationToken = paginationToken;
            this.numberOfItems = numberOfItems != null ? numberOfItems : 0;
            this.items = items != null ? items : Collections.emptyList();
        }

        @Override
        public Long getPaginationToken() {
            return paginationToken;
        }

        @Override
        public Integer getNumberOfItems() {
            return numberOfItems;
        }

        @Override
        public List<T> getItems() {
            return items;
        }
    }
}