Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -60,7 +59,8 @@ public static RecordingScheduler create(final FakeApiClock clock) {

// mock class methods:
// ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
when(mock.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
Mockito.lenient()
.when(mock.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
.then(
new Answer<ScheduledFuture<?>>() {
@Override
Expand All @@ -78,7 +78,8 @@ public ScheduledFuture<?> answer(InvocationOnMock invocation) throws Throwable {
});

// List<Runnable> shutdownNow()
when(mock.shutdownNow())
Mockito.lenient()
.when(mock.shutdownNow())
.then(
new Answer<List<Runnable>>() {
@Override
Expand All @@ -88,10 +89,11 @@ public List<Runnable> answer(InvocationOnMock invocation) throws Throwable {
});

// List<java.time.Duration> getSleepDurations()
when(mock.getSleepDurations()).thenReturn(sleepDurations);
Mockito.lenient().when(mock.getSleepDurations()).thenReturn(sleepDurations);

// int getIterationsCount()
when(mock.getIterationsCount())
Mockito.lenient()
.when(mock.getIterationsCount())
.then(
new Answer<Integer>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.FakeApiClock;
import com.google.api.gax.core.RecordingScheduler;
import com.google.api.gax.retrying.FailingCallable.CustomException;
import com.google.api.gax.rpc.testing.FakeCallContext;
import java.time.Duration;
Expand All @@ -51,13 +54,21 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class ScheduledRetryingExecutorTest extends AbstractRetryingExecutorTest {

// Number of test runs, essential for multithreaded tests.
private static final int EXECUTIONS_COUNT = 5;
private FakeApiClock fakeClock;

@BeforeEach
void setUp() {
fakeClock = new FakeApiClock(0L);
scheduledExecutorService = RecordingScheduler.create(fakeClock);
}

@Override
protected RetryingExecutorWithContext<String> getExecutor(RetryAlgorithm<String> retryAlgorithm) {
Expand All @@ -67,9 +78,17 @@ protected RetryingExecutorWithContext<String> getExecutor(RetryAlgorithm<String>
@Override
protected RetryAlgorithm<String> getAlgorithm(
RetrySettings retrySettings, int apocalypseCountDown, RuntimeException apocalypseException) {
return getAlgorithm(retrySettings, apocalypseCountDown, apocalypseException, fakeClock);
}

protected RetryAlgorithm<String> getAlgorithm(
RetrySettings retrySettings,
int apocalypseCountDown,
RuntimeException apocalypseException,
ApiClock clock) {
return new RetryAlgorithm<>(
new TestResultRetryAlgorithm<String>(apocalypseCountDown, apocalypseException),
new ExponentialRetryAlgorithm(retrySettings, NanoClock.getDefaultClock()));
new ExponentialRetryAlgorithm(retrySettings, clock));
}

private RetryingExecutorWithContext<String> getRetryingExecutor(
Expand All @@ -81,54 +100,61 @@ private RetryingExecutorWithContext<String> getRetryingExecutor(
void testSuccessWithFailuresPeekAttempt() throws Exception {
RetrySettings retrySettings =
FAST_RETRY_SETTINGS.toBuilder()
.setTotalTimeoutDuration(java.time.Duration.ofMillis(1000L))
.setTotalTimeoutDuration(java.time.Duration.ofMillis(10000L))
.setMaxAttempts(100)
.build();
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {

FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer);

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
assertSame(future.peekAttemptResult(), future.peekAttemptResult());
assertFalse(future.getAttemptResult().isDone());
assertFalse(future.getAttemptResult().isCancelled());

future.setAttemptFuture(executor.submit(future));

final AtomicInteger failedAttempts = new AtomicInteger(0);
final AtomicReference<ApiFuture<String>> lastSeenAttempt = new AtomicReference<>();
await()
.pollInterval(Duration.ofMillis(2))
.atMost(Duration.ofSeconds(5))
.until(
() -> {
ApiFuture<String> attemptResult = future.peekAttemptResult();
if (attemptResult != null && attemptResult != lastSeenAttempt.get()) {
lastSeenAttempt.set(attemptResult);
assertTrue(attemptResult.isDone());
assertFalse(attemptResult.isCancelled());
try {
attemptResult.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof CustomException) {
failedAttempts.incrementAndGet();
ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
try {
FailingCallable callable = new FailingCallable(15, "request", "SUCCESS", tracer);

RetryingExecutorWithContext<String> executor =
getRetryingExecutor(
getAlgorithm(retrySettings, 0, null, NanoClock.getDefaultClock()), localExecutor);
RetryingFuture<String> future =
executor.createFuture(
callable,
FakeCallContext.createDefault()
.withTracer(tracer)
.withRetrySettings(retrySettings));
callable.setExternalFuture(future);

assertNull(future.peekAttemptResult());
assertSame(future.peekAttemptResult(), future.peekAttemptResult());
assertFalse(future.getAttemptResult().isDone());
assertFalse(future.getAttemptResult().isCancelled());

future.setAttemptFuture(executor.submit(future));

final AtomicInteger failedAttempts = new AtomicInteger(0);
final AtomicReference<ApiFuture<String>> lastSeenAttempt = new AtomicReference<>();
await()
.pollInterval(Duration.ofMillis(2))
.atMost(Duration.ofSeconds(5))
.until(
() -> {
ApiFuture<String> attemptResult = future.peekAttemptResult();
if (attemptResult != null && attemptResult != lastSeenAttempt.get()) {
lastSeenAttempt.set(attemptResult);
assertTrue(attemptResult.isDone());
assertFalse(attemptResult.isCancelled());
try {
attemptResult.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof CustomException) {
failedAttempts.incrementAndGet();
}
}
}
}
return future.isDone();
});

assertFutureSuccess(future);
assertEquals(15, future.getAttemptSettings().getAttemptCount());
assertTrue(failedAttempts.get() > 0);
return future.isDone();
});

assertFutureSuccess(future);
assertEquals(15, future.getAttemptSettings().getAttemptCount());
assertTrue(failedAttempts.get() > 0);
} finally {
localExecutor.shutdownNow();
}
}
}

Expand Down Expand Up @@ -260,35 +286,43 @@ void testCancelOuterFutureAfterStart() throws Exception {
.setJittered(false)
.build();
for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) {
FailingCallable callable = new FailingCallable(4, "request", "SUCCESS", tracer);
RetryingExecutorWithContext<String> executor =
getRetryingExecutor(getAlgorithm(retrySettings, 0, null), scheduledExecutorService);
RetryingFuture<String> future =
executor.createFuture(
callable,
FakeCallContext.createDefault().withTracer(tracer).withRetrySettings(retrySettings));
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

await()
.atMost(Duration.ofSeconds(5))
.until(
() ->
future.getAttemptSettings() != null
&& future.getAttemptSettings().getAttemptCount() > 0);

boolean res = future.cancel(false);
assertTrue(res);
assertFutureCancel(future);

// Verify that the cancelled future is traced. Every attempt increases the number
// of cancellation attempts from the tracer.
Mockito.verify(tracer, Mockito.times(executionsCount + 1)).attemptCancelled();

// Assert that future has at least been attempted once
// i.e. The future from executor.submit() has been run by the ScheduledExecutor
assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertTrue(future.getAttemptSettings().getAttemptCount() < 4);
ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
try {
FailingCallable callable = new FailingCallable(4, "request", "SUCCESS", tracer);
RetryingExecutorWithContext<String> executor =
getRetryingExecutor(
getAlgorithm(retrySettings, 0, null, NanoClock.getDefaultClock()), localExecutor);
RetryingFuture<String> future =
executor.createFuture(
callable,
FakeCallContext.createDefault()
.withTracer(tracer)
.withRetrySettings(retrySettings));
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

await()
.atMost(Duration.ofSeconds(5))
.until(
() ->
future.getAttemptSettings() != null
&& future.getAttemptSettings().getAttemptCount() > 0);

boolean res = future.cancel(false);
assertTrue(res);
assertFutureCancel(future);

// Verify that the cancelled future is traced. Every attempt increases the number
// of cancellation attempts from the tracer.
Mockito.verify(tracer, Mockito.times(executionsCount + 1)).attemptCancelled();

// Assert that future has at least been attempted once
// i.e. The future from executor.submit() has been run by the ScheduledExecutor
assertTrue(future.getAttemptSettings().getAttemptCount() > 0);
assertTrue(future.getAttemptSettings().getAttemptCount() < 4);
} finally {
localExecutor.shutdownNow();
}
}
}

Expand Down
Loading