diff --git a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java index 8b345f626a..1148921b1b 100644 --- a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java +++ b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java @@ -17,24 +17,27 @@ package androidx.media3.effect; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import androidx.annotation.GuardedBy; +import androidx.annotation.Nullable; import androidx.media3.common.FrameProcessingException; import androidx.media3.common.FrameProcessor; -import androidx.media3.common.util.GlUtil; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.ArrayDeque; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; /** * Wrapper around a single thread {@link ExecutorService} for executing {@link FrameProcessingTask} * instances. * + *

Public methods can be called from any thread. + * *

The wrapper handles calling {@link * FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur - * during these tasks. Errors are assumed to be non-recoverable, so the {@code - * FrameProcessingTaskExecutor} should be released if an error occurs. + * during these tasks. The listener is invoked from the {@link ExecutorService}. Errors are assumed + * to be non-recoverable, so the {@code FrameProcessingTaskExecutor} should be released if an error + * occurs. * *

{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always * executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with @@ -44,33 +47,43 @@ import java.util.concurrent.atomic.AtomicBoolean; private final ExecutorService singleThreadExecutorService; private final FrameProcessor.Listener listener; - private final ConcurrentLinkedQueue> futures; - private final ConcurrentLinkedQueue highPriorityTasks; - private final AtomicBoolean shouldCancelTasks; + private final Object lock; + + @GuardedBy("lock") + private final ArrayDeque highPriorityTasks; + + @GuardedBy("lock") + private boolean shouldCancelTasks; /** Creates a new instance. */ public FrameProcessingTaskExecutor( ExecutorService singleThreadExecutorService, FrameProcessor.Listener listener) { this.singleThreadExecutorService = singleThreadExecutorService; this.listener = listener; - - futures = new ConcurrentLinkedQueue<>(); - highPriorityTasks = new ConcurrentLinkedQueue<>(); - shouldCancelTasks = new AtomicBoolean(); + lock = new Object(); + highPriorityTasks = new ArrayDeque<>(); } /** * Submits the given {@link FrameProcessingTask} to be executed after all pending tasks have * completed. */ + @SuppressWarnings("FutureReturnValueIgnored") public void submit(FrameProcessingTask task) { - if (shouldCancelTasks.get()) { - return; + @Nullable RejectedExecutionException executionException = null; + synchronized (lock) { + if (shouldCancelTasks) { + return; + } + try { + wrapTaskAndSubmitToExecutorService(task, /* isReleaseTask= */ false); + } catch (RejectedExecutionException e) { + executionException = e; + } } - try { - futures.add(wrapTaskAndSubmitToExecutorService(task)); - } catch (RejectedExecutionException e) { - handleException(e); + + if (executionException != null) { + handleException(executionException); } } @@ -82,10 +95,12 @@ import java.util.concurrent.atomic.AtomicBoolean; * high-priority and have not started executing will be executed after this task is complete. */ public void submitWithHighPriority(FrameProcessingTask task) { - if (shouldCancelTasks.get()) { - return; + synchronized (lock) { + if (shouldCancelTasks) { + return; + } + highPriorityTasks.add(task); } - highPriorityTasks.add(task); // If the ExecutorService has non-started tasks, the first of these non-started tasks will run // the task passed to this method. Just in case there are no non-started tasks, submit another // task to run high-priority tasks. @@ -102,9 +117,12 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public void release(FrameProcessingTask releaseTask, long releaseWaitTimeMs) throws InterruptedException { - shouldCancelTasks.getAndSet(true); - cancelNonStartedTasks(); - Future releaseFuture = wrapTaskAndSubmitToExecutorService(releaseTask); + synchronized (lock) { + shouldCancelTasks = true; + highPriorityTasks.clear(); + } + Future releaseFuture = + wrapTaskAndSubmitToExecutorService(releaseTask, /* isReleaseTask= */ true); singleThreadExecutorService.shutdown(); try { if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) { @@ -116,51 +134,44 @@ import java.util.concurrent.atomic.AtomicBoolean; } } - private Future wrapTaskAndSubmitToExecutorService(FrameProcessingTask defaultPriorityTask) { + private Future wrapTaskAndSubmitToExecutorService( + FrameProcessingTask defaultPriorityTask, boolean isReleaseTask) { return singleThreadExecutorService.submit( () -> { try { - while (!highPriorityTasks.isEmpty()) { - highPriorityTasks.remove().run(); + synchronized (lock) { + if (shouldCancelTasks && !isReleaseTask) { + return; + } + } + + @Nullable FrameProcessingTask nextHighPriorityTask; + while (true) { + synchronized (lock) { + // Lock only polling to prevent blocking the public method calls. + nextHighPriorityTask = highPriorityTasks.poll(); + } + if (nextHighPriorityTask == null) { + break; + } + nextHighPriorityTask.run(); } defaultPriorityTask.run(); - removeFinishedFutures(); - } catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) { + } catch (Exception e) { handleException(e); } }); } - private void cancelNonStartedTasks() { - while (!futures.isEmpty()) { - futures.remove().cancel(/* mayInterruptIfRunning= */ false); - } - } - private void handleException(Exception exception) { - if (shouldCancelTasks.getAndSet(true)) { - // Ignore exception after cancelation as it can be caused by a previously reported exception - // that is the reason for the cancelation. - return; - } - listener.onFrameProcessingError(FrameProcessingException.from(exception)); - cancelNonStartedTasks(); - } - - private void removeFinishedFutures() { - while (!futures.isEmpty()) { - if (!futures.element().isDone()) { + synchronized (lock) { + if (shouldCancelTasks) { + // Ignore exception after cancelation as it can be caused by a previously reported exception + // that is the reason for the cancelation. return; } - try { - futures.remove().get(); - } catch (ExecutionException impossible) { - // All exceptions are already caught in wrapTaskAndSubmitToExecutorService. - handleException(new IllegalStateException("Unexpected error", impossible)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - handleException(e); - } + shouldCancelTasks = true; } + listener.onFrameProcessingError(FrameProcessingException.from(exception)); } }