diff --git a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java index a87356e965..d5e08c7d95 100644 --- a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java +++ b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java @@ -34,12 +34,17 @@ import java.util.concurrent.atomic.AtomicBoolean; *

The wrapper handles calling {@link * FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur * during these tasks. + * + *

{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always + * executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with + * equal priority are executed in FIFO order. */ /* package */ final class FrameProcessingTaskExecutor { private final ExecutorService singleThreadExecutorService; private final FrameProcessor.Listener listener; private final ConcurrentLinkedQueue> futures; + private final ConcurrentLinkedQueue highPriorityTasks; private final AtomicBoolean shouldCancelTasks; /** Creates a new instance. */ @@ -49,11 +54,12 @@ import java.util.concurrent.atomic.AtomicBoolean; this.listener = listener; futures = new ConcurrentLinkedQueue<>(); + highPriorityTasks = new ConcurrentLinkedQueue<>(); shouldCancelTasks = new AtomicBoolean(); } /** - * Submits the given {@link FrameProcessingTask} to be executed after any pending tasks have + * Submits the given {@link FrameProcessingTask} to be executed after all pending tasks have * completed. */ public void submit(FrameProcessingTask task) { @@ -61,7 +67,7 @@ import java.util.concurrent.atomic.AtomicBoolean; return; } try { - futures.add(submitTask(task)); + futures.add(wrapTaskAndSubmitToExecutorService(task)); } catch (RejectedExecutionException e) { if (!shouldCancelTasks.getAndSet(true)) { listener.onFrameProcessingError(new FrameProcessingException(e)); @@ -69,6 +75,24 @@ import java.util.concurrent.atomic.AtomicBoolean; } } + /** + * Submits the given {@link FrameProcessingTask} to be executed after the currently running task + * and all previously submitted high-priority tasks have completed. + * + *

Tasks that were previously {@linkplain #submit(FrameProcessingTask) submitted} without + * high-priority and have not started executing will be executed after this task is complete. + */ + public void submitWithHighPriority(FrameProcessingTask task) { + if (shouldCancelTasks.get()) { + return; + } + highPriorityTasks.add(task); + // If the ExecutorService has non-started tasks, the first of these non-started tasks will run + // the task passed to this method. Just in case there are no non-started tasks, submit another + // task to run high-priority tasks. + submit(() -> {}); + } + /** * Cancels remaining tasks, runs the given release task, and shuts down the background thread. * @@ -83,7 +107,7 @@ import java.util.concurrent.atomic.AtomicBoolean; while (!futures.isEmpty()) { futures.remove().cancel(/* mayInterruptIfRunning= */ false); } - Future releaseFuture = submitTask(releaseTask); + Future releaseFuture = wrapTaskAndSubmitToExecutorService(releaseTask); singleThreadExecutorService.shutdown(); try { if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) { @@ -95,11 +119,14 @@ import java.util.concurrent.atomic.AtomicBoolean; } } - private Future submitTask(FrameProcessingTask glTask) { + private Future wrapTaskAndSubmitToExecutorService(FrameProcessingTask defaultPriorityTask) { return singleThreadExecutorService.submit( () -> { try { - glTask.run(); + while (!highPriorityTasks.isEmpty()) { + highPriorityTasks.remove().run(); + } + defaultPriorityTask.run(); removeFinishedFutures(); } catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) { listener.onFrameProcessingError(FrameProcessingException.from(e));