Defensively cancel frame processing tasks on error.

FrameProcessingTaskExecutor should be released on error.
There can be a delay until this happens, so
FrameProcessingTaskExecutor will cancel any pending tasks
and drop new tasks until it is released.

PiperOrigin-RevId: 468171820
This commit is contained in:
Googler 2022-08-17 12:25:53 +00:00 committed by Marc Baechinger
parent 9f6940eaa4
commit 0c961a7abd

View File

@ -33,7 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* <p>The wrapper handles calling {@link
* FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur
* during these tasks.
* during these tasks. Errors are assumed to be non-recoverable, so the {@code
* FrameProcessingTaskExecutor} should be released if an error occurs.
*
* <p>{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always
* executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with
@ -69,9 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
try {
futures.add(wrapTaskAndSubmitToExecutorService(task));
} catch (RejectedExecutionException e) {
if (!shouldCancelTasks.getAndSet(true)) {
listener.onFrameProcessingError(new FrameProcessingException(e));
}
handleException(e);
}
}
@ -104,9 +103,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public void release(FrameProcessingTask releaseTask, long releaseWaitTimeMs)
throws InterruptedException {
shouldCancelTasks.getAndSet(true);
while (!futures.isEmpty()) {
futures.remove().cancel(/* mayInterruptIfRunning= */ false);
}
cancelNonStartedTasks();
Future<?> releaseFuture = wrapTaskAndSubmitToExecutorService(releaseTask);
singleThreadExecutorService.shutdown();
try {
@ -129,11 +126,27 @@ import java.util.concurrent.atomic.AtomicBoolean;
defaultPriorityTask.run();
removeFinishedFutures();
} catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) {
listener.onFrameProcessingError(FrameProcessingException.from(e));
handleException(e);
}
});
}
private void cancelNonStartedTasks() {
while (!futures.isEmpty()) {
futures.remove().cancel(/* mayInterruptIfRunning= */ false);
}
}
private void handleException(Exception exception) {
if (shouldCancelTasks.getAndSet(true)) {
// Ignore exception after cancelation as it can be caused by a previously reported exception
// that is the reason for the cancelation.
return;
}
listener.onFrameProcessingError(FrameProcessingException.from(exception));
cancelNonStartedTasks();
}
private void removeFinishedFutures() {
while (!futures.isEmpty()) {
if (!futures.element().isDone()) {
@ -141,11 +154,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
}
try {
futures.remove().get();
} catch (ExecutionException e) {
listener.onFrameProcessingError(new FrameProcessingException(e));
} catch (ExecutionException impossible) {
// All exceptions are already caught in wrapTaskAndSubmitToExecutorService.
handleException(new IllegalStateException("Unexpected error", impossible));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
listener.onFrameProcessingError(new FrameProcessingException(e));
handleException(e);
}
}
}