diff --git a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java index d5e08c7d95..8b345f626a 100644 --- a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java +++ b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java @@ -33,7 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean; * *
The wrapper handles calling {@link * FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur - * during these tasks. + * during these tasks. Errors are assumed to be non-recoverable, so the {@code + * FrameProcessingTaskExecutor} should be released if an error occurs. * *
{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always * executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with @@ -69,9 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean; try { futures.add(wrapTaskAndSubmitToExecutorService(task)); } catch (RejectedExecutionException e) { - if (!shouldCancelTasks.getAndSet(true)) { - listener.onFrameProcessingError(new FrameProcessingException(e)); - } + handleException(e); } } @@ -104,9 +103,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public void release(FrameProcessingTask releaseTask, long releaseWaitTimeMs) throws InterruptedException { shouldCancelTasks.getAndSet(true); - while (!futures.isEmpty()) { - futures.remove().cancel(/* mayInterruptIfRunning= */ false); - } + cancelNonStartedTasks(); Future> releaseFuture = wrapTaskAndSubmitToExecutorService(releaseTask); singleThreadExecutorService.shutdown(); try { @@ -129,11 +126,27 @@ import java.util.concurrent.atomic.AtomicBoolean; defaultPriorityTask.run(); removeFinishedFutures(); } catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) { - listener.onFrameProcessingError(FrameProcessingException.from(e)); + handleException(e); } }); } + private void cancelNonStartedTasks() { + while (!futures.isEmpty()) { + futures.remove().cancel(/* mayInterruptIfRunning= */ false); + } + } + + private void handleException(Exception exception) { + if (shouldCancelTasks.getAndSet(true)) { + // Ignore exception after cancelation as it can be caused by a previously reported exception + // that is the reason for the cancelation. + return; + } + listener.onFrameProcessingError(FrameProcessingException.from(exception)); + cancelNonStartedTasks(); + } + private void removeFinishedFutures() { while (!futures.isEmpty()) { if (!futures.element().isDone()) { @@ -141,11 +154,12 @@ import java.util.concurrent.atomic.AtomicBoolean; } try { futures.remove().get(); - } catch (ExecutionException e) { - listener.onFrameProcessingError(new FrameProcessingException(e)); + } catch (ExecutionException impossible) { + // All exceptions are already caught in wrapTaskAndSubmitToExecutorService. + handleException(new IllegalStateException("Unexpected error", impossible)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - listener.onFrameProcessingError(new FrameProcessingException(e)); + handleException(e); } } }